Parent

Namespace

Class/Module Index [+]

Quicksearch

Celluloid::Actor

Actors are Celluloid's concurrency primitive. They're implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.

Attributes

mailbox[R]
name[R]
proxy[R]
subject[R]
tasks[R]
thread[R]

Public Class Methods

all() click to toggle source

Obtain all running actors in the system

# File lib/celluloid/actor.rb, line 78
def all
  actors = []
  Thread.list.each do |t|
    next unless t.celluloid? && t.role == :actor
    actors << t.actor.proxy if t.actor && t.actor.respond_to?(:proxy)
  end
  actors
end
async(mailbox, meth, *args, &block) click to toggle source

Invoke a method asynchronously on an actor via its mailbox

# File lib/celluloid/actor.rb, line 66
def async(mailbox, meth, *args, &block)
  proxy = AsyncProxy.new(mailbox, "UnknownClass")
  proxy.method_missing(meth, *args, &block)
end
call(mailbox, meth, *args, &block) click to toggle source

Invoke a method on the given actor via its mailbox

# File lib/celluloid/actor.rb, line 60
def call(mailbox, meth, *args, &block)
  proxy = SyncProxy.new(mailbox, "UnknownClass")
  proxy.method_missing(meth, *args, &block)
end
clear_registry() click to toggle source
# File lib/celluloid/actor.rb, line 41
def clear_registry
  Registry.root.clear
end
current() click to toggle source

Obtain the current actor

# File lib/celluloid/actor.rb, line 46
def current
  actor = Thread.current[:celluloid_actor]
  raise NotActorError, "not in actor scope" unless actor
  actor.proxy
end
future(mailbox, meth, *args, &block) click to toggle source

Call a method asynchronously and retrieve its value later

# File lib/celluloid/actor.rb, line 72
def future(mailbox, meth, *args, &block)
  proxy = FutureProxy.new(mailbox, "UnknownClass")
  proxy.method_missing(meth, *args, &block)
end
join(actor, timeout = nil) click to toggle source

Wait for an actor to terminate

# File lib/celluloid/actor.rb, line 131
def join(actor, timeout = nil)
  actor.thread.join(timeout)
  actor
end
kill(actor) click to toggle source

Forcibly kill a given actor

# File lib/celluloid/actor.rb, line 122
def kill(actor)
  actor.thread.kill
  begin
    actor.mailbox.shutdown
  rescue DeadActorError
  end
end
linked_to?(actor) click to toggle source

Are we bidirectionally linked to the given actor?

# File lib/celluloid/actor.rb, line 117
def linked_to?(actor)
  monitoring?(actor) && Thread.current[:celluloid_actor].links.include?(actor)
end
monitor(actor) click to toggle source

Watch for exit events from another actor

# File lib/celluloid/actor.rb, line 88
def monitor(actor)
  raise NotActorError, "can't link outside actor context" unless Celluloid.actor?
  Thread.current[:celluloid_actor].linking_request(actor, :link)
end
monitoring?(actor) click to toggle source

Are we monitoring the given actor?

# File lib/celluloid/actor.rb, line 112
def monitoring?(actor)
  actor.links.include? Actor.current
end
name() click to toggle source

Obtain the name of the current actor

# File lib/celluloid/actor.rb, line 53
def name
  actor = Thread.current[:celluloid_actor]
  raise NotActorError, "not in actor scope" unless actor
  actor.name
end
new(subject, options = {}) click to toggle source

Wrap the given subject with an Actor

# File lib/celluloid/actor.rb, line 138
def initialize(subject, options = {})
  @subject      = subject
  @mailbox      = options[:mailbox] || Mailbox.new
  @exit_handler = options[:exit_handler]
  @exclusives   = options[:exclusive_methods]
  @receiver_block_executions = options[:receiver_block_executions]
  @task_class   = options[:task_class] || Celluloid.task_class

  @tasks     = TaskSet.new
  @links     = Links.new
  @signals   = Signals.new
  @receivers = Receivers.new
  @timers    = Timers.new
  @running   = true
  @exclusive = false
  @name      = nil

  @thread = ThreadHandle.new(:actor) do
    setup_thread
    run
  end

  @proxy = (options[:proxy_class] || ActorProxy).new(self)
  @subject.instance_variable_set(OWNER_IVAR, self)
