class Sidekiq::Cron::Job

Constants

REMEMBER_THRESHOLD

How long we would like to keep information about previous enqueues.

Attributes

args[RW]
cron[RW]
description[RW]
klass[RW]
last_enqueue_time[R]
message[RW]
name[RW]

Public Class Methods

all() click to toggle source

get all cron jobs

# File lib/sidekiq/cron/job.rb, line 161
# Returns a Job for every key listed in the jobs set (see jobs_key) whose
# hash still exists in Redis.
#
# Keys that are in the set but no longer exist in Redis are skipped.
#
# @return [Array<Job>]
def self.all
  jobs = []
  Sidekiq.redis do |conn|
    jobs = conn.smembers(jobs_key).map do |key|
      conn.exists(key) ? Job.new(conn.hgetall(key)) : nil
    end
  end
  # drop the placeholders left behind by keys without a stored hash
  jobs.compact
end
count() click to toggle source
# File lib/sidekiq/cron/job.rb, line 175
# Returns the cardinality (SCARD) of the jobs set stored under jobs_key.
def self.count
  total = 0
  Sidekiq.redis { |conn| total = conn.scard(jobs_key) }
  total
end
create(hash) click to toggle source

create new instance of cron job

# File lib/sidekiq/cron/job.rb, line 197
# Builds a job from +hash+ and saves it straight away.
#
# Returns whatever #save returns for the new instance.
def self.create(hash)
  job = new(hash)
  job.save
end
destroy(name) click to toggle source

destroy job by name

# File lib/sidekiq/cron/job.rb, line 202
# Destroys the job with the given name.
#
# +name+ may also be a Hash, in which case its :name or 'name' entry is used.
# Returns the result of the job's #destroy, or false when no job was found.
def self.destroy(name)
  # a hash was given: pull the name out of it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  job = find(name)
  return false unless job

  job.destroy
end
destroy_all!() click to toggle source

Remove all jobs from cron.

# File lib/sidekiq/cron/job.rb, line 425
# Calls #destroy on every job returned by .all, then logs one info line.
#
# The return value is whatever the logger call returns.
def self.destroy_all!
  all.each(&:destroy)
  logger.info { "Cron Jobs - deleted all jobs" }
end
destroy_removed_jobs(new_job_names) click to toggle source

Destroy the "removed jobs": jobs that currently exist but are missing from the list of new job names.

# File lib/sidekiq/cron/job.rb, line 433
# Destroys every stored job whose name is not listed in +new_job_names+.
#
# @param new_job_names [Array] names of the jobs that should be kept
# @return [Array] the names that were passed to Sidekiq::Cron::Job.destroy
def self.destroy_removed_jobs(new_job_names)
  existing_names = Sidekiq::Cron::Job.all.map { |job| job.name }
  obsolete_names = existing_names - new_job_names
  obsolete_names.each do |job_name|
    Sidekiq::Cron::Job.destroy(job_name)
  end
  obsolete_names
end
exists?(name) click to toggle source
# File lib/sidekiq/cron/job.rb, line 454
# Asks Redis whether the key built by redis_key(name) exists.
#
# Returns whatever the connection's +exists+ call returns.
def self.exists?(name)
  present = false
  Sidekiq.redis { |conn| present = conn.exists(redis_key(name)) }
  present
end
find(name) click to toggle source
# File lib/sidekiq/cron/job.rb, line 183
# Looks up a job by name.
#
# +name+ may also be a Hash, in which case its :name or 'name' entry is used.
# Returns a Job built from the stored hash, or nil when exists?(name) is falsy.
def self.find(name)
  # a hash was given: pull the name out of it
  name = name[:name] || name['name'] if name.is_a?(Hash)

  found = nil
  Sidekiq.redis do |conn|
    found = Job.new(conn.hgetall(redis_key(name))) if exists?(name)
  end
  found
end
load_from_array(array) click to toggle source

