class Fluent::ForwardOutput

Constants

FORWARD_HEADER

MessagePack FixArray length = 2

NodeConfig

Attributes

nodes[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::ObjectBufferedOutput.new
# File lib/fluent/plugin/out_forward.rb, line 22
# Creates the output with an empty node list; configure appends one entry per
# <server> element. The libraries this plugin needs are required here, at
# construction time.
def initialize
  super
  %w[socket fileutils fluent/plugin/socket_util].each { |library| require library }
  @nodes = []  #=> [Node]
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::BufferedOutput#configure
# File lib/fluent/plugin/out_forward.rb, line 53
# Reads the plugin configuration and registers one Node per <server> element.
#
# A top-level 'host' (with optional 'port') is still accepted for backward
# compatibility: it is logged as obsolete and converted into an extra <server>
# element before the servers are processed.
#
# Per-server defaults: port falls back to DEFAULT_LISTEN_PORT, weight to 60,
# name to "host:port"; standby is true when the 'standby' key has a truthy value.
def configure(conf)
  super

  # backward compatibility
  if (legacy_host = conf['host'])
    log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead."
    legacy_port = conf['port']
    legacy_port = legacy_port ? legacy_port.to_i : DEFAULT_LISTEN_PORT
    server = conf.add_element('server')
    server['host'] = legacy_host
    server['port'] = legacy_port.to_s
  end

  # Passed to every NodeConfig below.
  recover_sample_size = @recover_wait / @heartbeat_interval

  conf.elements.each do |elem|
    next unless elem.name == "server"

    host = elem['host']

    port_text = elem['port']
    port = port_text ? port_text.to_i : DEFAULT_LISTEN_PORT

    weight_text = elem['weight']
    weight = weight_text ? weight_text.to_i : 60

    standby = elem['standby'] ? true : false

    name = elem['name'] || "#{host}:#{port}"

    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)

    node_conf = NodeConfig.new(
      name, host, port, weight, standby, failure,
      @phi_threshold, recover_sample_size, @expire_dns_cache, @phi_failure_detector
    )
    @nodes << Node.new(log, node_conf)
    log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight, :plugin_id=>plugin_id
  end
end
run() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 125
# Body of the thread created in start: runs the event loop until it stops.
# A StandardError escaping the loop is logged with its message and backtrace
# rather than propagated.
def run
  begin
    @loop.run
  rescue => e
    log.error "unexpected error", :error=>e.to_s
    log.error_backtrace
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 117
# Stops the heartbeat machinery set up by start.
#
# Sets @finished (on_timer returns early once it is set), detaches every
# watcher from the event loop, stops the loop, waits for the loop thread to
# exit, and closes the UDP heartbeat socket if one was opened.
#
# Each resource is released only if it exists, so calling shutdown on an
# instance that was never started no longer raises NoMethodError on nil.
def shutdown
  @finished = true
  if @loop
    @loop.watchers.each {|w| w.detach }
    @loop.stop
  end
  @thread.join if @thread
  @usock.close if @usock
end
start() click to toggle source
Calls superclass method Fluent::BufferedOutput#start
# File lib/fluent/plugin/out_forward.rb, line 94
# Starts the output: builds the initial weight array, creates the Coolio event
# loop, attaches the heartbeat watchers to it, and launches the thread that
# runs the loop (see run).
def start
  super

  # rebuild_weight_array seeds its shuffle from @rand_seed, so the seed must be
  # set before the first rebuild.
  @rand_seed = Random.new.seed
  rebuild_weight_array
  # Round-robin cursor into @weight_array; write_objects advances it.
  @rr = 0

  @loop = Coolio::Loop.new

  if @heartbeat_type == :udp
    # assuming all hosts use udp
    # NOTE(review): the socket is created from the first node's host only, and
    # @nodes.first is nil when no server is configured — confirm that configure
    # guarantees at least one node before this point.
    @usock = SocketUtil.create_udp_socket(@nodes.first.host)
    @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    # Incoming datagrams are handed to on_heartbeat.
    @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
    @loop.attach(@hb)
  end

  # on_timer is the callback for the heartbeat request timer, for both
  # heartbeat types.
  @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
  @loop.attach(@timer)

  @thread = Thread.new(&method(:run))
