class Fluent::ForwardOutput::Node

Attributes

available[R]
conf[R]
failure[R]
host[R]
name[R]
port[R]
sockaddr[R]
weight[R]

Public Class Methods

new(log, conf) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 428
def initialize(log, conf)
  @log = log
  @conf = conf
  @name = @conf.name
  @host = @conf.host
  @port = @conf.port
  @weight = @conf.weight
  @failure = @conf.failure
  @available = true

  @resolved_host = nil
  @resolved_time = 0
  resolved_host  # check dns
end

Public Instance Methods

available?() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 448
def available?
  @available
end
disable!() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 452
def disable!
  @available = false
end
heartbeat(detect=true) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 520
def heartbeat(detect=true)
  now = Time.now.to_f
  @failure.add(now)
  #@log.trace "heartbeat from '#{@name}'", :host=>@host, :port=>@port, :available=>@available, :sample_size=>@failure.sample_size
  if detect && !@available && @failure.sample_size > @conf.recover_sample_size
    @available = true
    @log.warn "recovered forwarding server '#{@name}'", :host=>@host, :port=>@port
    return true
  else
    return nil
  end
end
resolved_host() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 460
def resolved_host
  case @conf.expire_dns_cache
  when 0
    # cache is disabled
    return resolve_dns!

  when nil
    # persistent cache
    return @resolved_host ||= resolve_dns!

  else
    now = Engine.now
    rh = @resolved_host
    if !rh || now - @resolved_time >= @conf.expire_dns_cache
      rh = @resolved_host = resolve_dns!
      @resolved_time = now
    end
    return rh
  end
end
standby?() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 456
def standby?
  @conf.standby
end
tick() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 489
def tick
  now = Time.now.to_f
  if !@available
    if @failure.hard_timeout?(now)
      @failure.clear
    end
    return nil
  end

  if @failure.hard_timeout?(now)
    @log.warn "detached forwarding server '#{@name}'", :host=>@host, :port=>@port, :hard_timeout=>true
    @available = false
    @resolved_host = nil  # expire cached host
    @failure.clear
    return true
  end

  if @conf.phi_failure_detector
    phi = @failure.phi(now)
    #$log.trace "phi '#{@name}'", :host=>@host, :port=>@port, :phi=>phi
    if phi > @conf.phi_threshold
      @log.warn "detached forwarding server '#{@name}'", :host=>@host, :port=>@port, :phi=>phi
      @available = false
      @resolved_host = nil  # expire cached host
      @failure.clear
      return true
    end
  end
  return false
end
to_msgpack(out = '') click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 533
def to_msgpack(out = '')
  [@host, @port, @weight, @available].to_msgpack(out)
end

Private Instance Methods

resolve_dns!() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 481
def resolve_dns!
  addrinfo_list = Socket.getaddrinfo(@host, @port, nil, Socket::SOCK_STREAM)
  addrinfo = @conf.dns_round_robin ? addrinfo_list.sample : addrinfo_list.first
  @sockaddr = Socket.pack_sockaddr_in(addrinfo[1], addrinfo[3]) # used by on_heartbeat
  addrinfo[3]
end