Files

Class/Module Index [+]

Quicksearch

ActiveMessaging::Adapters::AmazonSqs::Connection

Attributes

access_key_id[RW]

configurable params

aws_version[RW]

configurable params

cache_queue_list[RW]

configurable params

content_type[RW]

configurable params

host[RW]

configurable params

max_message_size[RW]

configurable params

poll_interval[RW]

configurable params

port[RW]

configurable params

reconnect_delay[RW]

configurable params

secret_access_key[RW]

configurable params

Public Class Methods

new(cfg) click to toggle source

generic init method needed by a13g

# File lib/activemessaging/adapters/asqs.rb, line 30
def initialize cfg
  raise "Must specify a access_key_id" if (cfg[:access_key_id].nil? || cfg[:access_key_id].empty?)
  raise "Must specify a secret_access_key" if (cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?)

  @access_key_id=cfg[:access_key_id]
  @secret_access_key=cfg[:secret_access_key]
  @request_expires = cfg[:requestExpires]         || 10
  @request_retry_count = cfg[:requestRetryCount]  || 5
  @aws_version = cfg[:aws_version]                || '2008-01-01'
  @content_type = cfg[:content_type]              || 'text/plain'
  @host = cfg[:host]                              || 'queue.amazonaws.com'
  @port = cfg[:port]                              || 80
  @protocol = cfg[:protocol]                      || 'http'
  @poll_interval = cfg[:poll_interval]            || 1
  @reconnect_delay = cfg[:reconnectDelay]         || 5

  @max_message_size = cfg[:max_message_size].to_i > 0 ? cfg[:max_message_size].to_i : 8

  @aws_url="#{@protocol}://#{@host}"

  @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list]
  @reliable =         cfg[:reliable].nil?         ? true : cfg[:reliable]

  #initialize the subscriptions and queues
  @subscriptions = {}
  @queues_by_priority = {}
  @current_subscription = 0
  queues
end

Public Instance Methods

disconnect() click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 60
def disconnect
  #it's an http request - there is no disconnect - ha!
  return true
end
receive(options={}) click to toggle source

new receive respects priorities

# File lib/activemessaging/adapters/asqs.rb, line 115
def receive(options={})
  message = nil

  only_priorities = options[:priorities]

  # loop through the priorities
  @queues_by_priority.keys.sort.each do |priority|

    # skip this priority if there is a list, and it is not in the list
    next if only_priorities && !only_priorities.include?(priority.to_i)

    # puts " - priority: #{priority}"
    # loop through queues for the priority in random order each time
    @queues_by_priority[priority].shuffle.each do |queue_name|
      # puts "   - queue_name: #{queue_name}"
      queue = queues[queue_name]
      subscription = @subscriptions[queue_name]

      next if queue.nil? || subscription.nil?
      messages = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout])
      
      if (messages && !messages.empty?)
        message = messages[0]
      end

      break if message
    end

    break if message
  end

  # puts " - message: #{message}"
  message
end
received(message, headers={}) click to toggle source

# receive a single message from any of the subscribed queues # check each queue once, then sleep for poll_interval def receive

raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?)
start = @current_subscription
while true
  # puts "calling receive..."
  @current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0)
  sleep poll_interval if (@current_subscription == start)
  queue_name = @subscriptions.keys.sort[@current_subscription]
  queue = queues[queue_name]
  subscription = @subscriptions[queue_name]
  unless queue.nil?
    messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout]
    return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?)
  end
end

end

# File lib/activemessaging/adapters/asqs.rb, line 169
def received message, headers={}
  begin
    delete_message message
  rescue Object=>exception
    logger.error "Exception in ActiveMessaging::Adapters::AmazonSWS::Connection.received() logged and ignored: "
    logger.error exception
  end
end
send(queue_name, message_body, message_headers={}) click to toggle source

queue_name string, body string, headers hash send a single message to a queue

# File lib/activemessaging/adapters/asqs.rb, line 95
def send queue_name, message_body, message_headers={}
  queue = get_or_create_queue queue_name
  send_messsage queue, message_body
end
subscribe(queue_name, message_headers={}) click to toggle source

queue_name string, headers hash for sqs, make sure queue exists, if not create, then add to list of polled queues

# File lib/activemessaging/adapters/asqs.rb, line 67
def subscribe queue_name, message_headers={}
  # look at the existing queues, create any that are missing
  queue = get_or_create_queue queue_name
  if @subscriptions.has_key? queue.name
    @subscriptions[queue.name].add
  else
    @subscriptions[queue.name] = Subscription.new(queue.name, message_headers)
  end
  priority = @subscriptions[queue.name].priority

  @queues_by_priority[priority] = [] unless @queues_by_priority.has_key?(priority)
  @queues_by_priority[priority] << queue.name unless @queues_by_priority[priority].include?(queue.name)
end
unreceive(message, headers={}) click to toggle source

do nothing; by not deleting the message will eventually become visible again

# File lib/activemessaging/adapters/asqs.rb, line 179
def unreceive message, headers={}
  return true
end
unsubscribe(queue_name, message_headers={}) click to toggle source

queue_name string, headers hash for sqs, attempt delete the queues, won’t work if not empty, that’s ok

