Object
Typical Stomp client class. Uses a listener thread to receive frames from the server; any thread can send.
All receives happen in that single listener thread, so keep the processing done there light if you expect a high message volume.
A new Client object can be initialized using two forms:
Standard positional parameters:
login (String, default : '')
passcode (String, default : '')
host (String, default : 'localhost')
port (Integer, default : 61613)
reliable (Boolean, default : false)
e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port
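The URL forms above can be illustrated with a small parser. This is only a sketch: the pattern `STOMP_URL` and the helper `parse_stomp_url` are hypothetical names written for this example, and the library's own `url_regex` is not shown on this page, so its exact rules may differ.

```ruby
# Hypothetical pattern for illustration; the library's url_regex may differ.
# Groups: 1 = login, 2 = passcode, 3 = host, 4 = port.
STOMP_URL = %r{\Astomp://(?:([^:@/]+):([^:@/]+)@)?([\w.-]+):(\d+)\z}

# Returns a hash of connection settings, or nil when the string does not
# match one of the documented stomp:// forms.
def parse_stomp_url(url)
  match = STOMP_URL.match(url)
  return nil unless match

  {
    login:    match[1] || '',   # credentials are optional
    passcode: match[2] || '',
    host:     match[3],
    port:     match[4].to_i
  }
end

p parse_stomp_url('stomp://login:passcode@host.domain.tld:61613')
p parse_stomp_url('stomp://localhost:61613')
```

When the credentials are left out, the login and passcode fall back to empty strings, which matches the defaults of the positional form.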
# File lib/stomp/client.rb, line 36
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)

  # Parse stomp:// URL's or set params
  if login.is_a?(Hash)
    @parameters = login

    first_host = @parameters[:hosts][0]

    @login = first_host[:login]
    @passcode = first_host[:passcode]
    @host = first_host[:host]
    @port = first_host[:port] || Connection::default_port(first_host[:ssl])

    @reliable = true

  elsif login =~ /^stomp:\/\/#{url_regex}/
    # e.g. stomp://login:passcode@host:port or stomp://host:port
    @login = $2 || ""
    @passcode = $3 || ""
    @host = $4
    @port = $5.to_i
    @reliable = false
  elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/
    # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param

    first_host = {}
    first_host[:ssl] = !$2.nil?
    @login = first_host[:login] = $4 || ""
    @passcode = first_host[:passcode] = $5 || ""
    @host = first_host[:host] = $6
    @port = first_host[:port] = $7.to_i || Connection::default_port(first_host[:ssl])

    options = $16 || ""
    parts = options.split(/&|=/)
    options = Hash[*parts]

    hosts = [first_host] + parse_hosts(login)

    @parameters = {}
    @parameters[:hosts] = hosts

    @parameters.merge! filter_options(options)

    @reliable = true
  else
    @login = login
    @passcode = passcode
    @host = host
    @port = port.to_i
    @reliable = reliable
  end

  check_arguments!

  @id_mutex = Mutex.new
  @ids = 1

  if @parameters
    @connection = Connection.new(@parameters)
  else
    @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
  end

  start_listeners

end
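The failover branch of the constructor turns the query string after `?` into a Hash by splitting on `&` and `=` and pairing the pieces up. The sketch below isolates that step; `parse_failover_options` is a hypothetical helper name, and `option2=other` is a made-up option used only to show a second pair.

```ruby
# Hypothetical helper isolating the option parsing used for failover URLs.
# "k1=v1&k2=v2" is split into ["k1", "v1", "k2", "v2"], then Hash[] pairs
# consecutive elements into keys and values.
def parse_failover_options(query)
  parts = (query || '').split(/&|=/)
  Hash[*parts]
end

p parse_failover_options('option1=param&option2=other')
p parse_failover_options(nil)

# An option without a value leaves an odd number of parts, which Hash[]
# rejects with ArgumentError.
begin
  parse_failover_options('option1=param&flag')
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

Keys and values both come back as strings, so any numeric or boolean option would still need converting afterwards.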
Syntactic sugar for 'Client.new'. See 'initialize' for usage.
# File lib/stomp/client.rb, line 102
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end
Abort a transaction by name
# File lib/stomp/client.rb, line 118
def abort(name, headers = {})
  @connection.abort(name, headers)

  # lets replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      if listener = find_listener(message)
        listener.call(message)
      end
    end
  end
end
Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' )
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/client.rb, line 167
def acknowledge(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.ack message.headers['message-id'], headers
end
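Taken together, 'acknowledge' and 'abort' keep a per-transaction list of ack'd messages and hand them back to their listeners on rollback. The sketch below models only that bookkeeping in memory. `ReplayTracker` and its methods are hypothetical stand-ins, messages are plain hashes rather than the library's message objects, and no broker connection is involved.

```ruby
# Hypothetical in-memory model of the replay bookkeeping; not the library class.
class ReplayTracker
  def initialize
    @replay_messages_by_txn = {}
    @listeners = {}
  end

  # Register the callback for a subscription id.
  def on_message(subscription_id, &block)
    @listeners[subscription_id] = block
  end

  # Remember a message that was ack'd inside a transaction.
  def acknowledge(message, txn_id)
    (@replay_messages_by_txn[txn_id] ||= []) << message
  end

  # On abort, replay every remembered message to its listener, in ack order.
  def abort(txn_id)
    (@replay_messages_by_txn[txn_id] || []).each do |message|
      listener = @listeners[message[:subscription]]
      listener.call(message) if listener
    end
  end

  # On commit, the remembered messages are no longer needed.
  def commit(txn_id)
    @replay_messages_by_txn.delete(txn_id)
  end
end

tracker = ReplayTracker.new
tracker.on_message('sub-1') { |m| puts "replayed #{m[:body]}" }
tracker.acknowledge({ subscription: 'sub-1', body: 'first' }, 'tx1')
tracker.acknowledge({ subscription: 'sub-1', body: 'second' }, 'tx1')
tracker.abort('tx1')
```

Because the replay runs inside whatever thread calls abort, a slow listener will delay the caller; that is worth keeping in mind when transactions hold many messages.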
Begin a transaction by name
# File lib/stomp/client.rb, line 113
def begin(name, headers = {})
  @connection.begin(name, headers)
end
Close out resources in use by this client
# File lib/stomp/client.rb, line 231
def close headers={}
  @listener_thread.exit
  @connection.disconnect headers
end
Is this client closed?
# File lib/stomp/client.rb, line 226
def closed?
  @connection.closed?
end
Commit a transaction by name
# File lib/stomp/client.rb, line 133
def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end
# File lib/stomp/client.rb, line 212
def connection_frame
  @connection.connection_frame
end
# File lib/stomp/client.rb, line 216
def disconnect_receipt
  @connection.disconnect_receipt
end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp/client.rb, line 108
def join(limit = nil)
  @listener_thread.join(limit)
end
# File lib/stomp/client.rb, line 203
def obj_send(*args)
  __send__(*args)
end
Is this client open?
# File lib/stomp/client.rb, line 221
def open?
  @connection.open?
end
Publishes a message to a destination.
If a block is given, a receipt is requested and passed to the block when it arrives.
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/client.rb, line 196
def publish(destination, message, headers = {})
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.publish(destination, message, headers)
end
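The receipt mechanism used by 'publish' and 'acknowledge' can be pictured as a table of callbacks keyed by receipt id. The sketch below is an assumption about how such a table might work: `ReceiptRegistry`, `register`, and `dispatch` are hypothetical names, and the body of the library's `register_receipt_listener` is not shown on this page. Only the `@id_mutex` and `@ids` counter are taken from the constructor above.

```ruby
# Hypothetical model of a receipt listener table; not the library's code.
class ReceiptRegistry
  def initialize
    @id_mutex = Mutex.new   # guards the counter, since any thread may send
    @ids = 1
    @receipt_listeners = {}
  end

  # Store the callback and return the id to put in the 'receipt' header.
  def register(&block)
    id = @id_mutex.synchronize do
      current = @ids
      @ids += 1
      current.to_s
    end
    @receipt_listeners[id] = block
    id
  end

  # Called when a receipt arrives: run the matching callback once, then
  # forget it. Returns true if a callback was found.
  def dispatch(receipt_id, receipt)
    listener = @receipt_listeners.delete(receipt_id)
    return false unless listener

    listener.call(receipt)
    true
  end
end

registry = ReceiptRegistry.new
receipt_id = registry.register { |r| puts "receipt received: #{r}" }
registry.dispatch(receipt_id, 'RECEIPT frame placeholder')
```

Removing the callback on first delivery keeps the table from growing without bound when many messages are published with a block.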
Check if the thread was created and isn't dead
# File lib/stomp/client.rb, line 237
def running
  @listener_thread && !!@listener_thread.status
end
# File lib/stomp/client.rb, line 207
def send(*args)
  warn("This method is deprecated and will be removed on the next release. Use 'publish' instead")
  publish(*args)
end
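The 'running' check relies on how Ruby's Thread#status behaves: it returns a string such as "run" or "sleep" while the thread is alive, false after a normal exit, and nil after an exit caused by an exception. The sketch below shows the same double negation on ordinary threads; `thread_running?` is a hypothetical helper, and unlike 'running' it returns false rather than nil when no thread exists.

```ruby
# Hypothetical helper applying the same test as 'running' to any thread.
def thread_running?(thread)
  !thread.nil? && !!thread.status
end

sleeper = Thread.new { sleep }      # stays alive until killed
finished = Thread.new { :done }     # exits normally almost at once
finished.join

puts thread_running?(sleeper)       # alive thread
puts thread_running?(finished)      # terminated thread
puts thread_running?(nil)           # thread never created

sleeper.kill
sleeper.join
puts thread_running?(sleeper)       # killed thread
```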
Subscribe to a destination. A block must be passed; it is used as the callback listener.
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/client.rb, line 143
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?
  # use subscription id to correlate messages to subscription. As described in
  # the SUBSCRIPTION section of the protocol: http://stomp.codehaus.org/Protocol.
  # If no subscription id is provided, generate one.
  set_subscription_id_if_missing(destination, headers)
  if @listeners[headers[:id]]
    raise "attempting to subscribe to a queue with a previous subscription"
  end
  @listeners[headers[:id]] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end
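The listener table behind 'subscribe' maps each subscription id to one callback and refuses a second subscription under the same id. The sketch below models that rule without a broker. `SubscriptionTable` and `deliver` are hypothetical names, and the `"sub-#{destination}"` id scheme is an assumption made for this example; the library's `set_subscription_id_if_missing` is not shown on this page and may generate ids differently.

```ruby
# Hypothetical in-memory model of the listener table; not the library class.
class SubscriptionTable
  def initialize
    @listeners = {}
  end

  # Register a callback and return the subscription id that was used.
  def subscribe(destination, headers = {}, &block)
    raise 'No listener given' unless block

    # Assumed id scheme, for illustration only.
    id = headers[:id] || "sub-#{destination}"
    if @listeners[id]
      raise 'attempting to subscribe to a queue with a previous subscription'
    end

    @listeners[id] = block
    id
  end

  # Route an incoming message to the callback registered for its subscription.
  def deliver(subscription_id, message)
    listener = @listeners[subscription_id]
    listener.call(message) if listener
  end
end

table = SubscriptionTable.new
id = table.subscribe('/queue/a') { |msg| puts "got #{msg}" }
table.deliver(id, 'hello')
```

Passing an explicit :id header is what allows two independent subscriptions to the same destination in this model, since each then gets its own slot in the table.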