Object
MIN_WORK_THREADS = 3
Produces an hour/min/sec/microsecond string representation of a Time instance
# File lib/rufus/scheduler/util.rb, line 316
#
# Formats a Time as "HH:MM:SS.uuuuuu", the fractional part being the
# zero-padded, six-digit microsecond component (Time#usec).
#
# t:: the Time to format (defaults to the current time)
def self.h_to_s(t=Time.now)
  clock = t.strftime('%H:%M:%S')
  micros = format('%06d', t.usec)
  [ clock, micros ].join('.')
end
# File lib/rufus/scheduler.rb, line 75
#
# Prepares the scheduler state and, provided the scheduler lock can be
# grabbed, starts the scheduler immediately.
#
# Options read here: :frequency (defaults to 0.300), :max_work_threads,
# :lockfile, :scheduler_lock and :trigger_lock.
def initialize(opts={})
  @opts = opts

  @started_at = nil
  @paused = false

  @jobs = JobArray.new

  @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)
  @mutexes = {}

  @work_queue = Queue.new

  #@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS
  @max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS

  @stderr = $stderr

  @thread_key = "rufus_scheduler_#{self.object_id}"

  # A :lockfile option takes precedence over an explicit :scheduler_lock.
  lockfile = opts[:lockfile]
  @scheduler_lock =
    if lockfile
      Rufus::Scheduler::FileLock.new(lockfile)
    else
      opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new
    end

  @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new

  # If we can't grab the @scheduler_lock, don't run.
  return unless @scheduler_lock.lock

  start
end
# File lib/rufus/scheduler/util.rb, line 34
#
# Tries to interpret +o+ as, in order, a cron line, a duration and a point
# in time, and returns the first successful interpretation.
#
# o::    the object to parse
# opts:: options handed to the individual parsers; this hash is left
#        untouched, the parsers receive a copy with :no_error set so that
#        they return nil instead of raising
#
# Raises ArgumentError when none of the parsers accepts +o+.
def self.parse(o, opts={})
  # Work on a copy so that the caller's hash does not gain a :no_error entry.
  quiet = opts.merge(:no_error => true)

  parse_cron(o, quiet) ||
  parse_in(o, quiet) || # covers 'every' schedule strings
  parse_at(o, quiet) ||
  raise(ArgumentError.new("couldn't parse \"#{o}\""))
end
# File lib/rufus/scheduler/util.rb, line 51 def self.parse_at(o, opts={}) return o if o.is_a?(Time) # TODO: deal with tz if suffixed to Chronic string? return Chronic.parse(o, opts) if defined?(Chronic) tz = nil s = o.to_s.gsub(TZ_REGEX) { |m| t = TZInfo::Timezone.get(m) rescue nil tz ||= t t ? '' : m } begin DateTime.parse(o) rescue raise ArgumentError, "no time information in #{o.inspect}" end if RUBY_VERSION < '1.9.0' t = Time.parse(s) tz ? tz.local_to_utc(t) : t rescue StandardError => se return nil if opts[:no_error] raise se end
# File lib/rufus/scheduler/util.rb, line 82
#
# Builds a CronLine out of +o+.
#
# o::    the cron string
# opts:: optional (defaults to {}, like the sibling parse_* methods); when
#        opts[:no_error] is set, an ArgumentError raised by CronLine.new
#        results in nil being returned instead
#
# Raises ArgumentError (as raised by CronLine.new) unless :no_error is set.
def self.parse_cron(o, opts={})
  CronLine.new(o)
rescue ArgumentError => ae
  return nil if opts[:no_error]
  raise ae
