class Fluent::ForwardOutput
Constants
- FORWARD_HEADER
MessagePack FixArray length = 2
- NodeConfig
Attributes
nodes[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::ObjectBufferedOutput.new
# File lib/fluent/plugin/out_forward.rb, line 22
# Creates the output with an empty node list.
#
# The libraries this plugin needs are required here, at instantiation
# time, rather than when the file is loaded.
def initialize
  super
  %w[socket fileutils fluent/plugin/socket_util].each {|lib| require lib }
  @nodes = []  #=> [Node]
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::BufferedOutput#configure
# File lib/fluent/plugin/out_forward.rb, line 53
# Applies the configuration and registers one Node per <server> element.
#
# conf - configuration element; read through conf['key'], conf.elements
#        and conf.add_element.
#
# Returns whatever conf.elements.each returns.
def configure(conf)
  super

  # backward compatibility: a top-level 'host' (and optional 'port') is
  # rewritten into a synthetic <server> element so that the loop below
  # handles it like any other server.
  if (legacy_host = conf['host'])
    log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead."
    legacy_port = conf['port']
    legacy_port = legacy_port ? legacy_port.to_i : DEFAULT_LISTEN_PORT
    synthetic = conf.add_element('server')
    synthetic['host'] = legacy_host
    synthetic['port'] = legacy_port.to_s
  end

  recover_sample_size = @recover_wait / @heartbeat_interval

  conf.elements.each do |elem|
    next unless elem.name == "server"

    host = elem['host']

    raw_port = elem['port']
    port = raw_port ? raw_port.to_i : DEFAULT_LISTEN_PORT

    raw_weight = elem['weight']
    weight = raw_weight ? raw_weight.to_i : 60

    # Any value for 'standby' marks the node as standby.
    standby = elem['standby'] ? true : false

    # Nodes without an explicit name are labelled "host:port".
    name = elem['name'] || "#{host}:#{port}"

    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
    node_conf = NodeConfig.new(name, host, port, weight, standby, failure,
                               @phi_threshold, recover_sample_size,
                               @expire_dns_cache, @phi_failure_detector)
    @nodes << Node.new(log, node_conf)

    log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight, :plugin_id=>plugin_id
  end
end
run()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 125
# Body of the thread created in #start: runs the event loop until it
# returns. Any StandardError is logged together with its backtrace instead
# of being raised out of the thread.
def run
  @loop.run
rescue => e
  log.error "unexpected error", :error=>e.to_s
  log.error_backtrace
end
shutdown()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 117
# Stops the heartbeat machinery: flags the plugin as finished (so #on_timer
# becomes a no-op), detaches every watcher, stops the loop, waits for the
# loop thread and finally closes the UDP socket when one was opened.
def shutdown
  @finished = true
  @loop.watchers.each(&:detach)
  @loop.stop
  @thread.join
  @usock.close if @usock
end
start()
click to toggle source
Calls superclass method
Fluent::BufferedOutput#start
# File lib/fluent/plugin/out_forward.rb, line 94
# Prepares load balancing state and launches the heartbeat event loop in a
# background thread.
def start
  super

  # Round-robin state. The seed is fixed once so that every later
  # rebuild_weight_array shuffles with the same seed.
  @rr = 0
  @rand_seed = Random.new.seed
  rebuild_weight_array

  @loop = Coolio::Loop.new

  if @heartbeat_type == :udp
    # assuming all hosts use udp
    @usock = SocketUtil.create_udp_socket(@nodes.first.host)
    @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
    @loop.attach(@hb)
  end

  @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
  @loop.attach(@timer)

  @thread = Thread.new { run }
end
write_objects(tag, chunk)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 132
# Sends a chunk to the first available node, walking the weight array
# round-robin starting after the previously used slot.
#
# tag   - tag forwarded together with the chunk.
# chunk - buffer chunk; must respond to empty? (and whatever send_data needs).
#
# Returns nil (also when the chunk is empty, in which case nothing is sent).
# Raises the most recent send error when every attempted node failed, or a
# RuntimeError "no nodes are available" when no node was available at all.
def write_objects(tag, chunk)
  return if chunk.empty?

  error = nil

  # Work on a snapshot of the array. rebuild_weight_array assigns a brand
  # new array to @weight_array, and it is invoked from the heartbeat
  # callbacks running on the thread created in #start. Re-reading the
  # instance variable on every iteration could therefore index a shorter
  # array with an @rr computed from the old length and yield nil.
  weight_array = @weight_array
  wlen = weight_array.length

  wlen.times do
    @rr = (@rr + 1) % wlen
    node = weight_array[@rr]
    next unless node.available?

    begin
      send_data(node, tag, chunk)
      return
    rescue
      # for load balancing during detecting crashed servers
      error = $!  # use the latest error
    end
  end

  raise error if error
  raise "no nodes are available"  # TODO message
end
Private Instance Methods
connect(node)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 260
# Opens a new TCP connection to the node's resolved address and port.
# The caller owns the returned socket and is responsible for closing it.
def connect(node)
  # TODO unix socket?
  address = node.resolved_host
  TCPSocket.new(address, node.port)
end
on_heartbeat(sockaddr, msg)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 320
# Callback for an incoming heartbeat packet (handed to HeartbeatHandler in
# #start). Looks up the node whose sockaddr equals the sender's and records
# the heartbeat; the weight array is rebuilt when node.heartbeat returns a
# truthy value.
#
# sockaddr - packed sockaddr of the sender.
# msg      - packet payload; not used here.
def on_heartbeat(sockaddr, msg)
  # The unpacked values are not used; the call is kept as in the original.
  # NOTE(review): presumably it doubles as a sanity check of sockaddr, since
  # unpack_sockaddr_in raises on a non-IP address - confirm.
  _port, _host = Socket.unpack_sockaddr_in(sockaddr)

  sender = @nodes.find {|candidate| candidate.sockaddr == sockaddr }
  return unless sender

  #log.trace "heartbeat from '#{node.name}'", :host=>node.host, :port=>node.port
  rebuild_weight_array if sender.heartbeat
end
on_timer()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 278
# Heartbeat timer callback (handed to HeartbeatRequestTimer in #start).
#
# For every node: calls node.tick and rebuilds the weight array when it
# returns a truthy value, then sends a heartbeat over TCP or UDP depending
# on @heartbeat_type. Does nothing once #shutdown has set @finished.
#
# A failure to reach one node is logged at debug level and must not prevent
# the remaining nodes from being probed.
def on_timer
  return if @finished
  @nodes.each {|n|
    if n.tick
      rebuild_weight_array
    end
    begin
      #log.trace "sending heartbeat #{n.host}:#{n.port} on #{@heartbeat_type}"
      if @heartbeat_type == :tcp
        send_heartbeat_tcp(n)
      else
        @usock.send "\0", 0, Socket.pack_sockaddr_in(n.port, n.resolved_host)
      end
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED,
           Errno::ETIMEDOUT, Errno::EHOSTUNREACH, Errno::ENETUNREACH, SocketError
      # Connect timeouts, unreachable hosts/networks and name resolution
      # failures are ordinary symptoms of a dead peer. Left unrescued they
      # would abort this iteration for all remaining nodes and escape the
      # callback (presumably up to #run, which logs the error and returns).
      # TODO log
      log.debug "failed to send heartbeat packet to #{n.host}:#{n.port}", :error=>$!.to_s
    end
  }
end
rebuild_weight_array()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 162
# Recomputes @weight_array, the list #write_objects walks round-robin.
#
# Every regular (non-standby) node is placed in the array in proportion to
# its weight, whether or not it is currently available. When regular nodes
# are unavailable, available standby nodes are added until their combined
# weight covers the lost weight. The array is then shuffled with a generator
# seeded from @rand_seed, so the order is reproducible for a given seed.
#
# With no usable node (no regular nodes and no available standby, or all
# weights zero) @weight_array becomes an empty array instead of raising
# ZeroDivisionError.
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.partition {|n|
    n.standby?
  }

  # Total weight of the regular nodes that are currently unavailable.
  lost_weight = 0
  regular_nodes.each {|n|
    unless n.available?
      lost_weight += n.weight
    end
  }
  log.debug "rebuilding weight array", :lost_weight=>lost_weight

  if lost_weight > 0
    standby_nodes.each {|n|
      if n.available?
        regular_nodes << n
        log.warn "using standby node #{n.host}:#{n.port}", :weight=>n.weight
        lost_weight -= n.weight
        break if lost_weight <= 0
      end
    }
  end

  weight_array = []
  # Dividing by the GCD keeps the array as short as the weight ratios allow.
  # The GCD is 0 when there are no nodes or every weight is 0; the division
  # is skipped in that case.
  gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) }
  if gcd > 0
    regular_nodes.each {|n|
      (n.weight / gcd).times {
        weight_array << n
      }
    }
  end

  unless weight_array.empty?
    # for load balancing during detecting crashed servers
    coe = (regular_nodes.size * 6) / weight_array.size
    weight_array *= coe if coe > 1
  end

  r = Random.new(@rand_seed)
  weight_array.sort_by! { r.rand }

  @weight_array = weight_array
