class MogileFS::Backend
This class communicates with the MogileFS trackers. You should not have to use this directly unless you are developing support for new commands or plugins for MogileFS.
Attributes
The last error
The string attached to the last error
Public Class Methods
Adds MogileFS commands +names+.
# File lib/mogilefs/backend.rb, line 9
#
# Defines one instance method for every entry in +names+.
#
# Each generated method accepts an optional first argument (the request
# arguments, defaulting to an empty Hash) and forwards it to #do_request
# with the idempotent flag set to +false+.
def self.add_command(*names)
  names.each do |command|
    define_method(command) do |*params|
      request_args = params.first || {}
      do_request(command, request_args, false)
    end
  end
end
This converts an error code from a mogilefsd tracker to an exception.
Examples of some exceptions that get created:
class AfterMismatchError < MogileFS::Error; end
class DomainNotFoundError < MogileFS::Error; end
class InvalidCharsError < MogileFS::Error; end
# File lib/mogilefs/backend.rb, line 35
#
# Registers (creating it if necessary) the exception class that corresponds
# to the snake_case tracker error code +err_snake+ and returns that class.
#
# The code is camel-cased and suffixed with "Error" unless it already ends
# that way, e.g. "domain_not_found" => DomainNotFoundError.  The resulting
# class is stored in BACKEND_ERRORS under +err_snake+.
#
# The returned class is always a subclass of MogileFS::Error.  Constant
# lookup on a class also searches Object, so an error code such as
# "standard" or "system_call" would otherwise resolve to Ruby's own
# StandardError/SystemCallError; such a match is ignored here.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel

  klass = const_get(err_camel) if const_defined?(err_camel)
  unless klass.is_a?(Class) && klass <= MogileFS::Error
    klass = Class.new(MogileFS::Error)
    # Only name the new class when the name is free.  Defining it over an
    # unrelated constant (e.g. SystemCallError) would shadow that constant
    # for code inside this class, so in that case the class stays anonymous
    # and is reachable through BACKEND_ERRORS only.
    const_set(err_camel, klass) unless const_defined?(err_camel)
  end
  BACKEND_ERRORS[err_snake] = klass
end
Adds idempotent MogileFS commands +names+; these commands may be retried transparently on a different tracker if there is a network/server error.
# File lib/mogilefs/backend.rb, line 19
#
# Defines one instance method for every entry in +names+.
#
# Each generated method accepts an optional first argument (the request
# arguments, defaulting to an empty Hash) and forwards it to #do_request
# with the idempotent flag set to +true+, which lets #do_request retry
# the command after a failure.
def self.add_idempotent_command(*names)
  names.each do |command|
    define_method(command) do |*params|
      request_args = params.first || {}
      do_request(command, request_args, true)
    end
  end
end
Creates a new MogileFS::Backend.
:hosts is a required argument and must be an Array containing one or more 'hostname:port' pairs as Strings.
:timeout adjusts the request timeout before an error is returned.
# File lib/mogilefs/backend.rb, line 70
#
# Creates a new MogileFS::Backend.
#
# +args+ is a Hash with the following keys:
#
# :hosts::        required; a non-empty Array of 'hostname:port' Strings.
# :timeout::      request timeout, defaults to 3.
# :fail_timeout:: defaults to 5; stored in @fail_timeout for the
#                 dead-host bookkeeping (@dead).
#
# Raises ArgumentError if :hosts is missing, empty, or contains an entry
# that does not end in ':<digits>'.
def initialize(args)
  @hosts = args[:hosts]
  @fail_timeout = args[:fail_timeout] || 5
  raise ArgumentError, "must specify at least one host" unless @hosts
  raise ArgumentError, "must specify at least one host" if @hosts.empty?

  # \z anchors at the very end of the string; `$` would also accept an
  # entry with a trailing or embedded newline such as "host:7001\njunk".
  unless @hosts.all? { |h| h =~ /:\d+\z/ }
    raise ArgumentError, ":hosts must be in 'host:port' form"
  end

  @mutex = Mutex.new
  @timeout = args[:timeout] || 3
  @socket = nil
  @lasterr = nil
  @lasterrstr = nil
  @pending = []
  @dead = {}
end
Public Instance Methods
This command is special: since the cache is per-tracker, we connect to all backends and not just one.
# File lib/mogilefs/backend.rb, line 310
#
# Sends a "clear_cache" request to every host in @hosts rather than to the
# single tracker used for ordinary requests.
#
# +types+ is the list of cache types to clear (default: %w(all)); a single
# type may also be given without wrapping it in an Array.  Each type
# becomes a key of the request arguments with the value 1.
#
# This is best-effort: hosts that cannot be connected to are skipped,
# write/read failures on individual sockets are ignored, and the method
# gives up as soon as IO.select reports nothing ready within @timeout.
# Always returns nil; every socket opened here is closed before returning.
def clear_cache(types = %w(all))
  # Assigned before anything can raise so the ensure clause always has an
  # Array to iterate, and filled one socket at a time so every socket
  # opened so far is closed even if the connect pass is interrupted.
  sockets = []

  opts = {}
  Array(types).each { |type| opts[type] = 1 }

  @hosts.each do |host|
    io = (MogileFS::Socket.start(*(host.split(/:/))) rescue nil)
    sockets << io if io
  end

  wpending = sockets # sockets still waiting for the request to be written
  rpending = []      # sockets written to, waiting for a response
  request = make_request("clear_cache", opts)
  while wpending[0] || rpending[0]
    r = IO.select(rpending, wpending, nil, @timeout) or return
    rpending -= r[0]
    wpending -= r[1]
    # the response content is not used, it only needs to be consumed
    r[0].each { |io| io.timed_gets(0) rescue nil }
    r[1].each do |io|
      begin
        io.timed_write(request, 0)
        rpending << io
      rescue
        # a socket that fails the write is simply not waited on
      end
    end
  end
  nil
