class Fluent::BasicBuffer
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Buffer.new
# File lib/fluent/buffer.rb, line 119
# Runs the superclass initializer, then switches parallel popping on by
# default; enable_parallel can change the flag afterwards.
def initialize
  super

  @parallel_pop = true
end
Public Instance Methods
clear!()
click to toggle source
# File lib/fluent/buffer.rb, line 303
# Purges every chunk currently in @queue and removes it from the queue.
#
# NOTE(review): the collapsed listing reads `chunk.purge true`; it is
# reconstructed here as two statements (purge, then a truthy block result so
# delete_if drops the chunk) -- confirm against lib/fluent/buffer.rb.
def clear!
  @queue.delete_if do |queued|
    queued.purge
    true
  end
end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Buffer#configure
# File lib/fluent/buffer.rb, line 139
# Passes +conf+ through to the superclass implementation unchanged
# (Fluent::Buffer#configure); nothing is added at this level.
def configure(conf)
  super
end
emit(key, data, chain)
click to toggle source
# File lib/fluent/buffer.rb, line 165
# Appends +data+ to the open chunk registered under +key+ (stringified),
# rolling over to a fresh chunk when the open one cannot take the data.
#
# key   - chunk key; converted with to_s before any lookup.
# data  - payload; only bytesize is read here, then it is pushed with <<.
# chain - object whose #next is invoked once the data has been accepted.
#
# Returns false when the data went into the open chunk, or when a rollover
# queued the old chunk behind others. Returns true when the rolled-over chunk
# was queued onto an empty queue.
# Raises BufferQueueLimitError when a rollover is required but the queue
# already holds @buffer_queue_limit chunks.
def emit(key, data, chain)
  key = key.to_s

  synchronize do
    current = (@map[key] ||= new_chunk(key)) # TODO generate unique chunk id

    # Fast path: the open chunk still has room.
    if storable?(current, data)
      chain.next
      current << data
      return false
    end

    ## FIXME
    #if data.bytesize > @buffer_chunk_limit
    #  # TODO
    #  raise BufferChunkLimitError, "received data too large"
    #end

    if @queue.size >= @buffer_queue_limit
      raise BufferQueueLimitError, "queue size exceeds limit"
    end

    # A single record larger than the chunk limit is still accepted, with a warning.
    if data.bytesize > @buffer_chunk_limit
      [
        "Size of the emitted data exceeds buffer_chunk_limit.",
        "This may occur problems in the output plugins ``at this server.``",
        "To avoid problems, set a smaller number to the buffer_chunk_limit",
        "in the forward output ``at the log forwarding server.``"
      ].each { |line| $log.warn line }
    end

    replacement = new_chunk(key) # TODO generate unique chunk id
    handed_over = false

    begin
      replacement << data
      chain.next

      # Declared outside the block so the value assigned inside stays visible.
      queue_was_empty = false
      @queue.synchronize do
        enqueue(current)
        queue_was_empty = @queue.empty?
        @queue << current
        @map[key] = replacement
      end

      handed_over = true
      return queue_was_empty
    ensure
      # Any failure before the swap completed leaves the new chunk orphaned; drop it.
      replacement.purge unless handed_over
    end
  end # synchronize
end
enable_parallel(b=true)
click to toggle source
# File lib/fluent/buffer.rb, line 124
# Stores +b+ as the parallel-pop flag that pop consults when choosing a
# chunk. Returns the value that was stored.
def enable_parallel(b = true)
  @parallel_pop = b
end
keys()
click to toggle source
# File lib/fluent/buffer.rb, line 216
# Lists the keys of @map, i.e. the keys that currently have an open chunk.
def keys
  @map.keys
