Object
# File lib/amq/client/async/channel.rb, line 38 def initialize(connection, id, options = {}) super(connection) @id = id @exchanges = Hash.new @queues = Hash.new @consumers = Hash.new @options = { :auto_recovery => connection.auto_recovering? }.merge(options) @auto_recovery = (!!@options[:auto_recovery]) # we must synchronize frameset delivery. MK. @mutex = Mutex.new reset_state! # 65536 is here for cases when channel is opened without passing a callback in, # otherwise channel_mix would be nil and it causes a lot of needless headaches. # lets just have this default. MK. channel_max = if @connection.open? @connection.channel_max || 65536 else 65536 end if channel_max != 0 && !(0..channel_max).include?(id) raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{id}") end end
Acknowledge one or all messages on the channel.
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.13.)
# File lib/amq/client/async/channel.rb, line 136 def acknowledge(delivery_tag, multiple = false) @connection.send_frame(Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple)) self end
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
@api plugin
# File lib/amq/client/async/channel.rb, line 321 def auto_recover return unless auto_recovering? self.open do # exchanges must be recovered first because queue recovery includes recovery of bindings. MK. @exchanges.each { |name, e| e.auto_recover } @queues.each { |name, q| q.auto_recover } end end
@return [Boolean] true if this channel uses automatic recovery mode
# File lib/amq/client/async/channel.rb, line 68 def auto_recovering? @auto_recovery end
Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/channel.rb, line 285 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end
Closes AMQP channel.
@api public
# File lib/amq/client/async/channel.rb, line 120 def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) @connection.send_frame(Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id)) self.redefine_callback :close, &block end
AMQP connection this channel belongs to.
@return [AMQ::Client::Connection] Connection this channel belongs to.
# File lib/amq/client/async/channel.rb, line 92 def connection @connection end
@return [Hash<String, Consumer>]
# File lib/amq/client/async/channel.rb, line 74 def consumers @consumers end
@return [Array<Exchange>] Collection of exchanges that were declared on this channel.
# File lib/amq/client/async/channel.rb, line 84 def exchanges @exchanges.values end
Finds exchange in the exchanges cache on this channel by name. Exchange only exists in the cache if it was previously instantiated on this channel.
@param [String] name Exchange name @return [AMQ::Client::Exchange] Exchange (if found) @api plugin
# File lib/amq/client/async/channel.rb, line 350 def find_exchange(name) @exchanges[name] end
@api plugin @private
# File lib/amq/client/async/channel.rb, line 364 def find_queue(name) @queues[name] end
Asks the peer to pause or restart the flow of content data sent to a consumer. This is a simple flowcontrol mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned to Queue#get callers.
@param [Boolean] active Desired flow state.
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.5.2.3.) @api public
# File lib/amq/client/async/channel.rb, line 195 def flow(active = false, &block) @connection.send_frame(Protocol::Channel::Flow.encode(@id, active)) self.redefine_callback :flow, &block self end
@return [Boolean] True if flow in this channel is active (messages will be delivered to consumers that use this channel).
@api public
# File lib/amq/client/async/channel.rb, line 205 def flow_is_active? @flow_is_active end
@api plugin @private
# File lib/amq/client/async/channel.rb, line 412 def handle_close(channel_close) self.status = :closed self.connection.clear_frames_on(self.id) self.exec_callback_yielding_self(:error, channel_close) self.handle_connection_interruption(channel_close) end
@api plugin @private
# File lib/amq/client/async/channel.rb, line 404 def handle_close_ok(close_ok) self.status = :closed self.connection.clear_frames_on(self.id) self.exec_callback_once_yielding_self(:close, close_ok) end
@private
# File lib/amq/client/async/channel.rb, line 271 def handle_connection_interruption(method = nil) @queues.each { |name, q| q.handle_connection_interruption(method) } @exchanges.each { |name, e| e.handle_connection_interruption(method) } self.exec_callback_yielding_self(:after_connection_interruption) self.reset_state! end
@api plugin @private
# File lib/amq/client/async/channel.rb, line 397 def handle_open_ok(open_ok) self.status = :opened self.exec_callback_once_yielding_self(:open, open_ok) end
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/channel.rb, line 265 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end
Defines a callback that will be executed when channel is closed after channel-level exception.
@api public
# File lib/amq/client/async/channel.rb, line 256 def on_error(&block) self.define_callback(:error, &block) end
Defines a callback that will be executed after AMQP connection has recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/channel.rb, line 303 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end
Opens AMQP channel.
@api public
# File lib/amq/client/async/channel.rb, line 108 def open(&block) @connection.send_frame(Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING)) @connection.channels[@id] = self self.status = :opening self.redefine_callback :open, &block end
Requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection.
@note RabbitMQ as of 2.3.1 does not support prefetch_size. @api public
# File lib/amq/client/async/channel.rb, line 178 def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) @connection.send_frame(Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global)) self.redefine_callback :qos, &block self end
@return [Array<Queue>] Collection of queues that were declared on this channel.
# File lib/amq/client/async/channel.rb, line 79 def queues @queues.values end
Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.
@return [Channel] self
@note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false. @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.16.) @api public
# File lib/amq/client/async/channel.rb, line 160 def recover(requeue = true, &block) @connection.send_frame(Protocol::Basic::Recover.encode(@id, requeue)) self.redefine_callback :recover, &block self end
Implementation
# File lib/amq/client/async/channel.rb, line 338 def register_exchange(exchange) raise ArgumentError, "argument is nil!" if exchange.nil? @exchanges[exchange.name] = exchange end
@api plugin @private
# File lib/amq/client/async/channel.rb, line 356 def register_queue(queue) raise ArgumentError, "argument is nil!" if queue.nil? @queues[queue.name] = queue end
Reject a message with given delivery tag.
@api public @see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.14.)
# File lib/amq/client/async/channel.rb, line 146 def reject(delivery_tag, requeue = true) @connection.send_frame(Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue)) self end
@api plugin @private
# File lib/amq/client/async/channel.rb, line 374 def reset_state! @flow_is_active = true @queues_awaiting_declare_ok = Array.new @exchanges_awaiting_declare_ok = Array.new @queues_awaiting_delete_ok = Array.new @exchanges_awaiting_delete_ok = Array.new @queues_awaiting_purge_ok = Array.new @queues_awaiting_bind_ok = Array.new @queues_awaiting_unbind_ok = Array.new @consumers_awaiting_consume_ok = Array.new @consumers_awaiting_cancel_ok = Array.new @queues_awaiting_get_response = Array.new @callbacks = @callbacks.delete_if { |k, v| !RECOVERY_EVENTS.include?(k) } end
@private
# File lib/amq/client/async/channel.rb, line 309 def run_after_recovery_callbacks self.exec_callback_yielding_self(:after_recovery) @queues.each { |name, q| q.run_after_recovery_callbacks } @exchanges.each { |name, e| e.run_after_recovery_callbacks } end
@private
# File lib/amq/client/async/channel.rb, line 290 def run_before_recovery_callbacks self.exec_callback_yielding_self(:before_recovery) @queues.each { |name, q| q.run_before_recovery_callbacks } @exchanges.each { |name, e| e.run_before_recovery_callbacks } end
Synchronizes given block using this channel's mutex. @api public
# File lib/amq/client/async/channel.rb, line 98 def synchronize(&block) @mutex.synchronize(&block) end
Commits AMQP transaction.
@api public
# File lib/amq/client/async/channel.rb, line 229 def tx_commit(&block) @connection.send_frame(Protocol::Tx::Commit.encode(@id)) self.redefine_callback :tx_commit, &block self end
Rolls AMQP transaction back.
@api public
# File lib/amq/client/async/channel.rb, line 239 def tx_rollback(&block) @connection.send_frame(Protocol::Tx::Rollback.encode(@id)) self.redefine_callback :tx_rollback, &block self end
Sets the channel to use standard transactions. One must use this method at least once on a channel before using tx_tommit or tx_rollback methods.
@api public
# File lib/amq/client/async/channel.rb, line 219 def tx_select(&block) @connection.send_frame(Protocol::Tx::Select.encode(@id)) self.redefine_callback :tx_select, &block self end
Generated with the Darkfish Rdoc Generator 2.