ensure
  sockets.each { |io| io.close }
end
Performs the +cmd+ request with +args+.
# File lib/mogilefs/backend.rb, line 235 def do_request(cmd, args, idempotent = false) no_raise = args.delete(:ruby_no_raise) request = make_request(cmd, args) line = nil failed = false @mutex.synchronize do begin io = dispatch_unlocked(request) line = io.timed_gets(@timeout) break if /\r?\n\z/ =~ line line and raise MogileFS::InvalidResponseError, "Invalid response from server: #{line.inspect}" idempotent or raise EOFError, "end of file reached after: #{request.inspect}" # fall through to retry in loop rescue SystemCallError, MogileFS::InvalidResponseError # truncated response # we got a successful timed_write, but not a timed_gets if idempotent failed = true shutdown_unlocked(false) retry end shutdown_unlocked(true) rescue MogileFS::UnreadableSocketError, MogileFS::Timeout shutdown_unlocked(true) rescue # we DO NOT want the response we timed out waiting for, to crop up later # on, on the same socket, intersperesed with a subsequent request! we # close the socket if there's any error. shutdown_unlocked(true) end while idempotent shutdown_unlocked if failed end # @mutex.synchronize parse_response(line, no_raise ? request : nil) end
This converts an error code from a mogilefsd tracker to an exception. Most of these exceptions should already be defined, but since the MogileFS server code is liable to change and we may not always be able to keep up with the changes, unknown error codes are defined on demand.
# File lib/mogilefs/backend.rb, line 283
#
# Returns the exception class for the tracker error code +err_snake+.
#
# Codes already present in BACKEND_ERRORS are returned from there; any
# other code is handed to the class-level add_error and its result is
# returned.
def error(err_snake)
  known = BACKEND_ERRORS[err_snake]
  return known if known

  self.class.add_error(err_snake)
end
Makes a new request string for +cmd+ and +args+.
# File lib/mogilefs/backend.rb, line 275
#
# Builds the request line for +cmd+ and +args+: the command, one space,
# the result of url_encode(args), and a terminating CRLF.
def make_request(cmd, args)
  encoded_args = url_encode(args)
  "#{cmd} #{encoded_args}\r\n"
end
Turns the +line+ response from the server into a Hash of options, an error, or raises, as appropriate.
# File lib/mogilefs/backend.rb, line 289
#
# Interprets one response +line+ from the tracker.
#
# * "OK ..." lines: returns url_decode of the payload.
# * "ERR <code> <message>" lines: stores the code in @lasterr and the
#   unescaped message in @lasterrstr, then raises the exception class
#   returned by #error for that code.  If +request+ is given, the stripped
#   request is appended to the message as " request=..." and the exception
#   is returned instead of raised.
# * anything else: raises MogileFS::InvalidResponseError.
def parse_response(line, request = nil)
  case line
  when /\AOK\s+\d*\s*(\S*)\r?\n\z/
    url_decode($1)
  when /\AERR\s+(\w+)\s*([^\r\n]*)/
    code, detail = $1, $2
    @lasterr = code
    @lasterrstr = detail ? url_unescape(detail) : nil

    raise error(@lasterr).new(@lasterrstr) unless request

    # caller asked not to raise: annotate the message with the request
    # and hand the exception object back instead
    suffix = " request=#{request.strip}"
    if @lasterrstr
      @lasterrstr << suffix
    else
      @lasterrstr = suffix
    end
    error(@lasterr).new(@lasterrstr)
  else
    raise MogileFS::InvalidResponseError,
          "Invalid response from server: #{line.inspect}"
  end
end
Closes this backend's socket.
# File lib/mogilefs/backend.rb, line 92
#
# Closes this backend's socket by calling shutdown_unlocked while holding
# @mutex.
def shutdown
  @mutex.synchronize do
    shutdown_unlocked
  end
end
Returns a socket connected to a MogileFS tracker.
# File lib/mogilefs/backend.rb, line 341
#
# Returns a socket connected to a MogileFS tracker.
#
# The current @socket is reused while it is open.  Otherwise the hosts are
# tried in random order, skipping any host whose last recorded failure is
# more recent than @fail_timeout ago.  The first successful connection is
# stored in @socket (and its host in @active_host) and returned.
#
# A SystemCallError or MogileFS::Timeout while connecting records the
# host in @dead together with the time and the error.
#
# Raises MogileFS::UnreachableBackendError, listing every entry in @dead,
# when no host yields a connection.
def socket
  return @socket if @socket && !@socket.closed?

  @hosts.shuffle.each do |host|
    failure = @dead[host]
    next if failure && failure[0] > (MogileFS.now - @fail_timeout)

    addr, port = host.split(/:/)
    begin
      @socket = MogileFS::Socket.tcp(addr, port, @timeout)
      @active_host = host
      return @socket
    rescue SystemCallError, MogileFS::Timeout => err
      # remember when and why this host failed, then try the next one
      @dead[host] = [ MogileFS.now, err ]
    end
  end

  failures = @dead.map do |host, (_, e)|
    "#{host} - #{e.message} (#{e.class})"
  end
  raise MogileFS::UnreachableBackendError,
        "couldn't connect to any tracker: #{failures.join(', ')}"
end