Object
Actors are Celluloid's concurrency primitive. They're implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.
Obtain all running actors in the system
# File lib/celluloid/actor.rb, line 78 def all actors = [] Thread.list.each do |t| next unless t.celluloid? && t.role == :actor actors << t.actor.proxy if t.actor && t.actor.respond_to?(:proxy) end actors end
Invoke a method asynchronously on an actor via its mailbox
# File lib/celluloid/actor.rb, line 66 def async(mailbox, meth, *args, &block) proxy = AsyncProxy.new(mailbox, "UnknownClass") proxy.method_missing(meth, *args, &block) end
Invoke a method on the given actor via its mailbox
# File lib/celluloid/actor.rb, line 60 def call(mailbox, meth, *args, &block) proxy = SyncProxy.new(mailbox, "UnknownClass") proxy.method_missing(meth, *args, &block) end
# File lib/celluloid/actor.rb, line 41 def clear_registry Registry.root.clear end
Obtain the current actor
# File lib/celluloid/actor.rb, line 46 def current actor = Thread.current[:celluloid_actor] raise NotActorError, "not in actor scope" unless actor actor.proxy end
Call a method asynchronously and retrieve its value later
# File lib/celluloid/actor.rb, line 72 def future(mailbox, meth, *args, &block) proxy = FutureProxy.new(mailbox, "UnknownClass") proxy.method_missing(meth, *args, &block) end
Wait for an actor to terminate
# File lib/celluloid/actor.rb, line 131 def join(actor, timeout = nil) actor.thread.join(timeout) actor end
Forcibly kill a given actor
# File lib/celluloid/actor.rb, line 122 def kill(actor) actor.thread.kill begin actor.mailbox.shutdown rescue DeadActorError end end
Link to another actor
# File lib/celluloid/actor.rb, line 100 def link(actor) monitor actor Thread.current[:celluloid_actor].links << actor end
Are we bidirectionally linked to the given actor?
# File lib/celluloid/actor.rb, line 117 def linked_to?(actor) monitoring?(actor) && Thread.current[:celluloid_actor].links.include?(actor) end
Watch for exit events from another actor
# File lib/celluloid/actor.rb, line 88 def monitor(actor) raise NotActorError, "can't link outside actor context" unless Celluloid.actor? Thread.current[:celluloid_actor].linking_request(actor, :link) end
Are we monitoring the given actor?
# File lib/celluloid/actor.rb, line 112 def monitoring?(actor) actor.links.include? Actor.current end
Obtain the name of the current actor
# File lib/celluloid/actor.rb, line 53 def name actor = Thread.current[:celluloid_actor] raise NotActorError, "not in actor scope" unless actor actor.name end
Wrap the given subject with an Actor
# File lib/celluloid/actor.rb, line 138 def initialize(subject, options = {}) @subject = subject @mailbox = options[:mailbox] || Mailbox.new @exit_handler = options[:exit_handler] @exclusives = options[:exclusive_methods] @receiver_block_executions = options[:receiver_block_executions] @task_class = options[:task_class] || Celluloid.task_class @tasks = TaskSet.new @links = Links.new @signals = Signals.new @receivers = Receivers.new @timers = Timers.new @running = true @exclusive = false @name = nil @thread = ThreadHandle.new(:actor) do setup_thread run end @proxy = (options[:proxy_class] || ActorProxy).new(self) @subject.instance_variable_set(OWNER_IVAR, self) end
# File lib/celluloid/actor.rb, line 37 def registered Registry.root.names end
Schedule a block to run at the given time
# File lib/celluloid/actor.rb, line 281 def after(interval, &block) @timers.after(interval) { task(:timer, &block) } end
Clean up after this actor
# File lib/celluloid/actor.rb, line 402 def cleanup(exit_event) @mailbox.shutdown @links.each do |actor| begin actor.mailbox << exit_event rescue MailboxError # We're exiting/crashing, they're dead. Give up :( end end tasks.each { |task| task.terminate } rescue => ex Logger.crash("#{@subject.class}: CLEANUP CRASHED!", ex) end
Schedule a block to run at the given time
# File lib/celluloid/actor.rb, line 286 def every(interval, &block) @timers.every(interval) { task(:timer, &block) } end
Execute a code block in exclusive mode.
# File lib/celluloid/actor.rb, line 202 def exclusive if @exclusive yield else begin @exclusive = true yield ensure @exclusive = false end end end
Is this actor running in exclusive mode?
# File lib/celluloid/actor.rb, line 197 def exclusive? @exclusive end
Handle any exceptions that occur within a running actor
# File lib/celluloid/actor.rb, line 367 def handle_crash(exception) Logger.crash("#{@subject.class} crashed!", exception) shutdown ExitEvent.new(@proxy, exception) rescue => ex Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex) end
Handle exit events received by this actor
# File lib/celluloid/actor.rb, line 355 def handle_exit_event(event) @links.delete event.actor # Run the exit handler if available return @subject.send(@exit_handler, event.actor, event.reason) if @exit_handler # Reraise exceptions from linked actors # If no reason is given, actor terminated cleanly raise event.reason if event.reason end
Handle standard low-priority messages
# File lib/celluloid/actor.rb, line 312 def handle_message(message) case message when SystemEvent handle_system_event message when Call task(:call, message.method) { if @receiver_block_executions && meth = message.method if meth == :__send__ meth = message.arguments.first end if @receiver_block_executions.include?(meth.to_sym) message.execute_block_on_receiver end end message.dispatch(@subject) } when BlockCall task(:invoke_block) { message.dispatch } when BlockResponse, Response message.dispatch else @receivers.handle_message(message) end message end
Handle high-priority system event messages
# File lib/celluloid/actor.rb, line 339 def handle_system_event(event) case event when ExitEvent task(:exit_handler, @exit_handler) { handle_exit_event event } when LinkingRequest event.process(links) when NamingRequest @name = event.name when TerminationRequest @running = false when SignalConditionRequest event.call end end
Perform a linking request with another actor
# File lib/celluloid/actor.rb, line 216 def linking_request(receiver, type) exclusive do start_time = Time.now receiver.mailbox << LinkingRequest.new(Actor.current, type) system_events = [] loop do wait_interval = start_time + LINKING_TIMEOUT - Time.now message = @mailbox.receive(wait_interval) do |msg| msg.is_a?(LinkingResponse) && msg.actor.mailbox.address == receiver.mailbox.address && msg.type == type end case message when LinkingResponse # We're done! system_events.each { |ev| handle_system_event(ev) } return when NilClass raise TimeoutError, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded" when SystemEvent # Queue up pending system events to be processed after we've successfully linked system_events << message else raise 'wtf' end end end end
Receive an asynchronous message
# File lib/celluloid/actor.rb, line 257 def receive(timeout = nil, &block) loop do message = @receivers.receive(timeout, &block) break message unless message.is_a?(SystemEvent) handle_system_event(message) end end
Run the actor loop
# File lib/celluloid/actor.rb, line 170 def run begin while @running if message = @mailbox.receive(timeout_interval) handle_message message else # No message indicates a timeout @timers.fire @receivers.fire_timers end end rescue MailboxShutdown # If the mailbox detects shutdown, exit the actor end shutdown rescue Exception => ex handle_crash(ex) raise unless ex.is_a? StandardError end
Run the user-defined finalizer, if one is set
# File lib/celluloid/actor.rb, line 384 def run_finalizer # FIXME: remove before Celluloid 1.0 if @subject.respond_to?(:finalize) && @subject.class.finalizer != :finalize Logger.warn("DEPRECATION WARNING: #{@subject.class}#finalize is deprecated and will be removed in Celluloid 1.0. " + "Define finalizers with '#{@subject.class}.finalizer :callback.'") task(:finalizer, :finalize) { @subject.finalize } end finalizer = @subject.class.finalizer if finalizer && @subject.respond_to?(finalizer, true) task(:finalizer, :finalize) { @subject.__send__(finalizer) } end rescue => ex Logger.crash("#{@subject.class}#finalize crashed!", ex) end
# File lib/celluloid/actor.rb, line 164 def setup_thread Thread.current[:celluloid_actor] = self Thread.current[:celluloid_mailbox] = @mailbox end
Handle cleaning up this actor after it exits
# File lib/celluloid/actor.rb, line 375 def shutdown(exit_event = ExitEvent.new(@proxy)) run_finalizer cleanup exit_event ensure Thread.current[:celluloid_actor] = nil Thread.current[:celluloid_mailbox] = nil end
Send a signal with the given name to all waiting methods
# File lib/celluloid/actor.rb, line 247 def signal(name, value = nil) @signals.send name, value end
Sleep for the given amount of time
# File lib/celluloid/actor.rb, line 306 def sleep(interval) sleeper = Sleeper.new(@timers, interval) Celluloid.suspend(:sleeping, sleeper) end
Run a method inside a task unless it's exclusive
# File lib/celluloid/actor.rb, line 418 def task(task_type, method_name = nil, &block) if @exclusives && (@exclusives == :all || (method_name && @exclusives.include?(method_name.to_sym))) exclusive { block.call } else @task_class.new(task_type, &block).resume end end
Terminate this actor
# File lib/celluloid/actor.rb, line 192 def terminate @running = false end
Generated with the Darkfish Rdoc Generator 2.