class MessageBus::Client

Constants

COLON_SPACE
CONNECTION_CLOSE
CONTENT_LENGTH
HTTP_11
NEWLINE

Heavily optimised to avoid all unneeded allocations.

Attributes

async_response[RW]
cleanup_timer[RW]
client_id[RW]
connect_time[RW]
group_ids[RW]
headers[RW]
io[RW]
site_id[RW]
subscribed_sets[RW]
user_id[RW]

Public Class Methods

new(opts) click to toggle source
# File lib/message_bus/client.rb, line 6
# Builds a client from an options hash.
#
# Reads :client_id, :user_id, :site_id and :group_ids from +opts+ (group ids
# fall back to an empty array), records the current time as the connect time,
# and remembers the bus to talk to: opts[:message_bus] when given, otherwise
# the MessageBus constant. The subscription table starts out empty.
def initialize(opts)
  @subscriptions = {}
  @bus = opts[:message_bus] || MessageBus

  requested_groups = opts[:group_ids]
  self.group_ids = requested_groups ? requested_groups : []

  self.client_id = opts[:client_id]
  self.user_id = opts[:user_id]
  self.site_id = opts[:site_id]
  self.connect_time = Time.now
end

Public Instance Methods

<<(msg) click to toggle source
# File lib/message_bus/client.rb, line 48
# Delivers a single message to the client: wraps +msg+ in a one-element
# array, serialises it with messages_to_json and hands the result to
# write_and_close.
def <<(msg)
  payload = messages_to_json([msg])
  write_and_close(payload)
end
allowed?(msg) click to toggle source
# File lib/message_bus/client.rb, line 56
# Decides whether +msg+ may be delivered to this client.
#
# Each restriction on the message is optional:
# * msg.user_ids, when set, must include this client's user_id;
# * msg.client_ids, when set, must include this client's client_id;
# * msg.group_ids, when set and non-empty, must share at least one entry
#   with this client's group_ids.
#
# Returns true or false.
def allowed?(msg)
  permitted_users = msg.user_ids
  return false if permitted_users && !permitted_users.include?(self.user_id)

  permitted_clients = msg.client_ids
  return false if permitted_clients && !permitted_clients.include?(self.client_id)

  target_groups = msg.group_ids
  return true if target_groups.nil? || target_groups.length == 0

  # Removing our groups shrinks the list only if at least one of them matched.
  unmatched = target_groups - self.group_ids
  unmatched.length < target_groups.length
end
backlog() click to toggle source
# File lib/message_bus/client.rb, line 78
# Gathers everything this client should be sent right now.
#
# For every subscription whose last seen id is not negative, the bus backlog
# for that channel is fetched and narrowed to the messages allowed? accepts.
# Subscriptions whose last seen id is exactly -1 are reported in a single
# '/__status' message that maps each such channel to the bus's last id for
# it. Every collected message is finally passed through filter, and nil
# results are dropped.
#
# Returns an array (possibly empty).
def backlog
  pending = @subscriptions.flat_map do |channel, last_seen|
    next [] if last_seen.to_i < 0

    @bus.backlog(channel, last_seen).select { |message| allowed?(message) }
  end

  # status message for all newly subscribed channels
  latest_ids = @subscriptions.each_with_object({}) do |(channel, last_seen), ids|
    ids[channel] = @bus.last_id(channel) if last_seen.to_i == -1
  end
  unless latest_ids.empty?
    pending << MessageBus::Message.new(-1, -1, '/__status', latest_ids)
  end

  pending.map { |message| filter(message) }.compact
end
close() click to toggle source
# File lib/message_bus/client.rb, line 29
# Finishes a pending response by sending an empty JSON array ("[]").
# Does nothing (and returns nil) when in_async? reports no pending response.
def close
  write_and_close("[]") if in_async?
