Module | Delayed::Backend::Base::ClassMethods |
In: |
lib/delayed/backend/base.rb
|
Hook method that is called after a new worker is forked
# File lib/delayed/backend/base.rb, line 55 55: def after_fork 56: end
Hook method that is called before a new worker is forked
# File lib/delayed/backend/base.rb, line 51 51: def before_fork 52: end
Add a job to the queue
# File lib/delayed/backend/base.rb, line 10 10: def enqueue(*args) 11: options = { 12: :priority => Delayed::Worker.default_priority, 13: :queue => Delayed::Worker.default_queue_name 14: }.merge!(args.extract_options!) 15: 16: options[:payload_object] ||= args.shift 17: 18: if args.size > 0 19: warn "[DEPRECATION] Passing multiple arguments to `#enqueue` is deprecated. Pass a hash with :priority and :run_at." 20: options[:priority] = args.first || options[:priority] 21: options[:run_at] = args[1] 22: end 23: 24: unless options[:payload_object].respond_to?(:perform) 25: raise ArgumentError, 'Cannot enqueue items which do not respond to perform' 26: end 27: 28: if Delayed::Worker.delay_jobs 29: self.new(options).tap do |job| 30: Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do 31: job.hook(:enqueue) 32: job.save 33: end 34: end 35: else 36: Delayed::Job.new(:payload_object => options[:payload_object]).tap do |job| 37: job.invoke_job 38: end 39: end 40: end
# File lib/delayed/backend/base.rb, line 42 42: def reserve(worker, max_run_time = Worker.max_run_time) 43: # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next. 44: # this leads to a more even distribution of jobs across the worker processes 45: find_available(worker.name, worker.read_ahead, max_run_time).detect do |job| 46: job.lock_exclusively!(max_run_time, worker.name) 47: end 48: end