class Fluent::SocketUtil::BaseInput

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Input.new
# File lib/fluent/plugin/socket_util.rb, line 76
def initialize
  super
  require 'fluent/parser'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Input#configure
# File lib/fluent/plugin/socket_util.rb, line 87
def configure(conf)
  super

  @parser = Plugin.new_parser(@format)
  @parser.configure(conf)
end
run() click to toggle source
# File lib/fluent/plugin/socket_util.rb, line 108
def run
  if support_blocking_timeout?
    @loop.run(0.5)
  else
    @loop.run
  end
rescue => e
  log.error "unexpected error", :error => e, :error_class => e.class
  log.error_backtrace
end
shutdown() click to toggle source
# File lib/fluent/plugin/socket_util.rb, line 101
def shutdown
  @loop.watchers.each { |w| w.detach }
  @loop.stop
  @handler.close
  @thread.join
end
start() click to toggle source
# File lib/fluent/plugin/socket_util.rb, line 94
def start
  @loop = Coolio::Loop.new
  @handler = listen(method(:on_message))
  @loop.attach(@handler)
  @thread = Thread.new(&method(:run))
end

Private Instance Methods

on_message(msg, addr) click to toggle source
# File lib/fluent/plugin/socket_util.rb, line 125
def on_message(msg, addr)
  @parser.parse(msg) { |time, record|
    unless time && record
      log.warn "pattern not match: #{msg.inspect}"
      return
    end

    record[@source_host_key] = addr[3] if @source_host_key
    router.emit(@tag, time, record)
  }
rescue => e
  log.error msg.dump, :error => e, :error_class => e.class, :host => addr[3]
  log.error_backtrace
end
support_blocking_timeout?() click to toggle source
# File lib/fluent/plugin/socket_util.rb, line 121
def support_blocking_timeout?
  @loop.method(:run).arity.nonzero?
end