Parent

Included Modules

Files

Class/Module Index [+]

Quicksearch

ActiveMessaging::Adapters::Amqp::Connection

Public Class Methods

new(config = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 39
def initialize config = {}
  @connect_options = {
    :user  => config[:user]  || 'guest',
    :pass  => config[:pass]  || 'guest',
    :host  => config[:host]  || 'localhost',
    :port  => config[:port]  || (config[:ssl] ? 5671 : 5672),
    :vhost => config[:vhost] || nil,
    :ssl   => config[:ssl]   || false,
    :ssl_verify => config[:ssl_verify] || OpenSSL::SSL::VERIFY_PEER,
  }
  
  @debug = config[:debug].to_i rescue 0
  
  Carrot.logging = true unless @debug < 5
  
  @auto_generated_queue = false
  unless config[:queue_name]
    @queue_name = Digest::MD5.hexdigest Time.now.to_s
    @auto_generated_queue = true
  else
    @queue_name = config[:queue_name]
  end

  @queue_config = DEFAULT_QUEUE_CONFIG
  unless @auto_generated_queue
    @queue_config.merge!({
      :durable     => !!config[:queue_durable],
      :auto_delete => !!config[:queue_auto_delete],
      :exclusive   => !!config[:queue_exclusive]
    })
  end
end

Public Instance Methods

disconnect(headers={}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 140
def disconnect(headers={})
  @client.stop
end
receive(options={}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 82
def receive(options={})
  while true 
    message = queue.pop(:ack => true)
    unless message.nil?
      message = AmqpMessage.decode(message).stamp_received! unless message.nil?
      message.delivery_tag = queue.delivery_tag
      puts "RECEIVE: #{message.inspect}" if @debug 
      return message
    end
    sleep 0.2
  end
end
received(message, headers = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 72
def received message, headers = {}
  puts "Received Message - ACK'ing with delivery_tag '#{message.headers[:delivery_tag]}'" if @debug > 0
  client.server.send_frame(::Carrot::AMQP::Protocol::Basic::Ack.new(:delivery_tag => message.headers[:delivery_tag]))
end
send(queue_name, data, headers = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 95
def send queue_name, data, headers = {}
  headers[:routing_key] ||= queue_name
  message = AmqpMessage.new({:headers => headers, :data => data}, queue_name)
  
  if @debug > 0
    puts "Sending the following message: "; pp message
  end
  
  begin
    exchange(*exchange_info(headers)).publish(message.stamp_sent!.encode, :key => headers[:routing_key])
  rescue ::Carrot::AMQP::Server::ServerDown
    retry_attempts = retry_attempts.nil? ? 1 : retry_attempts + 1
    sleep(retry_attempts * 0.25)
    retry unless retry_attempts >= SERVER_RETRY_MAX_ATTEMPTS
    raise e
  end
end
subscribe(queue_name, headers = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 113
def subscribe queue_name, headers = {}, subId = nil
  if @debug > 1
    puts "Begin Subscribe Request:"
    puts "    Queue Name: #{queue_name.inspect}"
    puts "       Headers: #{headers.inspect}"
    puts "         subId: #{subId.inspect}"
    puts "     EXCH INFO: #{exchange_info(headers).inspect}"
    puts "End Subscribe Request."
  end
  
  routing_key = headers[:routing_key] || queue_name
  queue.bind(exchange(*exchange_info(headers)), :key => routing_key)
end
unreceive(message, headers = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 77
def unreceive message, headers = {}
  puts "Un-Receiving Message - REJECTing with delivery_tag '#{message.headers[:delivery_tag]}'" if @debug > 0
  client.server.send_frame(::Carrot::AMQP::Protocol::Basic::Reject.new(:delivery_tag => message.headers[:delivery_tag]))
end
unsubscribe(queue_name, headers={}, subId=nil) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 127
def unsubscribe(queue_name, headers={}, subId=nil)
  if @debug > 1
    puts "Begin UNsubscribe Request:"
    puts "    Queue Name: #{queue_name.inspect}"
    puts "    Headers:    #{headers.inspect}"
    puts "    subId:      #{subId.inspect}"
    puts "End UNsubscribe Request."
  end
  
  routing_key = headers[:routing_key] || queue_name
  queue.unbind(exchange(*exchange_info(headers)), :key => routing_key)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.