class Fluent::ForwardOutput

Constants

FORWARD_HEADER

MessagePack FixArray length = 3 (if @extend_internal_protocol)

= 2 (else)
FORWARD_HEADER_EXT
NodeConfig

Attributes

extend_internal_protocol[RW]
nodes[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::ObjectBufferedOutput.new
# File lib/fluent/plugin/out_forward.rb, line 30
def initialize
  super
  require "base64"
  require 'socket'
  require 'fileutils'
  require 'fluent/plugin/socket_util'
  @nodes = []  #=> [Node]
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::BufferedOutput#configure
# File lib/fluent/plugin/out_forward.rb, line 75
def configure(conf)
  super

  # backward compatibility
  if host = conf['host']
    log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead."
    port = conf['port']
    port = port ? port.to_i : DEFAULT_LISTEN_PORT
    e = conf.add_element('server')
    e['host'] = host
    e['port'] = port.to_s
  end

  recover_sample_size = @recover_wait / @heartbeat_interval

  # add options here if any options addes which uses extended protocol
  @extend_internal_protocol = if @require_ack_response
                                true
                              else
                                false
                              end

  if @dns_round_robin
    if @heartbeat_type == :udp
      raise ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
    end
  end

  conf.elements.each {|e|
    next if e.name != "server"

    host = e['host']
    port = e['port']
    port = port ? port.to_i : DEFAULT_LISTEN_PORT

    weight = e['weight']
    weight = weight ? weight.to_i : 60

    standby = !!e['standby']

    name = e['name']
    unless name
      name = "#{host}:#{port}"
    end

    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)

    node_conf = NodeConfig.new(name, host, port, weight, standby, failure,
      @phi_threshold, recover_sample_size, @expire_dns_cache, @phi_failure_detector, @dns_round_robin)

    if @heartbeat_type == :none
      @nodes << NoneHeartbeatNode.new(log, node_conf)
    else
      @nodes << Node.new(log, node_conf)
    end
    log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight, :plugin_id=>plugin_id
  }
end
run() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 169
def run
  @loop.run if @loop
rescue
  log.error "unexpected error", :error=>$!.to_s
  log.error_backtrace
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 159
def shutdown
  @finished = true
  if @loop
    @loop.watchers.each {|w| w.detach }
    @loop.stop
  end
  @thread.join if @thread
  @usock.close if @usock
end
start() click to toggle source
Calls superclass method Fluent::BufferedOutput#start
# File lib/fluent/plugin/out_forward.rb, line 134
def start
  super

  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0

  unless @heartbeat_type == :none
    @loop = Coolio::Loop.new

    if @heartbeat_type == :udp
      # assuming all hosts use udp
      @usock = SocketUtil.create_udp_socket(@nodes.first.host)
      @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
      @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
      @loop.attach(@hb)
    end

    @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
    @loop.attach(@timer)

    @thread = Thread.new(&method(:run))
  end
