class ActiveMessaging::Gateway

Attributes

adapters[RW]
connections[RW]
filters[RW]
named_destinations[RW]
processor_groups[RW]
subscriptions[RW]

Public Class Methods

_dispatch(message) click to toggle source
# File lib/activemessaging/gateway.rb, line 228
# Hands +message+ to the processor of every subscription it matches.
#
# For each matching entry in +subscriptions+ a copy of the message is run
# through the incoming filter chain and then passed to a fresh instance of
# the subscription's processor class. Afterwards the original message is
# acknowledged -- the +ensure+ clause does this even when an exception other
# than AbortMessageException propagates.
#
# When processing raises ActiveMessaging::AbortMessageException the message
# is handed to abort_message instead, it is not acknowledged, and dispatching
# stops without visiting the remaining subscriptions.
#
# An error is logged when no subscription matched at all.
def _dispatch(message)
  aborted = false
  handled = false

  subscriptions.each do |_key, subscription|
    next unless message.matches_subscription?(subscription)

    handled = true
    routing = {
      :receiver    => subscription.processor_class,
      :destination => subscription.destination,
      :direction   => :incoming
    }
    begin
      # Filters and the processor work on a copy; the original message is
      # what gets acknowledged or aborted below.
      execute_filter_chain(:incoming, message.dup, routing) do |filtered|
        subscription.processor_class.new.process!(filtered)
      end
    rescue ActiveMessaging::AbortMessageException
      abort_message(subscription, message)
      aborted = true
      return
    ensure
      acknowledge_message(subscription, message) unless aborted
    end
  end

  ActiveMessaging.logger.error("No-one responded to #{message}") unless handled
end
abort_message(subscription, message) click to toggle source

::abort_message is called when processing the message raises an ActiveMessaging::AbortMessageException, indicating that the message should be returned to the destination so it can be tried again later.

# File lib/activemessaging/gateway.rb, line 265
# Returns +message+ to its broker by calling +unreceive+ on the connection
# for the subscription's destination, passing the subscription's subscribe
# headers along.
def abort_message(subscription, message)
  broker = connection(subscription.destination.broker_name)
  broker.unreceive(message, subscription.subscribe_headers)
end
acknowledge_message(subscription, message) click to toggle source

::acknowledge_message is called when the message has been processed without error by at least one processor

# File lib/activemessaging/gateway.rb, line 259
# Acknowledges +message+ by calling +received+ on the connection for the
# subscription's destination, passing the subscription's subscribe headers
# along.
def acknowledge_message(subscription, message)
  broker = connection(subscription.destination.broker_name)
  broker.received(message, subscription.subscribe_headers)
end
apply_filter?(direction, details, options) click to toggle source
# File lib/activemessaging/gateway.rb, line 167
# Decides whether a filter registered with +options+ applies to a message
# travelling in +direction+ to <tt>details[:destination]</tt>.
#
# Returns +true+ when the filter applies and +nil+ otherwise.
#
# The filter's <tt>:direction</tt> must equal +direction+ (compared as a
# symbol) or be <tt>:bidirectional</tt>. Beyond that:
# * with neither <tt>:only</tt> nor <tt>:except</tt> the filter applies;
# * <tt>:only</tt> applies it when the destination name is listed;
# * <tt>:except</tt> applies it when the destination name is not listed.
# When both keys are given, satisfying either one is enough.
def apply_filter?(direction, details, options)
  filter_direction = options[:direction]
  return nil unless direction.to_sym == filter_direction || filter_direction == :bidirectional

  has_only   = options.has_key?(:only)
  has_except = options.has_key?(:except)
  # The destination is only consulted when a restriction is present.
  return true unless has_only || has_except

  return true if has_only && [options[:only]].flatten.include?(details[:destination].name)
  return true if has_except && ![options[:except]].flatten.include?(details[:destination].name)

  nil
