class Fluent::NewTailInput

Attributes

paths[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Input.new
# File lib/fluent/plugin/in_tail.rb, line 21
def initialize
  super
  @paths = []
  @tails = {}
end

Public Instance Methods

close_watcher(tw, close_io = true) click to toggle source

Fluent::NewTailInput::TailWatcher#close is called by another thread at shutdown phase. It causes 'can't modify string; temporarily locked' error in IOHandler so adding close_io argument to avoid this problem. At shutdown, IOHandler's io will be released automatically after detached the event loop

# File lib/fluent/plugin/in_tail.rb, line 183
def close_watcher(tw, close_io = true)
  tw.close(close_io)
  flush_buffer(tw)
  if tw.unwatched && @pf
    @pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
  end
end
close_watcher_after_rotate_wait(tw) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 191
def close_watcher_after_rotate_wait(tw)
  closer = TailWatcher::Closer.new(@rotate_wait, tw, log, &method(:close_watcher))
  closer.attach(@loop)
end
configure(conf) click to toggle source
Calls superclass method Fluent::Input#configure
# File lib/fluent/plugin/in_tail.rb, line 38
def configure(conf)
  super

  @paths = @path.split(',').map {|path| path.strip }
  if @paths.empty?
    raise ConfigError, "tail: 'path' parameter is required on tail input"
  end

  unless @pos_file
    $log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
    $log.warn "this parameter is highly recommended to save the position to resume tailing."
  end

  configure_parser(conf)
  configure_tag

  @multiline_mode = conf['format'] =~ /multiline/
  @receive_handler = if @multiline_mode
                       method(:parse_multilines)
                     else
                       method(:parse_singleline)
                     end
end
configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 62
def configure_parser(conf)
  @parser = Plugin.new_parser(conf['format'])
  @parser.configure(conf)
end
configure_tag() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 67
def configure_tag
  if @tag.index('*')
    @tag_prefix, @tag_suffix = @tag.split('*')
    @tag_suffix ||= ''
  else
    @tag_prefix = nil
    @tag_suffix = nil
  end
end
convert_line_to_event(line, es) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 237
def convert_line_to_event(line, es)
  begin
    line.chomp!  # remove \n
    @parser.parse(line) { |time, record|
      if time && record
        es.add(time, record)
      else
        log.warn "pattern not match: #{line.inspect}"
      end
    }
  rescue => e
    log.warn line.dump, :error => e.to_s
    log.debug_backtrace(e.backtrace)
  end
end
expand_paths() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 101
def expand_paths
  date = Time.now
  paths = []

  excluded = @exclude_path.map { |path| path = date.strftime(path); path.include?('*') ? Dir.glob(path) : path }.flatten.uniq
  @paths.each { |path|
    path = date.strftime(path)
    if path.include?('*')
      paths += Dir.glob(path)
    else
      # When file is not created yet, Dir.glob returns an empty array. So just add when path is static.
      paths << path
    end
  }
  paths - excluded
end
flush_buffer(tw) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 196
def flush_buffer(tw)
  if lb = tw.line_buffer
    lb.chomp!
    @parser.parse(lb) { |time, record|
      if time && record
        tag = if @tag_prefix || @tag_suffix
                @tag_prefix + tw.tag + @tag_suffix
              else
                @tag
              end
        router.emit(tag, time, record)
      else
        log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}"
      end
    }
  end
end
parse_multilines(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 261
def parse_multilines(lines, tail_watcher)
  lb = tail_watcher.line_buffer
  es = MultiEventStream.new
  if @parser.has_firstline?
    lines.each { |line|
      if @parser.firstline?(line)
        if lb
          convert_line_to_event(lb, es)
        end
        lb = line
      else
        if lb.nil?
          log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
        else
          lb << line
        end
      end
    }
  else
    lb ||= ''
    lines.each do |line|
      lb << line
      @parser.parse(lb) { |time, record|
        if time && record
          convert_line_to_event(lb, es)
          lb = ''
        end
      }
    end
  end
  tail_watcher.line_buffer = lb
  es
end
parse_singleline(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 253
def parse_singleline(lines, tail_watcher)
  es = MultiEventStream.new
  lines.each { |line|
    convert_line_to_event(line, es)
  }
  es
end
receive_lines(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 221
def receive_lines(lines, tail_watcher)
  es = @receive_handler.call(lines, tail_watcher)
  unless es.empty?
    tag = if @tag_prefix || @tag_suffix
            @tag_prefix + tail_watcher.tag + @tag_suffix
          else
            @tag
          end
    begin
      router.emit_stream(tag, es)
    rescue
      # ignore errors. Engine shows logs and backtraces.
    end
  end
end
refresh_watchers() click to toggle source

in_tail with '*' path doesn't check rotation file equality at refresh phase. So you should not use '*' path when your logs will be rotated by another tool. It will cause log duplication after updated watch files. In such case, you should separate log directory and specify two paths in path parameter. e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file

# File lib/fluent/plugin/in_tail.rb, line 123
def refresh_watchers
  target_paths = expand_paths
  existence_paths = @tails.keys

  unwatched = existence_paths - target_paths
  added = target_paths - existence_paths

  stop_watchers(unwatched, false, true) unless unwatched.empty?
  start_watchers(added) unless added.empty?
end
run() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 214
def run
  @loop.run
rescue
  log.error "unexpected error", :error=>$!.to_s
  log.error_backtrace
end
setup_watcher(path, pe) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 134
def setup_watcher(path, pe)
  tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @read_lines_limit, method(:update_watcher), &method(:receive_lines))
  tw.attach(@loop)
  tw
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 92
def shutdown
  @refresh_trigger.detach if @refresh_trigger && @refresh_trigger.attached?

  stop_watchers(@tails.keys, true)
  @loop.stop rescue nil # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception.
  @thread.join
  @pf_file.close if @pf_file
end
start() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 77
def start
  if @pos_file
    @pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION)
    @pf_file.sync = true
    @pf = PositionFile.parse(@pf_file)
  end

  @loop = Coolio::Loop.new
  refresh_watchers

  @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, log, &method(:refresh_watchers))
  @refresh_trigger.attach(@loop)
  @thread = Thread.new(&method(:run))
end
start_watchers(paths) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 140
def start_watchers(paths)
  paths.each { |path|
    pe = nil
    if @pf
      pe = @pf[path]
      if @read_from_head && pe.read_inode.zero?
        begin
          pe.update(File::Stat.new(path).ino, 0)
        rescue Errno::ENOENT
          $log.warn "#{path} not found. Continuing without tailing it."
        end
      end
    end

    @tails[path] = setup_watcher(path, pe)
  }
end
stop_watchers(paths, immediate = false, unwatched = false) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 158
def stop_watchers(paths, immediate = false, unwatched = false)
  paths.each { |path|
    tw = @tails.delete(path)
    if tw
      tw.unwatched = unwatched
      if immediate
        close_watcher(tw, false)
      else
        close_watcher_after_rotate_wait(tw)
      end
    end
  }
end
update_watcher(path, pe) click to toggle source

#refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.

# File lib/fluent/plugin/in_tail.rb, line 173
def update_watcher(path, pe)
  rotated_tw = @tails[path]
  @tails[path] = setup_watcher(path, pe)
  close_watcher_after_rotate_wait(rotated_tw) if rotated_tw
end