end
Turns a string like ‘1m10s’ into a float like ‘70.0’; more formally, turns a time duration expressed as a string into a Float instance (second count).
w -> week d -> day h -> hour m -> minute s -> second M -> month y -> year ‘nada’ -> second
Some examples:
Rufus::Scheduler.parse_duration "0.5" # => 0.5 Rufus::Scheduler.parse_duration "500" # => 500.0 Rufus::Scheduler.parse_duration "1000" # => 1000.0 Rufus::Scheduler.parse_duration "1h" # => 3600.0 Rufus::Scheduler.parse_duration "1h10s" # => 3610.0 Rufus::Scheduler.parse_duration "1w2d" # => 777600.0
Negative time strings are OK (Thanks Danny Fullerton):
Rufus::Scheduler.parse_duration "-0.5" # => -0.5 Rufus::Scheduler.parse_duration "-1h" # => -3600.0
# File lib/rufus/scheduler/util.rb, line 149 def self.parse_duration(string, opts={}) string = string.to_s return 0.0 if string == '' m = string.match(/^(-?)([\d\.#{DURATION_LETTERS}]+)$/) return nil if m.nil? && opts[:no_error] raise ArgumentError.new("cannot parse '#{string}'") if m.nil? mod = m[1] == '-' ? -1.0 : 1.0 val = 0.0 s = m[2] while s.length > 0 m = nil if m = s.match(/^(\d+|\d+\.\d*|\d*\.\d+)([#{DURATION_LETTERS}])(.*)$/) val += m[1].to_f * DURATIONS[m[2]] elsif s.match(/^\d+$/) val += s.to_i elsif s.match(/^\d*\.\d*$/) val += s.to_f elsif opts[:no_error] return nil else raise ArgumentError.new( "cannot parse '#{string}' (especially '#{s}')" ) end break unless m && m[3] s = m[3] end mod * val end
-
for compatibility with rufus-scheduler 2.x
+
# File lib/rufus/scheduler/util.rb, line 44
#
# Parses +o+ as a duration when it is a String (via parse_duration, which
# receives +opts+); any other object is returned unchanged.
def self.parse_in(o, opts={})
  return o unless o.is_a?(String)

  parse_duration(o, opts)
end
# File lib/rufus/scheduler/util.rb, line 92
#
# Resolves +o+ to a Time instance.
#
# A String is first run through parse; a Numeric (given directly or
# obtained from parsing) is taken as an offset from Time.now.
#
# Raises ArgumentError when the outcome is not a Time.
def self.parse_to_time(o)
  point = o.is_a?(String) ? parse(o) : o
  point = Time.now + point if point.is_a?(Numeric)

  unless point.is_a?(Time)
    raise ArgumentError.new(
      "cannot turn #{o.inspect} to a point in time, doesn't make sense"
    )
  end

  point
end
Alias for Rufus::Scheduler.singleton
# File lib/rufus/scheduler.rb, line 120
#
# Shorthand for singleton; +opts+ is passed along untouched.
def self.s(opts={})
  singleton(opts)
end
Returns a singleton Rufus::Scheduler instance
# File lib/rufus/scheduler.rb, line 113
#
# Returns the memoized scheduler, creating it with +opts+ on first use.
# Once an instance is memoized, +opts+ is not looked at anymore.
def self.singleton(opts={})
  @singleton = Rufus::Scheduler.new(opts) unless @singleton
  @singleton
end
Releasing the gem would probably require redirecting .start_new to .new and emit a simple deprecation message.
For now, let’s assume the people pointing at rufus-scheduler/master on GitHub know what they do…
# File lib/rufus/scheduler.rb, line 128
#
# Always raises a RuntimeError pointing callers to .new.
def self.start_new
  raise RuntimeError, "this is rufus-scheduler 3.0, use .new instead of .start_new"
end
Turns a number of seconds into a time string
Rufus.to_duration 0 # => '0s' Rufus.to_duration 60 # => '1m' Rufus.to_duration 3661 # => '1h1m1s' Rufus.to_duration 7 * 24 * 3600 # => '1w' Rufus.to_duration 30 * 24 * 3600 + 1 # => "4w2d1s"
It goes from seconds to the year. Months are not counted (as they are of variable length). Weeks are counted.
For 30-day months to be counted, pass :months => true in the options hash (the second parameter of this method).
Rufus.to_duration 30 * 24 * 3600 + 1, :months => true # => "1M1s"
If a Float value is passed, milliseconds will be displayed without ‘marker’
Rufus.to_duration 0.051 # => "51" Rufus.to_duration 7.051 # => "7s51" Rufus.to_duration 0.120 + 30 * 24 * 3600 + 1 # => "4w2d1s120"
(this behaviour mirrors the one found for parse_time_string()).
Options are :
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 228
#
# Renders a number of seconds as a duration string.
#
# The breakdown comes from to_duration_hash (which receives +options+);
# units are emitted in DU_KEYS order, zero or missing counts are skipped
# and a :ms entry, if any, is appended without a unit letter.
#
# An empty breakdown yields '0s', or '0m' when options[:drop_seconds] is set.
def self.to_duration(seconds, options={})
  parts = to_duration_hash(seconds, options)

  if parts.empty?
    return options[:drop_seconds] ? '0m' : '0s'
  end

  text =
    DU_KEYS.map { |unit|
      amount = parts[unit]
      (amount && amount != 0) ? "#{amount}#{unit}" : nil
    }.compact.join

  millis = parts[:ms]
  text << millis.to_s if millis

  text
