Parent

Class/Module Index [+]

Quicksearch

Fluent::EngineClass

Attributes

matches[R]
sources[R]

Public Class Methods

new() click to toggle source
# File lib/fluent/engine.rb, line 22
# Creates an engine with nothing configured: no sources or matches, an
# empty tag => match cache, no event loop, and no log-emitting thread.
def initialize
  @matches, @sources = [], []
  @match_cache, @match_cache_keys = {}, []
  @started = []
  @default_loop = nil

  # State of the thread that re-emits fluentd's own log events.
  @log_emit_thread = nil
  @log_event_loop_stop = false
  @log_event_queue = []

  # Emit-failure log throttling; an interval of 0 never suppresses a log line.
  @suppress_emit_error_log_interval, @next_emit_error_log_time = 0, nil
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/engine.rb, line 74
# Instantiates and configures plugins from a parsed configuration.
#
# All <source> elements are processed first (each appended to @sources),
# then all <match> elements (each wrapped in a Match and appended to
# @matches). Elements with any other name are ignored here.
#
# conf:: object responding to +elements+; each element must respond to
#        +name+, +[]+ and, for match elements, +arg+.
#
# Raises ConfigError when a <source> or <match> element has no 'type'.
def configure(conf)
  $log.info "using configuration file: #{conf.to_s.rstrip}"

  source_elements = conf.elements.select { |element| element.name == 'source' }
  source_elements.each do |element|
    type = element['type']
    raise ConfigError, "Missing 'type' parameter on <source> directive" unless type
    $log.info "adding source type=#{type.dump}"

    input = Plugin.new_input(type)
    input.configure(element)
    @sources << input
  end

  match_elements = conf.elements.select { |element| element.name == 'match' }
  match_elements.each do |element|
    type = element['type']
    pattern = element.arg
    raise ConfigError, "Missing 'type' parameter on <match #{element.arg}> directive" unless type
    $log.info "adding match", :pattern=>pattern, :type=>type

    output = Plugin.new_output(type)
    output.configure(element)
    @matches << Match.new(pattern, output)
  end
end
emit(tag, time, record) click to toggle source
# File lib/fluent/engine.rb, line 114
# Emits a single (time, record) event under +tag+ by wrapping it in a
# OneEventStream and delegating to #emit_stream.
def emit(tag, time, record)
  single_event = OneEventStream.new(time, record)
  emit_stream(tag, single_event)
end
emit_array(tag, array) click to toggle source
# File lib/fluent/engine.rb, line 118
# Emits +array+ under +tag+ by wrapping it in an ArrayEventStream and
# delegating to #emit_stream.
def emit_array(tag, array)
  stream = ArrayEventStream.new(array)
  emit_stream(tag, stream)
end
emit_stream(tag, es) click to toggle source
# File lib/fluent/engine.rb, line 122
# Routes the event stream +es+ to the match registered for +tag+.
#
# The lookup result is memoized in @match_cache. @match_cache_keys records
# the order in which tags were cached so that, once MATCH_CACHE_SIZE tags
# are cached, the oldest one is evicted before a new one is added. Tags
# that match nothing are routed to a NoMatchMatch.
#
# tag:: tag used for the match lookup and passed on to the target.
# es::  event stream handed to the target's +emit+.
#
# Any StandardError raised along the way is re-raised. Before re-raising it
# is logged as a warning, unless logging is currently suppressed (a
# non-zero @suppress_emit_error_log_interval and the next allowed log time
# has not been passed yet).
def emit_stream(tag, es)
  target = @match_cache[tag]
  unless target
    target = match(tag) || NoMatchMatch.new
    # Not thread-safe on purpose: an inconsistent cache does not cause
    # serious problems, whereas locking here would.
    if @match_cache_keys.size >= MATCH_CACHE_SIZE
      # Evict the oldest tag from the cache itself; dropping it only from
      # the key list would let @match_cache grow without bound.
      @match_cache.delete @match_cache_keys.shift
    end
    @match_cache[tag] = target
    @match_cache_keys << tag
  end
  target.emit(tag, es)
rescue
  if @suppress_emit_error_log_interval == 0 || now > @next_emit_error_log_time
    $log.warn "emit transaction failed ", :error=>$!.to_s
    $log.warn_backtrace
    # Schedule the earliest time at which the next failure may be logged.
    @next_emit_error_log_time = Time.now.to_i + @suppress_emit_error_log_interval
  end
  raise
end
flush!() click to toggle source
# File lib/fluent/engine.rb, line 155
# Hands the engine's match list to flush_recursive and returns its result.
def flush!
  flush_recursive @matches
end
init() click to toggle source
# File lib/fluent/engine.rb, line 44
# One-time process setup: disables reverse DNS lookups on sockets, loads
# plugins, and (where the Encoding API exists) switches the default
# internal and external encodings to ASCII-8BIT.
#
# Returns self so calls can be chained.
def init
  BasicSocket.do_not_reverse_lookup = true
  Plugin.load_plugins

  if defined?(Encoding)
    [:default_internal, :default_external].each do |setting|
      next unless Encoding.respond_to?(setting)
      Encoding.public_send("#{setting}=", 'ASCII-8BIT')
    end
  end

  self
