Input
# File lib/fluent/plugin/in_forward.rb, line 32 def configure(conf) super end
# File lib/fluent/plugin/in_forward.rb, line 62 def listen $log.info "listening fluent socket on #{@bind}:#{@port}" Coolio::TCPServer.new(@bind, @port, Handler, method(:on_message)) end
config_param :path, :string, :default => DEFAULT_SOCKET_PATH def listen
if File.exist?(@path) File.unlink(@path) end FileUtils.mkdir_p File.dirname(@path) $log.debug "listening fluent socket on #{@path}" Coolio::UNIXServer.new(@path, Handler, method(:on_message))
end
# File lib/fluent/plugin/in_forward.rb, line 77 def run @loop.run rescue $log.error "unexpected error", :error=>$!.to_s $log.error_backtrace end
# File lib/fluent/plugin/in_forward.rb, line 52 def shutdown @loop.watchers.each {|w| w.detach } @loop.stop @usock.close listen_address = (@bind == '0.0.0.0' ? '127.0.0.1' : @bind) TCPSocket.open(listen_address, @port) {|sock| } # FIXME @thread.join blocks without this line @thread.join @lsock.close end
# File lib/fluent/plugin/in_forward.rb, line 36 def start @loop = Coolio::Loop.new @lsock = listen @loop.attach(@lsock) @usock = SocketUtil.create_udp_socket(@bind) @usock.bind(@bind, @port) @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request)) @loop.attach(@hbr) @thread = Thread.new(&method(:run)) @cached_unpacker = $use_msgpack_5 ? nil : MessagePack::Unpacker.new end
# File lib/fluent/plugin/in_forward.rb, line 213 def on_heartbeat_request(host, port, msg) #$log.trace "heartbeat request from #{host}:#{port}" begin @usock.send "\00"", 0, host, port rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR end end
message Entry {
1: long time 2: object record
}
message Forward {
1: string tag 2: list<Entry> entries
}
message PackedForward {
1: string tag 2: raw entries # msgpack stream of Entry
}
message Message {
1: string tag 2: long? time 3: object record
}
# File lib/fluent/plugin/in_forward.rb, line 105 def on_message(msg) if msg.nil? # for future TCP heartbeat_request return end # TODO format error tag = msg[0].to_s entries = msg[1] if entries.class == String # PackedForward es = MessagePackEventStream.new(entries, @cached_unpacker) Engine.emit_stream(tag, es) elsif entries.class == Array # Forward es = MultiEventStream.new entries.each {|e| time = e[0].to_i time = (now ||= Engine.now) if time == 0 record = e[1] es.add(time, record) } Engine.emit_stream(tag, es) else # Message time = msg[1] time = Engine.now if time == 0 record = msg[2] Engine.emit(tag, time, record) end end
Generated with the Darkfish Rdoc Generator 2.