class MogileFS::HTTPFile
HTTPFile wraps up the new file operations for storing files onto an HTTP storage node.
You really don't want to create an HTTPFile by hand. Instead you want to create a new file using MogileFS::MogileFS#new_file.
Attributes
big_io[RW]
The #big_io name, used in case the file is larger than 256M
streaming_io[RW]
Public Class Methods
new(dests, opts = nil)
click to toggle source
Creates a new HTTPFile with MogileFS-specific data. Use MogileFS::MogileFS#new_file instead of this method.
Calls superclass method
# File lib/mogilefs/http_file.rb, line 26
# Creates a new HTTPFile with MogileFS-specific data. Use
# MogileFS::MogileFS#new_file instead of calling this directly.
#
# dests:: collection of [devid, path] pairs; #commit iterates it in order
# opts::  an options Hash, or an Integer, which is shorthand for
#         { :content_length => opts }.  When omitted (nil) an empty Hash
#         is stored, so lookups such as @opts[:content_length] made by
#         #nhp_put and #request_put return nil instead of raising
#         NoMethodError on nil.
def initialize(dests, opts = nil)
  # Pass an empty String to the superclass initializer.
  super ""
  @md5 = @streaming_io = @big_io = @active = nil
  @dests = dests
  @opts = case opts
          when Integer then { :content_length => opts }
          when nil then {}
          else opts
          end
end
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/mogilefs/http_file.rb, line 181
# Runs #commit first, then defers to the superclass #close and returns
# its result.  An exception raised by #commit propagates before the
# superclass #close is reached.
def close
  commit
  super
end
commit()
click to toggle source
# File lib/mogilefs/http_file.rb, line 164
# Uploads the file to the first destination that accepts it.
#
# Walks @dests in order; each entry is a [devid, path] pair.  When the
# buffer already holds data (size > 0) it is sent with #nhp_put, otherwise
# #upload is used.  The first successful upload is finalized with
# #create_close, whose result is returned immediately.
#
# Timeout::Error, SystemCallError and RetryableError count as
# per-destination failures: they are recorded and the next destination is
# tried.  Any other exception propagates to the caller.
#
# Raises NoStorageNodesError (with an empty backtrace) when no destination
# succeeded, including the case where @dests is empty.
def commit
  # Start with an Array (not nil) so the final join is safe even when
  # @dests is empty and the loop body never runs.
  errors = []
  @dests.each do |devid, path|
    begin
      uri = URI.parse(path)
      bytes_uploaded = size > 0 ? nhp_put(devid, uri) : upload(devid, uri)
      return create_close(devid, uri, bytes_uploaded)
    rescue Timeout::Error, SystemCallError, RetryableError => e
      errors << "#{path} - #{e.message} (#{e.class})"
    end
  end

  raise NoStorageNodesError,
        "all paths failed with PUT: #{errors.join(', ')}", []
end
nhp_put(devid, uri)
click to toggle source
# File lib/mogilefs/http_file.rb, line 137
# PUTs the buffered contents (+string+) to +uri+ using the client object
# stored in @opts[:nhp_put], and returns +size+ when the response is a
# Net::HTTPSuccess.
#
# devid:: accepted for signature parity with the other upload paths;
#         not used in this method
# uri::   destination; only uri.path is placed in the request line
#
# When @opts[:content_length] is set and differs from +size+,
# MogileFS::SizeMismatchError is raised before anything is sent.
#
# @opts[:content_md5] may be a callable (its stripped result is used),
# the Symbol :trailer (the MD5 of +string+ is computed and base64-encoded
# here), or any other truthy value, which is sent as-is in Content-MD5.
#
# A non-success response raises BadResponseError.  Any exception whose
# class name begins with "Net::" is re-raised as RetryableError carrying
# the original backtrace; everything else is re-raised unchanged.
def nhp_put(devid, uri)
  expected = @opts[:content_length]
  if expected && expected != size
    raise MogileFS::SizeMismatchError,
          ":content_length expected: #{expected.inspect}, actual: #{size}"
  end

  request = Net::HTTP::Put.new(uri.path)
  request["Content-Type"] = "application/octet-stream"

  checksum = @opts[:content_md5]
  if checksum
    if checksum.respond_to?(:call)
      checksum = checksum.call.strip
    elsif checksum == :trailer
      # pack("m") appends a newline to the base64 text; chomp! removes it
      checksum = [ Digest::MD5.digest(string) ].pack("m").chomp!
    end
    request["Content-MD5"] = checksum
  end

  request.body = string
  response = @opts[:nhp_put].request(uri, request)
  unless Net::HTTPSuccess === response
    raise BadResponseError, "#{response.code} #{response.message}"
  end

  size
rescue => e
  # Only failures from the Net:: namespace are considered retryable.
  raise unless /\ANet::/ =~ "#{e.class}"

  raise RetryableError, "#{e.message} (#{e.class})", e.backtrace
