Parent

Class/Module Index [+]

Quicksearch

Bunny::ReaderLoop

Network activity loop that reads incoming AMQP 0.9.1 methods and passes them on for processing. They are dispatched further down the line in Bunny::Session and Bunny::Channel. This loop runs in a separate thread internally.

This closely mimics the design of the RabbitMQ Java client. @private

Public Class Methods

new(transport, session, session_thread) click to toggle source
# File lib/bunny/reader_loop.rb, line 12
# Stores the collaborators the reader loop works with and takes its logger
# from the session.
#
# @param transport [Object] source of incoming frames (must respond to #read_next_frame)
# @param session [Object] receives decoded frames and failure notifications; also supplies #logger
# @param session_thread [Thread] thread that network failures are raised into
def initialize(transport, session, session_thread)
  @session_thread = session_thread
  @session        = session
  @transport      = transport
  # The loop shares the session's logger rather than owning one.
  @logger         = session.logger
end

Public Instance Methods

kill() click to toggle source
# File lib/bunny/reader_loop.rb, line 89
# Forcibly terminates the reader thread and waits for it to finish.
#
# @thread is only assigned by #start, so when the loop was never started
# there is nothing to terminate and this method is a no-op.
#
# @return [Thread, nil] the terminated thread, or nil if no thread existed
def kill
  return unless @thread

  @thread.kill
  @thread.join
end
log_exception(e) click to toggle source
# File lib/bunny/reader_loop.rb, line 94
# Reports an exception at error level via @logger: one line with the class
# and message, a "Backtrace: " header, then one tab-indented line per
# backtrace entry.
#
# @param e [Exception] the exception to report
# @return [Array<String>] the backtrace entries that were logged (empty when there are none)
def log_exception(e)
  @logger.error "Exception in the reader loop: #{e.class.name}: #{e.message}"
  @logger.error "Backtrace: "
  # Exception#backtrace is nil for an exception that has not been raised;
  # Array() turns that into an empty list instead of failing with NoMethodError.
  Array(e.backtrace).each do |line|
    @logger.error "\t#{line}"
  end
end
resume() click to toggle source
# File lib/bunny/reader_loop.rb, line 25
# Restarts the reader loop by delegating to #start, which spawns a new
# thread running #run_loop.
#
# NOTE(review): #start does not clear @stopping or @network_is_down, and
# #run_loop breaks out as soon as either is set, so resuming an instance on
# which one of those flags is already set yields a loop that exits
# immediately. Confirm callers account for this.
def resume
  start
end
run_loop() click to toggle source
# File lib/bunny/reader_loop.rb, line 30
# Body of the reader thread. Calls #run_once repeatedly until a stop has been
# requested (@stopping) or a failure has marked the network as down
# (@network_is_down), then sets @stopped.
#
# Failure handling, in rescue order:
# * Errno::EBADF is ignored.
# * Empty responses, IOError and other SystemCallErrors are logged and mark
#   the network as down; the session either handles the failure itself (when
#   it recovers automatically) or a Bunny::NetworkFailure is raised in the
#   session thread.
# * Any other exception is logged, marks the network as down and is always
#   raised in the session thread as a Bunny::NetworkFailure.
#
# @return [true]
def run_loop
  loop do
    begin
      break if @stopping || @network_is_down
      run_once
    rescue Errno::EBADF
      # ignored, happens when we loop after the transport has already been closed
      # (must stay ahead of SystemCallError, of which EBADF is a subclass)
    rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError => network_error
      log_exception(network_error)
      @network_is_down = true

      if @session.automatically_recover?
        @session.handle_network_failure(network_error)
      else
        failure = Bunny::NetworkFailure.new("detected a network failure: #{network_error.message}", network_error)
        @session_thread.raise(failure)
      end
    rescue Exception => unexpected
      # Deliberately broad: whatever goes wrong in this thread is reported
      # to the session thread rather than lost here.
      log_exception(unexpected)
      @network_is_down = true

      failure = Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{unexpected.message}", unexpected)
      @session_thread.raise(failure)
    end
  end

  @stopped = true
end
run_once() click to toggle source
# File lib/bunny/reader_loop.rb, line 58
# Reads one incoming unit from the transport and hands it to the session.
#
# * A heartbeat frame is dropped.
# * A final method frame whose method class carries no content is passed to
#   the session's #handle_frame.
# * Otherwise a header frame is read next, followed by body frames until at
#   least header.body_size bytes have been collected; the three decoded parts
#   are passed to the session's #handle_frameset.
#
# @return [Object, nil] whatever the session handler returns; nil for heartbeats
def run_once
  frame = @transport.read_next_frame
  return if frame.is_a?(AMQ::Protocol::HeartbeatFrame)

  # Self-contained method frame: nothing more to read.
  if frame.final? && !frame.method_class.has_content?
    return @session.handle_frame(frame.channel, frame.decode_payload)
  end

  header_frame = @transport.read_next_frame
  body         = ''

  if header_frame.body_size > 0
    # The body may be split across several frames; keep reading until the
    # announced size has been reached.
    loop do
      body << @transport.read_next_frame.decode_payload

      break if body.bytesize >= header_frame.body_size
    end
  end

  channel  = frame.channel
  frameset = [frame.decode_payload, header_frame.decode_payload, body]
  @session.handle_frameset(channel, frameset)
end
start() click to toggle source
# File lib/bunny/reader_loop.rb, line 20
# Spawns the reader thread, which executes #run_loop, and keeps a reference
# to it in @thread. The thread is flagged abort_on_exception, so an exception
# that escapes #run_loop is not confined to the reader thread.
#
# @return [true]
def start
  reader_thread = Thread.new { run_loop }
  @thread       = reader_thread
  reader_thread.abort_on_exception = true
end
stop() click to toggle source
# File lib/bunny/reader_loop.rb, line 81
# Requests termination of the loop by setting @stopping.
#
# #run_loop only looks at the flag before each #run_once call, and this
# method does not touch the thread itself, so the loop ends the next time it
# reaches that check.
#
# @return [true]
def stop
  @stopping = true
end
stopped?() click to toggle source
# File lib/bunny/reader_loop.rb, line 85
# Tells whether #run_loop has finished.
#
# @stopped is assigned only at the end of #run_loop in the code shown here,
# so the result is nil (falsy) rather than false until then.
#
# @return [true, nil]
def stopped?
  @stopped
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.