class Sidekiq::Launcher
The Launcher is a very simple Actor whose job is to start, monitor and stop the core Actors in Sidekiq. If any of these actors die, the Sidekiq process exits immediately.
Constants
- JVM_RESERVED_SIGNALS
Attributes
fetcher[RW]
manager[RW]
poller[RW]
Public Class Methods
new(options)
click to toggle source
# File lib/sidekiq/launcher.rb, line 17
# Sets up a launcher that has not yet been asked to stop.
#
# options - configuration handed to Sidekiq::Manager.new and retained in
#           @options for the other methods of this class.
def initialize(options)
  # The launcher starts out in the "not done" state.
  @done = false

  @manager = Sidekiq::Manager.new(options)
  @poller = Sidekiq::Scheduled::Poller.new

  @options = options
end
Public Instance Methods
quiet()
click to toggle source
Stops this instance from processing any more jobs.
# File lib/sidekiq/launcher.rb, line 32
# Flags the launcher as done, asks the manager to go quiet and terminates
# the scheduled-job poller, in that order.
#
# Returns whatever @poller.terminate returns.
def quiet
  @done = true

  @manager.quiet
  @poller.terminate
end
run()
click to toggle source
# File lib/sidekiq/launcher.rb, line 24
# Boots the launcher: spawns the "heartbeat" thread (kept in @thread), then
# starts the poller and finally the manager.
#
# Returns whatever @manager.start returns.
def run
  heartbeat_loop = method(:start_heartbeat)
  @thread = safe_thread("heartbeat", &heartbeat_loop)

  @poller.start
  @manager.start
end
stop()
click to toggle source
Shuts down the process. This method does not return until all work is complete and cleaned up. It can take up to the timeout to complete.
# File lib/sidekiq/launcher.rb, line 41
# Shuts down the process. Does not return until all work is complete and
# cleaned up, which can take up to @options[:timeout].
def stop
  # Compute the deadline before anything else so the shutdown steps below
  # count against the timeout.
  shutdown_by = Time.now + @options[:timeout]

  @done = true
  @manager.quiet
  @poller.terminate

  @manager.stop(shutdown_by)

  # Requeue everything in case there was a worker who grabbed work while stopped
  # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
  fetch_strategy = @options[:fetch]
  fetch_strategy ||= Sidekiq::BasicFetch
  fetch_strategy.bulk_requeue([], @options)

  clear_heartbeat
end
stopping?()
click to toggle source
# File lib/sidekiq/launcher.rb, line 58
# Reports whether this launcher has been told to quiet or stop.
#
# Returns the current value of @done (false until #quiet or #stop sets it).
def stopping?
  @done
end
Private Instance Methods
clear_heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 145
# Removes this process's record from Redis because we are shutting down.
#
# The heartbeat thread is deliberately left running: if the process does
# not actually exit, the record will reappear in the Web UI.
#
# Returns the pipelined result, or nil when the Redis call raised.
def clear_heartbeat
  begin
    Sidekiq.redis do |redis|
      redis.pipelined do
        redis.srem('processes', identity)
        redis.del("#{identity}:workers")
      end
    end
  rescue
    # best effort, ignore network errors
    nil
  end
end
heartbeat(k, data, json)
click to toggle source
# File lib/sidekiq/launcher.rb, line 66
# Performs one heartbeat: refreshes the process title, then pushes the
# heartbeat to Redis via #❤.
#
# k    - key identifying this process (forwarded to #❤).
# data - hash describing this process; passed to each PROCTITLES callable.
# json - pre-serialized form of the process info (forwarded to #❤).
#
# Returns whatever #❤ returns.
def heartbeat(k, data, json)
  # Each PROCTITLES entry contributes one title fragment; nil fragments are dropped.
  title_parts = Sidekiq::CLI::PROCTITLES.map { |formatter| formatter.call(self, data) }.compact
  $0 = title_parts.join(' ')

  ❤(k, json)
end
start_heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 122
# Body of the heartbeat thread: builds the description of this process once,
# then calls #heartbeat every 5 seconds, forever.
#
# This method never returns normally. The loop has no exit condition, so it
# only ends when an exception escapes #heartbeat / sleep or the thread is
# killed. The statement that used to follow the loop was unreachable and has
# been removed.
def start_heartbeat
  k = identity
  data = {
    'hostname' => hostname,
    'started_at' => Time.now.to_f,
    'pid' => $$,
    'tag' => @options[:tag] || '',
    'concurrency' => @options[:concurrency],
    'queues' => @options[:queues].uniq,
    'labels' => @options[:labels],
    'identity' => k,
  }
  # this data doesn't change so dump it to a string
  # now so we don't need to dump it every heartbeat.
  json = Sidekiq.dump_json(data)

  while true
    heartbeat(k, data, json)
    sleep 5
  end
end
❤(key, json)
click to toggle source
# File lib/sidekiq/launcher.rb, line 74
# Sends one heartbeat to Redis and acts on any signal queued for this process.
#
# key  - Redis key for this process; also the prefix of the
#        "<key>:workers" and "<key>-signals" keys.
# json - pre-serialized process info stored under the 'info' field.
#
# Any StandardError is logged and swallowed, after the counters that were not
# yet flushed are added back to the in-process tallies.
def ❤(key, json)
  failed = processed = 0

  # Grab the current tallies and reset them to 0 in the same update call.
  # NOTE(review): presumably #update is atomic - confirm against Processor.
  Processor::FAILURE.update { |current| failed = current; 0 }
  Processor::PROCESSED.update { |current| processed = current; 0 }

  workers_key = "#{key}:workers".freeze
  today = Time.now.utc.strftime("%Y-%m-%d".freeze)

  Sidekiq.redis do |conn|
    conn.pipelined do
      conn.incrby("stat:processed".freeze, processed)
      conn.incrby("stat:processed:#{today}", processed)
      conn.incrby("stat:failed".freeze, failed)
      conn.incrby("stat:failed:#{today}", failed)

      # Rewrite the per-thread worker state from scratch on every beat.
      conn.del(workers_key)
      Processor::WORKER_STATE.each_pair do |tid, state|
        conn.hset(workers_key, tid, Sidekiq.dump_json(state))
      end
      conn.expire(workers_key, 60)
    end
  end

  # The stats were written; zero the locals so the rescue below does not
  # add them back a second time.
  failed = processed = 0

  # Only the fourth pipelined reply (the rpop) is of interest.
  _, _, _, msg = Sidekiq.redis do |conn|
    conn.pipelined do
      conn.sadd('processes', key)
      conn.hmset(key, 'info', json, 'busy', Processor::WORKER_STATE.size, 'beat', Time.now.to_f, 'quiet', @done)
      conn.expire(key, 60)
      conn.rpop("#{key}-signals")
    end
  end

  return unless msg
  return Sidekiq::CLI.instance.handle_signal(msg) if JVM_RESERVED_SIGNALS.include?(msg)

  ::Process.kill(msg, $$)
rescue => e
  # ignore all redis/network issues
  logger.error("heartbeat: #{e.message}")
  # don't lose the counts if there was a network issue
  Processor::PROCESSED.increment(processed)
  Processor::FAILURE.increment(failed)
end