end
Turns a number of seconds (integer or Float) into a hash like in :
Rufus.to_duration_hash 0.051 # => { :ms => 51 } Rufus.to_duration_hash 7.051 # => { :s => 7, :ms => 51 } Rufus.to_duration_hash 0.120 + 30 * 24 * 3600 + 1 # => { :w => 4, :d => 2, :s => 1, :ms => 120 }
This method is used by to_duration behind the scenes.
Options are :
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 274
#
# Breaks a number of seconds (Integer or Float) down into a hash of
# non-zero unit counts, keyed by unit symbol.
#
# seconds:: the duration; for a Float, the fractional part ends up under
#           :ms as an Integer number of milliseconds
# options:: :months => true to use the DURATIONS2M table instead of
#           DURATIONS2, :drop_seconds => true to trim seconds and
#           milliseconds from the result
#
# Returns the hash; it is empty when nothing is left to report.
def self.to_duration_hash(seconds, options={})
  h = {}

  if seconds.is_a?(Float)
    # Round instead of truncating: 7.051 % 1 * 1000 is 50.99..., not 51.
    ms = (seconds % 1 * 1000).round
    seconds = seconds.to_i
    if ms >= 1000 # the fraction rounded up to a whole second
      seconds += 1
      ms -= 1000
    end
    # Like every other unit, milliseconds are only reported when non-zero.
    h[:ms] = ms if ms > 0
  end

  if options[:drop_seconds]
    h.delete(:ms)
    seconds = (seconds - seconds % 60)
  end

  durations = options[:months] ? DURATIONS2M : DURATIONS2

  durations.each do |key, duration|
    count = seconds / duration
    seconds = seconds % duration
    h[key.to_sym] = count if count > 0
  end

  h
end
-
for compatibility with rufus-scheduler 2.x
+
# File lib/rufus/scheduler.rb, line 202
#
# Schedules a one-off (:once) job for +time+ through do_schedule.
# opts[:job] is forwarded as do_schedule's return_job_instance argument.
def at(time, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:once, time, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 307
#
# Returns the jobs (as filtered by jobs(opts)) that are AtJob instances.
def at_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::AtJob)
end
Callback called when a job is triggered. If the lock cannot be acquired, the job won’t run (though it’ll still be scheduled to run again if necessary).
# File lib/rufus/scheduler.rb, line 375 def confirm_lock @trigger_lock.lock end
# File lib/rufus/scheduler.rb, line 242
#
# Schedules a :cron job for +cronline+ through do_schedule.
# opts[:job] is forwarded as do_schedule's return_job_instance argument.
def cron(cronline, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:cron, cronline, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 327
#
# Returns the jobs (as filtered by jobs(opts)) that are CronJob instances.
def cron_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::CronJob)
end
# File lib/rufus/scheduler.rb, line 222
#
# Schedules an :every job for +duration+ through do_schedule.
# opts[:job] is forwarded as do_schedule's return_job_instance argument.
def every(duration, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:every, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 317
#
# Returns the jobs (as filtered by jobs(opts)) that are EveryJob instances.
def every_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::EveryJob)
end
# File lib/rufus/scheduler.rb, line 212
#
# Schedules a one-off (:once) job for +duration+ through do_schedule.
# opts[:job] is forwarded as do_schedule's return_job_instance argument.
def in(duration, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:once, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 312
#
# Returns the jobs (as filtered by jobs(opts)) that are InJob instances.
def in_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::InJob)
end
# File lib/rufus/scheduler.rb, line 232
#
# Schedules an :interval job for +duration+ through do_schedule.
# opts[:job] is forwarded as do_schedule's return_job_instance argument.
def interval(duration, callable=nil, opts={}, &block)
  wants_job = opts[:job]

  do_schedule(:interval, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 322
#
# Returns the jobs (as filtered by jobs(opts)) that are IntervalJob
# instances.
def interval_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::IntervalJob)
end
# File lib/rufus/scheduler.rb, line 332 def job(job_id) @jobs[job_id] end
Returns all the scheduled jobs (even those right before re-schedule).
# File lib/rufus/scheduler.rb, line 289
#
# Returns an array of jobs taken from @jobs.
#
# opts:: a Hash, or a Symbol (:all, :running, ...) standing for
#        { that_symbol => true }
#        :running      keeps only the jobs answering true to running?
#        :all          skips the default filtering
#        :tag / :tags  keeps only the jobs carrying all the given tags
#
# Without :running or :all, jobs with a nil next_time or with an
# unscheduled_at are left out.
def jobs(opts={})
  opts = { opts => true } if opts.is_a?(Symbol)

  selection = @jobs.to_a

  if opts[:running]
    selection = selection.select { |j| j.running? }
  elsif !opts[:all]
    selection = selection.select { |j| !j.next_time.nil? && !j.unscheduled_at }
  end

  wanted = Array(opts[:tag] || opts[:tags]).map { |t| t.to_s }

  selection.select { |j| wanted.all? { |t| j.tags.include?(t) } }
