module EventMachine::Protocols::Redis

Constants

ALIASES
ASTERISK
BOOLEAN_PROCESSOR
COLON
DELIM
DISABLED_COMMANDS
DOLLAR
MINUS
OK

constants

PLUS
REPLY_PROCESSOR

Attributes

auto_reconnect[RW]
reconnect_on_error[RW]

Public Class Methods

connect(*args) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 337
# Opens an EventMachine connection to a redis server.
#
# Accepted call shapes:
#   connect                 - all defaults
#   connect(options_hash)   - :host / :port plus whatever #initialize reads
#   connect(url_string)     - converted to options by parse_url
#   connect(host, port)
#
# A missing :host falls back to '127.0.0.1'; a missing :port falls back to
# 6379, and the port is always coerced with to_i. A Hash argument is updated
# in place with those defaults.
#
# Raises ArgumentError for an unsupported argument type or argument count.
# Returns whatever EM.connect returns.
def connect(*args)
  options =
    case args.length
    when 0
      {}
    when 1
      arg = args.first
      case arg
      when Hash then arg
      when String then parse_url(arg)
      else raise ArgumentError, 'first argument must be Hash or String'
      end
    when 2
      # Positional form is (host, port), i.e. indices 0 and 1.
      {:host => args[0], :port => args[1]}
    else
      raise ArgumentError, "wrong number of arguments (#{args.length} for 1)"
    end
  options[:host] ||= '127.0.0.1'
  options[:port]   = (options[:port] || 6379).to_i
  EM.connect options[:host], options[:port], self, options
end
new(options = {}) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 359
# Stores the connection settings and installs the default callbacks.
#
# Keys read from +options+: :host, :port, :db (defaults to 0, coerced with
# to_i), :password, :auto_reconnect (defaults to true), :reconnect_on_error
# (defaults to false) and :logger.
def initialize(options = {})
  @host     = options[:host]
  @port     = options[:port]
  @db       = (options[:db] || 0).to_i
  @password = options[:password]

  @auto_reconnect     = options.fetch(:auto_reconnect, true)
  @reconnect_on_error = options.fetch(:reconnect_on_error, false)

  @logger = options[:logger]

  # Default error handling: re-raise whatever error object is handed in.
  @error_callback = lambda { |err| raise err }

  # Reconnect hooks start out as no-ops.
  noop_before = lambda{}
  noop_after  = lambda{}
  @reconnect_callbacks = {:before => noop_before, :after => noop_after}

  @values = []
end
parse_url(url) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 324
# Extracts connection options from a redis URL string.
#
# Returns a Hash with :host, :port and :password taken from the parsed URI;
# any of them is nil when the URL omits that component.
#
# Raises ArgumentError ('invalid redis url') when URI.parse rejects the input.
def parse_url(url)
  uri = URI.parse(url)
  {
    :host => uri.host,
    :port => uri.port,
    :password => uri.password
  }
rescue StandardError
  # This is a class-level method; the error-callback helper is an instance
  # method and is not reachable from here, so raise directly.
  raise ArgumentError, 'invalid redis url'
end

Public Instance Methods

[]=(key,value) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 140
# Hash-style writer: delegates to #set with the same key and value.
def []=(key, value)
  set key, value
