module MessageBus::Implementation

Constants

ENCODE_SITE_TOKEN
MIN_KEEPALIVE

Public Class Methods

new() click to toggle source
# File lib/message_bus.rb, line 30
def initialize
  @mutex = Synchronizer.new
end

Public Instance Methods

after_fork() click to toggle source
# File lib/message_bus.rb, line 308
def after_fork
  reliable_pub_sub.after_fork
  ensure_subscriber_thread
  # will ensure timer is running
  timer.queue{}
end
allow_broadcast=(val) click to toggle source
# File lib/message_bus.rb, line 168
def allow_broadcast=(val)
  @allow_broadcast = val
end
allow_broadcast?() click to toggle source
# File lib/message_bus.rb, line 172
def allow_broadcast?
  @allow_broadcast ||=
    if defined? ::Rails
      ::Rails.env.test? || ::Rails.env.development?
    else
      false
    end
end
around_client_batch(channel, &blk) click to toggle source
# File lib/message_bus.rb, line 152
def around_client_batch(channel, &blk)
  @around_client_batches ||= {}
  @around_client_batches[channel] = blk if blk
  @around_client_batches[channel]
end
backlog(channel=nil, last_id=nil) click to toggle source
# File lib/message_bus.rb, line 271
def backlog(channel=nil, last_id=nil)
  old =
    if channel
      reliable_pub_sub.backlog(encode_channel_name(channel), last_id)
    else
      reliable_pub_sub.global_backlog(last_id)
    end

  old.each{ |m|
    decode_message!(m)
  }
  old
end
blocking_subscribe(channel=nil, &blk) click to toggle source
# File lib/message_bus.rb, line 224
def blocking_subscribe(channel=nil, &blk)
  if channel
    reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
  else
    reliable_pub_sub.global_subscribe(&blk)
  end
end
cache_assets() click to toggle source
# File lib/message_bus.rb, line 38
def cache_assets
  if defined? @cache_assets
    @cache_assets
  else
    true
  end
end
cache_assets=(val) click to toggle source
# File lib/message_bus.rb, line 34
def cache_assets=(val)
  @cache_assets = val
end
client_filter(channel, &blk) click to toggle source
# File lib/message_bus.rb, line 146
def client_filter(channel, &blk)
  @client_filters ||= {}
  @client_filters[channel] = blk if blk
  @client_filters[channel]
end
decode_channel_name(channel) click to toggle source
# File lib/message_bus.rb, line 244
def decode_channel_name(channel)
  channel.split(ENCODE_SITE_TOKEN)
end
destroy() click to toggle source
# File lib/message_bus.rb, line 298
def destroy
  @mutex.synchronize do
    @subscriptions ||= {}
    reliable_pub_sub.global_unsubscribe
    @destroyed = true
  end
  @subscriber_thread.join if @subscriber_thread
  timer.stop
end
enable_diagnostics() click to toggle source
# File lib/message_bus.rb, line 192
def enable_diagnostics
  MessageBus::Diagnostics.enable
end
encode_channel_name(channel) click to toggle source

encode channel name to include site

# File lib/message_bus.rb, line 235
def encode_channel_name(channel)
  if site_id_lookup && !global?(channel)
    raise ArgumentError.new channel if channel.include? ENCODE_SITE_TOKEN
    "#{channel}#{ENCODE_SITE_TOKEN}#{site_id_lookup.call}"
  else
    channel
  end
end
extra_response_headers_lookup(&blk) click to toggle source
# File lib/message_bus.rb, line 141
def extra_response_headers_lookup(&blk)
  @extra_response_headers_lookup = blk if blk
  @extra_response_headers_lookup
end
global_backlog(last_id=nil) click to toggle source
# File lib/message_bus.rb, line 267
def global_backlog(last_id=nil)
  backlog(nil, last_id)
