Included Modules

AMQP::Client

Public Class Methods

connect(opts = {}) click to toggle source
# File lib/amqp/client.rb, line 187
# Opens an EventMachine connection using AMQP.settings overlaid with +opts+.
#
# opts - Hash of overrides merged on top of AMQP.settings. The merged hash
#        supplies :host and :port and is also passed along as the last
#        argument to EM.connect.
#
# Returns whatever EM.connect returns.
def self.connect(opts = {})
  settings = AMQP.settings.merge(opts)
  EM.connect(settings[:host], settings[:port], self, settings)
end
new(opts = {}) click to toggle source
# File lib/amqp/client.rb, line 61
# Stores the connection settings and installs the default failure handling.
#
# opts - settings Hash. :host and :port appear in the failure message, and
#        :timeout, when set, is passed to +timeout+.
def initialize(opts = {})
  @settings = opts
  extend(AMQP.client)

  # Keep a handler that is already set (reconnect re-runs this method).
  @on_disconnect ||= proc do
    raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}"
  end

  limit = @settings[:timeout]
  timeout(limit) if limit
  # While a reconnect is in progress the errback is not registered again.
  errback { @on_disconnect.call } unless @reconnecting

  @connected = false
end

Public Instance Methods

add_channel(mq) click to toggle source
# File lib/amqp/client.rb, line 100
# Registers +mq+ under the next free channel key.
#
# The new key is one more than the largest key currently in +channels+
# (1 when there are none). The lookup and the insert run inside a lazily
# created Mutex.
#
# Returns the key assigned to +mq+.
def add_channel(mq)
  @_channel_mutex ||= Mutex.new
  @_channel_mutex.synchronize do
    next_key = (channels.keys.max || 0) + 1
    channels[next_key] = mq
    next_key
  end
end
channels() click to toggle source
# File lib/amqp/client.rb, line 107
# Returns the Hash of registered channels, creating an empty one on first use.
def channels
  @channels = {} unless @channels
  @channels
end
close(&on_disconnect) click to toggle source

# Logs the outgoing data, then hands it to the inherited +send_data+.
#
# Returns whatever the inherited implementation returns.
def send_data(data)
  log('send_data', data)
  super(data)
end

# File lib/amqp/client.rb, line 142
# Schedules an orderly shutdown through +callback+.
#
# When a block is given it replaces the disconnect handler: @closing is set
# and stays set until that handler has run the block.
#
# Inside the callback every registered channel is asked to close. When no
# channels are registered, a Connection::Close frame (reply code 200,
# 'Goodbye') is sent instead.
def close(&on_disconnect)
  if on_disconnect
    @closing = true
    @on_disconnect = proc do
      on_disconnect.call
      @closing = false
    end
  end

  callback do |conn|
    if conn.channels.empty?
      send(Protocol::Connection::Close.new(:reply_code => 200,
                                           :reply_text => 'Goodbye',
                                           :class_id => 0,
                                           :method_id => 0))
    else
      conn.channels.each_value { |mq| mq.close }
    end
  end
end
connected?() click to toggle source
# File lib/amqp/client.rb, line 90
# Returns the current connection flag: set to false by initialize and
# unbind, and to true by connection_completed.
def connected?
  @connected
end
connection_completed() click to toggle source
# File lib/amqp/client.rb, line 73
# Marks the client as connected and sends the protocol header.
#
# TLS is started first when the :ssl setting is truthy. After the state
# update, HEADER and four bytes (1, 1, VERSION_MAJOR, VERSION_MINOR) are
# written via +send_data+.
def connection_completed
  start_tls if @settings[:ssl]
  log('connected')

  # (A handler raising 'Disconnected from server' used to be installed here.)
  # A pending close keeps its own disconnect handler; otherwise switch to the
  # regular +disconnected+ handler and clear the reconnect flag.
  unless @closing
    @on_disconnect = method(:disconnected)
    @reconnecting = false
  end

  @connected = true
  @connection_status.call(:connected) if @connection_status

  # Start from a fresh receive buffer for the new connection.
  @buf = Buffer.new
  send_data(HEADER)
  send_data([1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4'))
end
connection_status(&blk) click to toggle source
# File lib/amqp/client.rb, line 192
# Stores a block to be notified of connection state; connection_completed
# calls it with :connected.
#
# Returns the stored block, or nil when no block is given (which clears any
# previously stored one).
def connection_status(&blk)
  @connection_status = blk
end
process_frame(frame) click to toggle source
# File lib/amqp/client.rb, line 121
# Placeholder that ignores +frame+ and returns nil. The real frame handling
# is meant to come from the module passed into initialize, which replaces
# this method.
def process_frame(frame)
end
receive_data(data) click to toggle source
# File lib/amqp/client.rb, line 111
# Appends incoming +data+ to the receive buffer and dispatches every frame
# that Frame.parse can take out of it.
#
# Each parsed frame is logged and passed to +process_frame+; the loop ends
# as soon as Frame.parse returns nil or false. (Logging of the raw data is
# disabled.)
def receive_data(data)
  @buf << data

  while (parsed = Frame.parse(@buf))
    log('receive', parsed)
    process_frame(parsed)
  end
end
reconnect(force = false) click to toggle source
# File lib/amqp/client.rb, line 165
# Re-establishes the connection through EM.reconnect.
#
# force - when false and a reconnect is already in progress, the attempt is
#         deferred by one second (the timer then calls reconnect(true)) and
#         nil is returned.
#
# On the first attempt the client state is re-initialised from @settings and
# every previously registered channel is reset.
def reconnect(force = false)
  if @reconnecting && !force
    # wait 1 second after first reconnect attempt, in between each subsequent attempt
    EM.add_timer(1) { reconnect(true) }
    return
  end

  unless @reconnecting
    @reconnecting = true

    # NOTE(review): presumably clears EM::Deferrable state so callbacks can
    # fire again; confirm against the mixin.
    @deferred_status = nil
    initialize(@settings)

    # Swap in an empty registry before resetting the old channels.
    stale = @channels
    @channels = {}
    stale.each_value { |mq| mq.reset } if stale
  end

  log('reconnecting')
  EM.reconnect(@settings[:host], @settings[:port], self)
end
send(data, opts = {}) click to toggle source
# File lib/amqp/client.rb, line 126
# Frames +data+ for a channel and writes it to the connection.
#
# NOTE: this shadows Kernel#send on the client; use __send__ for dynamic
# method dispatch on this object.
#
# data - a Frame, or any object responding to to_frame(channel).
# opts - Hash; :channel selects the channel and defaults to 0. The hash is
#        only read, never modified, so frozen or shared hashes are safe.
#
# Returns whatever +send_data+ returns.
def send(data, opts = {})
  # Read-only default: writing it back with ||= would mutate the caller's
  # hash and raise on a frozen one.
  channel = opts[:channel] || 0
  frame = data.is_a?(Frame) ? data : data.to_frame(channel)
  frame.channel = channel

  log('send', frame)
  send_data(frame.to_s)
end
unbind() click to toggle source
# File lib/amqp/client.rb, line 94
# Records the disconnect and schedules the disconnect handler.
#
# The handler is read from @on_disconnect when the scheduled tick runs, not
# when unbind is called.
#
# Returns whatever EM.next_tick returns.
def unbind
  log('disconnected')
  @connected = false
  EM.next_tick { @on_disconnect.call }
end

[Validate]

Generated with the Darkfish RDoc Generator 2.