class AMQP::Session

AMQP session represents connection to the broker. Session objects let you define callbacks for various TCP connection lifecycle events, for instance:

h2. Key methods

@api public

Constants

Deferrable

Backwards compatibility with 0.7.0.a25. MK.

Attributes

callbacks[R]

@return [Array<#call>]

channel_max[RW]

Maximum channel number that the server permits this connection to use. Usable channel numbers are in the range 1..channel_max. Zero indicates no specified limit.

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.1 and 1.4.2.6.1)

channels[R]

Channels within this connection.

@see bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2.5)

client_properties[RW]

Client capabilities

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2.1)

frame_max[RW]

Maximum frame size that the server permits this connection to use.

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.2 and 1.4.2.6.2)

known_hosts[R]
locale[RW]

The locale defines the language in which the server will send reply texts.

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)

logger[RW]

API

mechanism[R]

Authentication mechanism used.

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)

server_authentication_mechanisms[R]

Authentication mechanisms broker supports.

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)

server_capabilities[R]

Server capabilities

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)

server_locales[R]

Locales server supports

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)

server_properties[R]

Server properties

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)

settings[RW]

Public Class Methods

authentication_failure_exception_class() click to toggle source

Overrides authentication failure exception to one that inherits from AMQP::Error and thus is backwards compatible.

@private @api plugin @return [Class] AMQP::PossibleAuthenticationFailureError

# File lib/amqp/session.rb, line 432
def self.authentication_failure_exception_class
  @authentication_failure_exception_class ||= AMQP::PossibleAuthenticationFailureError
end
connect(settings = {}, &block) click to toggle source

Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection succeeds.

@option settings [String] :host (“127.0.0.1”) Hostname AMQ broker runs on. @option settings [String] :port (5672) Port AMQ broker listens on. @option settings [String] :vhost (“/”) Virtual host to use. @option settings [String] :user (“guest”) Username to use for authentication. @option settings [String] :pass (“guest”) Password to use for authentication. @option settings [String] :auth_mechanism (“PLAIN”) SASL authentication mechanism to use. @option settings [String] :ssl (false) Should be use TLS (SSL) for connection? @option settings [String] :timeout (nil) Connection timeout. @option settings [Fixnum] :heartbeat (0) Connection heartbeat, in seconds. @option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If broker cannot support frames this large, broker's maximum value will be used instead.

@param [Hash] settings

# File lib/amqp/session.rb, line 453
def self.connect(settings = {}, &block)
  @settings = Settings.configure(settings)

  instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings)
  instance.register_connection_callback(&block)

  instance
end
logger() click to toggle source
# File lib/amqp/session.rb, line 124
def logger
  @logger ||= begin
                require "logger"
                Logger.new(STDERR)
              end
end
logger=(logger) click to toggle source
# File lib/amqp/session.rb, line 131
def logger=(logger)
  methods = AMQP::Logging::REQUIRED_METHODS
  unless methods.all? { |method| logger.respond_to?(method) }
    raise AMQP::Logging::IncompatibleLoggerError.new(methods)
  end

  @logger = logger
end
logging() click to toggle source

@return [Boolean] Current value of logging flag.

# File lib/amqp/session.rb, line 141
def logging
  settings[:logging]
end
logging=(boolean) click to toggle source

Turns loggin on or off.

# File lib/amqp/session.rb, line 146
def logging=(boolean)
  settings[:logging] = boolean
end
new(*args, &block) click to toggle source

@group Connecting, reconnecting, disconnecting

Calls superclass method
# File lib/amqp/session.rb, line 155
def initialize(*args, &block)
  super(*args)

  self.logger   = self.class.logger

  # channel => collected frames. MK.
  @frames            = Hash.new { Array.new }
  @channels          = Hash.new
  @callbacks         = Hash.new

  opening!

  # track TCP connection state, used to detect initial TCP connection failures.
  @tcp_connection_established       = false
  @tcp_connection_failed            = false
  @intentionally_closing_connection = false

  # EventMachine::Connection's and Adapter's constructors arity
  # make it easier to use *args. MK.
  @settings                           = Settings.configure(args.first)

  @on_tcp_connection_failure          = Proc.new { |settings|
    closed!
    if cb = @settings[:on_tcp_connection_failure]
      cb.call(settings)
    else
      raise self.class.tcp_connection_failure_exception_class.new(settings)
    end
  }

  @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
    raise self.class.authentication_failure_exception_class.new(settings)
  }

  @mechanism         = @settings.fetch(:auth_mechanism, "PLAIN")
  @locale            = @settings.fetch(:locale, "en_GB")
  @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

  @auto_recovery     = (!!@settings[:auto_recovery])

  self.reset
  self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)
