module Dynflow::Action::WithSubPlans
Constants
- SubPlanFinished
Public Instance Methods
calculate_time_distribution(count)
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 85 def calculate_time_distribution(count) time = input[:concurrency_control][:time] unless time.nil? || time.is_a?(Hash) # Assume concurrency level 1 unless stated otherwise level = input[:concurrency_control].fetch(:level, {}).fetch(:free, 1) semaphore = ::Dynflow::Semaphores::Stateful.new(nil, level, :interval => time.to_f / (count * level), :time_span => time) input[:concurrency_control][:time] = semaphore.to_hash end end
cancel!()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 65 def cancel! @world.throttle_limiter.cancel!(execution_plan_id) sub_plans('state' => 'running').each(&:cancel) suspend end
check_for_errors!()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 205 def check_for_errors! fail "A sub task failed" if output[:failed_count] > 0 end
counts_set?()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 201 def counts_set? output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count] end
create_sub_plans()
click to toggle source
@abstract when the logic for the initiation of the subtasks
is different from the default one.
@returns a triggered task or array of triggered tasks @example
def create_sub_plans trigger(MyAction, "Hello") end
@example
def create_sub_plans [trigger(MyAction, "Hello 1"), trigger(MyAction, "Hello 2")] end
# File lib/dynflow/action/with_sub_plans.rb, line 57 def create_sub_plans raise NotImplementedError end
distribute_over_time(time_span)
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 97 def distribute_over_time(time_span) input[:concurrency_control] ||= {} input[:concurrency_control][:time] = time_span end
done?()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 166 def done? if counts_set? output[:total_count] - output[:success_count] - output[:failed_count] <= 0 else false end end
initiate()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 28 def initiate sub_plans = create_sub_plans sub_plans = Array[sub_plans] unless sub_plans.is_a? Array if uses_concurrency_control planned, failed = sub_plans.partition { |plan| plan.state == :planned } calculate_time_distribution sub_plans.count sub_plans = world.throttle_limiter.handle_plans!(execution_plan_id, planned.map(&:id), failed.map(&:id), input[:concurrency_control]) end wait_for_sub_plans sub_plans end
limit_concurrency_level(level)
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 80 def limit_concurrency_level(level) input[:concurrency_control] ||= {} input[:concurrency_control][:level] = ::Dynflow::Semaphores::Stateful.new(level).to_hash end
mark_as_done(plan_id, success)
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 157 def mark_as_done(plan_id, success) if success output[:success_count] += 1 else output[:failed_count] += 1 end output[:pending_count] -= 1 end
notify_on_finish(plans)
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 147 def notify_on_finish(plans) suspend do |suspended_action| plans.each do |plan| plan.finished.on_completion! do |success, value| suspended_action << SubPlanFinished[plan.id, success && (value.result == :success)] end end end end
on_finish()
click to toggle source
@api method to be called after all the sub tasks finished
# File lib/dynflow/action/with_sub_plans.rb, line 62 def on_finish end
recalculate_counts()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 182 def recalculate_counts output.update(total_count: 0, failed_count: 0, success_count: 0, pending_count: 0) sub_plans.each do |sub_plan| output[:total_count] += 1 if sub_plan.state == :stopped if sub_plan.error? output[:failed_count] += 1 else output[:success_count] += 1 end else output[:pending_count] += 1 end end end
resume()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 133 def resume if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? } initiate else recalculate_counts try_to_finish or fail "Some sub plans are still not finished" end end
run(event = nil)
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 10 def run(event = nil) match event, (on nil do if output[:total_count] resume else initiate end end), (on SubPlanFinished do mark_as_done(event.execution_plan_id, event.success) try_to_finish or suspend end), (on Action::Cancellable::Cancel do cancel! end) end
run_progress()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 174 def run_progress if counts_set? && output[:total_count] > 0 (output[:success_count] + output[:failed_count]).to_f / output[:total_count] else 0.1 end end
sub_plans(filter = {})
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 142 def sub_plans(filter = {}) @sub_plans ||= world.persistence.find_execution_plans(filters: { 'caller_execution_plan_id' => execution_plan_id, 'caller_action_id' => self.id }.merge(filter) ) end
trigger(*args)
click to toggle source
Helper for creating sub plans
# File lib/dynflow/action/with_sub_plans.rb, line 72 def trigger(*args) if uses_concurrency_control world.plan_with_caller(self, *args) else world.trigger { world.plan_with_caller(self, *args) } end end
try_to_finish()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 123 def try_to_finish if done? check_for_errors! on_finish return true else return false end end
uses_concurrency_control()
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 209 def uses_concurrency_control @uses_concurrency_control = input.key? :concurrency_control end
wait_for_sub_plans(sub_plans)
click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 102 def wait_for_sub_plans(sub_plans) output.update(total_count: 0, failed_count: 0, success_count: 0, pending_count: 0) planned, failed = sub_plans.partition(&:planned?) sub_plan_ids = (planned + failed).map(&:execution_plan_id) output[:total_count] = sub_plan_ids.size output[:failed_count] = failed.size output[:pending_count] = planned.size if planned.any? notify_on_finish(planned) else check_for_errors! end end