module MessageBus::Implementation
Constants
- ENCODE_SITE_TOKEN
- MIN_KEEPALIVE
Public Class Methods
new()
click to toggle source
# File lib/message_bus.rb, line 30 def initialize @mutex = Synchronizer.new end
Public Instance Methods
after_fork()
click to toggle source
# File lib/message_bus.rb, line 308 def after_fork reliable_pub_sub.after_fork ensure_subscriber_thread # will ensure timer is running timer.queue{} end
allow_broadcast=(val)
click to toggle source
# File lib/message_bus.rb, line 168 def allow_broadcast=(val) @allow_broadcast = val end
allow_broadcast?()
click to toggle source
# File lib/message_bus.rb, line 172 def allow_broadcast? @allow_broadcast ||= if defined? ::Rails ::Rails.env.test? || ::Rails.env.development? else false end end
around_client_batch(channel, &blk)
click to toggle source
# File lib/message_bus.rb, line 152 def around_client_batch(channel, &blk) @around_client_batches ||= {} @around_client_batches[channel] = blk if blk @around_client_batches[channel] end
backlog(channel=nil, last_id=nil)
click to toggle source
# File lib/message_bus.rb, line 271 def backlog(channel=nil, last_id=nil) old = if channel reliable_pub_sub.backlog(encode_channel_name(channel), last_id) else reliable_pub_sub.global_backlog(last_id) end old.each{ |m| decode_message!(m) } old end
blocking_subscribe(channel=nil, &blk)
click to toggle source
# File lib/message_bus.rb, line 224 def blocking_subscribe(channel=nil, &blk) if channel reliable_pub_sub.subscribe(encode_channel_name(channel), &blk) else reliable_pub_sub.global_subscribe(&blk) end end
cache_assets()
click to toggle source
# File lib/message_bus.rb, line 38 def cache_assets if defined? @cache_assets @cache_assets else true end end
cache_assets=(val)
click to toggle source
# File lib/message_bus.rb, line 34 def cache_assets=(val) @cache_assets = val end
client_filter(channel, &blk)
click to toggle source
# File lib/message_bus.rb, line 146 def client_filter(channel, &blk) @client_filters ||= {} @client_filters[channel] = blk if blk @client_filters[channel] end
decode_channel_name(channel)
click to toggle source
# File lib/message_bus.rb, line 244 def decode_channel_name(channel) channel.split(ENCODE_SITE_TOKEN) end
destroy()
click to toggle source
# File lib/message_bus.rb, line 298 def destroy @mutex.synchronize do @subscriptions ||= {} reliable_pub_sub.global_unsubscribe @destroyed = true end @subscriber_thread.join if @subscriber_thread timer.stop end
enable_diagnostics()
click to toggle source
# File lib/message_bus.rb, line 192 def enable_diagnostics MessageBus::Diagnostics.enable end
encode_channel_name(channel)
click to toggle source
encode channel name to include site
# File lib/message_bus.rb, line 235 def encode_channel_name(channel) if site_id_lookup && !global?(channel) raise ArgumentError.new channel if channel.include? ENCODE_SITE_TOKEN "#{channel}#{ENCODE_SITE_TOKEN}#{site_id_lookup.call}" else channel end end
extra_response_headers_lookup(&blk)
click to toggle source
# File lib/message_bus.rb, line 141 def extra_response_headers_lookup(&blk) @extra_response_headers_lookup = blk if blk @extra_response_headers_lookup end
global_backlog(last_id=nil)
click to toggle source
# File lib/message_bus.rb, line 267 def global_backlog(last_id=nil) backlog(nil, last_id) end
group_ids_lookup(&blk)
click to toggle source
# File lib/message_bus.rb, line 131 def group_ids_lookup(&blk) @group_ids_lookup = blk if blk @group_ids_lookup end
is_admin_lookup(&blk)
click to toggle source
# File lib/message_bus.rb, line 136 def is_admin_lookup(&blk) @is_admin_lookup = blk if blk @is_admin_lookup end
keepalive_interval()
click to toggle source
# File lib/message_bus.rb, line 342 def keepalive_interval @keepalive_interval || 60 end
keepalive_interval=(interval)
click to toggle source
set to 0 to disable, anything higher and a keepalive will run every N seconds, if it fails process is killed
# File lib/message_bus.rb, line 338 def keepalive_interval=(interval) @keepalive_interval = interval end
last_id(channel)
click to toggle source
# File lib/message_bus.rb, line 285 def last_id(channel) reliable_pub_sub.last_id(encode_channel_name(channel)) end
last_message(channel)
click to toggle source
# File lib/message_bus.rb, line 289 def last_message(channel) if last_id = last_id(channel) messages = backlog(channel, last_id-1) if messages messages[0] end end end
listening?()
click to toggle source
# File lib/message_bus.rb, line 315 def listening? @subscriber_thread && @subscriber_thread.alive? end
local_subscribe(channel=nil, &blk)
click to toggle source
subscribe only on current site
# File lib/message_bus.rb, line 262 def local_subscribe(channel=nil, &blk) site_id = site_id_lookup.call if site_id_lookup && ! global?(channel) subscribe_impl(channel, site_id, &blk) end
local_unsubscribe(channel=nil, &blk)
click to toggle source
# File lib/message_bus.rb, line 256 def local_unsubscribe(channel=nil, &blk) site_id = site_id_lookup.call if site_id_lookup unsubscribe_impl(channel, site_id, &blk) end
logger()
click to toggle source
# File lib/message_bus.rb, line 50 def logger return @logger if @logger require 'logger' @logger = Logger.new(STDOUT) end
logger=(logger)
click to toggle source
# File lib/message_bus.rb, line 46 def logger=(logger) @logger = logger end
long_polling_enabled=(val)
click to toggle source
# File lib/message_bus.rb, line 60 def long_polling_enabled=(val) @long_polling_enabled = val end
long_polling_enabled?()
click to toggle source
# File lib/message_bus.rb, line 56 def long_polling_enabled? @long_polling_enabled == false ? false : true end
long_polling_interval()
click to toggle source
# File lib/message_bus.rb, line 100 def long_polling_interval @long_polling_interval || 25 * 1000 end
long_polling_interval=(millisecs)
click to toggle source
# File lib/message_bus.rb, line 96 def long_polling_interval=(millisecs) @long_polling_interval = millisecs end
max_active_clients()
click to toggle source
# File lib/message_bus.rb, line 70 def max_active_clients @max_active_clients || 1000 end
max_active_clients=(val)
click to toggle source
The number of simultanuous clients we can service
will revert to polling if we are out of slots
# File lib/message_bus.rb, line 66 def max_active_clients=(val) @max_active_clients = val end
off()
click to toggle source
# File lib/message_bus.rb, line 104 def off @off = true end
on()
click to toggle source
# File lib/message_bus.rb, line 108 def on @off = false end
on_connect(&blk)
click to toggle source
# File lib/message_bus.rb, line 158 def on_connect(&blk) @on_connect = blk if blk @on_connect end
on_disconnect(&blk)
click to toggle source
# File lib/message_bus.rb, line 163 def on_disconnect(&blk) @on_disconnect = blk if blk @on_disconnect end
publish(channel, data, opts = nil)
click to toggle source
# File lib/message_bus.rb, line 196 def publish(channel, data, opts = nil) return if @off @mutex.synchronize do raise ::MessageBus::BusDestroyed if @destroyed end user_ids = nil group_ids = nil client_ids = nil if opts user_ids = opts[:user_ids] group_ids = opts[:group_ids] client_ids = opts[:client_ids] end raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel) encoded_data = JSON.dump({ data: data, user_ids: user_ids, group_ids: group_ids, client_ids: client_ids }) reliable_pub_sub.publish(encode_channel_name(channel), encoded_data) end
rack_hijack_enabled=(val)
click to toggle source
# File lib/message_bus.rb, line 92 def rack_hijack_enabled=(val) @rack_hijack_enabled = val end
rack_hijack_enabled?()
click to toggle source
# File lib/message_bus.rb, line 74 def rack_hijack_enabled? if @rack_hijack_enabled.nil? @rack_hijack_enabled = true # without this switch passenger will explode # it will run out of connections after about 10 if defined? PhusionPassenger @rack_hijack_enabled = false if PhusionPassenger.respond_to? :advertised_concurrency_level PhusionPassenger.advertised_concurrency_level = 0 @rack_hijack_enabled = true end end end @rack_hijack_enabled end
redis_config()
click to toggle source
# File lib/message_bus.rb, line 117 def redis_config @redis_config ||= {} end
redis_config=(config)
click to toggle source
Allow us to inject a redis db
# File lib/message_bus.rb, line 113 def redis_config=(config) @redis_config = config end
reliable_pub_sub()
click to toggle source
# File lib/message_bus.rb, line 185 def reliable_pub_sub @mutex.synchronize do return nil if @destroyed @reliable_pub_sub ||= MessageBus::Redis::ReliablePubSub.new redis_config end end
reliable_pub_sub=(pub_sub)
click to toggle source
# File lib/message_bus.rb, line 181 def reliable_pub_sub=(pub_sub) @reliable_pub_sub = pub_sub end
reset!()
click to toggle source
will reset all keys
# File lib/message_bus.rb, line 320 def reset! reliable_pub_sub.reset! end
site_id_lookup(&blk)
click to toggle source
# File lib/message_bus.rb, line 121 def site_id_lookup(&blk) @site_id_lookup = blk if blk @site_id_lookup end
subscribe(channel=nil, &blk)
click to toggle source
# File lib/message_bus.rb, line 248 def subscribe(channel=nil, &blk) subscribe_impl(channel, nil, &blk) end
timer()
click to toggle source
# File lib/message_bus.rb, line 324 def timer return @timer_thread if @timer_thread @timer_thread ||= begin t = MessageBus::TimerThread.new t.on_error do |e| logger.warn "Failed to process job: #{e} #{e.backtrace}" end t end end
unsubscribe(channel=nil, &blk)
click to toggle source
# File lib/message_bus.rb, line 252 def unsubscribe(channel=nil, &blk) unsubscribe_impl(channel, nil, &blk) end
user_id_lookup(&blk)
click to toggle source
# File lib/message_bus.rb, line 126 def user_id_lookup(&blk) @user_id_lookup = blk if blk @user_id_lookup end
Protected Instance Methods
decode_message!(msg)
click to toggle source
# File lib/message_bus.rb, line 352 def decode_message!(msg) channel, site_id = decode_channel_name(msg.channel) msg.channel = channel msg.site_id = site_id parsed = JSON.parse(msg.data) msg.data = parsed["data"] msg.user_ids = parsed["user_ids"] msg.group_ids = parsed["group_ids"] msg.client_ids = parsed["client_ids"] end
ensure_subscriber_thread()
click to toggle source
# File lib/message_bus.rb, line 396 def ensure_subscriber_thread @mutex.synchronize do return if (@subscriber_thread && @subscriber_thread.alive?) || @destroyed @subscriber_thread = new_subscriber_thread end end
global?(channel)
click to toggle source
# File lib/message_bus.rb, line 348 def global?(channel) channel && channel.start_with?('/global/'.freeze) end
global_subscribe_thread()
click to toggle source
# File lib/message_bus.rb, line 454 def global_subscribe_thread # pretend we just got a message @last_message = Time.now reliable_pub_sub.global_subscribe do |msg| begin @last_message = Time.now decode_message!(msg) globals, locals, local_globals, global_globals = nil @mutex.synchronize do raise MessageBus::BusDestroyed if @destroyed globals = @subscriptions[nil] locals = @subscriptions[msg.site_id] if msg.site_id global_globals = globals[nil] if globals local_globals = locals[nil] if locals globals = globals[msg.channel] if globals locals = locals[msg.channel] if locals end multi_each(globals,locals, global_globals, local_globals) do |c| begin c.call msg rescue => e MessageBus.logger.warn "failed to deliver message, skipping #{msg.inspect}\n ex: #{e} backtrace: #{e.backtrace}" end end rescue => e MessageBus.logger.warn "failed to process message #{msg.inspect}\n ex: #{e} backtrace: #{e.backtrace}" end @global_id = msg.global_id end end
multi_each(*args,&block)
click to toggle source
# File lib/message_bus.rb, line 489 def multi_each(*args,&block) args.each do |a| a.each(&block) if a end end
new_subscriber_thread()
click to toggle source
# File lib/message_bus.rb, line 405 def new_subscriber_thread thread = Thread.new do begin global_subscribe_thread unless @destroyed rescue => e MessageBus.logger.warn "Unexpected error in subscriber thread #{e}" end end # adjust for possible race condition @last_message = Time.now blk = proc do if !@destroyed && thread.alive? && keepalive_interval > MIN_KEEPALIVE publish("/__mb_keepalive__/", Process.pid, user_ids: [-1]) # going for x3 keepalives missed for a restart, need to ensure this only very rarely happens # note: after_fork will sort out a bad @last_message date, but thread will be dead anyway if (Time.now - (@last_message || Time.now)) > keepalive_interval*3 MessageBus.logger.warn "Global messages on #{Process.pid} timed out, restarting process" # No other clean way to remove this thread, its listening on a socket # no data is arriving # # In production we see this kind of situation ... sometimes ... when there is # a VRRP failover, or weird networking condition pid = Process.pid # do the best we can to terminate self cleanly fork do Process.kill('TERM', pid) sleep 10 Process.kill('KILL', pid) end sleep 10 Process.kill('KILL', pid) else timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE end end end timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE thread end
subscribe_impl(channel, site_id, &blk)
click to toggle source
# File lib/message_bus.rb, line 363 def subscribe_impl(channel, site_id, &blk) raise MessageBus::BusDestroyed if @destroyed @subscriptions ||= {} @subscriptions[site_id] ||= {} @subscriptions[site_id][channel] ||= [] @subscriptions[site_id][channel] << blk ensure_subscriber_thread attempts = 100 while attempts > 0 && !reliable_pub_sub.subscribed sleep 0.001 attempts-=1 end raise MessageBus::BusDestroyed if @destroyed blk end
unsubscribe_impl(channel, site_id, &blk)
click to toggle source
# File lib/message_bus.rb, line 383 def unsubscribe_impl(channel, site_id, &blk) @mutex.synchronize do if blk @subscriptions[site_id][channel].delete blk else @subscriptions[site_id][channel] = [] end end end