class IProto::EMConnection
Attributes
_needed_size[R]
host[R]
port[R]
Public Class Methods
new(host, port, reconnect = true)
click to toggle source
# File lib/iproto/em.rb, line 40
# Sets up the connection state; no I/O is performed here.
#
# host, port - remembered in @host / @port and later handed to +reconnect+.
# reconnect  - any truthy value enables reconnecting. A Numeric value is also
#              used as the reconnect timeout; otherwise DEFAULT_RECONNECT is
#              used.
def initialize(host, port, reconnect = true)
  @host, @port = host, port
  @should_reconnect = reconnect ? true : false
  @reconnect_timeout = reconnect.is_a?(Numeric) ? reconnect : DEFAULT_RECONNECT

  # Timers are created lazily elsewhere.
  @reconnect_timer = nil
  @ping_timer = nil
  @inactivity_timeout = 0

  # :init_waiting marks a connection that has never completed yet.
  @connected = :init_waiting
  @waiting_requests = {}
  @waiting_for_connect = []

  @shutdown_hook = false
  init_protocol
  shutdown_hook
end
Public Instance Methods
_do_send_request(request_type, body, request)
click to toggle source
# File lib/iproto/em.rb, line 191
# Sends a request and registers +request+ under a request id that is not
# currently present in @waiting_requests.
#
# request_type, body - passed to +pack_request+ together with the chosen id.
# request            - stored in @waiting_requests under that id.
def _do_send_request(request_type, body, request)
  # Keep drawing ids until one is found that is not already in flight.
  request_id = next_request_id
  request_id = next_request_id while @waiting_requests.key?(request_id)

  packet = pack_request(request_type, request_id, body)
  send_data(packet)
  @waiting_requests[request_id] = request
end
_perform_waiting_for_connect(real)
click to toggle source
# File lib/iproto/em.rb, line 174
# Drains the queue of requests that were waiting for a connection.
#
# real - when truthy, every queued request is scheduled for sending on the
#        next reactor tick; otherwise queued requests are moved into
#        @waiting_requests under negative keys (-1, -2, ...).
#
# The queue is emptied in both cases.
def _perform_waiting_for_connect(real)
  if real
    @waiting_for_connect.each do |request_type, body, request|
      ::EM.next_tick { _do_send_request(request_type, body, request) }
    end
  else
    @waiting_for_connect.each_with_index do |(_request_type, _body, request), position|
      # Keys count down from -1 in queue order.
      @waiting_requests[-(position + 1)] = request
    end
  end
  @waiting_for_connect.clear
end
_ping()
click to toggle source
# File lib/iproto/em.rb, line 106
# Sends a PING packet (type PING, request id PING_ID, empty body) and clears
# @ping_timer.
def _ping
  ping_packet = pack_request(PING, PING_ID, EMPTY_STR)
  send_data(ping_packet)
  @ping_timer = nil
end
_send_request(request_type, body, request)
click to toggle source
# File lib/iproto/em.rb, line 157
# Routes a request according to the current connection state:
#
# * connected           - send immediately;
# * could be connected  - queue it in @waiting_for_connect (and trigger an
#                         immediate reconnect when the state is :force);
# * otherwise           - answer with IProto::Disconnected, on the next
#                         reactor tick if the reactor runs, else right away.
def _send_request(request_type, body, request)
  return _do_send_request(request_type, body, request) if @connected == true

  if could_be_connected?
    @waiting_for_connect << [request_type, body, request]
    _setup_reconnect_timer(0) if @connected == :force
  elsif ::EM.reactor_running?
    EM.next_tick do
      do_response(request, IProto::Disconnected.new("connection is closed"))
    end
  else
    do_response(request, IProto::Disconnected.new("connection is closed"))
  end
end
_setup_reconnect_timer(timeout)
click to toggle source
# File lib/iproto/em.rb, line 142
# Arranges a reconnect to @host:@port unless one is already arranged
# (@reconnect_timer is non-nil).
#
# timeout - 0 reconnects immediately and marks the state :waiting; any other
#           value schedules the reconnect through an EM timer.
def _setup_reconnect_timer(timeout)
  return unless @reconnect_timer.nil?

  # Placeholder so that nested calls see a reconnect as already arranged.
  @reconnect_timer = :waiting
  shutdown_hook

  if timeout == 0
    @connected = :waiting
    reconnect(@host, @port)
  else
    @reconnect_timer = ::EM.add_timer(timeout) { reconnect(@host, @port) }
  end