end
closed() click to toggle source
# File lib/message_bus/client.rb, line 34
# Reports whether this client has nothing left to respond through.
#
# A pending response is held either in @async_response or in @io (see
# in_async? and write_and_close, which treat both as live connections), so
# the client only counts as closed once both are unset. Checking
# @async_response alone would report a client waiting on a raw IO as closed
# while its socket is still open.
#
# Returns true or false.
def closed
  !(@async_response || @io)
end
ensure_closed!() click to toggle source
# File lib/message_bus/client.rb, line 20
# Best-effort variant of close: sends an empty JSON array ("[]") when a
# response is still pending, and never lets a StandardError escape.
#
# If writing fails (for example because the socket is already dead) the
# references to the IO and the async response are simply dropped.
def ensure_closed!
  write_and_close("[]") if in_async?
rescue
  # we may have a dead socket, just forget both handles
  @async_response = nil
  @io = nil
end
filter(msg) click to toggle source
# File lib/message_bus/client.rb, line 68
# Runs +msg+ through the bus's client filter for its channel, if one exists.
#
# The filter is looked up with @bus.client_filter(msg.channel) and called
# with this client's user_id and the message; whatever it returns is the
# result. Without a filter the message is returned untouched.
def filter(msg)
  channel_filter = @bus.client_filter(msg.channel)
  return msg unless channel_filter

  channel_filter.call(self.user_id, msg)
end
in_async?() click to toggle source
# File lib/message_bus/client.rb, line 16
# Truthy while the client still holds something to respond through.
# Returns @async_response when it is set, otherwise @io (which is nil or
# false when neither is available).
def in_async?
  @async_response || @io
end
subscribe(channel, last_seen_id) click to toggle source
# File lib/message_bus/client.rb, line 38
# Registers interest in +channel+, starting after +last_seen_id+.
#
# An empty string, nil or false for +last_seen_id+ means "no position given";
# in that case the bus's current last id for the channel is used. The chosen
# position is stored as an integer (via to_i) and returned.
def subscribe(channel, last_seen_id)
  position = last_seen_id == "" ? nil : last_seen_id
  position = @bus.last_id(channel) unless position
  @subscriptions[channel] = position.to_i
end
subscriptions() click to toggle source
# File lib/message_bus/client.rb, line 44
# Returns the subscription table itself (not a copy): a hash that subscribe
# fills with channel => last seen id (integer) entries.
def subscriptions
  @subscriptions
end

Protected Instance Methods

messages_to_json(msgs) click to toggle source
# File lib/message_bus/client.rb, line 135
# Serialises +msgs+ by delegating to
# MessageBus::Rack::Middleware.backlog_to_json and returns its result.
# NOTE(review): presumably a JSON string, since callers pass the result to
# write_and_close as the response body -- confirm against the middleware.
def messages_to_json(msgs)
  MessageBus::Rack::Middleware.backlog_to_json(msgs)
end
write_and_close(data) click to toggle source
# File lib/message_bus/client.rb, line 111
# Sends +data+ as the complete response body and lets go of the connection.
#
# When @io is set, the response is written to it piece by piece: HTTP_11,
# each @headers pair as key, COLON_SPACE, value, NEWLINE, then
# CONTENT_LENGTH followed by the body's size in bytes and NEWLINE, then
# CONNECTION_CLOSE, NEWLINE and finally the body. The IO is closed and
# @io is reset to nil.
#
# Otherwise +data+ is appended to @async_response, which is then marked done
# and reset to nil.
def write_and_close(data)
  if @io
    @io.write(HTTP_11)
    @headers.each do |k,v|
      @io.write(k)
      @io.write(COLON_SPACE)
      @io.write(v)
      @io.write(NEWLINE)
    end
    @io.write(CONTENT_LENGTH)
    # bytesize yields the byte count directly; no per-byte array is built
    # just to measure the body.
    @io.write(data.bytesize)
    @io.write(NEWLINE)
    @io.write(CONNECTION_CLOSE)
    @io.write(NEWLINE)
    @io.write(data)
    @io.close
    @io = nil
  else
    @async_response << data
    @async_response.done
    @async_response = nil
  end
end