Input
obsolete
def listen end
# File lib/fluent/plugin/in_stream.rb, line 47 def run @loop.run rescue $log.error "unexpected error", :error=>$!.to_s $log.error_backtrace end
message Entry {
1: long time 2: object record
}
message Forward {
1: string tag 2: list<Entry> entries
}
message PackedForward {
1: string tag 2: raw entries # msgpack stream of Entry
}
message Message {
1: string tag 2: long? time 3: object record
}
# File lib/fluent/plugin/in_stream.rb, line 75 def on_message(msg) # TODO format error tag = msg[0].to_s entries = msg[1] if entries.class == String # PackedForward es = MessagePackEventStream.new(entries, @cached_unpacker) Engine.emit_stream(tag, es) elsif entries.class == Array # Forward es = MultiEventStream.new entries.each {|e| time = e[0].to_i time = (now ||= Engine.now) if time == 0 record = e[1] es.add(time, record) } Engine.emit_stream(tag, es) else # Message time = msg[1] time = Engine.now if time == 0 record = msg[2] Engine.emit(tag, time, record) end end
Generated with the Darkfish Rdoc Generator 2.