class Rufus::Scheduler
Constants
- DURATIONS
- DURATIONS2
- DURATIONS2M
- DURATION_LETTERS
- DU_KEYS
- MAX_WORK_THREADS
# NOTE(review): in this listing the constant is only referenced from a
# commented-out line of #initialize; presumably the default for the
# :min_work_threads option -- confirm.
MIN_WORK_THREADS = 3
- TIMEZONES
- TIMEZONEs
- VERSION
Attributes
# Reader/writer pair for @min_work_threads.
# NOTE(review): the assignment of @min_work_threads in #initialize is
# commented out in this listing, so the reader presumably returns nil until
# a value is assigned -- confirm.
attr_accessor :min_work_threads
Public Class Methods
Produces an hour/min/sec/microsecond string representation of a Time instance
# File lib/rufus/scheduler/util.rb, line 277
#
# Formats +t+ (defaults to the current time) as "HH:MM:SS.uuuuuu", where the
# fractional part is t.usec zero-padded to six digits.
def self.h_to_s(t=Time.now)
  clock = t.strftime('%H:%M:%S')
  format('%s.%06d', clock, t.usec)
end
# File lib/rufus/scheduler.rb, line 76
#
# Builds a scheduler from +opts+ and starts it, unless the scheduler lock
# cannot be acquired.
#
# Options read here: :frequency (parsed via Rufus::Scheduler.parse, defaults
# to 0.300), :max_work_threads, :lockfile, :scheduler_lock, :trigger_lock.
def initialize(opts={})
  @opts = opts

  @started_at = nil
  @paused = false

  @jobs = JobArray.new

  @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)
  @mutexes = {}

  @work_queue = Queue.new

  #@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS
  @max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS

  @stderr = $stderr

  @thread_key = "rufus_scheduler_#{self.object_id}"

  # A :lockfile option wins over an explicit :scheduler_lock object.
  lockfile = opts[:lockfile]
  @scheduler_lock =
    if lockfile
      Rufus::Scheduler::FileLock.new(lockfile)
    else
      opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new
    end

  @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new

  # If we can't grab the @scheduler_lock, don't run.
  return unless @scheduler_lock.lock

  start
end
# File lib/rufus/scheduler/util.rb, line 34
#
# Tries to interpret +o+ as, in order, a cron line, a duration ("in"/"every"
# style) and a point in time ("at" style). Returns the first successful
# result.
#
# The sub-parsers are run with :no_error => true so that a failed attempt
# yields nil instead of raising. That flag is set on a copy of +opts+: the
# caller's hash is left untouched, so it does not silently leak into later
# calls that reuse the same hash.
#
# Raises ArgumentError when none of the parsers accepts +o+.
def self.parse(o, opts={})
  quiet = opts.merge(:no_error => true)

  parse_cron(o, quiet) ||
  parse_in(o, quiet) || # covers 'every' schedule strings
  parse_at(o, quiet) ||
  fail(ArgumentError.new("couldn't parse \"#{o}\""))
end
# File lib/rufus/scheduler/util.rb, line 49
#
# Parses +o+ with Rufus::Scheduler::ZoTime.parse and returns the +time+ of
# the result. Any StandardError is re-raised, unless opts[:no_error] is set,
# in which case nil is returned.
def self.parse_at(o, opts={})
  begin
    Rufus::Scheduler::ZoTime.parse(o, opts).time
  rescue StandardError => error
    raise error unless opts[:no_error]
    nil
  end
end
# File lib/rufus/scheduler/util.rb, line 59
#
# Builds a CronLine from +o+. An ArgumentError raised while doing so is
# re-raised, unless opts[:no_error] is set, in which case nil is returned.
def self.parse_cron(o, opts)
  begin
    CronLine.new(o)
  rescue ArgumentError => error
    raise error unless opts[:no_error]
    nil
  end
