class AMQP::Session
AMQP session represents connection to the broker. Session objects let you define callbacks for various TCP connection lifecycle events, for instance:
-
Connection is established
-
Connection has failed
-
Authentication has failed
-
Connection is lost (there is a network failure)
-
AMQP connection is opened
-
AMQP connection parameters (tuning) are negotiated and accepted by the broker
-
AMQP connection is properly closed
h2. Key methods
-
{Session#on_connection}
-
{Session#on_open}
-
{Session#on_disconnection}
-
{Session#on_possible_authentication_failure}
-
{Session#on_tcp_connection_failure}
-
{Session#on_tcp_connection_loss}
-
{Session#reconnect}
-
{Session#connected?}
@api public
Constants
- Deferrable
Backwards compatibility with 0.7.0.a25. MK.
Attributes
@return [Array<#call>]
Maximum channel number that the server permits this connection to use. Usable channel numbers are in the range 1..channel_max. Zero indicates no specified limit.
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.1 and 1.4.2.6.1)
Channels within this connection.
@see bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2.5)
Client capabilities
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2.1)
Maximum frame size that the server permits this connection to use.
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.2 and 1.4.2.6.2)
The locale defines the language in which the server will send reply texts.
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
API
Authentication mechanism used.
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
Authentication mechanisms broker supports.
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
Server capabilities
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
Locales server supports
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
Server properties
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
Public Class Methods
Overrides authentication failure exception to one that inherits from AMQP::Error and thus is backwards compatible.
@private @api plugin @return [Class] AMQP::PossibleAuthenticationFailureError
# File lib/amqp/session.rb, line 432 def self.authentication_failure_exception_class @authentication_failure_exception_class ||= AMQP::PossibleAuthenticationFailureError end
Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection succeeds.
@option settings [String] :host (“127.0.0.1”) Hostname AMQ broker runs on. @option settings [String] :port (5672) Port AMQ broker listens on. @option settings [String] :vhost (“/”) Virtual host to use. @option settings [String] :user (“guest”) Username to use for authentication. @option settings [String] :pass (“guest”) Password to use for authentication. @option settings [String] :auth_mechanism (“PLAIN”) SASL authentication mechanism to use. @option settings [String] :ssl (false) Should be use TLS (SSL) for connection? @option settings [String] :timeout (nil) Connection timeout. @option settings [Fixnum] :heartbeat (0) Connection heartbeat, in seconds. @option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If broker cannot support frames this large, broker's maximum value will be used instead.
@param [Hash] settings
# File lib/amqp/session.rb, line 453 def self.connect(settings = {}, &block) @settings = Settings.configure(settings) instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings) instance.register_connection_callback(&block) instance end
# File lib/amqp/session.rb, line 124 def logger @logger ||= begin require "logger" Logger.new(STDERR) end end
# File lib/amqp/session.rb, line 131 def logger=(logger) methods = AMQP::Logging::REQUIRED_METHODS unless methods.all? { |method| logger.respond_to?(method) } raise AMQP::Logging::IncompatibleLoggerError.new(methods) end @logger = logger end
@return [Boolean] Current value of logging flag.
# File lib/amqp/session.rb, line 141 def logging settings[:logging] end
Turns loggin on or off.
# File lib/amqp/session.rb, line 146 def logging=(boolean) settings[:logging] = boolean end
@group Connecting, reconnecting, disconnecting
# File lib/amqp/session.rb, line 155 def initialize(*args, &block) super(*args) self.logger = self.class.logger # channel => collected frames. MK. @frames = Hash.new { Array.new } @channels = Hash.new @callbacks = Hash.new opening! # track TCP connection state, used to detect initial TCP connection failures. @tcp_connection_established = false @tcp_connection_failed = false @intentionally_closing_connection = false # EventMachine::Connection's and Adapter's constructors arity # make it easier to use *args. MK. @settings = Settings.configure(args.first) @on_tcp_connection_failure = Proc.new { |settings| closed! if cb = @settings[:on_tcp_connection_failure] cb.call(settings) else raise self.class.tcp_connection_failure_exception_class.new(settings) end } @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings| raise self.class.authentication_failure_exception_class.new(settings) } @mechanism = @settings.fetch(:auth_mechanism, "PLAIN") @locale = @settings.fetch(:locale, "en_GB") @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new)) @auto_recovery = (!!@settings[:auto_recovery]) self.reset self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION) end
# File lib/amqp/session.rb, line 120 def settings @settings ||= AMQP::Settings.default end
Overrides TCP connection failure exception to one that inherits from AMQP::Error and thus is backwards compatible.
@private @api plugin @return [Class] AMQP::TCPConnectionFailed
# File lib/amqp/session.rb, line 421 def self.tcp_connection_failure_exception_class @tcp_connection_failure_exception_class ||= AMQP::TCPConnectionFailed end
Public Instance Methods
Retrieves an AuthMechanismAdapter that will encode credentials for this Adapter.
@api plugin
# File lib/amqp/session.rb, line 918 def auth_mechanism_adapter @auth_mechanism_adapter ||= AuthMechanismAdapter.for_adapter(self) end
Whether we are in authentication state (after TCP connection was estabilished but before broker authenticated us).
@return [Boolean] @api public
# File lib/amqp/session.rb, line 544 def authenticating? @authenticating end
Performs recovery of channels that are in the automatic recovery mode.
@see AMQP::Channel#auto_recover @see AMQP::Queue#auto_recover @see AMQP::Exchange#auto_recover @api plugin
# File lib/amqp/session.rb, line 391 def auto_recover @channels.select { |channel_id, ch| ch.auto_recovering? }.each { |n, ch| ch.auto_recover } end
@return [Boolean] whether connection is in the automatic recovery mode @api public
# File lib/amqp/session.rb, line 379 def auto_recovering? !!@auto_recovery end
Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/session.rb, line 362 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end
@return [AMQP::Broker] Broker this connection is established with
# File lib/amqp/session.rb, line 281 def broker @broker ||= AMQP::Broker.new(@server_properties) end
@return [String] Broker endpoint in the form of HOST:PORT/VHOST @api public
# File lib/amqp/session.rb, line 220 def broker_endpoint "#{self.hostname}:#{self.port}/#{self.vhost}" end
@private
# File lib/amqp/session.rb, line 717 def cancel_heartbeat_sender @heartbeats_timer.cancel if @heartbeats_timer end
Clears frames that were received but not processed on given channel. Needs to be called when the channel is closed. @private
# File lib/amqp/session.rb, line 968 def clear_frames_on(channel_id) raise ArgumentError, "channel id cannot be nil!" if channel_id.nil? @frames[channel_id].clear end
@private
# File lib/amqp/session.rb, line 621 def close_connection(*args) @intentionally_closing_connection = true super(*args) end
@return [Boolean] true if this AMQP connection is currently open @api plugin
# File lib/amqp/session.rb, line 201 def connected? self.opened? end
Called by EventMachine reactor once TCP connection is successfully estabilished. @private
# File lib/amqp/session.rb, line 591 def connection_completed # we only can safely set this value here because EventMachine is a lovely piece of # software that calls #post_init before #unbind even when TCP connection # fails. MK. @tcp_connection_established = true @periodic_reconnection_timer.cancel if @periodic_reconnection_timer # again, this is because #unbind is called in different situations # and there is no easy way to tell initial connection failure # from connection loss. Not in EventMachine 0.12.x, anyway. MK. if @had_successfully_connected_before @recovered = true self.start_automatic_recovery self.upgrade_to_tls_if_necessary end # now we can set it. MK. @had_successfully_connected_before = true @reconnecting = false @handling_skipped_heartbeats = false @last_server_heartbeat = Time.now self.handshake end
Called by AMQP::Connection after we receive connection.open-ok. @api public
# File lib/amqp/session.rb, line 677 def connection_successful @authenticating = false opened! @connection_deferrable.succeed end
Properly close connection with AMQ broker, as described in section 2.2.4 of the {files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}.
@api plugin @see close_connection
# File lib/amqp/session.rb, line 237 def disconnect(reply_code = 200, reply_text = "Goodbye", &block) @intentionally_closing_connection = true self.on_disconnection do @frames.clear block.call if block end # ruby-amqp/amqp#66, MK. if self.open? closing! self.send_frame(AMQ::Protocol::Connection::Close.encode(reply_code, reply_text, 0, 0)) elsif self.closing? # no-op else self.disconnection_successful end end
Called by AMQP::Connection after we receive connection.close-ok.
@api public
# File lib/amqp/session.rb, line 688 def disconnection_successful @disconnection_deferrable.succeed # true for "after writing buffered data" self.close_connection(true) self.reset closed! end
@api plugin @see tools.ietf.org/rfc/rfc2595.txt RFC 2595
# File lib/amqp/session.rb, line 910 def encode_credentials(username, password) auth_mechanism_adapter.encode_credentials(username, password) end
Handles connection.close. When broker detects a connection level exception, this method is called.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.5.2.9)
# File lib/amqp/session.rb, line 1047 def handle_close(conn_close) closed! # getting connection.close during connection negotiation means authentication # has failed (RabbitMQ 3.2+): # http://www.rabbitmq.com/auth-notification.html if authenticating? @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure end self.exec_callback_yielding_self(:error, conn_close) end
Handles Connection.Close-Ok.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.10)
# File lib/amqp/session.rb, line 1063 def handle_close_ok(close_ok) closed! self.disconnection_successful end
# File lib/amqp/session.rb, line 1069 def handle_connection_blocked(connection_blocked) @on_blocked.call(self, connection_blocked) if @on_blocked end
@private @api plugin
# File lib/amqp/session.rb, line 814 def handle_connection_interruption self.cancel_heartbeat_sender @channels.each { |n, c| c.handle_connection_interruption } self.exec_callback_yielding_self(:after_connection_interruption) end
# File lib/amqp/session.rb, line 1073 def handle_connection_unblocked(connection_unblocked) @on_unblocked.call(self, connection_unblocked) if @on_unblocked end
Handles Connection.Open-Ok.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.8.)
# File lib/amqp/session.rb, line 1035 def handle_open_ok(open_ok) @known_hosts = open_ok.known_hosts.dup.freeze opened! self.connection_successful if self.respond_to?(:connection_successful) end
Called when time since last server heartbeat received is greater or equal to the heartbeat interval set via :heartbeat_interval option on connection.
@api plugin
# File lib/amqp/session.rb, line 701 def handle_skipped_heartbeats if !@handling_skipped_heartbeats && @tcp_connection_established && !@intentionally_closing_connection @handling_skipped_heartbeats = true self.cancel_heartbeat_sender self.run_skipped_heartbeats_callbacks end end
Handles connection.start.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.)
# File lib/amqp/session.rb, line 995 def handle_start(connection_start) @server_properties = connection_start.server_properties @server_capabilities = @server_properties["capabilities"] @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ") @server_locales = Array(connection_start.locales) username = @settings[:user] || @settings[:username] password = @settings[:pass] || @settings[:password] # It's not clear whether we should transition to :opening state here # or in #open but in case authentication fails, it would be strange to have # @status undefined. So lets do this. MK. opening! self.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, mechanism, self.encode_credentials(username, password), @locale)) end
Handles Connection.Tune-Ok.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)
# File lib/amqp/session.rb, line 1018 def handle_tune(connection_tune) @channel_max = connection_tune.channel_max.freeze @frame_max = connection_tune.frame_max.freeze client_heartbeat = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0 @heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, connection_tune.heartbeat) self.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval)) self.initialize_heartbeat_sender if heartbeats_enabled? end
Sends connection preamble to the broker. @api plugin
# File lib/amqp/session.rb, line 887 def handshake @authenticating = true self.send_preamble end
Returns heartbeat interval this client uses, in seconds. This value may or may not be used depending on broker capabilities. Zero means the server does not want a heartbeat.
@return [Fixnum] Heartbeat interval this client uses, in seconds. @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)
# File lib/amqp/session.rb, line 766 def heartbeat_interval @heartbeat_interval end
Returns true if heartbeats are enabled (heartbeat interval is greater than 0) @return [Boolean]
# File lib/amqp/session.rb, line 772 def heartbeats_enabled? @heartbeat_interval && (@heartbeat_interval > 0) end
@return [String] Broker hostname this connection uses @api public
# File lib/amqp/session.rb, line 207 def hostname @settings[:host] end
@private
# File lib/amqp/session.rb, line 711 def initialize_heartbeat_sender @last_server_heartbeat = Time.now @heartbeats_timer = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat)) end
@group Blocked connection notifications
# File lib/amqp/session.rb, line 400 def on_blocked(&fn) @on_blocked = fn end
Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok). You can define more than one callback.
@see on_closed @api public
# File lib/amqp/session.rb, line 308 def on_closed(&block) @disconnection_deferrable.callback(&block) end
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/session.rb, line 341 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end
Defines a callback that will be executed when connection is closed after connection-level exception. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/session.rb, line 352 def on_error(&block) self.redefine_callback(:error, &block) end
Defines a callback that will be executed when AMQP connection is considered open: after client and broker has agreed on max channel identifier and maximum allowed frame size and authentication succeeds. You can define more than one callback.
@see on_closed @api public
# File lib/amqp/session.rb, line 295 def on_open(&block) @connection_deferrable.callback(&block) end
Defines a callback that will be run when TCP connection is closed before authentication finishes. Usually this means authentication failure. You can define only one callback.
@api public
# File lib/amqp/session.rb, line 333 def on_possible_authentication_failure(&block) @on_possible_authentication_failure = block end
Defines a callback that will be executed after AMQP connection has recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/session.rb, line 371 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end
Defines a callback that will be executed after time since last broker heartbeat is greater than or equal to the heartbeat interval (skipped heartbeat is detected). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amqp/session.rb, line 867 def on_skipped_heartbeats(&block) self.redefine_callback(:skipped_heartbeats, &block) end
Defines a callback that will be run when initial TCP connection fails. You can define only one callback.
@api public
# File lib/amqp/session.rb, line 317 def on_tcp_connection_failure(&block) @on_tcp_connection_failure = block end
Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). You can define only one callback.
@api public
# File lib/amqp/session.rb, line 325 def on_tcp_connection_loss(&block) @on_tcp_connection_loss = block end
# File lib/amqp/session.rb, line 404 def on_unblocked(&fn) @on_unblocked = fn end
Sends connection.open to the server.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.7)
# File lib/amqp/session.rb, line 897 def open(vhost = "/") self.send_frame(AMQ::Protocol::Connection::Open.encode(vhost)) end
Periodically try to reconnect.
@param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt. @param [Boolean] force If true, enforces immediate reconnection. @api public
# File lib/amqp/session.rb, line 511 def periodically_reconnect(period = 5) @reconnecting = true self.reset @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) { EventMachine.reconnect(@settings[:host], @settings[:port], self) } end
@return [String] Broker port this connection uses @api public
# File lib/amqp/session.rb, line 214 def port @settings[:port] end
EventMachine reactor callback. Is run when TCP connection is estabilished but before resumption of the network loop. Note that this includes cases when TCP connection has failed. @private
# File lib/amqp/session.rb, line 573 def post_init reset # note that upgrading to TLS in #connection_completed causes # Erlang SSL app that RabbitMQ relies on to report # error on TCP connection <0.1465.0>:{ssl_upgrade_error,"record overflow"} # and close TCP connection down. Investigation of this issue is likely # to take some time and to not be worth in as long as #post_init # works fine. MK. upgrade_to_tls_if_necessary rescue Exception => error raise error end
EventMachine receives data in chunks, sometimes those chunks are smaller than the size of AMQP frame. That's why you need to add some kind of buffer.
@private
# File lib/amqp/session.rb, line 667 def receive_data(chunk) @chunk_buffer << chunk while frame = get_next_frame self.receive_frame(AMQP::Framing::String::Frame.decode(frame)) end end
Processes a single frame.
@param [AMQ::Protocol::Frame] frame @api plugin
# File lib/amqp/session.rb, line 927 def receive_frame(frame) @frames[frame.channel] ||= Array.new @frames[frame.channel] << frame if frameset_complete?(@frames[frame.channel]) receive_frameset(@frames[frame.channel]) # for channel.close, frame.channel will be nil. MK. clear_frames_on(frame.channel) if @frames[frame.channel] end end
Processes a frameset by finding and invoking a suitable handler. Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat value.
@param [Array<AMQ::Protocol::Frame>] frames @api plugin
# File lib/amqp/session.rb, line 944 def receive_frameset(frames) if self.heartbeats_enabled? # treat incoming traffic as heartbeats. # this operation is pretty expensive under heavy traffic but heartbeats can be disabled # (and are also disabled by default). MK. @last_server_heartbeat = Time.now end frame = frames.first if AMQ::Protocol::HeartbeatFrame === frame # no-op else if callable = AMQP::HandlersRegistry.find(frame.method_class) f = frames.shift callable.call(self, f, frames) else raise MissingHandlerError.new(frames.first) end end end
Reconnect after a period of wait.
@param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt. @param [Boolean] force If true, enforces immediate reconnection. @api public
# File lib/amqp/session.rb, line 467 def reconnect(force = false, period = 5) if @reconnecting and not force EventMachine::Timer.new(period) { reconnect(true, period) } return end if !@reconnecting @reconnecting = true self.reset end EventMachine.reconnect(@settings[:host], @settings[:port], self) end
Similar to reconnect, but uses different connection settings @see reconnect @api public
# File lib/amqp/session.rb, line 486 def reconnect_to(connection_string_or_options, period = 5) settings = case connection_string_or_options when String then AMQP.parse_connection_uri(connection_string_or_options) when Hash then connection_string_or_options else Hash.new end if !@reconnecting @reconnecting = true self.reset end @settings = Settings.configure(settings) EventMachine.reconnect(@settings[:host], @settings[:port], self) end
@return [Boolean]
# File lib/amqp/session.rb, line 808 def reconnecting? @reconnecting end
@see on_open @private
# File lib/amqp/session.rb, line 524 def register_connection_callback(&block) unless block.nil? # delay calling block we were given till after we receive # connection.open-ok. Connection will notify us when # that happens. self.on_open do block.call(self) end end end
Resets connection state.
@api plugin
# File lib/amqp/session.rb, line 904 def reset_state! # no-op by default end
@private
# File lib/amqp/session.rb, line 832 def run_after_recovery_callbacks self.exec_callback_yielding_self(:after_recovery, @settings) @channels.each { |n, ch| ch.run_after_recovery_callbacks } end
@private
# File lib/amqp/session.rb, line 824 def run_before_recovery_callbacks self.exec_callback_yielding_self(:before_recovery, @settings) @channels.each { |n, ch| ch.run_before_recovery_callbacks } end
@private
# File lib/amqp/session.rb, line 872 def run_skipped_heartbeats_callbacks self.exec_callback_yielding_self(:skipped_heartbeats, @settings) end
Sends frame to the peer, checking that connection is open.
@raise [ConnectionClosedError]
# File lib/amqp/session.rb, line 735 def send_frame(frame) if closed? raise ConnectionClosedError.new(frame) else self.send_raw(frame.encode) end end
Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.
@api public
# File lib/amqp/session.rb, line 747 def send_frameset(frames, channel) # some (many) developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods do not need this kind of synchronization. MK. channel.synchronize do frames.each { |frame| self.send_frame(frame) } end end
Sends a heartbeat frame if connection is open. @api plugin
# File lib/amqp/session.rb, line 976 def send_heartbeat if tcp_connection_established? && !@handling_skipped_heartbeats && @last_server_heartbeat if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting? logger.error "[amqp] Detected missing server heartbeats" self.handle_skipped_heartbeats end send_frame(AMQ::Protocol::HeartbeatFrame) end end
Sends AMQ protocol header (also known as preamble).
@note This must be implemented by all AMQP clients. @api plugin @see bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2)
# File lib/amqp/session.rb, line 728 def send_preamble self.send_raw(AMQ::Protocol::PREAMBLE) end
Performs recovery of channels that are in the automatic recovery mode. “before recovery” callbacks are run immediately, “after recovery” callbacks are run after AMQP connection is re-established and auto recovery is performed (using auto_recover).
Use this method if you want to run automatic recovery process after handling a connection-level exception, for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).
@see AMQP::Channel#auto_recover @see AMQP::Queue#auto_recover @see AMQP::Exchange#auto_recover @api plugin
# File lib/amqp/session.rb, line 850 def start_automatic_recovery self.run_before_recovery_callbacks self.register_connection_callback do # always run automatic recovery, because it is per-channel # and connection has to start it. Channels that did not opt-in for # autorecovery won't be selected. MK. self.auto_recover self.run_after_recovery_callbacks end end
IS TCP connection estabilished and currently active? @return [Boolean] @api public
# File lib/amqp/session.rb, line 551 def tcp_connection_established? @tcp_connection_established end
Called when initial TCP connection fails. @api public
# File lib/amqp/session.rb, line 792 def tcp_connection_failed @recovered = false @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure end
Called when previously established TCP connection fails. @api public
# File lib/amqp/session.rb, line 800 def tcp_connection_lost @recovered = false @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss self.handle_connection_interruption end
Called by EventMachine reactor when
-
We close TCP connection down
-
Our peer closes TCP connection down
-
There is a network connection issue
-
Initial TCP connection fails
@private
# File lib/amqp/session.rb, line 634 def unbind(exception = nil) if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection @tcp_connection_failed = true logger.error "[amqp] Detected TCP connection failure: #{exception}" self.tcp_connection_failed end closing! @tcp_connection_established = false self.handle_connection_interruption if @reconnecting @disconnection_deferrable.succeed closed! self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfully_connected_before # since AMQP spec dictates that authentication failure is a protocol exception # and protocol exceptions result in connection closure, check whether we are # in the authentication stage. If so, it is likely to signal an authentication # issue. Java client behaves the same way. MK. if authenticating? && !@intentionally_closing_connection @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure end end
@return [String] Username used by this connection @api public
# File lib/amqp/session.rb, line 226 def username @settings[:user] end
vhost this connection uses. Default is “/”, a historically estabilished convention of RabbitMQ and amqp gem.
@return [String] vhost this connection uses @api public
# File lib/amqp/session.rb, line 782 def vhost @settings.fetch(:vhost, "/") end
Protected Instance Methods
Determines, whether given frame array contains full content body
# File lib/amqp/session.rb, line 1115 def content_complete?(frames) return false if frames.empty? header = frames[0] raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame) header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size } end
Determines, whether the received frameset is ready to be further processed
# File lib/amqp/session.rb, line 1108 def frameset_complete?(frames) return false if frames.empty? first_frame = frames[0] first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1])) end
Returns next frame from buffer whenever possible
@api private
# File lib/amqp/session.rb, line 1090 def get_next_frame return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length # octet + short offset = 3 # 1 + 2 # length payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first # 4 bytes for long payload length, 1 byte final octet frame_length = offset + payload_length + 5 if frame_length <= @chunk_buffer.size @chunk_buffer.slice!(0, frame_length) else nil end end
# File lib/amqp/session.rb, line 1079 def negotiate_heartbeat_value(client_value, server_value) if client_value == 0 || server_value == 0 [client_value, server_value].max else [client_value, server_value].min end end
# File lib/amqp/session.rb, line 1159 def reset @size = 0 @payload = "" @frames = Array.new @chunk_buffer = "" @connection_deferrable = EventMachine::DefaultDeferrable.new @disconnection_deferrable = EventMachine::DefaultDeferrable.new # used to track down whether authentication succeeded. AMQP 0.9.1 dictates # that on authentication failure broker must close TCP connection without sending # any more data. This is why we need to explicitly track whether we are past # authentication stage to signal possible authentication failures. @authenticating = false end
# File lib/amqp/session.rb, line 1175 def upgrade_to_tls_if_necessary tls_options = @settings[:ssl] if tls_options.is_a?(Hash) start_tls(tls_options) elsif tls_options start_tls end end