# File lib/fluent/buffer.rb, line 293 def clear! @queue.delete_if {|chunk| chunk.purge true } end
# File lib/fluent/buffer.rb, line 141 def configure(conf) super end
# File lib/fluent/buffer.rb, line 163 def emit(key, data, chain) key = key.to_s synchronize do top = (@map[key] ||= new_chunk(key)) # TODO generate unique chunk id if top.size + data.bytesize <= @buffer_chunk_limit chain.next top << data return false ## FIXME #elsif data.bytesize > @buffer_chunk_limit # # TODO # raise BufferChunkLimitError, "received data too large" elsif @queue.size >= @buffer_queue_limit raise BufferQueueLimitError, "queue size exceeds limit" end if data.bytesize > @buffer_chunk_limit $log.warn "Size of the emitted data exceeds buffer_chunk_limit." $log.warn "This may occur problems in the output plugins ``at this server.``" $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit" $log.warn "in the forward output ``at the log forwarding server.``" end nc = new_chunk(key) # TODO generate unique chunk id ok = false begin nc << data chain.next flush_trigger = false @queue.synchronize { enqueue(top) flush_trigger = @queue.empty? @queue << top @map[key] = nc } ok = true return flush_trigger ensure nc.purge unless ok end end # synchronize end
# File lib/fluent/buffer.rb, line 126 def enable_parallel(b=true) @parallel_pop = b end
# File lib/fluent/buffer.rb, line 259 def pop(out) chunk = nil @queue.synchronize do if @parallel_pop chunk = @queue.find {|c| c.try_mon_enter } return false unless chunk else chunk = @queue.first return false unless chunk return false unless chunk.try_mon_enter end end begin if !chunk.empty? write_chunk(chunk, out) end @queue.delete_if {|c| c.object_id == chunk.object_id } chunk.purge return !@queue.empty? ensure chunk.mon_exit end end
def enqueue(chunk) end
# File lib/fluent/buffer.rb, line 242 def push(key) synchronize do top = @map[key] if !top || top.empty? return false end @queue.synchronize do enqueue(top) @queue << top @map.delete(key) end return true end # synchronize end
# File lib/fluent/buffer.rb, line 218 def queue_size @queue.size end
# File lib/fluent/buffer.rb, line 150 def shutdown synchronize do @queue.synchronize do until @queue.empty? @queue.shift.close end end @map.each_pair {|key,chunk| chunk.close } end end
# File lib/fluent/buffer.rb, line 145 def start @queue, @map = resume @queue.extend(MonitorMixin) end
Generated with the Darkfish Rdoc Generator 2.