RightAws::SqsInterface

Public Class Methods

api() click to toggle source
# File lib/sqs/right_sqs_interface.rb, line 46
# Returns the current value of the @@api class variable.
def self.api
  return @@api
end
bench_sqs() click to toggle source
# File lib/sqs/right_sqs_interface.rb, line 41
# Returns the +service+ member of the shared @@bench object.
def self.bench_sqs
  bench = @@bench
  bench.service
end
bench_xml() click to toggle source
# File lib/sqs/right_sqs_interface.rb, line 38
# Returns the +xml+ member of the shared @@bench object.
def self.bench_xml
  bench = @@bench
  bench.xml
end
new(aws_access_key_id=nil, aws_secret_access_key=nil, params={}) click to toggle source

Creates a new SqsInterface instance.

sqs = RightAws::SqsInterface.new('1E3GDYEOGFJPIT75KDT40','hgTHt68JY07JKUY08ftHYtERkjgtfERn57DFE379', {:multi_thread => true, :logger => Logger.new('/tmp/x.log')})

Params is a hash:

{:server       => 'queue.amazonaws.com' # Amazon service host: 'queue.amazonaws.com'(default)
 :port         => 443                   # Amazon service port: 80 or 443(default)
 :multi_thread => true|false            # Multi-threaded (connection per each thread): true or false(default)
 :signature_version => '0'              # The signature version : '0' or '1'(default)
 :logger       => Logger Object}        # Logger instance: logs to STDOUT if omitted }
# File lib/sqs/right_sqs_interface.rb, line 62
# Creates a new SqsInterface instance.
#
# Endpoint defaults (host, port, protocol) are taken from the SQS_URL
# environment variable when it is set, otherwise from DEFAULT_HOST,
# DEFAULT_PORT and DEFAULT_PROTOCOL. Credentials that are not passed in fall
# back to the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables.
# Everything is then forwarded to +init+ together with +params+.
def initialize(aws_access_key_id=nil, aws_secret_access_key=nil, params={})
  sqs_url  = ENV['SQS_URL']
  endpoint = sqs_url ? URI.parse(sqs_url) : nil
  service_info = {
    :name             => 'SQS',
    :default_host     => endpoint ? endpoint.host   : DEFAULT_HOST,
    :default_port     => endpoint ? endpoint.port   : DEFAULT_PORT,
    :default_protocol => endpoint ? endpoint.scheme : DEFAULT_PROTOCOL
  }
  access_key = aws_access_key_id     || ENV['AWS_ACCESS_KEY_ID']
  secret_key = aws_secret_access_key || ENV['AWS_SECRET_ACCESS_KEY']
  init(service_info, access_key, secret_key, params)
end
queue_name_by_url(queue_url) click to toggle source

Returns short queue name by url.

