Included Modules

Class/Module Index [+]

Quicksearch

Fluent::BasicBuffer

Public Class Methods

new() click to toggle source
# File lib/fluent/buffer.rb, line 121
def initialize
  super
  @parallel_pop = true
end

Public Instance Methods

clear!() click to toggle source
# File lib/fluent/buffer.rb, line 293
def clear!
  @queue.delete_if {|chunk|
    chunk.purge
    true
  }
end
configure(conf) click to toggle source
# File lib/fluent/buffer.rb, line 141
def configure(conf)
  super
end
emit(key, data, chain) click to toggle source
# File lib/fluent/buffer.rb, line 163
def emit(key, data, chain)
  key = key.to_s

  synchronize do
    top = (@map[key] ||= new_chunk(key))  # TODO generate unique chunk id

    if top.size + data.bytesize <= @buffer_chunk_limit
      chain.next
      top << data
      return false

    ## FIXME
    #elsif data.bytesize > @buffer_chunk_limit
    #  # TODO
    #  raise BufferChunkLimitError, "received data too large"

    elsif @queue.size >= @buffer_queue_limit
      raise BufferQueueLimitError, "queue size exceeds limit"
    end

    if data.bytesize > @buffer_chunk_limit
      $log.warn "Size of the emitted data exceeds buffer_chunk_limit."
      $log.warn "This may occur problems in the output plugins ``at this server.``"
      $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit"
      $log.warn "in the forward output ``at the log forwarding server.``"
    end

    nc = new_chunk(key)  # TODO generate unique chunk id
    ok = false

    begin
      nc << data
      chain.next

      flush_trigger = false
      @queue.synchronize {
        enqueue(top)
        flush_trigger = @queue.empty?
        @queue << top
        @map[key] = nc
      }

      ok = true
      return flush_trigger
    ensure
      nc.purge unless ok
    end

  end  # synchronize
end
enable_parallel(b=true) click to toggle source
# File lib/fluent/buffer.rb, line 126
def enable_parallel(b=true)
  @parallel_pop = b
end
keys() click to toggle source
# File lib/fluent/buffer.rb, line 214
def keys
  @map.keys
end
pop(out) click to toggle source
# File lib/fluent/buffer.rb, line 259
def pop(out)
  chunk = nil
  @queue.synchronize do
    if @parallel_pop
      chunk = @queue.find {|c| c.try_mon_enter }
      return false unless chunk
    else
      chunk = @queue.first
      return false unless chunk
      return false unless chunk.try_mon_enter
    end
  end

  begin
    if !chunk.empty?
      write_chunk(chunk, out)
    end

    @queue.delete_if {|c|
      c.object_id == chunk.object_id
    }

    chunk.purge

    return !@queue.empty?
  ensure
    chunk.mon_exit
  end
end
push(key) click to toggle source

def enqueue(chunk) end

# File lib/fluent/buffer.rb, line 242
def push(key)
  synchronize do
    top = @map[key]
    if !top || top.empty?
      return false
    end

    @queue.synchronize do
      enqueue(top)
      @queue << top
      @map.delete(key)
    end

    return true
  end  # synchronize
end
queue_size() click to toggle source
# File lib/fluent/buffer.rb, line 218
def queue_size
  @queue.size
end
shutdown() click to toggle source
# File lib/fluent/buffer.rb, line 150
def shutdown
  synchronize do
    @queue.synchronize do
      until @queue.empty?
        @queue.shift.close
      end
    end
    @map.each_pair {|key,chunk|
      chunk.close
    }
  end
end
start() click to toggle source
# File lib/fluent/buffer.rb, line 145
def start
  @queue, @map = resume
  @queue.extend(MonitorMixin)
end
total_queued_chunk_size() click to toggle source
# File lib/fluent/buffer.rb, line 222
def total_queued_chunk_size
  total = 0
  @map.each_value {|c|
    total += c.size
  }
  @queue.each {|c|
    total += c.size
  }
  total
end
write_chunk(chunk, out) click to toggle source
# File lib/fluent/buffer.rb, line 289
def write_chunk(chunk, out)
  out.write(chunk)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.