class ActiveMessaging::Adapters::Jms::Connection
Attributes
connection[RW]
consumers[RW]
producers[RW]
reliable[RW]
session[RW]
Public Class Methods
new(cfg={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 23
#
# Builds a JMS connection factory from +cfg+, opens a connection, creates one
# session on it and starts the connection.
#
# Keys read from +cfg+:
# [:url, :login, :passcode] stored on the instance; passed to the factory
#                           constructor on the :connection_factory path.
# [:connection_factory]     Java class name, instantiated as
#                           <tt>Java::<name>.new(login, passcode, url)</tt>.
# [:jndi]                   name looked up via javax.naming.InitialContext;
#                           only consulted when :connection_factory is absent.
#
# Raises RuntimeError when neither :connection_factory nor :jndi is present,
# or when the resulting factory is nil.
def initialize(cfg = {})
  @url      = cfg[:url]
  @login    = cfg[:login]
  @passcode = cfg[:passcode]

  # initialize our connection factory
  if cfg.has_key? :connection_factory
    # This constructor call is probably ActiveMQ specific. There might be a
    # more generic way of getting this without resorting to a JNDI lookup.
    #
    # NOTE(review): cfg[:connection_factory] is interpolated into code that is
    # eval'ed, so it must only ever come from trusted configuration.
    eval <<-end_eval
      @connection_factory = Java::#{cfg[:connection_factory]}.new(@login, @passcode, @url)
    end_eval
  elsif cfg.has_key? :jndi
    @connection_factory = javax.naming.InitialContext.new().lookup(cfg[:jndi])
  else
    raise "Either jndi or connection_factory has to be set in the config."
  end
  raise "Connection factory could not be initialized." if @connection_factory.nil?

  @connection = @connection_factory.create_connection()
  # Arguments are (transacted, acknowledge_mode). NOTE(review): 1 is presumably
  # javax.jms.Session::AUTO_ACKNOWLEDGE -- confirm against the JMS API in use.
  @session = @connection.createSession(false, 1)

  @destinations = []
  @producers = {}
  @consumers = {}

  @connection.start
end
Public Instance Methods
close()
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 117
#
# Stops every cached consumer, stops the connection, closes the session and
# the connection, then resets the instance state (@connection and @session to
# nil, @consumers and @producers to empty hashes).
#
# Calling it again after a successful close is a no-op. If stopping a consumer
# or closing the session raises, the connection is still closed and the state
# still reset before the error propagates.
def close
  # Already closed: nothing left to release.
  return if @connection.nil?

  begin
    # NOTE(review): javax.jms.MessageConsumer declares close, not stop; stop is
    # presumably provided by the concrete provider class -- confirm.
    @consumers.each_value { |consumer| consumer.stop }
    @connection.stop
    @session.close
  ensure
    begin
      @connection.close
    ensure
      @connection = nil
      @session = nil
      @consumers = {}
      @producers = {}
    end
  end
end
find_or_create_consumer(queue_name, headers={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 137
#
# Returns the consumer cached under +queue_name+, creating and caching one on
# the session when none exists yet.
#
# +headers+ may carry a :selector (string or symbol key); when present it is
# passed to the session as the second argument of create_consumer.
def find_or_create_consumer(queue_name, headers = {})
  cached = @consumers[queue_name]
  return cached unless cached.nil?

  destination = find_or_create_destination queue_name, headers
  options = headers.symbolize_keys
  consumer =
    if options.has_key? :selector
      @session.create_consumer destination, options[:selector]
    else
      @session.create_consumer destination
    end
  @consumers[queue_name] = consumer
end
find_or_create_destination(queue_name, headers={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 152
#
# Returns the known destination matching +queue_name+ and the destination
# type in +headers+, creating a topic or queue on the session and remembering
# it in @destinations when there is none.
#
# The destination type is read once through symbolize_keys, so a string key
# ('destination_type') and a symbol key (:destination_type) behave the same
# for both the lookup and the creation step.
#
# Raises RuntimeError when the type is neither :queue nor :topic.
def find_or_create_destination(queue_name, headers = {})
  type = headers.symbolize_keys[:destination_type]

  existing = find_destination queue_name, type
  return existing unless existing.nil?

  destination =
    case type
    when :topic
      @session.create_topic(queue_name.to_s)
    when :queue
      @session.create_queue(queue_name.to_s)
    else
      raise "headers[:destination_type] must be either :queue or :topic. was #{type}"
    end
  @destinations << destination
  destination
end
find_or_create_producer(queue_name, headers={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 128
#
# Returns the producer cached under +queue_name+, creating one on the session
# for the matching destination when none exists yet.
#
# A newly created producer is stored in @producers so later calls for the
# same name reuse it instead of creating another one.
def find_or_create_producer(queue_name, headers = {})
  cached = @producers[queue_name]
  return cached unless cached.nil?

  destination = find_or_create_destination queue_name, headers
  @producers[queue_name] = @session.create_producer(destination)
end
receive(options={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 89
#
# Options-hash front end for receive_message.
#
# Reads options[:queue_name] (may be nil) and options[:headers] (defaults to
# an empty hash) and returns whatever receive_message returns for them.
def receive(options = {})
  receive_message(options[:queue_name], options[:headers] || {})
end
receive_message(queue_name=nil, headers={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 95
#
# Fetches one message and returns it after passing it through
# condition_message.
#
# Without +queue_name+ every cached consumer is polled in turn and the first
# non-nil message wins; nil is returned when none of them has one.
#
# With +queue_name+ the queue is subscribed, polled once and unsubscribed
# again. The unsubscribe runs even when the poll raises.
#
# NOTE(review): receive(1) is presumably the JMS receive-with-timeout call
# with a 1 millisecond timeout -- confirm against the consumer API in use.
def receive_message(queue_name = nil, headers = {})
  if queue_name.nil?
    @consumers.each_value do |consumer|
      message = consumer.receive(1)
      return condition_message(message) unless message.nil?
    end
    nil
  else
    consumer = subscribe(queue_name, headers)
    begin
      message = consumer.receive(1)
    ensure
      unsubscribe(queue_name, headers)
    end
    condition_message(message)
  end
end
received(message, headers={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 109
#
# Hook invoked with a message and optional headers. This adapter takes no
# action here; both arguments are ignored and nil is returned.
def received(message, headers = {})
  # do nothing
end
send(queue_name, body, headers={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 63
#
# Creates a text message from +body+, copies +headers+ onto it and sends it
# through the producer for +queue_name+.
#
# Header keys (string or symbol) are mapped as follows; anything else is set
# as a string property on the message:
#   id / message-id / JMSMessageID     -> setJMSMessageID (to_s)
#   correlation-id / JMSCorrelationID  -> setJMSCorrelationID (to_s)
#   expires / JMSExpiration            -> setJMSExpiration (to_i)
#   persistent / JMSDeliveryMode       -> setJMSDeliveryMode (2 or 1)
#   priority / JMSPriority             -> setJMSPriority (to_i)
#   reply-to / JMSReplyTo              -> setJMSReplyTo (to_s)
#   type / JMSType                     -> setJMSType (to_s)
#
# For 'JMSDeliveryMode' an Integer 1 or 2 is passed through unchanged, so an
# explicit mode 1 is no longer turned into 2 by a truthiness test. Every other
# value keeps the truthy -> 2, false/nil -> 1 mapping.
#
# NOTE(review): javax.jms.Message#setJMSReplyTo is declared to take a
# Destination, but a String is passed here -- confirm this works with the
# provider in use.
# NOTE(review): JMS providers presumably overwrite message id, expiration,
# delivery mode and priority when the message is sent -- confirm whether these
# headers have any effect.
def send(queue_name, body, headers = {})
  queue_name = check_destination_type queue_name, headers
  producer = find_or_create_producer queue_name, headers.symbolize_keys
  message = @session.create_text_message body

  headers.stringify_keys.each do |key, value|
    case key
    when 'id', 'message-id', 'JMSMessageID'
      message.setJMSMessageID(value.to_s)
    when 'correlation-id', 'JMSCorrelationID'
      message.setJMSCorrelationID(value.to_s)
    when 'expires', 'JMSExpiration'
      message.setJMSExpiration(value.to_i)
    when 'persistent', 'JMSDeliveryMode'
      explicit_mode = key == 'JMSDeliveryMode' && value.is_a?(Integer) && [1, 2].include?(value)
      message.setJMSDeliveryMode(explicit_mode ? value : (value ? 2 : 1))
    when 'priority', 'JMSPriority'
      message.setJMSPriority(value.to_i)
    when 'reply-to', 'JMSReplyTo'
      message.setJMSReplyTo(value.to_s)
    when 'type', 'JMSType'
      message.setJMSType(value.to_s)
    else
      # is this the most appropriate thing to do here?
      message.set_string_property(key, value.to_s)
    end
  end

  producer.send message
end
subscribe(queue_name, headers={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 49
#
# Resolves +queue_name+ through check_destination_type and returns the
# consumer that find_or_create_consumer yields for the resolved name and
# +headers+.
def subscribe(queue_name, headers = {})
  resolved_name = check_destination_type queue_name, headers
  find_or_create_consumer resolved_name, headers
end
unreceive(message, headers={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 113
#
# Hook invoked with a message and optional headers. This adapter takes no
# action here; both arguments are ignored and nil is returned.
def unreceive(message, headers = {})
  # do nothing
end
unsubscribe(queue_name, headers={})
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 54
#
# Resolves +queue_name+ through check_destination_type, then closes the
# consumer cached under the resolved name and removes it from @consumers.
#
# Returns the removed consumer, or nil when no consumer was cached.
def unsubscribe(queue_name, headers = {})
  queue_name = check_destination_type queue_name, headers
  consumer = @consumers[queue_name]
  return if consumer.nil?

  consumer.close
  @consumers.delete queue_name
end
Protected Instance Methods
check_destination_type(queue_name, headers)
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 188
#
# Works out the destination type for +queue_name+ and returns the bare
# destination name.
#
# * When +queue_name+ looks like /queue/NAME or /topic/NAME and +headers+ has
#   no destination_type (string or symbol key), the type taken from the prefix
#   is written to headers['destination_type'] -- the caller's hash is
#   modified -- and NAME is returned.
# * Otherwise the destination_type found in +headers+ must be :topic or
#   :queue, else RuntimeError is raised. The name is returned with any
#   /queue/ or /topic/ prefix removed; the type in +headers+ wins over the
#   prefix.
#
# The method always returns a name, so callers that assign the result back to
# their queue name get the same value on a repeat call with the same hash.
def check_destination_type(queue_name, headers)
  stringy_h = headers.stringify_keys

  prefix_type = nil
  bare_name = nil
  if queue_name =~ %r{^/(topic|queue)/(.*)$}
    prefix_type = $1.to_sym
    bare_name = $2
  end

  if prefix_type && !stringy_h.has_key?('destination_type')
    headers['destination_type'] = prefix_type
    return bare_name
  end

  unless [:topic, :queue].include? stringy_h['destination_type']
    raise "Must specify destination type either with either 'headers[\'destination_type\']=[:queue|:topic]' or /[topic|queue]/destination_name for queue name '#{queue_name}'"
  end

  bare_name || queue_name
end
condition_message(message)
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 170
#
# Makes +message+ answer to body, headers and matches_subscription?, then
# returns it.
#
# The methods are added to the message's class, and only when the message
# does not already respond to headers:
# * body    -- alias of text, unless the class already defines body
# * headers -- {'destination' => "/<type>/<name>"}, parsed from
#              destination.to_s in the form "<queue|topic>://<name>"
# * matches_subscription?(subscription) -- compares that destination header
#              with subscription.value, both as strings
#
# A nil message is returned untouched. The headers method no longer prints
# the destination to standard output.
def condition_message(message)
  return message if message.nil? || message.respond_to?(:headers)

  message.class.class_eval do
    alias_method :body, :text unless method_defined? :body

    def headers
      # When the pattern does not match, both captures are nil and the
      # destination header comes out as "//".
      destination.to_s =~ %r{(queue|topic)://(.*)}
      {'destination' => "/#{$1}/#{$2}"}
    end

    def matches_subscription?(subscription)
      headers['destination'].to_s == subscription.value.to_s
    end
  end

  message
end
find_destination(queue_name, type)
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 198
#
# Returns the first entry of @destinations whose kind and name match, or nil.
#
# +type+ :topic matches javax.jms.Topic instances by topic_name; :queue
# matches javax.jms.Queue instances by queue_name. Any other +type+ matches
# nothing.
#
# +queue_name+ is compared as a string, so a Symbol name finds a destination
# that was created from its to_s form.
def find_destination(queue_name, type)
  wanted_name = queue_name.to_s
  @destinations.find do |candidate|
    if candidate.is_a?(javax.jms.Topic) && type == :topic
      candidate.topic_name == wanted_name
    elsif candidate.is_a?(javax.jms.Queue) && type == :queue
      candidate.queue_name == wanted_name
    end
  end
end
headers()
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 174
#
# Builds a one-entry headers hash from the receiver's destination.
#
# destination.to_s is expected in the form "<queue|topic>://<name>" and is
# returned as {'destination' => "/<queue|topic>/<name>"}. When the pattern
# does not match, both captures are nil and the value is "//".
#
# The destination is no longer printed to standard output.
def headers
  destination.to_s =~ %r{(queue|topic)://(.*)}
  {'destination' => "/#{$1}/#{$2}"}
end
matches_subscription?(subscription)
click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 180
#
# True when the receiver's 'destination' header equals subscription.value,
# with both sides compared as strings.
def matches_subscription?(subscription)
  headers['destination'].to_s == subscription.value.to_s
end