end
# File lib/rufus/scheduler.rb, line 164
#
# Joins the scheduler thread (@thread).
#
# Fails with NotRunningError when there is no @thread.
def join
  unless @thread
    fail NotRunningError.new('cannot join scheduler that is not running')
  end

  @thread.join
end
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
Most of the time, a scheduler is run alone and this method should return true. It is useful in cases where among a group of applications only one of them should run the scheduler. For schedulers that should not run, the method should return false.
Out of the box, rufus-scheduler proposes the :lockfile => ‘path/to/lock/file’ scheduler start option. It makes it easy for schedulers on the same machine to determine which should run (the first to write the lockfile and lock it). It uses “man 2 flock” so it probably won’t work reliably on distributed file systems.
If one needs to use a special/different locking mechanism, the scheduler accepts :scheduler_lock => lock_object. lock_object only needs to respond to lock and unlock, and both of these methods should be idempotent.
Look at rufus/scheduler/locks.rb for an example.
# File lib/rufus/scheduler.rb, line 358 def lock @scheduler_lock.lock end
# File lib/rufus/scheduler.rb, line 431
#
# Collects, for each job returned by jobs, what job.occurrences(time0, time1)
# returns.
#
# format:: :per_job (default) returns a hash { job => occurrences }, jobs
#          with no occurrence being left out; :timeline returns an array of
#          [ time, job ] pairs sorted by time
def occurrences(time0, time1, format=:per_job)
  per_job = {}

  jobs.each do |job|
    times = job.occurrences(time0, time1)
    per_job[job] = times if times.any?
  end

  return per_job unless format == :timeline

  pairs = []
  per_job.each do |job, times|
    times.each { |time| pairs << [ time, job ] }
  end

  pairs.sort_by { |pair| pair.first }
end
# File lib/rufus/scheduler.rb, line 454
#
# Writes a diagnostic report about +err+ (intercepted while running +job+)
# to stderr: the job, the error and its backtrace, timezone information and
# the state of the scheduler (options, locks, threads, job counts).
#
# Every line carries the error's object_id (pre) so that the lines of one
# report can be told apart from those of another.
#
# A failure while producing the report is itself reported to stderr rather
# than propagated; stderr is flushed in all cases.
def on_error(job, err)

  pre = err.object_id.to_s

  # snapshot of which mutexes are currently locked
  ms = {}; mutexes.each { |k, v| ms[k] = v.locked? }

  stderr.puts("{ #{pre} rufus-scheduler intercepted an error:")
  stderr.puts(" #{pre} job:")
  stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}")
  # TODO: eventually use a Job#detail or something like that
  stderr.puts(" #{pre} error:")
  stderr.puts(" #{pre} #{err.object_id}")
  stderr.puts(" #{pre} #{err.class}")
  stderr.puts(" #{pre} #{err}")
  # NOTE(review): err.backtrace is nil for an exception that was never
  # raised; that case ends up in the rescue clause below.
  err.backtrace.each do |l|
    stderr.puts(" #{pre} #{l}")
  end
  stderr.puts(" #{pre} tz:")
  stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}")
  stderr.puts(" #{pre} Time.now: #{Time.now}")
  stderr.puts(" #{pre} scheduler:")
  stderr.puts(" #{pre} object_id: #{object_id}")
  stderr.puts(" #{pre} opts:")
  stderr.puts(" #{pre} #{@opts.inspect}")
  stderr.puts(" #{pre} frequency: #{self.frequency}")
  stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}")
  stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}")
  stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})")
  stderr.puts(" #{pre} down?: #{down?}")
  stderr.puts(" #{pre} threads: #{self.threads.size}")
  stderr.puts(" #{pre} thread: #{self.thread}")
  stderr.puts(" #{pre} thread_key: #{self.thread_key}")
  stderr.puts(" #{pre} work_threads: #{work_threads.size}")
  stderr.puts(" #{pre} active: #{work_threads(:active).size}")
  stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}")
  stderr.puts(" #{pre} max_work_threads: #{max_work_threads}")
  stderr.puts(" #{pre} mutexes: #{ms.inspect}")
  stderr.puts(" #{pre} jobs: #{jobs.size}")
  stderr.puts(" #{pre} at_jobs: #{at_jobs.size}")
  stderr.puts(" #{pre} in_jobs: #{in_jobs.size}")
  stderr.puts(" #{pre} every_jobs: #{every_jobs.size}")
  stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}")
  stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}")
  stderr.puts(" #{pre} running_jobs: #{running_jobs.size}")
  stderr.puts(" #{pre} work_queue: #{work_queue.size}")
  stderr.puts("} #{pre} .")

