Parent

Class/Module Index [+]

Quicksearch

Fluent::TailInput::TailWatcher

Constants

MAX_LINES_AT_ONCE

Public Class Methods

new(path, rotate_wait, pe, &receive_lines) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 118
# Sets up a watcher for a single tailed path.
#
# path          - path passed on to StatWatcher and RotateHandler.
# rotate_wait   - stored as-is; read later when a rotation is queued.
# pe            - position entry; a new MemoryPositionEntry is created when
#                 the argument is nil or false.
# receive_lines - block stored and later forwarded to IOHandler instances.
def initialize(path, rotate_wait, pe, &receive_lines)
  @path = path
  @rotate_wait = rotate_wait
  @receive_lines = receive_lines
  @pe = pe || MemoryPositionEntry.new

  # Rotations that have been detected but not yet switched to.
  @rotate_queue = []

  # Both triggers fire the same callback.
  notify = method(:on_notify)
  @timer_trigger = TimerWatcher.new(1, true, &notify)
  @stat_trigger = StatWatcher.new(path, &notify)

  @rotate_handler = RotateHandler.new(path, &method(:on_rotate))

  # No handler yet; on_rotate installs one the first time it is called.
  @io_handler = nil
end

Public Instance Methods

attach(loop) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 133
# Attaches the timer trigger and then the stat trigger to the given loop,
# then runs on_notify once immediately.
#
# loop - object passed unchanged to each trigger's #attach.
#
# Returns whatever on_notify returns.
def attach(loop)
  [@timer_trigger, @stat_trigger].each { |trigger| trigger.attach(loop) }
  on_notify
end
close() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 144
# Closes every file handle still waiting in the rotation queue, empties the
# queue, and detaches the watchers.
#
# A queued request may carry a nil io (on_rotate enqueues whatever io it was
# given, including nil), so such entries are skipped instead of raising
# NoMethodError.
#
# Returns whatever detach returns.
def close
  until @rotate_queue.empty?
    # Remove the entry before closing it so a failing close is not retried.
    io = @rotate_queue.shift.io
    io.close if io
  end
  detach
end
detach() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 139
# Detaches the timer trigger and the stat trigger, each only if it reports
# itself as attached. The timer trigger is handled first.
def detach
  timer = @timer_trigger
  stat = @stat_trigger

  timer.detach if timer.attached?
  stat.detach if stat.attached?
end
on_notify() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 152
# Callback fired by the timer and stat triggers (and once from attach).
#
# Forwards the notification to the rotate handler and, if one is installed,
# to the current IO handler. It then advances the rotation queue: the head
# request is ticked once, and every request at the head that reports ready?
# replaces the current IO handler, in queue order.
#
# Returns nil.
def on_notify
  @rotate_handler.on_notify
  return unless @io_handler
  @io_handler.on_notify

  # Process the rotation queue.
  return if @rotate_queue.empty?
  @rotate_queue.first.tick

  until @rotate_queue.empty?
    request = @rotate_queue.first
    break unless request.ready?

    io = request.io
    if io
      inode = io.stat.ino
      if inode == @pe.read_inode
        # The rotated file has the same inode number as the last file.
        # Assume one of the following:
        #   a) the file was renamed away and then moved back, or
        #   b) a symlink or hardlink to the same file was recreated.
        # In either case, resume from the saved position. The handle must
        # actually be moved there; recording the position alone would leave
        # it at its current offset.
        pos = @pe.read_pos
        io.seek(pos)
      else
        pos = io.pos
      end
      @pe.update(inode, pos)
      next_handler = IOHandler.new(io, @pe, &@receive_lines)
    else
      # The rotated-to file has no handle; install a placeholder handler.
      next_handler = NullIOHandler.new
    end

    # Build the replacement first, then retire the old handler.
    @io_handler.close
    @io_handler = next_handler
    @rotate_queue.shift
  end
end
on_rotate(io) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 187
# Callback given to RotateHandler; receives the handle of the file now at the
# watched path (or nil).
#
# On the first call (no IO handler installed yet) it picks a start position,
# seeks the handle there and installs an IOHandler, or a NullIOHandler when
# io is nil. On later calls it queues a RotationRequest so the switch to the
# new file is delayed, unless the same io is already queued.
def on_rotate(io)
  if @io_handler
    # A handler already exists: queue the switch to the new file.
    already_queued = io && @rotate_queue.any? { |req| req.io == io }
    return if already_queued

    # Most recent file: the last queued one, else the one currently followed.
    previous = @rotate_queue.last || @io_handler
    if previous.io.nil?
      $log.info "detected rotation of #{@path}"
      # rotate immediately if the previous file is nil
      wait = 0
    else
      $log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds"
      head = @rotate_queue.first
      # Subtract the wait value of the request at the head of the queue, if any.
      wait = head ? @rotate_wait - head.wait : @rotate_wait
    end
    @rotate_queue << RotationRequest.new(io, wait)
  elsif io
    # first time
    stat = io.stat
    inode = stat.ino
    last_inode = @pe.read_inode
    resuming = inode == last_inode

    pos =
      if resuming
        # seek to the saved position
        @pe.read_pos
      elsif last_inode != 0
        # This is a FilePositionEntry and fluentd has run before.
        # Read from the head of the rotated file; logs never duplicate
        # because this file is a newly rotated file.
        0
      else
        # This is a MemoryPositionEntry, or the first time fluentd started.
        # Seek to the end of the file; without this, logs may duplicate
        # because it is not known whether the file is an existing file or a
        # newly rotated one.
        stat.size
      end
    @pe.update(inode, pos) unless resuming
    io.seek(pos)

    @io_handler = IOHandler.new(io, @pe, &@receive_lines)
  else
    @io_handler = NullIOHandler.new
  end
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.