Parent

Class/Module Index [+]

Quicksearch

Fluent::ExecFilterOutput::ChildProcess

Attributes

finished[RW]

Public Class Methods

new(parser,respawns=0) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 242
# Builds an idle ChildProcess wrapper; nothing is spawned here.
#
# parser::   object stored in @parser (later invoked via #call with the child's IO)
# respawns:: respawn budget stored in @respawns (defaults to 0)
def initialize(parser,respawns=0)
  # Caller-supplied collaborators.
  @parser, @respawns = parser, respawns
  # Lock used by the methods that spawn or kill the child.
  @mutex = Mutex.new
  # Process and thread handles stay nil until a child is spawned.
  @pid = @thread = nil
  @finished = nil
end

Public Instance Methods

kill_child(join_wait) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 262
# Stops the child process and waits for the thread stored in @thread.
#
# Sends SIGTERM to @pid and waits up to +join_wait+ seconds for @thread to
# finish. If the thread is still running after that, sends SIGKILL and then
# waits for the thread without a timeout.
#
# join_wait:: timeout in seconds handed to Thread#join after SIGTERM
#
# Returns nil when there was nothing to stop or the thread finished within
# +join_wait+; otherwise returns the result of the final @thread.join.
def kill_child(join_wait)
  # No child was ever spawned (@pid is only assigned after a successful
  # popen), so there is nothing to signal or join.
  return if @pid.nil?

  begin
    Process.kill(:TERM, @pid)
  rescue Errno::ESRCH
    # Errno::ESRCH 'No such process', ignore
    # child process killed by signal chained from fluentd process
  end

  # @thread can still be nil if thread creation failed after the spawn.
  # Otherwise a truthy join result means @thread successfully shut down.
  return if @thread.nil? || @thread.join(join_wait)

  begin
    Process.kill(:KILL, @pid)
  rescue Errno::ESRCH
    # ignore if successfully killed by :TERM
  end
  @thread.join
end
run() click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 320
# Thread body: passes the child's IO to @parser, then reaps the child.
#
# Any StandardError raised by the parser is logged rather than propagated.
# Whatever happens, the child identified by @pid is waited for afterwards.
# If @finished is not set at that point the exit is reported as unexpected,
# and a respawn notice is logged when @respawns is non-zero.
def run
  @parser.call(@io)
rescue
  $log.error "exec_filter thread unexpectedly failed with an error.", :command=>@command, :error=>$!.to_s
  $log.warn_backtrace $!.backtrace
ensure
  # The child may already have been reaped, in which case waitpid2 raises
  # Errno::ECHILD. Swallow it here: an exception escaping this thread would
  # be re-raised by Thread#join in whoever is stopping the child.
  stat = begin
    Process.waitpid2(@pid).last
  rescue Errno::ECHILD
    nil
  end
  unless @finished
    # The exit code is nil when the status could not be collected.
    $log.error "exec_filter process unexpectedly exited.", :command=>@command, :ecode=>(stat && stat.to_i)
    unless @respawns == 0
      $log.warn "exec_filter child process will respawn for next input data (respawns #{@respawns})."
    end
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 281
# Marks this child as intentionally finished, then stops it while holding
# @mutex. Returns whatever #kill_child returns.
def shutdown
  # Set the flag before killing so #run does not report the exit as unexpected.
  @finished = true
  @mutex.synchronize { kill_child(60) } # TODO wait time
end
start(command) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 251
# Spawns +command+ as a child process with a bidirectional pipe and starts
# a thread executing #run for it.
#
# command:: command line handed to IO.popen; remembered in @command
#
# Returns false (the value assigned to @finished).
def start(command)
  @command = command
  @mutex.synchronize do
    # "r+" opens the pipe for both writing to and reading from the child.
    @io = IO.popen(command, "r+")
    @io.sync = true
    @pid = @io.pid
    @thread = Thread.new { run }
  end
  @finished = false
end
try_respawn() click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 302
# Replaces the current child with a freshly spawned one, if the respawn
# budget allows it.
#
# Returns false without doing anything when @respawns is 0; otherwise kills
# the current child, spawns @command again, logs a warning and returns true.
# A positive @respawns is decremented; a negative value is left untouched.
def try_respawn
  # Cheap check before taking the lock.
  return false if @respawns == 0

  respawned = false
  @mutex.synchronize do
    # Re-check under the lock: the budget may have been used up meanwhile.
    unless @respawns == 0
      kill_child(5) # TODO wait time

      @io = IO.popen(@command, "r+")
      @io.sync = true
      @pid = @io.pid
      @thread = Thread.new { run }

      @respawns -= 1 if @respawns > 0
      respawned = true
    end
  end
  return false unless respawned

  $log.warn "exec_filter child process successfully respawned.", :command => @command, :respawns => @respawns
  true
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 288
# Writes +chunk+ to the child's IO via chunk.write_to(@io).
#
# On Errno::EPIPE a warning is logged and a respawn is attempted; if that
# succeeds the write is retried against the new child, otherwise the same
# Errno::EPIPE is raised to the caller.
#
# Returns the result of chunk.write_to.
def write(chunk)
  chunk.write_to(@io)
rescue Errno::EPIPE => e
  # Broken pipe (child process unexpectedly exited)
  $log.warn "exec_filter Broken pipe, child process maybe exited.", :command => @command
  # Propagate so the caller can retry #write with other ChildProcess instance
  # (when num_children > 1).
  raise e unless try_respawn
  retry # run chunk#write_to again now that the child has been respawned
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.