class MessageBus::ConnectionManager
Public Class Methods
new(bus = nil)
click to toggle source
# File lib/message_bus/connection_manager.rb, line 28
# Builds a connection manager with no clients and no subscriptions.
#
# @param bus [Object, nil] object stored in @bus; the MessageBus constant is
#   used when nil (or false) is given
def initialize(bus = nil)
  # client_id => client
  @clients = {}
  # site_id => { channel => set of client ids }
  @subscriptions = {}
  @bus = bus || MessageBus
end
Public Instance Methods
add_client(client)
click to toggle source
# File lib/message_bus/connection_manager.rb, line 78
# Registers a client under its client_id, makes sure a subscription table
# exists for the client's site, and subscribes the client to each entry of
# client.subscriptions.
#
# @param client [#client_id, #site_id, #subscriptions] client to register
def add_client(client)
  @clients[client.client_id] = client
  @subscriptions[client.site_id] ||= {}

  # Only the first element of each yielded pair (the channel) is needed.
  client.subscriptions.each do |channel, _|
    subscribe_client(client, channel)
  end
end
client_count()
click to toggle source
# File lib/message_bus/connection_manager.rb, line 107
# @return [Integer] number of entries currently held in @clients
def client_count
  @clients.length
end
lookup_client(client_id)
click to toggle source
# File lib/message_bus/connection_manager.rb, line 94
# Looks a client up by its id.
#
# @param client_id [Object] key the client was stored under in @clients
# @return [Object, nil] the stored client, or nil when the id is unknown
def lookup_client(client_id)
  @clients[client_id]
end
notify_clients(msg)
click to toggle source
# File lib/message_bus/connection_manager.rb, line 34
# Delivers msg to the clients subscribed to msg.channel on msg.site_id.
#
# For every subscribed client id that is still present in @clients and for
# which client.allowed?(msg) is truthy, client.filter(msg) is evaluated; a
# truthy result is pushed to the client with << and the client is then
# removed via remove_client.
#
# When @bus.around_client_batch(msg.channel) returns a filter, the filter is
# called with (msg, user_ids, work), where user_ids are the non-nil user ids
# of the subscribed clients and work is the delivery lambda. If there is no
# filter, or no subscribed client has a user id, the delivery lambda is run
# directly so that those clients are not skipped.
#
# Any StandardError raised along the way is logged through MessageBus.logger
# and swallowed.
#
# @param msg [#site_id, #channel] message to deliver
def notify_clients(msg)
  site_subs = @subscriptions[msg.site_id]
  subscription = site_subs && site_subs[msg.channel]
  return unless subscription

  around_filter = @bus.around_client_batch(msg.channel)

  work = lambda do
    subscription.each do |client_id|
      client = @clients[client_id]
      next unless client && client.allowed?(msg)

      copy = client.filter(msg)
      next unless copy

      begin
        client << copy
      rescue
        # pipe may be broken, move on
      end
      # turns out you can delete from a set while iterating
      remove_client(client)
    end
  end

  user_ids = []
  if around_filter
    user_ids = subscription.map do |client_id|
      subscriber = @clients[client_id]
      subscriber && subscriber.user_id
    end.compact
  end

  if user_ids.empty?
    # No filter, or nothing to batch over: deliver directly rather than
    # dropping the message for clients without a user id.
    work.call
  else
    around_filter.call(msg, user_ids, work)
  end
rescue => e
  MessageBus.logger.error "notify clients crash #{e} : #{e.backtrace}"
end
remove_client(c)
click to toggle source
# File lib/message_bus/connection_manager.rb, line 86
# Forgets a client: drops it from @clients, deletes its id from every
# channel set of its site, and cancels its cleanup timer when it has one.
#
# Safe to call for a client whose site has no subscription table (for
# example one that was never added); that step is then skipped instead of
# raising NoMethodError on nil.
#
# @param c [#client_id, #site_id, #cleanup_timer] client to remove
def remove_client(c)
  @clients.delete c.client_id

  site_subs = @subscriptions[c.site_id]
  site_subs.each_value { |set| set.delete c.client_id } if site_subs

  c.cleanup_timer.cancel if c.cleanup_timer
end
stats()
click to toggle source
# File lib/message_bus/connection_manager.rb, line 111
# Snapshot of the manager's bookkeeping.
#
# @return [Hash] :client_count is the number of entries in @clients;
#   :subscriptions is the @subscriptions hash itself (not a copy)
def stats
  {
    client_count: @clients.length,
    subscriptions: @subscriptions
  }
end
subscribe_client(client,channel)
click to toggle source
# File lib/message_bus/connection_manager.rb, line 98
# Adds the client's id to the set of subscribers for channel on the
# client's site, creating the site table and the channel set on first use.
#
# The site table is created here as well, so the method also works when it
# is called for a site that has no table yet.
#
# @param client [#client_id, #site_id] subscribing client
# @param channel [Object] channel key within the site's table
# @return [Object] whatever << on the channel set returns
def subscribe_client(client, channel)
  site_subs = (@subscriptions[client.site_id] ||= {})
  set = (site_subs[channel] ||= SynchronizedSet.new)
  set << client.client_id
end