end
settings() click to toggle source

Settings

# File lib/amqp/session.rb, line 120
def settings
  @settings ||= AMQP::Settings.default
end
tcp_connection_failure_exception_class() click to toggle source

Overrides TCP connection failure exception to one that inherits from AMQP::Error and thus is backwards compatible.

@private @api plugin @return [Class] AMQP::TCPConnectionFailed

# File lib/amqp/session.rb, line 421
def self.tcp_connection_failure_exception_class
  @tcp_connection_failure_exception_class ||= AMQP::TCPConnectionFailed
end

Public Instance Methods

after_connection_interruption(&block)
after_recovery(&block)
Alias for: on_recovery
auth_mechanism_adapter() click to toggle source

Retrieves an AuthMechanismAdapter that will encode credentials for this Adapter.

@api plugin

# File lib/amqp/session.rb, line 918
def auth_mechanism_adapter
  @auth_mechanism_adapter ||= AuthMechanismAdapter.for_adapter(self)
end
authenticating?() click to toggle source

Whether we are in authentication state (after TCP connection was estabilished but before broker authenticated us).

@return [Boolean] @api public

# File lib/amqp/session.rb, line 544
def authenticating?
  @authenticating
end
auto_recover() click to toggle source

Performs recovery of channels that are in the automatic recovery mode.

@see AMQP::Channel#auto_recover @see AMQP::Queue#auto_recover @see AMQP::Exchange#auto_recover @api plugin

# File lib/amqp/session.rb, line 391
def auto_recover
  @channels.select { |channel_id, ch| ch.auto_recovering? }.each { |n, ch| ch.auto_recover }
end
auto_recovering?() click to toggle source

@return [Boolean] whether connection is in the automatic recovery mode @api public

# File lib/amqp/session.rb, line 379
def auto_recovering?
  !!@auto_recovery
end
Also aliased as: auto_recovery?
auto_recovery?()
Alias for: auto_recovering?
before_recovery(&block) click to toggle source

Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/session.rb, line 362
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
broker() click to toggle source

@return [AMQP::Broker] Broker this connection is established with

# File lib/amqp/session.rb, line 281
def broker
  @broker ||= AMQP::Broker.new(@server_properties)
end
broker_endpoint() click to toggle source

@return [String] Broker endpoint in the form of HOST:PORT/VHOST @api public

# File lib/amqp/session.rb, line 220
def broker_endpoint
  "#{self.hostname}:#{self.port}/#{self.vhost}"
end
cancel_heartbeat_sender() click to toggle source

@private

# File lib/amqp/session.rb, line 717
def cancel_heartbeat_sender
  @heartbeats_timer.cancel if @heartbeats_timer
end
clear_frames_on(channel_id) click to toggle source

Clears frames that were received but not processed on given channel. Needs to be called when the channel is closed. @private

# File lib/amqp/session.rb, line 968
def clear_frames_on(channel_id)
  raise ArgumentError, "channel id cannot be nil!" if channel_id.nil?

  @frames[channel_id].clear
end
close(reply_code = 200, reply_text = "Goodbye", &block)
Alias for: disconnect
close_connection(*args) click to toggle source

@private

Calls superclass method
# File lib/amqp/session.rb, line 621
def close_connection(*args)
  @intentionally_closing_connection = true

  super(*args)
end
connected?() click to toggle source

@return [Boolean] true if this AMQP connection is currently open @api plugin

# File lib/amqp/session.rb, line 201
def connected?
  self.opened?
end
connection_completed() click to toggle source

Called by EventMachine reactor once TCP connection is successfully estabilished. @private

