class Fluent::RootAgent
Fluentd forms a tree structure to manage plugins:
RootAgent | +------------+-------------+-------------+ | | | | <label> <source> <filter> <match> | +----+----+ | | <filter> <match>
Relation:
-
RootAgent has many <label>, <source>, <filter> and <match>
-
<label> has many <match> and <filter>
Next step: `fluentd/agent.rb` Next step: 'fluentd/label.rb'
Constants
- ERROR_LABEL
Attributes
inputs[R]
labels[R]
Public Class Methods
new(opts = {})
click to toggle source
Calls superclass method
Fluent::Agent.new
# File lib/fluent/root_agent.rb, line 46 def initialize(opts = {}) super @labels = {} @inputs = [] @started_inputs = [] @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil suppress_interval(opts[:suppress_interval]) if opts[:suppress_interval] @without_source = opts[:without_source] if opts[:without_source] end
Public Instance Methods
add_label(name)
click to toggle source
# File lib/fluent/root_agent.rb, line 155 def add_label(name) label = Label.new(name) label.root_agent = self @labels[name] = label end
add_source(type, conf)
click to toggle source
# File lib/fluent/root_agent.rb, line 141 def add_source(type, conf) log.info "adding source", type: type input = Plugin.new_input(type) # <source> emits events to the top-level event router (RootAgent#event_router). # Input#configure overwrites event_router to a label's event_router if it has `@label` parameter. # See also 'fluentd/plugin/input.rb' input.router = @event_router input.configure(conf) @inputs << input input end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Agent#configure
# File lib/fluent/root_agent.rb, line 62 def configure(conf) error_label_config = nil # initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output. label_configs = {} conf.elements.select { |e| e.name == 'label' }.each { |e| name = e.arg raise ConfigError, "Missing symbol argument on <label> directive" if name.empty? if name == ERROR_LABEL error_label_config = e else add_label(name) label_configs[name] = e end } # Call 'configure' here to avoid 'label not found' label_configs.each { |name, e| @labels[name].configure(e) } setup_error_label(error_label_config) if error_label_config super # initialize <source> elements if @without_source log.info "'--without-source' is applied. Ignore <source> sections" else conf.elements.select { |e| e.name == 'source' }.each { |e| type = e['@type'] || e['type'] raise ConfigError, "Missing 'type' parameter on <source> directive" unless type add_source(type, e) } end end
emit_error_event(tag, time, record, error)
click to toggle source
# File lib/fluent/root_agent.rb, line 169 def emit_error_event(tag, time, record, error) error_info = {:error_class => error.class, :error => error.to_s, :tag => tag, :time => time} if @error_collector # A record is not included in the logs because <@ERROR> handles it. This warn is for the notification log.warn "send an error event to @ERROR:", error_info @error_collector.emit(tag, time, record) else error_info[:record] = record log.warn "dump an error event:", error_info end end
find_label(label_name)
click to toggle source
# File lib/fluent/root_agent.rb, line 161 def find_label(label_name) if label = @labels[label_name] label else raise ArgumentError, "#{label_name} label not found" end end
handle_emits_error(tag, es, error)
click to toggle source
# File lib/fluent/root_agent.rb, line 181 def handle_emits_error(tag, es, error) error_info = {:error_class => error.class, :error => error.to_s, :tag => tag} if @error_collector log.warn "send an error event stream to @ERROR:", error_info @error_collector.emit_stream(tag, es) else now = Engine.now if @suppress_emit_error_log_interval.zero? || now > @next_emit_error_log_time log.warn "emit transaction failed:", error_info log.warn_backtrace @next_emit_error_log_time = now + @suppress_emit_error_log_interval end raise error end end
setup_error_label(e)
click to toggle source
# File lib/fluent/root_agent.rb, line 96 def setup_error_label(e) error_label = add_label(ERROR_LABEL) error_label.configure(e) error_label.root_agent = RootAgentProxyWithoutErrorCollector.new(self) @error_collector = error_label.event_router end
shutdown()
click to toggle source
Calls superclass method
Fluent::Agent#shutdown
# File lib/fluent/root_agent.rb, line 116 def shutdown # Shutdown Input plugin first to prevent emitting to terminated Output plugin @started_inputs.map { |i| Thread.new do begin i.shutdown rescue => e log.warn "unexpected error while shutting down input plugin", :plugin => i.class, :plugin_id => i.plugin_id, :error_class => e.class, :error => e log.warn_backtrace end end }.each { |t| t.join } @labels.each { |n, l| l.shutdown } super end
start()
click to toggle source
Calls superclass method
Fluent::Agent#start
# File lib/fluent/root_agent.rb, line 103 def start super @labels.each { |n, l| l.start } @inputs.each { |i| i.start @started_inputs << i } end
suppress_interval(interval_time)
click to toggle source
# File lib/fluent/root_agent.rb, line 136 def suppress_interval(interval_time) @suppress_emit_error_log_interval = interval_time @next_emit_error_log_time = Time.now.to_i end