class Chef::ChefFS::Parallelizer::ParallelEnumerable
Attributes
Public Class Methods
options: :ordered [true|false] - whether the output should stay in the same order
as the input (even though it may not actually be processed in that order). Default: true
:stop_on_exception [true|false] - if true, when an exception occurs in either
input or output, we wait for any outstanding processing to complete, but will not process any new inputs. Default: false
:main_thread_processing [true|false] - whether the main thread pulling
on each() is allowed to process inputs. Default: true NOTE: If you set this to false, parallelizer.kill will stop each() in its tracks, so you need to know for sure that won't happen.
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 20
#
# Wraps +input_enumerable+ so that +block+ is run over its items by tasks
# pushed onto +parent_task_queue+. +options+ may carry the keys this class
# reads: :ordered, :stop_on_exception and :main_thread_processing.
def initialize(parent_task_queue, input_enumerable, options = {}, &block)
  # Bookkeeping used by each() and by whichever thread runs process_one.
  @unconsumed_input  = Queue.new # [input, index] pairs not yet picked up
  @in_process        = {}        # Thread => true while that thread is processing
  @unconsumed_output = Queue.new # output tuples not yet handed to the caller

  @parent_task_queue = parent_task_queue
  @input_enumerable  = input_enumerable
  @options           = options
  @block             = block
end
Public Instance Methods
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 87
#
# A bare count() counts the inputs directly, without running the block over
# them. Any argument or block falls back to original_count (defined outside
# this view; presumably the aliased Enumerable#count).
def count(*args, &block)
  return original_count(*args, &block) unless args.empty? && block.nil?

  @input_enumerable.count
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 103
#
# Skips the first +n+ inputs before any processing happens, then returns the
# processed results of the remaining inputs as an Array.
def drop(n)
  remaining_inputs = @input_enumerable.drop(n)
  restricted_copy(remaining_inputs).to_a
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 36
#
# Yields each processed output, dropping the index and input that
# each_with_input also supplies.
def each
  each_with_input { |output, _index, _input| yield output }
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 64
#
# Forwards the block to the ordered or the unordered driver. Only an explicit
# `ordered: false` selects unordered mode; a missing key means ordered.
def each_with_exceptions(&block)
  return each_with_exceptions_ordered(&block) unless @options[:ordered] == false

  each_with_exceptions_unordered(&block)
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 42
#
# Yields each processed output together with the position of its input in
# the input enumerable.
def each_with_index
  each_with_input { |output, position, _input| yield output, position }
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 48
#
# Yields (output, index, input) for every successful item. A failed item is
# raised immediately in ordered mode. In unordered mode (`ordered: false`)
# iteration continues, and the first failure seen is raised once every
# other result has been yielded.
def each_with_input
  deferred_error = nil

  each_with_exceptions do |output, index, input, type|
    unless type == :exception
      yield output, index, input
      next
    end

    raise output unless @options[:ordered] == false
    deferred_error ||= output
  end

  raise deferred_error if deferred_error
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 95
#
# With +n+, processes only the first +n+ inputs and returns their results as
# an Array. Without +n+, returns the first result, or nil when there are no
# inputs.
def first(n = nil)
  return first(1)[0] unless n

  restricted_copy(@input_enumerable.first(n)).to_a
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 107
#
# Returns a FlattenEnumerable over this enumerable. +levels+ (nil by
# default) is passed through unchanged.
def flatten(levels = nil)
  source = self
  FlattenEnumerable.new(source, levels)
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 139
#
# Returns a RestrictedLazy pairing this enumerable with the result of
# original_lazy (defined outside this view; presumably the aliased
# Enumerable#lazy).
def lazy
  inner_lazy = original_lazy
  RestrictedLazy.new(self, inner_lazy)
end
Enumerable methods
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 81
#
# Builds a new ParallelEnumerable over +enumerable+ that shares this one's
# task queue, options and per-item block.
def restricted_copy(enumerable)
  per_item = @block
  ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &per_item)
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 111
#
# Keeps only the first +n+ inputs before any processing happens, then
# returns their processed results as an Array.
def take(n)
  leading_inputs = @input_enumerable.take(n)
  restricted_copy(leading_inputs).to_a
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 72
#
# Drives every input to completion and discards the results. If any item
# failed, the first failure to arrive (in completion order) is raised once
# everything has finished.
def wait
  first_error = nil

  each_with_exceptions_unordered do |output, _index, _input, type|
    next unless type == :exception
    first_error ||= output
  end

  raise first_error if first_error
