Parent

Class/Module Index [+]

Quicksearch

AMQ::Client::Async::Exchange

Constants

BUILTIN_TYPES

Attributes

arguments[R]

@return [Hash] Additional arguments given on exchange declaration. Typically used by AMQP extensions.

channel[R]

Channel this exchange belongs to.

name[R]

Exchange name. May be server-generated or assigned directly. @return [String]

type[R]

@return [Symbol] One of :direct, :fanout, :topic, :headers

Public Class Methods

new(connection, channel, name, type = :fanout) click to toggle source
# File lib/amq/client/async/exchange.rb, line 39
# Builds an exchange attached to the given connection and channel.
#
# @param connection [Object] stored as @connection and forwarded to +super+
# @param channel [Object] channel this exchange belongs to
# @param name [String] exchange name
# @param type [Symbol, String] a member of BUILTIN_TYPES, or a custom type starting with "x-"
# @raise [UnknownExchangeTypeError] when +type+ is neither built-in nor custom
def initialize(connection, channel, name, type = :fanout)
  recognized = BUILTIN_TYPES.include?(type.to_sym) || type.to_s =~ /^x-.+/
  raise UnknownExchangeTypeError.new(BUILTIN_TYPES, type) unless recognized

  @connection, @channel, @name, @type = connection, channel, name, type

  # Pre-declared exchanges (empty name or an amq.* name) are registered
  # with the channel right away.
  predeclared = @name == AMQ::Protocol::EMPTY_STRING || @name =~ /^amq\.(direct|fanout|topic|match|headers)/
  @channel.register_exchange(self) if predeclared

  super(connection)
end

Public Instance Methods

after_connection_interruption(&block) click to toggle source
after_recovery(&block) click to toggle source
Alias for: on_recovery
auto_recover() click to toggle source

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).

@api plugin

# File lib/amq/client/async/exchange.rb, line 223
# Re-declares this exchange, unless it is a predefined one.
#
# @return [Object, nil] the result of #redeclare, or nil when predefined
# @api plugin
def auto_recover
  return if predefined?

  redeclare
end
before_recovery(&block) click to toggle source

Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/exchange.rb, line 194
# Stores the given block as the :before_recovery callback.
#
# @param block [Proc] callback handed to redefine_callback
# @api public
def before_recovery(&block)
  redefine_callback(:before_recovery, &block)
end
custom_type?() click to toggle source

@return [Boolean] true if this exchange is of a custom type (begins with x-) @api public

# File lib/amq/client/async/exchange.rb, line 83
# Tells whether the exchange type is a custom one, i.e. starts with "x-"
# followed by at least one more character.
#
# @return [Boolean] true or false (never the raw match offset or nil)
# @api public
def custom_type?
  # =~ yields an Integer offset or nil; coerce so the predicate is a real boolean.
  !!(@type.to_s =~ /^x-.+/)
end
declare(passive = false, durable = false, auto_delete = false, nowait = false, arguments = nil, &block) click to toggle source

@api public

# File lib/amq/client/async/exchange.rb, line 96
# Sends an exchange declaration frame for this exchange.
#
# Unless +nowait+ is set, the block is stored as the :declare callback and the
# exchange is pushed onto the channel's exchanges_awaiting_declare_ok list.
#
# @param passive [Boolean] forwarded to the Declare frame
# @param durable [Boolean] forwarded to the Declare frame
# @param auto_delete [Boolean] forwarded to the Declare frame
# @param nowait [Boolean] when true, no callback is registered
# @param arguments [Hash, nil] forwarded to the Declare frame
# @return [self]
# @api public
def declare(passive = false, durable = false, auto_delete = false, nowait = false, arguments = nil, &block)
  # Remembered so that #redeclare can replay the same declaration.
  @passive, @durable, @auto_delete, @arguments = passive, durable, auto_delete, arguments

  frame = Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, passive, durable, auto_delete, false, nowait, arguments)
  @connection.send_frame(frame)
  return self if nowait

  define_callback(:declare, &block)
  @channel.exchanges_awaiting_declare_ok.push(self)
  self
