class Delayed::Worker

Public Class Methods

after_fork() click to toggle source
# File lib/delayed/worker.rb, line 94
# Reopens the file handles remembered by before_fork and then notifies the
# backend that the process has forked.
#
# Each remembered handle is reopened on its own path in append mode ('a+')
# with sync enabled. A failure on one handle is deliberately ignored so the
# remaining handles are still processed.
#
# Safe to call even when before_fork never ran: with no remembered handles
# only the backend hook is invoked.
def self.after_fork
  # @files_to_reopen stays nil until before_fork has populated it.
  Array(@files_to_reopen).each do |file|
    begin
      file.reopen file.path, 'a+'
      file.sync = true
    rescue ::Exception # rubocop:disable HandleExceptions, RescueException
      # Best effort: skip handles that can no longer be reopened.
    end
  end
  backend.after_fork
end
backend=(backend) click to toggle source
# File lib/delayed/worker.rb, line 64
# Sets the job backend used by the worker.
#
# backend - either the job class itself, or a Symbol naming a backend. For a
#           Symbol, delayed/serialization/<name> and delayed/backend/<name>
#           are required (in that order) and the class
#           Delayed::Backend::<Name>::Job is looked up.
#
# The resolved class is stored and also published as Delayed::Job.
def self.backend=(backend)
  job_class = backend

  if backend.is_a?(Symbol)
    %w[serialization backend].each { |dir| require "delayed/#{dir}/#{backend}" }
    job_class = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
  end

  @@backend = job_class # rubocop:disable ClassVars
  # Delayed::Job may already be defined; suppress the reassignment warning.
  silence_warnings { ::Delayed.const_set(:Job, job_class) }
end
before_fork() click to toggle source
# File lib/delayed/worker.rb, line 83
# Remembers every currently open File object and notifies the backend that a
# fork is about to happen.
#
# The list of handles is collected only on the first call; later calls reuse
# it. after_fork reads the same list to reopen the handles.
def self.before_fork
  @files_to_reopen ||= ObjectSpace.each_object(File).reject(&:closed?)

  backend.before_fork
end
delay_job?(job) click to toggle source
# File lib/delayed/worker.rb, line 122
# Decides whether the given job should be delayed.
#
# When the delay_jobs setting is a Proc it is called, receiving the job only
# if the Proc takes exactly one argument; its result is returned. Any other
# setting value is returned as is.
def self.delay_job?(job)
  setting = delay_jobs
  return setting unless setting.is_a?(Proc)

  setting.arity == 1 ? setting.call(job) : setting.call
end
guess_backend() click to toggle source
# File lib/delayed/worker.rb, line 79
# Deprecated no-op kept for backward compatibility; it only emits a
# deprecation warning.
def self.guess_backend
  deprecation = '[DEPRECATION] guess_backend is deprecated. Please remove it from your code.'
  warn deprecation
end
lifecycle() click to toggle source
# File lib/delayed/worker.rb, line 106
# Returns the current lifecycle, building it on first use.
#
# Job enqueueing needs a lifecycle even when no worker has been set up yet,
# so it is created lazily here.
def self.lifecycle
  return @lifecycle if @lifecycle

  setup_lifecycle
  @lifecycle
