class MessageBus::Redis::ReliablePubSub

Constants

UNSUB_MESSAGE

Attributes

max_backlog_age[RW]
max_backlog_size[RW]
max_global_backlog_size[RW]
max_in_memory_publish_backlog[RW]
max_publish_retries[RW]
max_publish_wait[RW]
subscribed[R]

Public Class Methods

new(redis_config = {}, max_backlog_size = 1000) click to toggle source

#max_backlog_size is per multiplexed channel

# File lib/message_bus/redis/reliable_pub_sub.rb, line 29
def initialize(redis_config = {}, max_backlog_size = 1000)
  @redis_config = redis_config
  @max_backlog_size = max_backlog_size
  @max_global_backlog_size = 2000
  @max_publish_retries = 10
  @max_publish_wait = 500 #ms
  @max_in_memory_publish_backlog = 1000
  @in_memory_backlog = []
  @lock = Mutex.new
  @flush_backlog_thread = nil
  # after 7 days inactive backlogs will be removed
  @max_backlog_age = 604800
end

Public Instance Methods

after_fork() click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 47
def after_fork
  pub_redis.client.reconnect
end
backlog(channel, last_id = nil) click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 190
def backlog(channel, last_id = nil)
  redis = pub_redis
  backlog_key = backlog_key(channel)
  items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"

  items.map do |i|
    MessageBus::Message.decode(i)
  end
end
backlog_id_key(channel) click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 65
def backlog_id_key(channel)
  "__mb_backlog_id_n_#{channel}"
end
backlog_key(channel) click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 61
def backlog_key(channel)
  "__mb_backlog_n_#{channel}"
end
ensure_backlog_flushed() click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 150
def ensure_backlog_flushed
  while true
    try_again = false

    @lock.synchronize do
      break if @in_memory_backlog.length == 0

      begin
        publish(*@in_memory_backlog[0],false)
      rescue Redis::CommandError => e
        if e.message =~ /^READONLY/
          try_again = true
        else
          MessageBus.logger.warn("Dropping undeliverable message #{e}")
        end
      rescue => e
        MessageBus.logger.warn("Dropping undeliverable message #{e}")
      end

      @in_memory_backlog.delete_at(0) unless try_again
    end

    if try_again
      sleep 0.005
      # in case we are not connected to the correct server
      # which can happen when sharing ips
      pub_redis.client.reconnect
    end
  end
ensure
  @lock.synchronize do
    @flush_backlog_thread = nil
  end
end
get_message(channel, message_id) click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 218
def get_message(channel, message_id)
  redis = pub_redis
  backlog_key = backlog_key(channel)

  items = redis.zrangebyscore backlog_key, message_id, message_id
  if items && items[0]
    MessageBus::Message.decode(items[0])
  else
    nil
  end
end
global_backlog(last_id = nil) click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 200
def global_backlog(last_id = nil)
  last_id = last_id.to_i
  redis = pub_redis

  items = redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf"

  items.map! do |i|
    pipe = i.index "|"
    message_id = i[0..pipe].to_i
    channel = i[pipe+1..-1]
    m = get_message(channel, message_id)
    m
  end

  items.compact!
  items
end
global_backlog_key() click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 73
def global_backlog_key
  "__mb_global_backlog_n"
end
global_id_key() click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 69
def global_id_key
  "__mb_global_id_n"
end
global_subscribe(last_id=nil) { |m| ... } click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 278
def global_subscribe(last_id=nil, &blk)
  raise ArgumentError unless block_given?
  highest_id = last_id

  clear_backlog = lambda do
    retries = 4
    begin
      highest_id = process_global_backlog(highest_id, retries > 0, &blk)
    rescue BackLogOutOfOrder => e
      highest_id = e.highest_id
      retries -= 1
      sleep(rand(50) / 1000.0)
      retry
    end
  end


  begin
    @redis_global = new_redis_connection

    if highest_id
      clear_backlog.call(&blk)
    end

    @redis_global.subscribe(redis_channel_name) do |on|
      on.subscribe do
        if highest_id
          clear_backlog.call(&blk)
        end
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |c,m|
        if m == UNSUB_MESSAGE
          @redis_global.unsubscribe
          return
        end
        m = MessageBus::Message.decode m

        # we have 3 options
        #
        # 1. message came in the correct order GREAT, just deal with it
        # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
        # 3. message came in the incorrect order and is lowest than current highest id, reset

        if highest_id.nil? || m.global_id == highest_id + 1
          highest_id = m.global_id
          yield m
        else
          clear_backlog.call(&blk)
        end
      end
    end
  rescue => error
    MessageBus.logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace}"
    sleep 1
    retry
  end