end
_stop_pinger()
click to toggle source
# File lib/iproto/em.rb, line 56
# Cancels the pending ping timer, if any, and forgets it.
def _stop_pinger
  return unless @ping_timer

  EM.cancel_timer(@ping_timer)
  @ping_timer = nil
end
close()
click to toggle source
# File lib/iproto/em.rb, line 197
# Closes the connection by delegating to +close_connection+ with +false+.
def close
  # NOTE(review): presumably EventMachine's +after_writing+ flag, since
  # close_connection forwards its arguments to super — confirm.
  after_writing = false
  close_connection(after_writing)
end
close_connection(*args)
click to toggle source
Calls superclass method
# File lib/iproto/em.rb, line 201
# Explicitly closes the connection and disables reconnecting.
#
# args - forwarded unchanged to the superclass implementation, which is
#        invoked only when the connection is currently established.
#
# Every pending request is answered through +discard_requests+.
def close_connection(*args)
  # nil (as opposed to false) tells +unbind+ the disconnect was explicit.
  @should_reconnect = nil

  # Only a real timer signature is cancelled; the :waiting placeholder is not.
  ::EM.cancel_timer(@reconnect_timer) if @reconnect_timer.is_a?(Integer)
  @reconnect_timer = nil

  super(*args) if @connected == true
  @connected = false
  discard_requests
end
comm_inactivity_timeout=(t)
click to toggle source
Calls superclass method
# File lib/iproto/em.rb, line 63
# Remembers the inactivity timeout, passes it to the superclass and sends a
# ping right away.
#
# t - the timeout value; +receive_chunk+ schedules the next ping after a
#     quarter of it once a ping response arrives (only when it is positive).
def comm_inactivity_timeout=(t)
  @inactivity_timeout = t
  super
  # Cancel a ping that is already scheduled: +_ping+ only forgets the timer,
  # so without this the old timer would still fire and start a second,
  # parallel chain of pings.
  _stop_pinger
  _ping
end
connected?()
click to toggle source
# File lib/iproto/em.rb, line 69
# Returns true only when the connection is fully established. The transient
# symbol states (:init_waiting, :waiting, :force) count as not connected.
def connected?
  @connected.equal?(true)
end
connection_completed()
click to toggle source
# File lib/iproto/em.rb, line 91
# Marks the connection as established, resets the protocol parser, re-applies
# the stored inactivity timeout and schedules every request that was queued
# while waiting for the connection.
def connection_completed
  @connected = true
  @reconnect_timer = nil
  init_protocol
  self.comm_inactivity_timeout = @inactivity_timeout
  _perform_waiting_for_connect(true)
end
could_be_connected?()
click to toggle source
# File lib/iproto/em.rb, line 73
# Tells whether a request may still be queued for a future connection.
#
# Returns @connected itself when it is falsy, true for any truthy state other
# than :force, and the reactor's running status when the state is :force.
def could_be_connected?
  return @connected unless @connected
  return true unless @connected == :force

  ::EM.reactor_running?
end
discard_requests()
click to toggle source
# File lib/iproto/em.rb, line 215
# Answers every outstanding request with a single IProto::Disconnected error.
# Requests still queued for a connection are first moved into
# @waiting_requests, so they are failed too.
def discard_requests
  exc = IProto::Disconnected.new("discarded cause of disconnect")
  _perform_waiting_for_connect(false)

  # Iterate over a snapshot of the keys: entries are removed while looping.
  @waiting_requests.keys.each do |request_id|
    pending = @waiting_requests.delete(request_id)
    do_response(pending, exc)
  end
end
do_response(request, data)
click to toggle source
# File lib/iproto/em.rb, line 138
# Delivers +data+ to +request+ by invoking its +call+ method, and returns
# whatever that call returns.
def do_response(request, data)
  request.call(data)