end
new(options = {}) click to toggle source
# File lib/delayed/worker.rb, line 130
# Builds a worker.
#
# options - Hash of settings:
#           :quiet - stored in @quiet; defaults to true when the key is absent.
#           :min_priority, :max_priority, :sleep_delay, :read_ahead, :queues,
#           :exit_on_complete - when present, forwarded to the matching
#           class-level setter.
def initialize(options = {})
  @quiet = options.fetch(:quiet, true)
  @failed_reserve_count = 0

  [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option|
    next unless options.key?(option)

    self.class.send("#{option}=", options[option])
  end

  # Rebuild the lifecycle in case something lazily created it before all
  # plugins had been registered.
  self.class.setup_lifecycle
end
queue_attributes=(val) click to toggle source

rubocop:disable ClassVars

# File lib/delayed/worker.rb, line 75
# Stores the per-queue attributes, converted with with_indifferent_access
# before being saved.
def self.queue_attributes=(val)
  indifferent = val.with_indifferent_access
  @@queue_attributes = indifferent
end
reload_app?() click to toggle source
# File lib/delayed/worker.rb, line 118
# Tells whether application code should be reloaded between polling rounds.
#
# Returns nil when ActionDispatch::Reloader is not defined; otherwise true
# only when the Rails cache_classes setting is exactly false.
def self.reload_app?
  return nil unless defined?(ActionDispatch::Reloader)

  Rails.application.config.cache_classes == false
end
reset() click to toggle source
# File lib/delayed/worker.rb, line 35
# Restores every class-level setting to its DEFAULT_* constant and discards
# the current lifecycle.
def self.reset
  # Settings are assigned in this order, each through its own setter.
  {
    default_log_level: DEFAULT_LOG_LEVEL,
    sleep_delay: DEFAULT_SLEEP_DELAY,
    max_attempts: DEFAULT_MAX_ATTEMPTS,
    max_run_time: DEFAULT_MAX_RUN_TIME,
    default_priority: DEFAULT_DEFAULT_PRIORITY,
    delay_jobs: DEFAULT_DELAY_JOBS,
    queues: DEFAULT_QUEUES,
    queue_attributes: DEFAULT_QUEUE_ATTRIBUTES,
    read_ahead: DEFAULT_READ_AHEAD
  }.each { |setting, default| send("#{setting}=", default) }

  @lifecycle = nil
end
setup_lifecycle() click to toggle source
# File lib/delayed/worker.rb, line 113
# Replaces the lifecycle with a fresh Delayed::Lifecycle and then
# instantiates every registered plugin class.
#
# The lifecycle is assigned before the plugins are created.
def self.setup_lifecycle
  @lifecycle = Delayed::Lifecycle.new
  plugins.each(&:new)
end

Public Instance Methods

failed(job) click to toggle source
# File lib/delayed/worker.rb, line 257
# Handles a job that has failed for good.
#
# Inside the :failure lifecycle callbacks the job's :failure hook is run.
# Errors raised by the hook are logged at 'error' level instead of being
# propagated. Afterwards the job is either destroyed or marked as failed,
# depending on job.destroy_failed_jobs?.
def failed(job)
  self.class.lifecycle.run_callbacks(:failure, self, job) do
    begin
      job.hook(:failure)
    rescue StandardError => hook_error
      say "Error when running failure callback: #{hook_error}", 'error'
      say hook_error.backtrace.join("\n"), 'error'
    ensure
      # Runs whether or not the hook succeeded.
      if job.destroy_failed_jobs?
        job.destroy
      else
        job.fail!
      end
    end
  end
end
job_say(job, text, level = default_log_level) click to toggle source
# File lib/delayed/worker.rb, line 270
# Logs a message about a specific job, prefixed with the job's name and id.
#
# job   - object responding to name and id.
# text  - message to append after the prefix.
# level - log level handed on to say; defaults to default_log_level.
def job_say(job, text, level = default_log_level)
  say "Job #{job.name} (id=#{job.id}) #{text}", level
end
max_attempts(job) click to toggle source
# File lib/delayed/worker.rb, line 286
# Returns the attempt limit for the job: the job's own max_attempts when it
# is set, otherwise the class-wide setting.
def max_attempts(job)
  job_limit = job.max_attempts
  job_limit || self.class.max_attempts
end
max_run_time(job) click to toggle source
# File lib/delayed/worker.rb, line 290
# Returns the run-time limit for the job: the job's own max_run_time when it
# is set, otherwise the class-wide setting.
def max_run_time(job)
  job_limit = job.max_run_time
  job_limit || self.class.max_run_time
end
name() click to toggle source

Every worker has a unique name, which by default combines the host name and the pid of the process (falling back to the pid alone if the host name cannot be determined). There are some advantages to overriding this with something that survives worker restarts: a worker can then safely resume working on tasks it had locked itself, on the assumption that it crashed before finishing them.

# File lib/delayed/worker.rb, line 147
# Returns the worker's name.
#
# An explicitly assigned @name (anything other than nil) wins. Otherwise the
# name is built from @name_prefix, the host name and the process id; if
# building it raises a StandardError, the host part is left out.
def name
  return @name unless @name.nil?

  "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
rescue StandardError
  "#{@name_prefix}pid:#{Process.pid}"
end
reschedule(job, time = nil) click to toggle source

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.

# File lib/delayed/worker.rb, line 245
# Records one more failed attempt and either schedules the job to run again
# or gives up on it.
#
# job  - the job that failed.
# time - when to run it again; defaults to job.reschedule_at.
#
# Once the attempt count reaches max_attempts(job) the job is logged as
# removed and handed to failed.
def reschedule(job, time = nil)
  attempts_so_far = (job.attempts += 1)

  unless attempts_so_far < max_attempts(job)
    job_say job, "REMOVED permanently because of #{job.attempts} consecutive failures", 'error'
    return failed(job)
  end

  job.run_at = time || job.reschedule_at
  job.unlock
  job.save!
end
run(job) click to toggle source
# File lib/delayed/worker.rb, line 227
# Runs a single job and reports whether it succeeded.
#
# The job is invoked under a timeout of max_run_time(job) seconds (raising
# WorkerTimeout when exceeded) and destroyed once it finishes.
#
# Returns true when the job ran and was destroyed, false for every failure.
#
# A DeserializationError is not retried: the error is stored on the job and
# the job goes straight to failed. Any other exception is passed to
# handle_failed_job inside the :error lifecycle callbacks.
def run(job)
  job_say job, 'RUNNING'
  runtime = Benchmark.realtime do
    Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) { job.invoke_job }
    job.destroy
  end
  job_say job, format('COMPLETED after %.4f', runtime)
  true # did work