# File lib/activemessaging/adapters/asqs.rb, line 83
def unsubscribe queue_name, message_headers={}
  if @subscriptions[queue_name]
    @subscriptions[queue_name].remove
    if @subscriptions[queue_name].count <= 0
      sub = @subscriptions.delete(queue_name)
      @queues_by_priority[sub.priority].delete(queue_name)
    end
  end
end

Protected Instance Methods

check_errors(response) click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 310
def check_errors(response)
  raise "http response was nil" if (response.nil?)
  raise response.errors if (response && response.errors?)
  response
end
create_queue(name) click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 185
def create_queue(name)
  validate_new_queue name
        response = make_request('CreateQueue', nil, {'QueueName'=>name})
  add_queue(response.get_text("//QueueUrl")) unless response.nil?
end
delete_message(message) click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 251
def delete_message message
  response = make_request('DeleteMessage', "#{message.queue.queue_url}", {'ReceiptHandle'=>message.receipt_handle})
end
delete_queue(queue) click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 191
def delete_queue queue
  validate_queue queue
  response = make_request('DeleteQueue', "#{queue.queue_url}")
end
get_queue_attributes(queue, attribute='All') click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 203
def get_queue_attributes(queue, attribute='All')
  validate_get_queue_attribute(attribute)
  params = {'AttributeName'=>attribute}
  response = make_request('GetQueueAttributes', "#{queue.queue_url}")
  attributes = {}
  response.each_node('/GetQueueAttributesResponse/GetQueueAttributesResult/Attribute') { |n|
    n = n.elements['Name'].text
    v = n.elements['Value'].text
    attributes[n] = v
  }
  if attribute != 'All'
    attributes[attribute]
  else
    attributes
  end
end
http_request(h, p, r) click to toggle source

I wrap this so I can move to a different client, or easily mock for testing

# File lib/activemessaging/adapters/asqs.rb, line 298
def http_request h, p, r
  return Net::HTTP.start(h, p){ |http| http.request(r) }
end
list_queues(queue_name_prefix=nil) click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 196
def list_queues(queue_name_prefix=nil)
  validate_queue_name queue_name_prefix unless queue_name_prefix.nil?
  params = queue_name_prefix.nil? ? {} : {"QueueNamePrefix"=>queue_name_prefix}
        response = make_request('ListQueues', nil, params)
  response.nil? ? [] : response.nodes("//QueueUrl").collect{ |n| add_queue(n.text) }
end
make_request(action, url=nil, params = {}) click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 255
def make_request(action, url=nil, params = {})
  # puts "make_request a=#{action} u=#{url} p=#{params}"
  url ||= @aws_url
  
        # Add Actions
        params['Action'] = action
        params['Version'] = @aws_version
        params['AWSAccessKeyId'] = @access_key_id
        params['Expires']= (Time.now + @request_expires).gmtime.iso8601
        params['SignatureVersion'] = '1'

        # Sign the string
        sorted_params = params.sort_by { |key,value| key.downcase }
        string_to_sign = sorted_params.collect { |key, value| key.to_s + value.to_s }.join()
        digest = OpenSSL::Digest::Digest.new('sha1')
  hmac = OpenSSL::HMAC.digest(digest, @secret_access_key, string_to_sign)
  params['Signature'] = Base64.encode64(hmac).chomp

  # Construct request
  query_params = params.collect { |key, value| key + "=" + CGI.escape(value.to_s) }.join("&")

  # Put these together to get the request query string
  request_url = "#{url}?#{query_params}"
  # puts "request_url = #{request_url}"
  request = Net::HTTP::Get.new(request_url)

  retry_count = 0
  while retry_count < @request_retry_count.to_i
          retry_count = retry_count + 1
    # puts "make_request try retry_count=#{retry_count}"
    begin
      response = SQSResponse.new(http_request(host,port,request))
      check_errors(response)
      return response
    rescue Object=>ex
      # puts "make_request caught #{ex}"
      raise ex unless reliable
                  sleep(@reconnect_delay)
    end
  end
end
retrieve_messsages(queue, num_messages=1, timeout=nil) click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 239
def retrieve_messsages queue, num_messages=1, timeout=nil
  validate_queue queue
  validate_number_of_messages num_messages
  validate_timeout timeout if timeout

  params = {'MaxNumberOfMessages'=>num_messages.to_s}
  params['VisibilityTimeout'] = timeout.to_s if timeout

  response = make_request('ReceiveMessage', "#{queue.queue_url}", params)
  response.nodes("//Message").collect{ |n| Message.from_element n, response, queue } unless response.nil?
end
send_messsage(queue, message) click to toggle source

in progress

# File lib/activemessaging/adapters/asqs.rb, line 232
def send_messsage queue, message
  validate_queue queue
  validate_message message
  response = make_request('SendMessage', queue.queue_url, {'MessageBody'=>message})
  response.get_text("//MessageId") unless response.nil?
end
set_queue_attribute(queue, attribute, value) click to toggle source
# File lib/activemessaging/adapters/asqs.rb, line 220
def set_queue_attribute(queue, attribute, value)
  validate_set_queue_attribute(attribute)
  params = {'Attribute.Name'=>attribute, 'Attribute.Value'=>value.to_s}
  response = make_request('SetQueueAttributes', "#{queue.queue_url}", params)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.