end
Private Instance Methods
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 211
#
# Re-sequences the tuples from each_with_exceptions_unordered by input index.
# Out-of-order results are buffered until every earlier index has been
# yielded, so results behind a missing index are never yielded. A tuple
# with a nil index (an input-enumeration failure) is held back and yielded
# last, under the next index that was not yielded.
def each_with_exceptions_ordered
  cursor = 0
  pending = {}

  each_with_exceptions_unordered do |output, index, input, type|
    pending[index] = [ output, input, type ]

    # Flush the run of results that is now contiguous with what we have
    # already yielded.
    while (entry = pending.delete(cursor))
      out, inp, kind = entry
      yield out, cursor, inp, kind
      cursor += 1
    end
  end

  leftover = pending.delete(nil)
  yield leftover[0], cursor, leftover[1], leftover[2] if leftover
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 146
#
# Core driver. Enqueues every input as a process_one task on the parent task
# queue and yields (output, index, input, type) tuples as they arrive, in
# completion order. +type+ is :result or :exception. A failure raised while
# enumerating the inputs themselves is yielded as (error, nil, nil,
# :exception).
#
# Raises RuntimeError if called while another iteration over this object is
# still running.
def each_with_exceptions_unordered
  if @each_running
    raise "each() called on parallel enumerable twice simultaneously! Bad mojo"
  end
  @each_running = true
  begin
    # Grab all the inputs, yielding any responses during enumeration
    # in case the enumeration itself takes time
    begin
      @input_enumerable.each_with_index do |input, index|
        @unconsumed_input.push([ input, index ])
        @parent_task_queue.push(method(:process_one))

        stop_processing_input = false
        until @unconsumed_output.empty?
          # Separate names so this block's own input/index are not clobbered.
          out_output, out_index, out_input, out_type = @unconsumed_output.pop
          yield out_output, out_index, out_input, out_type
          if out_type == :exception && @options[:stop_on_exception]
            stop_processing_input = true
            break
          end
        end
        break if stop_processing_input
      end
    rescue => error
      # We still want to wait for the rest of the outputs to process
      @unconsumed_output.push([ error, nil, nil, :exception ])
      @unconsumed_input.clear if @options[:stop_on_exception]
    end

    until finished?
      # yield thread to others (for 1.8.7)
      sleep(0.01) if @unconsumed_output.empty?

      until @unconsumed_output.empty?
        # Destructure before yielding, as the loop above does, so the block
        # always receives four separate values instead of one array that
        # only happened to work through proc auto-splatting.
        output, index, input, type = @unconsumed_output.pop
        yield output, index, input, type
      end

      # If no one is working on our tasks and we're allowed to
      # work on them in the main thread, process an input to
      # move things forward.
      if @in_process.size == 0 && !(@options[:main_thread_processing] == false)
        process_one
      end
    end
  ensure
    # Leaving with work still pending means we exited early. That can be an
    # exception, or a break/return/throw from the consumer's block (e.g. any?
    # finding a result). A bare `rescue` never sees the break/return/throw
    # cases, so throw away the leftover work here for every early exit. This
    # lets the next enumeration start over cleanly.
    stop unless finished?
    @each_running = false
  end
end
This is thread safe only if called from the main thread pulling on each(). The order of these checks is important, as well, to be thread safe.
1. If @unconsumed_input.empty? is true, then no more work will ever legitimately be picked up.

2. If @in_process.size == 0, then no work is in process. Because @unconsumed_input is already empty, the count will never go back up, since this check only runs after the input enumerator has finished. Note that swapping checks #1 and #2 could cause a race, because @in_process is incremented before the input is consumed.

3. If @unconsumed_output.empty? is true, then we are done with outputs.

Thus, 1 and 2 together mean no more output will ever show up, and 3 means we have passed all existing outputs to the user.
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 248
#
# True once no input is waiting, no thread is processing, and no output is
# waiting to be handed to the caller. Intended to be called only from the
# thread running each(). The three checks must stay in this order.
def finished?
  return false unless @unconsumed_input.empty?
  return false unless @in_process.empty?

  @unconsumed_output.empty?
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 265
#
# Runs the block on +input+ and queues [result, index, input, :result]. If
# that raises a StandardError, it queues [error, index, input, :exception]
# instead, after discarding all unstarted inputs when :stop_on_exception is
# set. Returns +index+ either way.
def process_input(input, index)
  result = @block.call(input)
  @unconsumed_output.push([ result, index, input, :result ])
  index
rescue => error
  @unconsumed_input.clear if @options[:stop_on_exception]
  @unconsumed_output.push([ error, index, input, :exception ])
  index
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 252
#
# Marks the current thread as busy and takes one pending input without
# blocking. If one was available, it is processed and process_input's result
# is returned; otherwise returns nil. The busy mark is always removed.
def process_one
  @in_process[Thread.current] = true
  begin
    begin
      item = @unconsumed_input.pop(true)
    rescue ThreadError
      # Non-blocking pop on an empty queue: nothing for this thread to do.
      return
    end
    process_input(*item)
  ensure
    @in_process.delete(Thread.current)
  end
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 228
#
# Discards every unstarted input, then polls every 50 ms until no thread is
# processing, and finally discards any outputs not yet handed to the caller.
def stop
  @unconsumed_input.clear
  sleep(0.05) until @in_process.empty?
  @unconsumed_output.clear
end