Files

Class/Module Index [+]

Quicksearch

ActiveMessaging::Adapters::Adapter::Connection

Connection class needed by a13g

Public Class Methods

new(cfg) click to toggle source

Generic init method needed by a13g

# File lib/activemessaging/adapters/wmq.rb, line 33
# Generic init method needed by ActiveMessaging (a13g).
#
# cfg is the broker configuration hash. It is modified in place:
# :poll_interval defaults to 0.1 when absent or nil, and every String value
# containing "WMQ::" is replaced by the result of evaluating it, which lets
# WMQ constants be written directly in broker.yml.
# The same hash object is then kept as @connection_options.
def initialize(cfg)
  cfg[:poll_interval] ||= 0.1

  # NOTE(review): instance_eval runs arbitrary Ruby taken from the
  # configuration; this is only acceptable while broker.yml is trusted.
  cfg.each_pair do |option, setting|
    next unless setting.instance_of?(String) && setting.match("WMQ::")

    cfg[option] = instance_eval(setting)
  end
  @connection_options = cfg

  # Subscription bookkeeping: ordered queue names, the polling cursor into
  # that list, and the queue objects keyed by name.
  @queue_names = []
  @current_queue = 0
  @queues = {}
end

Public Instance Methods

disconnect(headers = {}) click to toggle source

Disconnect method needed by a13g. There is no need to disconnect from the queue manager, since connection and disconnection occur inside the send and receive methods. headers is never used.

# File lib/activemessaging/adapters/wmq.rb, line 48
# Disconnect method needed by ActiveMessaging (a13g).
# Intentionally a no-op: this method holds no connection to close.
# headers is accepted for interface compatibility and never used.
# Always returns nil.
def disconnect(headers = {})
  nil
end
receive(options={}) click to toggle source

Receive method needed by a13g

# File lib/activemessaging/adapters/wmq.rb, line 52
# Receive method needed by ActiveMessaging (a13g).
#
# Polls the subscribed queues round-robin, starting with the queue after the
# one used last, and blocks until retrieve_message returns a non-nil message.
# Each time the cursor comes back to the position it had when this call
# started, the method sleeps for @connection_options[:poll_interval] seconds.
# options is never used.
#
# Raises RuntimeError when there is no subscription at all.
def receive(options={})
  if @queue_names.nil? || @queue_names.empty?
    raise "No subscription to receive messages from"
  end

  first_index = @current_queue
  loop do
    # Advance the cursor, wrapping to the first queue past the end of the list.
    candidate = @current_queue + 1
    @current_queue = candidate < @queue_names.length ? candidate : 0

    # A full lap without a message: pause before polling again.
    sleep(@connection_options[:poll_interval]) if @current_queue == first_index

    queue = @queues[@queue_names[@current_queue]]
    next if queue.nil?

    message = retrieve_message(queue)
    return message unless message.nil?
  end
end
received(message, headers={}) click to toggle source

called after a message is successfully received and processed

# File lib/activemessaging/adapters/wmq.rb, line 117
# Called after a message is successfully received and processed.
# Intentionally a no-op here; both arguments are ignored and nil is returned.
def received(message, headers = {})
  nil
end
send(q_name, message_data, headers={}) click to toggle source

Send method needed by a13g. headers may contain two different hashes to give more control over the sending process:

:descriptor => {...} to populate the descriptor of the message
:put_options => {...} to specify the put options for that message
# File lib/activemessaging/adapters/wmq.rb, line 70
# Send method needed by ActiveMessaging (a13g).
#
# Connects to the queue manager with @connection_options, opens q_name for
# output, puts one message built from message_data, and returns it wrapped in
# a Message together with the queue name.
#
# headers may carry two optional hashes:
#   :descriptor  => {...}  message descriptor (defaults to the WMQ string format)
#   :put_options => {...}  extra options for the put call; the hash is copied,
#                          and its :message and :data entries are overridden
def send(q_name, message_data, headers={})
  WMQ::QueueManager.connect(@connection_options) do |queue_manager|
    queue_manager.open_queue(:q_name => q_name, :mode => :output) do |out_queue|
      descriptor = headers[:descriptor] || {:format => WMQ::MQFMT_STRING}

      # Work on a copy so the caller's :put_options hash is never modified.
      base_put_options =
        if headers[:put_options].nil?
          {}
        else
          headers[:put_options].dup
        end

      outgoing = WMQ::Message.new(:data => message_data, :descriptor => descriptor)
      out_queue.put(base_put_options.merge(:message => outgoing, :data => nil))
      return Message.new(outgoing, q_name)
    end
  end
end
subscribe(q_name, headers={}, subId=NIL) click to toggle source

Subscribe method needed by a13g. headers may contain a hash to give more control over the get operation on the queue:

:get_options => {...} to specify the get options when receiving messages
Warning : get options are set only on the first queue subscription and are common to all the queue's subscriptions
          Any other get options passed with subsequent subscribe on an existing queue will be discarded

subId is never used

# File lib/activemessaging/adapters/wmq.rb, line 90
# Subscribe method needed by ActiveMessaging (a13g).
#
# Registers one more subscription on q_name. The first subscription to a
# queue creates its Queue object and appends its name to the polling list;
# later subscriptions reuse that object.
#
# headers may carry :get_options => {...}, the get options used when
# receiving messages. They are applied only when the queue is first created;
# get options passed on later subscriptions to the same queue are discarded.
# subId is never used.
#
# Returns whatever the queue's add_subscription returns.
def subscribe(q_name, headers={}, subId=nil)
  # Look the queue up first so that a repeated subscription works on the
  # existing object instead of an unassigned (nil) local.
  q = @queues[q_name]
  if q.nil?
    get_options = headers[:get_options] || {}
    q = Queue.new(q_name, get_options)
    @queues[q_name] = q
    @queue_names << q.name
  end

  q.add_subscription
end
unreceive(message, headers={}) click to toggle source

Called after a message is successfully received but unsuccessfully processed. The purpose is to return the message to the destination so that receiving and processing can be attempted again.

# File lib/activemessaging/adapters/wmq.rb, line 122
# Called after a message is successfully received but unsuccessfully
# processed, so that the message could be returned to its destination.
# Intentionally a no-op here; both arguments are ignored and nil is returned.
def unreceive(message, headers = {})
  nil
end
unsubscribe(q_name, headers={}, subId=NIL) click to toggle source

Unsubscribe method needed by a13g. Stops listening to the queue only after the last unsubscription. headers is never used. subId is never used.

# File lib/activemessaging/adapters/wmq.rb, line 105
# Unsubscribe method needed by ActiveMessaging (a13g).
#
# Removes one subscription from q_name. The queue is dropped from @queues and
# from the polling list only once it has no subscription left. Unknown queue
# names are ignored.
# headers and subId are never used.
def unsubscribe(q_name, headers={}, subId=nil)
  q = @queues[q_name]
  return if q.nil?

  q.remove_subscription
  return if q.has_subscription?

  # Last subscription gone: stop polling this queue.
  @queues.delete(q_name)
  @queue_names.delete(q_name)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.