class Bunny::Queue
Represents AMQP 0.9.1 queue.
@see rubybunny.info/articles/queues.html Queues and Consumers guide @see rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
Attributes
@return [Bunny::Channel] Channel this queue uses
@return [String] Queue name
@return [Hash] Options this queue was created with
Public Class Methods
@param [Bunny::Channel] channel Channel this queue will use. @param [String] name Queue name. Pass an empty string to make RabbitMQ generate a unique one. @param [Hash] opts Queue properties
@option opts [Boolean] :durable (false) Should this queue be durable? @option opts [Boolean] :auto_delete (false) Should this queue be automatically deleted when the last consumer disconnects? @option opts [Boolean] :exclusive (false) Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)? @option opts [Boolean] :arguments ({}) Additional optional arguments (typically used by RabbitMQ extensions and plugins)
@see Bunny::Channel#queue @see rubybunny.info/articles/queues.html Queues and Consumers guide @see rubybunny.info/articles/extensions.html RabbitMQ Extensions guide @api public
# File lib/bunny/queue.rb, line 34 def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}) # old Bunny versions pass a connection here. In that case, # we just use default channel from it. MK. @channel = channel @name = name @options = self.class.add_default_options(name, opts) @consumers = Hash.new @durable = @options[:durable] @exclusive = @options[:exclusive] @server_named = @name.empty? @auto_delete = @options[:auto_delete] @arguments = @options[:arguments] @bindings = Array.new @default_consumer = nil declare! unless opts[:no_declare] @channel.register_queue(self) end
Protected Class Methods
@private
# File lib/bunny/queue.rb, line 378 def self.add_default_options(name, opts, block) { :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts) end
Public Instance Methods
@return [Hash] Additional optional arguments (typically used by RabbitMQ extensions and plugins) @api public
# File lib/bunny/queue.rb, line 87 def arguments @arguments end
@return [Boolean] true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds). @api public @see rubybunny.info/articles/queues.html Queues and Consumers guide
# File lib/bunny/queue.rb, line 74 def auto_delete? @auto_delete end
Binds queue to an exchange
@param [Bunny::Exchange,String] exchange Exchange to bind to @param [Hash] opts Binding properties
@option opts [String] :routing_key Routing key @option opts [Hash] :arguments ({}) Additional optional binding arguments
@see rubybunny.info/articles/queues.html Queues and Consumers guide @see rubybunny.info/articles/bindings.html Bindings guide @api public
# File lib/bunny/queue.rb, line 111 def bind(exchange, opts = {}) @channel.queue_bind(@name, exchange, opts) exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end # store bindings for automatic recovery. We need to be very careful to # not cause an infinite rebinding loop here when we recover. MK. binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => opts[:arguments] } @bindings.push(binding) unless @bindings.include?(binding) self end
@return [Integer] How many active consumers the queue has
# File lib/bunny/queue.rb, line 324 def consumer_count s = self.status s[:consumer_count] end
@private
# File lib/bunny/queue.rb, line 370 def declare! queue_declare_ok = @channel.queue_declare(@name, @options) @name = queue_declare_ok.queue end
Deletes the queue
@param [Hash] opts Options
@option opts [Boolean] if_unused (false) Should this queue be deleted only if it has no consumers? @option opts [Boolean] if_empty (false) Should this queue be deleted only if it has no messages?
@see rubybunny.info/articles/queues.html Queues and Consumers guide @api public
# File lib/bunny/queue.rb, line 294 def delete(opts = {}) @channel.deregister_queue(self) @channel.queue_delete(@name, opts) end
@return [Boolean] true if this queue was declared as durable (will survive broker restart). @api public @see rubybunny.info/articles/queues.html Queues and Consumers guide
# File lib/bunny/queue.rb, line 60 def durable? @durable end
@return [Boolean] true if this queue was declared as exclusive (limited to just one consumer) @api public @see rubybunny.info/articles/queues.html Queues and Consumers guide
# File lib/bunny/queue.rb, line 67 def exclusive? @exclusive end
# File lib/bunny/queue.rb, line 96 def inspect to_s end
@return [Integer] How many messages the queue has ready (e.g. not delivered but not unacknowledged)
# File lib/bunny/queue.rb, line 318 def message_count s = self.status s[:message_count] end
@param [Hash] opts Options
@option opts [Boolean] :ack (false) [DEPRECATED] Use :manual_ack instead @option opts [Boolean] :manual_ack (false) Will the message be acknowledged manually?
@return [Array] Triple of delivery info, message properties and message content.
If the queue is empty, all three will be nils.
@see rubybunny.info/articles/queues.html Queues and Consumers guide @see #subscribe @api public
@example
conn = Bunny.new conn.start ch = conn.create_channel q = ch.queue("test1") x = ch.default_exchange x.publish("Hello, everybody!", :routing_key => 'test1') delivery_info, properties, payload = q.pop puts "This is the message: " + payload + "\n\n" conn.close
# File lib/bunny/queue.rb, line 243 def pop(opts = {:manual_ack => false}, &block) unless opts[:ack].nil? warn "[DEPRECATION] `:ack` is deprecated. Please use `:manual_ack` instead." opts[:manual_ack] = opts[:ack] end get_response, properties, content = @channel.basic_get(@name, opts) if block if properties di = GetResponse.new(get_response, properties, @channel) mp = MessageProperties.new(properties) block.call(di, mp, content) else block.call(nil, nil, nil) end else if properties di = GetResponse.new(get_response, properties, @channel) mp = MessageProperties.new(properties) [di, mp, content] else [nil, nil, nil] end end end
Publishes a message to the queue via default exchange. Takes the same arguments as {Bunny::Exchange#publish}
@see Bunny::Exchange#publish @see Bunny::Channel#default_exchange @see rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# File lib/bunny/queue.rb, line 278 def publish(payload, opts = {}) @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name)) self end
Purges a queue (removes all messages from it) @see rubybunny.info/articles/queues.html Queues and Consumers guide @api public
# File lib/bunny/queue.rb, line 302 def purge(opts = {}) @channel.queue_purge(@name, opts) self end
@private
# File lib/bunny/queue.rb, line 356 def recover_bindings @bindings.each do |b| # TODO: inject and use logger # puts "Recovering binding #{b.inspect}" self.bind(b[:exchange], b) end end
@private
# File lib/bunny/queue.rb, line 334 def recover_from_network_failure if self.server_named? old_name = @name.dup @name = AMQ::Protocol::EMPTY_STRING @channel.deregister_queue_named(old_name) end # TODO: inject and use logger # puts "Recovering queue #{@name}" begin declare! @channel.register_queue(self) rescue Exception => e # TODO: inject and use logger puts "Caught #{e.inspect} while redeclaring and registering #{@name}!" end recover_bindings end
@return [Boolean] true if this queue was declared as server named. @api public @see rubybunny.info/articles/queues.html Queues and Consumers guide
# File lib/bunny/queue.rb, line 81 def server_named? @server_named end
@return [Hash] A hash with information about the number of queue messages and consumers @see message_count @see consumer_count
# File lib/bunny/queue.rb, line 311 def status queue_declare_ok = @channel.queue_declare(@name, @options.merge(:passive => true)) {:message_count => queue_declare_ok.message_count, :consumer_count => queue_declare_ok.consumer_count} end
Adds a consumer to the queue (subscribes for message deliveries).
@param [Hash] opts Options
@option opts [Boolean] :ack (false) [DEPRECATED] Use :manual_ack instead @option opts [Boolean] :manual_ack (false) Will this consumer use manual acknowledgements? @option opts [Boolean] :exclusive (false) Should this consumer be exclusive for this queue? @option opts [Boolean] :block (false) Should the call block calling thread? @option opts [#call] :on_cancellation Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin) @option opts [String] :consumer_tag Unique consumer identifier. It is usually recommended to let Bunny generate it for you. @option opts [Hash] :arguments ({}) Additional (optional) arguments, typically used by RabbitMQ extensions
@see rubybunny.info/articles/queues.html Queues and Consumers guide @api public
# File lib/bunny/queue.rb, line 169 def subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :manual_ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block) unless opts[:ack].nil? warn "[DEPRECATION] `:ack` is deprecated. Please use `:manual_ack` instead." opts[:manual_ack] = opts[:ack] end ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag) consumer = Consumer.new(@channel, self, ctag, !opts[:manual_ack], opts[:exclusive], opts[:arguments]) consumer.on_delivery(&block) consumer.on_cancellation(&opts[:on_cancellation]) if opts[:on_cancellation] @channel.basic_consume_with(consumer) if opts[:block] # joins current thread with the consumers pool, will block # the current thread for as long as the consumer pool is active @channel.work_pool.join end consumer end
Adds a consumer object to the queue (subscribes for message deliveries).
@param [Bunny::Consumer] consumer a {Bunny::Consumer} subclass that implements consumer interface @param [Hash] opts Options
@option opts [Boolean] block (false) Should the call block calling thread?
@see rubybunny.info/articles/queues.html Queues and Consumers guide @api public
# File lib/bunny/queue.rb, line 212 def subscribe_with(consumer, opts = {:block => false}) @channel.basic_consume_with(consumer) @channel.work_pool.join if opts[:block] consumer end
# File lib/bunny/queue.rb, line 91 def to_s oid = ("0x%x" % (self.object_id << 1)) "<#{self.class.name}:#{oid} @name=\"#{name}\" channel=#{@channel.to_s} @durable=#{@durable} @auto_delete=#{@auto_delete} @exclusive=#{@exclusive} @arguments=#{@arguments}>" end
Unbinds queue from an exchange
@param [Bunny::Exchange,String] exchange Exchange to unbind from @param [Hash] opts Binding properties
@option opts [String] :routing_key Routing key @option opts [Hash] :arguments ({}) Additional optional binding arguments
@see rubybunny.info/articles/queues.html Queues and Consumers guide @see rubybunny.info/articles/bindings.html Bindings guide @api public
# File lib/bunny/queue.rb, line 140 def unbind(exchange, opts = {}) @channel.queue_unbind(@name, exchange, opts) exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end @bindings.delete_if { |b| b[:exchange] == exchange_name && b[:routing_key] == (opts[:routing_key] || opts[:key]) && b[:arguments] == opts[:arguments] } self end