class Dynflow::Executors::Parallel::Core
Attributes
logger[R]
Public Class Methods
new(world, pool_size)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 8 def initialize(world, pool_size) @logger = world.logger @world = Type! world, World @pool = Pool.spawn('pool', reference, pool_size, world.transaction_adapter) @execution_plan_managers = {} @plan_ids_in_rescue = Set.new @terminated = nil end
Public Instance Methods
finish_step(step)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 39 def finish_step(step) update_manager(step) end
finish_termination()
click to toggle source
Calls superclass method
Dynflow::Actor#finish_termination
# File lib/dynflow/executors/parallel/core.rb, line 55 def finish_termination unless @execution_plan_managers.empty? logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..." begin @execution_plan_managers.values.each do |manager| manager.terminate end rescue Errors::PersistenceError logger.error "could not to clean the data properly" end @execution_plan_managers.values.each do |manager| finish_plan(manager.execution_plan.id) end end logger.error '... core terminated.' super end
handle_event(event)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 21 def handle_event(event) Type! event, Parallel::Event if terminating? raise Dynflow::Error, "cannot accept event: #{event} core is terminating" end execution_plan_manager = @execution_plan_managers[event.execution_plan_id] if execution_plan_manager feed_pool execution_plan_manager.event(event) true else raise Dynflow::Error, "no manager for #{event.inspect}" end rescue Dynflow::Error => e event.result.fail e.message raise e end
handle_execution(execution_plan_id, finished)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 17 def handle_execution(execution_plan_id, finished) start_executing track_execution_plan(execution_plan_id, finished) end
handle_persistence_error(error)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 43 def handle_persistence_error(error) logger.fatal "PersistenceError in executor: terminating" logger.fatal error @world.terminate end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Actor#start_termination
# File lib/dynflow/executors/parallel/core.rb, line 49 def start_termination(*args) super logger.info 'shutting down Core ...' @pool.tell([:start_termination, Concurrent.future]) end
Private Instance Methods
continue_manager(manager, next_work)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 114 def continue_manager(manager, next_work) if manager.done? finish_plan manager.execution_plan.id else feed_pool next_work end end
feed_pool(work_items)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 140 def feed_pool(work_items) return if terminating? Type! work_items, Array, Work, NilClass return if work_items.nil? work_items = [work_items] if work_items.is_a? Work work_items.all? { |i| Type! i, Work } work_items.each { |new_work| @pool.tell([:schedule_work, new_work]) } end
finish_plan(execution_plan_id)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 149 def finish_plan(execution_plan_id) manager = @execution_plan_managers.delete(execution_plan_id) if rescue?(manager) rescue!(manager) else set_future(manager) end end
on_message(message)
click to toggle source
Calls superclass method
Dynflow::MethodicActor#on_message
# File lib/dynflow/executors/parallel/core.rb, line 75 def on_message(message) super rescue Errors::PersistenceError => e self.tell(:handle_persistence_error, e) end
rescue!(manager)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 128 def rescue!(manager) # TODO: after moving to concurrent-ruby actors, there should be better place # to put this logic of making sure we don't run rescues in endless loop @plan_ids_in_rescue << manager.execution_plan.id rescue_plan_id = manager.execution_plan.rescue_plan_id if rescue_plan_id reference.tell([:handle_execution, rescue_plan_id, manager.future]) else set_future(manager) end end
rescue?(manager)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 122 def rescue?(manager) return false if terminating? @world.auto_rescue && manager.execution_plan.state == :paused && !@plan_ids_in_rescue.include?(manager.execution_plan.id) end
set_future(manager)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 158 def set_future(manager) @plan_ids_in_rescue.delete(manager.execution_plan.id) manager.future.success manager.execution_plan end
start_executing(manager)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 163 def start_executing(manager) return if manager.nil? Type! manager, ExecutionPlanManager next_work = manager.start continue_manager manager, next_work end
track_execution_plan(execution_plan_id, finished)
click to toggle source
@return
# File lib/dynflow/executors/parallel/core.rb, line 82 def track_execution_plan(execution_plan_id, finished) execution_plan = @world.persistence.load_execution_plan(execution_plan_id) if terminating? raise Dynflow::Error, "cannot accept execution_plan_id:#{execution_plan_id} core is terminating" end if @execution_plan_managers[execution_plan_id] raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's already running" end if execution_plan.state == :stopped raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's stopped" end @execution_plan_managers[execution_plan_id] = ExecutionPlanManager.new(@world, execution_plan, finished) rescue Dynflow::Error => e finished.fail e nil end
update_manager(finished_work)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 108 def update_manager(finished_work) manager = @execution_plan_managers[finished_work.execution_plan_id] next_work = manager.what_is_next(finished_work) continue_manager manager, next_work end