class AMQP::Consumer

AMQP consumers are entities that handle messages delivered to them (“push API” as opposed to “pull API”) by AMQP broker. Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue) or not (consumers share the queue). In the case of multiple consumers per queue, messages are distributed in round robin manner with respect to channel-level prefetch setting).

@see AMQP::Queue @see AMQP::Queue#subscribe @see AMQP::Queue#cancel

Attributes

arguments[R]

@return [Hash] Custom subscription metadata

channel[R]

@return [AMQP::Channel] Channel this consumer uses

consumer_tag[R]

@return [String] Consumer tag, unique consumer identifier

queue[R]

@return [AMQP::Queue] Queue messages are consumed from

Public Class Methods

new(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) click to toggle source
# File lib/amqp/consumer.rb, line 49
def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block)
  @callbacks    = Hash.new

  @channel       = channel            || raise(ArgumentError, "channel is nil")
  @connection    = channel.connection || raise(ArgumentError, "connection is nil")
  @queue         = queue        || raise(ArgumentError, "queue is nil")
  @consumer_tag  = consumer_tag || self.class.tag_generator.generate_for(queue)
  @exclusive     = exclusive
  @no_ack        = no_ack
  @arguments     = arguments

  @no_local     = no_local

  self.register_with_channel
  self.register_with_queue
end
tag_generator() click to toggle source

@return [AMQP::ConsumerTagGenerator] Consumer tag generator

# File lib/amqp/consumer.rb, line 38
def self.tag_generator
  @tag_generator ||= AMQP::ConsumerTagGenerator.new
end
tag_generator=(generator) click to toggle source

@param [AMQP::ConsumerTagGenerator] Assigns consumer tag generator that will be used by consumer instances @return [AMQP::ConsumerTagGenerator] Provided argument

# File lib/amqp/consumer.rb, line 44
def self.tag_generator=(generator)
  @tag_generator = generator
end

Public Instance Methods

acknowledge(delivery_tag) click to toggle source

Acknowledge a delivery tag. @return [Consumer] self

@api public @see bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)

# File lib/amqp/consumer.rb, line 199
def acknowledge(delivery_tag)
  @channel.acknowledge(delivery_tag)

  self
end
after_connection_interruption(&block)
after_recovery(&block)
Alias for: on_recovery
auto_recover() click to toggle source

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).

@api plugin

# File lib/amqp/consumer.rb, line 274
def auto_recover
  self.exec_callback_yielding_self(:before_recovery)
  self.resubscribe
  self.exec_callback_yielding_self(:after_recovery)
end
before_recovery(&block) click to toggle source

Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/consumer.rb, line 254
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
callback() click to toggle source

Legacy {AMQP::Queue} API compatibility. @private @deprecated

# File lib/amqp/consumer.rb, line 140
def callback
  if @callbacks[:delivery]
    @callbacks[:delivery].first
  end
end
cancel(nowait = false, &block) click to toggle source

@return [AMQP::Consumer] self

# File lib/amqp/consumer.rb, line 113
def cancel(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait))
      if !nowait
        self.redefine_callback(:cancel, &block)
        @channel.consumers_awaiting_cancel_ok.push(self)
      end

      self
    end
  end

  self
end
consume(nowait = false, &block) click to toggle source

Begin consuming messages from the queue @return [AMQP::Consumer] self

# File lib/amqp/consumer.rb, line 75
def consume(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments))

      if !nowait
        self.redefine_callback(:consume, &block)
        @channel.consumers_awaiting_consume_ok.push(self)
      end

      self
    end
  end

  self
end
exclusive?() click to toggle source

@return [Boolean] true if this consumer is exclusive (other consumers for the same queue are not allowed)

# File lib/amqp/consumer.rb, line 67
def exclusive?
  !!@exclusive
end
handle_cancel(basic_cancel) click to toggle source
# File lib/amqp/consumer.rb, line 186
def handle_cancel(basic_cancel)
  self.exec_callback(:scancel, basic_cancel)
end
handle_cancel_ok(cancel_ok) click to toggle source
# File lib/amqp/consumer.rb, line 300
def handle_cancel_ok(cancel_ok)
  self.exec_callback_once(:cancel, cancel_ok)

  self.unregister_with_channel
  self.unregister_with_queue

  @consumer_tag = nil

  # detach from object graph so that this object will be garbage-collected
  @queue        = nil
  @channel      = nil
  @connection   = nil

  self.clear_callbacks(:delivery)
  self.clear_callbacks(:consume)
  self.clear_callbacks(:cancel)
  self.clear_callbacks(:scancel)
