# File lib/amqp/client.rb, line 187
#
# Opens an EventMachine connection that uses this module/class as its handler.
#
# opts:: Hash of connection options. It is merged over AMQP.settings, so
#        caller-supplied keys win; the merged hash must provide :host and
#        :port and is also forwarded to the handler as its extra argument.
#
# Returns whatever EM.connect returns.
def self.connect(opts = {})
  opts = AMQP.settings.merge(opts)
  EM.connect(opts[:host], opts[:port], self, opts)
end
# File lib/amqp/client.rb, line 61
#
# Stores the connection settings and prepares the initial connection state.
#
# opts:: Hash of settings; :host and :port are used in the default error
#        message, :timeout (optional) is passed to +timeout+.
#
# Also called again from #reconnect with the saved settings, which is why the
# disconnect handler is only defaulted (||=) and the errback is skipped while
# @reconnecting is set.
def initialize(opts = {})
  @settings = opts
  # Mix the protocol-handling module supplied by AMQP.client into this instance.
  extend AMQP.client

  # Until a connection is established, a failure means "could not connect".
  @on_disconnect ||= proc { raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}" }

  # NOTE(review): +timeout+ and +errback+ are presumably provided by
  # EM::Deferrable -- confirm against the enclosing module's includes.
  timeout @settings[:timeout] if @settings[:timeout]
  errback { @on_disconnect.call } unless @reconnecting

  @connected = false
end
# File lib/amqp/client.rb, line 100
#
# Registers +mq+ under the next free channel number and returns that number.
#
# The number is one greater than the largest key currently in #channels
# (1 when there are none). Allocation and insertion happen inside a lazily
# created mutex so the two steps are not interleaved between callers.
def add_channel(mq)
  (@_channel_mutex ||= Mutex.new).synchronize do
    key = (channels.keys.max || 0) + 1
    channels[key] = mq
    key
  end
end
# File lib/amqp/client.rb, line 107
#
# Returns the Hash of registered channels (keyed by the numbers handed out by
# #add_channel), creating an empty Hash on first use.
def channels
  @channels ||= {}
end
# Logs the outgoing payload, then hands it to the inherited send_data.
#
# data:: the raw payload to transmit; forwarded unchanged via bare +super+.
#
# Returns whatever the inherited send_data returns.
def send_data(data)
  log 'send_data', data
  super
end
# File lib/amqp/client.rb, line 142
#
# Starts closing the connection.
#
# on_disconnect:: optional block. When given, @closing is set and the
#                 disconnect handler is replaced by one that runs the block
#                 and then clears @closing.
#
# The actual shutdown is queued with +callback+: if any channels are
# registered, each one is asked to close; otherwise a Connection::Close frame
# (reply code 200, 'Goodbye') is sent directly.
#
# NOTE(review): +callback+ is presumably EM::Deferrable#callback, so the work
# runs once the connection has succeeded -- confirm.
def close(&on_disconnect)
  if on_disconnect
    @closing = true
    @on_disconnect = proc {
      on_disconnect.call
      @closing = false
    }
  end

  callback do |c|
    if c.channels.any?
      c.channels.each_value { |mq| mq.close }
    else
      send Protocol::Connection::Close.new(:reply_code => 200,
                                           :reply_text => 'Goodbye',
                                           :class_id => 0,
                                           :method_id => 0)
    end
  end
end
# File lib/amqp/client.rb, line 90
#
# Returns the current value of @connected (set to false by #initialize and to
# true by #connection_completed).
def connected?
  @connected
end
# File lib/amqp/client.rb, line 73
#
# Finishes connection setup and sends the opening protocol bytes.
#
# NOTE(review): presumably invoked by EventMachine once the TCP connection is
# established -- confirm against EM::Connection.
#
# Steps, in order:
# * starts TLS when the :ssl setting is truthy;
# * unless a close is in progress, installs #disconnected as the disconnect
#   handler and clears @reconnecting;
# * marks the client connected and notifies the #connection_status block, if any;
# * resets the receive buffer and sends HEADER followed by four bytes
#   (1, 1, VERSION_MAJOR, VERSION_MINOR).
def connection_completed
  start_tls if @settings[:ssl]
  log 'connected'
  # @on_disconnect = proc{ raise Error, 'Disconnected from server' }
  unless @closing
    @on_disconnect = method(:disconnected)
    @reconnecting = false
  end

  @connected = true
  @connection_status.call(:connected) if @connection_status

  @buf = Buffer.new
  send_data HEADER
  send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')
end
# File lib/amqp/client.rb, line 192
#
# Registers a block to be notified of connection state changes. Within the
# code shown here it is called with :connected from #connection_completed.
#
# Returns the stored block (nil when called without one, which clears it).
def connection_status(&blk)
  @connection_status = blk
end
# File lib/amqp/client.rb, line 121
#
# Handles one parsed frame. Intentionally empty here.
def process_frame(frame)
  # this is a stub meant to be
  # replaced by the module passed into initialize
end
# File lib/amqp/client.rb, line 111
#
# Appends incoming bytes to the receive buffer and dispatches every complete
# frame that can now be parsed from it.
#
# data:: the newly received chunk.
#
# Frame.parse is called repeatedly until it returns nil/false; each frame it
# yields is logged and passed to #process_frame.
def receive_data(data)
  # log 'receive_data', data
  @buf << data

  while (frame = Frame.parse(@buf))
    log 'receive', frame
    process_frame frame
  end
end
# File lib/amqp/client.rb, line 165
#
# Re-establishes the connection to the configured host and port.
#
# force:: when false (the default) and a reconnect is already in progress, the
#         attempt is postponed by one second via an EM timer instead of being
#         issued immediately; the timer calls back in with force = true.
#
# On the first attempt of a reconnect cycle the client state is reset: the
# deferred status is cleared, #initialize is re-run with the saved settings,
# and every previously registered channel is detached and told to reset.
def reconnect(force = false)
  if @reconnecting && !force
    # wait 1 second after first reconnect attempt, in between each subsequent attempt
    EM.add_timer(1) { reconnect(true) }
    return
  end

  unless @reconnecting
    @reconnecting = true

    @deferred_status = nil
    initialize(@settings)

    # Swap in a fresh channel table before resetting the old channels.
    mqs = @channels
    @channels = {}
    mqs.each_value { |mq| mq.reset } if mqs
  end

  log 'reconnecting'
  EM.reconnect @settings[:host], @settings[:port], self
end
# Generated with the Darkfish RDoc Generator 2.