class ActiveMessaging::Adapters::ReliableMsgConnection

Constants

QUEUE_PARAMS
THREAD_OLD_TXS
TOPIC_PARAMS

Attributes

current_subscription[RW]

configurable params

destinations[RW]

configurable params

poll_interval[RW]

configurable params

subscriptions[RW]

configurable params

tx_timeout[RW]

configurable params

Public Class Methods

new(cfg) click to toggle source

generic init method needed by a13g

# File lib/activemessaging/adapters/reliable_msg.rb, line 33
def initialize cfg
  @poll_interval = cfg[:poll_interval]  || 1
  @reliable = cfg[:reliable]            || true
  @tx_timeout = cfg[:tx_timeout]        || ::ReliableMsg::Client::DEFAULT_TX_TIMEOUT

  @subscriptions = {}
  @destinations = {}
  @current_subscription = 0
end

Public Instance Methods

disconnect() click to toggle source

called to cleanly get rid of connection

# File lib/activemessaging/adapters/reliable_msg.rb, line 44
def disconnect
  nil
end
get_or_create_destination(destination_name, message_headers={}) click to toggle source
# File lib/activemessaging/adapters/reliable_msg.rb, line 80
def get_or_create_destination destination_name, message_headers={}
  return destinations[destination_name] if destinations.has_key? destination_name
  dd = /^\/(queue|topic)\/(.*)$/.match(destination_name)
  rm_class = dd[1].titleize
  message_headers.delete("id")
  dest_headers = message_headers.reject {|k,v| rm_class == 'Queue' ? !QUEUE_PARAMS.include?(k) : !TOPIC_PARAMS.include?(k)}
  rm_dest = "ReliableMsg::#{rm_class}".constantize.new(dd[2], dest_headers)
  destinations[destination_name] = rm_dest
end
receive(options={}) click to toggle source

receive a single message from any of the subscribed destinations check each destination once, then sleep for #poll_interval

# File lib/activemessaging/adapters/reliable_msg.rb, line 92
def receive(options={})

  raise "No subscriptions to receive messages from." if (subscriptions.nil? || subscriptions.empty?)
  start = current_subscription
  while true
    self.current_subscription = ((current_subscription < subscriptions.length-1) ? current_subscription + 1 : 0)
    sleep poll_interval if (current_subscription == start)
    destination_name = subscriptions.keys.sort[current_subscription]
    destination = destinations[destination_name]
    unless destination.nil?
      # from the way we use this, assume this is the start of a transaction, 
      # there should be no current transaction
      ctx = Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX]
      raise "There should not be an existing reliable-msg transaction. #{ctx.inspect}" if ctx

      # start a new transaction
      @tx = {:qm=>destination.queue_manager}
      @tx[:tid] = @tx[:qm].begin @tx_timeout
      Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = @tx
      begin

        # now call a get on the destination - it will use the transaction
        #the commit or the abort will occur in the received or unreceive methods
        reliable_msg = destination.get subscriptions[destination_name].headers[:selector]
        @tx[:qm].commit(@tx[:tid]) if reliable_msg.nil?

      rescue Object=>err
        #abort the transaction on error
        @tx[:qm].abort(@tx[:tid])

        raise err unless reliable
        puts "receive failed, will retry in #{@poll_interval} seconds"
        sleep poll_interval
      end
      return Message.new(reliable_msg.object, reliable_msg.id, reliable_msg.headers, destination_name, @tx) if reliable_msg
      
      Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = nil
    end
  end
end
received(message, headers={}) click to toggle source

called after a message is successfully received and processed

# File lib/activemessaging/adapters/reliable_msg.rb, line 134
def received message, headers={}
  begin
    message.transaction[:qm].commit(message.transaction[:tid]) 
  rescue Object=>ex
    puts "received failed: #{ex.message}"
  ensure
    Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = nil
  end
  
end
send(destination_name, message_body, message_headers={}) click to toggle source

destination_name string, body string, headers hash send a single message to a destination

# File lib/activemessaging/adapters/reliable_msg.rb, line 69
def send destination_name, message_body, message_headers={}
  dest = get_or_create_destination(destination_name)
  begin
    dest.put message_body, message_headers
  rescue Object=>err
    raise err unless reliable
    puts "send failed, will retry in #{@poll_interval} seconds"
    sleep @poll_interval
  end
end
subscribe(destination_name, message_headers={}) click to toggle source

destination_name string, headers hash subscribe to listen on a destination use '/destination-type/name' convetion, like stomp

# File lib/activemessaging/adapters/reliable_msg.rb, line 51
def subscribe destination_name, message_headers={}
  get_or_create_destination(destination_name, message_headers)
  if subscriptions.has_key? destination_name
    subscriptions[destination_name].add
  else
    subscriptions[destination_name] = Subscription.new(destination_name, message_headers)
  end
end
unreceive(message, headers={}) click to toggle source

called after a message is successfully received and processed

# File lib/activemessaging/adapters/reliable_msg.rb, line 146
def unreceive message, headers={}
  begin
    message.transaction[:qm].abort(message.transaction[:tid])
  rescue Object=>ex
    puts "unreceive failed: #{ex.message}"
  ensure
    Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = nil
  end
end
unsubscribe(destination_name, message_headers={}) click to toggle source

destination_name string, headers hash unsubscribe to listen on a destination

# File lib/activemessaging/adapters/reliable_msg.rb, line 62
def unsubscribe destination_name, message_headers={}
  subscriptions[destination_name].remove
  subscriptions.delete(destination_name) if subscriptions[destination_name].count <= 0
end