Class/Module Index [+]

Quicksearch

AMQ::Client::Async::Adapter

Base adapter class. Specific implementations (for example, EventMachine-based, Cool.io-based or sockets-based) subclass it and must implement Adapter API methods:

@abstract

Public Class Methods

included(host) click to toggle source
# File lib/amq/client/async/adapter.rb, line 22
def self.included(host)
  host.extend ClassMethods
  host.extend ProtocolMethodHandlers

  host.class_eval do

    #
    # API
    #

    attr_accessor :logger
    attr_accessor :settings

    # @return [Array<#call>]
    attr_reader :callbacks


    # The locale defines the language in which the server will send reply texts.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
    attr_accessor :locale

    # Client capabilities
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2.1)
    attr_accessor :client_properties

    # Server properties
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
    attr_reader :server_properties

    # Server capabilities
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
    attr_reader :server_capabilities

    # Locales server supports
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
    attr_reader :server_locales

    # Authentication mechanism used.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
    attr_reader :mechanism

    # Authentication mechanisms broker supports.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
    attr_reader :server_authentication_mechanisms

    # Channels within this connection.
    #
    # @see http://bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2.5)
    attr_reader :channels

    # Maximum channel number that the server permits this connection to use.
    # Usable channel numbers are in the range 1..channel_max.
    # Zero indicates no specified limit.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.1 and 1.4.2.6.1)
    attr_accessor :channel_max

    # Maximum frame size that the server permits this connection to use.
    #
    # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.2 and 1.4.2.6.2)
    attr_accessor :frame_max


    attr_reader :known_hosts



    # @api plugin
    # @see #disconnect
    # @note Adapters must implement this method but it is NOT supposed to be used directly.
    #       AMQ protocol defines two-step process of closing connection (send Connection.Close
    #       to the peer and wait for Connection.Close-Ok), implemented by {Adapter#disconnect}
    def close_connection
      raise NotImplementedError
    end unless defined?(:close_connection) # since it is a module, this method may already be defined
  end
end

Public Instance Methods

after_connection_interruption(&block) click to toggle source
after_recovery(&block) click to toggle source
Alias for: on_recovery
auto_recover() click to toggle source

Performs recovery of channels that are in the automatic recovery mode. Does not run recovery callbacks.

@see Channel#auto_recover @see Queue#auto_recover @see Exchange#auto_recover @api plugin

# File lib/amq/client/async/adapter.rb, line 425
def auto_recover
  @channels.select { |channel_id, ch| ch.auto_recovering? }.each { |n, ch| ch.auto_recover }
end
auto_recovering?() click to toggle source

@return [Boolean] whether connection is in the automatic recovery mode @api public

# File lib/amq/client/async/adapter.rb, line 412
def auto_recovering?
  !!@auto_recovery
end
Also aliased as: auto_recovery?
auto_recovery?() click to toggle source
Alias for: auto_recovering?
before_recovery(&block) click to toggle source

Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/adapter.rb, line 381
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
clear_frames_on(channel_id) click to toggle source

Clears frames that were received but not processed on given channel. Needs to be called when the channel is closed. @private

# File lib/amq/client/async/adapter.rb, line 560
def clear_frames_on(channel_id)
  raise ArgumentError, "channel id cannot be nil!" if channel_id.nil?

  @frames[channel_id].clear
end
close_connection() click to toggle source

@api plugin @see disconnect @note Adapters must implement this method but it is NOT supposed to be used directly.

AMQ protocol defines two-step process of closing connection (send Connection.Close
to the peer and wait for Connection.Close-Ok), implemented by {Adapter#disconnect}
# File lib/amq/client/async/adapter.rb, line 101
def close_connection
  raise NotImplementedError
end
disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block) click to toggle source

Properly close connection with AMQ broker, as described in section 2.2.4 of the {bit.ly/amqp091spec AMQP 0.9.1 specification}.

@api plugin @see close_connection

