AMQ::Client::Async::EventMachineClient

Constants

Deferrable

Backwards compatibility with 0.7.0.a25. MK.

Public Class Methods

connect(settings = {}, &block)

Initiates a connection to the AMQP broker. If a block is given, it is run when (and if) the AMQP connection succeeds.

@option settings [String] :host ("127.0.0.1") Hostname the AMQ broker runs on.
@option settings [String] :port (5672) Port the AMQ broker listens on.
@option settings [String] :vhost ("/") Virtual host to use.
@option settings [String] :user ("guest") Username to use for authentication.
@option settings [String] :pass ("guest") Password to use for authentication.
@option settings [String] :ssl (false) Should we use TLS (SSL) for the connection?
@option settings [String] :timeout (nil) Connection timeout.
@option settings [String] :logging (false) Turns logging on or off.
@option settings [String] :broker (nil) Broker name (use if you intend to use broker-specific features).
@option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If the broker cannot support frames this large, the broker's maximum value will be used instead.

@param [Hash] settings

# File lib/amq/client/async/adapters/event_machine.rb, line 40
def self.connect(settings = {}, &block)
  @settings = Settings.configure(settings)

  instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings)
  instance.register_connection_callback(&block)

  instance
end
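
The defaults documented above can be pictured as a plain hash merge. The following is a minimal sketch, not the library's Settings.configure implementation: DEFAULT_SETTINGS and configure_settings are hypothetical names, and the default values are simply the ones listed in the option documentation.

```ruby
# Hypothetical defaults, copied from the option list documented for .connect.
DEFAULT_SETTINGS = {
  host: "127.0.0.1",
  port: 5672,
  vhost: "/",
  user: "guest",
  pass: "guest",
  ssl: false,
  timeout: nil,
  logging: false,
  broker: nil,
  frame_max: 131072
}.freeze

# Hypothetical helper: caller-supplied settings override the defaults.
# A nil argument is treated like an empty hash.
def configure_settings(settings = {})
  DEFAULT_SETTINGS.merge(settings || {})
end

settings = configure_settings(host: "broker.example.local", vhost: "/staging")
puts settings[:host]  # overridden by the caller
puts settings[:port]  # falls back to the documented default
```

Only the keys the caller passes are overridden; everything else keeps its documented default.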
new(*args)
# File lib/amq/client/async/adapters/event_machine.rb, line 139
def initialize(*args)
  super(*args)

  self.logger   = self.class.logger

  # channel => collected frames. MK.
  @frames            = Hash.new { Array.new }
  @channels          = Hash.new
  @callbacks         = Hash.new

  opening!

  # track TCP connection state, used to detect initial TCP connection failures.
  @tcp_connection_established       = false
  @tcp_connection_failed            = false
  @intentionally_closing_connection = false

  # EventMachine::Connection's and Adapter's constructors arity
  # make it easier to use *args. MK.
  @settings                           = Settings.configure(args.first)
  @on_tcp_connection_failure          = @settings[:on_tcp_connection_failure] || Proc.new { |settings|
    raise self.class.tcp_connection_failure_exception_class.new(settings)
  }
  @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
    raise self.class.authentication_failure_exception_class.new(settings)
  }

  @mechanism         = "PLAIN"
  @locale            = @settings.fetch(:locale, "en_GB")
  @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

  @auto_recovery     = (!!@settings[:auto_recovery])

  self.reset
  self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)

  self.initialize_heartbeat_sender if self.heartbeats_enabled?
end
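
A note on the `Hash.new { Array.new }` line in the constructor above: in plain Ruby, a default block that does not assign into the hash returns a fresh array on every miss and never stores it. The standalone sketch below illustrates that language behaviour only; it does not use the adapter. In the listing shown on this page, #reset later replaces @frames with an Array, so the initial hash is short-lived.

```ruby
# Default block without assignment: each lookup of a missing key
# builds a new Array that is thrown away afterwards.
unstored = Hash.new { Array.new }
unstored[1] << :frame
p unstored[1]        # => []
p unstored.key?(1)   # => false

# Default block that assigns: the Array is stored under the key
# on first access and reused afterwards.
stored = Hash.new { |hash, key| hash[key] = Array.new }
stored[1] << :frame
p stored[1]          # => [:frame]
```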

Public Instance Methods

authenticating?()

Whether we are in the authentication state (after the TCP connection was established but before the broker authenticated us).

@return [Boolean]
@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 196
def authenticating?
  @authenticating
end
close_connection(*args)

@private

# File lib/amq/client/async/adapters/event_machine.rb, line 275
def close_connection(*args)
  @intentionally_closing_connection = true

  super(*args)
end
connection_completed()

Called by the EventMachine reactor once the TCP connection is successfully established.

@private

# File lib/amq/client/async/adapters/event_machine.rb, line 243
def connection_completed
  # we only can safely set this value here because EventMachine is a lovely piece of
  # software that calls #post_init before #unbind even when TCP connection
  # fails. MK.
  @tcp_connection_established       = true
  @periodic_reconnection_timer.cancel if @periodic_reconnection_timer


  # again, this is because #unbind is called in different situations
  # and there is no easy way to tell initial connection failure
  # from connection loss. Not in EventMachine 0.12.x, anyway. MK.

  if @had_successfully_connected_before
    @recovered = true


    self.start_automatic_recovery
    self.upgrade_to_tls_if_necessary
  end

  # now we can set it. MK.
  @had_successfully_connected_before = true
  @reconnecting                      = false
  @handling_skipped_hearbeats        = false
  @last_server_heartbeat             = Time.now

  self.initialize_heartbeat_sender if self.heartbeat_interval > 0

  self.handshake
end
connection_successful()

Called by AMQ::Client::Connection after we receive connection.open-ok.

@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 331
def connection_successful
  @authenticating = false
  opened!

  @connection_deferrable.succeed
end
disconnection_successful()

Called by AMQ::Client::Connection after we receive connection.close-ok.

@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 342
def disconnection_successful
  @disconnection_deferrable.succeed

  # true for "after writing buffered data"
  self.close_connection(true)
  self.reset
  closed!
end
establish_connection(settings)

For the EventMachine adapter, this is a no-op.

@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 182
def establish_connection(settings)
  # Unfortunately there doesn't seem to be any sane way
  # how to get EventMachine connect to the instance level.
end
handle_skipped_hearbeats()

Called when the time since the last server heartbeat was received is greater than or equal to the heartbeat interval set via the :heartbeat_interval option on connection.

@api plugin

# File lib/amq/client/async/adapters/event_machine.rb, line 355
def handle_skipped_hearbeats
  if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection
    @handling_skipped_hearbeats = true
    @heartbeats_timer.cancel

    self.run_skipped_heartbeats_callbacks
  end
end
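
The condition described above (time since the last server heartbeat is at least the heartbeat interval) can be expressed as a small predicate. This is a sketch under assumptions: heartbeats_skipped? is a hypothetical helper, and the code that actually performs this check in the library is not shown on this page.

```ruby
# Hypothetical predicate: true once the broker has been silent for at
# least one full heartbeat interval. An interval of 0 (or less) means
# heartbeats are disabled, so nothing can be "skipped".
def heartbeats_skipped?(last_server_heartbeat, heartbeat_interval, now = Time.now)
  return false unless heartbeat_interval > 0

  (now - last_server_heartbeat) >= heartbeat_interval
end

last_seen = Time.now - 45
puts heartbeats_skipped?(last_seen, 30)  # 45 seconds of silence, 30 second interval
puts heartbeats_skipped?(last_seen, 60)  # still within a 60 second interval
```

Passing the current time as an optional argument keeps the predicate easy to exercise with fixed timestamps.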
initialize_heartbeat_sender()

@private

# File lib/amq/client/async/adapters/event_machine.rb, line 365
def initialize_heartbeat_sender
  @last_server_heartbeat = Time.now
  @heartbeats_timer      = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat))
end
on_closed(&block)

Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok). You can define more than one callback.

@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 118
def on_closed(&block)
  @disconnection_deferrable.callback(&block)
end
Also aliased as: on_disconnection
on_connection(&block)
Alias for: on_open
on_disconnection(&block)
Alias for: on_closed
on_open(&block)

Defines a callback that will be executed when the AMQP connection is considered open: client and broker have agreed on the maximum channel identifier and the maximum allowed frame size, and authentication has succeeded. You can define more than one callback.

@see on_possible_authentication_failure
@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 109
def on_open(&block)
  @connection_deferrable.callback(&block)
end
Also aliased as: on_connection
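
Both on_open and on_closed register their blocks on a deferrable, which is why more than one callback can be defined. The sketch below is a rough stand-in for that idea written in plain Ruby; TinyDeferrable is a hypothetical class, not EventMachine::DefaultDeferrable, and it models only success callbacks.

```ruby
# Hypothetical, minimal deferrable: callbacks queue up until #succeed is
# called, then run in registration order. Callbacks added after success
# run immediately.
class TinyDeferrable
  def initialize
    @callbacks = []
    @succeeded = false
    @args      = []
  end

  def callback(&block)
    return unless block

    if @succeeded
      block.call(*@args)
    else
      @callbacks << block
    end
  end

  def succeed(*args)
    @succeeded = true
    @args      = args
    @callbacks.shift.call(*args) until @callbacks.empty?
  end
end

log = []
connection_opened = TinyDeferrable.new
connection_opened.callback { log << :first }
connection_opened.callback { log << :second }
connection_opened.succeed                      # e.g. after connection.open-ok
connection_opened.callback { log << :late }    # registered after the fact
p log
```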
periodically_reconnect(period = 5)

Periodically try to reconnect.

@param [Fixnum] period Period of time, in seconds, to wait between reconnection attempts.
@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 89
def periodically_reconnect(period = 5)
  @reconnecting = true
  self.reset

  @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) {
    EventMachine.reconnect(@settings[:host], @settings[:port], self)
  }
end
post_init()

EventMachine reactor callback. It is run when the TCP connection is established but before resumption of the network loop. Note that this includes cases when the TCP connection has failed.

@private

# File lib/amq/client/async/adapters/event_machine.rb, line 225
def post_init
  reset

  # note that upgrading to TLS in #connection_completed causes
  # Erlang SSL app that RabbitMQ relies on to report
  # error on TCP connection <0.1465.0>:{ssl_upgrade_error,"record overflow"}
  # and close TCP connection down. Investigation of this issue is likely
  # to take some time and to not be worth in as long as #post_init
  # works fine. MK.
  upgrade_to_tls_if_necessary
rescue Exception => error
  raise error
end
receive_data(chunk)

EventMachine delivers data in chunks, and a chunk is sometimes smaller than a single AMQP frame. Incoming data is therefore appended to a buffer, and frames are decoded only once they are complete.

@private

# File lib/amq/client/async/adapters/event_machine.rb, line 321
def receive_data(chunk)
  @chunk_buffer << chunk
  while frame = get_next_frame
    self.receive_frame(AMQ::Client::Framing::String::Frame.decode(frame))
  end
end
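
The buffering idea behind receive_data can be sketched in plain Ruby. This is an illustration, not the library's get_next_frame: ChunkBuffer and next_frame are hypothetical names. The only protocol fact relied on is the AMQP 0-9-1 general frame layout: a 7-byte header (1-byte type, 2-byte channel, 4-byte payload size), the payload, and a final 0xCE octet.

```ruby
HEADER_SIZE = 7            # type (1) + channel (2) + payload size (4)
FINAL_OCTET = "\xCE".b     # AMQP 0-9-1 frame-end marker

# Hypothetical buffer that accumulates raw chunks and hands out
# complete frames one at a time.
class ChunkBuffer
  def initialize
    @buffer = String.new(encoding: Encoding::BINARY)
  end

  def <<(chunk)
    @buffer << chunk.b
    self
  end

  # Returns the next complete frame as a binary String,
  # or nil if more data is needed.
  def next_frame
    return nil if @buffer.bytesize < HEADER_SIZE

    payload_size = @buffer.byteslice(3, 4).unpack1("N")
    frame_size   = HEADER_SIZE + payload_size + 1
    return nil if @buffer.bytesize < frame_size

    frame = @buffer.byteslice(0, frame_size)
    raise ArgumentError, "missing frame-end octet" unless frame.byteslice(-1, 1) == FINAL_OCTET

    @buffer = @buffer.byteslice(frame_size, @buffer.bytesize - frame_size)
    frame
  end
end

# Build one frame by hand and feed it in two chunks.
payload = "hello".b
frame   = [1, 0, payload.bytesize].pack("CnN") + payload + FINAL_OCTET

buffer = ChunkBuffer.new
buffer << frame.byteslice(0, 5)
p buffer.next_frame                    # header is still incomplete
buffer << frame.byteslice(5, frame.bytesize)
p buffer.next_frame == frame           # the whole frame has arrived
```

Calling next_frame in a loop until it returns nil also covers the opposite case, where a single chunk carries several frames.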
reconnect(force = false, period = 5)

Reconnect after a period of wait.

@param [Boolean] force If true, enforces immediate reconnection.
@param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt.
@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 54
def reconnect(force = false, period = 5)
  if @reconnecting and not force
    EventMachine::Timer.new(period) {
      reconnect(true, period)
    }
    return
  end

  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end
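
The branching in reconnect can be summarised as a small decision function. This is a sketch that mirrors only the conditions visible in the listing above; reconnect_plan and its return values are hypothetical names, and no timers or sockets are involved.

```ruby
# Hypothetical summary of #reconnect's branches:
#   already reconnecting and not forced -> schedule a forced retry after `period`
#   otherwise                           -> reconnect right away
def reconnect_plan(reconnecting, force)
  if reconnecting && !force
    :retry_after_period
  else
    :reconnect_now
  end
end

[[false, false], [true, false], [true, true]].each do |reconnecting, force|
  puts "reconnecting=#{reconnecting} force=#{force} -> #{reconnect_plan(reconnecting, force)}"
end
```

In the listing, the scheduled retry calls reconnect(true, period), so the second attempt always takes the immediate branch.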
reconnect_to(settings, period = 5)

Similar to reconnect, but uses different connection settings.

@see reconnect
@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 73
def reconnect_to(settings, period = 5)
  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  @settings = Settings.configure(settings)
  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end
register_connection_callback(&block)

@see on_open
@private

# File lib/amq/client/async/adapters/event_machine.rb, line 125
def register_connection_callback(&block)
  unless block.nil?
    # delay calling block we were given till after we receive
    # connection.open-ok. Connection will notify us when
    # that happens.
    self.on_open do
      block.call(self)
    end
  end
end
tcp_connection_established?()

Is the TCP connection established and currently active?

@return [Boolean]
@api public

# File lib/amq/client/async/adapters/event_machine.rb, line 203
def tcp_connection_established?
  @tcp_connection_established
end
unbind(exception = nil)

Called by EventMachine reactor when

  • We close TCP connection down

  • Our peer closes TCP connection down

  • There is a network connection issue

  • Initial TCP connection fails

@private

# File lib/amq/client/async/adapters/event_machine.rb, line 288
def unbind(exception = nil)
  if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection
    @tcp_connection_failed = true
    logger.error "[amqp] Detected TCP connection failure"
    self.tcp_connection_failed
  end

  closing!
  @tcp_connection_established = false

  self.handle_connection_interruption if @reconnecting
  @disconnection_deferrable.succeed

  closed!


  self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfully_connected_before

  # since AMQP spec dictates that authentication failure is a protocol exception
  # and protocol exceptions result in connection closure, check whether we are
  # in the authentication stage. If so, it is likely to signal an authentication
  # issue. Java client behaves the same way. MK.
  if authenticating? && !@intentionally_closing_connection
    @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure
  end
end
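
Because unbind is called in several different situations, the listing above relies on a handful of flags to tell them apart. The sketch below restates those conditions as a pure function so they are easier to read side by side; unbind_events is a hypothetical helper that only names the handlers the listing would invoke, without calling any of them.

```ruby
# Hypothetical helper mirroring the conditions in #unbind. It returns the
# names of the handlers that would fire for a given combination of flags.
def unbind_events(tcp_established:, connected_before:, intentional:, authenticating:, reconnecting: false)
  events = []
  events << :tcp_connection_failed if !tcp_established && !connected_before && !intentional
  events << :handle_connection_interruption if reconnecting
  events << :tcp_connection_lost if !intentional && connected_before
  events << :possible_authentication_failure if authenticating && !intentional
  events
end

# Initial TCP connection never came up.
p unbind_events(tcp_established: false, connected_before: false,
                intentional: false, authenticating: false)

# Broker dropped the connection during the authentication stage.
p unbind_events(tcp_established: true, connected_before: true,
                intentional: false, authenticating: true)

# We closed the connection ourselves.
p unbind_events(tcp_established: true, connected_before: true,
                intentional: true, authenticating: false)
```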

Protected Instance Methods

reset()
# File lib/amq/client/async/adapters/event_machine.rb, line 400
def reset
  @size      = 0
  @payload   = ""
  @frames    = Array.new

  @chunk_buffer                 = ""
  @connection_deferrable        = EventMachine::DefaultDeferrable.new
  @disconnection_deferrable     = EventMachine::DefaultDeferrable.new

  # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
  # that on authentication failure broker must close TCP connection without sending
  # any more data. This is why we need to explicitly track whether we are past
  # authentication stage to signal possible authentication failures.
  @authenticating           = false
end
upgrade_to_tls_if_necessary()
# File lib/amq/client/async/adapters/event_machine.rb, line 416
def upgrade_to_tls_if_necessary
  tls_options = @settings[:ssl]

  if tls_options.is_a?(Hash)
    start_tls(tls_options)
  elsif tls_options
    start_tls
  end
end
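
As the listing shows, the :ssl setting may be either a truthy flag or a Hash of TLS options. The following sketch restates that dispatch without touching EventMachine; tls_call_for is a hypothetical helper that returns the call the adapter would make, and the option keys in the usage lines are only examples of what such a Hash might carry.

```ruby
# Hypothetical helper: describes the start_tls call that
# #upgrade_to_tls_if_necessary would make for a given :ssl setting.
#   Hash   -> start_tls(options)
#   truthy -> start_tls with no arguments
#   falsy  -> no TLS upgrade (nil)
def tls_call_for(ssl_setting)
  if ssl_setting.is_a?(Hash)
    [:start_tls, ssl_setting]
  elsif ssl_setting
    [:start_tls]
  end
end

p tls_call_for(false)
p tls_call_for(true)
p tls_call_for(private_key_file: "client.key", cert_chain_file: "client.pem")
```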