# File lib/amqp/session.rb, line 591
def connection_completed
  # we only can safely set this value here because EventMachine is a lovely piece of
  # software that calls #post_init before #unbind even when TCP connection
  # fails. MK.
  @tcp_connection_established       = true
  @periodic_reconnection_timer.cancel if @periodic_reconnection_timer


  # again, this is because #unbind is called in different situations
  # and there is no easy way to tell initial connection failure
  # from connection loss. Not in EventMachine 0.12.x, anyway. MK.

  if @had_successfully_connected_before
    @recovered = true


    self.start_automatic_recovery
    self.upgrade_to_tls_if_necessary
  end

  # now we can set it. MK.
  @had_successfully_connected_before = true
  @reconnecting                      = false
  @handling_skipped_heartbeats       = false
  @last_server_heartbeat             = Time.now

  self.handshake
end
connection_successful() click to toggle source

Called by AMQP::Connection after we receive connection.open-ok. @api public

# File lib/amqp/session.rb, line 677
def connection_successful
  @authenticating = false
  opened!

  @connection_deferrable.succeed
end
disconnect(reply_code = 200, reply_text = "Goodbye", &block) click to toggle source

Properly close connection with AMQ broker, as described in section 2.2.4 of the {files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}.

@api plugin @see close_connection

# File lib/amqp/session.rb, line 237
def disconnect(reply_code = 200, reply_text = "Goodbye", &block)
  @intentionally_closing_connection = true
  self.on_disconnection do
    @frames.clear
    block.call if block
  end

  # ruby-amqp/amqp#66, MK.
  if self.open?
    closing!
    self.send_frame(AMQ::Protocol::Connection::Close.encode(reply_code, reply_text, 0, 0))
  elsif self.closing?
    # no-op
  else
    self.disconnection_successful
  end
end
Also aliased as: close, close
disconnection_successful() click to toggle source

Called by AMQP::Connection after we receive connection.close-ok.

@api public

# File lib/amqp/session.rb, line 688
def disconnection_successful
  @disconnection_deferrable.succeed

  # true for "after writing buffered data"
  self.close_connection(true)
  self.reset
  closed!
end
encode_credentials(username, password) click to toggle source

@api plugin @see tools.ietf.org/rfc/rfc2595.txt RFC 2595

# File lib/amqp/session.rb, line 910
def encode_credentials(username, password)
  auth_mechanism_adapter.encode_credentials(username, password)
end
handle_close(conn_close) click to toggle source

Handles connection.close. When broker detects a connection level exception, this method is called.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.5.2.9)

# File lib/amqp/session.rb, line 1047
def handle_close(conn_close)
  closed!
  # getting connection.close during connection negotiation means authentication
  # has failed (RabbitMQ 3.2+):
  # http://www.rabbitmq.com/auth-notification.html
  if authenticating?
    @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure
  end
  self.exec_callback_yielding_self(:error, conn_close)
end
handle_close_ok(close_ok) click to toggle source

Handles Connection.Close-Ok.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.10)

# File lib/amqp/session.rb, line 1063
def handle_close_ok(close_ok)
  closed!
  self.disconnection_successful
end
handle_connection_blocked(connection_blocked) click to toggle source
# File lib/amqp/session.rb, line 1069
def handle_connection_blocked(connection_blocked)
  @on_blocked.call(self, connection_blocked) if @on_blocked
end
handle_connection_interruption() click to toggle source

@private @api plugin

# File lib/amqp/session.rb, line 814
def handle_connection_interruption
  self.cancel_heartbeat_sender

  @channels.each { |n, c| c.handle_connection_interruption }
  self.exec_callback_yielding_self(:after_connection_interruption)
end
handle_connection_unblocked(connection_unblocked) click to toggle source
# File lib/amqp/session.rb, line 1073
def handle_connection_unblocked(connection_unblocked)
  @on_unblocked.call(self, connection_unblocked) if @on_unblocked
end
handle_open_ok(open_ok) click to toggle source

Handles Connection.Open-Ok.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.8.)

# File lib/amqp/session.rb, line 1035
def handle_open_ok(open_ok)
  @known_hosts = open_ok.known_hosts.dup.freeze

  opened!
  self.connection_successful if self.respond_to?(:connection_successful)
end
handle_skipped_heartbeats() click to toggle source

Called when time since last server heartbeat received is greater or equal to the heartbeat interval set via :heartbeat_interval option on connection.

@api plugin

