class Dynflow::Executors::Parallel::Pool
Public Class Methods
new(core, pool_size, transaction_adapter)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 38
#
# Builds the pool state: keeps the executor core and the requested pool size,
# spawns +pool_size+ workers (every one of them starts in the free list) and
# creates an empty job storage.
#
# @param core the executor core; stored and later notified through +tell+
# @param pool_size [Integer] number of workers to spawn
# @param transaction_adapter handed unchanged to every spawned worker
def initialize(core, pool_size, transaction_adapter)
  @executor_core = core
  @pool_size     = pool_size
  @free_workers  = Array.new(pool_size) do |index|
    worker_name = "worker-#{index}"
    # Each worker receives this actor's reference along with the adapter.
    Worker.spawn(worker_name, reference, transaction_adapter)
  end
  @jobs = JobStorage.new
end
Public Instance Methods
handle_persistence_error(error)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 56
#
# Forwards a persistence error to the executor core.
#
# The notification is sent as ONE array message of the form
# [method_name, *arguments], the same shape +worker_done+ uses when it sends
# +[:finish_step, step]+ to the same core. Passing the symbol and the error as
# two separate arguments to +tell+ raises ArgumentError when +tell+ accepts a
# single message, so the error would never reach the core.
#
# @param error the persistence error to report
# @return whatever +@executor_core.tell+ returns
def handle_persistence_error(error)
  @executor_core.tell([:handle_persistence_error, error])
end
schedule_work(work)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 45
#
# Queues a work item in the job storage and then attempts to hand queued jobs
# to free workers.
#
# @param work the work item to enqueue
def schedule_work(work)
  @jobs.add(work)
  distribute_jobs
end
start_termination(*args)
click to toggle source
Calls superclass method Dynflow::Actor#start_termination.
# File lib/dynflow/executors/parallel/pool.rb, line 60
#
# Runs the superclass termination start-up with the same arguments, then
# checks right away whether the pool can finish terminating.
#
# @param args arguments forwarded unchanged to the superclass implementation
def start_termination(*args)
  super(*args)
  try_to_terminate
end
worker_done(worker, step)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 50
#
# Called when a worker has finished a step: reports the finished step to the
# executor core, puts the worker back into the free list and redistributes
# any queued jobs.
#
# @param worker the worker that has become available again
# @param step the step that was just finished
def worker_done(worker, step)
  finished_message = [:finish_step, step]
  @executor_core.tell(finished_message)
  @free_workers.push(worker)
  distribute_jobs
end
Private Instance Methods
distribute_jobs()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 75
#
# First gives the pool a chance to terminate, then pairs free workers with
# queued jobs until either the free list or the job storage is empty.
def distribute_jobs
  try_to_terminate

  until @free_workers.empty? || @jobs.empty?
    # Take the worker first, then the job - same order as the one-liner form.
    idle_worker = @free_workers.pop
    idle_worker << @jobs.pop
  end
end
try_to_terminate()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 67
#
# Completes termination once it has been requested and every worker is back
# in the free list (free count equals the pool size). All workers are asked to
# terminate and waited on, the executor core is told to finish its
# termination, and finally this actor finishes its own.
#
# Does nothing (returns nil) while the pool is not terminating or while some
# worker is still busy.
def try_to_terminate
  all_workers_idle = terminating? && @free_workers.size == @pool_size
  return unless all_workers_idle

  # Issue every terminate request before waiting on any of them.
  pending_terminations = @free_workers.map { |worker| worker.ask(:terminate!) }
  pending_terminations.each(&:wait)

  @executor_core.tell(:finish_termination)
  finish_termination
end