end
after_reconnect(&blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 239
# Registers the block stored under the :after reconnect hook; that hook is
# called from #connection_completed when a reconnect was in progress.
def after_reconnect(&blk)
  @reconnect_callbacks.store(:after, blk)
end
auth(password, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 176
# Remembers the password (it is re-sent by #auth_and_select_db on connect)
# and issues an AUTH command; the block is forwarded to #call_command.
def auth(password, &blk)
  @password = password
  argv = ['auth', password]
  call_command(argv, &blk)
end
before_reconnect(&blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 235
# Registers the block stored under the :before reconnect hook; that hook is
# called from #reconnect! when no reconnect is already in progress.
def before_reconnect(&blk)
  @reconnect_callbacks.store(:before, blk)
end
call_command(argv, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 247
# Schedules a single command: the actual send (#raw_call_command) runs
# inside a +callback+ block rather than immediately.
def call_command(argv, &blk)
  callback do
    raw_call_command(argv, &blk)
  end
end
call_commands(argvs, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 258
# Schedules a pipeline of commands: the actual send (#raw_call_commands)
# runs inside a +callback+ block rather than immediately.
def call_commands(argvs, &blk)
  callback do
    raw_call_commands(argvs, &blk)
  end
end
close() click to toggle source
# File lib/em-redis/redis_protocol.rb, line 519
# Closes the connection once pending writes are flushed.
# Setting @closing first tells #unbind that the disconnect is intentional,
# so it does not reconnect or report an error.
def close
  @closing = true
  close_connection_after_writing()
end
connected?() click to toggle source
# File lib/em-redis/redis_protocol.rb, line 506
# Returns the stored @connected value when it is truthy, otherwise false
# (so an unset flag reads as false rather than nil).
def connected?
  @connected ? @connected : false
end
connection_completed() click to toggle source
# File lib/em-redis/redis_protocol.rb, line 387
# Runs when the connection has been established: resets the per-connection
# parser/callback state, queues AUTH/SELECT via #auth_and_select_db, runs
# the :after reconnect hook if this was a reconnect, then calls +succeed+.
def connection_completed
  if @logger
    @logger.debug { "Connected to #{@host}:#{@port}" }
  end

  @redis_callbacks = []
  @multibulk_n     = false
  @connected       = true

  auth_and_select_db

  was_reconnecting = @reconnecting
  @reconnect_callbacks[:after].call if was_reconnecting
  @reconnecting = false

  succeed
end
decr(key, decrement = nil, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 167
# Decrements +key+: issues DECRBY when a +decrement+ is given, plain DECR
# otherwise. The block is forwarded to #call_command.
def decr(key, decrement = nil, &blk)
  argv =
    if decrement
      ["decrby", key, decrement]
    else
      ["decr", key]
    end
  call_command(argv, &blk)
end
dispatch_error(code) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 456
# Handles an error reply: drops the callback queued for the failed command
# and reports a RedisError carrying +code+ through #error.
def dispatch_error(code)
  @redis_callbacks.shift # the pending callback is discarded, never called
  error(RedisError, code)
end
dispatch_response(value) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 461
# Routes one parsed reply value.
#
# 1. While a multi-bulk reply is being collected (@multibulk_n is set), the
#    value is accumulated; nothing else happens until the last element
#    arrives, at which point the collected array becomes the value.
# 2. With subscriptions registered, an array reply whose first element is
#    "message" or "unsubscribe" goes to the handlers stored for the channel
#    in element 1 and does not consume a queued command callback.
# 3. Otherwise the next queued entry is consumed:
#    [processor, blk]        - single command; blk gets the processed value.
#    [processor, count, blk] - pipeline; values pile up in @values and blk
#                              gets them all once +count+ replies arrived.
def dispatch_response(value)
  if @multibulk_n
    @multibulk_values << value
    @multibulk_n -= 1
    return unless @multibulk_n == 0

    value = @multibulk_values
    @multibulk_n = false
  end

  if @subscribe_callbacks && value.is_a?(Array) && %w[message unsubscribe].include?(value[0])
    @subscribe_callbacks[value[1]].each do |handler|
      handler.call(*value) if handler
    end
    return
  end

  entry = @redis_callbacks.shift
  return unless entry.kind_of?(Array)

  case entry.length
  when 2
    processor, blk = entry
    value = processor.call(value) if processor
    blk.call(value) if blk
  when 3
    processor, remaining, blk = entry
    value = processor.call(value) if processor
    @values << value
    if remaining > 1
      # More pipelined replies are expected: put the entry back, decremented.
      @redis_callbacks.unshift [processor, remaining - 1, blk]
    else
      blk.call(@values) if blk
      @values = []
    end
  end
end
errback(&blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 224
# Replaces the error callback with the given block. #error invokes this
# callback with the error object it builds.
def errback(&blk)
  @error_callback = blk
end
Also aliased as: on_error
error(klass, msg) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 229
# Builds an error of class +klass+ with message +msg+ and hands it to the
# current error callback (see #errback; the default callback raises it).
#
# When the error object exposes a +code=+ writer, +msg+ is also stored as
# its code. The writer is what gets called, so the writer is what is
# checked: an error class with only a +code+ reader is passed through
# unchanged instead of failing with NoMethodError.
#
# Returns whatever the error callback returns.
def error(klass, msg)
  err = klass.new(msg)
  err.code = msg if err.respond_to?(:code=)
  @error_callback.call(err)
end
incr(key, increment = nil, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 163
# Increments +key+: issues INCRBY when an +increment+ is given, plain INCR
# otherwise. The block is forwarded to #call_command.
def incr(key, increment = nil, &blk)
  argv =
    if increment
      ["incrby", key, increment]
    else
      ["incr", key]
    end
  call_command(argv, &blk)
end
mapped_mget(*keys) { |result| ... } click to toggle source

Similar to memcache.rb's get_multi, returns a hash mapping keys to values.

# File lib/em-redis/redis_protocol.rb, line 183
# Issues MGET for +keys+ and yields a Hash of key => value built from the
# response. Values are paired with keys positionally; nil values are left
# out of the Hash. Nothing is yielded when no block was given.
def mapped_mget(*keys)
  mget(*keys) do |response|
    found = {}
    response.each do |value|
      key = keys.shift # consume keys in step with the response values
      found[key] = value unless value.nil?
    end
    yield found if block_given?
  end
end
method_missing(*argv, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 243
# Treats any otherwise-undefined method as a redis command: the method name
# and its arguments become the argv passed to #call_command, and the block
# is forwarded as the reply callback.
# NOTE(review): no matching respond_to_missing? is visible in this section.
def method_missing(*argv, &blk)
  call_command(argv, &blk)
end
on_error(&blk)
Alias for: errback
process_cmd(line) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 414
# Interprets one reply line (terminator included) taken off the buffer.
#
# The first character selects the reply type; the rest of the line, minus
# the trailing two-character terminator, is its argument.
#
# Raises ParserError when a bulk reply's payload is not fully buffered yet.
def process_cmd(line)
  @logger.debug { "*** processing #{line}" } if @logger

  marker  = line[0, 1]
  payload = line.slice(1..-3) # strip the type character and the trailing \r\n

  case marker
  when MINUS # e.g. -ERR : server error
    dispatch_error(payload)
  when PLUS # e.g. +OK
    dispatch_response(payload)
  when DOLLAR # e.g. $3\r\nabc\r\n ; may also be one element of a multi-bulk
    data_len = Integer(payload)
    if data_len == -1
      # no data follows; the value is nil
      dispatch_response(nil)
    else
      # the payload plus its \r\n must already be buffered
      raise ParserError unless @buffer.size >= data_len + 2
      dispatch_response(@buffer.slice!(0, data_len))
      @buffer.slice!(0, 2) # tossing \r\n
    end
  when COLON # e.g. :8
    dispatch_response(Integer(payload))
  when ASTERISK # e.g. *2\r\n$1\r\na\r\n$1\r\nb\r\n
    element_count = Integer(payload)
    if [-1, 0].include?(element_count)
      dispatch_response([])
    else
      start_multibulk(element_count)
    end
  else
    error ProtocolError, "reply type not recognized: #{line.strip}"
  end
end
quit(&blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 220
# Issues the QUIT command; the block is forwarded to #call_command.
def quit(&blk)
  argv = ['quit']
  call_command(argv, &blk)
end
raw_call_command(argv, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 251
# Sends one command immediately and queues its reply handler.
#
# The command name (argv[0]) is normalised in place to a lower-case String,
# the command is written with #send_command, and a [processor, blk] pair is
# appended to @redis_callbacks, the processor being looked up in
# REPLY_PROCESSOR by command name.
def raw_call_command(argv, &blk)
  name = argv[0]
  name = name.to_s unless name.kind_of? String
  argv[0] = name.downcase
  send_command(argv)
  @redis_callbacks << [REPLY_PROCESSOR[argv[0]], blk]
end
raw_call_commands(argvs, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 262
# Sends a pipeline of commands immediately and queues one shared handler.
#
# argvs - Array of argv arrays, one per command. Each command name
#         (argv[0]) is normalised in place to a lower-case String, matching
#         #raw_call_command, so the name-keyed lookups in #send_command see
#         the same spelling on both paths.
# blk   - optional; receives the Array of all replies once every command in
#         the pipeline has been answered (see #dispatch_response).
#
# An empty pipeline sends nothing and calls +blk+ (if given) with [].
def raw_call_commands(argvs, &blk)
  if argvs.empty?  # Shortcut
    # The block is optional everywhere else, so it must be optional here too.
    blk.call([]) if blk
    return
  end

  argvs.each do |argv|
    argv[0] = argv[0].to_s.downcase
    send_command argv
  end
  # FIXME: argvs may contain heterogenous commands, storing all
  # REPLY_PROCESSORs may turn out expensive and has been omitted
  # for now.
  @redis_callbacks << [nil, argvs.length, blk]
end
receive_data(data) click to toggle source

19Feb09: Switched to a custom parser; LineText2 is recursive and can cause stack overflows when there is too much data.

include EM::P::LineText2

# File lib/em-redis/redis_protocol.rb, line 401
# Appends incoming bytes to the buffer and processes every complete line.
#
# Each DELIM-terminated line is cut off the front of the buffer and passed
# to #process_cmd. When #process_cmd raises ParserError (the rest of a bulk
# payload has not arrived yet) the line is put back at the front of the
# buffer and processing stops until more data is received.
def receive_data(data)
  @buffer ||= ''
  @buffer << data

  until (index = @buffer.index(DELIM)).nil?
    line = @buffer.slice!(0, index+2)
    begin
      process_cmd line
    rescue ParserError
      @buffer[0...0] = line # restore the header line for the next attempt
      break
    end
  end
end
reconnect!() click to toggle source
# File lib/em-redis/redis_protocol.rb, line 510
# Starts a reconnect attempt: runs the :before hook (only if no reconnect
# is already in progress), marks the connection as reconnecting, and asks
# EventMachine to call +reconnect+ with the stored host and port after a
# one-second timer.
def reconnect!
  already_reconnecting = @reconnecting
  @reconnect_callbacks[:before].call unless already_reconnecting
  @reconnecting = true

  EM.add_timer(1) do
    if @logger
      @logger.debug { "Reconnecting to #{@host}:#{@port}" }
    end
    reconnect(@host, @port)
  end
end
select(db, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 171
# Switches database: remembers the index (coerced with to_i; it is re-sent
# by #auth_and_select_db on connect) and issues a SELECT command.
def select(db, &blk)
  @db = db.to_i
  argv = ['select', @db]
  call_command(argv, &blk)
end
send_command(argv) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 278
# Serialises one command and writes it with +send_data+.
#
# Works on a copy of +argv+. A command listed in DISABLED_COMMANDS is
# reported through #error; a name found in ALIASES is replaced by its
# alias; a trailing Hash argument is expanded into its flattened key/value
# pairs. The wire form is "*<argc>", then "$<size>" and the value for every
# argument (stringified with to_s), each piece followed by DELIM.
#
# Returns whatever +send_data+ returns.
def send_command(argv)
  args = argv.dup
  name = args[0]

  # NOTE(review): execution continues after #error returns, so with a
  # non-raising error callback a disabled command is still sent - confirm
  # whether that is intended.
  error DisabledCommand, "#{name} command is disabled" if DISABLED_COMMANDS[name]
  aliased = ALIASES[name]
  args[0] = aliased if aliased

  if args[-1].is_a?(Hash)
    args[-1] = args[-1].to_a
    args.flatten!
  end

  pieces = ["*#{args.size}"]
  args.each do |arg|
    text = arg.to_s
    pieces.push("$#{get_size(text)}", text)
  end
  command = pieces.map { |piece| piece + DELIM }.join

  @logger.debug { "*** sending: #{command}" } if @logger
  send_data command
end
set(key, value, expiry=nil) { |s| ... } click to toggle source
# File lib/em-redis/redis_protocol.rb, line 144
# Issues SET for +key+/+value+; the reply is yielded to the block when one
# is given. When +expiry+ is given, an EXPIRE for the key is issued right
# after, and its result is the return value (nil otherwise).
def set(key, value, expiry=nil)
  call_command(["set", key, value]) do |reply|
    yield reply if block_given?
  end
  return unless expiry

  expire(key, expiry)
end
sort(key, options={}, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 151
# Issues SORT for +key+. Recognised +options+, emitted in this order:
#   :by    - "by <pattern>"
#   :get   - one "get <pattern>" per entry (a single value or an Array)
#   :order - a String split on spaces and appended as separate words
#   :limit - "limit" followed by the given value(s)
#   :store - "store <destination>"
# The argument list is flattened before being handed to #call_command.
def sort(key, options={}, &blk)
  parts = ["sort", key]
  parts.push("by", options[:by]) if options[:by]
  Array(options[:get]).each { |pattern| parts.push("get", pattern) }
  parts.concat(options[:order].split(" ")) if options[:order]
  parts.push("limit", options[:limit]) if options[:limit]
  parts.push("store", options[:store]) if options[:store]
  call_command(parts.flatten, &blk)
end
start_multibulk(multibulk_count) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 501
# Begins collecting a multi-bulk reply: records how many elements are still
# expected and starts with an empty list of collected values
# (#dispatch_response fills the list and counts down).
def start_multibulk(multibulk_count)
  @multibulk_n = multibulk_count
  @multibulk_values = []
end
subscribe(channel, proc=nil, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 194
# Subscribes to +channel+. The handler (+proc+ if given, otherwise the
# block) is appended to the channel's handler list, then a SUBSCRIBE
# command is issued with the block forwarded to #call_command.
def subscribe(channel, proc=nil, &blk)
  @subscribe_callbacks ||= Hash.new([])
  handler = proc || blk
  # Build a new list instead of appending in place: the Hash default is a
  # single shared array.
  @subscribe_callbacks[channel] = @subscribe_callbacks[channel] + [handler]
  call_command(['subscribe', channel], &blk)
end
type(key, &blk) click to toggle source

Ruby defines a now-deprecated type method, so we need to override it here; otherwise the call would never reach #method_missing.

# File lib/em-redis/redis_protocol.rb, line 216
# Issues the TYPE command for +key+; the block is forwarded to
# #call_command.
def type(key, &blk)
  argv = ['type', key]
  call_command(argv, &blk)
end
unbind() click to toggle source
# File lib/em-redis/redis_protocol.rb, line 524
# Runs when the connection has gone away.
#
# - After #close (@closing set): just clears the reconnecting flag.
# - Otherwise reconnects when auto-reconnect applies (the connection was up
#   or already reconnecting) or when reconnect_on_error is set.
# - Otherwise reports a ConnectionError through #error, worded according to
#   whether the connection had ever been established.
#
# Afterwards the connection is marked as not connected and @deferred_status
# is reset to nil. Neither happens if the error callback raises.
def unbind
  @logger.debug { "Disconnected" } if @logger

  auto = (@connected || @reconnecting) && @auto_reconnect
  if @closing
    @reconnecting = false
  elsif auto || @reconnect_on_error
    reconnect!
  else
    message = @connected ? 'connection closed' : 'unable to connect to redis server'
    error ConnectionError, message
  end

  @connected = false
  @deferred_status = nil
end
unsubscribe(channel=nil, &blk) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 200
# Unsubscribes from +channel+, or from every known channel when none is
# given. The handler list of each affected channel is replaced by a list
# holding only the given block (nil when no block was passed). The
# UNSUBSCRIBE command is sent from inside a +callback+ block.
def unsubscribe(channel=nil, &blk)
  @subscribe_callbacks ||= Hash.new([])

  affected = channel ? [channel] : @subscribe_callbacks.keys
  affected.each do |name|
    @subscribe_callbacks[name] = [blk]
  end

  argv = channel ? ["unsubscribe", channel] : ["unsubscribe"]
  callback { send_command(argv) }
end

Private Instance Methods

auth_and_select_db() click to toggle source
# File lib/em-redis/redis_protocol.rb, line 377
# Queues AUTH (when a password is stored) and SELECT (when the database is
# not 0) ahead of anything already waiting: the pending @callbacks are set
# aside, the two commands are queued on a fresh list, and the pending
# blocks are then re-registered through +callback+.
def auth_and_select_db
  # auth and select go to the front of the line
  pending = @callbacks || []
  @callbacks = []

  call_command(["auth", @password]) if @password
  call_command(["select", @db]) unless @db == 0

  pending.each do |block|
    callback(&block)
  end
end
get_size(string) click to toggle source
# File lib/em-redis/redis_protocol.rb, line 540
def get_size(string)
  string.respond_to?(:bytesize) ? string.bytesize : string.size
end