class Backup::CloudIO::CloudFiles
Constants
- MAX_FILE_SIZE
- MAX_SLO_SIZE
- SEGMENT_BUFFER
Attributes
- api_key [R]
- auth_url [R]
- container [R]
- days_to_keep [R]
- fog_options [R]
- region [R]
- segment_size [R]
- segments_container [R]
- servicenet [R]
- username [R]
Public Class Methods
# File lib/backup/cloud_io/cloud_files.rb, line 18
def initialize(options = {})
  super

  @username           = options[:username]
  @api_key            = options[:api_key]
  @auth_url           = options[:auth_url]
  @region             = options[:region]
  @servicenet         = options[:servicenet]
  @container          = options[:container]
  @segments_container = options[:segments_container]
  @segment_size       = options[:segment_size]
  @days_to_keep       = options[:days_to_keep]
  @fog_options        = options[:fog_options]
end
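A minimal construction sketch. The option keys match those read in #initialize; every value below is a placeholder, and any options consumed by the superclass (e.g. retry settings) are omitted:

cloud_io = Backup::CloudIO::CloudFiles.new(
  :username           => 'my_username',     # placeholder credentials
  :api_key            => 'my_api_key',
  :auth_url           => 'https://identity.api.rackspacecloud.com/v2.0',
  :region             => 'dfw',
  :servicenet         => false,
  :container          => 'my_container',
  :segments_container => 'my_segments_container',
  :segment_size       => 5,    # MiB; 0 disables SLO (segmented) uploads
  :days_to_keep       => 30,   # schedules each object's X-Delete-At
  :fog_options        => {}    # merged into the Fog::Storage options
)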
Public Instance Methods
Delete non-SLO object(s) from the container.

- Called by the Storage (with objects) and the Syncer (with names).
- Deletes 10,000 objects per request.
- Missing objects will be ignored.
# File lib/backup/cloud_io/cloud_files.rb, line 103
def delete(objects_or_names)
  names = Array(objects_or_names).dup
  names.map!(&:name) if names.first.is_a?(Object)

  until names.empty?
    _names = names.slice!(0, 10000)
    with_retries('DELETE Multiple Objects') do
      resp = connection.delete_multiple_objects(container, _names)
      resp_status = resp.body['Response Status']
      raise Error, <<-EOS unless resp_status == '200 OK'
        #{ resp_status }
        The server returned the following:
        #{ resp.body.inspect }
      EOS
    end
  end
end
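A usage sketch for both call styles mentioned above, assuming cloud_io was built as in the earlier example; the remote names are hypothetical:

# With plain names (as the Syncer does):
cloud_io.delete(['my_trigger/2015.01.01.tar', 'my_trigger/2015.01.02.tar'])

# With Object instances returned by #objects (as the Storage does);
# the names are extracted before the batched DELETE requests are sent:
cloud_io.delete(cloud_io.objects('my_trigger'))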
Delete SLO object(s) from the container.

- Removes the SLO manifest object and all associated segments.
- Missing segments will be ignored.
# File lib/backup/cloud_io/cloud_files.rb, line 126
def delete_slo(objects)
  Array(objects).each do |object|
    with_retries("DELETE SLO Manifest '#{ container }/#{ object.name }'") do
      resp = connection.delete_static_large_object(container, object.name)
      resp_status = resp.body['Response Status']
      raise Error, <<-EOS unless resp_status == '200 OK'
        #{ resp_status }
        The server returned the following:
        #{ resp.body.inspect }
      EOS
    end
  end
end
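An illustrative sketch of removing a package that may contain both SLO and non-SLO objects. It assumes the nested Object class provides an #slo? predicate (not shown in this section) so manifests can be separated from plain objects:

slo_objects, plain_objects = cloud_io.objects('my_trigger').partition(&:slo?)

cloud_io.delete_slo(slo_objects)   # manifests plus their segments
cloud_io.delete(plain_objects)     # everything else, in batches of 10,000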
Used by Object to fetch metadata if needed.
# File lib/backup/cloud_io/cloud_files.rb, line 90
def head_object(object)
  resp = nil
  with_retries("HEAD '#{ container }/#{ object.name }'") do
    resp = connection.head_object(container, object.name)
  end
  resp
end
Returns all objects in the container with the given prefix.

- get_container returns a max of 10,000 objects per request.
- Returns objects sorted using a SQLite binary collating function.
- If marker is given, only objects after the marker are in the response.
# File lib/backup/cloud_io/cloud_files.rb, line 68
def objects(prefix)
  objects = []
  resp = nil
  prefix = prefix.chomp('/')
  opts = { :prefix => prefix + '/' }

  create_containers

  while resp.nil? || resp.body.count == 10000
    opts.merge!(:marker => objects.last.name) unless objects.empty?
    with_retries("GET '#{ container }/#{ prefix }/*'") do
      resp = connection.get_container(container, opts)
    end
    resp.body.each do |obj_data|
      objects << Object.new(self, obj_data)
    end
  end

  objects
end
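A brief usage sketch with a hypothetical prefix. A trailing '/' is stripped and re-added internally, so 'my_trigger' and 'my_trigger/' behave the same; results are accumulated across 10,000-object pages via the :marker option:

cloud_io.objects('my_trigger').each do |object|
  puts object.name
end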
The Syncer may call this method from multiple threads; however, #objects is always called before this occurs, so the containers will already have been created by the time concurrent uploads start.
# File lib/backup/cloud_io/cloud_files.rb, line 35
def upload(src, dest)
  create_containers

  file_size = File.size(src)
  segment_bytes = segment_size * 1024**2
  if segment_bytes > 0 && file_size > segment_bytes
    raise FileSizeError, <<-EOS if file_size > MAX_SLO_SIZE
      File Too Large
      File: #{ src }
      Size: #{ file_size }
      Max SLO Size is #{ MAX_SLO_SIZE } (5 GiB * 1000 segments)
    EOS

    segment_bytes = adjusted_segment_bytes(segment_bytes, file_size)
    segments = upload_segments(src, dest, segment_bytes, file_size)
    upload_manifest(dest, segments)
  else
    raise FileSizeError, <<-EOS if file_size > MAX_FILE_SIZE
      File Too Large
      File: #{ src }
      Size: #{ file_size }
      Max File Size is #{ MAX_FILE_SIZE } (5 GiB)
    EOS

    put_object(src, dest)
  end
end
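A hedged usage sketch with hypothetical paths. With :segment_size => 5, a file of 5 MiB or less is stored with a single PUT, while anything larger (up to MAX_SLO_SIZE) is uploaded as segments plus an SLO manifest:

# <= segment_size: stored via #put_object with an MD5 ETag check.
cloud_io.upload('/tmp/small.tar', 'my_trigger/small.tar')

# > segment_size: #upload_segments streams numbered segments into the
# segments container, then #upload_manifest writes the SLO manifest.
cloud_io.upload('/tmp/large.tar', 'my_trigger/large.tar')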
Private Instance Methods
# File lib/backup/cloud_io/cloud_files.rb, line 256
def adjusted_segment_bytes(segment_bytes, file_size)
  return segment_bytes if file_size / segment_bytes.to_f <= 1000

  mb = orig_mb = segment_bytes / 1024**2
  mb += 1 until file_size / (1024**2 * mb).to_f <= 1000
  Logger.warn Error.new(<<-EOS)
    Segment Size Adjusted
    Your original #segment_size of #{ orig_mb } MiB has been adjusted
    to #{ mb } MiB in order to satisfy the limit of 1000 segments.
    To enforce your chosen #segment_size, you should use the Splitter.
    e.g. split_into_chunks_of #{ mb * 1000 } (#segment_size * 1000)
  EOS
  1024**2 * mb
end
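A worked example with hypothetical numbers: a 6000 MiB file with a #segment_size of 5 MiB would need 1200 segments, so the segment size is bumped until the count fits within 1000:

file_size     = 6000 * 1024**2   # ~5.86 GiB
segment_bytes = 5 * 1024**2      # original #segment_size of 5 MiB

file_size / segment_bytes.to_f   #=> 1200.0  (over the 1000-segment limit)

# The loop tries 6 MiB next: 6000 / 6.0 == 1000.0, which satisfies the
# limit, so the adjusted segment size returned is 6 MiB (6 * 1024**2).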
# File lib/backup/cloud_io/cloud_files.rb, line 142
def connection
  @connection ||= Fog::Storage.new({
    :provider             => 'Rackspace',
    :rackspace_username   => username,
    :rackspace_api_key    => api_key,
    :rackspace_auth_url   => auth_url,
    :rackspace_region     => region,
    :rackspace_servicenet => servicenet
  }.merge(fog_options || {}))
end
# File lib/backup/cloud_io/cloud_files.rb, line 153
def create_containers
  return if @containers_created
  @containers_created = true

  with_retries('Create Containers') do
    connection.put_container(container)
    connection.put_container(segments_container) if segments_container
  end
end
# File lib/backup/cloud_io/cloud_files.rb, line 251
def delete_at
  return unless days_to_keep
  @delete_at ||= (Time.now.utc + days_to_keep * 60**2 * 24).to_i
end
If :days_to_keep is set, each object uploaded will be scheduled for deletion. This includes non-SLO objects, the SLO manifest, and all segments.
# File lib/backup/cloud_io/cloud_files.rb, line 245
def headers
  headers = {}
  headers.merge!('X-Delete-At' => delete_at) if delete_at
  headers
end
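A worked example assuming a hypothetical :days_to_keep of 30 and a run starting at 2015-01-01 00:00:00 UTC. #delete_at memoizes one Unix timestamp for the whole run, and #headers attaches it to every PUT:

(Time.utc(2015, 1, 1) + 30 * 60**2 * 24).to_i   #=> 1422662400

# Every object uploaded during this run is sent with:
#   'X-Delete-At' => 1422662400
# and Cloud Files removes it automatically once that time passes.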
# File lib/backup/cloud_io/cloud_files.rb, line 163
def put_object(src, dest)
  opts = headers.merge('ETag' => Digest::MD5.file(src).hexdigest)
  with_retries("PUT '#{ container }/#{ dest }'") do
    File.open(src, 'r') do |file|
      connection.put_object(container, dest, file, opts)
    end
  end
end
# File lib/backup/cloud_io/cloud_files.rb, line 219
def segment_md5(file, segment_bytes)
  md5 = Digest::MD5.new
  offset = 0
  while offset <= segment_bytes - SEGMENT_BUFFER
    data = file.read(SEGMENT_BUFFER)
    break unless data
    offset += data.size
    md5 << data
  end
  md5.hexdigest
end
Each segment's ETag and byte_size are verified once uploaded. The request will raise an exception if verification fails or any segment is not found. However, each segment's ETag was already verified when the segments were uploaded, so this should only retry failed requests.
# File lib/backup/cloud_io/cloud_files.rb, line 235
def upload_manifest(dest, segments)
  Logger.info "\s\sStoring SLO Manifest '#{ container }/#{ dest }'"

  with_retries("PUT SLO Manifest '#{ container }/#{ dest }'") do
    connection.put_static_obj_manifest(container, dest, segments, headers)
  end
end
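For reference, the segments argument is the array built by #upload_segments, in the form fog's put_static_obj_manifest expects. The values below are hypothetical:

segments = [
  { :path       => 'my_segments_container/my_trigger/large.tar/0001',
    :etag       => 'd41d8cd98f00b204e9800998ecf8427e',  # segment MD5
    :size_bytes => 5 * 1024**2 },
  { :path       => 'my_segments_container/my_trigger/large.tar/0002',
    :etag       => '9e107d9d372bb6826bd81d3542a419d6',
    :size_bytes => 3 * 1024**2 }
]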
Each segment is uploaded using chunked transfer encoding, reading the file in SEGMENT_BUFFER-sized chunks, and each segment's MD5 is sent to verify the transfer. Each segment's MD5 and byte_size will also be verified when the SLO manifest object is uploaded.
# File lib/backup/cloud_io/cloud_files.rb, line 176
def upload_segments(src, dest, segment_bytes, file_size)
  total_segments = (file_size / segment_bytes.to_f).ceil
  progress = (0.1..0.9).step(0.1).map {|n| (total_segments * n).floor }
  Logger.info "\s\sUploading #{ total_segments } SLO Segments..."

  segments = []
  File.open(src, 'r') do |file|
    segment_number = 0
    until file.eof?
      segment_number += 1
      object = "#{ dest }/#{ segment_number.to_s.rjust(4, '0') }"
      pos = file.pos
      md5 = segment_md5(file, segment_bytes)
      opts = headers.merge('ETag' => md5)

      with_retries("PUT '#{ segments_container }/#{ object }'") do
        file.seek(pos)
        offset = 0
        connection.put_object(segments_container, object, nil, opts) do
          # block is called to stream data until it returns ''
          data = ''
          if offset <= segment_bytes - SEGMENT_BUFFER
            data = file.read(SEGMENT_BUFFER).to_s # nil => ''
            offset += data.size
          end
          data
        end
      end

      segments << {
        :path => "#{ segments_container }/#{ object }",
        :etag => md5,
        :size_bytes => file.pos - pos
      }

      if i = progress.rindex(segment_number)
        Logger.info "\s\s...#{ i + 1 }0% Complete..."
      end
    end
  end

  segments
end
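A standalone sketch of the read loop in isolation, with hypothetical sizes, to show how SEGMENT_BUFFER relates to segment_bytes: each block call hands fog at most one buffer of data, and a segment is complete once segment_bytes have been read (or the file ends):

segment_buffer = 256 * 1024      # hypothetical buffer size
segment_bytes  = 5 * 1024**2     # hypothetical segment size

offset = 0
chunks = 0
File.open('/tmp/large.tar', 'rb') do |file|
  while offset <= segment_bytes - segment_buffer
    data = file.read(segment_buffer)
    break unless data
    offset += data.size
    chunks += 1
  end
end
chunks   #=> 20 reads of 256 KiB make up one 5 MiB segment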