class Sidekiq::Scheduled::Enq
Public Instance Methods
enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
click to toggle source
# File lib/sidekiq/scheduled.rb, line 11 def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| sorted_sets.each do |sorted_set| # Get the next item in the queue if it's score (time to execute) is <= now. # We need to go through the list one at a time to reduce the risk of something # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do # Pop item off the queue and add it to the work queue. If the job can't be popped from # the queue, it's because another process already popped it so we can move on to the # next one. if conn.zrem(sorted_set, job) Sidekiq::Client.push(Sidekiq.load_json(job)) Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" } end end end end