Parent

Class/Module Index [+]

Quicksearch

Fluent::ForwardOutput

Constants

FORWARD_HEADER

MessagePack FixArray length = 2

Attributes

nodes[R]

Public Class Methods

new() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 24
def initialize
  super
  require 'socket'
  require 'fileutils'
  require 'fluent/plugin/socket_util'
  @nodes = []  #=> [Node]
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 54
def configure(conf)
  super

  # backward compatibility
  if host = conf['host']
    $log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead."
    port = conf['port']
    port = port ? port.to_i : DEFAULT_LISTEN_PORT
    e = conf.add_element('server')
    e['host'] = host
    e['port'] = port.to_s
  end

  recover_sample_size = @recover_wait / @heartbeat_interval

  conf.elements.each {|e|
    next if e.name != "server"

    host = e['host']
    port = e['port']
    port = port ? port.to_i : DEFAULT_LISTEN_PORT

    weight = e['weight']
    weight = weight ? weight.to_i : 60

    standby = !!e['standby']

    name = e['name']
    unless name
      name = "#{host}:#{port}"
    end

    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
    @nodes << Node.new(name, host, port, weight, standby, failure,
                       @phi_threshold, recover_sample_size, @expire_dns_cache)
    $log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight
  }
end
run() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 124
def run
  @loop.run
rescue
  $log.error "unexpected error", :error=>$!.to_s
  $log.error_backtrace
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 116
def shutdown
  @finished = true
  @loop.watchers.each {|w| w.detach }
  @loop.stop
  @thread.join
  @usock.close if @usock
end
start() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 93
def start
  super

  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0

  @loop = Coolio::Loop.new

  if @heartbeat_type == :udp
    # assuming all hosts use udp
    @usock = SocketUtil.create_udp_socket(@nodes.first.host)
    @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
    @loop.attach(@hb)
  end

  @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
  @loop.attach(@timer)

  @thread = Thread.new(&method(:run))
end
write_objects(tag, chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 131
def write_objects(tag, chunk)
  return if chunk.empty?

  error = nil

  wlen = @weight_array.length
  wlen.times do
    @rr = (@rr + 1) % wlen
    node = @weight_array[@rr]

    if node.available?
      begin
        send_data(node, tag, chunk)
        return
      rescue
        # for load balancing during detecting crashed servers
        error = $!  # use the latest error
      end
    end
  end

  if error
    raise error
  else
    raise "no nodes are available"  # TODO message
  end
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.