BufferedOutput
# File lib/fluent/plugin/out_exec_filter.rb, line 194
# Pre-shutdown hook: runs the parent's hook, marks every entry of @children
# as finished, then pauses for half a second.
def before_shutdown
  super
  $log.debug "out_exec_filter#before_shutdown called"
  @children.each do |child|
    child.finished = true
  end
  sleep 0.5  # TODO wait time before killing child process
end
# File lib/fluent/plugin/out_exec_filter.rb, line 79
# Applies the configuration and derives the state the other methods rely on:
# time formatter/parser procs, prefix strings, the input formatter, the
# output parser, the respawn count and the error-log throttle state.
#
# conf - configuration object, read with String keys (conf['tag_key'], ...).
#
# Raises ConfigError when neither 'tag' nor 'out_tag_key' is set, when a tsv
# format is selected without its key list, or when child_respawn is not one
# of: none / 0, inf / -1, or a non-negative integer string.
def configure(conf)
  # Shorthand options: each one fills both the in_* and the out_* setting.
  # They are assigned before `super` runs, exactly as in the original order.
  if tag_key = conf['tag_key']
    # TODO obsoleted?
    @in_tag_key = tag_key
    @out_tag_key = tag_key
  end

  if time_key = conf['time_key']
    # TODO obsoleted?
    @in_time_key = time_key
    @out_time_key = time_key
  end

  if time_format = conf['time_format']
    # TODO obsoleted?
    @in_time_format = time_format
    @out_time_format = time_format
  end

  super

  # 'localtime' wins over 'utc' when both are present; when neither is
  # present @localtime is left untouched.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if !@tag && !@out_tag_key
    raise ConfigError, "'tag' or 'out_tag_key' option is required on exec_filter output"
  end

  # Proc used by format_stream to render the event time into the record.
  if @in_time_key
    if f = @in_time_format
      tf = TimeFormatter.new(f, @localtime)
      @time_format_proc = tf.method(:format)
    else
      @time_format_proc = Proc.new {|time| time.to_s }
    end
  elsif @in_time_format
    $log.warn "in_time_format effects nothing when in_time_key is not specified: #{conf}"
  end

  # Proc used by on_message to turn the child's time field into an integer.
  if @out_time_key
    if f = @out_time_format
      @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
    else
      @time_parse_proc = Proc.new {|str| str.to_i }
    end
  elsif @out_time_format
    $log.warn "out_time_format effects nothing when out_time_key is not specified: #{conf}"
  end

  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  if @add_prefix
    @added_prefix_string = @add_prefix + '.'
  end

  case @in_format
  when :tsv
    if @in_keys.empty?
      raise ConfigError, "in_keys option is required on exec_filter output for tsv in_format"
    end
    @formatter = TSVFormatter.new(@in_keys)
  when :json
    @formatter = JSONFormatter.new
  when :msgpack
    @formatter = MessagePackFormatter.new
  end

  case @out_format
  when :tsv
    if @out_keys.empty?
      # This branch validates out_format, so the message names out_format
      # (the original message said "in_format" here).
      raise ConfigError, "out_keys option is required on exec_filter output for tsv out_format"
    end
    @parser = TSVParser.new(@out_keys, method(:on_message))
  when :json
    @parser = JSONParser.new(method(:on_message))
  when :msgpack
    @parser = MessagePackParser.new(method(:on_message))
  end

  @respawns =
    if @child_respawn.nil? || @child_respawn == 'none' || @child_respawn == '0'
      0
    elsif @child_respawn == 'inf' || @child_respawn == '-1'
      -1
    elsif @child_respawn =~ /\A\d+\z/
      # \A and \z anchor the whole string; ^ and $ would accept a value
      # such as "5\nfoo" because they match at line boundaries.
      @child_respawn.to_i
    else
      raise ConfigError, "child_respawn option argument invalid: none(or 0), inf(or -1) or positive number"
    end

  @suppress_error_log_interval ||= 0
  @next_log_time = Time.now.to_i
end
# File lib/fluent/plugin/out_exec_filter.rb, line 212
# Serializes every event of a stream into a single String by passing each
# record, plus the shared output buffer, to @formatter.
#
# tag - String tag of the stream. When @remove_prefix is set, a leading
#       "<prefix>." is stripped; a tag that is exactly the prefix becomes ''.
# es  - object whose #each yields (time, record) pairs.
#
# Each record is modified in place: @in_time_key and @in_tag_key (when set)
# are written into it before formatting.
#
# Returns the String that @formatter appended to.
def format_stream(tag, es)
  if @remove_prefix
    # The second operand compares against @remove_prefix. The original read
    # @removed_prefix, which is never assigned, so a tag equal to the bare
    # prefix was never stripped and the `|| ''` fallback was unreachable.
    if (tag[0, @removed_length] == @removed_prefix_string && tag.length > @removed_length) || tag == @remove_prefix
      # Slicing past the end of the String yields nil, hence the fallback.
      tag = tag[@removed_length..-1] || ''
    end
  end

  out = ''
  es.each {|time, record|
    record[@in_time_key] = @time_format_proc.call(time) if @in_time_key
    record[@in_tag_key] = tag if @in_tag_key
    @formatter.call(record, out)
  }
  out
end
# File lib/fluent/plugin/out_exec_filter.rb, line 368
# Callback handed to the output parsers in configure: emits one parsed
# record through Engine.emit.
#
# The time comes from the record's @out_time_key field (run through
# @time_parse_proc) or from Engine.now; the tag comes from the record's
# @out_tag_key field (prefixed with @added_prefix_string when @add_prefix is
# set) or from @tag. Both fields are removed from the record.
#
# Any StandardError is logged instead of raised; logging is skipped while
# @next_log_time has not been passed, unless the interval is 0.
def on_message(record)
  time_field = record.delete(@out_time_key)
  time = time_field ? @time_parse_proc.call(time_field) : Engine.now

  tag_field = record.delete(@out_tag_key)
  tag =
    if !tag_field
      @tag
    elsif @add_prefix
      @added_prefix_string + tag_field
    else
      tag_field
    end

  Engine.emit(tag, time, record)
rescue => e
  log_allowed = @suppress_error_log_interval == 0 || Time.now.to_i > @next_log_time
  if log_allowed
    $log.error "exec_filter failed to emit", :error=>e.to_s, :error_class=>e.class.to_s, :record=>Yajl.dump(record)
    $log.warn_backtrace e.backtrace
    @next_log_time = Time.now.to_i + @suppress_error_log_interval
  end
end
# File lib/fluent/plugin/out_exec_filter.rb, line 203
# Runs the parent's shutdown, then calls #shutdown on each entry of
# @children and removes it from the list.
def shutdown
  super

  @children.reject! do |child|
    child.shutdown
    true  # always drop the child once its shutdown has been called
  end
end
Generated with the Darkfish Rdoc Generator 2.