end
pop(out)
click to toggle source
# File lib/fluent/buffer.rb, line 265
# Takes one chunk off the queue and writes it to +out+.
#
# With @parallel_pop set, the first queued chunk whose try_mon_enter succeeds
# is taken; otherwise only the head of the queue is considered.
#
# Returns false when no chunk could be claimed, or when the queue is empty
# after the claimed chunk was removed; returns true when chunks remain.
# The claimed chunk's mon_exit always runs, even if writing raises; in that
# case the chunk is neither removed from the queue nor purged.
def pop(out)
  claimed = nil
  @queue.synchronize do
    claimed =
      if @parallel_pop
        @queue.find { |candidate| candidate.try_mon_enter }
      else
        head = @queue.first
        head if head && head.try_mon_enter
      end
  end
  return false unless claimed

  begin
    write_chunk(claimed, out) unless claimed.empty?

    # Declared outside the block so the value assigned inside stays visible.
    drained = false
    @queue.synchronize do
      @queue.delete_if { |queued| queued.object_id == claimed.object_id }
      drained = @queue.empty?
    end

    claimed.purge

    return !drained
  ensure
    claimed.mon_exit
  end
end
push(key)
click to toggle source
# Hook called with a chunk immediately before emit or push appends it to
# the queue. Does nothing at this level and returns nil.
def enqueue(chunk)
end
# File lib/fluent/buffer.rb, line 248
# Moves the open chunk stored under +key+ onto the queue.
#
# Returns false, changing nothing, when no chunk is registered for +key+ or
# the registered chunk is empty. Otherwise the chunk is handed to enqueue,
# appended to @queue, removed from @map, and true is returned.
def push(key)
  synchronize do
    pending = @map[key]
    return false if !pending || pending.empty?

    @queue.synchronize do
      enqueue(pending)
      @queue << pending
      @map.delete(key)
    end

    return true
  end # synchronize
end
queue_size()
click to toggle source
# File lib/fluent/buffer.rb, line 220
# Reports how many chunks are currently held in @queue.
def queue_size
  @queue.size
end
shutdown()
click to toggle source
# File lib/fluent/buffer.rb, line 148
# Closes every chunk the buffer holds: queued chunks are shifted off @queue
# and closed one at a time (leaving the queue empty), then each chunk still
# registered in @map is closed; @map itself keeps its entries.
def shutdown
  synchronize do
    @queue.synchronize do
      @queue.shift.close until @queue.empty?
    end

    @map.each_value { |open_chunk| open_chunk.close }
  end
end
start()
click to toggle source
# File lib/fluent/buffer.rb, line 143
# Loads the queue and the key-to-chunk map from resume (which must yield the
# pair in that order), then mixes MonitorMixin into the queue object so that
# @queue.synchronize is available to the other methods.
def start
  restored_queue, restored_map = resume
  @queue = restored_queue
  @map = restored_map

  @queue.extend(MonitorMixin)
end
storable?(chunk, data)
click to toggle source
# File lib/fluent/buffer.rb, line 161
# Tells whether +data+ can be added to +chunk+ without the chunk's size
# plus the data's byte length going past @buffer_chunk_limit. Landing
# exactly on the limit counts as storable.
def storable?(chunk, data)
  projected_size = chunk.size + data.bytesize
  projected_size <= @buffer_chunk_limit
end
total_queued_chunk_size()
click to toggle source
# File lib/fluent/buffer.rb, line 224
# Adds up the size of every chunk the buffer holds: the open chunks in @map
# first, then the chunks waiting in @queue. Both locks are taken while
# summing.
def total_queued_chunk_size
  bytes = 0

  synchronize do
    bytes = @map.each_value.inject(bytes) { |sum, open_chunk| sum + open_chunk.size }

    @queue.synchronize do
      bytes = @queue.inject(bytes) { |sum, queued| sum + queued.size }
    end
  end

  bytes
end
write_chunk(chunk, out)
click to toggle source
# File lib/fluent/buffer.rb, line 299
# Delivers +chunk+ by calling +out+.write with it and returns whatever
# that call returns. pop routes every non-empty chunk through here.
def write_chunk(chunk, out)
  out.write(chunk)
end