# File lib/activemessaging/adapters/amqp.rb, line 39
#
# Builds the connection options and queue settings for this adapter from
# +config+.
#
# Recognised +config+ keys (all optional):
#   :user, :pass  - credentials; each defaults to 'guest'
#   :host         - defaults to 'localhost'
#   :port         - defaults to 5671 when :ssl is set, otherwise 5672
#   :vhost        - defaults to nil
#   :ssl          - defaults to false
#   :ssl_verify   - defaults to OpenSSL::SSL::VERIFY_PEER
#   :debug        - verbosity, coerced with to_i; Carrot logging is switched
#                   on at 5 or above
#   :queue_name   - when absent, an MD5 hex digest of the current time is
#                   used and the queue is flagged as auto-generated
#   :queue_durable, :queue_auto_delete, :queue_exclusive
#                 - coerced to booleans; applied only when :queue_name was
#                   supplied
def initialize(config = {})
  @connect_options = {
    :user       => config[:user]  || 'guest',
    :pass       => config[:pass]  || 'guest',
    :host       => config[:host]  || 'localhost',
    :port       => config[:port]  || (config[:ssl] ? 5671 : 5672),
    :vhost      => config[:vhost] || nil,
    :ssl        => config[:ssl]   || false,
    :ssl_verify => config[:ssl_verify] || OpenSSL::SSL::VERIFY_PEER,
  }

  @debug = config[:debug].to_i rescue 0
  Carrot.logging = true unless @debug < 5

  if config[:queue_name]
    @queue_name = config[:queue_name]
    @auto_generated_queue = false
  else
    @queue_name = Digest::MD5.hexdigest(Time.now.to_s)
    @auto_generated_queue = true
  end

  if @auto_generated_queue
    @queue_config = DEFAULT_QUEUE_CONFIG
  else
    # Hash#merge returns a new hash, so its result has to be assigned;
    # previously it was discarded and the queue_* settings were ignored.
    # Using merge (not merge!) also leaves DEFAULT_QUEUE_CONFIG untouched.
    @queue_config = DEFAULT_QUEUE_CONFIG.merge(
      :durable     => !!config[:queue_durable],
      :auto_delete => !!config[:queue_auto_delete],
      :exclusive   => !!config[:queue_exclusive]
    )
  end
end
# File lib/activemessaging/adapters/amqp.rb, line 140
#
# Stops the underlying client, if one has been assigned to @client.
#
# +headers+ is accepted but not used here.
#
# Returns whatever @client.stop returns, or nil when @client is unset.
def disconnect(headers = {})
  # Reads the instance variable directly instead of going through the
  # +client+ accessor used elsewhere, so it may still be nil at this point;
  # guard against calling #stop on nil.
  @client.stop if @client
end
# File lib/activemessaging/adapters/amqp.rb, line 82
#
# Polls the queue until a message is available and returns it.
#
# Each iteration pops from +queue+ with :ack => true. A nil pop means nothing
# was available, in which case the method sleeps 0.2 seconds and tries again.
# A non-nil payload is decoded through AmqpMessage.decode, stamped via
# stamp_received!, and tagged with the queue's current delivery_tag.
#
# +options+ is accepted but not used here.
#
# Returns the decoded message; never returns nil.
def receive(options = {})
  while true
    message = queue.pop(:ack => true)

    unless message.nil?
      message = AmqpMessage.decode(message).stamp_received!
      message.delivery_tag = queue.delivery_tag
      # @debug is an Integer and 0 is truthy in Ruby, so a bare `if @debug`
      # always printed; compare against 0 like the other debug output does.
      puts "RECEIVE: #{message.inspect}" if @debug > 0
      return message
    end

    sleep 0.2
  end
end
# File lib/activemessaging/adapters/amqp.rb, line 72
#
# Acknowledges +message+ by sending a Basic::Ack frame that carries the
# delivery tag found in the message's headers.
#
# +headers+ is accepted but not used here.
def received(message, headers = {})
  if @debug > 0
    puts "Received Message - ACK'ing with delivery_tag '#{message.headers[:delivery_tag]}'"
  end

  server = client.server
  ack_frame = ::Carrot::AMQP::Protocol::Basic::Ack.new(
    :delivery_tag => message.headers[:delivery_tag]
  )
  server.send_frame(ack_frame)
