class Fluent::ForwardInput
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Input.new
# File lib/fluent/plugin/in_forward.rb, line 24
# Sets up a new forward input: runs the superclass initializer (Fluent::Input.new
# per the page header) and then loads the socket utility library.
def initialize
  super()
  # Loaded when an instance is created rather than when this file is loaded.
  require 'fluent/plugin/socket_util'
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Input#configure
# File lib/fluent/plugin/in_forward.rb, line 40
# Applies the plugin configuration. All work is delegated to the superclass
# implementation (Fluent::Input#configure); nothing is added here.
#
# conf - the configuration object handed to the plugin; passed through unchanged.
def configure(conf)
  super(conf)
end
listen()
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 74
# Builds the TCP server for @bind:@port and returns it.
#
# The server is constructed with Handler as its connection class and is given
# the linger timeout, the logger and the on_message callback. When @backlog is
# set (non-nil) the server is told to listen with that backlog; otherwise
# listen is not called here.
#
# Returns the Coolio::TCPServer instance.
def listen
  log.info "listening fluent socket on #{@bind}:#{@port}"
  Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message)).tap do |server|
    server.listen(@backlog) unless @backlog.nil?
  end
end
run()
click to toggle source
config_param :path, :string, :default => DEFAULT_SOCKET_PATH
def listen
  if File.exist?(@path)
    File.unlink(@path)
  end
  FileUtils.mkdir_p File.dirname(@path)
  log.debug "listening fluent socket on #{@path}"
  Coolio::UNIXServer.new(@path, Handler, method(:on_message))
end
# File lib/fluent/plugin/in_forward.rb, line 91
# Runs the event loop. When the loop's run method accepts an argument (see
# support_blocking_timeout?), @blocking_timeout is passed to it; otherwise
# run is called with no arguments.
#
# Any StandardError escaping the loop is logged, together with its class and
# backtrace, and is not re-raised.
def run
  begin
    support_blocking_timeout? ? @loop.run(@blocking_timeout) : @loop.run
  rescue => error
    log.error "unexpected error", :error => error, :error_class => error.class
    log.error_backtrace
  end
end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 60
# Stops the plugin: detaches every watcher from the event loop, stops the
# loop, closes the UDP socket, waits for the loop thread to finish and finally
# closes the listening TCP socket. The order of these steps is significant.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @usock.close

  unless support_blocking_timeout?
    # A wildcard bind address cannot be used as a connect target, so loopback
    # is used in that case.
    wake_address =
      if @bind == '0.0.0.0'
        '127.0.0.1'
      else
        @bind
      end
    # This line is for connecting listen socket to stop the event loop.
    # We should use more better approach, e.g. using pipe, fixing cool.io with timeout, etc.
    TCPSocket.open(wake_address, @port) { |_sock| } # FIXME @thread.join blocks without this line
  end

  @thread.join
  @lsock.close
end
start()
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 44
# Starts the plugin:
#   1. creates the event loop and attaches the TCP server returned by listen;
#   2. creates a UDP socket bound to @bind:@port, switches it to non-blocking
#      mode and attaches a HeartbeatRequestHandler for it to the loop;
#   3. spawns the thread that executes run;
#   4. prepares @cached_unpacker (nil when $use_msgpack_5 is set, otherwise a
#      new MessagePack::Unpacker).
def start
  @loop = Coolio::Loop.new

  # TCP side.
  @lsock = listen
  @loop.attach(@lsock)

  # UDP side (heartbeat requests).
  @usock = SocketUtil.create_udp_socket(@bind)
  @usock.bind(@bind, @port)
  @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
  @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
  @loop.attach(@hbr)

  @thread = Thread.new { run }

  @cached_unpacker =
    if $use_msgpack_5
      nil
    else
      MessagePack::Unpacker.new
    end
end
Protected Instance Methods
on_heartbeat_request(host, port, msg)
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 270
# Answers a heartbeat request by sending a single NUL byte back to the
# requester over the UDP socket.
#
# host - address the reply is sent to.
# port - port the reply is sent to.
# msg  - the request payload; not used.
#
# EAGAIN / EWOULDBLOCK / EINTR raised by the send are deliberately ignored,
# in which case nil is returned.
def on_heartbeat_request(host, port, msg)
  #log.trace "heartbeat request from #{host}:#{port}"
  @usock.send "\0", 0, host, port
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
  # Best effort only: the reply is dropped when the socket is not ready.
  nil
end
on_message(msg, chunk_size, source)
click to toggle source
message Entry {
  1: long time
  2: object record
}
message Forward {
  1: string tag
  2: list<Entry> entries
}
message PackedForward {
  1: string tag
  2: raw entries  # msgpack stream of Entry
}
message Message {
  1: string tag
  2: long? time
  3: object record
}
# File lib/fluent/plugin/in_forward.rb, line 128
# Routes one decoded message received from a client.
#
# msg        - the decoded message, or nil (ignored; reserved for a future TCP
#              heartbeat request). msg[0] is the tag. The type of msg[1]
#              selects the mode:
#                String -> PackedForward: [tag, msgpack stream of entries]
#                Array  -> Forward:       [tag, [[time, record], ...]]
#                other  -> Message:       [tag, time, record]
# chunk_size - size of the received chunk, compared against @chunk_size_limit
#              and @chunk_size_warn_limit when those are set.
# source     - description of the sender; only used in log output.
#
# Chunks larger than @chunk_size_limit are dropped with a warning; chunks
# larger than @chunk_size_warn_limit are processed but logged. Entries and
# messages whose record is nil are skipped. A missing (nil) or zero time is
# replaced with Engine.now.
def on_message(msg, chunk_size, source)
  if msg.nil?
    # for future TCP heartbeat_request
    return
  end

  # TODO format error
  tag = msg[0].to_s
  entries = msg[1]

  if @chunk_size_limit && (chunk_size > @chunk_size_limit)
    log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source, limit: @chunk_size_limit, size: chunk_size
    return
  elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
    log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source, limit: @chunk_size_warn_limit, size: chunk_size
  end

  case entries
  when String
    # PackedForward
    es = MessagePackEventStream.new(entries, @cached_unpacker)
    router.emit_stream(tag, es)

  when Array
    # Forward
    es = MultiEventStream.new
    # Declared outside the block: a variable first assigned inside the block
    # would be block-local and reset on every iteration, so the fallback
    # timestamp would be recomputed for each zero-time entry.
    now = nil
    entries.each do |e|
      record = e[1]
      next if record.nil?
      time = e[0].to_i
      time = (now ||= Engine.now) if time == 0
      es.add(time, record)
    end
    router.emit_stream(tag, es)

  else
    # Message
    record = msg[2]
    return if record.nil?
    time = msg[1]
    # The time field is optional in Message mode, so nil is treated like 0.
    time = Engine.now if time.nil? || time == 0
    router.emit(tag, time, record)
  end
end
support_blocking_timeout?()
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 104
# Reports whether the event loop's run method takes an argument, which the
# callers here use to decide whether a blocking timeout can be passed to it.
#
# Returns the (non-zero) arity of @loop.run, which is truthy, or nil when the
# arity is zero.
def support_blocking_timeout?
  run_arity = @loop.method(:run).arity
  run_arity.nonzero?
end