end
global_unsubscribe() click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 270
def global_unsubscribe
  if @redis_global
    pub_redis.publish(redis_channel_name, UNSUB_MESSAGE)
    @redis_global.disconnect
    @redis_global = nil
  end
end
last_id(channel) click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 185
def last_id(channel)
  backlog_id_key = backlog_id_key(channel)
  pub_redis.get(backlog_id_key).to_i
end
new_redis_connection() click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 43
def new_redis_connection
  ::Redis.new(@redis_config)
end
process_global_backlog(highest_id, raise_error) { |old| ... } click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 249
def process_global_backlog(highest_id, raise_error, &blk)
  if highest_id > pub_redis.get(global_id_key).to_i
    highest_id = 0
  end

  global_backlog(highest_id).each do |old|
    if highest_id + 1 == old.global_id
      yield old
      highest_id = old.global_id
    else
      raise BackLogOutOfOrder.new(highest_id) if raise_error
      if old.global_id > highest_id
        yield old
        highest_id = old.global_id
      end
    end
  end

  highest_id
end
pub_redis() click to toggle source

redis connection used for publishing messages

# File lib/message_bus/redis/reliable_pub_sub.rb, line 57
def pub_redis
  @pub_redis ||= new_redis_connection
end
publish(channel, data, queue_in_memory=true) click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 84
def publish(channel, data, queue_in_memory=true)
  redis = pub_redis
  backlog_id_key = backlog_id_key(channel)
  backlog_key = backlog_key(channel)

  global_id = nil
  backlog_id = nil

  redis.multi do |m|
    global_id = m.incr(global_id_key)
    backlog_id = m.incr(backlog_id_key)
  end

  global_id = global_id.value
  backlog_id = backlog_id.value

  msg = MessageBus::Message.new global_id, backlog_id, channel, data
  payload = msg.encode

  redis.multi do |m|

    redis.zadd backlog_key, backlog_id, payload
    redis.expire backlog_key, @max_backlog_age

    redis.zadd global_backlog_key, global_id, backlog_id.to_s << "|" << channel
    redis.expire global_backlog_key, @max_backlog_age

    redis.publish redis_channel_name, payload

    if backlog_id > @max_backlog_size
      redis.zremrangebyscore backlog_key, 1, backlog_id - @max_backlog_size
    end

    if global_id > @max_global_backlog_size
      redis.zremrangebyscore global_backlog_key, 1, global_id - @max_global_backlog_size
    end

  end

  backlog_id

rescue Redis::CommandError => e
  if queue_in_memory &&
        e.message =~ /^READONLY/

    @lock.synchronize do
      @in_memory_backlog << [channel,data]
      if @in_memory_backlog.length > @max_in_memory_publish_backlog
        @in_memory_backlog.delete_at(0)
        MessageBus.logger.warn("Dropping old message cause max_in_memory_publish_backlog is full")
      end
    end

    if @flush_backlog_thread == nil
      @lock.synchronize do
        if @flush_backlog_thread == nil
          @flush_backlog_thread = Thread.new{ensure_backlog_flushed}
        end
      end
    end
    nil
  else
    raise
  end
end
redis_channel_name() click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 51
def redis_channel_name
  db = @redis_config[:db] || 0
  "_message_bus_#{db}"
end
reset!() click to toggle source

use with extreme care, will nuke all of the data

# File lib/message_bus/redis/reliable_pub_sub.rb, line 78
def reset!
  pub_redis.keys("__mb_*").each do |k|
    pub_redis.del k
  end
end
subscribe(channel, last_id = nil) { |m| ... } click to toggle source
# File lib/message_bus/redis/reliable_pub_sub.rb, line 230
def subscribe(channel, last_id = nil)
  # trivial implementation for now,
  #   can cut down on connections if we only have one global subscriber
  raise ArgumentError unless block_given?

  if last_id
    # we need to translate this to a global id, at least give it a shot
    #   we are subscribing on global and global is always going to be bigger than local
    #   so worst case is a replay of a few messages
    message = get_message(channel, last_id)
    if message
      last_id = message.global_id
    end
  end
  global_subscribe(last_id) do |m|
    yield m if m.channel == channel
  end
end