Object
# File lib/amq/client/async/exchange.rb, line 39 def initialize(connection, channel, name, type = :fanout) if !(BUILTIN_TYPES.include?(type.to_sym) || type.to_s =~ /^x-.+/) raise UnknownExchangeTypeError.new(BUILTIN_TYPES, type) end @connection = connection @channel = channel @name = name @type = type # register pre-declared exchanges if @name == AMQ::Protocol::EMPTY_STRING || @name =~ /^amq\.(direct|fanout|topic|match|headers)/ @channel.register_exchange(self) end super(connection) end
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
@api plugin
# File lib/amq/client/async/exchange.rb, line 223 def auto_recover self.redeclare unless predefined? end
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/exchange.rb, line 194 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end
@return [Boolean] true if this exchange is of a custom type (begins with x-) @api public
# File lib/amq/client/async/exchange.rb, line 83 def custom_type? @type.to_s =~ /^x-.+/ end
@api public
# File lib/amq/client/async/exchange.rb, line 96 def declare(passive = false, durable = false, auto_delete = false, nowait = false, arguments = nil, &block) # for re-declaration @passive = passive @durable = durable @auto_delete = auto_delete @arguments = arguments @connection.send_frame(Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, passive, durable, auto_delete, false, nowait, arguments)) unless nowait self.define_callback(:declare, &block) @channel.exchanges_awaiting_declare_ok.push(self) end self end
@api public
# File lib/amq/client/async/exchange.rb, line 131 def delete(if_unused = false, nowait = false, &block) @connection.send_frame(Protocol::Exchange::Delete.encode(@channel.id, @name, if_unused, nowait)) unless nowait self.define_callback(:delete, &block) # TODO: delete itself from exchanges cache @channel.exchanges_awaiting_delete_ok.push(self) end self end
@return [Boolean] true if this exchange is of type `direct` @api public
# File lib/amq/client/async/exchange.rb, line 65 def direct? @type == :direct end
@return [Boolean] true if this exchange is of type `fanout` @api public
# File lib/amq/client/async/exchange.rb, line 59 def fanout? @type == :fanout end
@private
# File lib/amq/client/async/exchange.rb, line 183 def handle_connection_interruption(method = nil) self.exec_callback_yielding_self(:after_connection_interruption) end
Implementation
# File lib/amq/client/async/exchange.rb, line 236 def handle_declare_ok(method) @name = method.exchange if self.anonymous? @channel.register_exchange(self) self.exec_callback_once_yielding_self(:declare, method) end
# File lib/amq/client/async/exchange.rb, line 243 def handle_delete_ok(method) self.exec_callback_once(:delete, method) end
@return [Boolean] true if this exchange is of type `headers` @api public
# File lib/amq/client/async/exchange.rb, line 77 def headers? @type == :headers end
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/exchange.rb, line 177 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end
Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
@api public
# File lib/amq/client/async/exchange.rb, line 208 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end
@api public
# File lib/amq/client/async/exchange.rb, line 160 def on_return(&block) self.redefine_callback(:return, &block) self end
@return [Boolean] true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)
# File lib/amq/client/async/exchange.rb, line 88 def predefined? @name && ((@name == AMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/)) end
@api public
# File lib/amq/client/async/exchange.rb, line 149 def publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false, immediate = false, frame_size = nil) headers = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }.merge(user_headers) @connection.send_frameset(Protocol::Basic::Publish.encode(@channel.id, payload, headers, @name, routing_key, mandatory, immediate, (frame_size || @connection.frame_max)), @channel) # publisher confirms support. MK. @channel.exec_callback(:after_publish) self end
@api public
# File lib/amq/client/async/exchange.rb, line 115 def redeclare(&block) nowait = block.nil? @connection.send_frame(Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, @passive, @durable, @auto_delete, false, nowait, @arguments)) unless nowait self.define_callback(:declare, &block) @channel.exchanges_awaiting_declare_ok.push(self) end self end
@private
# File lib/amq/client/async/exchange.rb, line 214 def run_after_recovery_callbacks self.exec_callback_yielding_self(:after_recovery) end
Generated with the Darkfish Rdoc Generator 2.