class MessageBus::TimerThread

Attributes

jobs[R]

Public Class Methods

new() click to toggle source
# File lib/message_bus/timer_thread.rb, line 24
# Sets up an empty timer and starts its worker thread.
#
# State created here:
# * @jobs     - array of pending [run_time, block] pairs
# * @next     - run time of the first pending job, or nil when there is none
# * @mutex    - guards @jobs and @next
# * @stopped  - flag polled by the worker loop
# * @thread   - the worker thread, running do_work
# * @on_error - handler called with the exception when a job raises; the
#               default writes the exception's backtrace to STDERR
def initialize
  @mutex = Mutex.new
  @jobs = []
  @next = nil
  @stopped = false
  @thread = Thread.new { do_work }
  @on_error = lambda do |e|
    STDERR.puts "Exception while processing Timer:\n #{e.backtrace.join("\n")}"
  end
end

Public Instance Methods

every(delay, &block) click to toggle source
# File lib/message_bus/timer_thread.rb, line 45
# Runs +block+ repeatedly, waiting +delay+ seconds (as interpreted by #queue)
# before each run.
#
# The block is wrapped in a proc that re-queues itself once the block
# returns. The re-queue sits in an +ensure+, so the next run is scheduled
# even when the block raises (the exception still propagates to the caller
# of the proc).
#
# Returns a CancelableEvery whose +current+ is always the value returned by
# the most recent #queue call.
def every(delay, &block)
  handle = CancelableEvery.new

  # `tick` refers to itself so each run schedules the following one.
  tick = proc do
    begin
      block.call
    ensure
      handle.current = queue(delay, &tick)
    end
  end

  handle.tap { |h| h.current = queue(delay, &tick) }
end
on_error(&block) click to toggle source
# File lib/message_bus/timer_thread.rb, line 87
# Installs +block+ as the error handler, replacing the current one.
#
# The worker loop calls the handler with the exception whenever a queued job
# raises a StandardError. Calling this without a block stores nil, in which
# case the worker loop skips error reporting.
#
# Returns the stored block.
def on_error(&block)
  @on_error = block
end
queue(delay=0, &block) click to toggle source

Queues a block to run once after the given delay (in seconds).

# File lib/message_bus/timer_thread.rb, line 59
# Schedules +block+ to run once, +delay+ seconds from now.
#
# @jobs is kept ordered by run time. The new entry goes directly after the
# last job scheduled strictly earlier, so it lands ahead of any job with
# exactly the same run time. When it becomes the first entry, @next is moved
# to its run time.
#
# After inserting, the worker thread is recreated if it is no longer alive
# and woken if it is currently sleeping.
#
# Returns a Cancelable wrapping the [run_time, block] entry.
def queue(delay=0, &block)
  run_at = Time.now.to_f + delay
  entry = [run_at, block]

  @mutex.synchronize do
    last_earlier = @jobs.rindex { |scheduled, _| scheduled < run_at }
    slot = last_earlier ? last_earlier + 1 : 0
    @jobs.insert(slot, entry)
    @next = run_at if slot.zero?
  end

  # Liveness is checked once without the lock and again while holding it,
  # so only one caller recreates the thread.
  unless @thread.alive?
    @mutex.synchronize { @thread = Thread.new { do_work } unless @thread.alive? }
  end

  @thread.wakeup if @thread.status == "sleep".freeze

  Cancelable.new(entry)
end
stop() click to toggle source
# File lib/message_bus/timer_thread.rb, line 33
# Stops the worker thread and blocks until it has exited.
#
# Sets @stopped so the worker loop ends on its next check, then repeatedly
# wakes the worker (it may be sleeping until the next job is due) until it
# is no longer alive. Returns immediately when there is no thread or it is
# already dead.
def stop
  @stopped = true
  running = true
  while running
    @mutex.synchronize do
      running = @thread && @thread.alive?
      if running
        begin
          @thread.wakeup
        rescue ThreadError
          # The worker does not hold @mutex while exiting, so it can die
          # between the alive? check and wakeup; wakeup then raises
          # ThreadError. That is the outcome we are waiting for, so only
          # re-raise when the thread is in fact still alive.
          raise if @thread.alive?
        end
      end
    end
    # Yield so the worker gets a chance to observe @stopped and finish.
    sleep 0
  end
end

Protected Instance Methods

do_work() click to toggle source
# File lib/message_bus/timer_thread.rb, line 93
# Worker loop: runs due jobs in order until @stopped is set.
#
# Each pass:
# 1. If the first job is due (@next is at or before the current time), it is
#    removed from @jobs and called. A StandardError raised by the job is
#    passed to @on_error when a handler is set.
# 2. @next is refreshed from the new first job. This happens in +ensure+ so
#    that it also runs when the job raises something other than a
#    StandardError or when the error handler itself raises. Otherwise the
#    thread would die with @next still holding the removed job's time, and
#    a restarted worker would run the following job before it is due.
# 3. If nothing is due, the thread sleeps until @next, or for 1000 seconds
#    when no job is pending. #queue and #stop call wakeup on this thread to
#    cut the sleep short.
def do_work
  until @stopped
    if @next && @next <= Time.new.to_f
      _, blk = @mutex.synchronize { @jobs.shift }
      begin
        blk.call
      rescue => e
        @on_error.call(e) if @on_error
      ensure
        @mutex.synchronize do
          @next, _ = @jobs[0]
        end
      end
    end

    unless @next && @next <= Time.new.to_f
      sleep_time = 1000
      @mutex.synchronize do
        sleep_time = @next - Time.new.to_f if @next
      end
      # Clamp to zero: the job may have become due while computing the gap.
      sleep [0, sleep_time].max
    end
  end
end