Base adapter class. Specific implementations (for example, EventMachine-based, Cool.io-based or sockets-based) subclass it and must implement Adapter API methods:
establish_connection(settings)
@abstract
# File lib/amq/client/async/adapter.rb, line 22
def self.included(host)
  host.extend ClassMethods
  host.extend ProtocolMethodHandlers

  host.class_eval do

    #
    # API
    #

    attr_accessor :logger
    attr_accessor :settings

    # @return [Array<#call>]
    attr_reader :callbacks

    # The locale defines the language in which the server will send reply texts.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
    attr_accessor :locale

    # Client capabilities
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2.1)
    attr_accessor :client_properties

    # Server properties
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
    attr_reader :server_properties

    # Server capabilities
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
    attr_reader :server_capabilities

    # Locales server supports
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
    attr_reader :server_locales

    # Authentication mechanism used.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
    attr_reader :mechanism

    # Authentication mechanisms broker supports.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
    attr_reader :server_authentication_mechanisms

    # Channels within this connection.
    #
    # @see http://bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2.5)
    attr_reader :channels

    # Maximum channel number that the server permits this connection to use.
    # Usable channel numbers are in the range 1..channel_max.
    # Zero indicates no specified limit.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.1 and 1.4.2.6.1)
    attr_accessor :channel_max

    # Maximum frame size that the server permits this connection to use.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.2 and 1.4.2.6.2)
    attr_accessor :frame_max

    attr_reader :known_hosts

    # @api plugin
    # @see #disconnect
    # @note Adapters must implement this method but it is NOT supposed to be used directly.
    #       AMQ protocol defines two-step process of closing connection (send Connection.Close
    #       to the peer and wait for Connection.Close-Ok), implemented by {Adapter#disconnect}
    def close_connection
      raise NotImplementedError
    end unless defined?(:close_connection) # since it is a module, this method may already be defined
  end
end
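As a rough illustration of what a concrete adapter has to supply, here is a minimal sketch that assumes a hypothetical `InMemoryAdapter` writing to a `StringIO` instead of a socket. `AdapterSketch` is a stand-in so that the example runs without amq-client installed; only the method names `establish_connection`, `send_raw` and `close_connection` are taken from this page, everything else is an assumption.

```ruby
require "stringio"

# Stand-in for the documented base adapter (hypothetical, not the real module).
module AdapterSketch
  def establish_connection(settings)
    raise NotImplementedError
  end

  def send_raw(data)
    raise NotImplementedError
  end

  def close_connection
    raise NotImplementedError
  end
end

# Hypothetical adapter that "connects" to an in-memory buffer.
class InMemoryAdapter
  include AdapterSketch

  attr_reader :io, :settings

  # Remembers the settings and opens the fake transport.
  def establish_connection(settings)
    @settings = settings
    @io = StringIO.new
    self
  end

  # Writes opaque bytes to the fake transport.
  def send_raw(data)
    @io.write(data)
  end

  # Closes the fake transport.
  def close_connection
    @io.close
  end
end

adapter = InMemoryAdapter.new.establish_connection({ host: "localhost", port: 5672 })
adapter.send_raw("AMQP")
```

Methods defined in the class take precedence over the ones from the included module, so anything an adapter leaves out still raises `NotImplementedError`.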
Performs recovery of channels that are in the automatic recovery mode. Does not run recovery callbacks.
@see Channel#auto_recover @see Queue#auto_recover @see Exchange#auto_recover @api plugin
# File lib/amq/client/async/adapter.rb, line 425
def auto_recover
  @channels.select { |channel_id, ch| ch.auto_recovering? }.each { |n, ch| ch.auto_recover }
end
@return [Boolean] whether connection is in the automatic recovery mode @api public
# File lib/amq/client/async/adapter.rb, line 412
def auto_recovering?
  !!@auto_recovery
end
Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/adapter.rb, line 381
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
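The "last definition wins" behaviour can be pictured with a small single-slot registry. This is a sketch under assumptions: `CallbackSlots` and `exec_callback` are hypothetical names, and the library's own `redefine_callback` may store callbacks differently.

```ruby
# Hypothetical single-slot callback registry: one block per event name.
class CallbackSlots
  def initialize
    @callbacks = {}
  end

  # Stores the block, discarding any block registered earlier for +event+.
  def redefine_callback(event, &block)
    @callbacks[event] = block
    self
  end

  # Calls the stored block, if any, with the given arguments.
  def exec_callback(event, *args)
    callback = @callbacks[event]
    callback.call(*args) if callback
  end
end

slots = CallbackSlots.new
calls = []
slots.redefine_callback(:before_recovery) { calls << :first }
slots.redefine_callback(:before_recovery) { calls << :second }
slots.exec_callback(:before_recovery)
```

Only the second block runs, because the second registration overwrote the first.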
Clears frames that were received but not processed on given channel. Needs to be called when the channel is closed. @private
# File lib/amq/client/async/adapter.rb, line 560
def clear_frames_on(channel_id)
  raise ArgumentError, "channel id cannot be nil!" if channel_id.nil?

  @frames[channel_id].clear
end
@api plugin @see disconnect @note Adapters must implement this method but it is NOT supposed to be used directly.
AMQ protocol defines two-step process of closing connection (send Connection.Close to the peer and wait for Connection.Close-Ok), implemented by {Adapter#disconnect}
# File lib/amq/client/async/adapter.rb, line 101
def close_connection
  raise NotImplementedError
end
Properly closes the connection with the AMQ broker, as described in section 2.2.4 of the AMQP 0.9.1 specification (bit.ly/amqp091spec).
@api plugin @see close_connection
# File lib/amq/client/async/adapter.rb, line 211
def disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block)
  @intentionally_closing_connection = true
  self.on_disconnection do
    @frames.clear
    block.call if block
  end

  # ruby-amqp/amqp#66, MK.
  if self.open?
    closing!
    self.send_frame(Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id))
  elsif self.closing?
    # no-op
  else
    self.disconnection_successful
  end
end
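The two-step close (send Connection.Close, then wait for Connection.Close-Ok) can be sketched as a tiny state machine. `CloseSketch` is a hypothetical class that records symbols instead of encoding frames; it only mirrors the branching shown above and is not the library's implementation.

```ruby
# Hypothetical model of the two-step connection close.
class CloseSketch
  attr_reader :status, :sent

  def initialize
    @status = :open
    @sent = []
    @on_disconnection = nil
  end

  # Step 1: ask the peer to close, unless a close is already in flight.
  def disconnect(&block)
    @on_disconnection = block if block
    case @status
    when :open
      @status = :closing
      @sent << :connection_close
    when :closing
      # Close was already sent; keep waiting for Close-Ok.
    else
      disconnection_successful
    end
  end

  # Step 2: the peer confirmed with Close-Ok.
  def handle_close_ok
    @status = :closed
    disconnection_successful
  end

  private

  def disconnection_successful
    @on_disconnection.call if @on_disconnection
  end
end

conn = CloseSketch.new
done = false
conn.disconnect { done = true }
conn.disconnect          # second call sends nothing new
conn.handle_close_ok     # callback fires only now
```

The callback is deferred until the confirmation arrives, which is why adapters are asked not to call `close_connection` directly.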
@api plugin @see tools.ietf.org/rfc/rfc2595.txt RFC 2595
# File lib/amq/client/async/adapter.rb, line 510
def encode_credentials(username, password)
  "\0#{username}\0#{password}"
end
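For reference, the PLAIN mechanism described in RFC 2595 lays the response out as an optional authorization identity, a NUL byte, the authentication identity, another NUL byte, and the password. The following sketch uses a hypothetical helper name (`plain_credentials`) and an empty authorization identity to show the resulting bytes.

```ruby
# Builds a PLAIN response with an empty authorization identity:
# NUL, authentication identity, NUL, password.
def plain_credentials(username, password)
  "\0#{username}\0#{password}"
end

encoded = plain_credentials("guest", "secret")
parts = encoded.split("\0")   # empty authorization identity, then username and password
```

Splitting on the NUL byte recovers the three fields, with the first one empty.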
Establish socket connection to the server.
@api plugin
# File lib/amq/client/async/adapter.rb, line 202
def establish_connection(settings)
  raise NotImplementedError
end
Handles Connection.Close. This method is called when the broker detects a connection-level exception.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.9)
# File lib/amq/client/async/adapter.rb, line 635
def handle_close(conn_close)
  closed!
  self.exec_callback_yielding_self(:error, conn_close)
end
Handles Connection.Close-Ok.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.10)
# File lib/amq/client/async/adapter.rb, line 645
def handle_close_ok(close_ok)
  closed!
  self.disconnection_successful
end
@private @api plugin
# File lib/amq/client/async/adapter.rb, line 369
def handle_connection_interruption
  @channels.each { |n, c| c.handle_connection_interruption }
  self.exec_callback_yielding_self(:after_connection_interruption)
end
Handles Connection.Open-Ok.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.8)
# File lib/amq/client/async/adapter.rb, line 623
def handle_open_ok(open_ok)
  @known_hosts = open_ok.known_hosts.dup.freeze

  opened!
  self.connection_successful if self.respond_to?(:connection_successful)
end
Handles Connection.Start.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1)
# File lib/amq/client/async/adapter.rb, line 587
def handle_start(connection_start)
  @server_properties = connection_start.server_properties
  @server_capabilities = @server_properties["capabilities"]

  @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
  @server_locales = Array(connection_start.locales)

  username = @settings[:user] || @settings[:username]
  password = @settings[:pass] || @settings[:password]

  # It's not clear whether we should transition to :opening state here
  # or in #open but in case authentication fails, it would be strange to have
  # @status undefined. So lets do this. MK.
  opening!

  self.send_frame(Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
end
Handles Connection.Tune and replies with Connection.Tune-Ok.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)
# File lib/amq/client/async/adapter.rb, line 610
def handle_tune(tune_ok)
  @channel_max = tune_ok.channel_max.freeze
  @frame_max = tune_ok.frame_max.freeze
  @heartbeat_interval = self.heartbeat_interval || tune_ok.heartbeat

  self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
end
Sends connection preamble to the broker. @api plugin
# File lib/amq/client/async/adapter.rb, line 487
def handshake
  @authenticating = true
  self.send_preamble
end
Returns heartbeat interval this client uses, in seconds. This value may or may not be used depending on broker capabilities. Zero means the server does not want a heartbeat.
@return [Fixnum] Heartbeat interval this client uses, in seconds. @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)
# File lib/amq/client/async/adapter.rb, line 275
def heartbeat_interval
  @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
end
Returns true if heartbeats are enabled (heartbeat interval is greater than 0) @return [Boolean]
# File lib/amq/client/async/adapter.rb, line 281
def heartbeats_enabled?
  self.heartbeat_interval > 0
end
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/adapter.rb, line 361
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
Defines a callback that will be executed when connection is closed after connection-level exception. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/adapter.rb, line 352
def on_error(&block)
  self.redefine_callback(:error, &block)
end
Defines a callback that will be run when TCP connection is closed before authentication finishes. Usually this means authentication failure. You can define only one callback.
@api public
# File lib/amq/client/async/adapter.rb, line 342
def on_possible_authentication_failure(&block)
  @on_possible_authentication_failure = block
end
Defines a callback that will be executed after AMQP connection has recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/adapter.rb, line 397
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
Defines a callback that will be executed when the time since the last broker heartbeat is greater than or equal to the heartbeat interval (that is, when a skipped heartbeat is detected). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/adapter.rb, line 458
def on_skipped_heartbeats(&block)
  self.redefine_callback(:skipped_heartbeats, &block)
end
Defines a callback that will be run when initial TCP connection fails. You can define only one callback.
@api public
# File lib/amq/client/async/adapter.rb, line 326
def on_tcp_connection_failure(&block)
  @on_tcp_connection_failure = block
end
Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). You can define only one callback.
@api public
# File lib/amq/client/async/adapter.rb, line 334
def on_tcp_connection_loss(&block)
  @on_tcp_connection_loss = block
end
Sends Connection.Open to the server.
@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.7)
# File lib/amq/client/async/adapter.rb, line 497
def open(vhost = "/")
  self.send_frame(Protocol::Connection::Open.encode(vhost))
end
Processes a single frame.
@param [AMQ::Protocol::Frame] frame @api plugin
# File lib/amq/client/async/adapter.rb, line 519
def receive_frame(frame)
  @frames[frame.channel] ||= Array.new
  @frames[frame.channel] << frame

  if frameset_complete?(@frames[frame.channel])
    receive_frameset(@frames[frame.channel])
    # for channel.close, frame.channel will be nil. MK.
    clear_frames_on(frame.channel) if @frames[frame.channel]
  end
end
Processes a frameset by finding and invoking a suitable handler. Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat value.
@param [Array<AMQ::Protocol::Frame>] frames @api plugin
# File lib/amq/client/async/adapter.rb, line 536
def receive_frameset(frames)
  if self.heartbeats_enabled?
    # treat incoming traffic as heartbeats.
    # this operation is pretty expensive under heavy traffic but heartbeats can be disabled
    # (and are also disabled by default). MK.
    @last_server_heartbeat = Time.now
  end
  frame = frames.first

  if AMQ::Protocol::HeartbeatFrame === frame
    # no-op
  else
    if callable = AMQ::Client::HandlersRegistry.find(frame.method_class)
      f = frames.shift
      callable.call(self, f, frames)
    else
      raise MissingHandlerError.new(frames.first)
    end
  end
end
@return [Boolean]
# File lib/amq/client/async/adapter.rb, line 317
def reconnecting?
  @reconnecting
end
Resets connection state.
@api plugin
# File lib/amq/client/async/adapter.rb, line 504
def reset_state!
  # no-op by default
end
@private
# File lib/amq/client/async/adapter.rb, line 403
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery, @settings)

  @channels.each { |n, ch| ch.run_after_recovery_callbacks }
end
@private
# File lib/amq/client/async/adapter.rb, line 386
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery, @settings)

  @channels.each { |n, ch| ch.run_before_recovery_callbacks }
end
@private
# File lib/amq/client/async/adapter.rb, line 463
def run_skipped_heartbeats_callbacks
  self.exec_callback_yielding_self(:skipped_heartbeats, @settings)
end
Sends frame to the peer, checking that connection is open.
@raise [ConnectionClosedError]
# File lib/amq/client/async/adapter.rb, line 244
def send_frame(frame)
  if closed?
    raise ConnectionClosedError.new(frame)
  else
    self.send_raw(frame.encode)
  end
end
Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.
@api public
# File lib/amq/client/async/adapter.rb, line 256
def send_frameset(frames, channel)
  # some (many) developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    frames.each { |frame| self.send_frame(frame) }
  end
end
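To see why the lock is held for the whole frameset rather than per frame, consider the following sketch. `SketchChannel` and `SketchConnection` are hypothetical classes; the channel wraps a plain `Mutex`, which is an assumption about how `synchronize` could be provided, not a description of the library's channel class.

```ruby
# Hypothetical channel exposing #synchronize through a Mutex.
class SketchChannel
  def initialize
    @mutex = Mutex.new
  end

  def synchronize(&block)
    @mutex.synchronize(&block)
  end
end

# Hypothetical connection that records frames in the order they are written.
class SketchConnection
  attr_reader :wire

  def initialize
    @wire = []
  end

  def send_frame(frame)
    @wire << frame
  end

  # Holds the channel lock until every frame of the set has been written.
  def send_frameset(frames, channel)
    channel.synchronize do
      frames.each { |frame| send_frame(frame) }
    end
  end
end

connection = SketchConnection.new
channel = SketchChannel.new

threads = 4.times.map do |i|
  Thread.new do
    50.times do
      connection.send_frameset(["method-#{i}", "header-#{i}", "body-#{i}"], channel)
    end
  end
end
threads.each(&:join)
```

Because all four threads share one channel, each method/header/body triple reaches the wire contiguously, which is the ordering the broker expects.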
Sends a heartbeat frame if connection is open. @api plugin
# File lib/amq/client/async/adapter.rb, line 568
def send_heartbeat
  if tcp_connection_established? && !@handling_skipped_hearbeats
    if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting?
      logger.error "[amqp] Detected missing server heartbeats"
      self.handle_skipped_hearbeats
    end
    send_frame(Protocol::HeartbeatFrame)
  end
end
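The missed-heartbeat check in the listing compares the last time traffic was seen against twice the heartbeat interval. A standalone sketch of that comparison, with a hypothetical helper name (`heartbeats_skipped?`) and an injectable clock so it can be exercised without waiting:

```ruby
# True when the last traffic from the server is older than two heartbeat intervals.
def heartbeats_skipped?(last_server_heartbeat, interval, now = Time.now)
  last_server_heartbeat < (now - (interval * 2))
end

now = Time.at(1_000)
recent = heartbeats_skipped?(Time.at(995), 3, now)  # 5 seconds of silence, limit is 6
stale = heartbeats_skipped?(Time.at(990), 3, now)   # 10 seconds of silence
```

With a 3 second interval, 5 seconds of silence is still tolerated while 10 seconds is treated as skipped heartbeats.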
Sends AMQ protocol header (also known as preamble).
@note This must be implemented by all AMQP clients. @api plugin @see bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2)
# File lib/amq/client/async/adapter.rb, line 237
def send_preamble
  self.send_raw(AMQ::Protocol::PREAMBLE)
end
Sends opaque data to AMQ broker over active connection.
@note This must be implemented by all AMQP clients. @api plugin
# File lib/amq/client/async/adapter.rb, line 481
def send_raw(data)
  raise NotImplementedError
end
Performs recovery of channels that are in the automatic recovery mode. "before recovery" callbacks are run immediately, "after recovery" callbacks are run after AMQP connection is re-established and auto recovery is performed (using auto_recover).
Use this method if you want to run automatic recovery process after handling a connection-level exception, for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).
@see Channel#auto_recover @see Queue#auto_recover @see Exchange#auto_recover @api plugin
# File lib/amq/client/async/adapter.rb, line 441
def start_automatic_recovery
  self.run_before_recovery_callbacks
  self.register_connection_callback do
    # always run automatic recovery, because it is per-channel
    # and connection has to start it. Channels that did not opt-in for
    # autorecovery won't be selected. MK.
    self.auto_recover
    self.run_after_recovery_callbacks
  end
end
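The ordering described above ("before recovery" callbacks immediately, recovery and "after recovery" callbacks once the connection is back) can be sketched as follows. `RecoverySketch` and `connection_reopened` are hypothetical; the sketch logs symbols instead of touching channels.

```ruby
# Hypothetical model of the recovery ordering.
class RecoverySketch
  attr_reader :log

  def initialize
    @log = []
    @connection_callback = nil
  end

  # Runs the "before" step right away and defers the rest.
  def start_automatic_recovery
    @log << :before_recovery
    @connection_callback = proc do
      @log << :auto_recover
      @log << :after_recovery
    end
  end

  # Simulates the moment the AMQP connection is open again.
  def connection_reopened
    @connection_callback.call if @connection_callback
  end
end

recovery = RecoverySketch.new
recovery.start_automatic_recovery
log_before_reopen = recovery.log.dup
recovery.connection_reopened
```

Until the connection is re-established, only the "before recovery" step has been recorded.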
Called when initial TCP connection fails. @api public
# File lib/amq/client/async/adapter.rb, line 301
def tcp_connection_failed
  @recovered = false

  @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
end
Called when previously established TCP connection fails. @api public
# File lib/amq/client/async/adapter.rb, line 309
def tcp_connection_lost
  @recovered = false

  @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
  self.handle_connection_interruption
end
Determines whether the given frame array contains the full content body.
# File lib/amq/client/async/adapter.rb, line 682
def content_complete?(frames)
  return false if frames.empty?
  header = frames[0]
  raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame)
  header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size }
end
Determines whether the received frameset is ready to be processed further.
# File lib/amq/client/async/adapter.rb, line 675
def frameset_complete?(frames)
  return false if frames.empty?
  first_frame = frames[0]
  first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1]))
end
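The completeness check boils down to comparing the size announced by the content header with the bytes delivered so far in body frames. A self-contained sketch, using hypothetical `HeaderSketch` and `BodySketch` structs in place of the protocol frame classes:

```ruby
# Hypothetical stand-ins for content header and body frames.
HeaderSketch = Struct.new(:body_size)
BodySketch = Struct.new(:payload)

# True once the body frames carry exactly the number of bytes the header announced.
def content_complete?(frames)
  return false if frames.empty?
  header, *bodies = frames
  raise ArgumentError, "first frame must be a content header" unless header.is_a?(HeaderSketch)
  header.body_size == bodies.sum { |frame| frame.payload.bytesize }
end

frames = [HeaderSketch.new(10), BodySketch.new("hello")]
partial = content_complete?(frames)
frames << BodySketch.new("world")
complete = content_complete?(frames)
```

After the first body frame only 5 of the announced 10 bytes have arrived, so the frameset is held back until the second body frame is appended.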
Returns next frame from buffer whenever possible
@api private
# File lib/amq/client/async/adapter.rb, line 657
def get_next_frame
  return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
  # octet + short
  offset = 3 # 1 + 2
  # length
  payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
  # 4 bytes for long payload length, 1 byte final octet
  frame_length = offset + payload_length + 5
  if frame_length <= @chunk_buffer.size
    @chunk_buffer.slice!(0, frame_length)
  else
    nil
  end
end
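The arithmetic in the listing follows from the frame layout: one octet for the type, two for the channel, four for the payload size, then the payload and a final frame-end octet. The sketch below reproduces it on a plain binary string; `build_frame` and `next_frame` are hypothetical helpers, and the `"N"` unpack directive is an assumption standing in for `AMQ::Protocol::PACK_UINT32`.

```ruby
# Frame layout assumed here: type (1 octet), channel (2 octets),
# payload size (4 octets, big-endian), payload, frame-end octet.
FRAME_END = "\xCE".b

# Builds one binary frame for the sketch.
def build_frame(type, channel, payload)
  [type, channel, payload.bytesize].pack("CnN") + payload.b + FRAME_END
end

# Removes and returns one complete frame from +buffer+, or nil if more data is needed.
def next_frame(buffer)
  return nil unless buffer.bytesize > 7        # cannot read the length yet
  offset = 3                                   # type octet + channel short
  payload_length = buffer.byteslice(offset, 4).unpack("N").first
  frame_length = offset + payload_length + 5   # 4 length octets + 1 frame-end octet
  return nil if frame_length > buffer.bytesize
  buffer.slice!(0, frame_length)
end

# One complete frame followed by the first 4 bytes of another.
buffer = build_frame(1, 0, "hello") + build_frame(8, 0, "")[0, 4]
first = next_frame(buffer)    # the complete 13-byte frame
second = next_frame(buffer)   # nil: the remaining bytes are only a partial frame
```

The partial frame stays in the buffer until the rest of its bytes arrive, which is what lets the adapter cope with frames split across network reads.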