# File lib/amq/client/async/adapter.rb, line 211
def disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block)
  @intentionally_closing_connection = true
  self.on_disconnection do
    @frames.clear
    block.call if block
  end

  # ruby-amqp/amqp#66, MK.
  if self.open?
    closing!
    self.send_frame(Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id))
  elsif self.closing?
    # no-op
  else
    self.disconnection_successful
  end
end
encode_credentials(username, password) click to toggle source

@api plugin @see tools.ietf.org/rfc/rfc2595.txt RFC 2595

# File lib/amq/client/async/adapter.rb, line 510
def encode_credentials(username, password)
  "\00##{username}\00##{password}"
end
establish_connection(settings) click to toggle source

Establish socket connection to the server.

@api plugin

# File lib/amq/client/async/adapter.rb, line 202
def establish_connection(settings)
  raise NotImplementedError
end
handle_close(conn_close) click to toggle source

Handles connection.close. When broker detects a connection level exception, this method is called.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.5.2.9)

# File lib/amq/client/async/adapter.rb, line 635
def handle_close(conn_close)
  closed!
  self.exec_callback_yielding_self(:error, conn_close)
end
handle_close_ok(close_ok) click to toggle source

Handles Connection.Close-Ok.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.10)

# File lib/amq/client/async/adapter.rb, line 645
def handle_close_ok(close_ok)
  closed!
  self.disconnection_successful
end
handle_connection_interruption() click to toggle source

@private @api plugin

# File lib/amq/client/async/adapter.rb, line 369
def handle_connection_interruption
  @channels.each { |n, c| c.handle_connection_interruption }
  self.exec_callback_yielding_self(:after_connection_interruption)
end
handle_open_ok(open_ok) click to toggle source

Handles Connection.Open-Ok.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.8.)

# File lib/amq/client/async/adapter.rb, line 623
def handle_open_ok(open_ok)
  @known_hosts = open_ok.known_hosts.dup.freeze

  opened!
  self.connection_successful if self.respond_to?(:connection_successful)
end
handle_start(connection_start) click to toggle source

Handles connection.start.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.)

# File lib/amq/client/async/adapter.rb, line 587
def handle_start(connection_start)
  @server_properties                = connection_start.server_properties
  @server_capabilities              = @server_properties["capabilities"]

  @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
  @server_locales                   = Array(connection_start.locales)

  username = @settings[:user] || @settings[:username]
  password = @settings[:pass] || @settings[:password]

  # It's not clear whether we should transition to :opening state here
  # or in #open but in case authentication fails, it would be strange to have
  # @status undefined. So lets do this. MK.
  opening!

  self.send_frame(Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
end
handle_tune(tune_ok) click to toggle source

Handles Connection.Tune-Ok.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)

# File lib/amq/client/async/adapter.rb, line 610
def handle_tune(tune_ok)
  @channel_max        = tune_ok.channel_max.freeze
  @frame_max          = tune_ok.frame_max.freeze
  @heartbeat_interval = self.heartbeat_interval || tune_ok.heartbeat

  self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
end
handshake() click to toggle source

Sends connection preamble to the broker. @api plugin

# File lib/amq/client/async/adapter.rb, line 487
def handshake
  @authenticating = true
  self.send_preamble
end
heartbeat_interval() click to toggle source

Returns heartbeat interval this client uses, in seconds. This value may or may not be used depending on broker capabilities. Zero means the server does not want a heartbeat.

@return [Fixnum] Heartbeat interval this client uses, in seconds. @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)

# File lib/amq/client/async/adapter.rb, line 275
def heartbeat_interval
  @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
end
heartbeats_enabled?() click to toggle source

Returns true if heartbeats are enabled (heartbeat interval is greater than 0) @return [Boolean]

# File lib/amq/client/async/adapter.rb, line 281
def heartbeats_enabled?
  self.heartbeat_interval > 0
end
on_connection_interruption(&block) click to toggle source

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/adapter.rb, line 361
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
on_error(&block) click to toggle source

Defines a callback that will be executed when connection is closed after connection-level exception. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/adapter.rb, line 352
def on_error(&block)
  self.redefine_callback(:error, &block)
end
on_possible_authentication_failure(&block) click to toggle source

