# File lib/activemessaging/adapters/amqp.rb, line 13
#
# Subscribes this class to +destination_name+, defaulting the
# :exchange_type and :exchange_name subscription headers to the values found
# in the publish headers of that destination (as returned by
# ActiveMessaging::Gateway.find_destination). Entries in +headers+ take
# precedence over those defaults because they are merged in last.
#
# XXX: THIS IS A BIG TIME MONKEY PATCH! Consider pushing a proper patch
# upstream instead of jury-rigging this.
def self.subscribes_to(destination_name, headers = {})
  destination = ActiveMessaging::Gateway.find_destination(destination_name)

  exchange_defaults = {
    :exchange_type => destination.publish_headers[:exchange_type],
    :exchange_name => destination.publish_headers[:exchange_name]
  }

  ActiveMessaging::Gateway.subscribe_to(destination_name, self, exchange_defaults.merge(headers))
end
# File lib/activemessaging/processor.rb, line 15
#
# Returns the logger stored in the @@logger class variable, assigning
# ActiveMessaging.logger to it first when it is unset, nil or false.
def logger
  @@logger ||= ActiveMessaging.logger
end
# File lib/activemessaging/processor.rb, line 23
#
# Error hook: this implementation simply raises the +exception+ it is given.
def on_error(exception)
  raise exception
end
# File lib/activemessaging/processor.rb, line 19
#
# Placeholder message handler: always raises, telling the developer to
# implement on_message in their own processor class.
#
# message:: the value handed to the handler (unused here).
#
# Raises NotImplementedError unconditionally.
def on_message(message)
  raise NotImplementedError,
        "Implement the on_message method in your own processor class that extends ActiveMessaging::Processor"
end
Binds the processor to the current message so that the processor can access the headers and other attributes of the message.
# File lib/activemessaging/processor.rb, line 29
#
# Stores +message+ in @message and returns the result of calling on_message
# with message.body.
#
# Anything raised along the way is handed to on_error. If on_error itself
# raises ActiveMessaging::AbortMessageException, that is logged and re-raised
# to the caller; any other exception coming out of on_error is logged with
# its backtrace and not propagated.
def process!(message)
  @message = message
  on_message(message.body)
rescue Exception => failure # deliberate catch-all: everything is routed through on_error
  begin
    on_error(failure)
  rescue ActiveMessaging::AbortMessageException => abort_request
    logger.error "Processor:process! - AbortMessageException caught."
    raise abort_request
  rescue Exception => handler_failure
    logger.error "Processor:process! - error in on_error, will propagate no further: #{handler_failure.message}\n\t#{handler_failure.backtrace.join("\n\t")}"
  end
end
Generated with the Darkfish Rdoc Generator 2.