end
# File lib/activemessaging/adapters/amqp.rb, line 95
#
# Publishes +data+ to the exchange resolved from +headers+.
#
# queue_name - used as the routing key when headers[:routing_key] is not set,
#              and passed to AmqpMessage.new.
# data       - the message payload.
# headers    - message headers; :routing_key is filled in on this hash when
#              missing (the caller's hash is modified).
#
# When the publish raises ::Carrot::AMQP::Server::ServerDown the call is
# retried after a pause of 0.25s multiplied by the attempt number. Once
# SERVER_RETRY_MAX_ATTEMPTS failures have occurred the ServerDown error is
# re-raised to the caller.
def send(queue_name, data, headers = {})
  headers[:routing_key] ||= queue_name
  message = AmqpMessage.new({:headers => headers, :data => data}, queue_name)

  if @debug > 0
    puts "Sending the following message: "; pp message
  end

  retry_attempts = 0
  begin
    exchange(*exchange_info(headers)).publish(message.stamp_sent!.encode, :key => headers[:routing_key])
  rescue ::Carrot::AMQP::Server::ServerDown
    retry_attempts += 1
    sleep(retry_attempts * 0.25)
    retry unless retry_attempts >= SERVER_RETRY_MAX_ATTEMPTS
    # Bare raise re-raises the rescued ServerDown. The previous `raise e`
    # referenced an unbound name and surfaced as a NameError instead.
    raise
  end
end
# File lib/activemessaging/adapters/amqp.rb, line 113
#
# Binds the adapter's queue to the exchange resolved from +headers+.
#
# The binding key is headers[:routing_key] when present, otherwise
# +queue_name+. +subId+ is only echoed in the debug output.
# The request details are printed when @debug is greater than 1.
def subscribe(queue_name, headers = {}, subId = nil)
  if @debug > 1
    puts "Begin Subscribe Request:"
    puts " Queue Name: #{queue_name.inspect}"
    puts " Headers: #{headers.inspect}"
    puts " subId: #{subId.inspect}"
    puts " EXCH INFO: #{exchange_info(headers).inspect}"
    puts "End Subscribe Request."
  end

  binding_key = headers[:routing_key] || queue_name
  queue.bind(exchange(*exchange_info(headers)), :key => binding_key)
end
# File lib/activemessaging/adapters/amqp.rb, line 77
#
# Rejects +message+ by sending a Basic::Reject frame that carries the
# delivery tag found in the message's headers.
#
# +headers+ is accepted but not used here.
def unreceive(message, headers = {})
  if @debug > 0
    puts "Un-Receiving Message - REJECTing with delivery_tag '#{message.headers[:delivery_tag]}'"
  end

  server = client.server
  reject_frame = ::Carrot::AMQP::Protocol::Basic::Reject.new(
    :delivery_tag => message.headers[:delivery_tag]
  )
  server.send_frame(reject_frame)
end
# File lib/activemessaging/adapters/amqp.rb, line 127
#
# Removes the binding between the adapter's queue and the exchange resolved
# from +headers+.
#
# The binding key is headers[:routing_key] when present, otherwise
# +queue_name+. +subId+ is only echoed in the debug output.
# The request details are printed when @debug is greater than 1.
def unsubscribe(queue_name, headers = {}, subId = nil)
  if @debug > 1
    puts "Begin UNsubscribe Request:"
    puts " Queue Name: #{queue_name.inspect}"
    puts " Headers: #{headers.inspect}"
    puts " subId: #{subId.inspect}"
    puts "End UNsubscribe Request."
  end

  binding_key = headers[:routing_key] || queue_name
  queue.unbind(exchange(*exchange_info(headers)), :key => binding_key)
end
Generated with the Darkfish Rdoc Generator 2.