end
handle_connection_interruption(method = nil) click to toggle source

@private

# File lib/amqp/consumer.rb, line 244
def handle_connection_interruption(method = nil)
  self.exec_callback_yielding_self(:after_connection_interruption)
end
handle_consume_ok(consume_ok) click to toggle source
# File lib/amqp/consumer.rb, line 296
def handle_consume_ok(consume_ok)
  self.exec_callback_once(:consume, consume_ok)
end
handle_delivery(basic_deliver, metadata, payload) click to toggle source

Implementation

# File lib/amqp/consumer.rb, line 292
def handle_delivery(basic_deliver, metadata, payload)
  self.exec_callback(:delivery, basic_deliver, metadata, payload)
end
inspect() click to toggle source

@return [String] Readable representation of relevant object state.

# File lib/amqp/consumer.rb, line 175
def inspect
  "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
end
on_cancel(&block) click to toggle source
# File lib/amqp/consumer.rb, line 180
def on_cancel(&block)
  self.append_callback(:scancel, &block)

  self
end
on_connection_interruption(&block) click to toggle source

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/consumer.rb, line 238
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
on_delivery(&block) click to toggle source

Register a block that will be used to handle delivered messages.

@return [AMQP::Consumer] self @see AMQP::Queue#subscribe

# File lib/amqp/consumer.rb, line 151
def on_delivery(&block)
  # We have to maintain this multiple arities jazz
  # because older versions this gem are used in examples in at least 3
  # books published by O'Reilly :(. MK.
  delivery_shim = Proc.new { |basic_deliver, headers, payload|
    case block.arity
    when 1 then
      block.call(payload)
    when 2 then
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload)
    else
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key)
    end
  }

  self.append_callback(:delivery, &delivery_shim)

  self
end
on_recovery(&block) click to toggle source

Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).

@api public Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/consumer.rb, line 224
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
Also aliased as: after_recovery
reject(delivery_tag, requeue = true) click to toggle source

@return [Consumer] self

@api public @see bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)

# File lib/amqp/consumer.rb, line 210
def reject(delivery_tag, requeue = true)
  @channel.reject(delivery_tag, requeue)

  self
end
resubscribe(&block) click to toggle source

Used by automatic recovery code. @api plugin @return [AMQP::Consumer] self

# File lib/amqp/consumer.rb, line 95
def resubscribe(&block)
  @channel.once_open do
    @queue.once_declared do
      self.unregister_with_channel
      @consumer_tag = self.class.tag_generator.generate_for(@queue)
      self.register_with_channel

      @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments))
      self.redefine_callback(:consume, &block) if block

      self
    end
  end

  self
end
run_after_recovery_callbacks() click to toggle source

@private

# File lib/amqp/consumer.rb, line 264
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery)
end
run_before_recovery_callbacks() click to toggle source

@private

# File lib/amqp/consumer.rb, line 259
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery)
end
subscribed?() click to toggle source

{AMQP::Queue} API compatibility.

@return [Boolean] true if this consumer is active (subscribed for message delivery) @api public

# File lib/amqp/consumer.rb, line 133
def subscribed?
  !@callbacks[:delivery].empty?
end
to_s() click to toggle source

@endgroup

# File lib/amqp/consumer.rb, line 283
def to_s
  "#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{@queue.name} @channel=#{@channel.id}>"
end

Protected Instance Methods

register_with_channel() click to toggle source
# File lib/amqp/consumer.rb, line 353
def register_with_channel
  @channel.consumers[@consumer_tag] = self
end
register_with_queue() click to toggle source
# File lib/amqp/consumer.rb, line 357
def register_with_queue
  @queue.consumers[@consumer_tag]   = self
end
unregister_with_channel() click to toggle source
# File lib/amqp/consumer.rb, line 361
def unregister_with_channel
  @channel.consumers.delete(@consumer_tag)
end
unregister_with_queue() click to toggle source
# File lib/amqp/consumer.rb, line 365
def unregister_with_queue
  @queue.consumers.delete(@consumer_tag)
end