end
connection(broker_name='default') click to toggle source
# File lib/activemessaging/gateway.rb, line 122
# Returns the connection for +broker_name+, creating and caching it in
# @connections on first use.
#
# A new connection is built by loading the broker's configuration and
# instantiating the adapter class registered in Gateway.adapters under the
# configured <tt>:adapter</tt> name.
#
# Raises RuntimeError when no adapter is registered under that name.
def connection(broker_name = 'default')
  unless @connections.has_key?(broker_name)
    settings = load_connection_configuration(broker_name)
    adapter_name = settings[:adapter]
    adapter_class = Gateway.adapters[adapter_name]
    raise "Unknown messaging adapter #{adapter_name.inspect}!" if adapter_class.nil?
    @connections[broker_name] = adapter_class.new(settings)
  end
  @connections[broker_name]
end
create_filter(filter, options) click to toggle source
# File lib/activemessaging/gateway.rb, line 181
# Turns a filter specification into an object that can be sent +process+.
#
# +filter+ may be a Class, or a String/Symbol that is camelized and
# constantized into one. The result is:
# * the class itself, when it responds to +process+ with a non-zero arity;
# * <tt>filter_class.new(options)</tt> when +initialize+ has arity 1 or -1;
# * <tt>filter_class.new</tt> when +initialize+ has arity 0.
#
# Raises RuntimeError when +filter+ is none of the accepted kinds, or when
# no +initialize+ signature matches.
def create_filter(filter, options)
  filter_class =
    case filter
    when String, Symbol then filter.to_s.camelize.constantize
    when Class then filter
    end

  raise "Filter #{filter} could not be loaded, created, or used!" unless filter_class

  # A class-level +process+ that accepts arguments means the class is the filter.
  if filter_class.respond_to?(:process) && (filter_class.method(:process).arity.abs > 0)
    return filter_class
  end

  init_arity = filter_class.instance_method(:initialize).arity
  if init_arity.abs == 1
    filter_class.new(options)
  elsif init_arity == 0
    filter_class.new
  else
    raise "Filter #{filter} could not be created, no 'initialize' matched."
  end
end
current_processor_group() click to toggle source
# File lib/activemessaging/gateway.rb, line 348
# Returns the processor group selected on the command line, or +nil+.
#
# Scans ARGV for arguments of the form <tt>process-group=NAME</tt> and
# memoizes NAME (as a symbol) in @current_processor_group when it is a key
# of +processor_groups+. ARGV is only scanned while nothing has been
# memoized yet.
#
# An unrecognized name is logged as an error and the process is terminated
# via +exit+. A +process-group+ argument without a value is treated the same
# way (previously it failed with NoMethodError on <tt>nil.to_sym</tt>).
def current_processor_group
  if ARGV.length > 0 && !@current_processor_group
    ARGV.each do |arg|
      key, value = arg.split('=')
      next unless key == 'process-group'

      # +value+ is nil for a bare "process-group" or "process-group="
      # argument; to_s keeps that case on the regular "unrecognized" path.
      group_sym = value.to_s.to_sym
      if processor_groups.has_key?(group_sym)
        @current_processor_group = group_sym
      else
        ActiveMessaging.logger.error "Unrecognized process-group."
        ActiveMessaging.logger.error "You specified process-group #{value}, make sure this is specified in config/messaging.rb"
        ActiveMessaging.logger.error "  ActiveMessaging::Gateway.define do |s|"
        ActiveMessaging.logger.error "    s.processor_groups = { :group1 => [:foo_bar1_processor], :group2 => [:foo_bar2_processor] }"
        ActiveMessaging.logger.error "  end"
        exit
      end
    end
  end
  @current_processor_group
end
define() { |self| ... } click to toggle source
# File lib/activemessaging/gateway.rb, line 269
# Configuration entry point: yields the receiver to the given block so the
# block can declare destinations, filters, processor groups and so on.
# Returns whatever the block returns.
def define
  # hand ourselves to the caller's configuration block (messaging.rb)
  yield(self)
