class Celluloid::Supervision::Container::Pool
Manages a fixed-size pool of actors Delegates work (i.e. methods) and supervises actors Don't use this class directly. Instead use MyKlass.pool
Attributes
actors[R]
size[R]
Public Class Methods
new(options={})
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 15 def initialize(options={}) @idle = [] @busy = [] @klass = options[:actors] @actors = Set.new @mutex = Mutex.new @size = options[:size] || [Celluloid.cores || 2, 2].max @args = options[:args] ? Array(options[:args]) : [] # Do this last since it can suspend and/or crash @idle = @size.times.map { __spawn_actor__ } end
pooling_options(config={}, mixins={})
click to toggle source
# File lib/celluloid/supervision/container/behavior/pool.rb, line 50 def pooling_options(config={}, mixins={}) combined = {type: Celluloid::Supervision::Container::Pool}.merge(config).merge(mixins) combined[:args] = [[:block, :actors, :size, :args].inject({}) { |e, p| e[p] = combined.delete(p) if combined[p]; e }] combined end
Public Instance Methods
__busy()
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 121 def __busy @mutex.synchronize { @busy } end
__busy?(actor)
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 117 def __busy?(actor) @mutex.synchronize { @busy.include? actor } end
__crash_handler__(actor, reason)
click to toggle source
Spawn a new worker for every crashed one
# File lib/celluloid/supervision/container/pool.rb, line 160 def __crash_handler__(actor, reason) @busy.delete actor @idle.delete actor @actors.delete actor return unless reason @idle << __spawn_actor__ signal :respawn_complete end
__idle()
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 125 def __idle @mutex.synchronize { @idle } end
__idle?(actor)
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 113 def __idle?(actor) @mutex.synchronize { @idle.include? actor } end
__provision_actor__()
click to toggle source
Provision a new actor ( take it out of idle, move it into busy, and avail it )
# File lib/celluloid/supervision/container/pool.rb, line 144 def __provision_actor__ Task.current.guard_warnings = true @mutex.synchronize do while @idle.empty? # Wait for responses from one of the busy actors response = exclusive { receive { |msg| msg.is_a?(Internals::Response) } } Thread.current[:celluloid_actor].handle_message(response) end actor = @idle.shift @busy << actor actor end end
__shutdown__()
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 29 def __shutdown__ return unless defined?(@actors) && @actors # TODO: these can be nil if initializer crashes terminators = @actors.map do |actor| begin actor.future(:terminate) rescue DeadActorError end end terminators.compact.each { |terminator| terminator.value rescue nil } end
__spawn_actor__()
click to toggle source
Instantiate an actor, add it to the actor Set, and return it
# File lib/celluloid/supervision/container/pool.rb, line 136 def __spawn_actor__ actor = @klass.new_link(*@args) @mutex.synchronize { @actors.add(actor) } @actors.add(actor) actor end
__state(actor)
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 129 def __state(actor) return :busy if __busy?(actor) return :idle if __idle?(actor) :missing end
_send_(method, *args, &block)
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 42 def _send_(method, *args, &block) actor = __provision_actor__ begin actor._send_ method, *args, &block rescue DeadActorError # if we get a dead actor out of the pool wait :respawn_complete actor = __provision_actor__ retry rescue ::Exception => ex abort ex ensure if actor.alive? @idle << actor @busy.delete actor # Broadcast that actor is done processing and # waiting idle signal :actor_idle end end end
busy_size()
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 105 def busy_size @mutex.synchronize { @busy.length } end
idle_size()
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 109 def idle_size @mutex.synchronize { @idle.length } end
inspect()
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 84 def inspect _send_ :inspect end
is_a?(klass)
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 68 def is_a?(klass) _send_ :is_a?, klass end
kind_of?(klass)
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 72 def kind_of?(klass) _send_ :kind_of?, klass end
method(meth)
click to toggle source
Since Pool allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class
Calls superclass method
# File lib/celluloid/supervision/container/pool.rb, line 202 def method(meth) super rescue NameError @klass.instance_method(meth.to_sym) end
method_missing(method, *args, &block)
click to toggle source
Calls superclass method
# File lib/celluloid/supervision/container/pool.rb, line 191 def method_missing(method, *args, &block) if respond_to?(method) _send_ method, *args, &block else super end end
methods(include_ancestors = true)
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 76 def methods(include_ancestors = true) _send_ :methods, include_ancestors end
name()
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 64 def name _send_ @mailbox, :name end
respond_to?(meth, include_private = false)
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 169 def respond_to?(meth, include_private = false) # NOTE: use method() here since this class # shouldn't be used directly, and method() is less # likely to be "reimplemented" inconsistently # with other Object.*method* methods. found = method(meth) if include_private found ? true : false else if found.is_a?(UnboundMethod) found.owner.public_instance_methods.include?(meth) || found.owner.protected_instance_methods.include?(meth) else found.receiver.public_methods.include?(meth) || found.receiver.protected_methods.include?(meth) end end rescue NameError false end
size=(new_size)
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 88 def size=(new_size) new_size = [0, new_size].max if new_size > size delta = new_size - size delta.times { @idle << __spawn_actor__ } else (size - new_size).times do actor = __provision_actor__ unlink actor @busy.delete actor @actors.delete actor actor.terminate end end @size = new_size end
to_s()
click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 80 def to_s _send_ :to_s end