class Fluent::ExecFilterOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::BufferedOutput.new
# File lib/fluent/plugin/out_exec_filter.rb, line 23
# Constructs the plugin through the superclass constructor and then loads
# 'fluent/timezone' (presumably so that Fluent::Timezone, which #configure
# references, is available -- confirm against the rest of the file).
def initialize
  super()
  require 'fluent/timezone'
end
Public Instance Methods
before_shutdown()
click to toggle source
Calls superclass method
Fluent::BufferedOutput#before_shutdown
# File lib/fluent/plugin/out_exec_filter.rb, line 194
# Runs the superclass hook, logs a debug line, flags every child process as
# finished, and then pauses for half a second.
def before_shutdown
  super
  log.debug "out_exec_filter#before_shutdown called"
  @children.each do |child|
    child.finished = true
  end
  sleep 0.5 # TODO wait time before killing child process
end
configure(conf)
click to toggle source
Calls superclass method
Fluent::BufferedOutput#configure
# File lib/fluent/plugin/out_exec_filter.rb, line 74
# Validates the plugin configuration and derives the runtime helpers used by
# the other methods of this class.
#
# conf - configuration object supporting conf['key'] lookups.
#
# Sets up, depending on the options:
#   @time_format_proc / @time_parse_proc   - time conversion for in/out records
#   @removed_prefix_string, @removed_length - used by #format_stream
#   @added_prefix_string                    - used by #on_message
#   @formatter / @parser                    - chosen from @in_format / @out_format
#   @respawns                               - numeric form of @child_respawn
#
# Raises ConfigError when neither 'tag' nor 'out_tag_key' is set, when a tsv
# format lacks its key list, or when child_respawn is not recognised.
def configure(conf)
  # Legacy shared keys: each one populates both the input and the output
  # setting. They are assigned before +super+ runs, as in the original code.
  if tag_key = conf['tag_key']
    # TODO obsoleted?
    @in_tag_key = tag_key
    @out_tag_key = tag_key
  end

  if time_key = conf['time_key']
    # TODO obsoleted?
    @in_time_key = time_key
    @out_time_key = time_key
  end

  if time_format = conf['time_format']
    # TODO obsoleted?
    @in_time_format = time_format
    @out_time_format = time_format
  end

  super

  # 'localtime' wins over 'utc' when both are present.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  if !@tag && !@out_tag_key
    raise ConfigError, "'tag' or 'out_tag_key' option is required on exec_filter output"
  end

  # Formatter for the time value written into records sent to the child.
  if @in_time_key
    if in_fmt = @in_time_format
      tf = TimeFormatter.new(in_fmt, @localtime, @timezone)
      @time_format_proc = tf.method(:format)
    else
      @time_format_proc = Proc.new { |time| time.to_s }
    end
  elsif @in_time_format
    log.warn "in_time_format effects nothing when in_time_key is not specified: #{conf}"
  end

  # Parser for the time value read back from records emitted by the child.
  if @out_time_key
    if out_fmt = @out_time_format
      @time_parse_proc = Proc.new { |str| Time.strptime(str, out_fmt).to_i }
    else
      @time_parse_proc = Proc.new { |str| str.to_i }
    end
  elsif @out_time_format
    log.warn "out_time_format effects nothing when out_time_key is not specified: #{conf}"
  end

  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end
  @added_prefix_string = @add_prefix + '.' if @add_prefix

  case @in_format
  when :tsv
    if @in_keys.empty?
      raise ConfigError, "in_keys option is required on exec_filter output for tsv in_format"
    end
    @formatter = ExecUtil::TSVFormatter.new(@in_keys)
  when :json
    @formatter = ExecUtil::JSONFormatter.new
  when :msgpack
    @formatter = ExecUtil::MessagePackFormatter.new
  end

  case @out_format
  when :tsv
    if @out_keys.empty?
      # This check concerns the *output* side, so the message names out_format.
      raise ConfigError, "out_keys option is required on exec_filter output for tsv out_format"
    end
    @parser = ExecUtil::TSVParser.new(@out_keys, method(:on_message))
  when :json
    @parser = ExecUtil::JSONParser.new(method(:on_message))
  when :msgpack
    @parser = ExecUtil::MessagePackParser.new(method(:on_message))
  end

  # \A and \z anchor the whole string; ^ and $ would accept any value that
  # merely contains one all-digit line (e.g. "5\nfoo").
  @respawns =
    case @child_respawn
    when nil, 'none', '0'
      0
    when 'inf', '-1'
      -1
    when /\A\d+\z/
      @child_respawn.to_i
    else
      raise ConfigError, "child_respawn option argument invalid: none(or 0), inf(or -1) or positive number"
    end

  @suppress_error_log_interval ||= 0
  @next_log_time = Time.now.to_i