end
destination(destination_name, destination, publish_headers={}) click to toggle source
# File lib/activemessaging/gateway.rb, line 274
# Registers a named destination.
#
# Builds a Destination from the name, the broker-side +destination+ value,
# the default +publish_headers+ and the +broker+ name, stores it in
# +named_destinations+ under +destination_name+ and returns it.
#
# Raises RuntimeError when +destination_name+ is already registered.
def destination(destination_name, destination, publish_headers = {}, broker = 'default')
  if named_destinations.has_key?(destination_name)
    raise "You already defined #{destination_name} to #{named_destinations[destination_name].value}"
  end

  created = Destination.new(destination_name, destination, publish_headers, broker)
  named_destinations[destination_name] = created
end
Also aliased as: queue
disconnect() click to toggle source
# File lib/activemessaging/gateway.rb, line 147
# Calls +disconnect+ on every cached connection and then replaces the
# connection cache with an empty hash.
def disconnect
  @connections.each_value(&:disconnect)
  @connections = {}
end
dispatch(message) click to toggle source
# File lib/activemessaging/gateway.rb, line 216
# Public dispatch entry point: wraps _dispatch with application
# preparation, error logging and cleanup.
#
# Any exception raised while preparing or dispatching is logged (message and
# backtrace) and then re-raised. Whatever happens, the logger is flushed --
# failures of the flush itself are ignored -- and reset_application is
# called afterwards.
def dispatch(message)
  begin
    prepare_application
    _dispatch(message)
  rescue Exception => error
    # Deliberately catches everything: the error is only logged, then re-raised.
    ActiveMessaging.logger.error "Dispatch exception: #{error}"
    ActiveMessaging.logger.error error.backtrace.join("\n\t")
    raise error
  ensure
    begin
      ActiveMessaging.logger.flush
    rescue StandardError
      nil
    end
    reset_application
  end
end
execute_filter_chain(direction, message, details={}) { |message| ... } click to toggle source
# File lib/activemessaging/gateway.rb, line 152
# Runs +message+ through every registered filter that applies, then yields
# it to the caller's block and returns the block's result.
#
# Filters are taken from +filters+ in order; each one is checked with
# apply_filter?, materialized with create_filter and sent
# <tt>process(message, details)</tt>.
#
# If a filter raises ActiveMessaging::StopFilterException the incident is
# logged and the method returns +nil+ without yielding.
def execute_filter_chain(direction, message, details = {})
  filters.each do |filter, settings|
    next unless apply_filter?(direction, details, settings)

    begin
      filter_obj = create_filter(filter, settings)
      filter_obj.process(message, details)
    rescue ActiveMessaging::StopFilterException => stop
      ActiveMessaging.logger.error "Filter: #{filter_obj.inspect} threw StopFilterException: #{stop.message}"
      return
    end
  end
  yield(message)
end
filter(filter, options = {}) click to toggle source
# File lib/activemessaging/gateway.rb, line 134
# Registers +filter+ with its +options+ at the end of the filter list and
# returns that list.
#
# When +options+ carries no <tt>:direction</tt> (nil), it is set to
# <tt>:bidirectional</tt>; note that this writes into the hash the caller
# passed in.
def filter(filter, options = {})
  if options[:direction].nil?
    options[:direction] = :bidirectional
  end
  filters.push([filter, options])
end
find_destination(destination_name) click to toggle source
# File lib/activemessaging/gateway.rb, line 281
# Looks up the destination registered under +destination_name+.
#
# Raises RuntimeError, listing the names that are registered, when there is
# no such destination.
def find_destination(destination_name)
  found = named_destinations[destination_name]
  if found.nil?
    raise "You have not yet defined a destination named #{destination_name}. Destinations currently defined are [#{named_destinations.keys.join(',')}]"
  end
  found
