class Celluloid::Actor
Actors are Celluloid's concurrency primitive. They're implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.
Attributes
Public Class Methods
Obtain all running actors in the system
# File lib/celluloid/actor.rb, line 49
# Returns whatever the global actor system reports as running.
def all
  system = Celluloid.actor_system
  system.running
end
Invoke a method asynchronously on an actor via its mailbox
# File lib/celluloid/actor.rb, line 37
# Sends +meth+ through an asynchronous proxy bound to +mailbox+.
# The arguments and the block are forwarded unchanged to the proxy's
# method_missing, and its result is returned.
def async(mailbox, meth, *args, &block)
  Proxy::Async.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end
Invoke a method on the given actor via its mailbox
# File lib/celluloid/actor.rb, line 31
# Sends +meth+ through a synchronous proxy bound to +mailbox+.
# The arguments and the block are forwarded unchanged to the proxy's
# method_missing, and its result is returned.
def call(mailbox, meth, *args, &block)
  Proxy::Sync.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end
Obtain the current actor
# File lib/celluloid/actor.rb, line 17
# Returns the behavior proxy of the actor registered on the current thread.
#
# Raises NotActorError when the thread-local :celluloid_actor slot is unset.
def current
  running_actor = Thread.current[:celluloid_actor]
  raise NotActorError, "not in actor scope" unless running_actor

  running_actor.behavior_proxy
end
Call a method asynchronously and retrieve its value later
# File lib/celluloid/actor.rb, line 43
# Sends +meth+ through a future proxy bound to +mailbox+.
# The arguments and the block are forwarded unchanged to the proxy's
# method_missing, and its result is returned.
def future(mailbox, meth, *args, &block)
  Proxy::Future.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end
Wait for an actor to terminate
# File lib/celluloid/actor.rb, line 96
# Joins the actor's thread, passing +timeout+ straight through to the
# thread's join, and then returns the same +actor+ object.
def join(actor, timeout = nil)
  actor.tap { |target| target.thread.join(timeout) }
end
Forcibly kill a given actor
# File lib/celluloid/actor.rb, line 89
# Kills the actor's thread, then shuts its mailbox down if the mailbox
# still reports itself alive.
def kill(actor)
  actor.thread.kill
  return unless actor.mailbox.alive?

  actor.mailbox.shutdown
end
Link to another actor
# File lib/celluloid/actor.rb, line 66
# Starts monitoring +actor+ and then records it in the links of the actor
# registered on the current thread.
def link(actor)
  monitor(actor)
  own_links = Thread.current[:celluloid_actor].links
  own_links << actor
end
Are we bidirectionally linked to the given actor?
# File lib/celluloid/actor.rb, line 83
# True only when both directions hold: +actor+ lists us in its links
# (monitoring?) and the current thread's actor lists +actor+ in its own.
def linked_to?(actor)
  monitored = monitoring?(actor)
  # Short-circuit with the falsy result itself, exactly as `&&` would.
  return monitored unless monitored

  Thread.current[:celluloid_actor].links.include?(actor)
end
Watch for exit events from another actor
# File lib/celluloid/actor.rb, line 54
# Issues a :link linking request from the current thread's actor to +actor+.
#
# Raises NotActorError when Celluloid.actor? is false.
def monitor(actor)
  raise NotActorError, "can't link outside actor context" unless Celluloid.actor?

  requester = Thread.current[:celluloid_actor]
  requester.linking_request(actor, :link)
end
Are we monitoring the given actor?
# File lib/celluloid/actor.rb, line 78
# Checks whether the links held by +actor+ include the current actor
# (as returned by Actor.current).
def monitoring?(actor)
  actor.links.include?(Actor.current)