# File lib/amqp/session.rb, line 701
def handle_skipped_heartbeats
  if !@handling_skipped_heartbeats && @tcp_connection_established && !@intentionally_closing_connection
    @handling_skipped_heartbeats = true
    self.cancel_heartbeat_sender

    self.run_skipped_heartbeats_callbacks
  end
end
handle_start(connection_start) click to toggle source

Handles connection.start.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.)

# File lib/amqp/session.rb, line 995
def handle_start(connection_start)
  @server_properties                = connection_start.server_properties
  @server_capabilities              = @server_properties["capabilities"]

  @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
  @server_locales                   = Array(connection_start.locales)

  username = @settings[:user] || @settings[:username]
  password = @settings[:pass] || @settings[:password]

  # It's not clear whether we should transition to :opening state here
  # or in #open but in case authentication fails, it would be strange to have
  # @status undefined. So lets do this. MK.
  opening!

  self.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, mechanism, self.encode_credentials(username, password), @locale))
end
handle_tune(connection_tune) click to toggle source

Handles Connection.Tune-Ok.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)

# File lib/amqp/session.rb, line 1018
def handle_tune(connection_tune)
  @channel_max        = connection_tune.channel_max.freeze
  @frame_max          = connection_tune.frame_max.freeze

  client_heartbeat    = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0

  @heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, connection_tune.heartbeat)

  self.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
  self.initialize_heartbeat_sender if heartbeats_enabled?
end
handshake() click to toggle source

Sends connection preamble to the broker. @api plugin

# File lib/amqp/session.rb, line 887
def handshake
  @authenticating = true
  self.send_preamble
end
heartbeat_interval() click to toggle source

Returns heartbeat interval this client uses, in seconds. This value may or may not be used depending on broker capabilities. Zero means the server does not want a heartbeat.

@return [Fixnum] Heartbeat interval this client uses, in seconds. @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)

# File lib/amqp/session.rb, line 766
def heartbeat_interval
  @heartbeat_interval
end
heartbeats_enabled?() click to toggle source

Returns true if heartbeats are enabled (heartbeat interval is greater than 0) @return [Boolean]

# File lib/amqp/session.rb, line 772
def heartbeats_enabled?
  @heartbeat_interval && (@heartbeat_interval > 0)
end
host()
Alias for: hostname
hostname() click to toggle source

@return [String] Broker hostname this connection uses @api public

# File lib/amqp/session.rb, line 207
def hostname
  @settings[:host]
end
Also aliased as: host
initialize_heartbeat_sender() click to toggle source

@private

# File lib/amqp/session.rb, line 711
def initialize_heartbeat_sender
  @last_server_heartbeat = Time.now
  @heartbeats_timer      = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat))
end
on_blocked(&fn) click to toggle source

@group Blocked connection notifications

# File lib/amqp/session.rb, line 400
def on_blocked(&fn)
  @on_blocked = fn
end
on_closed(&block) click to toggle source

Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok). You can define more than one callback.

@see on_closed @api public

# File lib/amqp/session.rb, line 308
def on_closed(&block)
  @disconnection_deferrable.callback(&block)
end
Also aliased as: on_disconnection
on_connection(&block)
Alias for: on_open
on_connection_interruption(&block) click to toggle source

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/session.rb, line 341
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
on_disconnection(&block)
Alias for: on_closed
on_error(&block) click to toggle source

Defines a callback that will be executed when connection is closed after connection-level exception. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/session.rb, line 352
def on_error(&block)
  self.redefine_callback(:error, &block)
end
on_open(&block) click to toggle source

Defines a callback that will be executed when AMQP connection is considered open: after client and broker has agreed on max channel identifier and maximum allowed frame size and authentication succeeds. You can define more than one callback.

@see on_closed @api public

# File lib/amqp/session.rb, line 295
def on_open(&block)
  @connection_deferrable.callback(&block)
end
Also aliased as: on_connection
on_possible_authentication_failure(&block) click to toggle source

Defines a callback that will be run when TCP connection is closed before authentication finishes. Usually this means authentication failure. You can define only one callback.

@api public

# File lib/amqp/session.rb, line 333
def on_possible_authentication_failure(&block)
  @on_possible_authentication_failure = block
end
on_recovery(&block) click to toggle source