end
group_ids_lookup(&blk) click to toggle source
# File lib/message_bus.rb, line 131
def group_ids_lookup(&blk)
  @group_ids_lookup = blk if blk
  @group_ids_lookup
end
is_admin_lookup(&blk) click to toggle source
# File lib/message_bus.rb, line 136
def is_admin_lookup(&blk)
  @is_admin_lookup = blk if blk
  @is_admin_lookup
end
keepalive_interval() click to toggle source
# File lib/message_bus.rb, line 342
def keepalive_interval
  @keepalive_interval || 60
end
keepalive_interval=(interval) click to toggle source

set to 0 to disable, anything higher and a keepalive will run every N seconds, if it fails process is killed

# File lib/message_bus.rb, line 338
def keepalive_interval=(interval)
  @keepalive_interval = interval
end
last_id(channel) click to toggle source
# File lib/message_bus.rb, line 285
def last_id(channel)
  reliable_pub_sub.last_id(encode_channel_name(channel))
end
last_message(channel) click to toggle source
# File lib/message_bus.rb, line 289
def last_message(channel)
  if last_id = last_id(channel)
    messages = backlog(channel, last_id-1)
    if messages
      messages[0]
    end
  end
end
listening?() click to toggle source
# File lib/message_bus.rb, line 315
def listening?
  @subscriber_thread && @subscriber_thread.alive?
end
local_subscribe(channel=nil, &blk) click to toggle source

subscribe only on current site