end
Also aliased as: find_queue
find_queue(destination_name)
Alias for: find_destination
load_connection_configuration(label='default') click to toggle source
# File lib/activemessaging/gateway.rb, line 370
# Returns the connection settings for the broker called +label+ as a hash
# with symbol keys.
#
# The settings come from <tt>config/broker.yml</tt> under
# ActiveMessaging.app_root; the file is run through ERB, parsed as YAML and
# cached in @broker_yml on first use. The 'default' label reads the entry
# for ActiveMessaging.app_env directly, any other label reads the nested
# entry of that name.
#
# The <tt>:adapter</tt> value is converted to a symbol and falls back to
# <tt>:stomp</tt> when absent.
def load_connection_configuration(label = 'default')
  if @broker_yml.nil?
    yml_path = File.join(ActiveMessaging.app_root, 'config', 'broker.yml')
    @broker_yml = YAML::load(ERB.new(IO.read(yml_path)).result)
  end

  env_settings = @broker_yml[ActiveMessaging.app_env]
  raw_settings = label == 'default' ? env_settings : env_settings[label]
  config = raw_settings.symbolize_keys

  adapter = config[:adapter]
  config[:adapter] = adapter ? adapter.to_sym : :stomp
  config
end
prepare_application() click to toggle source
# File lib/activemessaging/gateway.rb, line 203
# Verifies the ActiveRecord connections before a message is dispatched.
#
# Does nothing (returns nil) when ActiveRecord is not loaded. With
# ActiveRecord 4 or later every connection in the pool is sent +verify!+;
# older versions use ActiveRecord::Base.verify_active_connections!.
def prepare_application
  if defined?(ActiveRecord)
    if ActiveRecord::VERSION::MAJOR < 4
      ActiveRecord::Base.verify_active_connections!
    else
      ActiveRecord::Base.connection_pool.connections.map(&:verify!)
    end
  end
end
processor_group(group_name, *processors) click to toggle source
# File lib/activemessaging/gateway.rb, line 340
# Adds +processors+ to the processor group called +group_name+, creating the
# group when it does not exist yet. Returns the group's resulting list.
def processor_group(group_name, *processors)
  groups = processor_groups
  members = groups.has_key?(group_name) ? groups[group_name] + processors : processors
  groups[group_name] = members
end
publish(destination_name, body, publisher=nil, headers={}) click to toggle source
# File lib/activemessaging/gateway.rb, line 297
# Publishes +body+ to the destination registered as +destination_name+.
#
# The message headers are +headers+ merged over the destination's default
# publish headers (explicit headers win). The message is run through the
# outgoing filter chain and then sent on the destination's broker
# connection; the whole operation is limited to +timeout+ seconds.
#
# Raises RuntimeError for a nil destination name or a nil/empty body, and
# re-raises Timeout::Error (after logging it) when the time limit is hit.
def publish(destination_name, body, publisher = nil, headers = {}, timeout = 10)
  raise "You cannot have a nil or empty destination name." if destination_name.nil?
  raise "You cannot have a nil or empty message body." if body.nil? || body.empty?

  target = find_destination(destination_name)
  details = {
    :publisher   => publisher,
    :destination => target,
    :direction   => :outgoing
  }
  merged_headers = headers.reverse_merge(target.publish_headers)
  message = OpenStruct.new(:body => body, :headers => merged_headers)

  begin
    Timeout.timeout(timeout) do
      execute_filter_chain(:outgoing, message, details) do |filtered|
        connection(target.broker_name).send(target.value, filtered.body, filtered.headers)
      end
    end
  rescue Timeout::Error => timeout_error
    ActiveMessaging.logger.error("Timed out trying to send the message #{message} to destination #{destination_name} via broker #{target.broker_name}")
    raise timeout_error
  end
end
queue(destination_name, destination, publish_headers={})
Alias for: destination
receive(destination_name, receiver=nil, subscribe_headers={}) click to toggle source
# File lib/activemessaging/gateway.rb, line 320
# Synchronously receives a single message from the destination registered as
# +destination_name+ and returns it.
#
# A dedicated connection is created from the destination's broker
# configuration, subscribed, used for one +receive+, told the message was
# +received+, and disconnected again in all cases. Connecting and receiving
# are limited to +timeout+ seconds.
#
# When +receiver+ is given and +subscribe_headers+ has no 'id' entry, the
# underscored receiver name is stored there (in the caller's hash).
#
# Raises RuntimeError for a nil destination name and re-raises
# Timeout::Error (after logging it) when the time limit is hit.
def receive(destination_name, receiver = nil, subscribe_headers = {}, timeout = 10)
  raise "You cannot have a nil or empty destination name." if destination_name.nil?

  conn = nil
  begin
    dest = find_destination(destination_name)
    config = load_connection_configuration(dest.broker_name)
    unless receiver.nil? || subscribe_headers.key?('id')
      subscribe_headers['id'] = receiver.name.underscore
    end

    Timeout.timeout(timeout) do
      conn = Gateway.adapters[config[:adapter]].new(config)
      conn.subscribe(dest.value, subscribe_headers)
      incoming = conn.receive
      conn.received(incoming, subscribe_headers)
      incoming
    end
  rescue Timeout::Error => timeout_error
    ActiveMessaging.logger.error("Timed out trying to receive a message on destination #{destination_name}")
    raise timeout_error
  ensure
    conn.disconnect unless conn.nil?
  end
