class Bosh::ThreadPool
Public Class Methods
new(options = {})
click to toggle source
# File lib/common/thread_pool.rb, line 5
#
# Builds an open pool with no queued actions and no worker threads.
#
# @param options [Hash] pool configuration
# @option options [Integer] :max_threads upper bound on concurrently running
#   workers; falls back to 1 when the key is absent or nil
# @option options [#debug, #error] :logger stored as-is; no default is
#   substituted, and the other methods of this class call +debug+/+error+ on it
def initialize(options = {})
  # Caller-supplied configuration.
  @logger = options[:logger]
  @max_threads = options[:max_threads] || 1

  # Primitives guarding and signalling changes to the state below.
  @lock = Mutex.new
  @cv = ConditionVariable.new

  # Bookkeeping: queued blocks, live workers, and free worker slots.
  @actions = []
  @threads = []
  @available_threads = @max_threads

  # First exception captured from a worker, if any.
  @boom = nil
  @original_thread = Thread.current

  @state = :open
end
Public Instance Methods
create_thread()
click to toggle source
# File lib/common/thread_pool.rb, line 60
#
# Starts one worker thread and appends it to @threads.
#
# The worker repeatedly takes the next queued action under @lock and runs it
# outside the lock. It stops when the queue is empty or once a failure has
# been recorded in @boom; on stopping it hands its slot back by incrementing
# @available_threads and, while the pool is :open, removes itself from
# @threads. Before exiting it signals @cv if the pool is no longer working.
#
# Both callers in this class (process and resume) invoke this while holding
# @lock, so the worker's first lock acquisition happens only after the new
# thread has been stored.
#
# @return [Array<Thread>] the @threads array after the append
def create_thread
  worker = Thread.new do
    loop do
      job = @lock.synchronize do
        # Once a failure is recorded no further actions are dequeued.
        candidate = @boom ? nil : @actions.shift
        unless candidate
          @logger.debug('Thread is no longer needed, cleaning up')
          @available_threads += 1
          @threads.delete(worker) if @state == :open
        end
        candidate
      end

      break unless job

      begin
        job.call
      rescue Exception => e
        # Deliberately broad: every failure of an action is recorded via
        # raise_worker_exception instead of escaping the worker thread.
        raise_worker_exception(e)
      end
    end
    @lock.synchronize { @cv.signal unless working? }
  end
  @threads << worker
end
pause()
click to toggle source
# File lib/common/thread_pool.rb, line 27
#
# Marks the pool as :paused under the lock. While paused, process only queues
# actions and does not start new workers.
#
# @return [Symbol] :paused
def pause
  @lock.synchronize { @state = :paused }
end
process(&block)
click to toggle source
# File lib/common/thread_pool.rb, line 43
#
# Queues a block for execution by the pool.
#
# The block is always appended to the queue. When the pool is :open and a
# worker slot is free, a new worker is started for it; when the pool is :open
# but saturated, or :paused, the block just waits in the queue. In any other
# state the block is queued without logging or starting a worker.
#
# A call without a block is rejected up front: queuing nil would make the
# worker that dequeues it believe the queue is drained and exit, stranding
# any actions queued behind it.
#
# @yield the work to run on a worker thread
# @raise [ArgumentError] if no block is given
def process(&block)
  raise ArgumentError, 'no block given' unless block

  @lock.synchronize do
    @actions << block
    if @state == :open
      if @available_threads > 0
        @logger.debug('Creating new thread')
        @available_threads -= 1
        create_thread
      else
        @logger.debug('All threads are currently busy, queuing action')
      end
    elsif @state == :paused
      @logger.debug('Pool is paused, queueing action')
    end
  end
end
raise_worker_exception(exception)
click to toggle source
# File lib/common/thread_pool.rb, line 88
#
# Logs a failure from a worker and records it so that wait can re-raise it.
#
# Only the first failure is kept: @boom is assigned only while it is nil.
# The backtrace is appended to the log line when one is available. An
# exception object that was never raised has a nil backtrace, and objects
# that do not respond to +backtrace+ have none at all; both are logged with
# the message only instead of failing inside the error handler.
#
# @param exception [Exception, Object] the failure caught in a worker
def raise_worker_exception(exception)
  message = "Worker thread raised exception: #{exception}"
  trace = exception.backtrace if exception.respond_to?(:backtrace)
  message = "#{message} - #{trace.join("\n")}" if trace
  @logger.error(message)

  @lock.synchronize do
    @boom = exception if @boom.nil?
  end
end
resume()
click to toggle source
# File lib/common/thread_pool.rb, line 33
#
# Re-opens the pool and starts workers for actions that are already queued.
#
# The number of workers started is the smaller of the free slots and the
# queue length, computed once before any slot is consumed.
#
# @return [Integer] the number of workers requested
def resume
  @lock.synchronize do
    @state = :open
    spawn_count = [@available_threads, @actions.size].min
    spawn_count.times do
      @available_threads -= 1
      create_thread
    end
  end
end
shutdown()
click to toggle source
# File lib/common/thread_pool.rb, line 111
#
# Closes the pool: discards queued actions and joins every thread still
# listed in @threads. Does nothing when the pool is already :closed.
#
# The state is checked once without the lock as a cheap early exit and again
# under the lock, so that only one caller performs the transition.
#
# @return [Array<Thread>, nil] the joined threads, or nil if already closed
def shutdown
  return if @state == :closed

  @logger.debug('Shutting down pool')

  already_closed = @lock.synchronize do
    if @state == :closed
      true
    else
      @state = :closed
      @actions.clear
      false
    end
  end
  return if already_closed

  @threads.each(&:join)
end
wait()
click to toggle source
# File lib/common/thread_pool.rb, line 103
#
# Blocks the caller until the pool reports it is no longer working, then
# re-raises the recorded worker failure, if there is one.
#
# @raise whatever object is stored in @boom
# @return [nil] when no failure was recorded
def wait
  @logger.debug('Waiting for tasks to complete')

  @lock.synchronize do
    # Re-check after every wake-up; the condition variable releases @lock
    # while waiting and re-acquires it before returning.
    while working?
      @cv.wait(@lock)
    end

    failure = @boom
    raise failure if failure
  end
end
working?()
click to toggle source
# File lib/common/thread_pool.rb, line 99
#
# Tells whether the pool still has work in flight or queued.
#
# A recorded failure (@boom) always yields false. Otherwise the pool is
# working while any worker slot is taken or any action is still queued.
#
# @return [Boolean]
def working?
  return false unless @boom.nil?

  slots_in_use = @available_threads != @max_threads
  slots_in_use || !@actions.empty?
end
wrap() { |self| ... }
click to toggle source
# File lib/common/thread_pool.rb, line 18
#
# Yields the pool to the block, waits for the pool to finish, and always
# shuts the pool down afterwards, including when the block or wait raises.
#
# @yieldparam pool [Bosh::ThreadPool] this pool
# @return whatever wait returns
def wrap
  yield self
  wait
ensure
  shutdown
end