# File lib/message_bus.rb, line 262
def local_subscribe(channel=nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup && ! global?(channel)
  subscribe_impl(channel, site_id, &blk)
end
local_unsubscribe(channel=nil, &blk) click to toggle source
# File lib/message_bus.rb, line 256
def local_unsubscribe(channel=nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup
  unsubscribe_impl(channel, site_id, &blk)
end
logger() click to toggle source
# File lib/message_bus.rb, line 50
def logger
  return @logger if @logger
  require 'logger'
  @logger = Logger.new(STDOUT)
end
logger=(logger) click to toggle source
# File lib/message_bus.rb, line 46
def logger=(logger)
  @logger = logger
end
long_polling_enabled=(val) click to toggle source
# File lib/message_bus.rb, line 60
def long_polling_enabled=(val)
  @long_polling_enabled = val
end
long_polling_enabled?() click to toggle source
# File lib/message_bus.rb, line 56
def long_polling_enabled?
  @long_polling_enabled == false ? false : true
end
long_polling_interval() click to toggle source
# File lib/message_bus.rb, line 100
def long_polling_interval
  @long_polling_interval || 25 * 1000
end
long_polling_interval=(millisecs) click to toggle source
# File lib/message_bus.rb, line 96
def long_polling_interval=(millisecs)
  @long_polling_interval = millisecs
end
max_active_clients() click to toggle source
# File lib/message_bus.rb, line 70
def max_active_clients
  @max_active_clients || 1000
end
max_active_clients=(val) click to toggle source

The number of simultanuous clients we can service

will revert to polling if we are out of slots
# File lib/message_bus.rb, line 66
def max_active_clients=(val)
  @max_active_clients = val
end
off() click to toggle source
# File lib/message_bus.rb, line 104
def off
  @off = true
end
on() click to toggle source
# File lib/message_bus.rb, line 108
def on
  @off = false
end
on_connect(&blk) click to toggle source
# File lib/message_bus.rb, line 158
def on_connect(&blk)
  @on_connect = blk if blk
  @on_connect
end
on_disconnect(&blk) click to toggle source
# File lib/message_bus.rb, line 163
def on_disconnect(&blk)
  @on_disconnect = blk if blk
  @on_disconnect
end
publish(channel, data, opts = nil) click to toggle source
# File lib/message_bus.rb, line 196
def publish(channel, data, opts = nil)
  return if @off
  @mutex.synchronize do
    raise ::MessageBus::BusDestroyed if @destroyed
  end

  user_ids = nil
  group_ids = nil
  client_ids = nil

  if opts
    user_ids = opts[:user_ids]
    group_ids = opts[:group_ids]
    client_ids = opts[:client_ids]
  end

  raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel)

  encoded_data = JSON.dump({
    data: data,
    user_ids: user_ids,
    group_ids: group_ids,
    client_ids: client_ids
  })

  reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
end
rack_hijack_enabled=(val) click to toggle source
# File lib/message_bus.rb, line 92
def rack_hijack_enabled=(val)
  @rack_hijack_enabled = val
end
rack_hijack_enabled?() click to toggle source
# File lib/message_bus.rb, line 74
def rack_hijack_enabled?
  if @rack_hijack_enabled.nil?
    @rack_hijack_enabled = true

    # without this switch passenger will explode
    # it will run out of connections after about 10
    if defined? PhusionPassenger
      @rack_hijack_enabled = false
      if PhusionPassenger.respond_to? :advertised_concurrency_level
        PhusionPassenger.advertised_concurrency_level = 0
        @rack_hijack_enabled = true
      end
    end
  end

  @rack_hijack_enabled
end
redis_config() click to toggle source
# File lib/message_bus.rb, line 117
def redis_config
  @redis_config ||= {}
end
redis_config=(config) click to toggle source

Allow us to inject a redis db

# File lib/message_bus.rb, line 113
def redis_config=(config)
  @redis_config = config
end
reliable_pub_sub() click to toggle source
# File lib/message_bus.rb, line 185
def reliable_pub_sub
  @mutex.synchronize do
    return nil if @destroyed
    @reliable_pub_sub ||= MessageBus::Redis::ReliablePubSub.new redis_config
  end
end
reliable_pub_sub=(pub_sub) click to toggle source
# File lib/message_bus.rb, line 181
def reliable_pub_sub=(pub_sub)
  @reliable_pub_sub = pub_sub
end
reset!() click to toggle source

will reset all keys

# File lib/message_bus.rb, line 320
def reset!
  reliable_pub_sub.reset!
end
site_id_lookup(&blk) click to toggle source
# File lib/message_bus.rb, line 121
def site_id_lookup(&blk)
  @site_id_lookup = blk if blk
  @site_id_lookup
end
subscribe(channel=nil, &blk) click to toggle source
# File lib/message_bus.rb, line 248
def subscribe(channel=nil, &blk)
  subscribe_impl(channel, nil, &blk)
end
timer() click to toggle source
# File lib/message_bus.rb, line 324
def timer
  return @timer_thread if @timer_thread
  @timer_thread ||= begin
    t = MessageBus::TimerThread.new
    t.on_error do |e|
      logger.warn "Failed to process job: #{e} #{e.backtrace}"
    end
    t
  end
end
unsubscribe(channel=nil, &blk) click to toggle source
# File lib/message_bus.rb, line 252
def unsubscribe(channel=nil, &blk)
  unsubscribe_impl(channel, nil, &blk)
end
user_id_lookup(&blk) click to toggle source
# File lib/message_bus.rb, line 126
def user_id_lookup(&blk)
  @user_id_lookup = blk if blk
  @user_id_lookup
end

Protected Instance Methods

decode_message!(msg) click to toggle source
# File lib/message_bus.rb, line 352
def decode_message!(msg)
  channel, site_id = decode_channel_name(msg.channel)
  msg.channel = channel
  msg.site_id = site_id
  parsed = JSON.parse(msg.data)
  msg.data = parsed["data"]
  msg.user_ids = parsed["user_ids"]
  msg.group_ids = parsed["group_ids"]
  msg.client_ids = parsed["client_ids"]
end
ensure_subscriber_thread() click to toggle source
# File lib/message_bus.rb, line 396
def ensure_subscriber_thread
  @mutex.synchronize do
    return if (@subscriber_thread && @subscriber_thread.alive?) || @destroyed
    @subscriber_thread = new_subscriber_thread
  end
end
global?(channel) click to toggle source
# File lib/message_bus.rb, line 348
def global?(channel)
  channel && channel.start_with?('/global/'.freeze)
end
global_subscribe_thread() click to toggle source
# File lib/message_bus.rb, line 454
def global_subscribe_thread
  # pretend we just got a message
  @last_message = Time.now
  reliable_pub_sub.global_subscribe do |msg|
    begin
      @last_message = Time.now
      decode_message!(msg)
      globals, locals, local_globals, global_globals = nil

      @mutex.synchronize do
        raise MessageBus::BusDestroyed if @destroyed
        globals = @subscriptions[nil]
        locals = @subscriptions[msg.site_id] if msg.site_id

        global_globals = globals[nil] if globals
        local_globals = locals[nil] if locals

        globals = globals[msg.channel] if globals
        locals = locals[msg.channel] if locals
      end

      multi_each(globals,locals, global_globals, local_globals) do |c|
        begin
          c.call msg
        rescue => e
          MessageBus.logger.warn "failed to deliver message, skipping #{msg.inspect}\n ex: #{e} backtrace: #{e.backtrace}"
        end
      end
    rescue => e
      MessageBus.logger.warn "failed to process message #{msg.inspect}\n ex: #{e} backtrace: #{e.backtrace}"
    end
    @global_id = msg.global_id
  end
end
multi_each(*args,&block) click to toggle source
# File lib/message_bus.rb, line 489
def multi_each(*args,&block)
  args.each do |a|
    a.each(&block) if a
  end
end
new_subscriber_thread() click to toggle source
# File lib/message_bus.rb, line 405
def new_subscriber_thread

  thread = Thread.new do
    begin
      global_subscribe_thread unless @destroyed
    rescue => e
      MessageBus.logger.warn "Unexpected error in subscriber thread #{e}"
    end
  end

  # adjust for possible race condition
  @last_message = Time.now

  blk = proc do
    if !@destroyed && thread.alive? && keepalive_interval > MIN_KEEPALIVE

      publish("/__mb_keepalive__/", Process.pid, user_ids: [-1])
      # going for x3 keepalives missed for a restart, need to ensure this only very rarely happens
      # note: after_fork will sort out a bad @last_message date, but thread will be dead anyway
      if (Time.now - (@last_message || Time.now)) > keepalive_interval*3
        MessageBus.logger.warn "Global messages on #{Process.pid} timed out, restarting process"
        # No other clean way to remove this thread, its listening on a socket
        #   no data is arriving
        #
        # In production we see this kind of situation ... sometimes ... when there is
        # a VRRP failover, or weird networking condition
        pid = Process.pid

        # do the best we can to terminate self cleanly
        fork do
          Process.kill('TERM', pid)
          sleep 10
          Process.kill('KILL', pid)
        end

        sleep 10
        Process.kill('KILL', pid)

      else
        timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE
      end
    end
  end

  timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE

  thread
end
subscribe_impl(channel, site_id, &blk) click to toggle source
# File lib/message_bus.rb, line 363
def subscribe_impl(channel, site_id, &blk)

  raise MessageBus::BusDestroyed if @destroyed

  @subscriptions ||= {}
  @subscriptions[site_id] ||= {}
  @subscriptions[site_id][channel] ||=  []
  @subscriptions[site_id][channel] << blk
  ensure_subscriber_thread

  attempts = 100
  while attempts > 0 && !reliable_pub_sub.subscribed
    sleep 0.001
    attempts-=1
  end

  raise MessageBus::BusDestroyed if @destroyed
  blk
end
unsubscribe_impl(channel, site_id, &blk) click to toggle source
# File lib/message_bus.rb, line 383
def unsubscribe_impl(channel, site_id, &blk)

  @mutex.synchronize do
    if blk
      @subscriptions[site_id][channel].delete blk
    else
      @subscriptions[site_id][channel] = []
    end
  end

end