class Sidekiq::Launcher

The Launcher is a very simple Actor whose job is to start, monitor and stop the core Actors in Sidekiq. If any of these actors die, the Sidekiq process exits immediately.

Constants

JVM_RESERVED_SIGNALS

Attributes

fetcher[RW]
manager[RW]
poller[RW]

Public Class Methods

new(options) click to toggle source
# File lib/sidekiq/launcher.rb, line 17
def initialize(options)
  @manager = Sidekiq::Manager.new(options)
  @poller = Sidekiq::Scheduled::Poller.new
  @done = false
  @options = options
end

Public Instance Methods

quiet() click to toggle source

Stops this instance from processing any more jobs,

# File lib/sidekiq/launcher.rb, line 32
def quiet
  @done = true
  @manager.quiet
  @poller.terminate
end
run() click to toggle source
# File lib/sidekiq/launcher.rb, line 24
def run
  @thread = safe_thread("heartbeat", &method(:start_heartbeat))
  @poller.start
  @manager.start
end
stop() click to toggle source

Shuts down the process. This method does not return until all work is complete and cleaned up. It can take up to the timeout to complete.

# File lib/sidekiq/launcher.rb, line 41
def stop
  deadline = Time.now + @options[:timeout]

  @done = true
  @manager.quiet
  @poller.terminate

  @manager.stop(deadline)

  # Requeue everything in case there was a worker who grabbed work while stopped
  # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
  strategy = (@options[:fetch] || Sidekiq::BasicFetch)
  strategy.bulk_requeue([], @options)

  clear_heartbeat
end
stopping?() click to toggle source
# File lib/sidekiq/launcher.rb, line 58
def stopping?
  @done
end

Private Instance Methods

clear_heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 145
def clear_heartbeat
  # Remove record from Redis since we are shutting down.
  # Note we don't stop the heartbeat thread; if the process
  # doesn't actually exit, it'll reappear in the Web UI.
  Sidekiq.redis do |conn|
    conn.pipelined do
      conn.srem('processes', identity)
      conn.del("#{identity}:workers")
    end
  end
rescue
  # best effort, ignore network errors
end
heartbeat(k, data, json) click to toggle source
# File lib/sidekiq/launcher.rb, line 66
def heartbeat(k, data, json)
  results = Sidekiq::CLI::PROCTITLES.map {|x| x.(self, data) }
  results.compact!
  $0 = results.join(' ')

  (k, json)
end
start_heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 122
def start_heartbeat
  k = identity
  data = {
    'hostname' => hostname,
    'started_at' => Time.now.to_f,
    'pid' => $$,
    'tag' => @options[:tag] || '',
    'concurrency' => @options[:concurrency],
    'queues' => @options[:queues].uniq,
    'labels' => @options[:labels],
    'identity' => k,
  }
  # this data doesn't change so dump it to a string
  # now so we don't need to dump it every heartbeat.
  json = Sidekiq.dump_json(data)

  while true
    heartbeat(k, data, json)
    sleep 5
  end
  Sidekiq.logger.info("Heartbeat stopping...")
end
(key, json) click to toggle source
# File lib/sidekiq/launcher.rb, line 74
def (key, json)
  fails = procd = 0
  begin
    Processor::FAILURE.update {|curr| fails = curr; 0 }
    Processor::PROCESSED.update {|curr| procd = curr; 0 }

    workers_key = "#{key}:workers".freeze
    nowdate = Time.now.utc.strftime("%Y-%m-%d".freeze)
    Sidekiq.redis do |conn|
      conn.pipelined do
        conn.incrby("stat:processed".freeze, procd)
        conn.incrby("stat:processed:#{nowdate}", procd)
        conn.incrby("stat:failed".freeze, fails)
        conn.incrby("stat:failed:#{nowdate}", fails)
        conn.del(workers_key)
        Processor::WORKER_STATE.each_pair do |tid, hash|
          conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
        end
        conn.expire(workers_key, 60)
      end
    end
    fails = procd = 0

    _, _, _, msg = Sidekiq.redis do |conn|
      conn.pipelined do
        conn.sadd('processes', key)
        conn.hmset(key, 'info', json, 'busy', Processor::WORKER_STATE.size, 'beat', Time.now.to_f, 'quiet', @done)
        conn.expire(key, 60)
        conn.rpop("#{key}-signals")
      end
    end

    return unless msg

    if JVM_RESERVED_SIGNALS.include?(msg)
      Sidekiq::CLI.instance.handle_signal(msg)
    else
      ::Process.kill(msg, $$)
    end
  rescue => e
    # ignore all redis/network issues
    logger.error("heartbeat: #{e.message}")
    # don't lose the counts if there was a network issue
    Processor::PROCESSED.increment(procd)
    Processor::FAILURE.increment(fails)
  end
end