Object
@return [Hash] Additional arguments given on queue declaration. Typically used by AMQP extensions.
@return [AMQ::Client::Consumer] Default consumer (registered with {Queue#consume}).
@return [Class] AMQ::Client::Consumer or other class implementing consumer API. Used by libraries like {github.com/ruby-amqp/amqp Ruby amqp gem}. @api plugin
# File lib/amq/client/async/queue.rb, line 249 def self.consumer_class AMQ::Client::Async::Consumer end
@param [AMQ::Client::Adapter] AMQ networking adapter to use. @param [AMQ::Client::Channel] AMQ channel this queue object uses. @param [String] Queue name. Please note that AMQP spec does not require brokers to support Unicode for queue names. @api public
# File lib/amq/client/async/queue.rb, line 53 def initialize(connection, channel, name = AMQ::Protocol::EMPTY_STRING) raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? super(connection) @name = name # this has to stay true even after queue.declare-ok arrives. MK. @server_named = @name.empty? if @server_named self.on_connection_interruption do # server-named queue need to get new names after recovery. MK. @name = AMQ::Protocol::EMPTY_STRING end end @channel = channel # primarily for autorecovery. MK. @bindings = Array.new @consumers = Hash.new end
Acknowledge a delivery tag. @return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.13.)
# File lib/amq/client/async/queue.rb, line 347 def acknowledge(delivery_tag) @channel.acknowledge(delivery_tag) self end
@return [Boolean] true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds). @api public
# File lib/amq/client/async/queue.rb, line 91 def auto_delete? @auto_delete end
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
@api plugin
# File lib/amq/client/async/queue.rb, line 426 def auto_recover self.exec_callback_yielding_self(:before_recovery) self.redeclare do self.rebind @consumers.each { |tag, consumer| consumer.auto_recover } self.exec_callback_yielding_self(:after_recovery) end end
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/queue.rb, line 392 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end
@return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.3.)
# File lib/amq/client/async/queue.rb, line 185 def bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block) nowait = true unless block exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end @connection.send_frame(Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments)) if !nowait self.append_callback(:bind, &block) @channel.queues_awaiting_bind_ok.push(self) end # store bindings for automatic recovery, but BE VERY CAREFUL to # not cause an infinite rebinding loop here when we recover. MK. binding = { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments } @bindings.push(binding) unless @bindings.include?(binding) self end
Unsubscribes from message delivery. @return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.5.)
# File lib/amq/client/async/queue.rb, line 274 def cancel(nowait = false, &block) raise "There is no default consumer for this queue. This usually means that you are trying to unsubscribe a queue that never was subscribed for messages in the first place." if @default_consumer.nil? @default_consumer.cancel(nowait, &block) self end
@return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.3.)
# File lib/amq/client/async/queue.rb, line 259 def consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block) raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQ::Client::Consumer directly to register additional consumers.") if @default_consumer nowait = true unless block @default_consumer = self.class.consumer_class.new(@channel, self, generate_consumer_tag(@name), exclusive, no_ack, arguments, no_local, &block) @default_consumer.consume(nowait, &block) self end
Declares this queue.
@return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.1.)
# File lib/amq/client/async/queue.rb, line 105 def declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block) raise ArgumentError, "declaration with nowait does not make sense for server-named queues! Either specify name other than empty string or use #declare without nowait" if nowait && self.anonymous? # these two are for autorecovery. MK. @passive = passive @server_named = @name.empty? @durable = durable @exclusive = exclusive @auto_delete = auto_delete @arguments = arguments nowait = true if !block && !@name.empty? @connection.send_frame(Protocol::Queue::Declare.encode(@channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments)) if !nowait self.append_callback(:declare, &block) @channel.queues_awaiting_declare_ok.push(self) end self end
Deletes this queue.
@param [Boolean] if_unused delete only if queue has no consumers (subscribers). @param [Boolean] if_empty delete only if queue has no messages in it. @param [Boolean] nowait Don't wait for reply from broker. @return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.9.)
# File lib/amq/client/async/queue.rb, line 162 def delete(if_unused = false, if_empty = false, nowait = false, &block) nowait = true unless block @connection.send_frame(Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait)) if !nowait self.append_callback(:delete, &block) # TODO: delete itself from queues cache @channel.queues_awaiting_delete_ok.push(self) end self end
@return [Boolean] true if this queue was declared as durable (will survive broker restart). @api public
# File lib/amq/client/async/queue.rb, line 79 def durable? @durable end
@return [Boolean] true if this queue was declared as exclusive (limited to just one consumer) @api public
# File lib/amq/client/async/queue.rb, line 85 def exclusive? @exclusive end
Unique string supposed to be used as a consumer tag.
@return [String] Unique string. @api plugin
# File lib/amq/client/async/queue.rb, line 449 def generate_consumer_tag(name) "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}" end
Fetches messages from the queue. @return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.10.)
# File lib/amq/client/async/queue.rb, line 302 def get(no_ack = false, &block) @connection.send_frame(Protocol::Basic::Get.encode(@channel.id, @name, no_ack)) # most people only want one callback per #get call. Consider the following example: # # 100.times { queue.get { ... } } # # most likely you won't expect 100 callback runs per message here. MK. self.redefine_callback(:get, &block) @channel.queues_awaiting_get_response.push(self) self end
# File lib/amq/client/async/queue.rb, line 474 def handle_bind_ok(method) self.exec_callback_once(:bind, method) end
@private
# File lib/amq/client/async/queue.rb, line 381 def handle_connection_interruption(method = nil) @consumers.each { |tag, consumer| consumer.handle_connection_interruption(method) } self.exec_callback_yielding_self(:after_connection_interruption) end
# File lib/amq/client/async/queue.rb, line 459 def handle_declare_ok(method) @name = method.queue if @name.empty? @channel.register_queue(self) self.exec_callback_once_yielding_self(:declare, method) end
# File lib/amq/client/async/queue.rb, line 466 def handle_delete_ok(method) self.exec_callback_once(:delete, method) end
# File lib/amq/client/async/queue.rb, line 487 def handle_get_empty(method) method = Protocol::GetResponse.new(method) self.exec_callback(:get, method) end
# File lib/amq/client/async/queue.rb, line 482 def handle_get_ok(method, header, payload) method = Protocol::GetResponse.new(method) self.exec_callback(:get, method, header, payload) end
# File lib/amq/client/async/queue.rb, line 470 def handle_purge_ok(method) self.exec_callback_once(:purge, method) end
# File lib/amq/client/async/queue.rb, line 478 def handle_unbind_ok(method) self.exec_callback_once(:unbind, method) end
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/queue.rb, line 375 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.8.3.9)
# File lib/amq/client/async/queue.rb, line 292 def on_delivery(&block) @default_consumer.on_delivery(&block) end
Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/queue.rb, line 408 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end
Purges (removes all messagse from) the queue. @return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.7.)
# File lib/amq/client/async/queue.rb, line 323 def purge(nowait = false, &block) nowait = true unless block @connection.send_frame(Protocol::Queue::Purge.encode(@channel.id, @name, nowait)) if !nowait self.redefine_callback(:purge, &block) # TODO: handle channel & connection-level exceptions @channel.queues_awaiting_purge_ok.push(self) end self end
Used by automatic recovery machinery. @private @api plugin
# File lib/amq/client/async/queue.rb, line 236 def rebind(&block) @bindings.each { |b| self.bind(b[:exchange], b[:routing_key], true, b[:arguments]) } end
Re-declares queue with the same attributes @api public
# File lib/amq/client/async/queue.rb, line 130 def redeclare(&block) nowait = true if !block && !@name.empty? # server-named queues get their new generated names. new_name = if @server_named AMQ::Protocol::EMPTY_STRING else @name end @connection.send_frame(Protocol::Queue::Declare.encode(@channel.id, new_name, @passive, @durable, @exclusive, @auto_delete, false, @arguments)) if !nowait self.append_callback(:declare, &block) @channel.queues_awaiting_declare_ok.push(self) end self end
@return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.14.)
# File lib/amq/client/async/queue.rb, line 358 def reject(delivery_tag, requeue = true) @channel.reject(delivery_tag, requeue) self end
@private
# File lib/amq/client/async/queue.rb, line 414 def run_after_recovery_callbacks self.exec_callback_yielding_self(:after_recovery) @consumers.each { |tag, c| c.run_after_recovery_callbacks } end
@private
# File lib/amq/client/async/queue.rb, line 397 def run_before_recovery_callbacks self.exec_callback_yielding_self(:before_recovery) @consumers.each { |tag, c| c.run_before_recovery_callbacks } end
@return [Queue] self
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.5.)
# File lib/amq/client/async/queue.rb, line 214 def unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block) exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end @connection.send_frame(Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments)) self.append_callback(:unbind, &block) @channel.queues_awaiting_unbind_ok.push(self) @bindings.delete_if { |b| b[:exchange] == exchange_name } self end
Generated with the Darkfish Rdoc Generator 2.