end
# File lib/celluloid/actor.rb, line 102
# Builds the internal state of an actor around +behavior+.
#
# Options read here:
#   :actor_system  - required; fetch raises when the key is absent
#   :mailbox_class - class instantiated for the mailbox (default Mailbox)
#   :mailbox_size  - assigned to the mailbox's max_size (default nil)
#   :task_class    - falls back to Celluloid.task_class when missing or falsy
#   :exclusive     - default false
#
# The actor starts out not running and unnamed; a SystemEvent handler that
# delegates to #handle_system_event is registered last.
def initialize(behavior, options)
  @behavior     = behavior
  @actor_system = options.fetch(:actor_system)

  mailbox_class = options.fetch(:mailbox_class, Mailbox)
  @mailbox = mailbox_class.new
  @mailbox.max_size = options.fetch(:mailbox_size, nil)

  @task_class   = options[:task_class] || Celluloid.task_class
  @exit_handler = method(:default_exit_handler)
  @exclusive    = options.fetch(:exclusive, false)

  @timers    = Timers::Group.new
  @tasks     = Internals::TaskSet.new
  @links     = Internals::Links.new
  @handlers  = Internals::Handlers.new
  @receivers = Internals::Receivers.new(@timers) # shares the actor's timer group
  @signals   = Internals::Signals.new

  @running = false
  @name    = nil

  handle(SystemEvent) { |event| handle_system_event(event) }
end
Obtain the name of the current actor
# File lib/celluloid/actor.rb, line 24
# Returns the name of the actor registered on the current thread.
#
# Raises NotActorError when the thread-local :celluloid_actor slot is unset.
def registered_name
  running_actor = Thread.current[:celluloid_actor]
  raise NotActorError, "not in actor scope" unless running_actor

  running_actor.name
end
Unlink from another actor
# File lib/celluloid/actor.rb, line 72
# Stops monitoring +actor+ and then deletes it from the links of the actor
# registered on the current thread.
def unlink(actor)
  unmonitor(actor)
  own_links = Thread.current[:celluloid_actor].links
  own_links.delete(actor)
end
Stop waiting for exit events from another actor
# File lib/celluloid/actor.rb, line 60
# Issues an :unlink linking request from the current thread's actor to
# +actor+.
#
# Raises NotActorError when Celluloid.actor? is false.
def unmonitor(actor)
  raise NotActorError, "can't link outside actor context" unless Celluloid.actor?

  requester = Thread.current[:celluloid_actor]
  requester.linking_request(actor, :unlink)
end
Public Instance Methods
Schedule a block to run after the given interval
# File lib/celluloid/actor.rb, line 238
# Registers a timer on the actor's timer group via its `after`; when the
# timer fires, +block+ is run inside a new :timer task.
def after(interval, &block)
  @timers.after(interval) do
    task(:timer, &block)
  end
end
# File lib/celluloid/actor.rb, line 139
# Returns the proxy exposed by this actor's behavior object.
def behavior_proxy
  behavior = @behavior
  behavior.proxy
end
Clean up after this actor
# File lib/celluloid/actor.rb, line 314
# Tears down the actor's resources after it has stopped.
#
# Steps, in order: notify the probe (only when $CELLULOID_MONITORING is
# set), shut the mailbox down, deliver +exit_event+ to every linked actor
# whose mailbox is still alive, then terminate every remaining task.
# Any StandardError raised along the way is logged, not propagated.
def cleanup(exit_event)
  Celluloid::Probe.actor_died(self) if $CELLULOID_MONITORING
  @mailbox.shutdown

  @links.each do |linked|
    linked.mailbox << exit_event if linked.mailbox.alive?
  end

  # Iterate over a snapshot (to_a) of the task set.
  tasks.to_a.each do |running_task|
    begin
      running_task.terminate
    rescue DeadTaskError
      # TODO: not tested (failed on Travis)
    end
  end
rescue StandardError => error
  # TODO: metadata
  Internals::Logger.crash("CLEANUP CRASHED!", error)
end
# File lib/celluloid/actor.rb, line 291
# Re-raises the reason carried by +event+, if it has one; otherwise does
# nothing and returns nil.
def default_exit_handler(event)
  return unless event.reason

  raise event.reason
end
Schedule a block to run repeatedly at the given interval
# File lib/celluloid/actor.rb, line 243
# Registers a timer on the actor's timer group via its `every`; each time
# the timer fires, +block+ is run inside a new :timer task.
def every(interval, &block)
  @timers.every(interval) do
    task(:timer, &block)
  end