RightAws::SqsInterface.queue_name_by_url('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 'my_awesome_queue'
# File lib/sqs/right_sqs_interface.rb, line 381
# Returns the short queue name: the run of non-slash characters that ends
# +queue_url+. Any error raised while extracting it is handed to on_exception.
def self.queue_name_by_url(queue_url)
  trailing_segment = /[^\/]*$/
  queue_url[trailing_segment]
rescue
  on_exception
end

Public Instance Methods

add_grant(queue_url, grantee_email_address, permission = nil) click to toggle source

Adds a grant for a user (identified by the email address they registered with Amazon). Returns true or an exception. Permission = 'FULLCONTROL' | 'RECEIVEMESSAGE' | 'SENDMESSAGE'.

sqs.add_grant('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 'my_awesome_friend@gmail.com', 'FULLCONTROL') #=> true
# File lib/sqs/right_sqs_interface.rb, line 239
# Issues an 'AddGrant' request that gives +permission+ on the queue at
# +queue_url+ to the user identified by +grantee_email_address+.
#
# The response is parsed with SqsStatusParser and the parsed result is
# returned. Any error raised along the way is handed to on_exception.
def add_grant(queue_url, grantee_email_address, permission = nil)
  grant_params = {
    'Grantee.EmailAddress' => grantee_email_address,
    'Permission'           => permission,
    :queue_url             => queue_url
  }
  request = generate_request('AddGrant', grant_params)
  request_info(request, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
change_message_visibility(queue_url, message_id, visibility_timeout=0) click to toggle source

Changes message visibility timeout. Returns true or an exception.

sqs.change_message_visibility('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '1234567890...0987654321', 10) #=> true
# File lib/sqs/right_sqs_interface.rb, line 352
# Issues a 'ChangeMessageVisibility' request for +message_id+ on the queue at
# +queue_url+. +visibility_timeout+ is sent as a string (default 0).
#
# The response is parsed with SqsStatusParser and the parsed result is
# returned. Any error raised along the way is handed to on_exception.
def change_message_visibility(queue_url, message_id, visibility_timeout=0)
  visibility_params = {
    'MessageId'         => message_id,
    'VisibilityTimeout' => visibility_timeout.to_s,
    :queue_url          => queue_url
  }
  request = generate_request('ChangeMessageVisibility', visibility_params)
  request_info(request, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
clear_queue(queue_url) click to toggle source

Removes all visible messages from the queue. Returns true or an exception.

sqs.clear_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true
# File lib/sqs/right_sqs_interface.rb, line 411
# Empties the queue at +queue_url+ by calling pop_message until it returns a
# falsy value, then returns true. Any error raised along the way is handed to
# on_exception.
def clear_queue(queue_url)
  while pop_message(queue_url)
    # nothing to do here: each successful pop already removed one message
  end
  true
rescue
  on_exception
end
create_queue(queue_name, default_visibility_timeout=nil) click to toggle source

Creates new queue. Returns new queue link.

sqs.create_queue('my_awesome_queue') #=> 'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'

PS Some queue based requests may not become available until a couple of minutes after queue creation (permission grant and removal for example)

# File lib/sqs/right_sqs_interface.rb, line 144
# Issues a 'CreateQueue' request for +queue_name+ and returns the result
# parsed by SqsCreateQueueParser (the new queue link, per the published docs).
#
# +default_visibility_timeout+ falls back to DEFAULT_VISIBILITY_TIMEOUT when
# it is nil (or false).
#
# Any error raised while building or sending the request is handed to
# on_exception, matching the other request methods of this interface.
def create_queue(queue_name, default_visibility_timeout=nil)
  req_hash = generate_request('CreateQueue',
                              'QueueName'                => queue_name,
                              'DefaultVisibilityTimeout' => default_visibility_timeout || DEFAULT_VISIBILITY_TIMEOUT)
  request_info(req_hash, SqsCreateQueueParser.new(:logger => @logger))
rescue
  on_exception
end
delete_message(queue_url, message_id) click to toggle source

Deletes message from queue. Returns true or an exception. Amazon returns true on deletion of non-existent messages.

sqs.delete_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '12345678904...0987654321') #=> true
# File lib/sqs/right_sqs_interface.rb, line 339
# Issues a 'DeleteMessage' request for +message_id+ on the queue at
# +queue_url+.
#
# The response is parsed with SqsStatusParser and the parsed result is
# returned. Any error raised along the way is handed to on_exception.
def delete_message(queue_url, message_id)
  delete_params = {
    'MessageId' => message_id,
    :queue_url  => queue_url
  }
  request = generate_request('DeleteMessage', delete_params)
  request_info(request, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
delete_queue(queue_url, force_deletion = false) click to toggle source

Deletes queue (queue must be empty or force_deletion must be set to true). Queue is identified by url. Returns true or an exception.

sqs.delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue_2') #=> true
# File lib/sqs/right_sqs_interface.rb, line 168
# Issues a 'DeleteQueue' request for the queue at +queue_url+.
# +force_deletion+ is sent as its string form ('true' / 'false').
#
# The response is parsed with SqsStatusParser and the parsed result is
# returned. Any error raised along the way is handed to on_exception.
def delete_queue(queue_url, force_deletion = false)
  delete_params = {
    'ForceDeletion' => force_deletion.to_s,
    :queue_url      => queue_url
  }
  request = generate_request('DeleteQueue', delete_params)
  request_info(request, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
force_clear_queue(queue_url) click to toggle source

Deletes the queue and then re-creates it (also restoring its attributes). The fastest way to clear a big queue or a queue with invisible messages. Returns true or an exception.

sqs.force_clear_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true

PS This function is no longer supported. Amazon has changed the SQS semantics to require at least 60 seconds between queue deletion and creation. Hence this method will fail with an exception.

# File lib/sqs/right_sqs_interface.rb, line 425
# Clears a queue by deleting and re-creating it:
#   1. remembers the queue's short name and its current attributes,
#   2. force-deletes the queue,
#   3. creates a queue with the same short name,
#   4. re-applies every remembered attribute, one set_queue_attributes call each.
# Returns true. Any error raised along the way is handed to on_exception.
#
# NOTE: the published documentation for this method says it is no longer
# supported, because Amazon requires at least 60 seconds between deleting and
# re-creating a queue.
def force_clear_queue(queue_url)
  short_name       = queue_name_by_url(queue_url)
  saved_attributes = get_queue_attributes(queue_url)
  force_delete_queue(queue_url)
  create_queue(short_name)
  # Deliberate throw-away read. Amazon does not accept attribute changes
  # immediately after a queue is created, so an 'empty' get_queue_attributes
  # is issued first; presumably this gives the service time to catch up.
  get_queue_attributes(queue_url)
  saved_attributes.each do |name, setting|
    set_queue_attributes(queue_url, name, setting)
  end
  true
rescue
  on_exception
end
force_delete_queue(queue_url) click to toggle source

Deletes the queue even if it has messages. Returns true or an exception.

force_delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true

P.S. same as delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', true)

# File lib/sqs/right_sqs_interface.rb, line 444
# Deletes the queue at +queue_url+ by calling delete_queue with the
# force_deletion flag set to true. Any error raised is handed to on_exception.
def force_delete_queue(queue_url)
  force_deletion = true
  delete_queue(queue_url, force_deletion)
rescue
  on_exception
end
get_queue_attributes(queue_url, attribute='All') click to toggle source

Retrieves the queue attribute(s). Returns a hash of attribute(s) or an exception.

sqs.get_queue_attributes('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> {"ApproximateNumberOfMessages"=>"0", "VisibilityTimeout"=>"30"}
# File lib/sqs/right_sqs_interface.rb, line 181
# Issues a 'GetQueueAttributes' request for the queue at +queue_url+.
# +attribute+ names the attribute to fetch and defaults to 'All'.
#
# The response is parsed with SqsGetQueueAttributesParser and the parsed
# result is returned (a hash of attributes, per the published docs).
# Any error raised along the way is handed to on_exception.
def get_queue_attributes(queue_url, attribute='All')
  attribute_params = {
    'Attribute' => attribute,
    :queue_url  => queue_url
  }
  request = generate_request('GetQueueAttributes', attribute_params)
  request_info(request, SqsGetQueueAttributesParser.new(:logger => @logger))
rescue
  on_exception
end
get_queue_length(queue_url) click to toggle source

Returns approximate number of messages in queue.

sqs.get_queue_length('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 3
# File lib/sqs/right_sqs_interface.rb, line 401
# Returns the queue's 'ApproximateNumberOfMessages' attribute converted with
# to_i. Any error raised along the way is handed to on_exception.
def get_queue_length(queue_url)
  attributes = get_queue_attributes(queue_url)
  approximate_count = attributes['ApproximateNumberOfMessages']
  approximate_count.to_i
rescue
  on_exception
end
get_visibility_timeout(queue_url) click to toggle source

Retrieves visibility timeout.

sqs.get_visibility_timeout('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 15

See also: get_queue_attributes

# File lib/sqs/right_sqs_interface.rb, line 228
# Issues a 'GetVisibilityTimeout' request for the queue at +queue_url+.
#
# The response is parsed with SqsGetVisibilityTimeoutParser and the parsed
# result is returned. Any error raised along the way is handed to
# on_exception.
def get_visibility_timeout(queue_url)
  timeout_params = { :queue_url => queue_url }
  request = generate_request('GetVisibilityTimeout', timeout_params)
  request_info(request, SqsGetVisibilityTimeoutParser.new(:logger => @logger))
rescue
  on_exception
end
list_grants(queue_url, grantee_email_address=nil, permission = nil) click to toggle source

Retrieves hash of grantee_id => perms for this queue:

sqs.list_grants('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=>
  {"000000000000000000000001111111111117476c7fea6efb2c3347ac3ab2792a"=>{:name=>"root", :perms=>["FULLCONTROL"]},
   "00000000000000000000000111111111111e5828344600fc9e4a784a09e97041"=>{:name=>"myawesomefriend", :perms=>["FULLCONTROL"]}}
# File lib/sqs/right_sqs_interface.rb, line 255
# Issues a 'ListGrants' request for the queue at +queue_url+, optionally
# filtered by +grantee_email_address+ and +permission+, and parses the
# response with SqsListGrantsParser.
#
# The parsed records (each read via :id, :name and :permission) are merged
# into one entry per grantee:
#
#   { grantee_id => { :perms => [permission, ...], :name => name } }
#
# Any error raised while building, sending or merging is handed to
# on_exception, matching the other request methods of this interface.
def list_grants(queue_url, grantee_email_address=nil, permission = nil)
  req_hash = generate_request('ListGrants',
                              'Grantee.EmailAddress' => grantee_email_address,
                              'Permission'           => permission,
                              :queue_url             => queue_url)
  response = request_info(req_hash, SqsListGrantsParser.new(:logger => @logger))
  # One user may have up to 3 permission records for every queue.
  # We join these records into one.
  result = {}
  response.each do |perm|
    # create the entry the first time this grantee is seen, then reuse it
    grantee = (result[perm[:id]] ||= {:perms => []})
    grantee[:perms] << perm[:permission]
    grantee[:name] = perm[:name]
  end
  result
rescue
  on_exception
end
list_queues(queue_name_prefix=nil) click to toggle source

Lists all queues owned by this user that have names beginning with queue_name_prefix. If queue_name_prefix is omitted then retrieves a list of all queues.

sqs.create_queue('my_awesome_queue')
sqs.create_queue('my_awesome_queue_2')
sqs.list_queues('my_awesome') #=> ['http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue','http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue_2']
# File lib/sqs/right_sqs_interface.rb, line 157
# Issues a 'ListQueues' request, passing +queue_name_prefix+ (nil by default)
# as 'QueueNamePrefix'.
#
# The response is parsed with SqsListQueuesParser and the parsed result is
# returned (a list of queue urls, per the published docs). Any error raised
# along the way is handed to on_exception.
def list_queues(queue_name_prefix=nil)
  prefix_filter = { 'QueueNamePrefix' => queue_name_prefix }
  request = generate_request('ListQueues', prefix_filter)
  request_info(request, SqsListQueuesParser.new(:logger => @logger))
rescue
  on_exception
end
peek_message(queue_url, message_id) click to toggle source

Peeks message from queue by message id. Returns message in format of {:id=>'message_id', :body=>'message_body'} or nil.

sqs.peek_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '1234567890...0987654321') #=>
  {:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}
# File lib/sqs/right_sqs_interface.rb, line 313
# Fetches a single message by id with a REST 'GET' on
# "<queue_url>/<CGI-escaped message_id>".
#
# The response is parsed with SqsReceiveMessagesParser. Returns the first
# parsed message, or nil when the parsed result is blank. Any error raised
# along the way is handed to on_exception.
def peek_message(queue_url, message_id)
  message_location = "#{queue_url}/#{CGI.escape(message_id)}"
  request = generate_rest_request('GET', :queue_url => message_location)
  found = request_info(request, SqsReceiveMessagesParser.new(:logger => @logger))
  return nil if found.blank?
  found[0]
rescue
  on_exception
end
pop_message(queue_url) click to toggle source

Pops (retrieves and deletes) first accessible message from queue. Returns the message in format {:id=>'message_id', :body=>'message_body'} or nil.

sqs.pop_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=>
  {:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}
# File lib/sqs/right_sqs_interface.rb, line 485
# Pops a single message: calls pop_messages with its default count and
# returns the first element of the result, or nil when the result is blank.
# Any error raised along the way is handed to on_exception.
def pop_message(queue_url)
  popped = pop_messages(queue_url)
  return nil if popped.blank?
  popped[0]
rescue
  on_exception
end
pop_messages(queue_url, number_of_messages=1) click to toggle source

Pops (retrieves and deletes) up to 'number_of_messages' from queue. Returns an array of retrieved messages in format: [{:id=>'message_id', :body=>'message_body'}].

sqs.pop_messages('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 3) #=>
 [{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}, ..., {}]
# File lib/sqs/right_sqs_interface.rb, line 470
# Receives up to +number_of_messages+ messages via receive_messages, deletes
# each one from the queue by its :id, and returns the received messages.
# Any error raised along the way is handed to on_exception.
def pop_messages(queue_url, number_of_messages=1)
  received = receive_messages(queue_url, number_of_messages)
  received.each { |msg| delete_message(queue_url, msg[:id]) }
  received
rescue
  on_exception
end
push_message(queue_url, message) click to toggle source

Same as send_message

Alias for: send_message
queue_name_by_url(queue_url) click to toggle source

Returns short queue name by url.

sqs.queue_name_by_url('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 'my_awesome_queue'
# File lib/sqs/right_sqs_interface.rb, line 391
# Instance-level convenience wrapper: delegates to the class-level
# queue_name_by_url of this object's class. Any error raised is handed to
# on_exception.
def queue_name_by_url(queue_url)
  interface_class = self.class
  interface_class.queue_name_by_url(queue_url)
rescue
  on_exception
end
queue_url_by_name(queue_name) click to toggle source

Returns queue url by queue short name or nil if queue is not found

sqs.queue_url_by_name('my_awesome_queue') #=> 'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'
# File lib/sqs/right_sqs_interface.rb, line 366
# Resolves a short queue name to its url.
#
# A +queue_name+ that already contains a '/' is returned unchanged. Otherwise
# the queues listed for that name prefix are searched for one whose short
# name equals +queue_name+ exactly; its url is returned, or nil when none
# matches. Any error raised along the way is handed to on_exception.
def queue_url_by_name(queue_name)
  return queue_name if queue_name.include?('/')
  candidates = list_queues(queue_name)
  # list_queues matches by prefix, so an exact-name check is still required
  candidates.find { |candidate| queue_name_by_url(candidate) == queue_name }
rescue
  on_exception
end
receive_message(queue_url, visibility_timeout=nil) click to toggle source

Reads first accessible message from queue. Returns message as a hash: {:id=>'message_id', :body=>'message_body'} or nil.

sqs.receive_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 10) #=>
  {:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}
# File lib/sqs/right_sqs_interface.rb, line 455
# Receives a single message: asks receive_messages for one message with the
# given +visibility_timeout+ and returns the first element of the result, or
# nil when the result is blank. Any error raised along the way is handed to
# on_exception.
def receive_message(queue_url, visibility_timeout=nil)
  received = receive_messages(queue_url, 1, visibility_timeout)
  return nil if received.blank?
  received[0]
rescue
  on_exception
end
receive_messages(queue_url, number_of_messages=1, visibility_timeout=nil) click to toggle source

Retrieves a list of messages from the queue. Returns an array of hashes in the format: {:id=>'message_id', :body=>'message_body'}

sqs.receive_messages('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue',10, 5) #=>
 [{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}, ..., {}]

P.S. Usually returns fewer messages than requested even if they are available.

# File lib/sqs/right_sqs_interface.rb, line 297
# Retrieves messages with a REST 'GET' on "<queue_url>/front", passing
# +number_of_messages+ and +visibility_timeout+ as request parameters.
#
# Returns [] straight away, without issuing a request, when
# +number_of_messages+ equals 0. Otherwise the response is parsed with
# SqsReceiveMessagesParser and the parsed result is returned. Any error
# raised along the way is handed to on_exception.
def receive_messages(queue_url, number_of_messages=1, visibility_timeout=nil)
  return [] if number_of_messages == 0
  receive_params = {
    'NumberOfMessages'  => number_of_messages,
    'VisibilityTimeout' => visibility_timeout,
    :queue_url          => "#{queue_url}/front"
  }
  request = generate_rest_request('GET', receive_params)
  request_info(request, SqsReceiveMessagesParser.new(:logger => @logger))
rescue
  on_exception
end
remove_grant(queue_url, grantee_email_address_or_id, permission = nil) click to toggle source

Revokes permission from user. Returns true or an exception.

sqs.remove_grant('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 'my_awesome_friend@gmail.com', 'FULLCONTROL') #=> true
# File lib/sqs/right_sqs_interface.rb, line 279
# Issues a 'RemoveGrant' request that revokes +permission+ on the queue at
# +queue_url+ from the given grantee.
#
# The grantee is sent as 'Grantee.EmailAddress' when
# +grantee_email_address_or_id+ contains an '@', otherwise as 'Grantee.ID'.
# The response is parsed with SqsStatusParser and the parsed result is
# returned. Any error raised along the way is handed to on_exception.
def remove_grant(queue_url, grantee_email_address_or_id, permission = nil)
  grantee_key = if grantee_email_address_or_id.include?('@')
                  'Grantee.EmailAddress'
                else
                  'Grantee.ID'
                end
  revoke_params = {
    grantee_key  => grantee_email_address_or_id,
    'Permission' => permission,
    :queue_url   => queue_url
  }
  request = generate_request('RemoveGrant', revoke_params)
  request_info(request, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
send_message(queue_url, message) click to toggle source

Sends a new message to the queue. Returns 'message_id' or raises an exception.

sqs.send_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 'message_1') #=> "1234567890...0987654321"
# File lib/sqs/right_sqs_interface.rb, line 325
# Sends +message+ with a REST 'PUT' on "<queue_url>/back".
#
# The response is parsed with SqsSendMessagesParser and the parsed result is
# returned (the message id, per the published docs). Any error raised along
# the way is handed to on_exception.
def send_message(queue_url, message)
  send_params = {
    :message   => message,
    :queue_url => "#{queue_url}/back"
  }
  request = generate_rest_request('PUT', send_params)
  request_info(request, SqsSendMessagesParser.new(:logger => @logger))
rescue
  on_exception
end
Also aliased as: push_message
set_queue_attributes(queue_url, attribute, value) click to toggle source

Sets queue attribute. Returns true or an exception.

sqs.set_queue_attributes('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', "VisibilityTimeout", 10) #=> true

P.S. Amazon returns success even if the attribute does not exist. Also, attribute values may not be immediately available to other queries for some time after an update (see the SQS documentation for semantics).

# File lib/sqs/right_sqs_interface.rb, line 197
# Issues a 'SetQueueAttributes' request that sets +attribute+ to +value+ on
# the queue at +queue_url+.
#
# The response is parsed with SqsStatusParser and the parsed result is
# returned. Any error raised along the way is handed to on_exception.
def set_queue_attributes(queue_url, attribute, value)
  attribute_params = {
    'Attribute' => attribute,
    'Value'     => value,
    :queue_url  => queue_url
  }
  request = generate_request('SetQueueAttributes', attribute_params)
  request_info(request, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
set_visibility_timeout(queue_url, visibility_timeout=nil) click to toggle source

Sets visibility timeout. Returns true or an exception.

sqs.set_visibility_timeout('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 15) #=> true

See also: set_queue_attributes

# File lib/sqs/right_sqs_interface.rb, line 213
# Issues a 'SetVisibilityTimeout' request for the queue at +queue_url+.
# +visibility_timeout+ falls back to DEFAULT_VISIBILITY_TIMEOUT when it is
# nil (or false).
#
# The response is parsed with SqsStatusParser and the parsed result is
# returned. Any error raised along the way is handed to on_exception.
def set_visibility_timeout(queue_url, visibility_timeout=nil)
  timeout_params = {
    'VisibilityTimeout' => visibility_timeout || DEFAULT_VISIBILITY_TIMEOUT,
    :queue_url          => queue_url
  }
  request = generate_request('SetVisibilityTimeout', timeout_params)
  request_info(request, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.