class Dynflow::Executors::Parallel::RunningStepsManager
Handles the events generated while actions are running, and makes sure an event is sent to an action only when that action's step is in the suspended state.
Public Class Methods
new(world)
click to toggle source
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 10
#
# Sets up an empty manager bound to the given world.
#
# @param world [World] validated with `Type!` before being stored
def initialize(world)
  @world         = Type!(world, World)
  # Per-step queue of pending work, created as WorkQueue.new(Integer, Work);
  # presumably keyed by step id with Work values -- confirm against WorkQueue.
  @events        = WorkQueue.new(Integer, Work)
  # step id => step, for every step registered through #add
  @running_steps = {}
end
Public Instance Methods
add(step, work)
click to toggle source
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 25
#
# Registers +step+ as running and queues +work+ for it.
#
# @param step [ExecutionPlan::Steps::RunStep] validated with `Type!`
# @param work the work item to queue under the step's id
# @return [self] so calls can be chained
def add(step, work)
  Type!(step, ExecutionPlan::Steps::RunStep)

  id = step.id
  @running_steps[id] = step
  # Queueing the step's own work first keeps the queue non-empty, so no
  # event gets run while the step is still being executed.
  @events.push(id, work)

  self
end
done(step)
click to toggle source
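@returns [Array(Boolean, Work|nil)] a pair: `true` and the next queued work (or `nil`) when the step is suspended; otherwise `false` and `nil`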
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 34
#
# Called when a piece of work for +step+ has finished. Removes the work at the
# head of the step's queue and, if it was an event, marks its result as
# successful. What happens next depends on the step's state:
#
# * suspended: the step stays registered and the next queued work (if any) is
#   handed back so it can be run;
# * anything else: every remaining queued event is failed with an
#   UnprocessableEvent (and logged as a warning) and the step is unregistered.
#
# @param step [ExecutionPlan::Steps::RunStep] validated with `Type!`
# @return [Array] `[true, next_work_or_nil]` when the step is suspended,
#   `[false, nil]` otherwise
def done(step)
  Type!(step, ExecutionPlan::Steps::RunStep)

  finished = @events.shift(step.id)
  case finished
  when Work::Event then finished.event.result.success(true)
  end

  return [true, @events.first(step.id)] if step.state == :suspended

  # The step can no longer receive events: drain the queue and fail each one.
  while (dropped = @events.shift(step.id))
    message = "step #{step.execution_plan_id}:#{step.id} dropping event #{dropped.event}"
    @world.logger.warn(message)
    # The backtrace is captured inside the tap block, as before, so the
    # recorded frames stay the same.
    failure = UnprocessableEvent.new(message).tap { |e| e.set_backtrace(caller) }
    dropped.event.result.fail(failure)
  end
  raise 'assert' unless @events.empty?(step.id)

  @running_steps.delete(step.id)
  [false, nil]
end
event(event)
click to toggle source
@returns [Work, nil]
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 63
#
# Accepts an incoming event for a registered step and queues it as work.
#
# If no step is registered under the event's step id, the event's result is
# failed with an UnprocessableEvent and nothing is queued.
#
# @param event [Parallel::Event] validated with `Type!`
# @return [Work, nil] the newly created work when the step's queue was empty
#   before this call (so it can run right away); `nil` when other work is
#   already queued ahead of it or when the step is unknown
def event(event)
  Type!(event, Parallel::Event)

  target = @running_steps[event.step_id]
  if target
    # Check emptiness BEFORE pushing: only the first item in the queue may run.
    runnable_now = @events.empty?(target.id)
    queued       = Work::Event[target, event.execution_plan_id, event]
    @events.push(target.id, queued)
    runnable_now ? queued : nil
  else
    rejection = UnprocessableEvent.new('step is not suspended, it cannot process events')
    event.result.fail(rejection)
    nil
  end
end
terminate()
click to toggle source
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 16
#
# Empties the work queue and fails the result of every queued event with an
# UnprocessableEvent. Queued work that is not a Work::Event is discarded
# without further action.
#
# @return [Array] the flattened list of work items that were pending
def terminate
  @events.clear.values.flatten(1).each do |item|
    next unless Work::Event === item

    item.event.result.fail(UnprocessableEvent.new("dropping due to termination"))
  end
end
try_to_terminate()
click to toggle source
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 55
#
# Forgets every registered step that is not currently in the :running state
# and reports whether any running step remains.
#
# @return [Boolean] true when no step is left registered (nothing is running),
#   false when at least one step is still in the :running state
def try_to_terminate
  # Keep only the steps that are still running; everything else is dropped.
  @running_steps.keep_if { |_, step| step.state == :running }
  @running_steps.empty?
end