Load cron jobs from an Array. The input structure should look like: [

{
  'name'        => 'name_of_job',
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
{
  'name'  => 'Cool Job for Second Class',
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

]

# File lib/sidekiq/cron/job.rb, line 143
# Builds and saves one job per element of +array+.
#
# @param array [Array<Hash>] job definitions, each passed to .new
# @return [Hash] job name => job.errors for every job whose #save was falsy
#   (empty when everything saved)
def self.load_from_array(array)
  array.each_with_object({}) do |job_data, failures|
    job = new(job_data)
    failures[job.name] = job.errors unless job.save
  end
end
load_from_array!(array) click to toggle source

Like {#load_from_array}, but if old jobs exist in Redis that are no longer present in the given array, those old jobs are destroyed.

# File lib/sidekiq/cron/job.rb, line 154
# Same as load_from_array, but first destroys stored jobs whose names are not
# present in +array+ (via destroy_removed_jobs).
#
# The name is read from either the "name" or the :name key, because the
# constructor accepts both; reading only the string key would yield nil names
# for symbol-keyed input and cause every stored job to be destroyed first.
#
# @param array [Array<Hash>] job definitions
# @return [Hash] the result of load_from_array
def self.load_from_array!(array)
  job_names = array.map { |job| job["name"] || job[:name] }
  destroy_removed_jobs(job_names)
  load_from_array(array)
end
load_from_hash(hash) click to toggle source

Load cron jobs from a Hash. The input structure should look like: {

'name_of_job' => {
  'class'       => 'MyClass',
  'cron'        => '1 * * * *',
  'args'        => '(OPTIONAL) [Array or Hash]',
  'description' => '(OPTIONAL) Description of job'
},
'My super iber cool job' => {
  'class' => 'SecondClass',
  'cron'  => '*/5 * * * *'
}

}

# File lib/sidekiq/cron/job.rb, line 111
# Loads jobs from a Hash of the form { job_name => job_definition }.
#
# Each definition is copied with its 'name' entry set to the hash key and the
# resulting list is handed to load_from_array. The caller's hashes are left
# untouched (merge builds a new hash), so frozen configuration hashes work.
#
# @param hash [Hash] job name => job definition
# @return [Hash] the result of load_from_array
def self.load_from_hash(hash)
  array = hash.map { |key, job| job.merge('name' => key) }
  load_from_array array
end
load_from_hash!(hash) click to toggle source

Like {#load_from_hash}, but if old jobs exist in Redis that are no longer present in the given hash, those old jobs are destroyed.

# File lib/sidekiq/cron/job.rb, line 121
# Same as load_from_hash, but first destroys stored jobs whose names are not
# among the keys of +hash+.
#
# @return [Hash] the result of load_from_hash
def self.load_from_hash!(hash)
  wanted_names = hash.keys
  destroy_removed_jobs(wanted_names)
  load_from_hash(hash)
end
new(input_args = {}) click to toggle source
# File lib/sidekiq/cron/job.rb, line 216
# Builds a cron job from a hash of attributes.
#
# Keys are normalised with +stringify_keys+ (not defined in this section; it
# must be available on the input). Keys read below: "name", "cron",
# "description", "klass"/"class", "status", "last_enqueue_time", "args",
# "active_job", "queue_name_prefix", "message" and "queue".
#
# When "status" or "last_enqueue_time" is missing, the value comes from
# status_from_redis / last_enqueue_time_from_redis.
#
# The queue falls back to the literal "default" when neither the message nor
# the worker options supply one.
def initialize(input_args = {})
  args = input_args.stringify_keys

  @name = args["name"]
  @cron = args["cron"]
  @description = args["description"] if args["description"]

  # the worker class may be given under either "klass" or "class"
  @klass = args["klass"] || args["class"]

  # an explicit status wins; otherwise use what status_from_redis reports
  @status = args['status'] || status_from_redis

  # last enqueue time - from args or from the existing job
  if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
    @last_enqueue_time = Time.parse(args['last_enqueue_time'])
  else
    @last_enqueue_time = last_enqueue_time_from_redis
  end

  # normalise the job arguments (see parse_args)
  @args = args["args"].nil? ? [] : parse_args( args["args"] )

  # true for boolean true or for the strings true/t/yes/y/1 (case-insensitive)
  @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
  @active_job_queue_name_prefix = args["queue_name_prefix"]

  if args["message"]
    # a ready-made message was supplied: only the queue is read from it
    @message = args["message"]
    message_data = Sidekiq.load_json(@message) || {}
    @queue = message_data['queue'] || "default"
  elsif @klass
    message_data = {
      "class" => @klass.to_s,
      "args"  => @args,
    }

    # enrich the message with the worker's sidekiq options
    # (only because no message was specified)
    message_data = case @klass
      when Class
        @klass.get_sidekiq_options.merge(message_data)
      when String
        begin
          @klass.constantize.get_sidekiq_options.merge(message_data)
        rescue
          # unknown class
          message_data.merge("queue"=>"default")
        end
      else
        # any other type: keep the basic message instead of ending up with nil;
        # valid? reports the problem because klass_valid is falsy for it
        message_data
    end

    # a queue given in the input overrides the one from the worker options
    if args['queue']
      @queue = message_data['queue'] = args['queue']
    else
      @queue = message_data['queue'] || "default"
    end

    # keep the message as a Hash; to_hash serialises it to JSON
    @message = message_data
  end

end

Private Class Methods

job_enqueued_key(name) click to toggle source

Redis key for storing one cron job run times (when poller added job to queue)

# File lib/sidekiq/cron/job.rb, line 520
# Redis key under which the enqueue times of the job called +name+ are kept.
def self.job_enqueued_key(name)
  format('cron_job:%s:enqueued', name)
end
jobs_key() click to toggle source

Redis key for set of all cron jobs

# File lib/sidekiq/cron/job.rb, line 504
# Redis key of the set that lists all cron jobs.
def self.jobs_key
  'cron_jobs'
end
redis_key(name) click to toggle source

Redis key for storing one cron job

# File lib/sidekiq/cron/job.rb, line 509
# Redis key under which the job called +name+ is stored.
def self.redis_key(name)
  format('cron_job:%s', name)
end

Public Instance Methods

active_job_message() click to toggle source

Active Job has a different structure for how it loads data from the Sidekiq queue: it creates a wrapper around the job.

# File lib/sidekiq/cron/job.rb, line 74
# Builds the message hash pushed for Active Job based jobs.
#
# The queue name is prefixed with the job's own queue name prefix when one is
# set, otherwise with ActiveJob::Base.queue_name_prefix when that is defined
# and non-empty; without any prefix the plain queue is used.
# A fresh job_id (SecureRandom.uuid) is generated on every call.
def active_job_message
  prefix = @active_job_queue_name_prefix.to_s
  if prefix.empty? && defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix)
    prefix = ActiveJob::Base.queue_name_prefix.to_s
  end
  queue_name = prefix.empty? ? @queue : "#{prefix}_#{@queue}"

  wrapped_job = {
    'job_class'  => @klass,
    'job_id'     => SecureRandom.uuid,
    'queue_name' => queue_name,
    'arguments'  => @args
  }

  {
    'class'        => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
    'queue'        => queue_name,
    'description'  => @description,
    'args'         => [wrapped_job]
  }
end
destroy() click to toggle source

Remove this job from the cron jobs. This instance method takes no arguments: the job is identified by its own name (which is case sensitive).
# File lib/sidekiq/cron/job.rb, line 410
# Removes this job from Redis and logs one info line.
#
# Three things are removed, in this order: the job's entry in the set of all
# jobs, its stored enqueue timestamps, and the job hash itself.
# The return value is whatever the logger call returns.
def destroy
  Sidekiq.redis do |conn|
    # entry in the set of all jobs
    conn.srem(self.class.jobs_key, redis_key)

    # recorded enqueue timestamps
    conn.del(job_enqueued_key)

    # the job itself
    conn.del(redis_key)
  end
  logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end
disable!() click to toggle source
# File lib/sidekiq/cron/job.rb, line 285
# Marks the job as disabled and persists it; returns the result of #save.
def disable!
  @status = 'disabled'
  save
end
disabled?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 299
# True when the job is not enabled (the negation of #enabled?).
def disabled?
  enabled? ? false : true
end
enable!() click to toggle source
# File lib/sidekiq/cron/job.rb, line 290
# Marks the job as enabled and persists it; returns the result of #save.
def enable!
  @status = 'enabled'
  save
end
enabled?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 295
# True only when the stored status is exactly the string "enabled".
def enabled?
  @status == 'enabled'
end
enque!(time = Time.now) click to toggle source

Enqueue the cron job to its queue.

# File lib/sidekiq/cron/job.rb, line 47
# Pushes the job to Sidekiq and records +time+ as the last enqueue time.
#
# The Active Job message is pushed when the job is flagged as an active job,
# or when ActiveJob::Base is defined and the job class resolves to a subclass
# of it; otherwise the plain Sidekiq worker message is pushed.
# Afterwards the job is saved and a debug line is logged.
def enque!(time = Time.now)
  @last_enqueue_time = time

  # resolve the class name; an unknown constant simply yields nil
  klass_const = begin
    @klass.to_s.constantize
  rescue NameError
    nil
  end

  via_active_job = @active_job ||
    (defined?(ActiveJob::Base) && klass_const && klass_const < ActiveJob::Base)

  Sidekiq::Client.push(via_active_job ? active_job_message : sidekiq_worker_message)

  save
  logger.debug { "enqueued #{@name}: #{@message}" }
end
errors() click to toggle source
# File lib/sidekiq/cron/job.rb, line 339
# Validation errors collected so far; lazily initialised to an empty array.
def errors
  @errors = [] unless @errors
  @errors
end
exists?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 462
# Delegates to the class-level exists? using this job's name.
def exists?
  self.class.exists?(@name)
end
formated_enqueue_time(now = Time.now) click to toggle source
# File lib/sidekiq/cron/job.rb, line 446
# The time returned by last_time(now), converted to UTC and rendered as a
# float-seconds string.
def formated_enqueue_time(now = Time.now)
  previous_run = last_time(now)
  previous_run.getutc.to_f.to_s
end
formated_last_time(now = Time.now) click to toggle source
# File lib/sidekiq/cron/job.rb, line 450
# The time returned by last_time(now), converted to UTC and rendered as an
# ISO 8601 string.
def formated_last_time(now = Time.now)
  previous_run = last_time(now)
  previous_run.getutc.iso8601
end
klass_valid() click to toggle source
# File lib/sidekiq/cron/job.rb, line 369
# Checks the job class value.
#
# Returns true for a Class, true/false for a String depending on whether it
# is non-empty, and nil for anything else.
def klass_valid
  return true if @klass.is_a?(Class)
  return @klass.size > 0 if @klass.is_a?(String)

  nil
end
last_enqueue_time_from_redis() click to toggle source
# File lib/sidekiq/cron/job.rb, line 313
# Reads the job's "last_enqueue_time" field from Redis and parses it.
#
# Returns nil when the job does not exist or when reading/parsing the stored
# value raises a StandardError.
def last_enqueue_time_from_redis
  return nil unless exists?

  parsed = nil
  Sidekiq.redis do |conn|
    parsed = begin
      Time.parse(conn.hget(redis_key, "last_enqueue_time"))
    rescue StandardError
      nil
    end
  end
  parsed
end
last_time(now = Time.now) click to toggle source

Parses the cron specification (e.g. '* * * * *') and returns the time at which the last run should have been performed.

# File lib/sidekiq/cron/job.rb, line 442
# Asks the parsed cron line for its previous occurrence relative to +now+.
def last_time(now = Time.now)
  cron_line = Rufus::Scheduler::CronLine.new(@cron)
  cron_line.previous_time(now)
end
remove_previous_enques(time) click to toggle source

Remove information about previous run times. This clears old entries from Redis and makes sure that its memory use does not keep growing.

# File lib/sidekiq/cron/job.rb, line 30
# Deletes recorded enqueue entries whose score lies between 0 and
# (time - REMEMBER_THRESHOLD); the upper bound is exclusive (the "(" prefix).
def remove_previous_enques(time)
  cutoff = time.to_f - REMEMBER_THRESHOLD
  Sidekiq.redis do |conn|
    conn.zremrangebyscore(job_enqueued_key, 0, "(#{cutoff.to_s}")
  end
end
save() click to toggle source

Add the job to the cron jobs. Input:

name: (string) - name of the job
cron: (string: '* * * * *') - cron specification of when to run the job
class: (string|class) - which class to perform

Optional input:

queue: (string) - which queue to use for enqueuing (will override the class queue)
args: (array|hash|nil) - arguments for the perform method
# File lib/sidekiq/cron/job.rb, line 388
# Persists the job in Redis.
#
# Returns false without touching Redis when the job is not valid. Otherwise
# the job key is added to the set of all jobs, the job's fields are written
# with HMSET, one entry is added to the job's enqueue set, an info line is
# logged, and true is returned.
#
# The explicit +true+ keeps the result independent of what the logger call
# returns; callers such as load_from_array treat a falsy result as a failed
# save.
#
# @return [Boolean]
def save
  # an invalid job is never written
  return false unless valid?

  Sidekiq.redis do |conn|

    # add to the set of all jobs
    conn.sadd self.class.jobs_key, redis_key

    # store the fields of this job
    conn.hmset redis_key, *hash_to_redis(to_hash)

    # record the most recent scheduled time so the job is not enqueued
    # right after the scheduler poller starts
    time = Time.now
    conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s)
  end
  logger.info { "Cron Jobs - add job with name: #{@name}" }
  true
end
should_enque?(time) click to toggle source

The crucial part of the whole job-enqueuing process.

# File lib/sidekiq/cron/job.rb, line 16
# Decides whether the job should be enqueued for +time+.
#
# All of the following must hold, checked in this order: the status is
# "enabled", not_past_scheduled_time?(time), not_enqueued_after?(time), and
# the ZADD of the scheduled time into the job's enqueue set returns a truthy
# value. The result is whatever the Sidekiq.redis block call returns.
def should_enque?(time)
  Sidekiq.redis do |conn|
    status == "enabled" &&
      not_past_scheduled_time?(time) &&
      not_enqueued_after?(time) &&
      conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
  end
end
sidekiq_worker_message() click to toggle source

Sidekiq worker message

# File lib/sidekiq/cron/job.rb, line 68
# The message to push for a plain Sidekiq worker: a String message is decoded
# with Sidekiq.load_json, anything else is returned as it is.
def sidekiq_worker_message
  return @message unless @message.is_a?(String)

  Sidekiq.load_json(@message)
end
sort_name() click to toggle source
# File lib/sidekiq/cron/job.rb, line 466
# Sort key: "0_" for enabled jobs and "1_" for all others, followed by the
# job name, everything downcased.
def sort_name
  rank = status == "enabled" ? 0 : 1
  "#{rank}_#{name}".downcase
end
status() click to toggle source
# File lib/sidekiq/cron/job.rb, line 281
# Current status of the job as stored in @status.
def status
  @status
end
status_from_redis() click to toggle source
# File lib/sidekiq/cron/job.rb, line 303
# Reads the job's "status" field from Redis.
#
# Returns "enabled" when the job does not exist yet; otherwise whatever HGET
# returns for the field.
def status_from_redis
  return "enabled" unless exists?

  current = "enabled"
  Sidekiq.redis { |conn| current = conn.hget(redis_key, "status") }
  current
end
test_and_enque_for_time!(time) click to toggle source

Test whether the job should be enqueued. If yes, add it to the queue.

# File lib/sidekiq/cron/job.rb, line 37
# Enqueues the job when should_enque?(time) says so, then removes old
# enqueue records relative to +time+.
#
# Returns nil when nothing was enqueued, otherwise the result of
# remove_previous_enques.
def test_and_enque_for_time!(time)
  return unless should_enque?(time)

  enque!
  remove_previous_enques(time)
end
to_hash() click to toggle source

export job data to hash

# File lib/sidekiq/cron/job.rb, line 324
# Exports the job's data as a Hash with symbol keys.
#
# args and message are kept as they are when already Strings; otherwise they
# are serialised with Sidekiq.dump_json (nil becomes [] and {} respectively).
def to_hash
  serialized_args =
    @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || [])
  serialized_message =
    @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {})

  {
    name: @name,
    klass: @klass,
    cron: @cron,
    description: @description,
    args: serialized_args,
    message: serialized_message,
    status: @status,
    active_job: @active_job,
    queue_name_prefix: @active_job_queue_name_prefix,
    last_enqueue_time: @last_enqueue_time,
  }