end
delete(if_unused = false, nowait = false, &block) click to toggle source

@api public

# File lib/amq/client/async/exchange.rb, line 131
# Sends an exchange deletion frame for this exchange.
#
# Unless +nowait+ is set, the block is stored as the :delete callback and the
# exchange is pushed onto the channel's exchanges_awaiting_delete_ok list.
#
# @param if_unused [Boolean] forwarded to the Delete frame
# @param nowait [Boolean] when true, no callback is registered
# @return [self]
# @api public
def delete(if_unused = false, nowait = false, &block)
  frame = Protocol::Exchange::Delete.encode(@channel.id, @name, if_unused, nowait)
  @connection.send_frame(frame)
  return self if nowait

  define_callback(:delete, &block)
  # TODO: delete itself from exchanges cache
  @channel.exchanges_awaiting_delete_ok.push(self)
  self
end
direct?() click to toggle source

@return [Boolean] true if this exchange is of type `direct` @api public

# File lib/amq/client/async/exchange.rb, line 65
# Tells whether this exchange is of type `direct`.
#
# The type is compared by its string form because the constructor accepts
# the type as either a Symbol or a String and stores it unchanged.
#
# @return [Boolean]
# @api public
def direct?
  @type.to_s == "direct"
end
fanout?() click to toggle source

@return [Boolean] true if this exchange is of type `fanout` @api public

# File lib/amq/client/async/exchange.rb, line 59
# Tells whether this exchange is of type `fanout`.
#
# The type is compared by its string form because the constructor accepts
# the type as either a Symbol or a String and stores it unchanged.
#
# @return [Boolean]
# @api public
def fanout?
  @type.to_s == "fanout"
end
handle_connection_interruption(method = nil) click to toggle source

@private

# File lib/amq/client/async/exchange.rb, line 183
# Runs the :after_connection_interruption callback, yielding self.
#
# @param method [Object, nil] accepted but not used here
# @private
def handle_connection_interruption(method = nil)
  exec_callback_yielding_self(:after_connection_interruption)
end
handle_declare_ok(method) click to toggle source

Implementation

# File lib/amq/client/async/exchange.rb, line 236
# Handles the declare confirmation for this exchange: registers the exchange
# with its channel and fires the one-off :declare callback.
#
# @param method [Object] confirmation frame payload; passed on to the callback
def handle_declare_ok(method)
  # When anonymous? is true, the name is taken from the confirmation.
  if anonymous?
    @name = method.exchange
  end

  @channel.register_exchange(self)
  exec_callback_once_yielding_self(:declare, method)
end
handle_delete_ok(method) click to toggle source
# File lib/amq/client/async/exchange.rb, line 243
# Fires the one-off :delete callback with the confirmation payload.
#
# @param method [Object] confirmation frame payload; passed on to the callback
def handle_delete_ok(method)
  exec_callback_once(:delete, method)
end
headers?() click to toggle source

@return [Boolean] true if this exchange is of type `headers` @api public

# File lib/amq/client/async/exchange.rb, line 77
# Tells whether this exchange is of type `headers`.
#
# The type is compared by its string form because the constructor accepts
# the type as either a Symbol or a String and stores it unchanged.
#
# @return [Boolean]
# @api public
def headers?
  @type.to_s == "headers"
end
on_connection_interruption(&block) click to toggle source

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/exchange.rb, line 177
# Stores the given block as the :after_connection_interruption callback.
#
# @param block [Proc] callback handed to redefine_callback
# @api public
def on_connection_interruption(&block)
  redefine_callback(:after_connection_interruption, &block)
end
on_recovery(&block) click to toggle source

Defines a callback that will be executed when AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amq/client/async/exchange.rb, line 208
# Stores the given block as the :after_recovery callback.
#
# @param block [Proc] callback handed to redefine_callback
# @api public
def on_recovery(&block)
  redefine_callback(:after_recovery, &block)