rescue DeserializationError => error
  job.error = error
  failed(job)
  # Report a failure explicitly. Returning the result of failed(job) made
  # work_off treat this as "no work available" and stop the batch early.
  false # work failed
rescue Exception => error # rubocop:disable RescueException
  self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
  false # work failed
end
say(text, level = default_log_level) click to toggle source
# File lib/delayed/worker.rb, line 275
# Emits a message tagged with the worker's name.
#
# The line is printed to standard output unless @quiet is set, and sent to
# the logger (when one is configured) with a timestamp prefix.
#
# text  - message to emit.
# level - logger method name as a String, or a Logger::Severity value that
#         is translated to the matching lower-case method name.
def say(text, level = default_log_level)
  line = "[Worker(#{name})] #{text}"
  puts line unless @quiet
  return unless logger

  # TODO: Deprecate use of Fixnum log levels
  unless level.is_a?(String)
    severity = Logger::Severity.constants.find { |const| Logger::Severity.const_get(const) == level }
    level = severity.to_s.downcase
  end

  logger.send(level, "#{Time.now.strftime('%FT%T%z')}: #{line}")
end
start() click to toggle source
# File lib/delayed/worker.rb, line 156
# Runs the worker's main loop until it is told to stop.
#
# Installs TERM and INT signal handlers, then repeatedly calls work_off
# inside the :loop lifecycle callbacks (the whole loop is wrapped in the
# :execute callbacks). After each round it either sleeps (nothing was
# processed), exits (exit_on_complete is set and nothing was processed) or
# logs throughput statistics.
def start # rubocop:disable CyclomaticComplexity, PerceivedComplexity
  # TERM: request a stop and, when raise_signal_exceptions is set, re-raise
  # the signal as an exception.
  trap('TERM') do
    # The message is logged from a separate thread; presumably because
    # logging directly inside a trap handler is restricted -- confirm.
    Thread.new { say 'Exiting...' }
    stop
    raise SignalException, 'TERM' if self.class.raise_signal_exceptions
  end

  # INT: same as TERM, except the exception is not raised when
  # raise_signal_exceptions is :term.
  trap('INT') do
    Thread.new { say 'Exiting...' }
    stop
    raise SignalException, 'INT' if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term
  end

  say 'Starting job worker'

  self.class.lifecycle.run_callbacks(:execute, self) do
    loop do
      self.class.lifecycle.run_callbacks(:loop, self) do
        # @result is the [success, failure] pair returned by work_off;
        # @realtime is how long that round took.
        @realtime = Benchmark.realtime do
          @result = work_off
        end
      end

      count = @result[0] + @result[1]

      if count.zero?
        if self.class.exit_on_complete
          say 'No more jobs available. Exiting'
          break
        elsif !stop?
          # Idle: wait before polling again, then reload application code
          # if reload! decides that is needed.
          sleep(self.class.sleep_delay)
          reload!
        end
      else
        say format("#{count} jobs processed at %.4f j/s, %d failed", count / @realtime, @result.last)
      end

      # Set by stop, e.g. from the signal handlers above.
      break if stop?
    end
  end
