class DirectoryWatcher::Collector

Collector reads items from a collection Queue and processes them to see if FileEvents should be put onto the notification Queue.

Public Class Methods

new( config )
Create a new Collector from the given Configuration. If config.pre_load? is
true, an initial Scan of config.glob is processed without emitting events.

config - The Configuration. The Collector uses the following items from it:
  collection_queue   - The Queue to read items from the Scanner on
  notification_queue - The Queue to submit the Events to the Notifier on
  stable             - The number of times we see a file hasn't changed before
                       emitting a stable event
  sort_by            - The method used to sort events when processing on_scan results
  order_by           - The ordering applied to the sorted events from a call to on_scan

pre_load_scan - A Scan to use to load our internal state from before. No
                events will be emitted for the FileStats in this scan.

def initialize( #notification_queue, #collection_queue, options = {} )

# File lib/directory_watcher/collector.rb, line 23
def initialize( config )
  @stats = Hash.new
  @stable_counts = Hash.new
  @config = config
  on_scan( DirectoryWatcher::Scan.new( config.glob ), false ) if config.pre_load?
  self.interval = 0.01 # yes this is a fast loop
end

Public Instance Methods

collection_queue()

The queue from which to read items from the scanners. See Configuration.

# File lib/directory_watcher/collector.rb, line 51
def collection_queue
  @config.collection_queue
end

dump_stats( io )

Write the current stats to the given IO object as a YAML document.

io - The IO object to write the document to.

Returns nothing.

# File lib/directory_watcher/collector.rb, line 122
def dump_stats( io )
  YAML.dump(@stats, io)
end

load_stats( io )

Read the current stats from the given IO object. Any existing stats in the Collector will be overwritten.

io - The IO object from which to read the document.

Returns nothing.

# File lib/directory_watcher/collector.rb, line 132
def load_stats( io )
  @stats = YAML.load(io)
end
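
The dump_stats and load_stats pair can be tried in isolation. The following is a minimal sketch, not the library's own usage: the `stats` Hash of plain values is a hypothetical stand-in for the Collector's internal `@stats`, and a StringIO replaces a real file. On recent Ruby versions `YAML.load` may refuse to deserialize arbitrary objects by default, so restoring real FileStat objects could need extra options that the source above does not show.

```ruby
require 'yaml'
require 'stringio'

# Hypothetical stand-in for the Collector's @stats Hash: path => plain data.
stats = {
  "/tmp/a.txt" => { "mtime" => 1_700_000_000, "bytes" => 42 },
  "/tmp/b.txt" => { "mtime" => 1_700_000_100, "bytes" => 7 }
}

io = StringIO.new
YAML.dump(stats, io)      # the same call dump_stats makes
io.rewind                 # go back to the start before reading
restored = YAML.load(io)  # the same call load_stats makes
```

Note that load_stats replaces `@stats` wholesale, so anything collected before the call is lost.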

notification_queue()

The queue to write Events for the Notifier. See Configuration.

# File lib/directory_watcher/collector.rb, line 57
def notification_queue
  @config.notification_queue
end

on_scan( scan, emit_events = true )

Given the scan, update the set of stats with the results from the Scan and emit events to the notification queue as appropriate.

scan        - The Scan containing all the new FileStat items
emit_events - Should events be emitted for the events in the scan
              (default: true)

There is one odd thing that happens here. Scanners that are EventableScanners use #on_stat to emit removed events, whereas the standard threaded Scanner only uses Scans. So removed events are emitted in this method only if the scan came from the basic threaded Scanner (in the code below, when @config.scanner is nil).

TODO: Possibly fix this through another abstraction in the Scanners. No idea about what that would be yet.

Returns nothing.

# File lib/directory_watcher/collector.rb, line 78
def on_scan( scan, emit_events = true )
  seen_paths = Set.new
  logger.debug "Sorting by #{sort_by} #{order_by}"
  sorted_stats( scan.run ).each do |stat|
    on_stat(stat, emit_events)
    seen_paths << stat.path
  end
  emit_removed_events(seen_paths) if @config.scanner.nil?
end

on_stat( stat, emit_event = true )

Process a single stat and emit an event if necessary.

stat       - The new FileStat to process and see if an event should
             be emitted
emit_event - Whether or not an event should be emitted.

Returns nothing

# File lib/directory_watcher/collector.rb, line 95
def on_stat( stat, emit_event = true )
  orig_stat = update_stat( stat )
  logger.debug "Emitting event for on_stat #{stat}"
  emit_event_for( orig_stat, stat ) if emit_event
end

order_by()

How to order Scan results. See Configuration.

# File lib/directory_watcher/collector.rb, line 45
def order_by
  @config.order_by
end

run()

Remove one item from the collection queue and process it.

This method is required by the Threaded API

Returns nothing

# File lib/directory_watcher/collector.rb, line 106
def run
  case thing = collection_queue.deq
  when ::DirectoryWatcher::Scan
    on_scan(thing)
  when ::DirectoryWatcher::FileStat
    on_stat(thing)
  else
    raise "Unknown item in the queue: #{thing}"
  end
end
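
The dispatch in run can be illustrated without the rest of the library. This is a sketch under stated assumptions: `FakeScan`, `FakeStat` and `dispatch_one` are hypothetical names standing in for DirectoryWatcher::Scan, DirectoryWatcher::FileStat and the Collector's handlers. It only shows how `case` on the dequeued object's class selects a handler and how anything else raises.

```ruby
# Hypothetical stand-ins for DirectoryWatcher::Scan and ::FileStat.
FakeScan = Struct.new(:glob)
FakeStat = Struct.new(:path)

# Take one item off the queue and report which handler it would go to.
def dispatch_one(queue)
  case thing = queue.deq
  when FakeScan then [:on_scan, thing]
  when FakeStat then [:on_stat, thing]
  else
    raise "Unknown item in the queue: #{thing}"
  end
end

queue = Queue.new
queue.enq(FakeScan.new("*.txt"))
queue.enq(FakeStat.new("a.txt"))
queue.enq(:bogus)

first  = dispatch_one(queue)
second = dispatch_one(queue)
error  = begin
  dispatch_one(queue)
  nil
rescue RuntimeError => e
  e.message
end
```

Because `deq` blocks on an empty queue, a loop built on this pattern waits for the scanner rather than spinning.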

sort_by()

How to sort Scan results. See Configuration.

# File lib/directory_watcher/collector.rb, line 39
def sort_by
  @config.sort_by
end

stable_threshold()

The number of times we see a file hasn't changed before emitting a stable event. See Configuration#stable.

# File lib/directory_watcher/collector.rb, line 33
def stable_threshold
  @config.stable
end

Private Instance Methods

emit_event_for( old_stat, new_stat )

Determine what type of event to emit, and put that event onto the notification queue.

old_stat - The old FileStat
new_stat - The new FileStat

Returns nothing

# File lib/directory_watcher/collector.rb, line 177
def emit_event_for( old_stat, new_stat )
  event = DirectoryWatcher::Event.from_stats( old_stat, new_stat )
  if should_emit?(event) then
    logger.debug "Sending event #{event.object_id} to notification queue"
    notification_queue.enq( event )
  else
    logger.debug "Emitting of event #{event.object_id} cancelled"
  end
end

emit_removed_events( seen_paths )

Look for removed files and emit removed events for all of them.

seen_paths - the list of files that we know currently exist

Returns nothing

# File lib/directory_watcher/collector.rb, line 162
def emit_removed_events( seen_paths )
  @stats.keys.each do |existing_path|
    next if seen_paths.include?(existing_path)
    old_stat = @stats.delete(existing_path)
    emit_event_for(old_stat, ::DirectoryWatcher::FileStat.for_removed_path(existing_path))
  end
end
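
The removal check above amounts to a set difference between the known paths and the paths seen in the latest scan. A minimal sketch, assuming a plain Hash of symbols in place of `@stats` and an array named `removed` in place of the notification queue (both hypothetical):

```ruby
require 'set'

# Hypothetical stand-in for @stats: path => last known stat.
stats = { "a.txt" => :stat_a, "b.txt" => :stat_b, "c.txt" => :stat_c }

# Paths that the most recent scan reported as present.
seen_paths = Set.new(["a.txt", "c.txt"])

removed = []
# Iterate over a copy of the keys so deleting from the Hash is safe.
stats.keys.each do |existing_path|
  next if seen_paths.include?(existing_path)
  old_stat = stats.delete(existing_path)
  removed << [existing_path, old_stat]
end
```

Using a Set for `seen_paths` keeps each membership test cheap even when a scan covers many files.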

emitting_stable_events?()

Is it legal for us to emit stable events at all? This checks the config to see if that is the case.

If the stable threshold is set in @config, then we are emitting stable events.

Returns whether it is legal to propagate stable events

# File lib/directory_watcher/collector.rb, line 280
def emitting_stable_events?
  stable_threshold
end

increment_stable_count( path )

Increment the stable count for the given path

path - the path of the file whose stable count should be incremented

Returns nothing

# File lib/directory_watcher/collector.rb, line 260
def increment_stable_count( path )
  @stable_counts[path] += 1
end

mark_as_invalid_for_stable_event( path )

Mark that the given path is invalid for having a stable event emitted for it.

path - the path to mark

Returns nothing

# File lib/directory_watcher/collector.rb, line 250
def mark_as_invalid_for_stable_event( path )
  logger.debug "#{path} marked as invalid for stable"
  @stable_counts.delete( path )
end

mark_as_valid_for_stable_event( path )

Let it be known that the given path can now have a stable event emitted for it.

path - the path to mark as ready

Returns nothing

# File lib/directory_watcher/collector.rb, line 239
def mark_as_valid_for_stable_event( path )
  logger.debug "#{path} marked as valid for stable"
  @stable_counts[path] = 0
end

should_emit?( event )

Should the given event actually be emitted?

If the event passed in is NOT a stable event, return true. If it is a stable event and there is a #stable_threshold, check whether the stable count for this event's path has reached the stable threshold.

This method has the side effect of updating the stable count of the event's path. If we are going to return true for a stable event, the path is marked as invalid for further stable events (its stable count is removed) until it is added or modified again.

event - any event

Returns whether or not to emit the event based upon its stability

# File lib/directory_watcher/collector.rb, line 200
def should_emit?( event )
  if event.stable? then
    if emitting_stable_events? and valid_for_stable_event?( event.path ) then
      increment_stable_count( event.path )
      if should_emit_stable?( event.path ) then
        mark_as_invalid_for_stable_event( event.path )
        return true
      end
    end
    return false
  elsif event.removed? then
    mark_as_invalid_for_stable_event( event.path )
    return true
  else
    mark_as_valid_for_stable_event( event.path )
    return true
  end
end
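
The stable-event bookkeeping is easier to follow as a small state machine. The sketch below is a simplified model and not the library's code: `StableTracker` and the `kind` symbols are hypothetical, and events are reduced to a kind plus a path. It mirrors the rules above: a change makes a path eligible, each stable sighting increments its count, and reaching the threshold emits once and makes the path ineligible again.

```ruby
# Hypothetical, simplified model of the stable-count bookkeeping.
class StableTracker
  def initialize(threshold)
    @threshold = threshold   # plays the role of stable_threshold
    @counts = {}             # plays the role of @stable_counts
  end

  # kind is one of :added, :modified, :removed, :stable
  def should_emit?(kind, path)
    case kind
    when :stable
      # Only paths that changed since their last stable event are eligible.
      return false unless @threshold && @counts.key?(path)
      @counts[path] += 1
      return false if @counts[path] < @threshold
      @counts.delete(path)   # no further stable events until the next change
      true
    when :removed
      @counts.delete(path)
      true
    else
      @counts[path] = 0      # added or modified: path becomes eligible again
      true
    end
  end
end

tracker = StableTracker.new(2)
results = [
  tracker.should_emit?(:stable,   "a.txt"),  # not eligible yet
  tracker.should_emit?(:added,    "a.txt"),
  tracker.should_emit?(:stable,   "a.txt"),  # count 1 of 2
  tracker.should_emit?(:stable,   "a.txt"),  # count 2 of 2
  tracker.should_emit?(:stable,   "a.txt"),  # already emitted
  tracker.should_emit?(:modified, "a.txt")
]
```

Deleting the count, rather than resetting it to zero, is what prevents a second stable event from firing before the file changes again.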

should_emit_stable?( path )

Is the given path ready to have a stable event emitted?

path - the path to report on

Returns whether to emit a stable event or not

# File lib/directory_watcher/collector.rb, line 269
def should_emit_stable?( path )
  @stable_counts[path] >= stable_threshold
end

sorted_stats( stats )

Sort the stats by sort_by and order_by, returning the results.

# File lib/directory_watcher/collector.rb, line 142
def sorted_stats( stats )
  sorted = stats.sort_by{ |stat| stat.send(sort_by) }
  sorted = sorted.reverse if order_by == :descending
  return sorted
end
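
The sorting step can be exercised on its own. In this sketch `FileInfo` and `sorted` are hypothetical names, and `:path` and `:mtime` are assumed attribute names used only for illustration. The attribute to sort on is looked up with `send`, and the result is reversed only when the order is `:descending`.

```ruby
# Hypothetical stand-in for a FileStat with a few sortable attributes.
FileInfo = Struct.new(:path, :mtime, :bytes)

# Sort by the named attribute, then reverse for descending order.
def sorted(stats, sort_by, order_by)
  result = stats.sort_by { |stat| stat.send(sort_by) }
  order_by == :descending ? result.reverse : result
end

infos = [
  FileInfo.new("b.txt", 300, 10),
  FileInfo.new("a.txt", 100, 30),
  FileInfo.new("c.txt", 200, 20)
]

by_path_asc   = sorted(infos, :path,  :ascending).map(&:path)
by_mtime_desc = sorted(infos, :mtime, :descending).map(&:path)
```

Any value of order_by other than `:descending` leaves the ascending order in place, which matches the comparison in the source above.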

update_stat( new_stat )

Update the stats Hash with the new_stat information, and return the old data that is being replaced.

# File lib/directory_watcher/collector.rb, line 151
def update_stat( new_stat )
  old_stat = @stats.delete(new_stat.path)
  @stats.store(new_stat.path, new_stat) unless new_stat.removed?
  return old_stat
end
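
The return value of update_stat is what lets the caller compare old and new state. A minimal sketch, assuming a hypothetical `StatEntry` struct in place of FileStat and a standalone `replace_stat` function that takes the Hash as an argument:

```ruby
# Hypothetical stand-in for FileStat, with a removed? predicate.
StatEntry = Struct.new(:path, :mtime, :removed) do
  def removed?
    removed
  end
end

# Swap in the new stat and hand back whatever was stored before (or nil).
def replace_stat(stats, new_stat)
  old_stat = stats.delete(new_stat.path)
  stats.store(new_stat.path, new_stat) unless new_stat.removed?
  old_stat
end

known = {}
first  = replace_stat(known, StatEntry.new("a.txt", 1, false))
second = replace_stat(known, StatEntry.new("a.txt", 2, false))
third  = replace_stat(known, StatEntry.new("a.txt", nil, true))
```

A nil old stat means the path was not known before, and a removed stat leaves the Hash without an entry for that path.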

valid_for_stable_event?( path )

Is the given path able to have a stable event emitted for it?

A stable event may only be emitted for a path that has already had an added or modified event sent. Also, once a stable event has been emitted for a path, another stable event may not be emitted until the path has been modified or added again.

path - the path of the file to check

Returns whether or not the path may have a stable event emitted for it.

# File lib/directory_watcher/collector.rb, line 229
def valid_for_stable_event?( path )
  @stable_counts.has_key?( path )
end