class MessageBus::Client
Constants
- COLON_SPACE
- CONNECTION_CLOSE
- CONTENT_LENGTH
- HTTP_11
- NEWLINE
heavily optimised to avoid all uneeded allocations
Attributes
async_response[RW]
cleanup_timer[RW]
client_id[RW]
connect_time[RW]
group_ids[RW]
headers[RW]
io[RW]
site_id[RW]
subscribed_sets[RW]
user_id[RW]
Public Class Methods
new(opts)
click to toggle source
# File lib/message_bus/client.rb, line 6 def initialize(opts) self.client_id = opts[:client_id] self.user_id = opts[:user_id] self.group_ids = opts[:group_ids] || [] self.site_id = opts[:site_id] self.connect_time = Time.now @bus = opts[:message_bus] || MessageBus @subscriptions = {} end
Public Instance Methods
<<(msg)
click to toggle source
# File lib/message_bus/client.rb, line 48 def <<(msg) write_and_close messages_to_json([msg]) end
allowed?(msg)
click to toggle source
# File lib/message_bus/client.rb, line 56 def allowed?(msg) allowed = !msg.user_ids || msg.user_ids.include?(self.user_id) allowed &&= !msg.client_ids || msg.client_ids.include?(self.client_id) allowed && ( msg.group_ids.nil? || msg.group_ids.length == 0 || ( msg.group_ids - self.group_ids ).length < msg.group_ids.length ) end
backlog()
click to toggle source
# File lib/message_bus/client.rb, line 78 def backlog r = [] @subscriptions.each do |k,v| next if v.to_i < 0 messages = @bus.backlog(k,v) messages.each do |msg| r << msg if allowed?(msg) end end # stats message for all newly subscribed status_message = nil @subscriptions.each do |k,v| if v.to_i == -1 status_message ||= {} status_message[k] = @bus.last_id(k) end end r << MessageBus::Message.new(-1, -1, '/__status', status_message) if status_message r.map!{|msg| filter(msg)}.compact! r || [] end
close()
click to toggle source
# File lib/message_bus/client.rb, line 29 def close return unless in_async? write_and_close "[]" end
closed()
click to toggle source
# File lib/message_bus/client.rb, line 34 def closed !@async_response end
ensure_closed!()
click to toggle source
# File lib/message_bus/client.rb, line 20 def ensure_closed! return unless in_async? write_and_close "[]" rescue # we may have a dead socket, just nil the @io @io = nil @async_response = nil end
filter(msg)
click to toggle source
# File lib/message_bus/client.rb, line 68 def filter(msg) filter = @bus.client_filter(msg.channel) if filter filter.call(self.user_id, msg) else msg end end
in_async?()
click to toggle source
# File lib/message_bus/client.rb, line 16 def in_async? @async_response || @io end
subscribe(channel, last_seen_id)
click to toggle source
# File lib/message_bus/client.rb, line 38 def subscribe(channel, last_seen_id) last_seen_id = nil if last_seen_id == "" last_seen_id ||= @bus.last_id(channel) @subscriptions[channel] = last_seen_id.to_i end
subscriptions()
click to toggle source
# File lib/message_bus/client.rb, line 44 def subscriptions @subscriptions end
Protected Instance Methods
messages_to_json(msgs)
click to toggle source
# File lib/message_bus/client.rb, line 135 def messages_to_json(msgs) MessageBus::Rack::Middleware.backlog_to_json(msgs) end
write_and_close(data)
click to toggle source
# File lib/message_bus/client.rb, line 111 def write_and_close(data) if @io @io.write(HTTP_11) @headers.each do |k,v| @io.write(k) @io.write(COLON_SPACE) @io.write(v) @io.write(NEWLINE) end @io.write(CONTENT_LENGTH) @io.write(data.bytes.to_a.length) @io.write(NEWLINE) @io.write(CONNECTION_CLOSE) @io.write(NEWLINE) @io.write(data) @io.close @io = nil else @async_response << data @async_response.done @async_response = nil end end