# File lib/fluent/engine.rb, line 22 def initialize @matches = [] @sources = [] @match_cache = {} @match_cache_keys = [] @started = [] @default_loop = nil @log_emit_thread = nil @log_event_loop_stop = false @log_event_queue = [] @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil end
# File lib/fluent/engine.rb, line 74 def configure(conf) $log.info "using configuration file: #{conf.to_s.rstrip}" conf.elements.select {|e| e.name == 'source' }.each {|e| type = e['type'] unless type raise ConfigError, "Missing 'type' parameter on <source> directive" end $log.info "adding source type=#{type.dump}" input = Plugin.new_input(type) input.configure(e) @sources << input } conf.elements.select {|e| e.name == 'match' }.each {|e| type = e['type'] pattern = e.arg unless type raise ConfigError, "Missing 'type' parameter on <match #{e.arg}> directive" end $log.info "adding match", :pattern=>pattern, :type=>type output = Plugin.new_output(type) output.configure(e) match = Match.new(pattern, output) @matches << match } end
# File lib/fluent/engine.rb, line 114 def emit(tag, time, record) emit_stream tag, OneEventStream.new(time, record) end
# File lib/fluent/engine.rb, line 118 def emit_array(tag, array) emit_stream tag, ArrayEventStream.new(array) end
# File lib/fluent/engine.rb, line 122 def emit_stream(tag, es) target = @match_cache[tag] unless target target = match(tag) || NoMatchMatch.new # this is not thread-safe but inconsistency doesn't # cause serious problems while locking causes. if @match_cache_keys.size >= MATCH_CACHE_SIZE @match_cache_keys.delete @match_cache_keys.shift end @match_cache[tag] = target @match_cache_keys << tag end target.emit(tag, es) rescue if @suppress_emit_error_log_interval == 0 || now > @next_emit_error_log_time $log.warn "emit transaction failed ", :error=>$!.to_s $log.warn_backtrace # $log.debug "current next_emit_error_log_time: #{Time.at(@next_emit_error_log_time)}" @next_emit_error_log_time = Time.now.to_i + @suppress_emit_error_log_interval # $log.debug "next emit failure log suppressed" # $log.debug "next logged time is #{Time.at(@next_emit_error_log_time)}" end raise end
# File lib/fluent/engine.rb, line 155 def flush! flush_recursive(@matches) end
# File lib/fluent/engine.rb, line 44 def init BasicSocket.do_not_reverse_lookup = true Plugin.load_plugins if defined?(Encoding) Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal) Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external) end self end
# File lib/fluent/engine.rb, line 110 def load_plugin_dir(dir) Plugin.load_plugin_dir(dir) end
# File lib/fluent/engine.rb, line 164 def log_event_loop $log.disable_events(Thread.current) while sleep(LOG_EMIT_INTERVAL) break if @log_event_loop_stop next if @log_event_queue.empty? # NOTE: thead-safe of slice! depends on GVL events = @log_event_queue.slice!(0..-1) next if events.empty? events.each {|tag,time,record| begin Engine.emit(tag, time, record) rescue $log.error "failed to emit fluentd's log event", :tag => tag, :event => record, :error => $! end } end end
# File lib/fluent/engine.rb, line 147 def match(tag) @matches.find {|m| m.match(tag) } end
# File lib/fluent/engine.rb, line 151 def match?(tag) !!match(tag) end
# File lib/fluent/engine.rb, line 159 def now # TODO thread update Time.now.to_i end
# File lib/fluent/engine.rb, line 66 def parse_config(io, fname, basepath=Dir.pwd) conf = Config.parse(io, fname, basepath) configure(conf) conf.check_not_fetched {|key,e| $log.warn "parameter '#{key}' in #{e.to_s.strip} is not used." } end
# File lib/fluent/engine.rb, line 221 def push_log_event(tag, time, record) return if @log_emit_thread.nil? @log_event_queue.push([tag, time, record]) end
# File lib/fluent/engine.rb, line 59 def read_config(path) $log.info "reading config file", :path=>path File.open(path) {|io| parse_config(io, File.basename(path), File.dirname(path)) } end
# File lib/fluent/engine.rb, line 185 def run begin start if match?($log.tag) $log.enable_event @log_emit_thread = Thread.new(&method(:log_event_loop)) end # for empty loop @default_loop = Coolio::Loop.default @default_loop.attach Coolio::TimerWatcher.new(1, true) # TODO attach async watch for thread pool @default_loop.run rescue $log.error "unexpected error", :error=>$!.to_s $log.error_backtrace ensure $log.info "shutting down fluentd" shutdown if @log_emit_thread @log_event_loop_stop = true @log_emit_thread.join end end end
Generated with the Darkfish Rdoc Generator 2.