Defines a callback that will be run when TCP connection is closed before authentication finishes. Usually this means authentication failure. You can define only one callback.

@api public

# File lib/amq/client/async/adapter.rb, line 342
def on_possible_authentication_failure(&block)
  @on_possible_authentication_failure = block
end
on_recovery(&block) click to toggle source

Defines a callback that will be executed after AMQP connection has recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/adapter.rb, line 397
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
Also aliased as: after_recovery
on_skipped_heartbeats(&block) click to toggle source

Defines a callback that will be executed after time since last broker heartbeat is greater than or equal to the heartbeat interval (skipped heartbeat is detected). Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/adapter.rb, line 458
def on_skipped_heartbeats(&block)
  self.redefine_callback(:skipped_heartbeats, &block)
end
on_tcp_connection_failure(&block) click to toggle source

Defines a callback that will be run when initial TCP connection fails. You can define only one callback.

@api public

# File lib/amq/client/async/adapter.rb, line 326
def on_tcp_connection_failure(&block)
  @on_tcp_connection_failure = block
end
on_tcp_connection_loss(&block) click to toggle source

Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). You can define only one callback.

@api public

# File lib/amq/client/async/adapter.rb, line 334
def on_tcp_connection_loss(&block)
  @on_tcp_connection_loss = block
end
open(vhost = "/") click to toggle source

Sends connection.open to the server.

@api plugin @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.7)

# File lib/amq/client/async/adapter.rb, line 497
def open(vhost = "/")
  self.send_frame(Protocol::Connection::Open.encode(vhost))
end
receive_frame(frame) click to toggle source

Processes a single frame.

@param [AMQ::Protocol::Frame] frame @api plugin

# File lib/amq/client/async/adapter.rb, line 519
def receive_frame(frame)
  @frames[frame.channel] ||= Array.new
  @frames[frame.channel] << frame

  if frameset_complete?(@frames[frame.channel])
    receive_frameset(@frames[frame.channel])
    # for channel.close, frame.channel will be nil. MK.
    clear_frames_on(frame.channel) if @frames[frame.channel]
  end
end
receive_frameset(frames) click to toggle source

Processes a frameset by finding and invoking a suitable handler. Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat value.

@param [Array<AMQ::Protocol::Frame>] frames @api plugin

# File lib/amq/client/async/adapter.rb, line 536
def receive_frameset(frames)
  if self.heartbeats_enabled?
    # treat incoming traffic as heartbeats.
    # this operation is pretty expensive under heavy traffic but heartbeats can be disabled
    # (and are also disabled by default). MK.
    @last_server_heartbeat = Time.now
  end
  frame = frames.first

  if AMQ::Protocol::HeartbeatFrame === frame
    # no-op
  else
    if callable = AMQ::Client::HandlersRegistry.find(frame.method_class)
      f = frames.shift
      callable.call(self, f, frames)
    else
      raise MissingHandlerError.new(frames.first)
    end
  end
end
reconnecting?() click to toggle source

@return [Boolean]

# File lib/amq/client/async/adapter.rb, line 317
def reconnecting?
  @reconnecting
end
reset_state!() click to toggle source

Resets connection state.

@api plugin

# File lib/amq/client/async/adapter.rb, line 504
def reset_state!
  # no-op by default
end
run_after_recovery_callbacks() click to toggle source

@private

# File lib/amq/client/async/adapter.rb, line 403
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery, @settings)

  @channels.each { |n, ch| ch.run_after_recovery_callbacks }
end
run_before_recovery_callbacks() click to toggle source

@private

# File lib/amq/client/async/adapter.rb, line 386
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery, @settings)

  @channels.each { |n, ch| ch.run_before_recovery_callbacks }
end
run_skipped_heartbeats_callbacks() click to toggle source

@private

# File lib/amq/client/async/adapter.rb, line 463
def run_skipped_heartbeats_callbacks
  self.exec_callback_yielding_self(:skipped_heartbeats, @settings)
end
send_frame(frame) click to toggle source

Sends frame to the peer, checking that connection is open.