rescue => e

  # Lines already written before the failure stay in the output.
  stderr.puts("failure in #on_error itself:")
  stderr.puts(e.inspect)
  stderr.puts(e.backtrace)

ensure

  stderr.flush
end
# File lib/rufus/scheduler.rb, line 263
#
# Schedules a repeating job: +arg+ is parsed (the result is stored under
# opts[:_t]); a CronLine goes to schedule_cron, anything else to
# schedule_every.
def repeat(arg, callable=nil, opts={}, &block)
  parsed = Scheduler.parse(arg, opts)
  opts[:_t] = parsed

  if parsed.is_a?(CronLine)
    schedule_cron(arg, callable, opts, &block)
  else
    schedule_every(arg, callable, opts, &block)
  end
end
# File lib/rufus/scheduler.rb, line 193 def resume @paused = false end
# File lib/rufus/scheduler.rb, line 426
#
# Returns jobs(opts) restricted to running jobs (:running => true is merged
# into a copy of +opts+).
def running_jobs(opts={})
  query = opts.merge(:running => true)

  jobs(query)
end
# File lib/rufus/scheduler.rb, line 252
#
# Schedules a job, picking the job kind from what +arg+ parses to (the
# parse result is stored under opts[:_t]): a CronLine goes to
# schedule_cron, a Time to schedule_at, anything else to schedule_in.
def schedule(arg, callable=nil, opts={}, &block)
  parsed = Scheduler.parse(arg, opts)
  opts[:_t] = parsed

  if parsed.is_a?(CronLine)
    schedule_cron(arg, callable, opts, &block)
  elsif parsed.is_a?(Time)
    schedule_at(arg, callable, opts, &block)
  else
    schedule_in(arg, callable, opts, &block)
  end