end
Also aliased as: after_recovery
on_return(&block) click to toggle source

@api public

# File lib/amq/client/async/exchange.rb, line 160
# Stores the given block as the :return callback.
#
# @param block [Proc] callback handed to redefine_callback
# @return [self] so calls can be chained
# @api public
def on_return(&block)
  tap { redefine_callback(:return, &block) }
end
predefined?() click to toggle source

@return [Boolean] true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)

# File lib/amq/client/async/exchange.rb, line 88
# Tells whether this exchange is a pre-defined one: its name is the empty
# string or starts with amq.direct, amq.fanout, amq.topic, amq.headers or
# amq.match.
#
# @return [Boolean] true or false; false (not nil) when no name is set
def predefined?
  return false unless @name

  @name == AMQ::Protocol::EMPTY_STRING || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/)
end
publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false, immediate = false, frame_size = nil) click to toggle source

@api public

# File lib/amq/client/async/exchange.rb, line 149
# Publishes a message through this exchange.
#
# Default headers (:priority 0, :delivery_mode 2, :content_type
# "application/octet-stream") are merged with +user_headers+, which win on
# conflict. After the frames are sent, the channel's :after_publish callback
# is executed.
#
# @param payload [String] message body
# @param routing_key [String] forwarded to the Publish frame
# @param user_headers [Hash, nil] header overrides; nil is treated as no overrides
# @param mandatory [Boolean] forwarded to the Publish frame
# @param immediate [Boolean] forwarded to the Publish frame
# @param frame_size [Integer, nil] falls back to the connection's frame_max when nil
# @return [self]
# @api public
def publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false, immediate = false, frame_size = nil)
  defaults = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }
  # Callers that only want to set a later positional argument may pass nil
  # here; Hash#merge would raise on it, so fall back to an empty hash.
  headers  = defaults.merge(user_headers || {})
  frames   = Protocol::Basic::Publish.encode(@channel.id, payload, headers, @name, routing_key, mandatory, immediate, (frame_size || @connection.frame_max))
  @connection.send_frameset(frames, @channel)

  # publisher confirms support. MK.
  @channel.exec_callback(:after_publish)
  self
end
redeclare(&block) click to toggle source

@api public

# File lib/amq/client/async/exchange.rb, line 115
# Re-sends the declaration using the settings remembered by the last #declare.
#
# With a block, it is stored as the :declare callback and the exchange is
# pushed onto the channel's exchanges_awaiting_declare_ok list.
#
# @return [self]
# @api public
def redeclare(&block)
  # Without a block there is nothing to call back, so the frame is sent with nowait set.
  nowait = !block
  frame = Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, @passive, @durable, @auto_delete, false, nowait, @arguments)
  @connection.send_frame(frame)
  return self if nowait

  define_callback(:declare, &block)
  @channel.exchanges_awaiting_declare_ok.push(self)
  self
end
run_after_recovery_callbacks() click to toggle source

@private

# File lib/amq/client/async/exchange.rb, line 214
# Runs the :after_recovery callback, yielding self.
#
# @private
def run_after_recovery_callbacks
  exec_callback_yielding_self(:after_recovery)
end
run_before_recovery_callbacks() click to toggle source

@private

# File lib/amq/client/async/exchange.rb, line 199
# Runs the :before_recovery callback, yielding self.
#
# @private
def run_before_recovery_callbacks
  exec_callback_yielding_self(:before_recovery)
end
topic?() click to toggle source

@return [Boolean] true if this exchange is of type `topic` @api public

# File lib/amq/client/async/exchange.rb, line 71
# Tells whether this exchange is of type `topic`.
#
# The type is compared by its string form because the constructor accepts
# the type as either a Symbol or a String and stores it unchanged.
#
# @return [Boolean]
# @api public
def topic?
  @type.to_s == "topic"
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.