EM::Connection
Backwards compatibility with 0.7.0.a25. MK.
Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection succeeds.
@option settings [String] :host ("127.0.0.1") Hostname AMQ broker runs on.
@option settings [String] :port (5672) Port AMQ broker listens on.
@option settings [String] :vhost ("/") Virtual host to use.
@option settings [String] :user ("guest") Username to use for authentication.
@option settings [String] :pass ("guest") Password to use for authentication.
@option settings [String] :ssl (false) Should we use TLS (SSL) for connection?
@option settings [String] :timeout (nil) Connection timeout.
@option settings [String] :logging (false) Turns logging on or off.
@option settings [String] :broker (nil) Broker name (use if you intend to use broker-specific features).
@option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If broker cannot support frames this large, broker's maximum value will be used instead.
@param [Hash] settings
# File lib/amq/client/async/adapters/event_machine.rb, line 40
def self.connect(settings = {}, &block)
  @settings = Settings.configure(settings)

  instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings)
  instance.register_connection_callback(&block)

  instance
end
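To make the documented defaults concrete, here is a minimal sketch of how caller-supplied options could be layered over them. `DEFAULT_SETTINGS` and `configure_settings` are hypothetical names used only for illustration; the real `Settings.configure` may normalize its input differently.

```ruby
# Hypothetical stand-in for Settings.configure. The default values are
# copied from the documented :host, :port, ... option list above.
DEFAULT_SETTINGS = {
  :host      => "127.0.0.1",
  :port      => 5672,
  :vhost     => "/",
  :user      => "guest",
  :pass      => "guest",
  :ssl       => false,
  :timeout   => nil,
  :logging   => false,
  :broker    => nil,
  :frame_max => 131072
}.freeze

# Returns a new Hash in which explicit options win over the defaults.
def configure_settings(overrides = {})
  DEFAULT_SETTINGS.merge(overrides || {})
end

# "broker.example.local" is a placeholder hostname.
settings = configure_settings({ :host => "broker.example.local", :vhost => "/staging" })
```

Any key the caller leaves out (here `:port`, `:user` and so on) keeps its documented default.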
# File lib/amq/client/async/adapters/event_machine.rb, line 139
def initialize(*args)
  super(*args)

  self.logger = self.class.logger

  # channel => collected frames. MK.
  @frames    = Hash.new { Array.new }
  @channels  = Hash.new
  @callbacks = Hash.new

  opening!

  # track TCP connection state, used to detect initial TCP connection failures.
  @tcp_connection_established       = false
  @tcp_connection_failed            = false
  @intentionally_closing_connection = false

  # EventMachine::Connection's and Adapter's constructors arity
  # make it easier to use *args. MK.
  @settings                           = Settings.configure(args.first)
  @on_tcp_connection_failure          = @settings[:on_tcp_connection_failure] || Proc.new { |settings|
    raise self.class.tcp_connection_failure_exception_class.new(settings)
  }
  @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
    raise self.class.authentication_failure_exception_class.new(settings)
  }

  @mechanism         = "PLAIN"
  @locale            = @settings.fetch(:locale, "en_GB")
  @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

  @auto_recovery     = (!!@settings[:auto_recovery])

  self.reset
  self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)

  self.initialize_heartbeat_sender if self.heartbeats_enabled?
end
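The constructor falls back to a raising Proc when no `:on_tcp_connection_failure` handler is supplied in the settings. The sketch below isolates that pattern in plain Ruby. `TCPConnectionFailed` and `failure_handler_for` are hypothetical names for illustration, not part of the library.

```ruby
# Hypothetical exception class standing in for whatever
# tcp_connection_failure_exception_class returns in the real adapter.
class TCPConnectionFailed < StandardError
  def initialize(settings)
    super("Could not establish TCP connection to #{settings[:host]}:#{settings[:port]}")
  end
end

# Picks the user-supplied handler if present, otherwise a Proc that raises.
def failure_handler_for(settings)
  settings[:on_tcp_connection_failure] || Proc.new { |s| raise TCPConnectionFailed.new(s) }
end

# With a custom handler, the failure is reported without raising.
reported = nil
custom_settings = {
  :host => "10.0.0.1",
  :port => 5672,
  :on_tcp_connection_failure => lambda { |s| reported = "#{s[:host]}:#{s[:port]}" }
}
failure_handler_for(custom_settings).call(custom_settings)

# Without one, the default handler raises.
default_settings = { :host => "10.0.0.1", :port => 5672 }
default_message = nil
begin
  failure_handler_for(default_settings).call(default_settings)
rescue TCPConnectionFailed => e
  default_message = e.message
end
```

Passing the settings to the handler lets it report which host and port were involved.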
Whether we are in authentication state (after TCP connection was established but before broker authenticated us).
@return [Boolean]
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 196
def authenticating?
  @authenticating
end
@private
# File lib/amq/client/async/adapters/event_machine.rb, line 275
def close_connection(*args)
  @intentionally_closing_connection = true

  super(*args)
end
Called by EventMachine reactor once TCP connection is successfully established.
@private
# File lib/amq/client/async/adapters/event_machine.rb, line 243
def connection_completed
  # we only can safely set this value here because EventMachine is a lovely piece of
  # software that calls #post_init before #unbind even when TCP connection
  # fails. MK.
  @tcp_connection_established = true
  @periodic_reconnection_timer.cancel if @periodic_reconnection_timer

  # again, this is because #unbind is called in different situations
  # and there is no easy way to tell initial connection failure
  # from connection loss. Not in EventMachine 0.12.x, anyway. MK.
  if @had_successfully_connected_before
    @recovered = true

    self.start_automatic_recovery
    self.upgrade_to_tls_if_necessary
  end

  # now we can set it. MK.
  @had_successfully_connected_before = true
  @reconnecting                      = false
  @handling_skipped_hearbeats        = false
  @last_server_heartbeat             = Time.now

  self.initialize_heartbeat_sender if self.heartbeat_interval > 0

  self.handshake
end
Called by AMQ::Client::Connection after we receive connection.open-ok.
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 331
def connection_successful
  @authenticating = false
  opened!

  @connection_deferrable.succeed
end
Called by AMQ::Client::Connection after we receive connection.close-ok.
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 342
def disconnection_successful
  @disconnection_deferrable.succeed

  # true for "after writing buffered data"
  self.close_connection(true)
  self.reset
  closed!
end
For EventMachine adapter, this is a no-op.
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 182
def establish_connection(settings)
  # Unfortunately there doesn't seem to be any sane way
  # how to get EventMachine connect to the instance level.
end
Called when the time since the last server heartbeat was received is greater than or equal to the heartbeat interval set via the :heartbeat_interval option on connection.
@api plugin
# File lib/amq/client/async/adapters/event_machine.rb, line 355
def handle_skipped_hearbeats
  if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection
    @handling_skipped_hearbeats = true
    @heartbeats_timer.cancel

    self.run_skipped_heartbeats_callbacks
  end
end
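The skipped-heartbeat condition described above (elapsed time since the last server heartbeat is at least the heartbeat interval) and the "handle it only once" guard can be sketched without EventMachine. `HeartbeatMonitor` is a hypothetical class for illustration; the real adapter drives this from a periodic timer and also checks TCP connection state.

```ruby
# Hypothetical, timer-free model of skipped-heartbeat detection.
class HeartbeatMonitor
  attr_reader :alerts

  def initialize(interval, started_at = Time.now)
    @interval              = interval
    @last_server_heartbeat = started_at
    @handling_skipped      = false
    @alerts                = 0
  end

  # Record that the broker sent a heartbeat at the given time.
  def heartbeat_received(at = Time.now)
    @last_server_heartbeat = at
  end

  # Returns true only the first time the interval is found to be exceeded.
  def check(now = Time.now)
    return false if @handling_skipped
    return false if (now - @last_server_heartbeat) < @interval

    @handling_skipped = true
    @alerts += 1
    true
  end
end

t0      = Time.at(0)
monitor = HeartbeatMonitor.new(10, t0)

before_deadline = monitor.check(t0 + 9)   # 9s elapsed, interval is 10s
at_deadline     = monitor.check(t0 + 10)  # elapsed time equals the interval
after_deadline  = monitor.check(t0 + 20)  # already being handled
```

Passing the clock in as an argument keeps the check deterministic, which makes the boundary case (elapsed time exactly equal to the interval) easy to exercise.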
@private
# File lib/amq/client/async/adapters/event_machine.rb, line 365
def initialize_heartbeat_sender
  @last_server_heartbeat = Time.now
  @heartbeats_timer      = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat))
end
Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok). You can define more than one callback.
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 118
def on_closed(&block)
  @disconnection_deferrable.callback(&block)
end
Defines a callback that will be executed when AMQP connection is considered open: client and broker have agreed on max channel identifier and maximum allowed frame size, and authentication succeeds. You can define more than one callback.
@see on_possible_authentication_failure
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 109
def on_open(&block)
  @connection_deferrable.callback(&block)
end
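Because `on_open` and `on_closed` delegate to a deferrable, several callbacks can be registered and all of them run once the deferrable succeeds. The following is a simplified plain-Ruby model of that behaviour. `TinyDeferrable` is a hypothetical stand-in written for this example and is not EventMachine's implementation.

```ruby
# Simplified, hypothetical model of a deferrable with success callbacks only.
class TinyDeferrable
  def initialize
    @callbacks = []
    @succeeded = false
    @args      = []
  end

  # Registers a callback. If success was already signalled, runs it right away.
  def callback(&block)
    return self unless block

    if @succeeded
      block.call(*@args)
    else
      @callbacks << block
    end
    self
  end

  # Signals success and runs pending callbacks in registration order.
  def succeed(*args)
    @succeeded = true
    @args      = args
    pending, @callbacks = @callbacks, []
    pending.each { |cb| cb.call(*@args) }
    self
  end
end

log        = []
deferrable = TinyDeferrable.new

deferrable.callback { log << :first }
deferrable.callback { log << :second }
deferrable.succeed                     # e.g. after connection.open-ok arrives
deferrable.callback { log << :late }   # registered after success
```

In this model a callback registered after success runs immediately, so code that attaches a handler late is not silently ignored.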
Periodically try to reconnect.
@param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt.
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 89
def periodically_reconnect(period = 5)
  @reconnecting = true
  self.reset

  @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) {
    EventMachine.reconnect(@settings[:host], @settings[:port], self)
  }
end
EventMachine reactor callback. Runs when TCP connection is established but before resumption of the network loop. Note that this includes cases when TCP connection has failed.
@private
# File lib/amq/client/async/adapters/event_machine.rb, line 225
def post_init
  reset

  # note that upgrading to TLS in #connection_completed causes
  # Erlang SSL app that RabbitMQ relies on to report
  # error on TCP connection <0.1465.0>:{ssl_upgrade_error,"record overflow"}
  # and close TCP connection down. Investigation of this issue is likely
  # to take some time and to not be worth it as long as #post_init
  # works fine. MK.
  upgrade_to_tls_if_necessary
rescue Exception => error
  raise error
end
EventMachine receives data in chunks, sometimes those chunks are smaller than the size of AMQP frame. That's why you need to add some kind of buffer.
@private
# File lib/amq/client/async/adapters/event_machine.rb, line 321
def receive_data(chunk)
  @chunk_buffer << chunk
  while frame = get_next_frame
    self.receive_frame(AMQ::Client::Framing::String::Frame.decode(frame))
  end
end
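The buffering described above can be sketched in plain Ruby. The example assumes the AMQP 0.9.1 wire layout: a 7-byte header (1-byte type, 2-byte channel, 4-byte payload size), the payload, then a single frame-end byte `0xCE`. `ChunkBuffer` and `build_frame` are hypothetical helpers for illustration and are not the library's `get_next_frame`.

```ruby
# Hypothetical buffer that reassembles AMQP 0.9.1 frames from arbitrary chunks.
class ChunkBuffer
  HEADER_SIZE = 7 # type (1) + channel (2) + payload size (4)

  def initialize
    @buffer = String.new # empty binary (ASCII-8BIT) string
  end

  # Appends a chunk received from the network.
  def <<(chunk)
    @buffer << chunk.b
    self
  end

  # Returns the next complete frame as a binary String,
  # or nil when more data is needed.
  def next_frame
    return nil if @buffer.bytesize < HEADER_SIZE

    _type, _channel, payload_size = @buffer.unpack("CnN")
    frame_size = HEADER_SIZE + payload_size + 1 # +1 for the frame-end byte
    return nil if @buffer.bytesize < frame_size

    frame   = @buffer.byteslice(0, frame_size)
    @buffer = @buffer.byteslice(frame_size, @buffer.bytesize - frame_size)
    frame
  end
end

# Builds a raw frame for the demo; the payload is dummy data.
def build_frame(type, channel, payload)
  [type, channel, payload.bytesize].pack("CnN") + payload.b + "\xCE".b
end

# Two frames back to back: a type-1 frame with a dummy payload,
# then a heartbeat (type 8, channel 0, empty payload).
wire = build_frame(1, 1, "abc") + build_frame(8, 0, "")

buffer = ChunkBuffer.new
buffer << wire.byteslice(0, 5)             # not even a full header yet
partial = buffer.next_frame

buffer << wire.byteslice(5, wire.bytesize - 5)
frames = []
while (frame = buffer.next_frame)
  frames << frame
end
```

The loop mirrors `receive_data`: keep extracting frames until the buffer no longer holds a complete one, and leave any remainder for the next chunk.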
Reconnect after a period of wait.
@param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt.
@param [Boolean] force If true, enforces immediate reconnection.
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 54
def reconnect(force = false, period = 5)
  if @reconnecting and not force
    EventMachine::Timer.new(period) {
      reconnect(true, period)
    }
    return
  end

  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end
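The control flow of `reconnect` can be traced without a reactor by recording what it would do instead of doing it. `ReconnectPlanner` is a hypothetical model for illustration. It logs events where the real method calls `EventMachine::Timer.new`, `reset` and `EventMachine.reconnect`.

```ruby
# Hypothetical model of the reconnect decision logic. It records events
# instead of touching timers or sockets.
class ReconnectPlanner
  attr_reader :events

  def initialize
    @reconnecting = false
    @events       = []
  end

  def reconnect(force = false, period = 5)
    # Already reconnecting and not forced: retry later with force = true.
    if @reconnecting && !force
      @events << [:schedule_forced_retry, period]
      return
    end

    # First attempt: mark the state and reset buffers.
    unless @reconnecting
      @reconnecting = true
      @events << [:reset]
    end

    @events << [:tcp_reconnect]
  end
end

planner = ReconnectPlanner.new
planner.reconnect            # first call: reset, then reconnect
planner.reconnect            # already reconnecting: schedule a forced retry
planner.reconnect(true, 5)   # forced (as the timer would do): reconnect now
```

Note that the first positional argument is `force` and the second is `period`, so a call such as `reconnect(10)` passes a truthy `force`, not a period.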
Similar to reconnect, but uses different connection settings.
@see reconnect
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 73
def reconnect_to(settings, period = 5)
  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  @settings = Settings.configure(settings)
  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end
@see on_open
@private
# File lib/amq/client/async/adapters/event_machine.rb, line 125
def register_connection_callback(&block)
  unless block.nil?
    # delay calling block we were given till after we receive
    # connection.open-ok. Connection will notify us when
    # that happens.
    self.on_open do
      block.call(self)
    end
  end
end
Is TCP connection established and currently active?
@return [Boolean]
@api public
# File lib/amq/client/async/adapters/event_machine.rb, line 203
def tcp_connection_established?
  @tcp_connection_established
end
Called by EventMachine reactor when:
* We close TCP connection down
* Our peer closes TCP connection down
* There is a network connection issue
* Initial TCP connection fails
@private
# File lib/amq/client/async/adapters/event_machine.rb, line 288
def unbind(exception = nil)
  if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection
    @tcp_connection_failed = true
    logger.error "[amqp] Detected TCP connection failure"
    self.tcp_connection_failed
  end

  closing!
  @tcp_connection_established = false

  self.handle_connection_interruption if @reconnecting
  @disconnection_deferrable.succeed

  closed!

  self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfully_connected_before

  # since AMQP spec dictates that authentication failure is a protocol exception
  # and protocol exceptions result in connection closure, check whether we are
  # in the authentication stage. If so, it is likely to signal an authentication
  # issue. Java client behaves the same way. MK.
  if authenticating? && !@intentionally_closing_connection
    @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure
  end
end
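Since `unbind` is called for several unrelated reasons, the method tells them apart using the connection state flags. The sketch below restates those conditions as a pure function. `unbind_reasons` and its Hash keys are hypothetical names that mirror the instance variables in the source.

```ruby
# Hypothetical pure-function restatement of the conditions checked in #unbind.
# Each key mirrors the instance variable of the same name; missing keys are false.
def unbind_reasons(state)
  established      = state.fetch(:tcp_connection_established, false)
  connected_before = state.fetch(:had_successfully_connected_before, false)
  intentional      = state.fetch(:intentionally_closing_connection, false)
  authenticating   = state.fetch(:authenticating, false)

  reasons = []
  reasons << :initial_tcp_connection_failure  if !established && !connected_before && !intentional
  reasons << :tcp_connection_lost             if !intentional && connected_before
  reasons << :possible_authentication_failure if authenticating && !intentional
  reasons
end

# Never connected and not closing on purpose.
never_connected = unbind_reasons({})

# Connected earlier, then the peer or the network dropped the connection.
dropped = unbind_reasons({
  :tcp_connection_established        => true,
  :had_successfully_connected_before => true
})

# We called close_connection ourselves.
intentional_close = unbind_reasons({
  :tcp_connection_established        => true,
  :had_successfully_connected_before => true,
  :intentionally_closing_connection  => true
})

# Broker closed the connection while we were still authenticating.
during_auth = unbind_reasons({
  :tcp_connection_established        => true,
  :had_successfully_connected_before => true,
  :authenticating                    => true
})
```

The last case yields two reasons, because under these conditions a close during authentication also counts as a lost connection.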
# File lib/amq/client/async/adapters/event_machine.rb, line 400
def reset
  @size    = 0
  @payload = ""
  @frames  = Array.new

  @chunk_buffer             = ""
  @connection_deferrable    = EventMachine::DefaultDeferrable.new
  @disconnection_deferrable = EventMachine::DefaultDeferrable.new

  # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
  # that on authentication failure broker must close TCP connection without sending
  # any more data. This is why we need to explicitly track whether we are past
  # authentication stage to signal possible authentication failures.
  @authenticating = false
end