class Fluent::ForwardInput::Handler
Constants
- PEERADDR_FAILED
Public Class Methods
new(io, linger_timeout, log, on_message)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_forward.rb, line 180
# Sets up the handler for one accepted connection.
#
# io             - the accepted socket; passed straight to the superclass.
# linger_timeout - SO_LINGER timeout, in seconds, applied when io is a TCPSocket.
# log            - logger used by this handler (trace/error output).
# on_message     - callable kept in @on_message; the read handlers in this
#                  class call it with (object, chunk_size, source).
def initialize(io, linger_timeout, log, on_message)
  super(io)

  if io.is_a?(TCPSocket) # for unix domain socket support in the future
    # Best effort: fall back to the placeholder tuple when the peer address
    # cannot be obtained, so @source can still be built.
    _proto, port, host, addr = (io.peeraddr rescue PEERADDR_FAILED)
    @source = "host: #{host}, addr: #{addr}, port: #{port}"

    # Socket::Option.linger builds the platform's own `struct linger`
    # ({ l_onoff; l_linger; }) instead of hard-coding a two-native-int layout
    # with Array#pack, which is not correct on every platform.
    io.setsockopt(Socket::Option.linger(true, linger_timeout))
  end

  @chunk_counter = 0
  @on_message = on_message
  @log = log

  # The block is only evaluated if the logger decides to emit trace output,
  # so the peer lookup stays inside it.
  @log.trace do
    remote_port, remote_addr =
      begin
        Socket.unpack_sockaddr_in(@_io.getpeername)
      rescue
        [nil, nil]
      end
    "accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{object_id}"
  end
end
Public Instance Methods
on_close()
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 261
# Close hook: emits a trace-level log entry and does nothing else.
# Returns whatever @log.trace returns.
def on_close
  @log.trace do
    "closed socket"
  end
end
on_connect()
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 205
# Connect hook. Intentionally a no-op: this handler takes no action on
# connect and simply returns nil.
def on_connect
  # nothing to do
end
on_read(data)
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 208
# First-read handler. Looks at the leading character of the first payload to
# pick a wire format, prepares the matching parser and reply serializer, then
# replaces #on_read on this instance with the chosen reader so the format
# check is not repeated on later reads.
#
# A leading '{' or '[' selects the JSON path (Yajl parser, replies via
# to_json); anything else selects the MessagePack path (unpacker, replies via
# to_msgpack).
#
# data - the bytes just received.
def on_read(data)
  case data[0]
  when '{', '['
    reader = method(:on_read_json)
    @serializer = :to_json.to_proc
    @y = Yajl::Parser.new
    # Invoked by the parser each time a complete JSON value has been parsed.
    @y.on_parse_complete = ->(obj) {
      reply = @on_message.call(obj, @chunk_counter, @source)
      respond(reply) if reply
      @chunk_counter = 0
    }
  else
    reader = method(:on_read_msgpack)
    @serializer = :to_msgpack.to_proc
    @u = MessagePack::Unpacker.new
  end

  # Override on_read for this object only (singleton method), leaving the
  # class-level definition untouched for other handlers.
  define_singleton_method(:on_read, reader)

  reader.call(data)
end
on_read_json(data)
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 231
# Reader for the JSON path: adds the payload size to the running chunk
# counter and feeds the bytes to the streaming parser held in @y.
#
# Any StandardError raised while doing so is logged as "forward error"
# (with a backtrace) and the connection is closed.
#
# data - the bytes just received.
def on_read_json(data)
  begin
    @chunk_counter = @chunk_counter + data.bytesize
    @y << data
  rescue StandardError => err
    @log.error("forward error", error: err, error_class: err.class)
    @log.error_backtrace
    close
  end
end
on_read_msgpack(data)
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 240
# Reader for the MessagePack path: adds the payload size to the running chunk
# counter, feeds the bytes to the unpacker in @u, and for every object it
# yields calls @on_message with (object, chunk counter, @source). A truthy
# result is passed to #respond; the counter is reset to zero after each object.
#
# Any StandardError raised along the way is logged as "forward error"
# (with a backtrace) and the connection is closed.
#
# data - the bytes just received.
def on_read_msgpack(data)
  begin
    @chunk_counter = @chunk_counter + data.bytesize
    @u.feed_each(data) do |message|
      reply = @on_message.call(message, @chunk_counter, @source)
      respond(reply) if reply
      @chunk_counter = 0
    end
  rescue StandardError => err
    @log.error("forward error", error: err, error_class: err.class)
    @log.error_backtrace
    close
  end
end
respond(option)
click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 253
# Sends an acknowledgement to the peer when the message options carry a
# 'chunk' value: writes { 'ack' => <chunk> } encoded with @serializer and
# emits a trace log entry. Does nothing (returns nil) when option is nil/false
# or has no truthy 'chunk' entry.
#
# option - the value returned by the on_message callback (hash-like or nil).
def respond(option)
  chunk_id = option && option['chunk']
  return unless chunk_id

  ack = { 'ack' => chunk_id }
  write(@serializer.call(ack))
  @log.trace { "sent response to fluent socket" }
end