class Fluent::RPC::Server

Public Class Methods

new(endpoint, log) click to toggle source
# File lib/fluent/rpc.rb, line 22
def initialize(endpoint, log)
  bind, port = endpoint.split(':')
  @bind = bind
  @port = port
  @log = log

  @server = WEBrick::HTTPServer.new(
    :BindAddress => @bind,
    :Port => @port,
    :Logger => WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
    :AccessLog => [],
  )
end

Public Instance Methods

mount(path, servlet, *args) click to toggle source
# File lib/fluent/rpc.rb, line 36
def mount(path, servlet, *args)
  @server.mount(path, servlet, *args)
  @log.debug "register #{path} RPC servlet"
end
mount_proc(path, &block) click to toggle source
# File lib/fluent/rpc.rb, line 41
def mount_proc(path, &block)
  @server.mount_proc(path) { |req, res|
    begin
      code, header, body = block.call(req, res)
    rescue => e
      @log.warn "failed to handle RPC request", :path => path, :error => e.to_s
      @log.warn_backtrace e.backtrace

      code = 500
      body = {
        'message '=> 'Internal Server Error',
        'error' => "#{e}",
        'backtrace'=> e.backtrace,
      }
    end

    code = 200 if code.nil?
    header = {'Content-Type' => 'application/json'} if header.nil?
    body = if body.nil?
             '{"ok":true}'
           else
             body['ok'] = code == 200
             body.to_json
           end

    res.status = code
    header.each_pair { |k, v|
      res[k] = v
    }
    res.body = body
  }
  @log.debug "register #{path} RPC handler"
end
shutdown() click to toggle source
# File lib/fluent/rpc.rb, line 82
def shutdown
  if @server
    @server.shutdown
    @server = nil
  end
  if @thread
    @thread.join
    @thread = nil
  end
end
start() click to toggle source
# File lib/fluent/rpc.rb, line 75
def start
  @log.debug "listening RPC http server on http://#{@bind}:#{@port}/"
  @thread = Thread.new {
    @server.start
  }
end