# File lib/fluent/plugin/in_tail.rb, line 118
#
# Sets up the watcher state for a single tailed file.
#
# path          - path handed to the stat watcher and the rotate handler, and
#                 used in the rotation log messages.
# rotate_wait   - wait value applied to queued rotations (logged as seconds).
# pe            - position entry; when nil/false a new MemoryPositionEntry is
#                 created instead.
# receive_lines - block that is stored and forwarded to every IOHandler
#                 created by this object.
def initialize(path, rotate_wait, pe, &receive_lines)
  @path = path
  @rotate_wait = rotate_wait
  @receive_lines = receive_lines
  @pe = pe ? pe : MemoryPositionEntry.new

  # No file is being followed yet and no rotation is pending.
  @io_handler = nil
  @rotate_queue = []

  # Both triggers funnel into the same on_notify callback.
  # NOTE(review): (1, true) presumably means "every second, repeating" --
  # confirm against TimerWatcher.
  notify = method(:on_notify)
  @timer_trigger = TimerWatcher.new(1, true, &notify)
  @stat_trigger = StatWatcher.new(path, &notify)
  @rotate_handler = RotateHandler.new(path, &method(:on_rotate))
end
# File lib/fluent/plugin/in_tail.rb, line 133
#
# Attaches the timer trigger and then the stat trigger to the given event
# loop, and runs one notification pass immediately.
#
# loop - event loop object passed through to each trigger's #attach.
#
# Returns whatever on_notify returns.
def attach(loop)
  [@timer_trigger, @stat_trigger].each do |trigger|
    trigger.attach(loop)
  end
  on_notify
end
# File lib/fluent/plugin/in_tail.rb, line 144
#
# Closes the io of every pending rotation request, empties the rotation
# queue, and detaches the triggers.
#
# NOTE(review): the currently active @io_handler is not closed here --
# confirm whether the caller owns that, otherwise its file stays open.
#
# Returns whatever detach returns.
def close
  @rotate_queue.reject! do |req|
    # on_rotate enqueues requests without checking io, and on_notify treats a
    # nil io as a valid state, so a queued request may have nothing to close.
    req.io.close if req.io
    # Always drop the request from the queue once it has been handled.
    true
  end
  detach
end
# File lib/fluent/plugin/in_tail.rb, line 139
#
# Detaches the timer trigger and the stat trigger, skipping any trigger that
# reports it is not attached.
def detach
  timer = @timer_trigger
  timer.detach if timer.attached?

  stat = @stat_trigger
  stat.detach if stat.attached?
end
# File lib/fluent/plugin/in_tail.rb, line 152
#
# Notification callback shared by the timer and stat triggers.
#
# Forwards the notification to the rotate handler and to the current io
# handler, then ticks the head of the rotation queue and swaps in a new io
# handler for every queued request that reports ready.
def on_notify
  @rotate_handler.on_notify
  return unless @io_handler
  @io_handler.on_notify

  # Process the rotation queue.
  return if @rotate_queue.empty?
  @rotate_queue.first.tick

  until @rotate_queue.empty?
    request = @rotate_queue.first
    break unless request.ready?

    io = request.io
    next_handler =
      if io
        inode = io.stat.ino
        # When the rotated file has the same inode as the last one, either
        #   a) the file was renamed away and moved back, or
        #   b) a symlink/hardlink to the same file was recreated;
        # in both cases resume from the saved position. Otherwise start from
        # wherever the new io currently is.
        pos = inode == @pe.read_inode ? @pe.read_pos : io.pos
        @pe.update(inode, pos)
        IOHandler.new(io, @pe, &@receive_lines)
      else
        NullIOHandler.new
      end

    # The replacement is built first; only then is the old handler closed.
    @io_handler.close
    @io_handler = next_handler
    @rotate_queue.shift
  end
end
# File lib/fluent/plugin/in_tail.rb, line 187
#
# Callback given to the rotate handler.
#
# io - io object for the watched path, or nil (presumably when the path
#      cannot be opened -- confirm against RotateHandler).
#
# On the first call (no io handler yet) it picks the start position and
# installs the initial io handler. On later calls it queues a rotation
# request, unless a request for the same io is already queued.
def on_rotate(io)
  if @io_handler.nil?
    if io
      # First time this file is seen.
      stat = io.stat
      inode = stat.ino
      last_inode = @pe.read_inode

      if inode == last_inode
        # Same file as recorded: resume from the saved position.
        pos = @pe.read_pos
      else
        # last_inode != 0: a FilePositionEntry from an earlier fluentd run.
        #   Read the rotated file from its head; logs never duplicate because
        #   it is a new file.
        # last_inode == 0: a MemoryPositionEntry, or the very first start.
        #   Seek to the end of the file; without this, logs may duplicate
        #   because it is unknown whether the file is pre-existing or new.
        pos = last_inode != 0 ? 0 : stat.size
        @pe.update(inode, pos)
      end

      io.seek(pos)
      @io_handler = IOHandler.new(io, @pe, &@receive_lines)
    else
      @io_handler = NullIOHandler.new
    end
  else
    # Ignore an io that already has a pending rotation request.
    return if io && @rotate_queue.any? { |req| req.io == io }

    previous_io = @rotate_queue.empty? ? @io_handler.io : @rotate_queue.last.io
    if previous_io.nil?
      $log.info "detected rotation of #{@path}"
      # Rotate immediately when there is no previous file.
      wait = 0
    else
      $log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds"
      # Subtract the wait of the request at the head of the queue, if any.
      wait = @rotate_queue.empty? ? @rotate_wait : @rotate_wait - @rotate_queue.first.wait
    end
    @rotate_queue << RotationRequest.new(io, wait)
  end
end
Generated with the Darkfish Rdoc Generator 2.