class Fluent::HttpInput

Constants

EMPTY_GIF_IMAGE
EVENT_RECORD_PARAMETER

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::DetachMultiProcessMixin.new
# File lib/fluent/plugin/in_http.rb, line 25
def initialize
  require 'webrick/httputils'
  require 'uri'
  super
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 45
def configure(conf)
  super

  m = if @format == 'default'
        method(:parse_params_default)
      else
        @parser = Plugin.new_parser(@format)
        @parser.configure(conf)
        method(:parse_params_with_parser)
      end
  (class << self; self; end).module_eval do
    define_method(:parse_params, m)
  end
end
on_request(path_info, params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 119
def on_request(path_info, params)
  begin
    path = path_info[1..-1]  # remove /
    tag = path.split('/').join('.')
    record_time, record = parse_params(params)

    # Skip nil record
    if record.nil?
      if @respond_with_empty_img
        return ["200 OK", {'Content-type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE]
      else
        return ["200 OK", {'Content-type'=>'text/plain'}, ""]
      end
    end

    if @add_http_headers
      params.each_pair { |k,v|
        if k.start_with?("HTTP_")
          record[k] = v
        end
      }
    end

    if @add_remote_addr
      record['REMOTE_ADDR'] = params['REMOTE_ADDR']
    end

    time = if param_time = params['time']
             param_time = param_time.to_i
             param_time.zero? ? Engine.now : param_time
           else
             record_time.nil? ? Engine.now : record_time
           end
  rescue
    return ["400 Bad Request", {'Content-type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"]
  end

  # TODO server error
  begin
    # Support batched requests
    if record.is_a?(Array)
      mes = MultiEventStream.new
      record.each do |single_record|
        single_time = single_record.delete("time") || time
        mes.add(single_time, single_record)
      end
      router.emit_stream(tag, mes)
    else
      router.emit(tag, time, record)
    end
  rescue
    return ["500 Internal Server Error", {'Content-type'=>'text/plain'}, "500 Internal Server Error\n#{$!}\n"]
  end

  if @respond_with_empty_img
    return ["200 OK", {'Content-type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE]
  else
    return ["200 OK", {'Content-type'=>'text/plain'}, ""]
  end
end
run() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 112
def run
  @loop.run(@blocking_timeout)
rescue
  log.error "unexpected error", :error=>$!.to_s
  log.error_backtrace
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 105
def shutdown
  @loop.watchers.each {|w| w.detach }
  @loop.stop
  @lsock.close
  @thread.join
end
start() click to toggle source
Calls superclass method Fluent::Input#start
# File lib/fluent/plugin/in_http.rb, line 84
def start
  log.debug "listening http on #{@bind}:#{@port}"
  lsock = TCPServer.new(@bind, @port)

  detach_multi_process do
    super
    @km = KeepaliveManager.new(@keepalive_timeout)
    #@lsock = Coolio::TCPServer.new(@bind, @port, Handler, @km, method(:on_request), @body_size_limit)
    @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request),
                                   @body_size_limit, @format, log,
                                   @cors_allow_origins)
    @lsock.listen(@backlog) unless @backlog.nil?

    @loop = Coolio::Loop.new
    @loop.attach(@km)
    @loop.attach(@lsock)

    @thread = Thread.new(&method(:run))
  end
end

Private Instance Methods

parse_params_default(params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 182
def parse_params_default(params)
  record = if msgpack = params['msgpack']
             MessagePack.unpack(msgpack)
           elsif js = params['json']
             JSON.parse(js)
           else
             raise "'json' or 'msgpack' parameter is required"
           end
  return nil, record
end
parse_params_with_parser(params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 195
def parse_params_with_parser(params)
  if content = params[EVENT_RECORD_PARAMETER]
    @parser.parse(content) { |time, record|
      raise "Received event is not #{@format}: #{content}" if record.nil?
      return time, record
    }
  else
    raise "'#{EVENT_RECORD_PARAMETER}' parameter is required"
  end
end