Parent

Class/Module Index [+]

Quicksearch

Celluloid::InternalPool

Maintains a pool of reusable threads so that work can be handed to an idle thread instead of creating a new thread for every task.

Attributes

max_idle[RW]

Public Class Methods

new() click to toggle source
# File lib/celluloid/internal_pool.rb, line 8
# Build an empty pool that is marked as running from the start.
def initialize
  # No worker threads exist until one is created on demand
  @threads = []

  # Lock taken by the pool's synchronized operations
  @mutex = Mutex.new

  # Thread group that created workers are added to
  @group = ThreadGroup.new

  # TODO: should really adjust this based on usage
  @max_idle = 16
  @running = true
end

Public Instance Methods

active?() click to toggle source
# File lib/celluloid/internal_pool.rb, line 47
# Tell whether the pool currently holds any threads.
#
# Returns true when the array given by #to_a contains at least one truthy
# entry, false otherwise.
def active?
  tracked = to_a
  tracked.any?
end
assert_inactive() click to toggle source
# File lib/celluloid/internal_pool.rb, line 32
# Complain when the pool is still active.
#
# Does nothing (returns nil) when #active? is false. Otherwise the problem
# is logged as a warning through Celluloid.logger when JRUBY_VERSION is
# defined, and raised as Error on every other runtime.
def assert_inactive
  return unless active?

  message = "Thread pool is still active"
  raise Error, message unless defined?(JRUBY_VERSION)

  Celluloid.logger.warn message
end
assert_running() click to toggle source
# File lib/celluloid/internal_pool.rb, line 26
# Raise Error when #running? is false; otherwise do nothing and return nil.
def assert_running
  raise Error, "Thread pool is not running" unless running?
end
busy_size() click to toggle source
# File lib/celluloid/internal_pool.rb, line 18
# Number of pooled threads whose busy flag is truthy.
def busy_size
  @threads.count(&:busy)
end
clean_thread_locals(thread) click to toggle source

Clean the thread locals of an incoming thread

# File lib/celluloid/internal_pool.rb, line 116
# Reset every thread local on +thread+ to nil, leaving only the
# :celluloid_queue entry untouched.
#
# Returns the array of keys that was iterated (the result of thread.keys).
def clean_thread_locals(thread)
  # Ruby seems to lack an API for deleting thread locals, so each one is
  # overwritten with nil instead.
  thread.keys.each { |name| thread[name] = nil unless name == :celluloid_queue }
end
create() click to toggle source

Create a new thread with an associated queue of procs to run

# File lib/celluloid/internal_pool.rb, line 95
# Spawn a worker thread that is fed through its own queue of procs.
#
# The worker pops entries off the queue and calls each one. A StandardError
# raised by an entry is reported through Logger.crash and the worker keeps
# going. After every entry the worker is passed to #put. Popping nil or
# false ends the loop, which lets the thread finish.
#
# The queue is stored in the worker's :celluloid_queue thread local, and the
# worker is appended to @threads and added to @group before being returned.
def create
  jobs = Queue.new

  worker = Thread.new do
    while (task = jobs.pop)
      begin
        task.call
      rescue => ex
        Logger.crash("thread crashed", ex)
      end

      put worker
    end
  end

  worker[:celluloid_queue] = jobs
  @threads << worker
  @group.add(worker)
  worker
end
each() click to toggle source
# File lib/celluloid/internal_pool.rb, line 51
# Yield each pooled thread, in @threads order, to the caller's block.
#
# Returns @threads.
def each
  @threads.each { |worker| yield worker }
end
get(&block) click to toggle source

Get a thread from the pool, running the given block

# File lib/celluloid/internal_pool.rb, line 62
# Get a thread from the pool, running the given block
#
# Picks the first idle (non-busy) thread in @threads, or creates a new one
# when every pooled thread is busy. A thread whose #status is falsy has
# terminated, so it is removed from @threads before looking again. Without
# that removal the same dead idle thread would be selected on every pass
# and this method would spin forever while holding the pool mutex.
#
# Raises whatever #assert_running raises when the pool is not running.
# Returns the thread the block was queued on, with its busy flag set.
def get(&block)
  @mutex.synchronize do
    assert_running

    thread = nil
    until thread
      candidate = @threads.find { |pooled| !pooled.busy } || create

      if candidate.status
        thread = candidate
      else
        # Terminated thread (crashed or killed): forget it so it can
        # never be picked again, then retry.
        @threads.delete_if { |pooled| pooled.equal?(candidate) }
      end
    end

    thread.busy = true
    thread[:celluloid_queue] << block
    thread
  end
end
idle_size() click to toggle source
# File lib/celluloid/internal_pool.rb, line 22
# Number of pooled threads whose busy flag is nil or false.
def idle_size
  @threads.count { |worker| !worker.busy }
end
kill() click to toggle source
# File lib/celluloid/internal_pool.rb, line 134
# Forcefully stop the pool while holding the pool mutex.
#
# Calls finalize (defined outside this view), marks the pool as not
# running, kills every thread in @threads (emptying the list as it goes)
# and finally kills whatever threads the thread group still lists.
def kill
  @mutex.synchronize do
    finalize
    @running = false

    until @threads.empty?
      doomed = @threads.shift
      doomed.kill
    end

    @group.list.each { |member| member.kill }
  end
end
put(thread) click to toggle source

Return a thread to the pool

# File lib/celluloid/internal_pool.rb, line 82
# Return a thread to the pool
#
# Under the pool mutex the thread is marked as not busy. While the idle
# count (which now includes this thread) is below @max_idle, the thread
# stays pooled and its thread locals are cleaned. Otherwise nil is pushed
# onto its :celluloid_queue and it is removed from @threads.
def put(thread)
  @mutex.synchronize do
    thread.busy = false

    if idle_size < @max_idle
      clean_thread_locals(thread)
    else
      thread[:celluloid_queue] << nil
      @threads.delete(thread)
    end
  end
end
running?() click to toggle source
# File lib/celluloid/internal_pool.rb, line 43
# Whether the pool is running: returns the current value of @running.
def running?
  @running
end
shutdown() click to toggle source
# File lib/celluloid/internal_pool.rb, line 125
# Ask every pooled thread to stop.
#
# Under the pool mutex this calls finalize (defined outside this view) and
# then pushes nil onto each thread's :celluloid_queue.
#
# Returns @threads.
def shutdown
  @mutex.synchronize do
    finalize
    @threads.each { |worker| worker[:celluloid_queue] << nil }
  end
end
to_a() click to toggle source
# File lib/celluloid/internal_pool.rb, line 57
# Returns the pool's @threads array itself (not a copy).
def to_a
  @threads
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.