Defines a callback that will be executed after AMQP connection has recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/session.rb, line 371
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
Also aliased as: after_recovery
on_skipped_heartbeats(&block) click to toggle source

Defines a callback that will be executed after time since last broker heartbeat is greater than or equal to the heartbeat interval (skipped heartbeat is detected). Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/session.rb, line 867
def on_skipped_heartbeats(&block)
  self.redefine_callback(:skipped_heartbeats, &block)
end
on_tcp_connection_failure(&block) click to toggle source

Defines a callback that will be run when initial TCP connection fails. You can define only one callback.

@api public

# File lib/amqp/session.rb, line 317
def on_tcp_connection_failure(&block)
  @on_tcp_connection_failure = block
end
on_tcp_connection_loss(&block) click to toggle source

Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). You can define only one callback.

@api public

# File lib/amqp/session.rb, line 325
def on_tcp_connection_loss(&block)
  @on_tcp_connection_loss = block
end
on_unblocked(&fn) click to toggle source
# File lib/amqp/session.rb, line 404
def on_unblocked(&fn)
  @on_unblocked = fn
end
open(vhost = "/") click to toggle source

Sends connection.open to the server.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.7)

# File lib/amqp/session.rb, line 897
def open(vhost = "/")
  self.send_frame(AMQ::Protocol::Connection::Open.encode(vhost))
end
periodically_reconnect(period = 5) click to toggle source

Periodically try to reconnect.

@param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt. @param [Boolean] force If true, enforces immediate reconnection. @api public

# File lib/amqp/session.rb, line 511
def periodically_reconnect(period = 5)
  @reconnecting = true
  self.reset

  @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) {
    EventMachine.reconnect(@settings[:host], @settings[:port], self)
  }
end
port() click to toggle source

@return [String] Broker port this connection uses @api public

# File lib/amqp/session.rb, line 214
def port
  @settings[:port]
end
post_init() click to toggle source

EventMachine reactor callback. Is run when TCP connection is estabilished but before resumption of the network loop. Note that this includes cases when TCP connection has failed. @private

# File lib/amqp/session.rb, line 573
def post_init
  reset

  # note that upgrading to TLS in #connection_completed causes
  # Erlang SSL app that RabbitMQ relies on to report
  # error on TCP connection <0.1465.0>:{ssl_upgrade_error,"record overflow"}
  # and close TCP connection down. Investigation of this issue is likely
  # to take some time and to not be worth in as long as #post_init
  # works fine. MK.
  upgrade_to_tls_if_necessary
rescue Exception => error
  raise error
end
receive_data(chunk) click to toggle source

EventMachine receives data in chunks, sometimes those chunks are smaller than the size of AMQP frame. That's why you need to add some kind of buffer.

@private

# File lib/amqp/session.rb, line 667
def receive_data(chunk)
  @chunk_buffer << chunk
  while frame = get_next_frame
    self.receive_frame(AMQP::Framing::String::Frame.decode(frame))
  end
end
receive_frame(frame) click to toggle source

Processes a single frame.

@param [AMQ::Protocol::Frame] frame @api plugin

# File lib/amqp/session.rb, line 927
def receive_frame(frame)
  @frames[frame.channel] ||= Array.new
  @frames[frame.channel] << frame

  if frameset_complete?(@frames[frame.channel])
    receive_frameset(@frames[frame.channel])
    # for channel.close, frame.channel will be nil. MK.
    clear_frames_on(frame.channel) if @frames[frame.channel]
  end
end
receive_frameset(frames) click to toggle source

Processes a frameset by finding and invoking a suitable handler. Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat value.

@param [Array<AMQ::Protocol::Frame>] frames @api plugin

# File lib/amqp/session.rb, line 944
def receive_frameset(frames)
  if self.heartbeats_enabled?
    # treat incoming traffic as heartbeats.
    # this operation is pretty expensive under heavy traffic but heartbeats can be disabled
    # (and are also disabled by default). MK.
    @last_server_heartbeat = Time.now
  end
  frame = frames.first

  if AMQ::Protocol::HeartbeatFrame === frame
    # no-op
  else
    if callable = AMQP::HandlersRegistry.find(frame.method_class)
      f = frames.shift
      callable.call(self, f, frames)
    else
      raise MissingHandlerError.new(frames.first)
    end
  end
