AMQ::Client::Async::Channel

Attributes

consumers_awaiting_cancel_ok[R]
consumers_awaiting_consume_ok[R]
exchanges_awaiting_declare_ok[R]
exchanges_awaiting_delete_ok[R]
flow_is_active[RW]
id[R]
queues_awaiting_bind_ok[R]
queues_awaiting_declare_ok[R]
queues_awaiting_delete_ok[R]
queues_awaiting_get_response[R]
queues_awaiting_purge_ok[R]
queues_awaiting_unbind_ok[R]

Public Class Methods

new(connection, id, options = {})
# File lib/amq/client/async/channel.rb, line 38
def initialize(connection, id, options = {})
  super(connection)

  @id        = id
  @exchanges = Hash.new
  @queues    = Hash.new
  @consumers = Hash.new
  @options       = { :auto_recovery => connection.auto_recovering? }.merge(options)
  @auto_recovery = (!!@options[:auto_recovery])

  # we must synchronize frameset delivery. MK.
  @mutex     = Mutex.new

  reset_state!

  # 65536 is here for cases when the channel is opened without passing a callback in,
  # otherwise channel_max would be nil, which causes a lot of needless headaches.
  # Let's just have this default. MK.
  channel_max = if @connection.open?
                  @connection.channel_max || 65536
                else
                  65536
                end

  if channel_max != 0 && !(0..channel_max).include?(id)
    raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{id}")
  end
end
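
The range check at the end of the constructor can be tried in isolation. The following is a minimal sketch, not the library's code: `validate_channel_id` and `DEFAULT_CHANNEL_MAX` are hypothetical names, and `negotiated_max` stands in for `@connection.channel_max`.

```ruby
# Hypothetical helper mirroring the range check in the constructor above.
DEFAULT_CHANNEL_MAX = 65536

def validate_channel_id(id, negotiated_max = nil)
  # Fall back to the default when no limit is known yet (connection not open).
  channel_max = negotiated_max || DEFAULT_CHANNEL_MAX

  # The check is skipped entirely when channel_max is 0.
  if channel_max != 0 && !(0..channel_max).include?(id)
    raise ArgumentError, "Max channel for the connection is #{channel_max}, given: #{id}"
  end

  id
end

validate_channel_id(1)          # accepted under the default limit
validate_channel_id(70_000, 0)  # accepted because 0 disables the check

begin
  validate_channel_id(70_000)   # rejected under the default limit
rescue ArgumentError => e
  puts e.message
end
```

Note that the check runs after `reset_state!`, so an out-of-range id raises only once the rest of the object has been set up.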

Public Instance Methods

acknowledge(delivery_tag, multiple = false)

Acknowledges one message or, when multiple is true, all messages up to and including the given delivery tag on the channel.

@api public
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.13.)

# File lib/amq/client/async/channel.rb, line 136
def acknowledge(delivery_tag, multiple = false)
  @connection.send_frame(Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple))

  self
end
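
The effect of the `multiple` flag is easier to see with a small model of the outstanding deliveries. This is a sketch of the AMQP 0.9.1 semantics only; `UnackedDeliveries` is a hypothetical stand-in for broker-side bookkeeping and is not part of amq-client.

```ruby
# Hypothetical model of unacknowledged delivery tags on one channel.
class UnackedDeliveries
  def initialize(tags)
    @tags = tags.dup
  end

  # Mirrors the argument order of Channel#acknowledge.
  def acknowledge(delivery_tag, multiple = false)
    if multiple
      # Acknowledge everything up to and including delivery_tag.
      @tags.reject! { |tag| tag <= delivery_tag }
    else
      # Acknowledge only the one delivery.
      @tags.delete(delivery_tag)
    end
    self
  end

  def outstanding
    @tags.sort
  end
end

deliveries = UnackedDeliveries.new([1, 2, 3, 4, 5])
deliveries.acknowledge(2)        # outstanding: [1, 3, 4, 5]
deliveries.acknowledge(4, true)  # outstanding: [5]
```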
after_connection_interruption(&block)
after_recovery(&block)
Alias for: on_recovery
auto_recover()

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).

@api plugin

# File lib/amq/client/async/channel.rb, line 321
def auto_recover
  return unless auto_recovering?

  self.open do
    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each { |name, e| e.auto_recover }
    @queues.each    { |name, q| q.auto_recover }
  end
end
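
The ordering constraint noted in the comment above (exchanges before queues) can be illustrated with stubs that only record when they are recovered. `RecoverableStub` and `recovery_order` are hypothetical names used for this sketch; real exchanges and queues re-declare themselves on the broker instead of appending to a log.

```ruby
# Hypothetical stand-in for an exchange or queue: it records its label when recovered.
RecoverableStub = Struct.new(:label, :log) do
  def auto_recover
    log << label
  end
end

def recovery_order
  log = []
  exchanges = { "events" => RecoverableStub.new("exchange:events", log) }
  queues    = { "inbox"  => RecoverableStub.new("queue:inbox", log) }

  # Same order as Channel#auto_recover: exchanges first, then queues,
  # so that bindings restored by a queue refer to an exchange that exists.
  exchanges.each { |_name, e| e.auto_recover }
  queues.each    { |_name, q| q.auto_recover }

  log
end

recovery_order # => ["exchange:events", "queue:inbox"]
```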
auto_recovering?()

@return [Boolean] true if this channel uses automatic recovery mode

# File lib/amq/client/async/channel.rb, line 68
def auto_recovering?
  @auto_recovery
end
before_recovery(&block)

Defines a callback that will be executed after the TCP connection has recovered from a network failure but before the AMQP connection is re-opened. Only one callback can be defined; the one defined last replaces any defined earlier.

@api public

# File lib/amq/client/async/channel.rb, line 285
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
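
The "last one wins" behaviour described above comes from `redefine_callback`. The registry below is a simplified, hypothetical model of that behaviour, written for illustration; the real mixin in amq-client may differ in detail, and the assumption that `define_callback` appends rather than replaces is not confirmed by this page.

```ruby
# Hypothetical, simplified callback registry (not the amq-client implementation).
class CallbackRegistry
  def initialize
    @callbacks = Hash.new { |hash, event| hash[event] = [] }
  end

  # Assumed behaviour: adds another callback for the event.
  def define_callback(event, &block)
    @callbacks[event] << block if block
    self
  end

  # Replaces whatever was registered for the event.
  def redefine_callback(event, &block)
    @callbacks[event] = block ? [block] : []
    self
  end

  # Runs every callback for the event and collects the return values.
  def exec_callback(event, *args)
    @callbacks[event].map { |callback| callback.call(*args) }
  end
end

registry = CallbackRegistry.new
registry.redefine_callback(:before_recovery) { :first }
registry.redefine_callback(:before_recovery) { :second }
registry.exec_callback(:before_recovery) # => [:second]
```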
close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)

Closes AMQP channel.

@api public

# File lib/amq/client/async/channel.rb, line 120
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
  @connection.send_frame(Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id))

  self.redefine_callback :close, &block
end
connection()

AMQP connection this channel belongs to.

@return [AMQ::Client::Connection] Connection this channel belongs to.

# File lib/amq/client/async/channel.rb, line 92
def connection
  @connection
end
consumers()

@return [Hash<String, Consumer>]

# File lib/amq/client/async/channel.rb, line 74
def consumers
  @consumers
end
exchanges()

@return [Array<Exchange>] Collection of exchanges that were declared on this channel.

# File lib/amq/client/async/channel.rb, line 84
def exchanges
  @exchanges.values
end
find_exchange(name)

Finds an exchange by name in this channel's exchanges cache. An exchange exists in the cache only if it was previously instantiated on this channel.

@param [String] name Exchange name
@return [AMQ::Client::Exchange] Exchange (if found)
@api plugin

# File lib/amq/client/async/channel.rb, line 350
def find_exchange(name)
  @exchanges[name]
end
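
The cache is a plain Hash keyed by exchange name, so a lookup only succeeds for exchanges that were registered on the same channel. A minimal sketch of that behaviour, using the hypothetical stand-ins `ExchangeStub` and `ExchangeCache` in place of real exchange and channel objects:

```ruby
# Hypothetical stand-in: only the name matters for the cache.
ExchangeStub = Struct.new(:name)

# Hypothetical class reproducing the cache-related methods of the channel.
class ExchangeCache
  def initialize
    @exchanges = {}
  end

  def register_exchange(exchange)
    raise ArgumentError, "argument is nil!" if exchange.nil?

    @exchanges[exchange.name] = exchange
  end

  def find_exchange(name)
    @exchanges[name]
  end

  def exchanges
    @exchanges.values
  end
end

cache = ExchangeCache.new
cache.register_exchange(ExchangeStub.new("events"))
cache.find_exchange("events")  # the registered stub
cache.find_exchange("missing") # nil, never registered on this cache
```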
find_queue(name)

@api plugin @private

# File lib/amq/client/async/channel.rb, line 364
def find_queue(name)
  @queues[name]
end
flow(active = false, &block)

Asks the peer to pause or restart the flow of content data sent to a consumer. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned to Queue#get callers.

@param [Boolean] active Desired flow state.

@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.5.2.3.)
@api public

# File lib/amq/client/async/channel.rb, line 195
def flow(active = false, &block)
  @connection.send_frame(Protocol::Channel::Flow.encode(@id, active))

  self.redefine_callback :flow, &block
  self
end
flow_is_active?()

@return [Boolean] True if flow in this channel is active (messages will be delivered to consumers that use this channel).

@api public

# File lib/amq/client/async/channel.rb, line 205
def flow_is_active?
  @flow_is_active
end
handle_close(channel_close)

@api plugin @private

# File lib/amq/client/async/channel.rb, line 412
def handle_close(channel_close)
  self.status = :closed
  self.connection.clear_frames_on(self.id)
  self.exec_callback_yielding_self(:error, channel_close)

  self.handle_connection_interruption(channel_close)
end
handle_close_ok(close_ok)

@api plugin @private

# File lib/amq/client/async/channel.rb, line 404
def handle_close_ok(close_ok)
  self.status = :closed
  self.connection.clear_frames_on(self.id)
  self.exec_callback_once_yielding_self(:close, close_ok)
end
handle_connection_interruption(method = nil)

@private

# File lib/amq/client/async/channel.rb, line 271
def handle_connection_interruption(method = nil)
  @queues.each    { |name, q| q.handle_connection_interruption(method) }
  @exchanges.each { |name, e| e.handle_connection_interruption(method) }

  self.exec_callback_yielding_self(:after_connection_interruption)
  self.reset_state!
end
handle_open_ok(open_ok)

@api plugin @private

# File lib/amq/client/async/channel.rb, line 397
def handle_open_ok(open_ok)
  self.status = :opened
  self.exec_callback_once_yielding_self(:open, open_ok)
end
on_connection_interruption(&block)

Defines a callback that will be executed after the TCP connection is interrupted (typically because of a network failure). Only one callback can be defined; the one defined last replaces any defined earlier.

@api public

# File lib/amq/client/async/channel.rb, line 265
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
on_error(&block)

Defines a callback that will be executed when the channel is closed because of a channel-level exception.

@api public

# File lib/amq/client/async/channel.rb, line 256
def on_error(&block)
  self.define_callback(:error, &block)
end
on_recovery(&block)

Defines a callback that will be executed after the AMQP connection has recovered from a network failure. Only one callback can be defined; the one defined last replaces any defined earlier.

@api public

# File lib/amq/client/async/channel.rb, line 303
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
Also aliased as: after_recovery
open(&block)

Opens AMQP channel.

@api public

# File lib/amq/client/async/channel.rb, line 108
def open(&block)
  @connection.send_frame(Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING))
  @connection.channels[@id] = self
  self.status = :opening

  self.redefine_callback :open, &block
end
Also aliased as: reopen
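
Together with handle_open_ok, this method gives the channel a two-step opening sequence: the status becomes :opening when the frame is sent and :opened once the broker replies, at which point the :open callback runs once. The class below is a hypothetical, network-free model of that sequence; `ChannelLifecycle` and its initial :closed status are assumptions made for the sketch.

```ruby
# Hypothetical model of the open / open-ok handshake, without any I/O.
class ChannelLifecycle
  attr_reader :status

  def initialize(id, channels)
    @id       = id
    @channels = channels # stands in for connection.channels
    @status   = :closed  # assumed initial status
    @on_open  = nil
  end

  # Registers the channel and waits for the broker's reply.
  def open(&block)
    @channels[@id] = self
    @status  = :opening
    @on_open = block
    self
  end

  # Called when the broker confirms; the callback runs at most once.
  def handle_open_ok
    @status = :opened
    callback, @on_open = @on_open, nil
    callback.call(self) if callback
  end
end

channels = {}
channel  = ChannelLifecycle.new(1, channels)
channel.open { |ch| puts "channel is #{ch.status}" }
channel.status         # => :opening
channel.handle_open_ok # prints "channel is opened"
```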
qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)

Requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection.

@note RabbitMQ as of 2.3.1 does not support prefetch_size.
@api public

# File lib/amq/client/async/channel.rb, line 178
def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)
  @connection.send_frame(Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global))

  self.redefine_callback :qos, &block
  self
end
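
What `prefetch_count` means in practice: the broker stops delivering to a consumer once that many messages are unacknowledged, and resumes as acknowledgements arrive. The sketch below models only that counting rule; `PrefetchWindow` is a hypothetical name and nothing in it talks to a broker.

```ruby
# Hypothetical model of the prefetch window enforced by the broker.
class PrefetchWindow
  def initialize(prefetch_count)
    @prefetch_count = prefetch_count
    @unacked = []
  end

  # Returns true if the message is delivered, false if it is held back.
  def deliver(tag)
    # A prefetch_count of 0 means "no limit" in AMQP 0.9.1.
    return false if @prefetch_count > 0 && @unacked.size >= @prefetch_count

    @unacked << tag
    true
  end

  # An acknowledgement frees one slot in the window.
  def acknowledge(tag)
    @unacked.delete(tag)
    self
  end
end

window = PrefetchWindow.new(2)
window.deliver(1)     # => true
window.deliver(2)     # => true
window.deliver(3)     # => false, window is full
window.acknowledge(1)
window.deliver(3)     # => true
```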
queues()

@return [Array<Queue>] Collection of queues that were declared on this channel.

# File lib/amq/client/async/channel.rb, line 79
def queues
  @queues.values
end
recover(requeue = true, &block)

Notifies the AMQP broker that the consumer has recovered and unacknowledged messages need to be redelivered.

@return [Channel] self

@note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.16.)
@api public

# File lib/amq/client/async/channel.rb, line 160
def recover(requeue = true, &block)
  @connection.send_frame(Protocol::Basic::Recover.encode(@id, requeue))

  self.redefine_callback :recover, &block
  self
end
register_exchange(exchange)

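Adds the given exchange to this channel's exchanges cache, keyed by exchange name. Raises ArgumentError if the exchange is nil.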

# File lib/amq/client/async/channel.rb, line 338
def register_exchange(exchange)
  raise ArgumentError, "argument is nil!" if exchange.nil?

  @exchanges[exchange.name] = exchange
end
register_queue(queue)

@api plugin @private

# File lib/amq/client/async/channel.rb, line 356
def register_queue(queue)
  raise ArgumentError, "argument is nil!" if queue.nil?

  @queues[queue.name] = queue
end
reject(delivery_tag, requeue = true)

Rejects the message with the given delivery tag.

@api public
@see bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.14.)

# File lib/amq/client/async/channel.rb, line 146
def reject(delivery_tag, requeue = true)
  @connection.send_frame(Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue))

  self
end
reopen(&block)
Alias for: open
reset_state!()

@api plugin @private

# File lib/amq/client/async/channel.rb, line 374
def reset_state!
  @flow_is_active                = true

  @queues_awaiting_declare_ok    = Array.new
  @exchanges_awaiting_declare_ok = Array.new

  @queues_awaiting_delete_ok     = Array.new

  @exchanges_awaiting_delete_ok  = Array.new
  @queues_awaiting_purge_ok      = Array.new
  @queues_awaiting_bind_ok       = Array.new
  @queues_awaiting_unbind_ok     = Array.new
  @consumers_awaiting_consume_ok = Array.new
  @consumers_awaiting_cancel_ok  = Array.new

  @queues_awaiting_get_response  = Array.new

  @callbacks                     = @callbacks.delete_if { |k, v| !RECOVERY_EVENTS.include?(k) }
end
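
The last line of reset_state! drops every callback except the recovery-related ones, so handlers such as :open or :qos do not survive a reset. A small sketch of that filtering step follows; the value given to `RECOVERY_EVENTS` here is an assumption (the constant's definition is not shown on this page), and `prune_callbacks` is a hypothetical helper.

```ruby
# Assumed contents; the real constant is defined elsewhere in channel.rb.
RECOVERY_EVENTS = [:after_connection_interruption, :before_recovery, :after_recovery].freeze

# Hypothetical helper applying the same filter as reset_state!.
def prune_callbacks(callbacks)
  callbacks.delete_if { |event, _handlers| !RECOVERY_EVENTS.include?(event) }
end

callbacks = {
  :open            => [proc { :opened }],
  :qos             => [proc { :qos_ok }],
  :before_recovery => [proc { :about_to_recover }],
  :after_recovery  => [proc { :recovered }]
}

prune_callbacks(callbacks).keys # => [:before_recovery, :after_recovery]
```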
run_after_recovery_callbacks()

@private

# File lib/amq/client/async/channel.rb, line 309
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery)

  @queues.each    { |name, q| q.run_after_recovery_callbacks }
  @exchanges.each { |name, e| e.run_after_recovery_callbacks }
end
run_before_recovery_callbacks()

@private

# File lib/amq/client/async/channel.rb, line 290
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery)

  @queues.each    { |name, q| q.run_before_recovery_callbacks }
  @exchanges.each { |name, e| e.run_before_recovery_callbacks }
end
synchronize(&block)

Synchronizes the given block using this channel's mutex.

@api public

# File lib/amq/client/async/channel.rb, line 98
def synchronize(&block)
  @mutex.synchronize(&block)
end
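
A per-object mutex with a `synchronize` wrapper is the usual way to keep a multi-step operation from interleaving across threads. The sketch below uses the same pattern on a hypothetical `SharedCounter`; it is unrelated to frame delivery and only demonstrates the locking.

```ruby
# Hypothetical class using the same mutex-wrapping pattern as the channel.
class SharedCounter
  attr_reader :value

  def initialize
    @mutex = Mutex.new
    @value = 0
  end

  def synchronize(&block)
    @mutex.synchronize(&block)
  end

  # The read-modify-write happens while holding the mutex.
  def increment
    synchronize { @value += 1 }
  end
end

# Hypothetical driver: several threads increment the same counter.
def count_concurrently(threads = 4, per_thread = 1000)
  counter = SharedCounter.new
  workers = Array.new(threads) do
    Thread.new { per_thread.times { counter.increment } }
  end
  workers.each(&:join)
  counter.value
end

count_concurrently # => 4000
```

Ruby's Mutex is not reentrant, so calling `synchronize` again from inside the block on the same thread raises ThreadError.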
tx_commit(&block)

Commits AMQP transaction.

@api public

# File lib/amq/client/async/channel.rb, line 229
def tx_commit(&block)
  @connection.send_frame(Protocol::Tx::Commit.encode(@id))

  self.redefine_callback :tx_commit, &block
  self
end
tx_rollback(&block)

Rolls AMQP transaction back.

@api public

# File lib/amq/client/async/channel.rb, line 239
def tx_rollback(&block)
  @connection.send_frame(Protocol::Tx::Rollback.encode(@id))

  self.redefine_callback :tx_rollback, &block
  self
end
tx_select(&block)

Sets the channel to use standard transactions. This method must be called at least once on a channel before tx_commit or tx_rollback is used.

@api public

# File lib/amq/client/async/channel.rb, line 219
def tx_select(&block)
  @connection.send_frame(Protocol::Tx::Select.encode(@id))

  self.redefine_callback :tx_select, &block
  self
end
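
The ordering requirement between tx_select, tx_commit and tx_rollback can be modelled without a broker. `TxSimulation` below is a hypothetical sketch of the transactional semantics; the RuntimeError it raises stands in for the channel-level exception a real broker would send, and `publish` is an illustrative method that does not exist on the channel.

```ruby
# Hypothetical in-memory model of a transactional channel.
class TxSimulation
  attr_reader :committed

  def initialize
    @transactional = false
    @pending   = []
    @committed = []
  end

  def tx_select
    @transactional = true
    self
  end

  # Illustrative only: messages are held back while a transaction is active.
  def publish(message)
    (@transactional ? @pending : @committed) << message
    self
  end

  def tx_commit
    raise "tx_select must be called first" unless @transactional

    @committed.concat(@pending)
    @pending.clear
    self
  end

  def tx_rollback
    raise "tx_select must be called first" unless @transactional

    @pending.clear
    self
  end
end

tx = TxSimulation.new
tx.tx_select
tx.publish("a").tx_commit    # "a" becomes visible
tx.publish("b").tx_rollback  # "b" is discarded
tx.committed # => ["a"]
```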
