Parent

Included Modules

Files

Class/Module Index [+]

Quicksearch

ActiveMessaging::Adapters::Jms::Connection

Attributes

connection[RW]
consumers[RW]
producers[RW]
reliable[RW]
session[RW]

Public Class Methods

new(cfg={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 23
# Builds the JMS connection factory described by +cfg+, creates a connection
# and a session from it, resets the destination/producer/consumer caches and
# starts the connection.
#
# cfg keys read here:
#   :url, :login, :passcode - handed to the connection factory constructor
#   :connection_factory     - Java class name, resolved under the Java:: namespace
#   :jndi                   - name looked up via javax.naming.InitialContext
#
# Raises RuntimeError when neither :connection_factory nor :jndi is present,
# or when the resolved factory is nil.
def initialize cfg={}
  @url = cfg[:url]
  @login = cfg[:login]
  @passcode = cfg[:passcode]

  # initialize our connection factory
  if cfg.has_key? :connection_factory
    # This initialization is probably activemq specific. There might be a more
    # generic way of getting this without resorting to jndi lookup.
    #
    # The class path has to be evaluated as a *string*; written inline, the
    # "#{...}" would start a Ruby comment and the factory would never be built.
    # NOTE(review): the configured name is interpolated into evaluated code, so
    # cfg must come from a trusted source.
    factory_class = eval("Java::#{cfg[:connection_factory]}")
    @connection_factory = factory_class.new(@login, @passcode, @url)
  elsif cfg.has_key? :jndi
    @connection_factory = javax.naming.InitialContext.new().lookup(cfg[:jndi])
  else
    raise "Either jndi or connection_factory has to be set in the config."
  end
  raise "Connection factory could not be initialized." if @connection_factory.nil?

  @connection = @connection_factory.create_connection()
  # Non-transacted session; 1 is presumably javax.jms.Session.AUTO_ACKNOWLEDGE
  # -- confirm against the JMS API in use.
  @session = @connection.createSession(false, 1)
  @destinations = []
  @producers = {}
  @consumers = {}
  @connection.start
end

Public Instance Methods

close() click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 117
# Shuts the adapter down: stops every cached consumer, stops the connection,
# closes the session, closes the connection, then drops both references and
# resets the consumer and producer caches to empty hashes.
def close
  @consumers.each_value(&:stop)
  @connection.stop
  @session.close
  @connection.close
  @session = nil
  @connection = nil
  @consumers = {}
  @producers = {}
end
find_or_create_consumer(queue_name, headers={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 137
# Returns the consumer cached under +queue_name+, creating and caching one
# on the session when none exists yet. When the (symbolized) headers carry a
# :selector entry it is passed to create_consumer as second argument.
def find_or_create_consumer queue_name, headers={}
  existing = @consumers[queue_name]
  return existing unless existing.nil?

  destination = find_or_create_destination queue_name, headers
  options = headers.symbolize_keys
  created =
    if options.has_key? :selector
      @session.create_consumer destination, options[:selector]
    else
      @session.create_consumer destination
    end

  @consumers[queue_name] = created
end
find_or_create_destination(queue_name, headers={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 152
# Returns the cached destination for +queue_name+, or creates a topic/queue
# on the session, remembers it in @destinations and returns it.
#
# The destination type is read from headers under :destination_type; string
# and symbol keys are both honoured because the headers are symbolized first.
#
# Raises RuntimeError when the type is neither :topic nor :queue.
def find_or_create_destination queue_name, headers={}
  # Resolve the type once from the symbolized headers so that the cache
  # lookup, the creation branch and the error message all see the same value.
  # Looking it up with the raw symbol key misses headers that use the string
  # key 'destination_type', which makes every call create a new destination.
  type = headers.symbolize_keys[:destination_type]
  # Destinations are created from the string form of the name, so look them
  # up by that same string.
  name = queue_name.to_s

  destination = find_destination name, type
  return destination unless destination.nil?

  destination =
    case type
    when :topic then @session.create_topic(name)
    when :queue then @session.create_queue(name)
    else
      raise "headers[:destination_type] must be either :queue or :topic.  was #{type}"
    end
  @destinations << destination
  destination
end
find_or_create_producer(queue_name, headers={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 128
# Returns the producer cached under +queue_name+, creating one on the
# session for the resolved destination when none is cached yet.
def find_or_create_producer queue_name, headers={}
  producer = @producers[queue_name]
  if producer.nil?
    destination = find_or_create_destination queue_name, headers
    producer = @session.create_producer destination
    # Remember the producer; without this the cache lookup above can never
    # succeed and a new producer is created for every call.
    @producers[queue_name] = producer
  end
  producer
end
receive(options={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 89
# Convenience wrapper around receive_message: reads :queue_name and :headers
# (defaulting to an empty hash) from +options+ and forwards them.
def receive(options={})
  receive_message(options[:queue_name], options[:headers] || {})
end
receive_message(queue_name=nil, headers={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 95
# Fetches at most one message.
#
# Without a +queue_name+, every cached consumer is polled with receive(1) and
# the first non-nil message is returned (after condition_message); nil is
# returned when no consumer yields one.
#
# With a +queue_name+, a consumer is obtained through subscribe, polled once
# with receive(1), released through unsubscribe, and the conditioned message
# (possibly nil) is returned.
def receive_message(queue_name=nil, headers={})
  if queue_name.nil?
    @consumers.each_value do |consumer|
      message = consumer.receive(1)
      return condition_message(message) unless message.nil?
    end
    nil
  else
    # subscribe may write into the hash it receives (check_destination_type
    # stores 'destination_type' there). Give each call its own copy so that
    # unsubscribe resolves the queue name from the same input subscribe saw,
    # and the caller's hash is left untouched.
    consumer = subscribe(queue_name, headers.dup)
    message =
      begin
        consumer.receive(1)
      ensure
        # Release the temporary consumer even when receive raises.
        unsubscribe(queue_name, headers.dup)
      end
    condition_message(message)
  end
end
received(message, headers={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 109
# Hook called with a message and optional headers; this adapter takes no
# action here and returns nil.
def received(message, headers = {})
  # nothing to do
end
send(queue_name, body, headers={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 63
# Publishes +body+ as a text message to +queue_name+.
#
# The destination type is resolved through check_destination_type, a producer
# is obtained through find_or_create_producer, and every header is applied to
# the message: well-known keys map onto the JMS header setters, anything else
# becomes a string property. Returns whatever producer.send returns.
#
# Note: this method shadows Object#send on instances of this class; use
# __send__ for dynamic dispatch on a connection.
def send queue_name, body, headers={}
  queue_name = check_destination_type queue_name, headers
  producer = find_or_create_producer queue_name, headers.symbolize_keys
  message = @session.create_text_message body
  headers.stringify_keys.each do |key, value|
    case key
    when 'id', 'message-id', 'JMSMessageID'
      message.setJMSMessageID value.to_s
    when 'correlation-id', 'JMSCorrelationID'
      message.setJMSCorrelationID value.to_s
    when 'expires', 'JMSExpiration'
      message.setJMSExpiration value.to_i
    when 'persistent'
      # Boolean flag: truthy -> 2, falsy -> 1 (the javax.jms.DeliveryMode
      # values for PERSISTENT / NON_PERSISTENT).
      message.setJMSDeliveryMode(value ? 2 : 1)
    when 'JMSDeliveryMode'
      # This key may carry the numeric mode itself. A plain truthiness test
      # would turn an explicit 1 (NON_PERSISTENT) into 2, so honour it.
      non_persistent = (value == 1) || !value
      message.setJMSDeliveryMode(non_persistent ? 1 : 2)
    when 'priority', 'JMSPriority'
      message.setJMSPriority value.to_i
    when 'reply-to', 'JMSReplyTo'
      # NOTE(review): javax.jms.Message#setJMSReplyTo is declared to take a
      # Destination; passing a String may be rejected -- confirm.
      message.setJMSReplyTo value.to_s
    when 'type', 'JMSType'
      message.setJMSType value.to_s
    else # is this the most appropriate thing to do here?
      message.set_string_property key, value.to_s
    end
  end
  producer.send message
end
subscribe(queue_name, headers={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 49
# Resolves the destination name via check_destination_type and returns the
# consumer that find_or_create_consumer provides for it.
def subscribe queue_name, headers={}
  find_or_create_consumer(check_destination_type(queue_name, headers), headers)
end
unreceive(message, headers={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 113
# Hook called with a message and optional headers; this adapter takes no
# action here and returns nil.
def unreceive(message, headers = {})
  # nothing to do
end
unsubscribe(queue_name, headers={}) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 54
# Closes and forgets the consumer cached for the destination name that
# check_destination_type resolves. Returns the removed consumer, or nil when
# none was cached.
def unsubscribe queue_name, headers={}
  resolved_name = check_destination_type(queue_name, headers)
  subscription = @consumers[resolved_name]
  return if subscription.nil?

  subscription.close
  @consumers.delete(resolved_name)
end

Protected Instance Methods

check_destination_type(queue_name, headers) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 188
# Works out the destination type for +queue_name+ and returns the bare
# destination name.
#
# * A name of the form /topic/NAME or /queue/NAME has its prefix stripped.
#   When headers carry no 'destination_type' entry (string or symbol key),
#   the prefix is written into headers['destination_type'] as a symbol, so the
#   caller's hash is modified.
# * When headers already carry a destination type it must be :topic or
#   :queue; it takes precedence over a prefix, which is still stripped.
#
# Raises RuntimeError when no valid destination type can be determined.
def check_destination_type queue_name, headers
  stringy_h = headers.stringify_keys
  # Must be a Regexp literal (%r{}); a %{} string cannot be used with =~.
  prefixed = %r{^/(topic|queue)/(.*)$}.match(queue_name.to_s)

  if prefixed && !stringy_h.has_key?('destination_type')
    headers['destination_type'] = prefixed[1].to_sym
  elsif ![:topic, :queue].include?(stringy_h['destination_type'])
    raise "Must specify destination type either with either 'headers[\'destination_type\']=[:queue|:topic]' or /[topic|queue]/destination_name for queue name '#{queue_name}'"
  end

  # Always hand back a usable name: callers assign the result to queue_name,
  # so returning nil on the "type already in headers" path would lose it.
  prefixed ? prefixed[2] : queue_name
end
condition_message(message) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 170
# Makes a received message look like what the rest of the library expects by
# extending the message's class, once, with:
#   body                   - alias of text (unless body already exists)
#   headers                - {'destination' => "/<queue|topic>/<name>"}
#   matches_subscription?  - compares that destination with subscription.value
#
# nil messages and messages that already respond to headers are returned
# untouched. Always returns +message+.
def condition_message message
  return message if message.nil? || message.respond_to?(:headers)

  message.class.class_eval do
    alias_method :body, :text unless method_defined? :body

    # Derives the destination header from the string form of destination,
    # which is expected to look like "queue://name" or "topic://name".
    def headers
      # Regexp literal (%r{}); a %{} string cannot be matched with =~.
      destination.to_s =~ %r{(queue|topic)://(.*)}
      {'destination' => "/#{$1}/#{$2}"}
    end

    def matches_subscription?(subscription)
      self.headers['destination'].to_s == subscription.value.to_s
    end
  end
  message
end
find_destination(queue_name, type) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 198
# Searches @destinations for an entry of the requested +type+ (:topic or
# :queue) whose topic/queue name equals +queue_name+. Returns the first match
# or nil; any other +type+ matches nothing.
def find_destination queue_name, type
  wants_topic = type == :topic
  wants_queue = type == :queue

  @destinations.detect do |candidate|
    if wants_topic && candidate.is_a?(javax.jms.Topic)
      candidate.topic_name == queue_name
    elsif wants_queue && candidate.is_a?(javax.jms.Queue)
      candidate.queue_name == queue_name
    end
  end
end
headers() click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 174
# Builds the header hash for a message from the string form of destination,
# which is expected to look like "queue://name" or "topic://name":
# returns {'destination' => "/<queue|topic>/<name>"}.
def headers
  # Regexp literal (%r{}); a %{} string cannot be matched with =~.
  # No debug output here: this runs on every header access.
  destination.to_s =~ %r{(queue|topic)://(.*)}
  {'destination' => "/#{$1}/#{$2}"}
end
matches_subscription?(subscription) click to toggle source
# File lib/activemessaging/adapters/jms.rb, line 180
# True when this message's 'destination' header, as a string, equals the
# string form of subscription.value.
def matches_subscription?(subscription)
  own_destination = self.headers['destination'].to_s
  wanted = subscription.value.to_s
  own_destination == wanted
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.