class Chef::ChefFS::Parallelizer

Tries to balance several guarantees, in order of priority:

Public Class Methods

new(num_threads) click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 32
# Builds a parallelizer and immediately sizes its pool of worker threads.
#
# num_threads - number of worker threads to start; handed to #resize as-is.
def initialize(num_threads)
  # Pool bookkeeping: per-thread stop flags and the list of live workers.
  @stop_thread = {}
  @threads = []
  # Queue the worker threads pop their tasks from.
  @tasks = Queue.new
  # The pool starts empty, so this resize spawns the requested workers.
  resize(num_threads)
end
parallel_do(enumerable, options = {}, &block) click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 28
# Class-level convenience: forwards the call, arguments and block unchanged,
# to #parallel_do on the shared instance returned by .parallelizer.
#
# enumerable - collection handed to the shared instance.
# options    - options hash handed to the shared instance (default {}).
# block      - block handed to the shared instance.
#
# Returns whatever the instance-level #parallel_do returns.
def self.parallel_do(enumerable, options = {}, &block)
  shared = parallelizer
  shared.parallel_do(enumerable, options, &block)
end
parallelize(enumerable, options = {}, &block) click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 24
# Class-level convenience: forwards the call, arguments and block unchanged,
# to #parallelize on the shared instance returned by .parallelizer.
#
# enumerable - collection handed to the shared instance.
# options    - options hash handed to the shared instance (default {}).
# block      - block handed to the shared instance.
#
# Returns whatever the instance-level #parallelize returns.
def self.parallelize(enumerable, options = {}, &block)
  shared = parallelizer
  shared.parallelize(enumerable, options, &block)
end
parallelizer() click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 20
# Returns the shared Parallelizer instance, creating it on first use with the
# thread count currently stored in @@threads and memoizing it in
# @@parallelizer.
#
# NOTE(review): @@threads is read here but not assigned in this method;
# presumably it is initialised at class level or via .threads= before the
# first call -- confirm against the class body.
def self.parallelizer
  @@parallelizer ||= Parallelizer.new(@@threads)
end
threads=(value) click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 15
# Records the thread count used for the shared parallelizer and, when that
# shared instance already exists, resizes it to the new count right away.
#
# value - the new number of worker threads.
#
# NOTE(review): @@parallelizer is read without being assigned here;
# presumably it is initialised (e.g. to nil) at class level -- confirm.
def self.threads=(value)
  @@threads = value
  # Nothing to resize until the shared instance has been created.
  return unless @@parallelizer

  @@parallelizer.resize(value)
end

Public Instance Methods

kill() click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 78
# Kills every worker thread in the pool, drops each one's stop flag, and
# leaves the pool empty. Unlike #stop this does not set stop flags or join
# the threads.
def kill
  @threads.each do |worker|
    worker.kill
    # The worker is gone; forget any pending stop request for it.
    @stop_thread.delete(worker)
  end
  @threads = []
end
num_threads() click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 39
# Returns the number of worker threads currently tracked in the pool.
def num_threads
  @threads.length
end
parallel_do(enumerable, options = {}, &block) click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 47
# Builds a ParallelEnumerable over +enumerable+ on this parallelizer's task
# queue with ordering switched off, then calls #wait on it.
#
# enumerable - collection handed to ParallelEnumerable.
# options    - options hash; any :ordered entry supplied by the caller is
#              overridden with false (default {}).
# block      - block handed to ParallelEnumerable.
#
# Returns whatever ParallelEnumerable#wait returns (presumably after all
# items have been processed -- confirm against ParallelEnumerable).
def parallel_do(enumerable, options = {}, &block)
  unordered_options = options.merge(:ordered => false)
  ParallelEnumerable.new(@tasks, enumerable, unordered_options, &block).wait
end
parallelize(enumerable, options = {}, &block) click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 43
# Wraps +enumerable+ in a ParallelEnumerable bound to this parallelizer's
# task queue and returns it without calling #wait on it.
#
# enumerable - collection handed to ParallelEnumerable.
# options    - options hash handed through unchanged (default {}).
# block      - block handed to ParallelEnumerable.
#
# Returns the new ParallelEnumerable.
def parallelize(enumerable, options = {}, &block)
  task_queue = @tasks
  parallel_enum = ParallelEnumerable.new(task_queue, enumerable, options, &block)
  parallel_enum
end
resize(to_threads, wait = true, timeout = nil) click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 55
# Grows or shrinks the worker pool to exactly +to_threads+ threads.
#
# to_threads - desired number of worker threads; must not be negative.
# wait       - when shrinking, whether to join the surplus threads
#              (default true).
# timeout    - total number of seconds to spend joining when +wait+ is set;
#              nil (the default) joins without a time limit.
#
# Raises ArgumentError if +to_threads+ is negative.
#
# NOTE(review): a surplus worker only checks its stop flag between tasks, so
# a worker blocked popping from an empty queue will not exit until another
# task arrives. With wait = true and no timeout the join waits that long.
def resize(to_threads, wait = true, timeout = nil)
  # A negative count would make Array#slice return nil in the shrink branch
  # and overwrite @threads with it, so reject it before touching any state.
  if to_threads < 0
    raise ArgumentError, "to_threads must be >= 0 (got #{to_threads})"
  end

  if to_threads < num_threads
    # Shrink: detach the surplus workers from the pool and flag each to stop.
    threads_to_stop = @threads[to_threads..num_threads-1]
    @threads = @threads.slice(0, to_threads)
    threads_to_stop.each do |thread|
      @stop_thread[thread] = true
    end

    if wait
      start_time = Time.now
      threads_to_stop.each do |thread|
        # The timeout is a budget shared by all joins, so each join only
        # gets whatever is left of it.
        thread_timeout = timeout ? timeout - (Time.now - start_time) : nil
        thread.join(thread_timeout)
      end
    end

  else
    # Grow (or no-op when already at the requested size): start one worker
    # per missing slot.
    num_threads.upto(to_threads - 1) do |i|
      @threads[i] = Thread.new(&method(:worker_loop))
    end
  end
end
stop(wait = true, timeout = nil) click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 51
# Stops all workers by resizing the pool down to zero threads.
#
# wait    - forwarded to #resize (default true).
# timeout - forwarded to #resize (default nil).
#
# Returns whatever #resize returns.
def stop(wait = true, timeout = nil)
  no_workers = 0
  resize(no_workers, wait, timeout)
end

Private Instance Methods

worker_loop() click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 88
# Body of each worker thread: repeatedly pops a task off the queue and calls
# it until this thread's stop flag is set.
#
# Errors rescued from a task are printed (message, then backtrace) and the
# loop carries on with the next task. The stop flag is only checked between
# tasks. On any exit the thread's entry in @stop_thread is removed.
def worker_loop
  until @stop_thread[Thread.current]
    begin
      @tasks.pop.call
    rescue => error
      puts "ERROR #{error}"
      puts error.backtrace
    end
  end
ensure
  @stop_thread.delete(Thread.current)
end