Object
# File lib/amq/client/async/consumer.rb, line 38 def initialize(channel, queue, consumer_tag = self.class.tag_generator.generate_for(queue), exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) @callbacks = Hash.new @channel = channel || raise(ArgumentError, "channel is nil") @connection = channel.connection || raise(ArgumentError, "connection is nil") @queue = queue || raise(ArgumentError, "queue is nil") @consumer_tag = consumer_tag @exclusive = exclusive @no_ack = no_ack @arguments = arguments @no_local = no_local self.register_with_channel self.register_with_queue end
Acknowledge a delivery tag. @return [Consumer] self
@api public @see bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
# File lib/amq/client/async/consumer.rb, line 113 def acknowledge(delivery_tag) @channel.acknowledge(delivery_tag) self end
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
@api plugin
# File lib/amq/client/async/consumer.rb, line 185 def auto_recover self.exec_callback_yielding_self(:before_recovery) self.resubscribe self.exec_callback_yielding_self(:after_recovery) end
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/consumer.rb, line 155 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end
# File lib/amq/client/async/consumer.rb, line 81 def cancel(nowait = false, &block) @connection.send_frame(Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait)) self.clear_callbacks(:delivery) self.clear_callbacks(:consume) self.unregister_with_channel self.unregister_with_queue if !nowait self.redefine_callback(:cancel, &block) @channel.consumers_awaiting_cancel_ok.push(self) end self end
# File lib/amq/client/async/consumer.rb, line 62 def consume(nowait = false, &block) @connection.send_frame(Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments)) self.redefine_callback(:consume, &block) @channel.consumers_awaiting_consume_ok.push(self) self end
# File lib/amq/client/async/consumer.rb, line 56 def exclusive? !!@exclusive end
# File lib/amq/client/async/consumer.rb, line 208 def handle_cancel_ok(cancel_ok) @consumer_tag = nil # detach from object graph so that this object will be garbage-collected @queue = nil @channel = nil @connection = nil self.exec_callback_once(:cancel, cancel_ok) end
@private
# File lib/amq/client/async/consumer.rb, line 145 def handle_connection_interruption(method = nil) self.exec_callback_yielding_self(:after_connection_interruption) end
# File lib/amq/client/async/consumer.rb, line 204 def handle_consume_ok(consume_ok) self.exec_callback_once(:consume, consume_ok) end
Implementation
# File lib/amq/client/async/consumer.rb, line 200 def handle_delivery(basic_deliver, metadata, payload) self.exec_callback(:delivery, basic_deliver, metadata, payload) end
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/consumer.rb, line 139 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end
# File lib/amq/client/async/consumer.rb, line 99 def on_delivery(&block) self.append_callback(:delivery, &block) self end
Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/consumer.rb, line 169 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end
@return [Consumer] self
@api public @see bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
# File lib/amq/client/async/consumer.rb, line 124 def reject(delivery_tag, requeue = true) @channel.reject(delivery_tag, requeue) self end
Used by automatic recovery code. @api plugin
# File lib/amq/client/async/consumer.rb, line 73 def resubscribe(&block) @connection.send_frame(Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments)) self.redefine_callback(:consume, &block) if block self end
# File lib/amq/client/async/consumer.rb, line 253 def register_with_channel @channel.consumers[@consumer_tag] = self end
# File lib/amq/client/async/consumer.rb, line 257 def register_with_queue @queue.consumers[@consumer_tag] = self end
Generated with the Darkfish Rdoc Generator 2.