@raise [ConnectionClosedError]

# File lib/amq/client/async/adapter.rb, line 244
def send_frame(frame)
  if closed?
    raise ConnectionClosedError.new(frame)
  else
    self.send_raw(frame.encode)
  end
end
send_frameset(frames, channel) click to toggle source

Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.

@api public

# File lib/amq/client/async/adapter.rb, line 256
def send_frameset(frames, channel)
  # some (many) developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    frames.each { |frame| self.send_frame(frame) }
  end
end
send_heartbeat() click to toggle source

Sends a heartbeat frame if connection is open. @api plugin

# File lib/amq/client/async/adapter.rb, line 568
def send_heartbeat
  if tcp_connection_established? && !@handling_skipped_hearbeats
    if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting?
      logger.error "[amqp] Detected missing server heartbeats"
      self.handle_skipped_hearbeats
    end
    send_frame(Protocol::HeartbeatFrame)
  end
end
send_preamble() click to toggle source

Sends AMQ protocol header (also known as preamble).

@note This must be implemented by all AMQP clients. @api plugin @see bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2)

# File lib/amq/client/async/adapter.rb, line 237
def send_preamble
  self.send_raw(AMQ::Protocol::PREAMBLE)
end
send_raw(data) click to toggle source

Sends opaque data to AMQ broker over active connection.

@note This must be implemented by all AMQP clients. @api plugin

# File lib/amq/client/async/adapter.rb, line 481
def send_raw(data)
  raise NotImplementedError
end
start_automatic_recovery() click to toggle source

Performs recovery of channels that are in the automatic recovery mode. "before recovery" callbacks are run immediately, "after recovery" callbacks are run after AMQP connection is re-established and auto recovery is performed (using auto_recover).

Use this method if you want to run automatic recovery process after handling a connection-level exception, for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).

@see Channel#auto_recover @see Queue#auto_recover @see Exchange#auto_recover @api plugin

# File lib/amq/client/async/adapter.rb, line 441
def start_automatic_recovery
  self.run_before_recovery_callbacks
  self.register_connection_callback do
    # always run automatic recovery, because it is per-channel
    # and connection has to start it. Channels that did not opt-in for
    # autorecovery won't be selected. MK.
    self.auto_recover
    self.run_after_recovery_callbacks
  end
end
tcp_connection_failed() click to toggle source

Called when initial TCP connection fails. @api public

# File lib/amq/client/async/adapter.rb, line 301
def tcp_connection_failed
  @recovered = false

  @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
end
tcp_connection_lost() click to toggle source

Called when previously established TCP connection fails. @api public

# File lib/amq/client/async/adapter.rb, line 309
def tcp_connection_lost
  @recovered = false

  @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
  self.handle_connection_interruption
end
vhost() click to toggle source

vhost this connection uses. Default is "/", a historically estabilished convention of RabbitMQ and amqp gem.

@return [String] vhost this connection uses @api public

# File lib/amq/client/async/adapter.rb, line 291
def vhost
  @settings.fetch(:vhost, "/")
end

Protected Instance Methods

content_complete?(frames) click to toggle source

Determines, whether given frame array contains full content body

# File lib/amq/client/async/adapter.rb, line 682
def content_complete?(frames)
  return false if frames.empty?
  header = frames[0]
  raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame)
  header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size }
end
frameset_complete?(frames) click to toggle source

Determines, whether the received frameset is ready to be further processed

# File lib/amq/client/async/adapter.rb, line 675
def frameset_complete?(frames)
  return false if frames.empty?
  first_frame = frames[0]
  first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1]))
end
get_next_frame() click to toggle source

Returns next frame from buffer whenever possible

@api private

# File lib/amq/client/async/adapter.rb, line 657
def get_next_frame
  return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
  # octet + short
  offset = 3 # 1 + 2
  # length
  payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
  # 4 bytes for long payload length, 1 byte final octet
  frame_length = offset + payload_length + 5
  if frame_length <= @chunk_buffer.size
    @chunk_buffer.slice!(0, frame_length)
  else
    nil
  end
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.