Class/Module Index [+]

Quicksearch

Fluent::FileBuffer

Constants

PATH_MATCH

Public Class Methods

new() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 81
def initialize
  require 'uri'
  super
end

Public Instance Methods

before_shutdown(out) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 171
def before_shutdown(out)
  if @flush_at_shutdown
    synchronize do
      @map.each_key {|key|
        push(key)
      }
      while pop(out)
      end
    end
  end
end
configure(conf) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 88
def configure(conf)
  super

  if pos = @buffer_path.index('*')
    @buffer_path_prefix = @buffer_path[0,pos]
    @buffer_path_suffix = @buffer_path[pos+1..-1]
  else
    @buffer_path_prefix = @buffer_path+"."
    @buffer_path_suffix = ".log"
  end

  if flush_at_shutdown = conf['flush_at_shutdown']
    @flush_at_shutdown = true
  else
    @flush_at_shutdown = false
  end
end
enqueue(chunk) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 159
def enqueue(chunk)
  path = chunk.path
  mp = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length+1)]

  m = PATH_MATCH.match(mp)
  encoded_key = m ? m[1] : ""
  tsuffix = m[3]
  npath = "#{@buffer_path_prefix}#{encoded_key}.q#{tsuffix}#{@buffer_path_suffix}"

  chunk.mv(npath)
end
new_chunk(key) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 113
def new_chunk(key)
  encoded_key = encode_key(key)
  path, tsuffix = make_path(encoded_key, "b")
  unique_id = tsuffix_to_unique_id(tsuffix)
  FileBufferChunk.new(key, path, unique_id)
end
resume() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 120
def resume
  maps = []
  queues = []

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") {|path|
    match = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length+1)]
    if m = PATH_MATCH.match(match)
      key = decode_key(m[1])
      bq = m[2]
      tsuffix = m[3]
      timestamp = m[3].to_i(16)
      unique_id = tsuffix_to_unique_id(tsuffix)

      if bq == 'b'
        chunk = FileBufferChunk.new(key, path, unique_id, "a+")
        maps << [timestamp, chunk]
      elsif bq == 'q'
        chunk = FileBufferChunk.new(key, path, unique_id, "r")
        queues << [timestamp, chunk]
      end
    end
  }

  map = {}
  maps.sort_by {|(timestamp,chunk)|
    timestamp
  }.each {|(timestamp,chunk)|
    map[chunk.key] = chunk
  }

  queue = queues.sort_by {|(timestamp,chunk)|
    timestamp
  }.map {|(timestamp,chunk)|
    chunk
  }

  return queue, map
end
start() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 106
def start
  FileUtils.mkdir_p File.dirname(@buffer_path_prefix+"path")
  super
end

Protected Instance Methods

decode_key(encoded_key) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 188
def decode_key(encoded_key)
  URI.unescape(encoded_key)
end
encode_key(key) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 184
def encode_key(key)
  URI.escape(key, /[^-_.a-zA-Z0-9]/)
end
make_path(encoded_key, bq) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 192
def make_path(encoded_key, bq)
  now = Time.now.utc
  timestamp = ((now.to_i*1000*1000+now.usec) << 12 | rand(0xfff))
  tsuffix = timestamp.to_s(16)
  path = "#{@buffer_path_prefix}#{encoded_key}.#{bq}#{tsuffix}#{@buffer_path_suffix}"
  return path, tsuffix
end
tsuffix_to_unique_id(tsuffix) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 200
def tsuffix_to_unique_id(tsuffix)
  tsuffix.scan(/../).map {|x| x.to_i(16) }.pack('C*') * 2
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.