end
stop() click to toggle source
# File lib/delayed/worker.rb, line 198
# Flags the worker for shutdown. The flag is reported by stop? and checked
# by the loops in start and work_off.
def stop
  @exit = true
end
stop?() click to toggle source
# File lib/delayed/worker.rb, line 202
# Reports whether the worker has been flagged to stop. Always returns a
# strict boolean.
def stop?
  @exit ? true : false
end
work_off(num = 100) click to toggle source

Run up to num jobs and return the [success, failure] counts. Exits early if interrupted or when no job could be run.

# File lib/delayed/worker.rb, line 208
# Runs up to num jobs and tallies the outcomes.
#
# num - maximum number of jobs to attempt (default 100).
#
# Returns [success_count, failure_count]. Stops early when
# reserve_and_run_one_job returns anything other than true or false, or
# when the worker has been asked to stop.
def work_off(num = 100)
  tally = { true => 0, false => 0 }

  num.times do
    outcome = reserve_and_run_one_job
    break unless tally.key?(outcome) # leave if no work could be done

    tally[outcome] += 1
    break if stop? # leave if we're exiting
  end

  [tally[true], tally[false]]
end

Protected Instance Methods

handle_failed_job(job, error) click to toggle source
# File lib/delayed/worker.rb, line 296
# Records the error on the job, logs the failure at 'error' level and hands
# the job to reschedule.
def handle_failed_job(job, error)
  job.error = error
  summary = "FAILED (#{job.attempts} prior attempts) with #{error.class.name}: #{error.message}"
  job_say(job, summary, 'error')
  reschedule(job)
end
reload!() click to toggle source
# File lib/delayed/worker.rb, line 321
# Reloads application code when the class-level reload_app? check allows it.
#
# Where ActiveSupport::Reloader is available (Rails 5 and later) the
# application's reloader is used, because the
# ActionDispatch::Reloader.cleanup!/prepare! pair was deprecated there and
# later removed. Older Rails versions keep using that pair.
def reload!
  return unless self.class.reload_app?

  if defined?(ActiveSupport::Reloader)
    Rails.application.reloader.reload!
  else
    ActionDispatch::Reloader.cleanup!
    ActionDispatch::Reloader.prepare!
  end
end
reserve_and_run_one_job() click to toggle source

Run the next job we can get an exclusive lock on. If no jobs are left, we return nil.

# File lib/delayed/worker.rb, line 304
# Reserves the next available job and runs it inside the :perform lifecycle
# callbacks.
#
# Returns nil when no job could be reserved, otherwise the result of the
# callbacks wrapped around run(job).
def reserve_and_run_one_job
  job = reserve_job
  return unless job

  self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) }
end
reserve_job() click to toggle source
# File lib/delayed/worker.rb, line 309
# Asks the backend to reserve a job for this worker.
#
# Returns the reserved job (or whatever Delayed::Job.reserve returned) and
# resets the failure counter on success. On any exception the error is
# logged, the backend is given a chance to recover, and nil is returned.
#
# Raises FatalBackendError once ten reservations in a row have failed.
def reserve_job
  Delayed::Job.reserve(self).tap { @failed_reserve_count = 0 }
rescue ::Exception => reserve_error # rubocop:disable RescueException
  say "Error while reserving job: #{reserve_error}"
  Delayed::Job.recover_from(reserve_error)
  @failed_reserve_count += 1
  raise FatalBackendError if @failed_reserve_count >= 10

  nil
end