connection is a string, name of the connection from broker.yml to use for this threaded poller instance
configuration is a list of hashes; each hash describes a group of worker threads. For each group, define which priorities those workers will process.
[
  {
    :pool_size  => 1,        # number of workers of this type
    :priorities => [1,2,3]   # what message priorities this thread will process
  }
]
# File lib/activemessaging/threaded_poller.rb, line 33
#
# Stores the connection name and the worker-group configuration.
#
# connection    - name of the connection to use (stored as given).
# configuration - list of hashes, one per worker group, e.g.
#                 [{:pool_size => 1, :priorities => [1,2,3]}].
#                 When nil or empty, a single group of 3 workers is used.
#                 A single non-empty hash is wrapped into a one-element list.
def initialize(connection='default', configuration={})
  # The `{}` default is truthy in Ruby, so `configuration || default` never
  # applied the default pool; treat nil and empty the same way instead.
  if configuration.nil? || configuration.empty?
    configuration = [{:pool_size => 3}]
  elsif configuration.is_a?(Hash)
    # start iterates the configuration expecting one hash per group.
    configuration = [configuration]
  end

  self.configuration = configuration
  self.connection = connection
end
# File lib/activemessaging/threaded_poller.rb, line 120
#
# Called with a worker that has died and the reason it died.
#
# worker - the worker that died
# reason - the cause of death (only its class is logged)
#
# While running, a replacement worker is created, added to the ready list and
# offered to the receiver. Once stopping, :shutdown is signalled as soon as
# no busy workers remain.
def died(worker, reason)
  # A worker can die while busy or while idle, so drop it from both lists;
  # otherwise a dead idle worker would linger in the ready list.
  busy.delete(worker)
  workers.delete(worker)

  if running
    logger.info "uh oh, #{worker.inspect} died because of #{reason.class}"
    # NOTE(review): start builds workers with their group configuration
    # (Worker.new_link(current_actor, c)); this replacement is built without
    # one, so it presumably loses its priorities -- confirm against Worker.
    replacement = Worker.new_link(current_actor)
    workers << replacement
    receive(replacement)
  else
    logger.info "check to see if busy is empty: #{busy.inspect}"
    if busy.empty?
      logger.info "all died: signal stopped"
      after(0) { signal(:shutdown) }
    end
  end
end
# File lib/activemessaging/threaded_poller.rb, line 99
#
# Hands a message to a worker: the worker is taken off the ready list, put on
# the busy list, and asked to execute the message via execute!.
def dispatch(message, worker)
  workers.delete(worker)
  busy.push(worker)
  worker.execute!(message)
end
# File lib/activemessaging/threaded_poller.rb, line 105
#
# Called with a worker that has finished executing a message.
#
# While running, the worker goes back on the ready list and is offered to the
# receiver again. Once stopping, the worker is terminated if still alive, and
# :shutdown is signalled when no busy workers remain.
def executed(worker)
  busy.delete(worker)

  if running
    workers.push(worker)
    return receive(worker)
  end

  worker.terminate if worker.alive?
  return unless busy.empty?

  logger.info "all executed: signal stopped"
  after(0) { signal(:shutdown) }
end
Recursive method: uses Celluloid's 'after' to keep calling itself.
# File lib/activemessaging/threaded_poller.rb, line 89
#
# At debug level, logs the connection name, the ready and busy worker counts
# and the running flag, then schedules itself again via after(10).
# Does nothing when the logger is not at debug level.
def log_status
  if logger.debug?
    status = "ActiveMessaging::ThreadedPoller: conn:#{connection}, #{workers.count}, #{busy.count}, #{running}"
    logger.debug(status)
    after(10) { log_status }
  end
end
# File lib/activemessaging/threaded_poller.rb, line 141
#
# Returns the logger exposed by ActiveMessaging.
def logger
  ActiveMessaging.logger
end
# File lib/activemessaging/threaded_poller.rb, line 95
#
# Asks the receiver (via receive!) to take the given worker, but only when
# there is a receiver, the poller is running, and a worker was given.
def receive(worker)
  return unless receiver && running && worker

  receiver.receive!(worker)
end
# File lib/activemessaging/threaded_poller.rb, line 39
#
# Starts the poller: resets the worker lists, subscribes through the gateway,
# creates the message receiver, builds the configured workers and offers each
# of them to the receiver.
def start
  logger.info "ActiveMessaging::ThreadedPoller start"

  # workers: ready to use; busy: already working;
  # running: tells threads whether to keep going, so they can stop gracefully.
  self.workers = []
  self.busy = []
  self.running = true

  # Subscribing creates the connections based on the subscriptions in the
  # processors; a connection cannot be looked up before this call.
  ActiveMessaging::Gateway.subscribe

  # Only one message receiver actor is needed, bound to this poller's connection.
  broker_connection = ActiveMessaging::Gateway.connection(connection)
  self.receiver = MessageReceiver.new(current_actor, broker_connection)

  # Build the workers for every configured group (one worker when the group
  # gives no :pool_size).
  configuration.each do |group|
    pool_size = group[:pool_size] || 1
    pool_size.times { workers << Worker.new_link(current_actor, group) }
  end

  # Only once every worker exists, start them up.
  workers.each { |idle| receive(idle) }

  # At debug level, logs info about the workers every 10 seconds.
  log_status
end
# File lib/activemessaging/threaded_poller.rb, line 71
#
# Stops the poller: clears the running flag, terminates the receiver and the
# waiting workers, and signals :shutdown right away if already stopped.
def stop
  logger.info "ActiveMessaging::ThreadedPoller stop"

  # Tells busy workers not to pick up another message (without interrupting
  # them) and tells the message receiver to stop getting more messages.
  self.running = false

  receiver.terminate! if receiver.alive?
  logger.info "ActiveMessaging::ThreadedPoller receiver terminated"

  # Waiting workers are shut down now; running ones are allowed to finish.
  workers.each do |idle|
    idle.terminate! if idle.alive?
  end
  logger.info "ActiveMessaging::ThreadedPoller workers terminated"

  after(0) { signal(:shutdown) } if stopped?
end
Generated with the Darkfish Rdoc Generator 2.