Parent

Class/Module Index [+]

Quicksearch

Fluent::StreamInput::Handler

Public Class Methods

new(io, on_message) click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 106
def initialize(io, on_message)
  super(io)
  if io.is_a?(TCPSocket)
    opt = [1, @timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  end
  $log.trace { "accepted fluent socket object_id=#{self.object_id}" }
  @on_message = on_message
end

Public Instance Methods

on_close() click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 152
def on_close
  $log.trace { "closed fluent socket object_id=#{self.object_id}" }
end
on_connect() click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 116
def on_connect
end
on_read(data) click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 119
def on_read(data)
  first = data[0]
  if first == '{' || first == '['
    m = method(:on_read_json)
    @y = Yajl::Parser.new
    @y.on_parse_complete = @on_message
  else
    m = method(:on_read_msgpack)
    @u = MessagePack::Unpacker.new
  end

  (class<<self;self;end).module_eval do
    define_method(:on_read, m)
  end
  m.call(data)
end
on_read_json(data) click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 136
def on_read_json(data)
  @y << data
rescue
  $log.error "unexpected error", :error=>$!.to_s
  $log.error_backtrace
  close
end
on_read_msgpack(data) click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 144
def on_read_msgpack(data)
  @u.feed_each(data, &@on_message)
rescue
  $log.error "unexpected error", :error=>$!.to_s
  $log.error_backtrace
  close
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.