Parent

Class/Module Index [+]

Quicksearch

AMQ::Client::Async::Consumer

Attributes

arguments[R]
channel[R]

API

consumer_tag[R]
queue[R]

Public Class Methods

new(channel, queue, consumer_tag = self.class.tag_generator.generate_for(queue), exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) click to toggle source
# File lib/amq/client/async/consumer.rb, line 38
def initialize(channel, queue, consumer_tag = self.class.tag_generator.generate_for(queue), exclusive = false, no_ack = false, arguments = {}, no_local = false, &block)
  @callbacks    = Hash.new

  @channel       = channel            || raise(ArgumentError, "channel is nil")
  @connection    = channel.connection || raise(ArgumentError, "connection is nil")
  @queue         = queue        || raise(ArgumentError, "queue is nil")
  @consumer_tag  = consumer_tag
  @exclusive     = exclusive
  @no_ack        = no_ack
  @arguments     = arguments

  @no_local     = no_local

  self.register_with_channel
  self.register_with_queue
end
tag_generator() click to toggle source
# File lib/amq/client/async/consumer.rb, line 29
def self.tag_generator
  @tag_generator ||= AMQ::Client::ConsumerTagGenerator.new
end
tag_generator=(generator) click to toggle source
# File lib/amq/client/async/consumer.rb, line 33
def self.tag_generator=(generator)
  @tag_generator = generator
end

Public Instance Methods

acknowledge(delivery_tag) click to toggle source

Acknowledge a delivery tag. @return [Consumer] self

@api public @see bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)

# File lib/amq/client/async/consumer.rb, line 113
def acknowledge(delivery_tag)
  @channel.acknowledge(delivery_tag)

  self
end
after_connection_interruption(&block) click to toggle source
after_recovery(&block) click to toggle source
Alias for: on_recovery
auto_recover() click to toggle source

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).

@api plugin

# File lib/amq/client/async/consumer.rb, line 185
def auto_recover
  self.exec_callback_yielding_self(:before_recovery)
  self.resubscribe
  self.exec_callback_yielding_self(:after_recovery)
end
before_recovery(&block) click to toggle source

Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/consumer.rb, line 155
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
cancel(nowait = false, &block) click to toggle source
# File lib/amq/client/async/consumer.rb, line 81
def cancel(nowait = false, &block)
  @connection.send_frame(Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait))
  self.clear_callbacks(:delivery)
  self.clear_callbacks(:consume)

  self.unregister_with_channel
  self.unregister_with_queue

  if !nowait
    self.redefine_callback(:cancel, &block)
    @channel.consumers_awaiting_cancel_ok.push(self)
  end

  self
end
consume(nowait = false, &block) click to toggle source
# File lib/amq/client/async/consumer.rb, line 62
def consume(nowait = false, &block)
  @connection.send_frame(Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments))
  self.redefine_callback(:consume, &block)

  @channel.consumers_awaiting_consume_ok.push(self)

  self
end
exclusive?() click to toggle source
# File lib/amq/client/async/consumer.rb, line 56
def exclusive?
  !!@exclusive
end
handle_cancel_ok(cancel_ok) click to toggle source
# File lib/amq/client/async/consumer.rb, line 208
def handle_cancel_ok(cancel_ok)
  @consumer_tag = nil

  # detach from object graph so that this object will be garbage-collected
  @queue        = nil
  @channel      = nil
  @connection   = nil

  self.exec_callback_once(:cancel, cancel_ok)
end
handle_connection_interruption(method = nil) click to toggle source

@private

# File lib/amq/client/async/consumer.rb, line 145
def handle_connection_interruption(method = nil)
  self.exec_callback_yielding_self(:after_connection_interruption)
end
handle_consume_ok(consume_ok) click to toggle source
# File lib/amq/client/async/consumer.rb, line 204
def handle_consume_ok(consume_ok)
  self.exec_callback_once(:consume, consume_ok)
end
handle_delivery(basic_deliver, metadata, payload) click to toggle source

Implementation

# File lib/amq/client/async/consumer.rb, line 200
def handle_delivery(basic_deliver, metadata, payload)
  self.exec_callback(:delivery, basic_deliver, metadata, payload)
end
on_connection_interruption(&block) click to toggle source

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/consumer.rb, line 139
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
on_delivery(&block) click to toggle source
# File lib/amq/client/async/consumer.rb, line 99
def on_delivery(&block)
  self.append_callback(:delivery, &block)

  self
end
on_recovery(&block) click to toggle source

Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/consumer.rb, line 169
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
Also aliased as: after_recovery
reject(delivery_tag, requeue = true) click to toggle source

@return [Consumer] self

@api public @see bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)

# File lib/amq/client/async/consumer.rb, line 124
def reject(delivery_tag, requeue = true)
  @channel.reject(delivery_tag, requeue)

  self
end
resubscribe(&block) click to toggle source

Used by automatic recovery code. @api plugin

# File lib/amq/client/async/consumer.rb, line 73
def resubscribe(&block)
  @connection.send_frame(Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments))
  self.redefine_callback(:consume, &block) if block

  self
end
run_after_recovery_callbacks() click to toggle source

@private

# File lib/amq/client/async/consumer.rb, line 175
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery)
end
run_before_recovery_callbacks() click to toggle source

@private

# File lib/amq/client/async/consumer.rb, line 160
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery)
end

Protected Instance Methods

register_with_channel() click to toggle source
# File lib/amq/client/async/consumer.rb, line 253
def register_with_channel
  @channel.consumers[@consumer_tag] = self
end
register_with_queue() click to toggle source
# File lib/amq/client/async/consumer.rb, line 257
def register_with_queue
  @queue.consumers[@consumer_tag]   = self
end
unregister_with_channel() click to toggle source
# File lib/amq/client/async/consumer.rb, line 261
def unregister_with_channel
  @channel.consumers.delete(@consumer_tag)
end
unregister_with_queue() click to toggle source
# File lib/amq/client/async/consumer.rb, line 265
def unregister_with_queue
  @queue.consumers.delete(@consumer_tag)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.