end
Turns a string like '1m10s' into a float like '70.0', more formally, turns a time duration expressed as a string into a Float instance (a count of seconds).
w -> week d -> day h -> hour m -> minute s -> second M -> month y -> year (no letter) -> second
Some examples:
Rufus::Scheduler.parse_duration "0.5" # => 0.5 Rufus::Scheduler.parse_duration "500" # => 500.0 Rufus::Scheduler.parse_duration "1000" # => 1000.0 Rufus::Scheduler.parse_duration "1h" # => 3600.0 Rufus::Scheduler.parse_duration "1h10s" # => 3610.0 Rufus::Scheduler.parse_duration "1w2d" # => 777600.0
Negative time strings are OK (Thanks Danny Fullerton):
Rufus::Scheduler.parse_duration "-0.5" # => -0.5 Rufus::Scheduler.parse_duration "-1h" # => -3600.0
# File lib/rufus/scheduler/util.rb, line 126
#
# Sums up the "<number><letter>" chunks found in +string+, multiplying each
# number by DURATIONS[letter] (or by 1.0 when the lookup yields nothing).
# A leading '-' negates the result.
#
# When the chunks do not add up to the whole (stripped) input, or when the
# input is a lone '-', the input is considered invalid: nil is returned if
# opts[:no_error] is set, else an ArgumentError is raised.
def self.parse_duration(string, opts={})
  text = string.to_s.strip

  negative = text[0, 1] == '-'
  body = negative ? text[1..-1] : text

  consumed = negative ? '-' : ''
  total = 0.0

  body.scan(/(\d*\.\d+|\d+\.?)([#{DURATION_LETTERS}]?)/) do |number, unit|
    consumed += "#{number}#{unit}"
    total += number.to_f * (DURATIONS[unit] || 1.0)
  end

  # the scan must have accounted for every character of the input
  if consumed == '-' || consumed != text
    return nil if opts[:no_error]
    fail ArgumentError.new("invalid time duration #{string.inspect}")
  end

  (negative ? -1 : 1) * total
end
-
for compatibility with rufus-scheduler 2.x
+
# File lib/rufus/scheduler/util.rb, line 44
#
# Strings are handed to ::parse_duration (with +opts+); anything else is
# returned as is.
def self.parse_in(o, opts={})
  return o unless o.is_a?(String)

  parse_duration(o, opts)
end
# File lib/rufus/scheduler/util.rb, line 69
#
# Turns +o+ into a Time instance: Strings go through ::parse first, and a
# Numeric (given directly or obtained from parsing) is taken as an offset
# from Time.now.
#
# Raises ArgumentError when the outcome is not a Time.
def self.parse_to_time(o)
  point = o.is_a?(String) ? parse(o) : o
  point = Time.now + point if point.is_a?(Numeric)

  return point if point.is_a?(Time)

  fail ArgumentError.new(
    "cannot turn #{o.inspect} to a point in time, doesn't make sense"
  )
end
Alias for ::singleton
# File lib/rufus/scheduler.rb, line 121
#
# Short-hand that forwards +opts+ to ::singleton.
def self.s(opts={})
  singleton(opts)
end
Returns a singleton Rufus::Scheduler instance
# File lib/rufus/scheduler.rb, line 114
#
# Returns the memoized scheduler, building it with +opts+ on first use.
# +opts+ is ignored once an instance has been memoized.
def self.singleton(opts={})
  return @singleton if @singleton

  @singleton = Rufus::Scheduler.new(opts)
end
Releasing the gem would probably require redirecting .start_new to .new and emitting a simple deprecation message.
For now, let's assume the people pointing at rufus-scheduler/master on GitHub know what they do…
# File lib/rufus/scheduler.rb, line 129
#
# Always raises a RuntimeError pointing the caller at .new.
def self.start_new
  raise(
    RuntimeError,
    "this is rufus-scheduler 3.0, use .new instead of .start_new")
end
Turns a number of seconds into a time string
Rufus.to_duration 0 # => '0s' Rufus.to_duration 60 # => '1m' Rufus.to_duration 3661 # => '1h1m1s' Rufus.to_duration 7 * 24 * 3600 # => '1w' Rufus.to_duration 30 * 24 * 3600 + 1 # => "4w2d1s"
It goes from seconds to the year. Months are not counted (as they are of variable length). Weeks are counted.
For 30-day months to be counted, the :months option of this method can be set to true.
Rufus.to_duration 30 * 24 * 3600 + 1, :months => true # => "1M1s"
If a Float value is passed, milliseconds will be displayed without 'marker'
Rufus.to_duration 0.051 # => "51" Rufus.to_duration 7.051 # => "7s51" Rufus.to_duration 0.120 + 30 * 24 * 3600 + 1 # => "4w2d1s120"
(this behaviour mirrors the one found for ::parse_time_string()).
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 189
#
# Renders the hash produced by ::to_duration_hash as a string: for each key
# of DU_KEYS, in order, a non-zero count is emitted as "<count><key>"; the
# :ms entry, when present, is appended last without any marker.
#
# An empty hash yields '0m' when options[:drop_seconds] is set, '0s'
# otherwise.
def self.to_duration(seconds, options={})
  parts = to_duration_hash(seconds, options)

  if parts.empty?
    return options[:drop_seconds] ? '0m' : '0s'
  end

  out = ''
  DU_KEYS.each do |key|
    count = parts[key]
    next if ! count || count == 0
    out << "#{count}#{key}"
  end

  millis = parts[:ms]
  out << millis.to_s if millis

  out
end
Turns a number of seconds (integer or Float) into a hash like in :
Rufus.to_duration_hash 0.051 # => { :ms => 51 } Rufus.to_duration_hash 7.051 # => { :s => 7, :ms => 51 } Rufus.to_duration_hash 0.120 + 30 * 24 * 3600 + 1 # => { :w => 4, :d => 2, :s => 1, :ms => 120 }
This method is used by ::to_duration behind the scenes.
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 235
#
# Breaks +seconds+ down into a { unit => count } hash.
#
# * A Float input contributes an Integer :ms entry (its fractional part
#   times 1000, truncated) and is then truncated to an Integer.
# * With options[:drop_seconds], the :ms entry is removed and the value is
#   rounded down to a multiple of 60.
# * The (unit, length) pairs come from DURATIONS2M when options[:months] is
#   set, from DURATIONS2 otherwise; only units with a count above zero are
#   stored, under the symbolized unit.
def self.to_duration_hash(seconds, options={})
  result = {}
  remaining = seconds

  if remaining.is_a?(Float)
    result[:ms] = (remaining % 1 * 1000).to_i
    remaining = remaining.to_i
  end

  if options[:drop_seconds]
    result.delete(:ms)
    remaining = (remaining - remaining % 60)
  end

  table = options[:months] ? DURATIONS2M : DURATIONS2

  table.each do |unit, span|
    count = remaining / span
    remaining = remaining % span
    result[unit.to_sym] = count if count > 0
  end

  result
end
-
for compatibility with rufus-scheduler 2.x
+
Produces the UTC string representation of a Time instance
like “2009-11-23 11:11:50.947109 UTC”
# File lib/rufus/scheduler/util.rb, line 270
#
# Produces the UTC string representation of +t+ (defaults to the current
# time), like "2009-11-23 11:11:50.947109 UTC": date and time in UTC, then
# t.usec zero-padded to six digits.
#
# Time#getutc is used rather than Time#utc: the latter converts its receiver
# in place, which would switch the caller's Time object to UTC as a side
# effect of merely formatting it.
def self.utc_to_s(t=Time.now)
  "#{t.getutc.strftime('%Y-%m-%d %H:%M:%S')}.#{sprintf('%06d', t.usec)} UTC"
end
Public Instance Methods
# File lib/rufus/scheduler.rb, line 203
#
# Schedules a :once job for +time+ via #do_schedule. opts[:job] is passed
# along as the "return the job instance" flag.
def at(time, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:once, time, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 308
#
# The jobs returned by #jobs (for the given +opts+) that are AtJob instances.
def at_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::AtJob)
end
Callback called when a job is triggered. If the lock cannot be acquired, the job won't run (though it'll still be scheduled to run again if necessary).
# File lib/rufus/scheduler.rb, line 376
#
# Asks the trigger lock for its lock and returns the outcome.
def confirm_lock
  @trigger_lock.lock
end
# File lib/rufus/scheduler.rb, line 243
#
# Schedules a :cron job for +cronline+ via #do_schedule. opts[:job] is
# passed along as the "return the job instance" flag.
def cron(cronline, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:cron, cronline, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 328
#
# The jobs returned by #jobs (for the given +opts+) that are CronJob
# instances.
def cron_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::CronJob)
end
# File lib/rufus/scheduler.rb, line 174
#
# True when @started_at is not set.
def down?
  @started_at ? false : true
end
# File lib/rufus/scheduler.rb, line 223
#
# Schedules an :every job for +duration+ via #do_schedule. opts[:job] is
# passed along as the "return the job instance" flag.
def every(duration, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:every, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 318
#
# The jobs returned by #jobs (for the given +opts+) that are EveryJob
# instances.
def every_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::EveryJob)
end
# File lib/rufus/scheduler.rb, line 213
#
# Schedules a :once job for +duration+ via #do_schedule. opts[:job] is
# passed along as the "return the job instance" flag.
def in(duration, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:once, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 313
#
# The jobs returned by #jobs (for the given +opts+) that are InJob instances.
def in_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::InJob)
end
# File lib/rufus/scheduler.rb, line 233
#
# Schedules an :interval job for +duration+ via #do_schedule. opts[:job] is
# passed along as the "return the job instance" flag.
def interval(duration, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:interval, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 323
#
# The jobs returned by #jobs (for the given +opts+) that are IntervalJob
# instances.
def interval_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::IntervalJob)
end
# File lib/rufus/scheduler.rb, line 333
#
# Looks +job_id+ up in @jobs.
def job(job_id)
  @jobs[job_id]
end
Returns all the scheduled jobs (even those right before re-schedule).
# File lib/rufus/scheduler.rb, line 290
#
# Lists jobs, filtered according to +opts+ (a lone Symbol +x+ is shorthand
# for { x => true }):
#
# * :running - only the jobs answering true to #running?
# * :all - no state filtering
# * neither - jobs that have no next_time or that have an unscheduled_at
#   are left out
#
# On top of that, :tag / :tags restricts the result to the jobs carrying
# every one of the given tags (tags are compared as strings).
def jobs(opts={})
  opts = { opts => true } if opts.is_a?(Symbol)

  candidates = @jobs.to_a

  candidates =
    if opts[:running]
      candidates.select { |job| job.running? }
    elsif opts[:all]
      candidates
    else
      candidates.select { |job| ! job.next_time.nil? && ! job.unscheduled_at }
    end

  wanted = Array(opts[:tag] || opts[:tags]).map { |tag| tag.to_s }

  candidates.select { |job| wanted.all? { |tag| job.tags.include?(tag) } }
end
# File lib/rufus/scheduler.rb, line 165
#
# Joins the scheduler thread (@thread). Raises NotRunningError when there
# is no such thread.
def join
  if @thread
    @thread.join
  else
    fail NotRunningError.new('cannot join scheduler that is not running')
  end
end
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
Most of the time, a scheduler is run alone and this method should return true. It is useful in cases where among a group of applications only one of them should run the scheduler. For schedulers that should not run, the method should return false.
Out of the box, rufus-scheduler proposes the :lockfile => 'path/to/lock/file' scheduler start option. It makes it easy for schedulers on the same machine to determine which should run (the first to write the lockfile and lock it). It uses “man 2 flock” so it probably won't work reliably on distributed file systems.
If one needs to use a special/different locking mechanism, the scheduler accepts :scheduler_lock => lock_object. lock_object only needs to respond to lock and unlock, and both of these methods should be idempotent.
Look at rufus/scheduler/locks.rb for an example.
# File lib/rufus/scheduler.rb, line 359
#
# Asks the scheduler lock for its lock and returns the outcome.
def lock
  @scheduler_lock.lock
end
# File lib/rufus/scheduler.rb, line 432
#
# Collects, for each job of #jobs, its occurrences between +time0+ and
# +time1+ (jobs without any are left out).
#
# With format == :timeline the result is an array of [ time, job ] pairs
# sorted by time; otherwise it is a { job => times } hash.
def occurrences(time0, time1, format=:per_job)
  per_job = {}

  jobs.each do |job|
    times = job.occurrences(time0, time1)
    per_job[job] = times if times.any?
  end

  return per_job unless format == :timeline

  pairs = []
  per_job.each do |job, times|
    times.each { |time| pairs << [ time, job ] }
  end

  pairs.sort_by { |(time, _)| time }
end
# Writes a multi-line diagnostic report about +err+ (raised while handling
# +job+) to stderr: the job's class, original and opts; the error's
# object_id, class, message and backtrace; ENV['TZ'] and Time.now; then
# scheduler state (opts, frequency, locks, uptime, thread and work thread
# counts, mutex states, job counts per kind, work queue size).
#
# Each report line is prefixed with the error's object_id. The lines are
# emitted one at a time, so if building the report itself raises, whatever
# was printed so far stays printed and is followed by
# "failure in #on_error itself:" plus that second exception and its
# backtrace. stderr is flushed in every case.
# File lib/rufus/scheduler.rb, line 455 def on_error(job, err) pre = err.object_id.to_s ms = {}; mutexes.each { |k, v| ms[k] = v.locked? } stderr.puts("{ #{pre} rufus-scheduler intercepted an error:") stderr.puts(" #{pre} job:") stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}") # TODO: eventually use a Job#detail or something like that stderr.puts(" #{pre} error:") stderr.puts(" #{pre} #{err.object_id}") stderr.puts(" #{pre} #{err.class}") stderr.puts(" #{pre} #{err}") err.backtrace.each do |l| stderr.puts(" #{pre} #{l}") end stderr.puts(" #{pre} tz:") stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}") stderr.puts(" #{pre} Time.now: #{Time.now}") stderr.puts(" #{pre} scheduler:") stderr.puts(" #{pre} object_id: #{object_id}") stderr.puts(" #{pre} opts:") stderr.puts(" #{pre} #{@opts.inspect}") stderr.puts(" #{pre} frequency: #{self.frequency}") stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}") stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}") stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})") stderr.puts(" #{pre} down?: #{down?}") stderr.puts(" #{pre} threads: #{self.threads.size}") stderr.puts(" #{pre} thread: #{self.thread}") stderr.puts(" #{pre} thread_key: #{self.thread_key}") stderr.puts(" #{pre} work_threads: #{work_threads.size}") stderr.puts(" #{pre} active: #{work_threads(:active).size}") stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}") stderr.puts(" #{pre} max_work_threads: #{max_work_threads}") stderr.puts(" #{pre} mutexes: #{ms.inspect}") stderr.puts(" #{pre} jobs: #{jobs.size}") stderr.puts(" #{pre} at_jobs: #{at_jobs.size}") stderr.puts(" #{pre} in_jobs: #{in_jobs.size}") stderr.puts(" #{pre} every_jobs: #{every_jobs.size}") stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}") stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}") stderr.puts(" #{pre} running_jobs: #{running_jobs.size}") stderr.puts(" #{pre} work_queue: #{work_queue.size}") stderr.puts("} #{pre} .") rescue => e 
stderr.puts("failure in #on_error itself:") stderr.puts(e.inspect) stderr.puts(e.backtrace) ensure stderr.flush end
# File lib/rufus/scheduler.rb, line 189
#
# Raises the @paused flag.
def pause
  @paused = true
end
# File lib/rufus/scheduler.rb, line 184
#
# Returns the current value of the @paused flag.
def paused?
  @paused
end
# File lib/rufus/scheduler.rb, line 264
#
# Parses +arg+ (the result is stored under opts[:_t]) and schedules a
# repeating job: a cron job when +arg+ parsed to a CronLine, an "every" job
# otherwise.
def repeat(arg, callable=nil, opts={}, &block)
  parsed = Scheduler.parse(arg, opts)
  opts[:_t] = parsed

  if CronLine === parsed
    schedule_cron(arg, callable, opts, &block)
  else
    schedule_every(arg, callable, opts, &block)
  end
end
# File lib/rufus/scheduler.rb, line 194
#
# Lowers the @paused flag.
def resume
  @paused = false
end
# File lib/rufus/scheduler.rb, line 427
#
# Same as #jobs, with :running => true forced into (a copy of) +opts+.
def running_jobs(opts={})
  running_only = opts.merge(:running => true)

  jobs(running_only)
end
# File lib/rufus/scheduler.rb, line 253
#
# Parses +arg+ (the result is stored under opts[:_t]) and dispatches on what
# it parsed to: a CronLine goes to #schedule_cron, a Time to #schedule_at,
# anything else to #schedule_in.
def schedule(arg, callable=nil, opts={}, &block)
  parsed = Scheduler.parse(arg, opts)
  opts[:_t] = parsed

  if CronLine === parsed
    schedule_cron(arg, callable, opts, &block)
  elsif Time === parsed
    schedule_at(arg, callable, opts, &block)
  else
    schedule_in(arg, callable, opts, &block)
  end
end
# File lib/rufus/scheduler.rb, line 208
#
# Like #at, but always asks #do_schedule to return the job instance.
def schedule_at(time, callable=nil, opts={}, &block)
  return_job = true

  do_schedule(:once, time, callable, opts, return_job, block)
end
# File lib/rufus/scheduler.rb, line 248
#
# Like #cron, but always asks #do_schedule to return the job instance.
def schedule_cron(cronline, callable=nil, opts={}, &block)
  return_job = true

  do_schedule(:cron, cronline, callable, opts, return_job, block)
end
# File lib/rufus/scheduler.rb, line 228
#
# Like #every, but always asks #do_schedule to return the job instance.
def schedule_every(duration, callable=nil, opts={}, &block)
  return_job = true

  do_schedule(:every, duration, callable, opts, return_job, block)
end
# File lib/rufus/scheduler.rb, line 218
#
# Like #in, but always asks #do_schedule to return the job instance.
def schedule_in(duration, callable=nil, opts={}, &block)
  return_job = true

  do_schedule(:once, duration, callable, opts, return_job, block)
end
# File lib/rufus/scheduler.rb, line 238
#
# Like #interval, but always asks #do_schedule to return the job instance.
def schedule_interval(duration, callable=nil, opts={}, &block)
  return_job = true

  do_schedule(:interval, duration, callable, opts, return_job, block)
end
Returns true if this job is currently scheduled.
Takes extra care to answer true if the job is a repeat job currently firing.
# File lib/rufus/scheduler.rb, line 386
#
# True when the job designated by +job_or_job_id+ is found, has no
# unscheduled_at and has a next_time.
def scheduled?(job_or_job_id)
  job = fetch(job_or_job_id).first

  return false unless job
  return false unless job.unscheduled_at.nil?

  job.next_time != nil
end
# File lib/rufus/scheduler.rb, line 134
#
# Stops the scheduler: clears @started_at, unschedules every job held in
# @jobs, empties the work queue, then releases the locks via #unlock.
#
# +opt+ may be :wait (work threads are joined) or :kill (work threads are
# killed); any other value leaves the work threads alone.
def shutdown(opt=nil)
  @started_at = nil

  #jobs.each { |j| j.unschedule }
    # provokes https://github.com/jmettraux/rufus-scheduler/issue/98
  @jobs.array.each { |job| job.unschedule }

  @work_queue.clear

  case opt
  when :wait then join_all_work_threads
  when :kill then kill_all_work_threads
  end

  unlock
end
Lists all the threads associated with this scheduler.
# File lib/rufus/scheduler.rb, line 395
#
# The threads of Thread.list that carry a truthy value under this
# scheduler's thread_key.
def threads
  key = thread_key

  Thread.list.select { |thread| thread[key] }
end
# File lib/rufus/scheduler.rb, line 450
#
# Shortcut for #occurrences with the :timeline format.
def timeline(time0, time1)
  occurrences(time0, time1, :timeline)
end
Sister method to lock, is called when the scheduler shuts down.
# File lib/rufus/scheduler.rb, line 366
#
# Releases the trigger lock first, then the scheduler lock. Returns what
# the scheduler lock's #unlock returns.
def unlock
  @trigger_lock.unlock

  @scheduler_lock.unlock
end
# File lib/rufus/scheduler.rb, line 274
#
# Unschedules the job designated by +job_or_job_id+ (resolved via #fetch).
# Raises ArgumentError when no such job is found.
def unschedule(job_or_job_id)
  job, job_id = fetch(job_or_job_id)

  unless job
    fail ArgumentError.new("no job found with id '#{job_id}'")
  end

  job.unschedule
end
# File lib/rufus/scheduler.rb, line 179
#
# True when @started_at is set.
def up?
  @started_at ? true : false
end
# File lib/rufus/scheduler.rb, line 155
#
# Time elapsed since @started_at (Time.now - @started_at), or nil when
# @started_at is not set.
def uptime
  return nil unless @started_at

  Time.now - @started_at
end
# File lib/rufus/scheduler.rb, line 160
#
# The #uptime rendered as a duration string by ::to_duration.
#
# Returns an empty string when #uptime is nil (scheduler not started or
# shut down): handing nil to ::to_duration would fail with a NoMethodError,
# which notably broke the #on_error report for a scheduler that is down.
def uptime_s
  elapsed = uptime # read once, the scheduler may go down between two reads

  elapsed ? self.class.to_duration(elapsed) : ''
end
Lists all the work threads (the ones actually running the scheduled block code)
Accepts a query option, which can be set to:
-
:all (default), returns all the threads that are work threads or are currently running a job
-
:active, returns all threads that are currently running a job
-
:vacant, returns the threads that are not running a job
If, thanks to :blocking => true, a job is scheduled to monopolize the main scheduler thread, that thread will get returned when :active or :all.
# File lib/rufus/scheduler.rb, line 413
#
# Among #threads, selects those flagged with :rufus_scheduler_job or
# :rufus_scheduler_work_thread, then narrows down according to +query+:
# :active keeps the ones with a :rufus_scheduler_job, :vacant the ones
# without, anything else returns them all.
def work_threads(query=:all)
  pool = threads.select do |thread|
    thread[:rufus_scheduler_job] || thread[:rufus_scheduler_work_thread]
  end

  return pool.select { |thread| thread[:rufus_scheduler_job] } if query == :active
  return pool.reject { |thread| thread[:rufus_scheduler_job] } if query == :vacant

  pool
end
Protected Instance Methods
# File lib/rufus/scheduler.rb, line 608
#
# Common implementation behind the scheduling methods.
#
# * fails with NotRunningError when @started_at is nil
# * accepts the options hash in the +callable+ position
# * picks the job class from +job_type+ (for :once, +t+ is parsed, unless
#   opts[:_t] is already there, and a Time yields an AtJob, anything else
#   an InJob)
# * instantiates the job with +block+, or +callable+ when no block is given
# * fails with ArgumentError when the job exposes a frequency lower than
#   the scheduler's
# * registers the job in @jobs
#
# Returns the job itself when +return_job_instance+ (or opts[:job]) is
# truthy, the job's id otherwise.
def do_schedule(job_type, t, callable, opts, return_job_instance, block)
  if @started_at.nil?
    fail NotRunningError.new(
      'cannot schedule, scheduler is down or shutting down')
  end

  # schedule_x(t, opts) { ... } : the hash landed in the callable slot
  if callable.is_a?(Hash)
    opts = callable
    callable = nil
  end

  return_job_instance ||= opts[:job]

  job_class =
    case job_type
    when :once
      opts[:_t] ||= Rufus::Scheduler.parse(t, opts)
      opts[:_t].is_a?(Time) ? AtJob : InJob
    when :every then EveryJob
    when :interval then IntervalJob
    when :cron then CronJob
    end

  job = job_class.new(self, t, opts, block || callable)

  if job.respond_to?(:frequency) && job.frequency < @frequency
    fail ArgumentError.new(
      "job frequency (#{job.frequency}) is higher than " +
      "scheduler frequency (#{@frequency})")
  end

  @jobs.push(job)

  return_job_instance ? job : job.job_id
end
Returns [ job, job_id ]
# File lib/rufus/scheduler.rb, line 517
#
# Resolves the argument to a [ job, job_id ] pair: something responding to
# #job_id is taken as the job itself, anything else as an id to look up
# with #job.
def fetch(job_or_job_id)
  unless job_or_job_id.respond_to?(:job_id)
    return [ job(job_or_job_id), job_or_job_id ]
  end

  [ job_or_job_id, job_or_job_id.job_id ]
end
# File lib/rufus/scheduler.rb, line 533
#
# Pushes one :sayonara onto the work queue per work thread, joins every
# work thread, then clears the work queue.
def join_all_work_threads
  work_threads.each { @work_queue << :sayonara }

  work_threads.each { |thread| thread.join }

  @work_queue.clear
end
# File lib/rufus/scheduler.rb, line 542
#
# Kills every work thread.
def kill_all_work_threads
  work_threads.each do |thread|
    thread.kill
  end
end
# Raises KillSignal inside every work thread.
def free_all_work_threads
  work_threads.each do |thread|
    thread.raise(KillSignal)
  end
end
# File lib/rufus/scheduler.rb, line 552
#
# Records the start time and spawns the scheduler thread. For as long as
# @started_at is set, that thread loops over: dropping unscheduled jobs,
# triggering jobs (skipped while paused), timing out jobs, sleeping for
# @frequency.
#
# The thread is tagged with the scheduler's thread key, a reference to the
# scheduler and a name (opts[:thread_name], or "<thread_key>_scheduler").
def start
  @started_at = Time.now

  @thread = Thread.new do
    while @started_at
      unschedule_jobs
      trigger_jobs unless @paused
      timeout_jobs
      sleep(@frequency)
    end
  end

  @thread[@thread_key] = true
  @thread[:rufus_scheduler] = self
  @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler"
end
# File lib/rufus/scheduler.rb, line 526
#
# Unschedules every job of #jobs, then polls (every 0.01s) until no job is
# reported as running anymore.
def terminate_all_jobs
  jobs.each { |job| job.unschedule }

  sleep(0.01) until running_jobs.size.zero?
end
# File lib/rufus/scheduler.rb, line 589
#
# Goes through the active work threads and raises
# Rufus::Scheduler::TimeoutError in those whose deadline is not in the
# future. The deadline is the thread's :rufus_scheduler_timeout when that
# is a Time, else :rufus_scheduler_time plus that timeout.
def timeout_jobs
  work_threads(:active).each do |thread|
    job = thread[:rufus_scheduler_job]
    timeout = thread[:rufus_scheduler_timeout]
    started = thread[:rufus_scheduler_time]

    # thread might just have become inactive (job -> nil)
    next unless job && timeout && started

    deadline = timeout.is_a?(Time) ? timeout : started + timeout

    thread.raise(Rufus::Scheduler::TimeoutError) unless deadline > Time.now
  end
end
# File lib/rufus/scheduler.rb, line 579
#
# Takes a single Time.now reading and triggers, with it, every job that
# @jobs.each yields for that reading.
def trigger_jobs
  tick = Time.now

  @jobs.each(tick) { |job| job.trigger(tick) }
end
# File lib/rufus/scheduler.rb, line 574
#
# Asks @jobs to drop its unscheduled jobs.
def unschedule_jobs
  @jobs.delete_unscheduled
end