end
write_objects(tag, chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 132
# Sends +chunk+ to the first node that accepts it.
#
# Starting after the current round-robin position, each slot of @weight_array
# is tried at most once. Unavailable nodes are skipped; a node whose send
# raises is skipped too, and the search continues with the next slot.
#
# Returns nil once a send succeeds, or immediately for an empty chunk.
# Raises the most recent send error if every attempted node failed, or
# RuntimeError "no nodes are available" if no node was available at all.
def write_objects(tag, chunk)
  return if chunk.empty?

  last_error = nil
  slots = @weight_array.length

  slots.times do
    @rr = (@rr + 1) % slots
    candidate = @weight_array[@rr]
    next unless candidate.available?

    begin
      send_data(candidate, tag, chunk)
      return
    rescue => e
      # for load balancing during detecting crashed servers
      last_error = e  # keep only the latest error
    end
  end

  raise last_error if last_error
  raise "no nodes are available"  # TODO message
end

Private Instance Methods

connect(node) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 260
# Opens a new TCP connection to the node's resolved host and port and returns
# the socket. Callers in this class close it themselves.
def connect(node)
  # TODO unix socket?
  address = node.resolved_host
  TCPSocket.new(address, node.port)
end
on_heartbeat(sockaddr, msg) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 320
# Callback for a received heartbeat packet.
#
# Finds the node whose sockaddr equals the sender's and records the heartbeat
# on it; when node.heartbeat returns a truthy value the weight array is
# rebuilt. Packets from unknown senders are ignored.
def on_heartbeat(sockaddr, msg)
  # The unpacked values are unused, but the call still raises on a malformed address.
  _port, _host = Socket.unpack_sockaddr_in(sockaddr)

  sender = @nodes.find { |candidate| candidate.sockaddr == sockaddr }
  return unless sender

  #log.trace "heartbeat from '#{sender.name}'", :host=>sender.host, :port=>sender.port
  rebuild_weight_array if sender.heartbeat
end
on_timer() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 278
# Callback for the heartbeat request timer created in start.
#
# Does nothing once @finished is set. Otherwise, for every node: ticks it
# (rebuilding the weight array when tick returns a truthy value) and sends a
# heartbeat, over TCP when @heartbeat_type is :tcp and as a one-byte datagram
# on @usock otherwise. The listed transient socket errors are logged at debug
# level and do not stop the iteration; any other error propagates.
def on_timer
  return if @finished

  @nodes.each do |peer|
    rebuild_weight_array if peer.tick

    begin
      #log.trace "sending heartbeat #{peer.host}:#{peer.port} on #{@heartbeat_type}"
      if @heartbeat_type == :tcp
        send_heartbeat_tcp(peer)
      else
        destination = Socket.pack_sockaddr_in(peer.port, peer.resolved_host)
        @usock.send "\0", 0, destination
      end
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED => e
      # TODO log
      log.debug "failed to send heartbeat packet to #{peer.host}:#{peer.port}", :error=>e.to_s
    end
  end
end
rebuild_weight_array() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 162
# Rebuilds @weight_array, the node list that write_objects cycles through.
#
# Every regular (non-standby) node is included whether or not it is currently
# available. When some regular nodes are unavailable, available standby nodes
# are added in order until their combined weight covers the lost weight.
# Each selected node then appears in proportion to its weight (weights are
# reduced by their greatest common divisor), and the array is shuffled with a
# generator seeded from @rand_seed, so the same node set always yields the
# same order.
#
# When there is nothing to schedule (no nodes, or every weight is zero)
# @weight_array becomes empty instead of raising ZeroDivisionError.
#
# Returns the new weight array.
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.partition {|n|
    n.standby?
  }

  lost_weight = 0
  regular_nodes.each {|n|
    unless n.available?
      lost_weight += n.weight
    end
  }
  log.debug "rebuilding weight array", :lost_weight=>lost_weight

  if lost_weight > 0
    standby_nodes.each {|n|
      if n.available?
        regular_nodes << n
        log.warn "using standby node #{n.host}:#{n.port}", :weight=>n.weight
        lost_weight -= n.weight
        break if lost_weight <= 0
      end
    }
  end

  weight_array = []
  gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) }
  # gcd is 0 only when there are no nodes or all weights are 0; dividing by it
  # would raise, and there is nothing to add in that case anyway.
  if gcd > 0
    regular_nodes.each {|n|
      (n.weight / gcd).times {
        weight_array << n
      }
    }
  end

  unless weight_array.empty?
    # for load balancing during detecting crashed servers
    coe = (regular_nodes.size * 6) / weight_array.size
    weight_array *= coe if coe > 1

    r = Random.new(@rand_seed)
    weight_array.sort_by! { r.rand }
  end

  @weight_array = weight_array
end
send_data(node, tag, chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 223
# Delivers one chunk to +node+ over a fresh connection obtained from connect.
#
# After setting SO_LINGER and SO_SNDTIMEO from @send_timeout, the following is
# written in order: FORWARD_HEADER (start of a two-element array), the
# msgpack-encoded tag, a raw 32 length prefix for the chunk, and the chunk
# body. The raw 32 form is used regardless of the chunk size. On success the
# node is notified with heartbeat(false), whose result is returned.
#
# The socket is closed whether or not a write fails; errors propagate.
def send_data(node, tag, chunk)
  sock = connect(node)
  begin
    timeout_secs = @send_timeout.to_i

    linger = [1, timeout_secs].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    send_timeout = [timeout_secs, 0].pack('L!L!')  # struct timeval
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeout)

    # array header, then the tag
    sock.write FORWARD_HEADER
    sock.write tag.to_msgpack

    # raw 32 marker (0xdb) followed by the big-endian 32-bit body length
    body_size = chunk.size
    sock.write [0xdb, body_size].pack('CN')

    # the packed event stream itself
    chunk.write_to(sock)

    node.heartbeat(false)
  ensure
    sock.close
  end
end
send_heartbeat_tcp(node) click to toggle source

# Heartbeat payload: FORWARD_HEADER followed by a msgpack-encoded empty string
# and a msgpack-encoded empty array. send_heartbeat_tcp does not write it; it
# deliberately sends no data.
FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack

# File lib/fluent/plugin/out_forward.rb, line 208
# Performs a TCP heartbeat against +node+: a connection that can be opened
# counts as a heartbeat, reported via node.heartbeat(true), whose result is
# returned.
#
# No payload is written and no send timeout is set: sending data here was
# disabled so as not to cause a compatibility problem. The timeval that used
# to be packed for that disabled SO_SNDTIMEO call was never used and has been
# removed.
#
# The socket is always closed; connection errors propagate to the caller.
def send_heartbeat_tcp(node)
  sock = connect(node)
  begin
    linger = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)
    # don't send any data to not cause a compatibility problem
    node.heartbeat(true)
  ensure
    sock.close
  end
end