end
Register a new handler for a given pattern
# File lib/celluloid/actor.rb, line 223
# Delegates to the actor's handler registry, passing every pattern and the
# block through unchanged.
def handle(*patterns, &block)
  registry = @handlers
  registry.handle(*patterns, &block)
end
Handle any exceptions that occur within a running actor
# File lib/celluloid/actor.rb, line 296
# Logs +exception+ as an actor crash, then shuts the actor down with an
# ExitEvent carrying that exception. A StandardError raised while doing so
# is itself logged rather than propagated.
def handle_crash(exception)
  begin
    # TODO: add meta info
    Internals::Logger.crash("Actor crashed!", exception)
    crash_event = ExitEvent.new(behavior_proxy, exception)
    shutdown(crash_event)
  rescue StandardError => error
    Internals::Logger.crash("Actor#handle_crash CRASHED!", error)
  end
end
Handle standard low-priority messages
# File lib/celluloid/actor.rb, line 282
# Offers +message+ to the registered handlers first and, only if none took
# it, to the pending receivers. A message nobody accepted is reported on
# the debug log when $CELLULOID_DEBUG is set. Always returns +message+.
def handle_message(message)
  accepted = @handlers.handle_message(message) || @receivers.handle_message(message)

  if !accepted && $CELLULOID_DEBUG
    Internals::Logger.debug "Discarded message (unhandled): #{message}"
  end

  message
end
Handle high-priority system event messages
# File lib/celluloid/system_events.rb, line 4
# Dispatches a system event to the method registered for its class.
#
# SystemEvent.handle is asked for a handler name for +event+'s class; when
# one is returned it is invoked via `send` with the event. Otherwise the
# event is dropped, and reported on the debug log if $CELLULOID_DEBUG is set.
def handle_system_event(event)
  handler = SystemEvent.handle(event.class)

  if handler
    send(handler, event)
  elsif $CELLULOID_DEBUG
    # Interpolate the parameter: the previous text referenced an undefined
    # local (`message`) and raised NameError instead of logging.
    Internals::Logger.debug "Discarded message (unhandled): #{event}"
  end
end
Perform a linking request with another actor
# File lib/celluloid/actor.rb, line 181 def linking_request(receiver, type) Celluloid.exclusive do receiver.mailbox << LinkingRequest.new(Actor.current, type) system_events = [] Timers::Wait.for(LINKING_TIMEOUT) do |remaining| begin message = @mailbox.receive(remaining) do |msg| msg.is_a?(LinkingResponse) && msg.actor.mailbox.address == receiver.mailbox.address && msg.type == type end rescue TaskTimeout next # IO reactor did something, no message in queue yet. end if message.instance_of? LinkingResponse Celluloid::Probe.actors_linked(self, receiver) if $CELLULOID_MONITORING system_events.each { |ev| @mailbox << ev } return elsif message.is_a? SystemEvent # Queue up pending system events to be processed after we've successfully linked system_events << message else fail "Unexpected message type: #{message.class}. Expected LinkingResponse, NilClass, SystemEvent." end end fail TaskTimeout, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded with receiver: #{receiver}" end end
Receive an asynchronous message
# File lib/celluloid/actor.rb, line 228
# Waits on the actor's receivers for a message and returns it.
#
# SystemEvents are never returned to the caller: each one is passed to
# #handle_system_event and the wait starts over with the same +timeout+
# and +block+.
def receive(timeout = nil, &block)
  incoming = @receivers.receive(timeout, &block)

  while incoming.is_a?(SystemEvent)
    handle_system_event(incoming)
    incoming = @receivers.receive(timeout, &block)
  end

  incoming