end
write_objects(tag, chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 176
def write_objects(tag, chunk)
  return if chunk.empty?

  error = nil

  wlen = @weight_array.length
  wlen.times do
    @rr = (@rr + 1) % wlen
    node = @weight_array[@rr]

    if node.available?
      begin
        send_data(node, tag, chunk)
        return
      rescue
        # for load balancing during detecting crashed servers
        error = $!  # use the latest error
      end
    end
  end

  if error
    raise error
  else
    raise "no nodes are available"  # TODO message
  end
end

Private Instance Methods

connect(node) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 354
def connect(node)
  # TODO unix socket?
  TCPSocket.new(node.resolved_host, node.port)
end
forward_header() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 252
def forward_header
  if @extend_internal_protocol
    FORWARD_HEADER_EXT
  else
    FORWARD_HEADER
  end
end
on_heartbeat(sockaddr, msg) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 414
def on_heartbeat(sockaddr, msg)
  port, host = Socket.unpack_sockaddr_in(sockaddr)
  if node = @nodes.find {|n| n.sockaddr == sockaddr }
    #log.trace "heartbeat from '#{node.name}'", :host=>node.host, :port=>node.port
    if node.heartbeat
      rebuild_weight_array
    end
  end
end
on_timer() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 372
def on_timer
  return if @finished
  @nodes.each {|n|
    if n.tick
      rebuild_weight_array
    end
    begin
      #log.trace "sending heartbeat #{n.host}:#{n.port} on #{@heartbeat_type}"
      if @heartbeat_type == :tcp
        send_heartbeat_tcp(n)
      else
        @usock.send "\0", 0, Socket.pack_sockaddr_in(n.port, n.resolved_host)
      end
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED
      # TODO log
      log.debug "failed to send heartbeat packet to #{n.host}:#{n.port}", :error=>$!.to_s
    end
  }
end
rebuild_weight_array() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 206
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.partition {|n|
    n.standby?
  }

  lost_weight = 0
  regular_nodes.each {|n|
    unless n.available?
      lost_weight += n.weight
    end
  }
  log.debug "rebuilding weight array", :lost_weight=>lost_weight

  if lost_weight > 0
    standby_nodes.each {|n|
      if n.available?
        regular_nodes << n
        log.warn "using standby node #{n.host}:#{n.port}", :weight=>n.weight
        lost_weight -= n.weight
        break if lost_weight <= 0
      end
    }
  end

  weight_array = []
  gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) }
  regular_nodes.each {|n|
    (n.weight / gcd).times {
      weight_array << n
    }
  }

  # for load balancing during detecting crashed servers
  coe = (regular_nodes.size * 6) / weight_array.size
  weight_array *= coe if coe > 1

  r = Random.new(@rand_seed)
  weight_array.sort_by! { r.rand }

  @weight_array = weight_array
end
send_data(node, tag, chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 276
def send_data(node, tag, chunk)
  sock = connect(node)
  begin
    opt = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

    opt = [@send_timeout.to_i, 0].pack('L!L!')  # struct timeval
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

    # beginArray(2)
    sock.write forward_header

    # writeRaw(tag)
    sock.write tag.to_msgpack  # tag

    # beginRaw(size)
    sz = chunk.size
    #if sz < 32
    #  # FixRaw
    #  sock.write [0xa0 | sz].pack('C')
    #elsif sz < 65536
    #  # raw 16
    #  sock.write [0xda, sz].pack('Cn')
    #else
    # raw 32
    sock.write [0xdb, sz].pack('CN')
    #end

    # writeRawBody(packed_es)
    chunk.write_to(sock)

    if @extend_internal_protocol
      option = {}
      option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response
      sock.write option.to_msgpack

      if @require_ack_response && @ack_response_timeout > 0
        # Waiting for a response here results in a decrease of throughput because a chunk queue is locked.
        # To avoid a decrease of troughput, it is necessary to prepare a list of chunks that wait for responses
        # and process them asynchronously.
        if IO.select([sock], nil, nil, @ack_response_timeout)
          raw_data = sock.recv(1024)

          # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
          # If this happens we assume the data wasn't delivered and retry it.
          if raw_data.empty?
            @log.warn "node #{node.host}:#{node.port} closed the connection. regard it as unavailable."
            node.disable!
            raise ForwardOutputConnectionClosedError, "node #{node.host}:#{node.port} closed connection"
          else
            # Serialization type of the response is same as sent data.
            res = MessagePack.unpack(raw_data)

            if res['ack'] != option['chunk']
              # Some errors may have occured when ack and chunk id is different, so send the chunk again.
              raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different"
            end
          end

        else
          # IO.select returns nil on timeout.
          # There are 2 types of cases when no response has been received:
          # (1) the node does not support sending responses
          # (2) the node does support sending response but responses have not arrived for some reasons.
          @log.warn "no response from #{node.host}:#{node.port}. regard it as unavailable."
          node.disable!
          raise ForwardOutputACKTimeoutError, "node #{node.host}:#{node.port} does not return ACK"
        end
      end
    end

    node.heartbeat(false)
    return res  # for test
  ensure
    sock.close
  end
end
send_heartbeat_tcp(node) click to toggle source

FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack

# File lib/fluent/plugin/out_forward.rb, line 261
def send_heartbeat_tcp(node)
  sock = connect(node)
  begin
    opt = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
    opt = [@send_timeout.to_i, 0].pack('L!L!')  # struct timeval
    # don't send any data to not cause a compatibility problem
    #sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
    #sock.write FORWARD_TCP_HEARTBEAT_DATA
    node.heartbeat(true)
  ensure
    sock.close
  end
end