AMQ::Client::Async::Queue

Attributes

arguments[R]

@return [Hash] Additional arguments given on queue declaration. Typically used by AMQP extensions.

bindings[R]

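@return [Array<Hash>] Bindings recorded by {Queue#bind} (each with :exchange, :routing_key and :arguments keys). Kept primarily for automatic recovery.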

channel[R]

@return [AMQ::Client::Channel] Channel this queue belongs to.

consumers[R]

@return [Hash] All consumers on this queue, keyed by consumer tag.

default_consumer[R]

@return [AMQ::Client::Consumer] Default consumer (registered with {Queue#consume}).

name[R]

@return [String] Queue name. May be server-generated or assigned directly.

Public Class Methods

consumer_class()

@return [Class] AMQ::Client::Async::Consumer or another class implementing the consumer API. Used by libraries like {github.com/ruby-amqp/amqp Ruby amqp gem}. @api plugin

# File lib/amq/client/async/queue.rb, line 249
def self.consumer_class
  AMQ::Client::Async::Consumer
end
new(connection, channel, name = AMQ::Protocol::EMPTY_STRING)

@param [AMQ::Client::Adapter] connection AMQ networking adapter to use.
@param [AMQ::Client::Channel] channel AMQ channel this queue object uses.
@param [String] name Queue name. Must not be nil; pass an empty string to have the broker generate a name. Note that the AMQP spec does not require brokers to support Unicode in queue names.
@api public

# File lib/amq/client/async/queue.rb, line 53
def initialize(connection, channel, name = AMQ::Protocol::EMPTY_STRING)
  raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?

  super(connection)

  @name         = name
  # this has to stay true even after queue.declare-ok arrives. MK.
  @server_named = @name.empty?
  if @server_named
    self.on_connection_interruption do
      # server-named queue need to get new names after recovery. MK.
      @name = AMQ::Protocol::EMPTY_STRING
    end
  end

  @channel      = channel

  # primarily for autorecovery. MK.
  @bindings  = Array.new

  @consumers = Hash.new
end
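
The name handling in the constructor can be tried without a broker. The following is a minimal sketch, assuming a hypothetical stand-in class QueueNameSketch (not part of amq-client) that copies only the name checks from the listing above:

```ruby
# Hypothetical stand-in; mirrors only the name checks from Queue#initialize.
class QueueNameSketch
  EMPTY_STRING = "".freeze # stands in for AMQ::Protocol::EMPTY_STRING

  attr_reader :name

  def initialize(name = EMPTY_STRING)
    raise ArgumentError, "queue name must not be nil" if name.nil?

    @name         = name
    # An empty name asks the broker to generate one.
    @server_named = @name.empty?
  end

  def server_named?
    @server_named
  end
end

puts QueueNameSketch.new.server_named?          # default (empty) name
puts QueueNameSketch.new("tasks").server_named? # explicit name
```

In the real class the server-named flag stays set after the broker assigns a name, so that recovery can request a fresh name.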

Public Instance Methods

acknowledge(delivery_tag)

Acknowledge a delivery tag. @return [Queue] self

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.13.)

# File lib/amq/client/async/queue.rb, line 347
def acknowledge(delivery_tag)
  @channel.acknowledge(delivery_tag)

  self
end
after_connection_interruption(&block)
after_recovery(&block)
Alias for: on_recovery
auto_delete?()

@return [Boolean] true if this queue was declared as automatically deleted (deleted as soon as the last consumer unsubscribes). @api public

# File lib/amq/client/async/queue.rb, line 91
def auto_delete?
  @auto_delete
end
auto_recover()

Called by the associated connection object when the AMQP connection has been re-established (for example, after a network failure).

@api plugin

# File lib/amq/client/async/queue.rb, line 426
def auto_recover
  self.exec_callback_yielding_self(:before_recovery)
  self.redeclare do
    self.rebind

    @consumers.each { |tag, consumer| consumer.auto_recover }

    self.exec_callback_yielding_self(:after_recovery)
  end
end
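
The order of the recovery steps can be read off the listing above. The following is a minimal sketch, assuming a hypothetical stand-in class RecoveryOrderSketch that only records step names. Unlike the real class, its redeclare runs the block immediately instead of waiting for queue.declare-ok.

```ruby
# Hypothetical stand-in; records the order of steps taken by Queue#auto_recover.
class RecoveryOrderSketch
  attr_reader :steps

  def initialize(consumer_tags)
    @steps         = []
    @consumer_tags = consumer_tags
  end

  def auto_recover
    @steps << "before_recovery callback"
    redeclare do
      @steps << "rebind"
      @consumer_tags.each { |tag| @steps << "recover consumer #{tag}" }
      @steps << "after_recovery callback"
    end
  end

  private

  # The real method sends queue.declare and runs the block on declare-ok.
  def redeclare
    @steps << "redeclare"
    yield
  end
end

sketch = RecoveryOrderSketch.new(["tag-1", "tag-2"])
sketch.auto_recover
puts sketch.steps
```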
before_recovery(&block)

Defines a callback that will be executed after the TCP connection is recovered following a network failure, but before the AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces any defined earlier).

@api public

# File lib/amq/client/async/queue.rb, line 392
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
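bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block)

Binds this queue to an exchange. The exchange may be given as an object that responds to name or as a plain exchange name. @return [Queue] self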

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.3.)

# File lib/amq/client/async/queue.rb, line 185
def bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block)
  nowait = true unless block
  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else

                    exchange
                  end

  @connection.send_frame(Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments))

  if !nowait
    self.append_callback(:bind, &block)
    @channel.queues_awaiting_bind_ok.push(self)
  end

  # store bindings for automatic recovery, but BE VERY CAREFUL to
  # not cause an infinite rebinding loop here when we recover. MK.
  binding = { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments }
  @bindings.push(binding) unless @bindings.include?(binding)

  self
end
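
The bookkeeping at the end of bind relies on Hash equality to avoid storing the same binding twice. The following is a minimal sketch, assuming hypothetical stand-ins BindingLogSketch and ExchangeSketch that keep only that bookkeeping and send nothing to a broker:

```ruby
# Hypothetical stand-ins; only the binding bookkeeping from Queue#bind is kept.
ExchangeSketch = Struct.new(:name)

class BindingLogSketch
  attr_reader :bindings

  def initialize
    @bindings = []
  end

  def bind(exchange, routing_key = "", arguments = nil)
    # Accept either an exchange-like object or a plain name, as #bind does.
    exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange
    binding = { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments }
    # Hashes with equal keys and values are equal, so a repeated bind is not stored again.
    @bindings.push(binding) unless @bindings.include?(binding)
    self
  end
end

log = BindingLogSketch.new
log.bind("logs", "error")
log.bind(ExchangeSketch.new("logs"), "error") # same binding, given as an object
log.bind("logs", "warn")
puts log.bindings.size
```

Because rebind replays this list through bind, the duplicate check also keeps the list from growing on every recovery.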
cancel(nowait = false, &block)

Unsubscribes from message delivery. @return [Queue] self

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.5.)

# File lib/amq/client/async/queue.rb, line 274
def cancel(nowait = false, &block)
  raise "There is no default consumer for this queue. This usually means that you are trying to unsubscribe a queue that never was subscribed for messages in the first place." if @default_consumer.nil?

  @default_consumer.cancel(nowait, &block)

  self
end
consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block)

Subscribes to message delivery by registering the default consumer. Raises if a default consumer is already registered. @return [Queue] self

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.3.)

# File lib/amq/client/async/queue.rb, line 259
def consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block)
  raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQ::Client::Consumer directly to register additional consumers.") if @default_consumer

  nowait            = true unless block
  @default_consumer = self.class.consumer_class.new(@channel, self, generate_consumer_tag(@name), exclusive, no_ack, arguments, no_local, &block)
  @default_consumer.consume(nowait, &block)

  self
end
declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block)

Declares this queue.

@return [Queue] self

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.1.)

# File lib/amq/client/async/queue.rb, line 105
def declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block)
  raise ArgumentError, "declaration with nowait does not make sense for server-named queues! Either specify name other than empty string or use #declare without nowait" if nowait && self.anonymous?

  # these two are for autorecovery. MK.
  @passive      = passive
  @server_named = @name.empty?

  @durable     = durable
  @exclusive   = exclusive
  @auto_delete = auto_delete
  @arguments   = arguments

  nowait = true if !block && !@name.empty?
  @connection.send_frame(Protocol::Queue::Declare.encode(@channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments))

  if !nowait
    self.append_callback(:declare, &block)
    @channel.queues_awaiting_declare_ok.push(self)
  end

  self
end
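
How declare settles on the final nowait value depends on both the queue name and whether a block was given. The following is a minimal sketch, assuming a hypothetical helper effective_nowait that isolates that decision, and assuming that anonymous? is equivalent to an empty name:

```ruby
# Hypothetical helper; reproduces the nowait decision made inside Queue#declare.
# Assumes that anonymous? means "the name is empty".
def effective_nowait(name, nowait, has_block)
  if nowait && name.empty?
    raise ArgumentError, "declaration with nowait does not make sense for server-named queues"
  end

  # A named queue declared without a block has nothing to call back, so it does not wait.
  nowait = true if !has_block && !name.empty?
  nowait
end

puts effective_nowait("tasks", false, false) # named queue, no block
puts effective_nowait("tasks", false, true)  # named queue, block given
puts effective_nowait("", false, false)      # server-named queue, no block
```

A server-named queue always waits for queue.declare-ok, since that reply carries the generated name.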
delete(if_unused = false, if_empty = false, nowait = false, &block)

Deletes this queue.

@param [Boolean] if_unused Delete only if the queue has no consumers (subscribers).
@param [Boolean] if_empty Delete only if the queue has no messages in it.
@param [Boolean] nowait Do not wait for a reply from the broker.
@return [Queue] self

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.9.)

# File lib/amq/client/async/queue.rb, line 162
def delete(if_unused = false, if_empty = false, nowait = false, &block)
  nowait = true unless block
  @connection.send_frame(Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait))

  if !nowait
    self.append_callback(:delete, &block)

    # TODO: delete itself from queues cache
    @channel.queues_awaiting_delete_ok.push(self)
  end

  self
end
durable?()

@return [Boolean] true if this queue was declared as durable (will survive broker restart). @api public

# File lib/amq/client/async/queue.rb, line 79
def durable?
  @durable
end
exclusive?()

@return [Boolean] true if this queue was declared as exclusive (usable only by the connection that declared it). @api public

# File lib/amq/client/async/queue.rb, line 85
def exclusive?
  @exclusive
end
generate_consumer_tag(name)

Generates a unique string intended for use as a consumer tag.

@return [String] Unique string. @api plugin

# File lib/amq/client/async/queue.rb, line 449
def generate_consumer_tag(name)
  "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end
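
The shape of the generated tag is easy to inspect in isolation. The following is a minimal sketch, assuming a hypothetical top-level method consumer_tag_sketch that copies the expression from the listing above:

```ruby
# Hypothetical standalone copy of the expression used by Queue#generate_consumer_tag.
def consumer_tag_sketch(name)
  "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end

tag = consumer_tag_sketch("tasks")
puts tag
# Three parts: queue name, a timestamp, and a random integer.
puts tag.split("-").size
```

The timestamp is whole seconds multiplied by 1000, so it always ends in 000; uniqueness within the same second comes from the random part only.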
get(no_ack = false, &block)

Fetches a single message from the queue, if one is available. @return [Queue] self

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.10.)

# File lib/amq/client/async/queue.rb, line 302
def get(no_ack = false, &block)
  @connection.send_frame(Protocol::Basic::Get.encode(@channel.id, @name, no_ack))

  # most people only want one callback per #get call. Consider the following example:
  #
  # 100.times { queue.get { ... } }
  #
  # most likely you won't expect 100 callback runs per message here. MK.
  self.redefine_callback(:get, &block)
  @channel.queues_awaiting_get_response.push(self)

  self
end
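
The comment inside get explains why the callback is redefined rather than appended. The following is a minimal sketch of that difference, assuming a hypothetical CallbackRegistrySketch class. The method names follow the listing, but the bodies are an assumption about how such a registry could behave, not the library's implementation.

```ruby
# Hypothetical registry; models "replace" versus "append" callback registration.
class CallbackRegistrySketch
  def initialize
    @callbacks = Hash.new { |hash, event| hash[event] = [] }
  end

  # Adds another callback for the event.
  def append_callback(event, &block)
    @callbacks[event] << block if block
    self
  end

  # Replaces every callback for the event with the given one.
  def redefine_callback(event, &block)
    @callbacks[event] = block ? [block] : []
    self
  end

  def exec_callback(event, *args)
    @callbacks[event].each { |callback| callback.call(*args) }
  end
end

runs = []
registry = CallbackRegistrySketch.new
3.times { |i| registry.redefine_callback(:get) { |payload| runs << [i, payload] } }
registry.exec_callback(:get, "hello")
puts runs.inspect # only the last registered block ran
```

With append_callback in place of redefine_callback, all three blocks would run for the one payload.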
handle_bind_ok(method)
# File lib/amq/client/async/queue.rb, line 474
def handle_bind_ok(method)
  self.exec_callback_once(:bind, method)
end
handle_connection_interruption(method = nil)

@private

# File lib/amq/client/async/queue.rb, line 381
def handle_connection_interruption(method = nil)
  @consumers.each { |tag, consumer| consumer.handle_connection_interruption(method) }

  self.exec_callback_yielding_self(:after_connection_interruption)
end
handle_declare_ok(method)
# File lib/amq/client/async/queue.rb, line 459
def handle_declare_ok(method)
  @name = method.queue if @name.empty?
  @channel.register_queue(self)

  self.exec_callback_once_yielding_self(:declare, method)
end
handle_delete_ok(method)
# File lib/amq/client/async/queue.rb, line 466
def handle_delete_ok(method)
  self.exec_callback_once(:delete, method)
end
handle_get_empty(method)
# File lib/amq/client/async/queue.rb, line 487
def handle_get_empty(method)
  method = Protocol::GetResponse.new(method)
  self.exec_callback(:get, method)
end
handle_get_ok(method, header, payload)
# File lib/amq/client/async/queue.rb, line 482
def handle_get_ok(method, header, payload)
  method = Protocol::GetResponse.new(method)
  self.exec_callback(:get, method, header, payload)
end
handle_purge_ok(method)
# File lib/amq/client/async/queue.rb, line 470
def handle_purge_ok(method)
  self.exec_callback_once(:purge, method)
end
handle_unbind_ok(method)
# File lib/amq/client/async/queue.rb, line 478
def handle_unbind_ok(method)
  self.exec_callback_once(:unbind, method)
end
on_connection_interruption(&block)

Defines a callback that will be executed after the TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces any defined earlier).

@api public

# File lib/amq/client/async/queue.rb, line 375
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
on_delivery(&block)

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.9.)

# File lib/amq/client/async/queue.rb, line 292
def on_delivery(&block)
  @default_consumer.on_delivery(&block)
end
on_recovery(&block)

Defines a callback that will be executed when the AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces any defined earlier).

@api public

# File lib/amq/client/async/queue.rb, line 408
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
Also aliased as: after_recovery
purge(nowait = false, &block)

Purges (removes all messages from) the queue. @return [Queue] self

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.7.)

# File lib/amq/client/async/queue.rb, line 323
def purge(nowait = false, &block)
  nowait = true unless block
  @connection.send_frame(Protocol::Queue::Purge.encode(@channel.id, @name, nowait))

  if !nowait
    self.redefine_callback(:purge, &block)
    # TODO: handle channel & connection-level exceptions
    @channel.queues_awaiting_purge_ok.push(self)
  end

  self
end
rebind(&block)

Used by automatic recovery machinery. @private @api plugin

# File lib/amq/client/async/queue.rb, line 236
def rebind(&block)
  @bindings.each { |b| self.bind(b[:exchange], b[:routing_key], true, b[:arguments]) }
end
redeclare(&block)

Re-declares the queue with the same attributes. Server-named queues are re-declared with an empty name so that the broker generates a new one. @api public

# File lib/amq/client/async/queue.rb, line 130
def redeclare(&block)
  nowait = true if !block && !@name.empty?

  # server-named queues get their new generated names.
  new_name = if @server_named
               AMQ::Protocol::EMPTY_STRING
             else
               @name
             end
  @connection.send_frame(Protocol::Queue::Declare.encode(@channel.id, new_name, @passive, @durable, @exclusive, @auto_delete, false, @arguments))

  if !nowait
    self.append_callback(:declare, &block)
    @channel.queues_awaiting_declare_ok.push(self)
  end

  self
end
reject(delivery_tag, requeue = true)

Rejects a delivery tag, requeueing the message by default. @return [Queue] self

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.14.)

# File lib/amq/client/async/queue.rb, line 358
def reject(delivery_tag, requeue = true)
  @channel.reject(delivery_tag, requeue)

  self
end
run_after_recovery_callbacks()

@private

# File lib/amq/client/async/queue.rb, line 414
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery)

  @consumers.each { |tag, c| c.run_after_recovery_callbacks }
end
run_before_recovery_callbacks()

@private

# File lib/amq/client/async/queue.rb, line 397
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery)

  @consumers.each { |tag, c| c.run_before_recovery_callbacks }
end
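unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block)

Unbinds this queue from an exchange. The exchange may be given as an object that responds to name or as a plain exchange name. @return [Queue] self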

@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.5.)

# File lib/amq/client/async/queue.rb, line 214
def unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block)
  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else

                    exchange
                  end

  @connection.send_frame(Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments))

  self.append_callback(:unbind, &block)
  @channel.queues_awaiting_unbind_ok.push(self)


  @bindings.delete_if { |b| b[:exchange] == exchange_name }

  self
end
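
One consequence of the last statement in unbind is worth seeing in isolation: the stored bindings are filtered by exchange name only, so every recorded binding for that exchange is forgotten, whatever its routing key. The following is a minimal sketch, assuming a hypothetical helper forget_bindings_for that applies the same delete_if to a plain array:

```ruby
# Hypothetical helper; applies the same filter Queue#unbind uses on @bindings.
def forget_bindings_for(bindings, exchange_name)
  bindings.delete_if { |b| b[:exchange] == exchange_name }
end

recorded = [
  { :exchange => "logs",   :routing_key => "error", :arguments => nil },
  { :exchange => "logs",   :routing_key => "warn",  :arguments => nil },
  { :exchange => "events", :routing_key => "error", :arguments => nil }
]

# Unbinding "logs" with any routing key drops both "logs" entries from the list.
forget_bindings_for(recorded, "logs")
puts recorded.inspect
```

After unbinding a single routing key, the other bindings to the same exchange still exist on the broker but would not be replayed by automatic recovery.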