end
registered() click to toggle source
# File lib/celluloid/actor.rb, line 37
def registered
  Registry.root.names
end
unmonitor(actor) click to toggle source

Stop waiting for exit events from another actor

# File lib/celluloid/actor.rb, line 94
def unmonitor(actor)
  raise NotActorError, "can't link outside actor context" unless Celluloid.actor?
  Thread.current[:celluloid_actor].linking_request(actor, :unlink)
end

Public Instance Methods

after(interval, &block) click to toggle source

Schedule a block to run at the given time

# File lib/celluloid/actor.rb, line 281
def after(interval, &block)
  @timers.after(interval) { task(:timer, &block) }
end
cleanup(exit_event) click to toggle source

Clean up after this actor

# File lib/celluloid/actor.rb, line 402
def cleanup(exit_event)
  @mailbox.shutdown
  @links.each do |actor|
    begin
      actor.mailbox << exit_event
    rescue MailboxError
      # We're exiting/crashing, they're dead. Give up :(
    end
  end

  tasks.each { |task| task.terminate }
rescue => ex
  Logger.crash("#{@subject.class}: CLEANUP CRASHED!", ex)
end
every(interval, &block) click to toggle source

Schedule a block to run at the given time

# File lib/celluloid/actor.rb, line 286
def every(interval, &block)
  @timers.every(interval) { task(:timer, &block) }
end
exclusive() click to toggle source

Execute a code block in exclusive mode.

# File lib/celluloid/actor.rb, line 202
def exclusive
  if @exclusive
    yield
  else
    begin
      @exclusive = true
      yield
    ensure
      @exclusive = false
    end
  end
end
exclusive?() click to toggle source

Is this actor running in exclusive mode?

# File lib/celluloid/actor.rb, line 197
def exclusive?
  @exclusive
end
handle_crash(exception) click to toggle source

Handle any exceptions that occur within a running actor

# File lib/celluloid/actor.rb, line 367
def handle_crash(exception)
  Logger.crash("#{@subject.class} crashed!", exception)
  shutdown ExitEvent.new(@proxy, exception)
rescue => ex
  Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex)
end
handle_exit_event(event) click to toggle source

Handle exit events received by this actor

# File lib/celluloid/actor.rb, line 355
def handle_exit_event(event)
  @links.delete event.actor

  # Run the exit handler if available
  return @subject.send(@exit_handler, event.actor, event.reason) if @exit_handler

  # Reraise exceptions from linked actors
  # If no reason is given, actor terminated cleanly
  raise event.reason if event.reason
end
handle_message(message) click to toggle source

Handle standard low-priority messages

# File lib/celluloid/actor.rb, line 312
def handle_message(message)
  case message
  when SystemEvent
    handle_system_event message
  when Call
    task(:call, message.method) {
      if @receiver_block_executions && meth = message.method
        if meth == :__send__
          meth = message.arguments.first
        end
        if @receiver_block_executions.include?(meth.to_sym)
          message.execute_block_on_receiver
        end
      end
      message.dispatch(@subject)
    }
  when BlockCall
    task(:invoke_block) { message.dispatch }
  when BlockResponse, Response
    message.dispatch
  else
    @receivers.handle_message(message)
  end
  message
end
handle_system_event(event) click to toggle source

Handle high-priority system event messages

# File lib/celluloid/actor.rb, line 339
def handle_system_event(event)
  case event
  when ExitEvent
    task(:exit_handler, @exit_handler) { handle_exit_event event }
  when LinkingRequest
    event.process(links)
  when NamingRequest
    @name = event.name
  when TerminationRequest
    @running = false
  when SignalConditionRequest
    event.call
  end
end
linking_request(receiver, type) click to toggle source

Perform a linking request with another actor