end
valid?() click to toggle source
# File lib/sidekiq/cron/job.rb, line 343
# Validates the job and rebuilds the list of errors.
#
# Checks that a name is set, that a cron specification is set and can be
# parsed (and its next time computed), and that klass_valid is truthy.
# Only StandardError raised while parsing the cron line is turned into a
# validation error; other exceptions (signals, exit requests, script errors)
# propagate to the caller.
#
# @return [Boolean] true when no errors were recorded
def valid?
  # clear previous errors
  @errors = []

  errors << "'name' must be set" if @name.nil? || @name.size == 0
  if @cron.nil? || @cron.size == 0
    errors << "'cron' must be set"
  else
    begin
      cron = Rufus::Scheduler::CronLine.new(@cron)
      cron.next_time(Time.now)
    rescue StandardError => e
      # different versions of the cron parser word this failure differently
      if e.message == "Bad Vixie-style specification bad"
        errors << "'cron' -> #{@cron}: not a valid cronline"
      else
        errors << "'cron' -> #{@cron}: #{e.message}"
      end
    end
  end

  errors << "'klass' (or class) must be set" unless klass_valid

  errors.empty?
end

Private Instance Methods

hash_to_redis(hash) click to toggle source

Given a Hash, returns an Array suitable for passing to redis.hmset.

