module EventMachine::Protocols::Redis
Constants
- ALIASES
- ASTERISK
- BOOLEAN_PROCESSOR
- COLON
- DELIM
- DISABLED_COMMANDS
- DOLLAR
- MINUS
- OK
constants
- PLUS
- REPLY_PROCESSOR
Attributes
auto_reconnect[RW]
reconnect_on_error[RW]
Public Class Methods
connect(*args)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 337 def connect(*args) case args.length when 0 options = {} when 1 arg = args.shift case arg when Hash then options = arg when String then options = parse_url(arg) else error ArgumentError, 'first argument must be Hash or String' end when 2 options = {:host => args[1], :port => args[2]} else error ArgumentError, "wrong number of arguments (#{args.length} for 1)" end options[:host] ||= '127.0.0.1' options[:port] = (options[:port] || 6379).to_i EM.connect options[:host], options[:port], self, options end
new(options = {})
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 359 def initialize(options = {}) @host = options[:host] @port = options[:port] @db = (options[:db] || 0).to_i @password = options[:password] @auto_reconnect = options.fetch(:auto_reconnect, true) @reconnect_on_error = options.fetch(:reconnect_on_error, false) @logger = options[:logger] @error_callback = lambda do |err| raise err end @reconnect_callbacks = { :before => lambda{}, :after => lambda{} } @values = [] end
parse_url(url)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 324 def parse_url(url) begin uri = URI.parse(url) { :host => uri.host, :port => uri.port, :password => uri.password } rescue error ArgumentError, 'invalid redis url' end end
Public Instance Methods
[]=(key,value)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 140 def []=(key,value) set(key,value) end
after_reconnect(&blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 239 def after_reconnect(&blk) @reconnect_callbacks[:after] = blk end
auth(password, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 176 def auth(password, &blk) @password = password call_command(['auth', password], &blk) end
before_reconnect(&blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 235 def before_reconnect(&blk) @reconnect_callbacks[:before] = blk end
call_command(argv, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 247 def call_command(argv, &blk) callback { raw_call_command(argv, &blk) } end
call_commands(argvs, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 258 def call_commands(argvs, &blk) callback { raw_call_commands(argvs, &blk) } end
close()
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 519 def close @closing = true close_connection_after_writing end
connected?()
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 506 def connected? @connected || false end
connection_completed()
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 387 def connection_completed @logger.debug { "Connected to #{@host}:#{@port}" } if @logger @redis_callbacks = [] @multibulk_n = false @connected = true auth_and_select_db @reconnect_callbacks[:after].call if @reconnecting @reconnecting = false succeed end
decr(key, decrement = nil, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 167 def decr(key, decrement = nil, &blk) call_command(decrement ? ["decrby",key,decrement] : ["decr",key], &blk) end
dispatch_error(code)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 456 def dispatch_error(code) @redis_callbacks.shift error RedisError, code end
dispatch_response(value)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 461 def dispatch_response(value) if @multibulk_n @multibulk_values << value @multibulk_n -= 1 if @multibulk_n == 0 value = @multibulk_values @multibulk_n = false else return end end if @subscribe_callbacks && value.is_a?(Array) if %w[message unsubscribe].include?(value[0]) @subscribe_callbacks[value[1]].each do |blk| blk.call(*value) if blk end return end end callback = @redis_callbacks.shift if callback.kind_of?(Array) && callback.length == 2 processor, blk = callback value = processor.call(value) if processor blk.call(value) if blk elsif callback.kind_of?(Array) && callback.length == 3 processor, pipeline_count, blk = callback value = processor.call(value) if processor @values << value if pipeline_count > 1 @redis_callbacks.unshift [processor, pipeline_count - 1, blk] else blk.call(@values) if blk @values = [] end end end
errback(&blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 224 def errback(&blk) @error_callback = blk end
Also aliased as: on_error
error(klass, msg)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 229 def error(klass, msg) err = klass.new(msg) err.code = msg if err.respond_to?(:code) @error_callback.call(err) end
incr(key, increment = nil, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 163 def incr(key, increment = nil, &blk) call_command(increment ? ["incrby",key,increment] : ["incr",key], &blk) end
mapped_mget(*keys) { |result| ... }
click to toggle source
Similar to memcache.rb's get_multi, returns a hash mapping keys to values.
# File lib/em-redis/redis_protocol.rb, line 183 def mapped_mget(*keys) mget(*keys) do |response| result = {} response.each do |value| key = keys.shift result.merge!(key => value) unless value.nil? end yield result if block_given? end end
method_missing(*argv, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 243 def method_missing(*argv, &blk) call_command(argv, &blk) end
process_cmd(line)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 414 def process_cmd(line) @logger.debug { "*** processing #{line}" } if @logger # first character of buffer will always be the response type reply_type = line[0, 1] reply_args = line.slice(1..-3) # remove type character and \r\n case reply_type # e.g. -ERR when MINUS # server ERROR dispatch_error(reply_args) # e.g. +OK when PLUS dispatch_response(reply_args) # e.g. $3\r\nabc\r\n # 'bulk' is more complex because it could be part of multi-bulk when DOLLAR data_len = Integer(reply_args) if data_len == -1 # expect no data; return nil dispatch_response(nil) elsif @buffer.size >= data_len + 2 # buffer is full of expected data dispatch_response(@buffer.slice!(0, data_len)) @buffer.slice!(0,2) # tossing \r\n else # buffer isn't full or nil raise ParserError end # e.g. :8 when COLON dispatch_response(Integer(reply_args)) # e.g. *2\r\n$1\r\na\r\n$1\r\nb\r\n when ASTERISK multibulk_count = Integer(reply_args) if multibulk_count == -1 || multibulk_count == 0 dispatch_response([]) else start_multibulk(multibulk_count) end # WAT? else error ProtocolError, "reply type not recognized: #{line.strip}" end end
quit(&blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 220 def quit(&blk) call_command(['quit'], &blk) end
raw_call_command(argv, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 251 def raw_call_command(argv, &blk) argv[0] = argv[0].to_s unless argv[0].kind_of? String argv[0] = argv[0].downcase send_command(argv) @redis_callbacks << [REPLY_PROCESSOR[argv[0]], blk] end
raw_call_commands(argvs, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 262 def raw_call_commands(argvs, &blk) if argvs.empty? # Shortcut blk.call [] return end argvs.each do |argv| argv[0] = argv[0].to_s unless argv[0].kind_of? String send_command argv end # FIXME: argvs may contain heterogenous commands, storing all # REPLY_PROCESSORs may turn out expensive and has been omitted # for now. @redis_callbacks << [nil, argvs.length, blk] end
receive_data(data)
click to toggle source
19Feb09 Switched to a custom parser, LineText2 is recursive and can cause
stack overflows when there is too much data.
include EM::P::LineText2
# File lib/em-redis/redis_protocol.rb, line 401 def receive_data(data) (@buffer ||= '') << data while index = @buffer.index(DELIM) begin line = @buffer.slice!(0, index+2) process_cmd line rescue ParserError @buffer[0...0] = line break end end end
reconnect!()
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 510 def reconnect! @reconnect_callbacks[:before].call unless @reconnecting @reconnecting = true EM.add_timer(1) do @logger.debug { "Reconnecting to #{@host}:#{@port}" } if @logger reconnect(@host, @port) end end
select(db, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 171 def select(db, &blk) @db = db.to_i call_command(['select', @db], &blk) end
send_command(argv)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 278 def send_command(argv) argv = argv.dup error DisabledCommand, "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]] argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]] if argv[-1].is_a?(Hash) argv[-1] = argv[-1].to_a argv.flatten! end command = ["*#{argv.size}"] argv.each do |v| v = v.to_s command << "$#{get_size(v)}" command << v end command = command.map {|cmd| cmd + DELIM}.join @logger.debug { "*** sending: #{command}" } if @logger send_data command end
set(key, value, expiry=nil) { |s| ... }
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 144 def set(key, value, expiry=nil) call_command(["set", key, value]) do |s| yield s if block_given? end expire(key, expiry) if expiry end
sort(key, options={}, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 151 def sort(key, options={}, &blk) cmd = ["sort", key] cmd << ["by", options[:by]] if options[:by] Array(options[:get]).each do |v| cmd << ["get", v] end cmd << options[:order].split(" ") if options[:order] cmd << ["limit", options[:limit]] if options[:limit] cmd << ["store", options[:store]] if options[:store] call_command(cmd.flatten, &blk) end
start_multibulk(multibulk_count)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 501 def start_multibulk(multibulk_count) @multibulk_n = multibulk_count @multibulk_values = [] end
subscribe(channel, proc=nil, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 194 def subscribe(channel, proc=nil, &blk) @subscribe_callbacks ||= Hash.new([]) @subscribe_callbacks[channel] += [(proc || blk)] call_command(['subscribe', channel], &blk) end
type(key, &blk)
click to toggle source
Ruby defines a now deprecated type method so we need to override it here since it will never hit #method_missing
# File lib/em-redis/redis_protocol.rb, line 216 def type(key, &blk) call_command(['type', key], &blk) end
unbind()
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 524 def unbind @logger.debug { "Disconnected" } if @logger if @closing @reconnecting = false elsif ((@connected || @reconnecting) && @auto_reconnect) || @reconnect_on_error reconnect! elsif @connected error ConnectionError, 'connection closed' else error ConnectionError, 'unable to connect to redis server' end @connected = false @deferred_status = nil end
unsubscribe(channel=nil, &blk)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 200 def unsubscribe(channel=nil, &blk) @subscribe_callbacks ||= Hash.new([]) argv = ["unsubscribe"] if channel @subscribe_callbacks[channel] = [blk] argv << channel else @subscribe_callbacks.each_key do |key| @subscribe_callbacks[key] = [blk] end end callback { send_command(argv) } end
Private Instance Methods
auth_and_select_db()
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 377 def auth_and_select_db # auth and select go to the front of the line callbacks = @callbacks || [] @callbacks = [] call_command(["auth", @password]) if @password call_command(["select", @db]) unless @db == 0 callbacks.each { |block| callback &block } end
get_size(string)
click to toggle source
# File lib/em-redis/redis_protocol.rb, line 540 def get_size(string) string.respond_to?(:bytesize) ? string.bytesize : string.size end