class Chef::ChefFS::Parallelizer
Tries to balance several guarantees, in order of priority:
-
don't get deadlocked
-
provide results in desired order
-
provide results as soon as they are available
-
process input as soon as possible
Public Class Methods
new(num_threads)
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 32 def initialize(num_threads) @tasks = Queue.new @threads = [] @stop_thread = {} resize(num_threads) end
parallel_do(enumerable, options = {}, &block)
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 28 def self.parallel_do(enumerable, options = {}, &block) parallelizer.parallel_do(enumerable, options, &block) end
parallelize(enumerable, options = {}, &block)
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 24 def self.parallelize(enumerable, options = {}, &block) parallelizer.parallelize(enumerable, options, &block) end
parallelizer()
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 20 def self.parallelizer @@parallelizer ||= Parallelizer.new(@@threads) end
threads=(value)
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 15 def self.threads=(value) @@threads = value @@parallelizer.resize(value) if @@parallelizer end
Public Instance Methods
kill()
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 78 def kill @threads.each do |thread| Thread.kill(thread) @stop_thread.delete(thread) end @threads = [] end
num_threads()
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 39 def num_threads @threads.size end
parallel_do(enumerable, options = {}, &block)
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 47 def parallel_do(enumerable, options = {}, &block) ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait end
parallelize(enumerable, options = {}, &block)
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 43 def parallelize(enumerable, options = {}, &block) ParallelEnumerable.new(@tasks, enumerable, options, &block) end
resize(to_threads, wait = true, timeout = nil)
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 55 def resize(to_threads, wait = true, timeout = nil) if to_threads < num_threads threads_to_stop = @threads[to_threads..num_threads-1] @threads = @threads.slice(0, to_threads) threads_to_stop.each do |thread| @stop_thread[thread] = true end if wait start_time = Time.now threads_to_stop.each do |thread| thread_timeout = timeout ? timeout - (Time.now - start_time) : nil thread.join(thread_timeout) end end else num_threads.upto(to_threads - 1) do |i| @threads[i] = Thread.new(&method(:worker_loop)) end end end
stop(wait = true, timeout = nil)
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 51 def stop(wait = true, timeout = nil) resize(0, wait, timeout) end
Private Instance Methods
worker_loop()
click to toggle source
# File lib/chef/chef_fs/parallelizer.rb, line 88 def worker_loop begin while !@stop_thread[Thread.current] begin task = @tasks.pop task.call rescue puts "ERROR #{$!}" puts $!.backtrace end end ensure @stop_thread.delete(Thread.current) end end