class Fluent::EngineClass
Constants
- LOG_EMIT_INTERVAL
- MATCH_CACHE_SIZE
Attributes
matches[R]
root_agent[R]
sources[R]
Public Class Methods
new()
click to toggle source
# File lib/fluent/engine.rb, line 22
# Builds an engine in its idle state: no root agent, no event router and no
# event loop yet, an empty log-event queue, and every flag cleared.
#
# The root agent is created later by #init, and the event router is taken
# from it by #configure.
def initialize
  @root_agent = nil
  @event_router = nil
  @default_loop = nil
  @engine_stopped = false
  @log_emit_thread = nil
  @log_event_loop_stop = false
  @log_event_queue = []
  @suppress_config_dump = false
  # #run_configure reads this flag, but #init only assigns it when the
  # option is truthy; give it an explicit default so it is never read
  # while uninitialized.
  @without_source = false
end
Public Instance Methods
configure(conf)
click to toggle source
# File lib/fluent/engine.rb, line 97
# Logs every installed gem whose name is "fluentd" or starts with
# "fluent-plugin-" / "fluent-mixin-", configures the root agent with +conf+,
# and keeps a reference to the root agent's event router.
#
# Unless @suppress_config_dump is set, the configuration text (conf.to_s
# with trailing whitespace removed) is also written to the log at info level.
def configure(conf)
  # plugins / configuration dumps
  fluent_gems = Gem::Specification.find_all.select { |gem_spec| gem_spec.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }
  fluent_gems.each do |gem_spec|
    $log.info "gem '#{gem_spec.name}' version '#{gem_spec.version}'"
  end

  @root_agent.configure(conf)
  @event_router = @root_agent.event_router

  $log.info "using configuration file: #{conf.to_s.rstrip}" unless @suppress_config_dump
end
emit(tag, time, record)
click to toggle source
# File lib/fluent/engine.rb, line 115
# Emits a single event under +tag+ by wrapping +time+ and +record+ in a
# OneEventStream and handing it to #emit_stream.
#
# A nil +record+ is ignored: nothing is emitted and nil is returned.
def emit(tag, time, record)
  return if record.nil?

  emit_stream(tag, OneEventStream.new(time, record))
end
emit_array(tag, array)
click to toggle source
# File lib/fluent/engine.rb, line 121
# Emits +array+ under +tag+ by wrapping it in an ArrayEventStream and
# passing the stream to #emit_stream.
# NOTE(review): the element layout expected by ArrayEventStream is not
# visible here -- confirm against that class.
def emit_array(tag, array)
  stream = ArrayEventStream.new(array)
  emit_stream(tag, stream)
end
emit_stream(tag, es)
click to toggle source
# File lib/fluent/engine.rb, line 125
# Forwards the event stream +es+ for +tag+ to the event router obtained in
# #configure and returns whatever the router returns.
def emit_stream(tag, es)
  router = @event_router
  router.emit_stream(tag, es)
end
flush!()
click to toggle source
# File lib/fluent/engine.rb, line 129
# Delegates to the root agent's flush! and returns its result.
def flush!
  agent = @root_agent
  agent.flush!
end
init(opts = {})
click to toggle source
# File lib/fluent/engine.rb, line 41
# Prepares the process and the engine before configuration.
#
# Steps, in order:
# * turns off reverse lookup on BasicSocket,
# * loads plugins via Plugin.load_plugins,
# * when Encoding is available, sets the default internal and external
#   encodings to 'ASCII-8BIT',
# * applies the :suppress_interval, :suppress_config_dump and
#   :without_source options (each only when its value is truthy),
# * creates the RootAgent from +opts+.
#
# Returns self so calls can be chained.
def init(opts = {})
  BasicSocket.do_not_reverse_lookup = true
  Plugin.load_plugins

  if defined?(Encoding)
    Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal)
    Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external)
  end

  interval = opts[:suppress_interval]
  suppress_interval(interval) if interval

  config_dump_flag = opts[:suppress_config_dump]
  @suppress_config_dump = config_dump_flag if config_dump_flag

  without_source_flag = opts[:without_source]
  @without_source = without_source_flag if without_source_flag

  @root_agent = RootAgent.new(opts)

  self
end
load_plugin_dir(dir)
click to toggle source
# File lib/fluent/engine.rb, line 111
# Delegates to Plugin.load_plugin_dir with +dir+ and returns its result.
def load_plugin_dir(dir)
  plugin_loader = Plugin
  plugin_loader.load_plugin_dir(dir)
end
log()
click to toggle source
# File lib/fluent/engine.rb, line 58
# Returns the logger held in the $log global.
def log
  logger = $log
  logger
end
log_event_loop()
click to toggle source
# File lib/fluent/engine.rb, line 138
# Loop run by the log-emit thread started in #run: every LOG_EMIT_INTERVAL
# seconds it drains @log_event_queue and emits each queued
# [tag, time, record] through the event router.
#
# The loop ends once @log_event_loop_stop is set; the flag is checked right
# after each sleep, before the queue is looked at. A failure while emitting
# one event is logged and does not stop the remaining events.
def log_event_loop
  # Presumably keeps logging done on this thread from generating further
  # log events -- confirm against the logger's disable_events.
  $log.disable_events(Thread.current)

  while sleep(LOG_EMIT_INTERVAL)
    break if @log_event_loop_stop
    next if @log_event_queue.empty?

    # NOTE: thread-safety of slice! depends on the GVL
    pending = @log_event_queue.slice!(0..-1)
    next if pending.empty?

    pending.each do |tag, time, record|
      begin
        @event_router.emit(tag, time, record)
      rescue => err
        $log.error "failed to emit fluentd's log event",
                   :tag => tag, :event => record, :error_class => err.class, :error => err
      end
    end
  end
