Class/Module Index [+]

Quicksearch

Fluent::BufferedOutput

Public Class Methods

new() click to toggle source
# File lib/fluent/output.rb, line 152
def initialize
  super
  @next_flush_time = 0
  @last_retry_time = 0
  @next_retry_time = 0
  @error_history = []
  @error_history.extend(MonitorMixin)
  @secondary_limit = 8
  @emit_count = 0
end

Public Instance Methods

before_shutdown() click to toggle source
# File lib/fluent/output.rb, line 364
def before_shutdown
  begin
    @buffer.before_shutdown(self)
  rescue
    $log.warn "before_shutdown failed", :error=>$!.to_s
    $log.warn_backtrace
  end
end
calc_retry_wait() click to toggle source
# File lib/fluent/output.rb, line 373
def calc_retry_wait
  # TODO retry pattern
  wait = if @error_history.size <= @retry_limit
           @retry_wait * (2 ** (@error_history.size-1))
         else
           # secondary retry
           @retry_wait * (2 ** (@error_history.size-2-@retry_limit))
         end
  wait + (rand * (wait / 4.0) - (wait / 8.0))
end
configure(conf) click to toggle source
# File lib/fluent/output.rb, line 170
def configure(conf)
  super

  @buffer = Plugin.new_buffer(@buffer_type)
  @buffer.configure(conf)

  if @buffer.respond_to?(:enable_parallel)
    if @num_threads == 1
      @buffer.enable_parallel(false)
    else
      @buffer.enable_parallel(true)
    end
  end

  @writers = (1..@num_threads).map {
    writer = OutputThread.new(self)
    writer.configure(conf)
    writer
  }

  if sconf = conf.elements.select {|e| e.name == 'secondary' }.first
    type = sconf['type'] || conf['type']
    @secondary = Plugin.new_output(type)
    @secondary.configure(sconf)

    if secondary_limit = conf['secondary_limit']
      @secondary_limit = secondary_limit.to_i
      if @secondary_limit < 0
        raise ConfigError, "invalid parameter 'secondary_limit #{secondary_limit}'"
      end
    end

    @secondary.secondary_init(self)
  end

  Status.register(self, "queue_size") { @buffer.queue_size }
  Status.register(self, "emit_count") { @emit_count }
end
emit(tag, es, chain, key="") click to toggle source
# File lib/fluent/output.rb, line 222
def emit(tag, es, chain, key="")
  @emit_count += 1
  data = format_stream(tag, es)
  if @buffer.emit(key, data, chain)
    submit_flush
  end
end
enqueue_buffer() click to toggle source

def write(chunk) end

# File lib/fluent/output.rb, line 249
def enqueue_buffer
  @buffer.keys.each {|key|
    @buffer.push(key)
  }
end
flush_secondary(secondary) click to toggle source
# File lib/fluent/output.rb, line 394
def flush_secondary(secondary)
  @buffer.pop(secondary)
end
force_flush() click to toggle source
# File lib/fluent/output.rb, line 359
def force_flush
  enqueue_buffer
  submit_flush
end
format_stream(tag, es) click to toggle source
# File lib/fluent/output.rb, line 235
def format_stream(tag, es)
  out = ''
  es.each {|time,record|
    out << format(tag, time, record)
  }
  out
end
shutdown() click to toggle source
# File lib/fluent/output.rb, line 216
def shutdown
  @writers.each {|writer| writer.shutdown }
  @secondary.shutdown if @secondary
  @buffer.shutdown
end
start() click to toggle source
# File lib/fluent/output.rb, line 209
def start
  @next_flush_time = Engine.now + @flush_interval
  @buffer.start
  @secondary.start if @secondary
  @writers.each {|writer| writer.start }
end
submit_flush() click to toggle source
# File lib/fluent/output.rb, line 230
def submit_flush
  # TODO roundrobin?
  @writers.first.submit_flush
end
try_flush() click to toggle source
# File lib/fluent/output.rb, line 255
def try_flush
  time = Engine.now

  empty = @buffer.queue_size == 0
  if empty && @next_flush_time < (now = Engine.now)
    @buffer.synchronize do
      if @next_flush_time < now
        enqueue_buffer
        @next_flush_time = now + @flush_interval
        empty = @buffer.queue_size == 0
      end
    end
  end
  if empty
    return time + 1  # TODO 1
  end

  begin
    retrying = !@error_history.empty?

    if retrying
      @error_history.synchronize do
        if retrying = !@error_history.empty?  # re-check in synchronize
          if @next_retry_time >= time
            # allow retrying for only one thread
            return time + 1  # TODO 1
          end
          # assume next retry failes and
          # clear them if when it succeeds
          @last_retry_time = time
          @error_history << time
          @next_retry_time += calc_retry_wait
        end
      end
    end

    if @secondary && @error_history.size > @retry_limit
      has_next = flush_secondary(@secondary)
    else
      has_next = @buffer.pop(self)
    end

    # success
    if retrying
      @error_history.clear
      # Note: don't notify to other threads to prevent
      #       burst to recovered server
      $log.warn "retry succeeded.", :instance=>object_id
    end

    if has_next
      return Engine.now + @queued_chunk_flush_interval
    else
      return time + 1  # TODO 1
    end

  rescue => e
    if retrying
      error_count = @error_history.size
    else
      # first error
      error_count = 0
      @error_history.synchronize do
        if @error_history.empty?
          @last_retry_time = time
          @error_history << time
          @next_retry_time = time + calc_retry_wait
        end
      end
    end

    if error_count < @retry_limit
      $log.warn "temporarily failed to flush the buffer.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
      $log.warn_backtrace e.backtrace

    elsif @secondary
      if error_count == @retry_limit
        $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
        $log.warn "retry count exceededs limit. falling back to secondary output."
        $log.warn_backtrace e.backtrace
        retry  # retry immediately
      elsif error_count <= @retry_limit + @secondary_limit
        $log.warn "failed to flush the buffer, next retry will be with secondary output.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
        $log.warn_backtrace e.backtrace
      else
        $log.warn "failed to flush the buffer.", :error_class=>e.class, :error=>e.to_s, :instance=>object_id
        $log.warn "secondary retry count exceededs limit."
        $log.warn_backtrace e.backtrace
        write_abort
        @error_history.clear
      end

    else
      $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
      $log.warn "retry count exceededs limit."
      $log.warn_backtrace e.backtrace
      write_abort
      @error_history.clear
    end

    return @next_retry_time
  end
end
write_abort() click to toggle source
# File lib/fluent/output.rb, line 384
def write_abort
  $log.error "throwing away old logs."
  begin
    @buffer.clear!
  rescue
    $log.error "unexpected error while aborting", :error=>$!.to_s
    $log.error_backtrace
  end
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.