end
reconnect(force = false, period = 5) click to toggle source

Reconnect after a period of wait.

@param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt. @param [Boolean] force If true, enforces immediate reconnection. @api public

# File lib/amqp/session.rb, line 467
def reconnect(force = false, period = 5)
  if @reconnecting and not force
    EventMachine::Timer.new(period) {
      reconnect(true, period)
    }
    return
  end

  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end
reconnect_to(connection_string_or_options, period = 5) click to toggle source

Similar to reconnect, but uses different connection settings @see reconnect @api public

# File lib/amqp/session.rb, line 486
def reconnect_to(connection_string_or_options, period = 5)
  settings = case connection_string_or_options
             when String then
               AMQP.parse_connection_uri(connection_string_or_options)
             when Hash then
               connection_string_or_options
             else
               Hash.new
             end

  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  @settings = Settings.configure(settings)
  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end
reconnecting?() click to toggle source

@return [Boolean]

# File lib/amqp/session.rb, line 808
def reconnecting?
  @reconnecting
end
register_connection_callback(&block) click to toggle source

@see on_open @private

# File lib/amqp/session.rb, line 524
def register_connection_callback(&block)
  unless block.nil?
    # delay calling block we were given till after we receive
    # connection.open-ok. Connection will notify us when
    # that happens.
    self.on_open do
      block.call(self)
    end
  end
end
reset_state!() click to toggle source

Resets connection state.

@api plugin

# File lib/amqp/session.rb, line 904
def reset_state!
  # no-op by default
end
run_after_recovery_callbacks() click to toggle source

@private

# File lib/amqp/session.rb, line 832
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery, @settings)

  @channels.each { |n, ch| ch.run_after_recovery_callbacks }
end
run_before_recovery_callbacks() click to toggle source

@private

# File lib/amqp/session.rb, line 824
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery, @settings)

  @channels.each { |n, ch| ch.run_before_recovery_callbacks }
end
run_skipped_heartbeats_callbacks() click to toggle source

@private

# File lib/amqp/session.rb, line 872
def run_skipped_heartbeats_callbacks
  self.exec_callback_yielding_self(:skipped_heartbeats, @settings)
end
send_frame(frame) click to toggle source

Sends frame to the peer, checking that connection is open.

@raise [ConnectionClosedError]

# File lib/amqp/session.rb, line 735
def send_frame(frame)
  if closed?
    raise ConnectionClosedError.new(frame)
  else
    self.send_raw(frame.encode)
  end
end
send_frameset(frames, channel) click to toggle source

Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.

@api public

# File lib/amqp/session.rb, line 747
def send_frameset(frames, channel)
  # some (many) developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    frames.each { |frame| self.send_frame(frame) }
  end
end
send_heartbeat() click to toggle source

Sends a heartbeat frame if connection is open. @api plugin

# File lib/amqp/session.rb, line 976
def send_heartbeat
  if tcp_connection_established? && !@handling_skipped_heartbeats && @last_server_heartbeat
    if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting?
      logger.error "[amqp] Detected missing server heartbeats"
      self.handle_skipped_heartbeats
    end
    send_frame(AMQ::Protocol::HeartbeatFrame)
  end
end
send_preamble() click to toggle source

Sends AMQ protocol header (also known as preamble).

@note This must be implemented by all AMQP clients. @api plugin @see bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2)

# File lib/amqp/session.rb, line 728
def send_preamble
  self.send_raw(AMQ::Protocol::PREAMBLE)
end
start_automatic_recovery() click to toggle source

Performs recovery of channels that are in the automatic recovery mode. “before recovery” callbacks are run immediately, “after recovery” callbacks are run after AMQP connection is re-established and auto recovery is performed (using auto_recover).

Use this method if you want to run automatic recovery process after handling a connection-level exception, for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).

@see AMQP::Channel#auto_recover @see AMQP::Queue#auto_recover @see AMQP::Exchange#auto_recover @api plugin

# File lib/amqp/session.rb, line 850
def start_automatic_recovery
  self.run_before_recovery_callbacks
  self.register_connection_callback do
    # always run automatic recovery, because it is per-channel
    # and connection has to start it. Channels that did not opt-in for
    # autorecovery won't be selected. MK.
    self.auto_recover
    self.run_after_recovery_callbacks
  end
