class Dynflow::Executors::Parallel::RunningStepsManager

Handles the events generated while running actions, makes sure the events are sent to the action only when in suspended state

Public Class Methods

new(world) click to toggle source
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 10
def initialize(world)
  @world         = Type! world, World
  @running_steps = {}
  @events        = WorkQueue.new(Integer, Work)
end

Public Instance Methods

add(step, work) click to toggle source
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 25
def add(step, work)
  Type! step, ExecutionPlan::Steps::RunStep
  @running_steps[step.id] = step
  # we make sure not to run any event when the step is still being executed
  @events.push(step.id, work)
  self
end
done(step) click to toggle source

@returns [Work, nil]

# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 34
def done(step)
  Type! step, ExecutionPlan::Steps::RunStep
  @events.shift(step.id).tap do |work|
    work.event.result.success true if Work::Event === work
  end

  if step.state == :suspended
    return true, @events.first(step.id)
  else
    while (event = @events.shift(step.id))
      message = "step #{step.execution_plan_id}:#{step.id} dropping event #{event.event}"
      @world.logger.warn message
      event.event.result.fail UnprocessableEvent.new(message).
                                  tap { |e| e.set_backtrace(caller) }
    end
    raise 'assert' unless @events.empty?(step.id)
    @running_steps.delete(step.id)
    return false, nil
  end
end
event(event) click to toggle source

@returns [Work, nil]

# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 63
def event(event)
  Type! event, Parallel::Event

  step = @running_steps[event.step_id]
  unless step
    event.result.fail UnprocessableEvent.new(
                          'step is not suspended, it cannot process events')
    return nil
  end

  can_run_event = @events.empty?(step.id)
  work          = Work::Event[step, event.execution_plan_id, event]
  @events.push(step.id, work)
  work if can_run_event
end
terminate() click to toggle source
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 16
def terminate
  pending_work = @events.clear.values.flatten(1)
  pending_work.each do |w|
    if Work::Event === w
      w.event.result.fail UnprocessableEvent.new("dropping due to termination")
    end
  end
end
try_to_terminate() click to toggle source
# File lib/dynflow/executors/parallel/running_steps_manager.rb, line 55
def try_to_terminate
  @running_steps.delete_if do |_, step|
    step.state != :running
  end
  return @running_steps.empty?
end