end
put_streaming_io(sock, uri)
click to toggle source
# File lib/mogilefs/http_file.rb, line 72
# Uploads data produced by @streaming_io (unlikely to be used).
#
# @streaming_io.length is passed to #request_put as the file size.  Inside
# the #request_put block, @streaming_io.call is handed a Proc; each time
# that Proc is invoked with a chunk of data, the chunk is written to the
# writer yielded by #request_put and the write's return value is added to
# a running total.
#
# Returns @streaming_io.length when it is truthy, otherwise the running
# total of written bytes.
def put_streaming_io(sock, uri)
  expected_size = @streaming_io.length
  bytes_written = 0

  request_put(sock, uri, expected_size) do |writer|
    on_data = Proc.new { |chunk| bytes_written += writer.write(chunk) }
    # Keep this call last: its value is what the block returns to #request_put.
    @streaming_io.call(on_data)
  end

  expected_size || bytes_written
end
request_put(sock, uri, file_size, input = nil) { |sock| ... }
click to toggle source
# File lib/mogilefs/http_file.rb, line 33
# Writes an HTTP PUT request for +uri+ to +sock+ and sends the body.
#
# sock::      object the request head (and, for fixed-length requests, the
#             body) is written to
# uri::       destination; host, port and request_uri are read from it
# file_size:: body length if known; falls back to @opts[:content_length]
# input::     optional source copied with MogileFS.io.copy_stream (and
#             remembered in @active); when nil the caller's block is
#             yielded the writer and must return the byte count
#
# Two request shapes are produced:
# * a length is known -> "HTTP/1.0" with Content-Length; when
#   @opts[:content_md5] is a String, a Content-MD5 header is sent as well
#   (ArgumentError if no length is available in that case)
# * otherwise -> "HTTP/1.1" with "Transfer-Encoding: chunked", writing
#   through a MogileFS::Chunker.  This path is forced when
#   @opts[:content_md5] is callable or :trailer, or when
#   MD5_TRAILER_NODES has a truthy entry for "host:port"; a fresh
#   Digest::MD5 is then stored in @md5.  A "Trailer: Content-MD5" header
#   is announced whenever @md5 is set.
#
# Returns the byte count reported by copy_stream or by the block.  Raises
# MogileFS::SizeMismatchError when @opts[:content_length] is set and does
# not equal that count.
def request_put(sock, uri, file_size, input = nil)
  host_with_port = "#{uri.host}:#{uri.port}"
  expected_len = @opts[:content_length]
  length_field = file_size || expected_len
  md5_opt = @opts[:content_md5]

  if String === md5_opt
    unless length_field
      raise ArgumentError,
            ":content_length must be specified with :content_md5 String"
    end
    # Append the Content-MD5 header to the length value so that both
    # headers are emitted by the single Content-Length interpolation below.
    length_field = "#{length_field}\r\nContent-MD5: #{md5_opt}"
  elsif md5_opt.respond_to?(:call) || :trailer == md5_opt ||
        MD5_TRAILER_NODES[host_with_port]
    length_field = nil
    @md5 = Digest::MD5.new
  end

  if length_field
    sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
               "Content-Length: #{length_field}\r\n\r\n")
    sent = if input
      MogileFS.io.copy_stream(@active = input, sock)
    else
      yield(sock)
    end
  else
    trailer_header = @md5 ? "Trailer: Content-MD5\r\n" : ""
    sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
               "Host: #{host_with_port}\r\n#{trailer_header}" \
               "Transfer-Encoding: chunked\r\n\r\n")
    chunker = MogileFS::Chunker.new(sock, @md5, md5_opt)
    sent = if input
      MogileFS.io.copy_stream(@active = input, chunker)
    else
      yield(chunker)
    end
    chunker.flush
  end

  if expected_len && expected_len != sent
    raise MogileFS::SizeMismatchError,
          ":content_length expected: #{expected_len.inspect}, actual: #{sent.inspect}"
  end

  sent
end
rewind_or_raise!(uri, err)
click to toggle source
# File lib/mogilefs/http_file.rb, line 83
# Rewinds @active (when one is set) so an upload can be attempted again.
#
# uri:: the destination that failed; used only in the error message
# err:: the exception from the failed attempt; its message and class are
#       quoted in the error message
#
# Returns nil when @active is unset, otherwise the result of
# @active.rewind.  If rewinding raises a StandardError, NonRetryableError
# is raised with the rewind failure's backtrace and a message describing
# both failures.
def rewind_or_raise!(uri, err)
  return unless @active

  @active.rewind
rescue => rewind_err
  upload_failure = "#{uri} failed with #{err.message} (#{err.class})"
  rewind_failure = "#{@active.inspect} failed with: " \
                   "#{rewind_err.message} (#{rewind_err.class})"
  raise NonRetryableError,
        "#{upload_failure} and retrying is impossible as rewind on #{rewind_failure}",
        rewind_err.backtrace
end