end
register_adapter(adapter_name, adapter_class) click to toggle source
# File lib/activemessaging/gateway.rb, line 130
# Makes +adapter_class+ available under +adapter_name+ by storing it in
# Gateway.adapters. Returns the adapter class.
def register_adapter(adapter_name, adapter_class)
  Gateway.adapters.store(adapter_name, adapter_class)
end
reset() click to toggle source
# File lib/activemessaging/test_helper.rb, line 24
# Test helper: returns the gateway to a pristine state.
#
# Existing subscriptions are unsubscribed and connections disconnected
# first, while the old state is still in place; afterwards all registries
# and the memoized processor group are cleared.
def self.reset
  unsubscribe
  disconnect

  @current_processor_group = nil
  @filters = []
  @subscriptions, @named_destinations, @processor_groups = {}, {}, {}
  @connections = {}
end
reset_application() click to toggle source
# File lib/activemessaging/gateway.rb, line 213
# Counterpart to prepare_application, called by dispatch once a message has
# been handled. Intentionally does nothing and returns nil.
def reset_application
  nil
end
start() click to toggle source

Starts up a message listener that polls for messages on each configured connection and dispatches them for processing

# File lib/activemessaging/gateway.rb, line 28
# Subscribes, then runs one receive/dispatch thread per connection and
# blocks until those threads have all finished.
#
# Each thread loops while @running is truthy: it stores the result of
# conn.receive in Thread.current[:message] and, from the +ensure+ clause,
# dispatches that message under @guard.synchronize. When no message was
# obtained the thread sleeps for a second instead.
#
# The calling thread polls once a second and clears @running when no
# connection thread is alive any more. An Interrupt is logged and
# swallowed; any other exception is logged and re-raised. +stop+ is always
# called on the way out.
#
# NOTE(review): @running, @guard and @connection_threads are not set up in
# this method -- presumably they are initialized elsewhere; confirm.
def start

  # subscribe - creating connections along the way
  subscribe

  # for each connection, start a thread
  @connections.each do |name, conn|
    @connection_threads[name] = Thread.start do
      while @running
        begin
          # Thread.current[:message] doubles as the "currently dispatching"
          # marker that stop inspects, so clear it before every receive.
          Thread.current[:message] = nil
          Thread.current[:message] = conn.receive
        # log the stop request; the break below is commented out, so the loop
        # only ends once @running is false
        rescue StopProcessingException=>spe
          ActiveMessaging.logger.error "ActiveMessaging: thread[#{name}]: Processing Stopped - receive interrupted, will process last message if already received"
          # break
        # catch everything else, log it, then go back and try to receive again
        rescue Object=>exception
          ActiveMessaging.logger.error "ActiveMessaging: thread[#{name}]: Exception from connection.receive: #{exception.message}\n" + exception.backtrace.join("\n\t")
        ensure
          # runs after every receive attempt, successful or not
          if Thread.current[:message]
            # dispatching is serialized across connection threads via @guard
            @guard.synchronize {
              dispatch Thread.current[:message]
            }
            Thread.current[:message] = nil
          else
            # if there is no message at all, sleep
            # maybe this should be configurable
            sleep(1)
          end
        end
        Thread.pass
      end
      ActiveMessaging.logger.error "ActiveMessaging: thread[#{name}]: receive loop terminated"
    end
  end

  # supervise: keep running for as long as at least one connection thread is alive
  while @running
    # the TERM handler is (re)installed on every pass of this loop
    trap("TERM", "EXIT")
    living = false
    @connection_threads.each { |name, thread| living ||=  thread.alive? }
    @running = living
    sleep(1)
  end
  ActiveMessaging.logger.error "All connection threads have died..."
