Parent

Class/Module Index [+]

Quicksearch

Fluent::ExecFilterOutput

Constants

SUPPORTED_FORMAT

Public Class Methods

new() click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 24
# Creates the plugin instance. No state is set up here; construction is
# delegated entirely to the parent class.
def initialize
  super
end

Public Instance Methods

before_shutdown() click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 194
# Hook run ahead of shutdown: flags every child as finished and then
# pauses briefly before returning.
def before_shutdown
  super
  $log.debug "out_exec_filter#before_shutdown called"
  @children.each do |child|
    child.finished = true
  end
  # TODO wait time before killing child process
  sleep 0.5
end
configure(conf) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 79
# Validates the plugin configuration and prepares the formatter, parser,
# tag-prefix strings, time conversion procs and respawn count used later.
#
# conf - configuration element, read here with the String keys 'tag_key',
#        'time_key', 'time_format', 'localtime' and 'utc'.
#
# Raises ConfigError when neither @tag nor @out_tag_key is set, when a tsv
# format has an empty key list, or when child_respawn is not one of
# none/0, inf/-1 or a string of decimal digits.
def configure(conf)
  # Legacy shorthand options: each one sets both the input-side and the
  # output-side variable. They are assigned before `super` runs.
  # NOTE(review): presumably so the parent's handling of the explicit
  # in_*/out_* options can override them -- confirm.
  if tag_key = conf['tag_key']
    # TODO obsoleted?
    @in_tag_key = tag_key
    @out_tag_key = tag_key
  end

  if time_key = conf['time_key']
    # TODO obsoleted?
    @in_time_key = time_key
    @out_time_key = time_key
  end

  if time_format = conf['time_format']
    # TODO obsoleted?
    @in_time_format = time_format
    @out_time_format = time_format
  end

  super

  # 'localtime' wins over 'utc' when both are present.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if !@tag && !@out_tag_key
    raise ConfigError, "'tag' or 'out_tag_key' option is required on exec_filter output"
  end

  # Proc that renders an event time into the value stored under @in_time_key.
  if @in_time_key
    if f = @in_time_format
      tf = TimeFormatter.new(f, @localtime)
      @time_format_proc = tf.method(:format)
    else
      @time_format_proc = Proc.new {|time| time.to_s }
    end
  elsif @in_time_format
    $log.warn "in_time_format effects nothing when in_time_key is not specified: #{conf}"
  end

  # Proc that turns the value found under @out_time_key back into an Integer.
  if @out_time_key
    if f = @out_time_format
      @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
    else
      @time_parse_proc = Proc.new {|str| str.to_i }
    end
  elsif @out_time_format
    $log.warn "out_time_format effects nothing when out_time_key is not specified: #{conf}"
  end

  # Precompute the dotted prefix strings so per-event tag handling only
  # needs comparisons and concatenation.
  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end
  if @add_prefix
    @added_prefix_string = @add_prefix + '.'
  end

  case @in_format
  when :tsv
    if @in_keys.empty?
      raise ConfigError, "in_keys option is required on exec_filter output for tsv in_format"
    end
    @formatter = TSVFormatter.new(@in_keys)
  when :json
    @formatter = JSONFormatter.new
  when :msgpack
    @formatter = MessagePackFormatter.new
  end

  case @out_format
  when :tsv
    if @out_keys.empty?
      # This message used to name in_format although out_format is checked.
      raise ConfigError, "out_keys option is required on exec_filter output for tsv out_format"
    end
    @parser = TSVParser.new(@out_keys, method(:on_message))
  when :json
    @parser = JSONParser.new(method(:on_message))
  when :msgpack
    @parser = MessagePackParser.new(method(:on_message))
  end

  # \A...\z anchors the whole value; ^...$ would accept any multi-line
  # string that merely contains a line of digits.
  @respawns =
    case @child_respawn
    when nil, 'none', '0'
      0
    when 'inf', '-1'
      -1
    when /\A\d+\z/
      @child_respawn.to_i
    else
      raise ConfigError, "child_respawn option argument invalid: none(or 0), inf(or -1) or positive number"
    end

  @suppress_error_log_interval ||= 0
  @next_log_time = Time.now.to_i
end
format_stream(tag, es) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 212
# Serializes every event of a stream into a single string via @formatter.
#
# tag - event tag. When @remove_prefix is set, a leading "<prefix>." is
#       stripped, and a tag equal to the bare prefix becomes ''.
# es  - event stream whose #each yields (time, record) pairs.
#
# Each record is modified in place: @in_time_key and @in_tag_key, when set,
# are written into it before it is passed to @formatter.
#
# Returns the String passed to @formatter.call for every record.
def format_stream(tag, es)
  if @remove_prefix
    dotted_match = tag.length > @removed_length &&
                   tag[0, @removed_length] == @removed_prefix_string
    # The bare-prefix case must compare against @remove_prefix; the former
    # @removed_prefix is never assigned, so that comparison never matched.
    if dotted_match || tag == @remove_prefix
      # The slice is nil when the tag is shorter than "<prefix>.".
      tag = tag[@removed_length..-1] || ''
    end
  end

  out = ''

  es.each {|time, record|
    if @in_time_key
      record[@in_time_key] = @time_format_proc.call(time)
    end
    if @in_tag_key
      record[@in_tag_key] = tag
    end
    @formatter.call(record, out)
  }

  out
end
on_message(record) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 368
# Callback for one parsed record: removes the time and tag fields from the
# record, resolves the event time and tag, and emits the event.
#
# record - Hash; the @out_time_key and @out_tag_key entries are deleted.
#
# Any error raised while doing so is logged instead of propagated. When
# @suppress_error_log_interval is non-zero, logging is skipped until
# @next_log_time has passed.
def on_message(record)
  raw_time = record.delete(@out_time_key)
  time = raw_time ? @time_parse_proc.call(raw_time) : Engine.now

  raw_tag = record.delete(@out_tag_key)
  tag =
    if !raw_tag
      # No tag in the record: use the configured one.
      @tag
    elsif @add_prefix
      @added_prefix_string + raw_tag
    else
      raw_tag
    end

  Engine.emit(tag, time, record)

rescue => e
  return unless @suppress_error_log_interval == 0 || Time.now.to_i > @next_log_time

  $log.error "exec_filter failed to emit", :error=>e.to_s, :error_class=>e.class.to_s, :record=>Yajl.dump(record)
  $log.warn_backtrace e.backtrace
  @next_log_time = Time.now.to_i + @suppress_error_log_interval
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 203
# Shuts down every child and removes it from @children.
def shutdown
  super

  # The block always returns true, so each child is dropped from the list
  # right after its own shutdown has been called.
  @children.reject! do |child|
    child.shutdown
    true
  end
end
start() click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 177
# Spawns @num_children child processes running @command and resets the
# round-robin counter. If spawning raises, the plugin is shut down and the
# error is re-raised.
def start
  super

  @rr = 0
  @children = []
  begin
    @num_children.times {
      child = ChildProcess.new(@parser, @respawns)
      child.start(@command)
      # Registered only after a successful start.
      @children.push(child)
    }
  rescue
    shutdown
    raise
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 234
# Hands the chunk to the next child in round-robin order.
#
# chunk - passed unchanged to the selected child's #write.
def write(chunk)
  # Keep the chosen index in a local so the child lookup does not re-read @rr.
  index = (@rr + 1) % @children.length
  @rr = index
  @children[index].write(chunk)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.