end
Run the actor loop
# File lib/celluloid/actor.rb, line 149 def run while @running begin @timers.wait do |interval| interval = 0 if interval && interval < 0 if message = @mailbox.check(interval) handle_message(message) break unless @running end end rescue MailboxShutdown @running = false rescue MailboxDead # TODO: not tests (but fails occasionally in tests) @running = false end end shutdown rescue ::Exception => ex handle_crash(ex) raise unless ex.is_a?(StandardError) || ex.is_a?(Celluloid::Interruption) end
# File lib/celluloid/actor.rb, line 143
# Publishes this actor and its mailbox in the current thread's locals
# (:celluloid_actor and :celluloid_mailbox).
def setup_thread
  thread = Thread.current
  thread[:celluloid_actor] = self
  thread[:celluloid_mailbox] = @mailbox
end
Handle cleaning up this actor after it exits
# File lib/celluloid/actor.rb, line 305
# Shuts the behavior down and runs #cleanup with +exit_event+ (by default
# a fresh ExitEvent built from the behavior proxy).
#
# Whether or not that succeeds, the thread-local :celluloid_actor and
# :celluloid_mailbox slots are reset to nil afterwards.
def shutdown(exit_event = ExitEvent.new(behavior_proxy))
  @behavior.shutdown
  cleanup(exit_event)
ensure
  [:celluloid_actor, :celluloid_mailbox].each do |slot|
    Thread.current[slot] = nil
  end
end
Send a signal with the given name to all waiting methods
# File lib/celluloid/actor.rb, line 213
# Broadcasts the signal +name+, with an optional +value+ (nil by default),
# through the actor's signal registry.
def signal(name, value = nil)
  @signals.broadcast(name, value)
end
Sleep for the given amount of time
# File lib/celluloid/actor.rb, line 276
# Suspends the caller in the :sleeping state, handing Celluloid.suspend a
# Sleeper built from the actor's timers and +interval+.
def sleep(interval)
  Celluloid.suspend(:sleeping, Sleeper.new(@timers, interval))
end
# File lib/celluloid/actor.rb, line 127
# Marks the actor as running and creates its thread handle, whose block
# calls #setup_thread and then #run. An actor proxy is then built from
# the mailbox and the thread handle.
#
# The probe and the actor manager are notified only when
# $CELLULOID_MONITORING / $CELLULOID_MANAGED are set, respectively.
def start
  @running = true

  @thread = Internals::ThreadHandle.new(@actor_system, :actor) do
    setup_thread
    run
  end
  @proxy = Proxy::Actor.new(@mailbox, @thread)

  if $CELLULOID_MONITORING
    Celluloid::Probe.actor_created(self)
  end

  if $CELLULOID_MANAGED
    Celluloid::Actor::Manager.actor_created(self)
  end
end
Run a block inside a new task, wrapping it in exclusive mode when the actor is exclusive
# File lib/celluloid/actor.rb, line 334
# Creates a task of the configured task class with +task_type+ and +meta+,
# then resumes it and returns the result of that resume.
#
# The task body yields to the caller's block; when the actor is exclusive
# the yield happens inside Celluloid.exclusive.
def task(task_type, meta = nil)
  new_task = @task_class.new(task_type, meta) do
    next yield unless @exclusive

    Celluloid.exclusive { yield }
  end

  new_task.resume
end
Terminate this actor
# File lib/celluloid/actor.rb, line 176
# Clears the @running flag; nothing else is done here.
def terminate
  @running = false
end
# File lib/celluloid/actor.rb, line 247
# Runs the given block under a time limit of +duration+.
#
# A timer is armed that, when it fires, resumes the current task with a
# TaskTimeout ("execution expired") whose backtrace is the caller's.
# The timer is cancelled once the block finishes, normally or not.
def timeout(duration)
  origin = caller
  waiting_task = Task.current

  timer = @timers.after(duration) do
    expired = TaskTimeout.new("execution expired")
    expired.set_backtrace(origin)
    waiting_task.resume(expired)
  end

  yield
ensure
  # timer is nil if arming it failed before assignment.
  timer.cancel if timer
end
Wait for the given signal
# File lib/celluloid/actor.rb, line 218
# Delegates to the actor's signal registry to wait on the signal +name+,
# returning whatever that wait returns.
def wait(name)
  @signals.wait(name)
end