end
load_plugin_dir(dir) click to toggle source
# File lib/fluent/engine.rb, line 110
# Delegates to Plugin.load_plugin_dir for +dir+ and returns its result.
def load_plugin_dir(dir)
  Plugin.load_plugin_dir dir
end
log_event_loop() click to toggle source
# File lib/fluent/engine.rb, line 164
# Body of the log-emitting thread.
#
# Every LOG_EMIT_INTERVAL seconds it drains @log_event_queue and emits each
# queued (tag, time, record) through Engine.emit. A failure to emit one
# event is logged as an error and does not stop the remaining events.
# The loop exits once @log_event_loop_stop is set.
def log_event_loop
  $log.disable_events(Thread.current)

  while sleep(LOG_EMIT_INTERVAL)
    break if @log_event_loop_stop
    next if @log_event_queue.empty?

    # NOTE: thread-safety of slice! depends on the GVL
    pending = @log_event_queue.slice!(0..-1)
    next if pending.empty?

    pending.each do |tag, time, record|
      begin
        Engine.emit(tag, time, record)
      rescue => error
        $log.error "failed to emit fluentd's log event", :tag => tag, :event => record, :error => error
      end
    end
  end
end
match(tag) click to toggle source
# File lib/fluent/engine.rb, line 147
# Returns the first entry of @matches whose +match+ accepts +tag+, or nil
# when none does.
def match(tag)
  @matches.each do |candidate|
    return candidate if candidate.match(tag)
  end
  nil
end
match?(tag) click to toggle source
# File lib/fluent/engine.rb, line 151
# Returns true when #match finds something for +tag+, false otherwise.
def match?(tag)
  match(tag) ? true : false
end
now() click to toggle source
# File lib/fluent/engine.rb, line 159
# Returns the current wall-clock time as integer seconds since the epoch.
def now
  # TODO thread update
  Time.now.tv_sec
end
parse_config(io, fname, basepath=Dir.pwd) click to toggle source
# File lib/fluent/engine.rb, line 66
# Parses configuration from +io+, applies it via #configure, then logs a
# warning for every parameter that check_not_fetched reports.
#
# io::       passed through to Config.parse.
# fname::    passed through to Config.parse.
# basepath:: passed through to Config.parse; defaults to the current
#            working directory.
#
# Returns whatever check_not_fetched returns.
def parse_config(io, fname, basepath=Dir.pwd)
  parsed = Config.parse(io, fname, basepath)
  configure(parsed)
  parsed.check_not_fetched do |key, element|
    $log.warn "parameter '#{key}' in #{element.to_s.strip} is not used."
  end
end
push_log_event(tag, time, record) click to toggle source
# File lib/fluent/engine.rb, line 221
# Queues a (tag, time, record) log event for the log-emitting thread.
# Does nothing and returns nil while no such thread has been created
# (@log_emit_thread is nil).
def push_log_event(tag, time, record)
  return if @log_emit_thread.nil?

  event = [tag, time, record]
  @log_event_queue << event
end
read_config(path) click to toggle source
# File lib/fluent/engine.rb, line 59
# Opens the configuration file at +path+ and feeds it to #parse_config,
# using the file's basename as the name and its directory as the base path.
# The file is closed when the block finishes.
def read_config(path)
  $log.info "reading config file", :path=>path
  File.open(path) do |io|
    parse_config(io, File.basename(path), File.dirname(path))
  end
end
run() click to toggle source
# File lib/fluent/engine.rb, line 185
# Starts the engine and blocks in the default Coolio event loop.
#
# When the logger's own tag has a match, log events are enabled and a
# thread running #log_event_loop is started. Any StandardError is logged
# rather than propagated. In every case the engine is shut down afterwards
# and the log-emitting thread, if any, is told to stop and joined.
def run
  start

  if match?($log.tag)
    $log.enable_event
    @log_emit_thread = Thread.new(&method(:log_event_loop))
  end

  # A timer watcher keeps the otherwise empty loop running.
  @default_loop = Coolio::Loop.default
  @default_loop.attach Coolio::TimerWatcher.new(1, true)
  # TODO attach async watch for thread pool
  @default_loop.run
rescue => error
  $log.error "unexpected error", :error=>error.to_s
  $log.error_backtrace
ensure
  $log.info "shutting down fluentd"
  shutdown
  if @log_emit_thread
    @log_event_loop_stop = true
    @log_emit_thread.join
  end
end
stop() click to toggle source
# File lib/fluent/engine.rb, line 213
# Stops the event loop recorded in @default_loop, if there is one, and
# forgets it. Always returns nil.
def stop
  return nil unless @default_loop

  @default_loop.stop
  @default_loop = nil
  nil
end
suppress_interval(interval_time) click to toggle source
# File lib/fluent/engine.rb, line 54
# Sets the interval used to throttle emit-failure logging and resets the
# next allowed log time to the current time.
#
# interval_time:: stored as @suppress_emit_error_log_interval.
#
# Returns the new next-log time (integer epoch seconds).
def suppress_interval(interval_time)
  @suppress_emit_error_log_interval = interval_time
  current_epoch = Time.now.to_i
  @next_emit_error_log_time = current_epoch
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.