# File lib/sidekiq/cron/job.rb, line 532
# Flattens a Hash into [key1, value1, key2, value2, ...] (one level only, so
# array values stay intact).
def hash_to_redis(hash)
  hash.flat_map { |key, value| [key, value] }
end
job_enqueued_key() click to toggle source

Redis key for storing one cron job run times (when poller added job to queue)

# File lib/sidekiq/cron/job.rb, line 526
# Enqueue-times key for this job; delegates to the class-level helper with
# the job's name.
def job_enqueued_key
  self.class.job_enqueued_key(@name)
end
not_enqueued_after?(time) click to toggle source
# File lib/sidekiq/cron/job.rb, line 472
# True when the job has never been enqueued, or when its last enqueue time is
# earlier than the scheduled time returned by last_time(time).
def not_enqueued_after?(time)
  return true if @last_enqueue_time.nil?

  @last_enqueue_time < last_time(time)
end
not_past_scheduled_time?(current_time) click to toggle source
# File lib/sidekiq/cron/job.rb, line 497
# False when more than 60 seconds separate +current_time+ from the previous
# occurrence of the cron line; true otherwise.
def not_past_scheduled_time?(current_time)
  cron_line = Rufus::Scheduler::CronLine.new(@cron)
  elapsed = current_time - cron_line.previous_time(current_time)
  !(elapsed > 60)
end
parse_args(args) click to toggle source

Try parsing inbound args into an array. args from Redis will be encoded JSON; try to load JSON, then failover to string array.

# File lib/sidekiq/cron/job.rb, line 480
# Normalises inbound job arguments.
#
# * Array  - returned unchanged
# * Hash   - wrapped in a one-element array
# * String - decoded with Sidekiq.load_json; when that raises
#            JSON::ParserError the string is wrapped in an array instead
# * other  - splatted into an array
def parse_args(args)
  return args if args.is_a?(Array)
  return [args] if args.is_a?(Hash)
  return [*args] unless args.is_a?(String)

  begin
    Sidekiq.load_json(args)
  rescue JSON::ParserError
    [*args] # not JSON: treat the string as a single argument
  end
end
redis_key() click to toggle source

Redis key for storing one cron job

# File lib/sidekiq/cron/job.rb, line 514
# Redis key of this job; delegates to the class-level helper with the job's
# name.
def redis_key
  self.class.redis_key(@name)
end