Parent

Namespace

Included Modules

Class/Module Index [+]

Quicksearch

Writer

Public Class Methods

new(tag, connector) click to toggle source
# File lib/fluent/command/cat.rb, line 117
def initialize(tag, connector)
  @tag = tag
  @connector = connector
  @socket = false

  @socket_time = Time.now.to_i
  @socket_ttl = 10  # TODO
  @error_history = []

  @pending = []
  @pending_limit = 1024  # TODO
  @retry_wait = 1
  @retry_limit = 5  # TODO

  super()
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/command/cat.rb, line 173
def close
  @socket.close
  @socket = nil
end
on_timer() click to toggle source
# File lib/fluent/command/cat.rb, line 154
def on_timer
  now = Time.now.to_i

  synchronize {
    unless @pending.empty?
      # flush pending records
      if write_impl(@pending)
        # write succeeded
        @pending.clear
      end
    end

    if @socket && @socket_time + @socket_ttl < now
      # socket is not used @socket_ttl seconds
      close
    end
  }
end
shutdown() click to toggle source
# File lib/fluent/command/cat.rb, line 184
def shutdown
  @timer.shutdown
end
start() click to toggle source
# File lib/fluent/command/cat.rb, line 178
def start
  @timer = TimerThread.new(self)
  @timer.start
  self
end
write(record) click to toggle source
# File lib/fluent/command/cat.rb, line 134
def write(record)
  if record.class != Hash
    raise ArgumentError, "Input must be a map (got #{record.class})"
  end

  entry = [Time.now.to_i, record]
  synchronize {
    unless write_impl([entry])
      # write failed
      @pending.push(entry)

      while @pending.size > @pending_limit
        # exceeds pending limit; trash oldest record
        time, record = @pending.shift
        abort_message(time, record)
      end
    end
  }
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.