end
# File lib/rufus/scheduler.rb, line 207
#
# Same as at, but always asks do_schedule for the job instance
# (return_job_instance is true).
def schedule_at(time, callable=nil, opts={}, &block)
  wants_job = true

  do_schedule(:once, time, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 247
#
# Same as cron, but always asks do_schedule for the job instance
# (return_job_instance is true).
def schedule_cron(cronline, callable=nil, opts={}, &block)
  wants_job = true

  do_schedule(:cron, cronline, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 227
#
# Same as every, but always asks do_schedule for the job instance
# (return_job_instance is true).
def schedule_every(duration, callable=nil, opts={}, &block)
  wants_job = true

  do_schedule(:every, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 217
#
# Same as in, but always asks do_schedule for the job instance
# (return_job_instance is true).
def schedule_in(duration, callable=nil, opts={}, &block)
  wants_job = true

  do_schedule(:once, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 237
#
# Same as interval, but always asks do_schedule for the job instance
# (return_job_instance is true).
def schedule_interval(duration, callable=nil, opts={}, &block)
  wants_job = true

  do_schedule(:interval, duration, callable, opts, wants_job, block)
end
Returns true if this job is currently scheduled.
Takes extra care to answer true if the job is a repeat job currently firing.
# File lib/rufus/scheduler.rb, line 385
#
# Answers true when the job (given as a job or as a job id, resolved via
# fetch) exists and has a non-nil next_time, false otherwise.
def scheduled?(job_or_job_id)
  found, _id = fetch(job_or_job_id)

  return false unless found

  !found.next_time.nil?
end
# File lib/rufus/scheduler.rb, line 133
#
# Stops the scheduler: resets @started_at (which ends the loop in start),
# unschedules every job, empties the work queue and finally releases the
# locks via unlock.
#
# opt:: :wait joins all the work threads, :kill kills them; any other value
#       leaves the work threads alone
def shutdown(opt=nil)
  @started_at = nil

  #jobs.each { |j| j.unschedule }
    # provokes https://github.com/jmettraux/rufus-scheduler/issue/98
  @jobs.array.each do |scheduled|
    scheduled.unschedule
  end

  @work_queue.clear

  case opt
  when :wait then join_all_work_threads
  when :kill then kill_all_work_threads
  end

  unlock
end
Lists all the threads associated with this scheduler.
# File lib/rufus/scheduler.rb, line 394
#
# Returns the threads of Thread.list that have a truthy thread-local value
# under thread_key.
def threads
  Thread.list.select do |candidate|
    candidate[thread_key]
  end
end
# File lib/rufus/scheduler.rb, line 449 def timeline(time0, time1) occurrences(time0, time1, :timeline) end
Sister method to lock, is called when the scheduler shuts down.
# File lib/rufus/scheduler.rb, line 365 def unlock @trigger_lock.unlock @scheduler_lock.unlock end
# File lib/rufus/scheduler.rb, line 273
#
# Unschedules the given job (a job or a job id, resolved via fetch).
#
# Fails with ArgumentError when no job is found.
def unschedule(job_or_job_id)
  found, id = fetch(job_or_job_id)

  if found
    found.unschedule
  else
    fail ArgumentError.new("no job found with id '#{id}'")
  end
end
# File lib/rufus/scheduler.rb, line 154
#
# Returns the time elapsed since @started_at (Time.now - @started_at), or
# nil when @started_at is not set.
def uptime
  return nil unless @started_at

  Time.now - @started_at
end
# File lib/rufus/scheduler.rb, line 159
#
# Returns the uptime rendered as a duration string (via to_duration), or
# an empty string when uptime is nil.
#
# The nil case used to hand nil to to_duration, which raised a
# NoMethodError; on_error interpolates uptime_s into its report and thus
# failed as well for a scheduler that was down.
def uptime_s
  elapsed = uptime

  elapsed ? self.class.to_duration(elapsed) : ''
end
Lists all the work threads (the ones actually running the scheduled block code)
Accepts a query option, which can be set to:
:all (default), returns all the threads that are work threads or are currently running a job
:active, returns all threads that are currently running a job
:vacant, returns the threads that are not running a job
If, thanks to :blocking => true, a job is scheduled to monopolize the main scheduler thread, that thread will get returned when :active or :all.
# File lib/rufus/scheduler.rb, line 412
#
# Returns the scheduler threads flagged (thread-local) with
# :rufus_scheduler_job or :rufus_scheduler_work_thread.
#
# query:: :active narrows down to the threads with :rufus_scheduler_job
#         set, :vacant to those without; any other value (:all by default)
#         returns the whole set
def work_threads(query=:all)
  pool = threads.select { |th|
    th[:rufus_scheduler_job] || th[:rufus_scheduler_work_thread]
  }

  if query == :active
    pool.select { |th| th[:rufus_scheduler_job] }
  elsif query == :vacant
    pool.reject { |th| th[:rufus_scheduler_job] }
  else
    pool
  end
end
# File lib/rufus/scheduler.rb, line 607
#
# Common implementation behind the at/in/every/interval/cron and
# schedule_* methods: instantiates the job class matching +job_type+ and
# adds the job to @jobs.
#
# job_type::            :once, :every, :interval or :cron
# t::                   the schedule (time, duration or cron line)
# callable::            the callable, or the options hash when the caller
#                       passed the options in the callable's position
# opts::                the job options
# return_job_instance:: truthy (or opts[:job] truthy) to get the job back
#                       instead of its job_id
# block::               the block to run; takes precedence over callable
#
# Fails with NotRunningError when @started_at is nil; raises ArgumentError
# when the job responds to frequency with a value below @frequency.
def do_schedule(job_type, t, callable, opts, return_job_instance, block)
  if @started_at == nil
    fail NotRunningError.new(
      'cannot schedule, scheduler is down or shutting down'
    )
  end

  # The options hash may have been passed in the callable's position.
  callable, opts = nil, callable if callable.is_a?(Hash)
  return_job_instance ||= opts[:job]

  job_class =
    case job_type
    when :once
      # a :once schedule is an AtJob for a point in time, an InJob otherwise
      opts[:_t] ||= Rufus::Scheduler.parse(t, opts)
      opts[:_t].is_a?(Time) ? AtJob : InJob
    when :every then EveryJob
    when :interval then IntervalJob
    when :cron then CronJob
    end

  job = job_class.new(self, t, opts, block || callable)

  if job.respond_to?(:frequency) && job.frequency < @frequency
    raise ArgumentError.new(
      "job frequency (#{job.frequency}) is higher than scheduler frequency (#{@frequency})"
    )
  end

  @jobs.push(job)

  return job if return_job_instance

  job.job_id
end
Returns [ job, job_id ]
# File lib/rufus/scheduler.rb, line 516
#
# Normalizes a "job or job id" argument into a [ job, job_id ] pair.
#
# An object responding to job_id is taken to be the job itself; anything
# else is treated as an id and looked up with job (the first element is
# then whatever job returns for that id).
def fetch(job_or_job_id)
  unless job_or_job_id.respond_to?(:job_id)
    return [ job(job_or_job_id), job_or_job_id ]
  end

  [ job_or_job_id, job_or_job_id.job_id ]
end
# File lib/rufus/scheduler.rb, line 532
#
# Pushes one :sayonara message per work thread onto the work queue, joins
# every work thread, then empties the queue.
def join_all_work_threads
  work_threads.size.times do
    @work_queue << :sayonara
  end

  work_threads.each do |worker|
    worker.join
  end

  @work_queue.clear
end
# File lib/rufus/scheduler.rb, line 541
#
# Kills every work thread (Thread#kill).
def kill_all_work_threads
  work_threads.each do |worker|
    worker.kill
  end
end
# Raises KillSignal in every work thread (Thread#raise).
def free_all_work_threads
  work_threads.each do |worker|
    worker.raise(KillSignal)
  end
end
# File lib/rufus/scheduler.rb, line 551
#
# Records the start time and spawns the scheduler thread.
#
# For as long as @started_at is set, the thread unschedules jobs, triggers
# jobs (unless @paused), times jobs out, then sleeps for @frequency.
#
# The thread is tagged with the thread key, the scheduler itself and a name
# (opts[:thread_name] or "<thread_key>_scheduler").
def start
  @started_at = Time.now

  @thread = Thread.new {
    while @started_at
      unschedule_jobs
      trigger_jobs unless @paused
      timeout_jobs

      sleep(@frequency)
    end
  }

  thread_name = @opts[:thread_name] || "#{@thread_key}_scheduler"

  @thread[@thread_key] = true
  @thread[:rufus_scheduler] = self
  @thread[:name] = thread_name
end
# File lib/rufus/scheduler.rb, line 525
#
# Unschedules every job returned by jobs, then polls every 0.01s until no
# job is running anymore.
def terminate_all_jobs
  jobs.each do |scheduled|
    scheduled.unschedule
  end

  while running_jobs.size > 0
    sleep 0.01
  end
end
# File lib/rufus/scheduler.rb, line 588
#
# Raises Rufus::Scheduler::TimeoutError in each active work thread whose
# deadline has passed.
#
# The deadline is the thread's :rufus_scheduler_timeout when that is a
# Time, else :rufus_scheduler_time plus that timeout.
def timeout_jobs
  work_threads(:active).each do |worker|
    running = worker[:rufus_scheduler_job]
    limit = worker[:rufus_scheduler_timeout]

    # thread might just have become inactive (job -> nil)
    next unless running && limit

    started = worker[:rufus_scheduler_time]
    deadline = limit.is_a?(Time) ? limit : started + limit

    worker.raise(Rufus::Scheduler::TimeoutError) unless deadline > Time.now
  end
end
Generated with the Darkfish Rdoc Generator 2.