end
format_stream(tag, es)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 212
# Serializes an event stream into a single string.
#
# tag - the tag of the stream. When @remove_prefix is set, a leading
#       "<remove_prefix>." is stripped; a tag exactly equal to the prefix
#       becomes the empty string.
# es  - an object whose #each yields (time, record) pairs.
#
# Each record is modified in place: @in_time_key (if set) receives the
# formatted time and @in_tag_key (if set) receives the processed tag. The
# record is then handed to @formatter together with the output string.
#
# Returns the output string that @formatter was called with.
def format_stream(tag, es)
  if @remove_prefix
    has_dotted_prefix =
      tag.length > @removed_length && tag[0, @removed_length] == @removed_prefix_string
    # The exact-match case compares against @remove_prefix. The previous code
    # compared against @removed_prefix, which is never assigned, so a tag
    # identical to the prefix was never stripped.
    if has_dotted_prefix || tag == @remove_prefix
      # The slice is nil when the tag is shorter than the dotted prefix,
      # i.e. in the exact-match case.
      tag = tag[@removed_length..-1] || ''
    end
  end

  out = ''
  es.each do |time, record|
    record[@in_time_key] = @time_format_proc.call(time) if @in_time_key
    record[@in_tag_key] = tag if @in_tag_key
    @formatter.call(record, out)
  end
  out
end
on_message(record)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 337
# Handles one record produced by a parser and emits it through the router.
#
# record - a Hash-like object; the @out_time_key and @out_tag_key entries are
#          removed from it before it is emitted.
#
# The event time is the parsed @out_time_key value, or Engine.now when that
# entry is absent. The tag is the @out_tag_key value (prefixed with
# @added_prefix_string when @add_prefix is set), or @tag when that entry is
# absent.
#
# Any StandardError is caught and logged instead of propagated. Logging is
# skipped while @suppress_error_log_interval is non-zero and the current time
# has not passed @next_log_time.
def on_message(record)
  raw_time = record.delete(@out_time_key)
  time = raw_time ? @time_parse_proc.call(raw_time) : Engine.now

  raw_tag = record.delete(@out_tag_key)
  tag =
    if raw_tag
      @add_prefix ? @added_prefix_string + raw_tag : raw_tag
    else
      @tag
    end

  router.emit(tag, time, record)
rescue => e
  # Still inside the suppression window: stay quiet.
  return if @suppress_error_log_interval != 0 && Time.now.to_i <= @next_log_time

  log.error "exec_filter failed to emit", error: e.to_s, error_class: e.class.to_s, record: Yajl.dump(record)
  log.warn_backtrace e.backtrace
  @next_log_time = Time.now.to_i + @suppress_error_log_interval
end
shutdown()
click to toggle source
Calls superclass method
Fluent::BufferedOutput#shutdown
# File lib/fluent/plugin/out_exec_filter.rb, line 203
# Runs the superclass shutdown, then shuts down every child process and
# removes it from @children.
def shutdown
  super

  @children.reject! do |child|
    child.shutdown
    true # always drop the child from the list once it has been shut down
  end
end
start()
click to toggle source
Calls superclass method
Fluent::BufferedOutput#start
# File lib/fluent/plugin/out_exec_filter.rb, line 177
# Runs the superclass start, resets the child list and the round-robin
# counter, and launches @num_children child processes running @command.
#
# If launching raises, #shutdown is invoked and the error is re-raised.
def start
  super

  @children = []
  @rr = 0

  begin
    @num_children.times do
      child = ChildProcess.new(@parser, @respawns, log)
      child.start(@command)
      # Only children that started successfully are tracked.
      @children.push(child)
    end
  rescue
    shutdown
    raise
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 234
# Hands the chunk to the next child in round-robin order.
#
# chunk - passed unchanged to the selected child's #write.
#
# Advances @rr (wrapping at the number of children) and returns whatever the
# selected child's #write returns.
def write(chunk)
  # Keep the index in a local so the child lookup uses the value computed here.
  index = (@rr + 1) % @children.length
  @rr = index
  @children[index].write(chunk)
end