rescue Interrupt
  ActiveMessaging.logger.error "\n<<Interrupt received>>\n"
rescue Object=>exception
  ActiveMessaging.logger.error "#{exception.class.name}: #{exception.message}\n\t#{exception.backtrace.join("\n\t")}"
  raise exception
ensure
  # always shut the threads down and release subscriptions/connections
  ActiveMessaging.logger.error "Cleaning up..."
  stop
  ActiveMessaging.logger.error "=> END"
end
stop() click to toggle source
# File lib/activemessaging/gateway.rb, line 84
# Shuts the gateway down: stops the receive loops, lets in-flight dispatches
# finish, then unsubscribes and disconnects.
#
# After clearing @running, the connection threads are polled once a second.
# A thread whose thread-local :message is set counts as still dispatching
# and keeps the loop going; a live thread without a message is interrupted
# by raising StopProcessingException in it. Once no thread reports a message
# the subscriptions and connections are torn down.
def stop
  # first tell the threads to stop their looping, so they'll stop when next complete a receive/dispatch cycle
  @running = false

  # if they are dispatching (i.e. !thread[:message].nil?), wait for them to finish
  # if they are receiving (i.e. thread[:message].nil?), stop them by raising exception
  dispatching = true
  while dispatching
    dispatching = false
    @connection_threads.each do |name, thread|
      if thread[:message]
        dispatching = true
        # if thread got killed, but dispatch not done, try it again
        if thread.alive?
          ActiveMessaging.logger.error "Waiting on thread #{name} to finish processing last message..."
        else
          ActiveMessaging.logger.error "Starting thread #{name} to finish processing last message..."
          msg = thread[:message]
          thread.exit
          # NOTE(review): the replacement thread is only assigned to the
          # block-local +thread+; @connection_threads still holds the dead
          # thread and its :message is not cleared here, so the next pass
          # looks like it would start yet another dispatch of the same
          # message -- confirm.
          thread = Thread.start do
            begin
              Thread.current[:message] = msg
              dispatch Thread.current[:message]
            ensure
              Thread.current[:message] = nil
            end
          end
        end
      else
        # not dispatching: interrupt the thread if it is still running
        thread.raise StopProcessingException, "Time to stop." if thread.alive?
      end
    end
    # pause between polling passes (also runs once after the final pass)
    sleep(1)
  end
  unsubscribe
  disconnect
end
subscribe() click to toggle source
# File lib/activemessaging/gateway.rb, line 139
# Sends +subscribe+ to every registered subscription and returns the
# subscriptions hash.
def subscribe
  subscriptions.each_value(&:subscribe)
end
subscribe_to(destination_name, processor, headers={}) click to toggle source
# File lib/activemessaging/gateway.rb, line 289
# Registers +processor+ as a subscriber of the destination called
# +destination_name+.
#
# The subscription is stored in @subscriptions under
# "<underscored processor name>:<destination name>" and returned. When a
# processor group is currently selected, processors that are not members of
# that group are skipped and +nil+ is returned.
def subscribe_to(destination_name, processor, headers = {})
  proc_name = processor.name.underscore
  group = current_processor_group
  return nil if group && !processor_groups[group].include?(proc_name.to_sym)

  subscription_key = "#{proc_name}:#{destination_name}"
  @subscriptions[subscription_key] = Subscription.new(find_destination(destination_name), processor, headers)
end
unsubscribe() click to toggle source
# File lib/activemessaging/gateway.rb, line 143
# Sends +unsubscribe+ to every registered subscription and returns the
# subscriptions hash.
def unsubscribe
  subscriptions.each_value(&:unsubscribe)
end