# File lib/celluloid/actor.rb, line 216
def linking_request(receiver, type)
  exclusive do
    start_time = Time.now
    receiver.mailbox << LinkingRequest.new(Actor.current, type)
    system_events = []

    loop do
      wait_interval = start_time + LINKING_TIMEOUT - Time.now
      message = @mailbox.receive(wait_interval) do |msg|
        msg.is_a?(LinkingResponse) &&
        msg.actor.mailbox.address == receiver.mailbox.address &&
        msg.type == type
      end

      case message
      when LinkingResponse
        # We're done!
        system_events.each { |ev| handle_system_event(ev) }
        return
      when NilClass
        raise TimeoutError, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded"
      when SystemEvent
        # Queue up pending system events to be processed after we've successfully linked
        system_events << message
      else raise 'wtf'
      end
    end
  end
end
receive(timeout = nil, &block) click to toggle source

Receive an asynchronous message

# File lib/celluloid/actor.rb, line 257
def receive(timeout = nil, &block)
  loop do
    message = @receivers.receive(timeout, &block)
    break message unless message.is_a?(SystemEvent)

    handle_system_event(message)
  end
end
run() click to toggle source

Run the actor loop

# File lib/celluloid/actor.rb, line 170
def run
  begin
    while @running
      if message = @mailbox.receive(timeout_interval)
        handle_message message
      else
        # No message indicates a timeout
        @timers.fire
        @receivers.fire_timers
      end
    end
  rescue MailboxShutdown
    # If the mailbox detects shutdown, exit the actor
  end

  shutdown
rescue Exception => ex
  handle_crash(ex)
  raise unless ex.is_a? StandardError
end
run_finalizer() click to toggle source

Run the user-defined finalizer, if one is set

# File lib/celluloid/actor.rb, line 384
def run_finalizer
  # FIXME: remove before Celluloid 1.0
  if @subject.respond_to?(:finalize) && @subject.class.finalizer != :finalize
    Logger.warn("DEPRECATION WARNING: #{@subject.class}#finalize is deprecated and will be removed in Celluloid 1.0. " +
      "Define finalizers with '#{@subject.class}.finalizer :callback.'")

    task(:finalizer, :finalize) { @subject.finalize }
  end

  finalizer = @subject.class.finalizer
  if finalizer && @subject.respond_to?(finalizer, true)
    task(:finalizer, :finalize) { @subject.__send__(finalizer) }
  end
rescue => ex
  Logger.crash("#{@subject.class}#finalize crashed!", ex)
end
setup_thread() click to toggle source
# File lib/celluloid/actor.rb, line 164
def setup_thread
  Thread.current[:celluloid_actor]   = self
  Thread.current[:celluloid_mailbox] = @mailbox
end
shutdown(exit_event = ExitEvent.new(@proxy)) click to toggle source

Handle cleaning up this actor after it exits

# File lib/celluloid/actor.rb, line 375
def shutdown(exit_event = ExitEvent.new(@proxy))
  run_finalizer
  cleanup exit_event
ensure
  Thread.current[:celluloid_actor]   = nil
  Thread.current[:celluloid_mailbox] = nil
end
signal(name, value = nil) click to toggle source

Send a signal with the given name to all waiting methods

# File lib/celluloid/actor.rb, line 247
def signal(name, value = nil)
  @signals.send name, value
end
sleep(interval) click to toggle source

Sleep for the given amount of time

# File lib/celluloid/actor.rb, line 306
def sleep(interval)
  sleeper = Sleeper.new(@timers, interval)
  Celluloid.suspend(:sleeping, sleeper)
end
task(task_type, method_name = nil, &block) click to toggle source

Run a method inside a task unless it's exclusive

# File lib/celluloid/actor.rb, line 418
def task(task_type, method_name = nil, &block)
  if @exclusives && (@exclusives == :all || (method_name && @exclusives.include?(method_name.to_sym)))
    exclusive { block.call }
  else
    @task_class.new(task_type, &block).resume
  end
end
terminate() click to toggle source

Terminate this actor

# File lib/celluloid/actor.rb, line 192
def terminate
  @running = false
end
timeout_interval() click to toggle source

How long to wait until the next timer fires

# File lib/celluloid/actor.rb, line 267
def timeout_interval
  i1 = @timers.wait_interval
  i2 = @receivers.wait_interval

  if i1 and i2
    i1 < i2 ? i1 : i2
  elsif i1
    i1
  else
    i2
  end
end
wait(name) click to toggle source

Wait for the given signal

# File lib/celluloid/actor.rb, line 252
def wait(name)
  @signals.wait name
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.