end
send_data(node, tag, chunk)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 223
# Opens a connection to the node, writes one forward message
# (FORWARD_HEADER, the tag, a raw 32 length header and the chunk body) and
# records the outcome through node.heartbeat(false). The socket is always
# closed, also when a write fails.
#
# node  - destination node.
# tag   - tag to send; serialized with to_msgpack.
# chunk - buffer chunk; must respond to size and write_to(io).
#
# Returns the result of node.heartbeat(false).
def send_data(node, tag, chunk)
  sock = connect(node)
  begin
    timeout = @send_timeout.to_i

    linger = [1, timeout].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    send_timeval = [timeout, 0].pack('L!L!')  # struct timeval
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeval)

    # beginArray(2)
    sock.write FORWARD_HEADER

    # writeRaw(tag)
    sock.write tag.to_msgpack  # tag

    # beginRaw(size): the length is always written as raw 32, whatever the
    # chunk size; the shorter FixRaw / raw 16 encodings are not used.
    sock.write [0xdb, chunk.size].pack('CN')

    # writeRawBody(packed_es)
    chunk.write_to(sock)

    node.heartbeat(false)
  ensure
    sock.close
  end
end
send_heartbeat_tcp(node)
click to toggle source
# Heartbeat payload: the forward header followed by an empty tag and an
# empty array. Currently not written to the socket (see send_heartbeat_tcp).
FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
# File lib/fluent/plugin/out_forward.rb, line 208
# Probes a node over TCP: a successful connect counts as a heartbeat and is
# recorded with node.heartbeat(true). The socket is always closed.
#
# node - node to probe.
#
# Returns the result of node.heartbeat(true). Connection errors raised by
# connect propagate to the caller.
def send_heartbeat_tcp(node)
  sock = connect(node)
  begin
    opt = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

    # don't send any data to not cause a compatibility problem
    # (no payload is written, so no SO_SNDTIMEO send timeout is set either)
    #sock.write FORWARD_TCP_HEARTBEAT_DATA

    node.heartbeat(true)
  ensure
    sock.close
  end
end