end
tcp_connection_established?() click to toggle source

IS TCP connection estabilished and currently active? @return [Boolean] @api public

# File lib/amqp/session.rb, line 551
def tcp_connection_established?
  @tcp_connection_established
end
tcp_connection_failed() click to toggle source

Called when initial TCP connection fails. @api public

# File lib/amqp/session.rb, line 792
def tcp_connection_failed
  @recovered = false

  @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
end
tcp_connection_lost() click to toggle source

Called when previously established TCP connection fails. @api public

# File lib/amqp/session.rb, line 800
def tcp_connection_lost
  @recovered = false

  @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
  self.handle_connection_interruption
end
unbind(exception = nil) click to toggle source

Called by EventMachine reactor when

  • We close TCP connection down

  • Our peer closes TCP connection down

  • There is a network connection issue

  • Initial TCP connection fails

@private

# File lib/amqp/session.rb, line 634
def unbind(exception = nil)
  if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection
    @tcp_connection_failed = true
    logger.error "[amqp] Detected TCP connection failure: #{exception}"
    self.tcp_connection_failed
  end

  closing!
  @tcp_connection_established = false

  self.handle_connection_interruption if @reconnecting
  @disconnection_deferrable.succeed

  closed!


  self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfully_connected_before

  # since AMQP spec dictates that authentication failure is a protocol exception
  # and protocol exceptions result in connection closure, check whether we are
  # in the authentication stage. If so, it is likely to signal an authentication
  # issue. Java client behaves the same way. MK.
  if authenticating? && !@intentionally_closing_connection
    @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure
  end
end
user()
Alias for: username
username() click to toggle source

@return [String] Username used by this connection @api public

# File lib/amqp/session.rb, line 226
def username
  @settings[:user]
end
Also aliased as: user
vhost() click to toggle source

vhost this connection uses. Default is “/”, a historically estabilished convention of RabbitMQ and amqp gem.

@return [String] vhost this connection uses @api public

# File lib/amqp/session.rb, line 782
def vhost
  @settings.fetch(:vhost, "/")
end

Protected Instance Methods

content_complete?(frames) click to toggle source

Determines, whether given frame array contains full content body

# File lib/amqp/session.rb, line 1115
def content_complete?(frames)
  return false if frames.empty?
  header = frames[0]
  raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame)
  header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size }
end
frameset_complete?(frames) click to toggle source

Determines, whether the received frameset is ready to be further processed

# File lib/amqp/session.rb, line 1108
def frameset_complete?(frames)
  return false if frames.empty?
  first_frame = frames[0]
  first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1]))
end
get_next_frame() click to toggle source

Returns next frame from buffer whenever possible

@api private

# File lib/amqp/session.rb, line 1090
def get_next_frame
  return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
  # octet + short
  offset = 3 # 1 + 2
  # length
  payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
  # 4 bytes for long payload length, 1 byte final octet
  frame_length = offset + payload_length + 5
  if frame_length <= @chunk_buffer.size
    @chunk_buffer.slice!(0, frame_length)
  else
    nil
  end
end
negotiate_heartbeat_value(client_value, server_value) click to toggle source
# File lib/amqp/session.rb, line 1079
def negotiate_heartbeat_value(client_value, server_value)
  if client_value == 0 || server_value == 0
    [client_value, server_value].max
  else
    [client_value, server_value].min
  end
end
reset() click to toggle source
# File lib/amqp/session.rb, line 1159
def reset
  @size      = 0
  @payload   = ""
  @frames    = Array.new

  @chunk_buffer                 = ""
  @connection_deferrable        = EventMachine::DefaultDeferrable.new
  @disconnection_deferrable     = EventMachine::DefaultDeferrable.new

  # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
  # that on authentication failure broker must close TCP connection without sending
  # any more data. This is why we need to explicitly track whether we are past
  # authentication stage to signal possible authentication failures.
  @authenticating           = false
end
upgrade_to_tls_if_necessary() click to toggle source
# File lib/amqp/session.rb, line 1175
def upgrade_to_tls_if_necessary
  tls_options = @settings[:ssl]

  if tls_options.is_a?(Hash)
    start_tls(tls_options)
  elsif tls_options
    start_tls
  end
end