end
now()
click to toggle source
# File lib/fluent/engine.rb, line 133
# Returns the current time as integer seconds since the Unix epoch.
def now
  # TODO thread update
  current = Time.now
  current.to_i
end
parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
click to toggle source
# File lib/fluent/engine.rb, line 67
# Parses a configuration read from +io+.
#
# When +fname+ ends in ".rb" the Ruby DSL parser is loaded on demand and
# given the path basepath/fname; otherwise Config.parse is called with all
# four arguments.
#
# io        - source handed to the chosen parser.
# fname     - configuration file name; its extension selects the parser.
# basepath  - directory joined with +fname+ for the DSL parser and passed
#             through to Config.parse (defaults to the current directory).
# v1_config - passed through to Config.parse only.
#
# Returns whatever the selected parser returns.
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  # Anchor with \z, not $: `$` matches at the end of any line, so a name
  # such as "a.rb\nb.conf" would wrongly be treated as a Ruby DSL file.
  if fname =~ /\.rb\z/
    # Deliberately lazy: the DSL is only needed for .rb configurations.
    require 'fluent/config/dsl'
    Config::DSL::Parser.parse(io, File.join(basepath, fname))
  else
    Config.parse(io, fname, basepath, v1_config)
  end
end
push_log_event(tag, time, record)
click to toggle source
# File lib/fluent/engine.rb, line 203
# Queues one of fluentd's own log events as [tag, time, record] for the
# log-emit thread to pick up.
#
# The event is dropped (nil is returned) while no log-emit thread exists;
# otherwise the queue itself is returned.
def push_log_event(tag, time, record)
  @log_event_queue.push([tag, time, record]) unless @log_emit_thread.nil?
end
run()
click to toggle source
# File lib/fluent/engine.rb, line 159
# Starts the engine and blocks in the default Coolio loop until the loop
# ends.
#
# If the event router has a match for the logger's tag, log events are
# enabled and a background thread running #log_event_loop is started.
# Any StandardError raised along the way is logged together with its
# backtrace. On the way out -- normally or after an error -- the engine is
# shut down, and the log-emit thread (if one was started) is told to stop
# and joined.
def run
  start

  if @event_router.match?($log.tag)
    $log.enable_event
    @log_emit_thread = Thread.new(&method(:log_event_loop))
  end

  unless @engine_stopped
    # for empty loop
    @default_loop = Coolio::Loop.default
    @default_loop.attach Coolio::TimerWatcher.new(1, true)
    # TODO attach async watch for thread pool
    @default_loop.run
  end

  # A stop requested while the loop was being set up still has to stop it.
  if @engine_stopped && @default_loop
    @default_loop.stop
    @default_loop = nil
  end
rescue => err
  $log.error "unexpected error", :error_class=>err.class, :error=>err
  $log.error_backtrace
ensure
  $log.info "shutting down fluentd"
  shutdown
  if @log_emit_thread
    @log_event_loop_stop = true
    @log_emit_thread.join
  end
end
run_configure(conf)
click to toggle source
# File lib/fluent/engine.rb, line 76
# Applies +conf+ via #configure, then warns about every configuration item
# that conf.check_not_fetched reports.
#
# For each reported (key, element) pair:
# * if element.unused_in names a parent, a "section ... is not used"
#   warning is logged (mentioning the plugin when one is given);
# * otherwise a "parameter ... is not used" warning is logged, except for
#   'system' elements, and except for 'source' elements while
#   @without_source is set.
def run_configure(conf)
  configure(conf)

  conf.check_not_fetched do |key, e|
    parent_name, plugin_name = e.unused_in

    if parent_name
      if plugin_name
        message = "section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
      else
        message = "section <#{e.name}> is not used in <#{parent_name}>"
      end
      $log.warn message
      next
    end

    next if e.name == 'system'
    next if @without_source && e.name == 'source'

    $log.warn "parameter '#{key}' in #{e.to_s.strip} is not used."
  end
end
stop()
click to toggle source
# File lib/fluent/engine.rb, line 194
# Marks the engine as stopped and, if an event loop is attached, stops it
# and forgets it. Always returns nil.
def stop
  @engine_stopped = true
  return nil unless @default_loop

  @default_loop.stop
  @default_loop = nil
  nil
end
suppress_interval(interval_time)
click to toggle source
# File lib/fluent/engine.rb, line 62
# Stores +interval_time+ as the emit-error log suppression interval and
# resets the next emit-error log time to the current epoch second.
#
# Returns that epoch second.
def suppress_interval(interval_time)
  @suppress_emit_error_log_interval = interval_time
  current_second = Time.now.to_i
  @next_emit_error_log_time = current_second
end
Private Instance Methods
shutdown()
click to toggle source
# File lib/fluent/engine.rb, line 214
# Delegates to the root agent's shutdown and returns its result.
def shutdown
  agent = @root_agent
  agent.shutdown
end
start()
click to toggle source
# File lib/fluent/engine.rb, line 210
# Delegates to the root agent's start and returns its result.
def start
  agent = @root_agent
  agent.start
end