end
init_protocol()
click to toggle source
# File lib/iproto/em.rb, line 100
# Puts the parser back into its initial state: a header of HEADER_SIZE bytes
# is expected next, and the buffer is reset.
def init_protocol
  @_state = :receive_header
  @_needed_size = HEADER_SIZE
  buffer_reset
end
receive_chunk(chunk)
click to toggle source
# File lib/iproto/em.rb, line 111
# Consumes one protocol chunk: a header while in the :receive_header state,
# otherwise the body of the packet whose header was read before.
#
# chunk - in header state, the body size is read at byte offset 4 and the
#         request id at offset 8 (32-bit little-endian); in body state, the
#         response body itself.
#
# A header announcing a positive body size only switches the parser to body
# state. A complete packet is either treated as a ping response (which may
# schedule the next ping) or delivered to the matching waiting request.
#
# Raises IProto::UnexpectedResponse when no request waits for the id. The
# parser is already reset to header state at that point, so a caller that
# rescues the error keeps a correctly framed stream.
def receive_chunk(chunk)
  if @_state == :receive_header
    body_size = ::BinUtils.get_int32_le(chunk, 4)
    @request_id = ::BinUtils.get_int32_le(chunk, 8)
    if body_size > 0
      @_needed_size = body_size
      @_state = :receive_body
      return
    end
    # No body follows: the response payload is empty.
    chunk = ''
  end

  # A whole packet has been consumed, so whatever happens below (including
  # the raise) the next bytes on the wire form a header.
  @_needed_size = HEADER_SIZE
  @_state = :receive_header

  if @request_id == PING_ID
    if @ping_timer.nil? && @inactivity_timeout > 0
      # Next ping goes out after a quarter of the inactivity timeout.
      @ping_timer = ::EM.add_timer(@inactivity_timeout / 4.0, method(:_ping))
    end
    return
  end

  request = @waiting_requests.delete(@request_id)
  raise IProto::UnexpectedResponse, "For request id #{@request_id}" unless request

  do_response(request, chunk)
end
shutdown_hook()
click to toggle source
# File lib/iproto/em.rb, line 77
# Registers an EventMachine shutdown hook, at most once until it has run
# (guarded by the @shutdown_hook flag).
#
# When the hook runs it cancels a pending reconnect timer, sets the state to
# :force if reconnecting is wanted (false otherwise) and clears the flag so
# that a new hook can be registered later.
def shutdown_hook
  return if @shutdown_hook

  ::EM.add_shutdown_hook do
    # Only a real timer signature is cancelled; the :waiting placeholder is not.
    ::EM.cancel_timer(@reconnect_timer) if @reconnect_timer.is_a?(Integer)
    @reconnect_timer = nil
    @connected = @should_reconnect ? :force : false
    @shutdown_hook = false
  end
  @shutdown_hook = true
end
unbind()
click to toggle source
# File lib/iproto/em.rb, line 224
# Handles loss of the connection (presumably EventMachine's connection-closed
# callback — confirm): stops the pinger, fails all outstanding requests, then
# acts on @should_reconnect:
#
# * true  - schedules a reconnect after @reconnect_timeout, unless the state
#           is :force;
# * false - raises IProto::CouldNotConnect if the connection never completed
#           (state :init_waiting), IProto::Disconnected otherwise;
# * nil   - does nothing, the disconnect was explicit.
def unbind
  _stop_pinger

  # Requests are discarded with the state temporarily forced to false; the
  # previous state is restored because the branches below depend on it.
  state_before = @connected
  @connected = false
  discard_requests
  @connected = state_before

  case @should_reconnect
  when true
    @reconnect_timer = nil
    unless @connected == :force
      @connected = false
      _setup_reconnect_timer(@reconnect_timeout)
    end
  when false
    raise(@connected == :init_waiting ? IProto::CouldNotConnect : IProto::Disconnected)
  when nil
    # do nothing because we explicitly disconnected
  end
end
waiting_requests_size()
click to toggle source
# File lib/iproto/em.rb, line 249
# Returns the number of outstanding requests: those registered in
# @waiting_requests plus those still queued in @waiting_for_connect.
def waiting_requests_size
  in_flight = @waiting_requests.size
  queued = @waiting_for_connect.size
  in_flight + queued
end