Class | Jabber::Stream |
In: | lib/xmpp4r/stream.rb |
Parent: | Object |
The stream class manages a connection stream (a file descriptor through which XML messages are read and sent).
You may register callbacks for the three Jabber stanza types (message, presence and iq) and send data with the send and send_with_id methods.
To preserve the order of received stanzas, callback blocks are run in the parser thread. If a callback needs to perform blocking operations, start a thread of your own inside it.
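The advice about blocking work can be sketched as follows. This is only an illustration built on Ruby's standard Thread and Queue classes; `slow_lookup` and `on_message` are hypothetical stand-ins and not part of xmpp4r. The callback only enqueues the work and returns, while a worker thread does the slow part.

```ruby
require 'thread'

# Hypothetical stand-in for a slow, blocking operation (for example a database query).
def slow_lookup(body)
  sleep 0.01
  body.upcase
end

jobs    = Queue.new   # work handed over by the callback
results = Queue.new   # results produced by the worker

# The worker thread performs the blocking work, so the thread that
# delivers stanzas is never held up.
worker = Thread.new do
  while (body = jobs.pop) != :stop
    results << slow_lookup(body)
  end
end

# Hypothetical callback body: enqueue the work and return immediately.
on_message = lambda do |body|
  jobs << body
  true   # report the stanza as handled
end

%w[first second third].each { |b| on_message.call(b) }
jobs << :stop
worker.join

processed = []
processed << results.pop until results.empty?
```

A single worker keeps the results in arrival order; several workers would finish faster but give up that ordering.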
DISCONNECTED | = | 1 |
CONNECTED | = | 2 |
fd | [R] | file descriptor used |
processing | [R] | number of stanzas currently being processed |
status | [R] | connection status |
Initialize a new stream
# File lib/xmpp4r/stream.rb, line 43
def initialize
  @fd = nil
  @status = DISCONNECTED
  @xmlcbs = CallbackList.new
  @stanzacbs = CallbackList.new
  @messagecbs = CallbackList.new
  @iqcbs = CallbackList.new
  @presencecbs = CallbackList.new
  @send_lock = Mutex.new
  @last_send = Time.now
  @exception_block = nil
  @tbcbmutex = Mutex.new
  @threadblocks = []
  @wakeup_thread = nil
  @streamid = nil
  @streamns = 'jabber:client'
  @features_sem = Semaphore.new
  @parser_thread = nil
  @processing = 0
end
Adds a callback block to process received Iqs
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 552
def add_iq_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @iqcbs.add(priority, ref, block)
  end
end
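To make "the higher, the sooner" concrete, here is a minimal sketch of a priority-ordered callback list. `SketchCallbackList` is a hypothetical stand-in written for this example, not xmpp4r's CallbackList; it assumes that processing stops at the first block that returns true, which matches how Stream#receive treats the return value of `process`. The other add_*_callback methods pass the same three values to their own lists.

```ruby
# Minimal stand-in for a priority-ordered callback list (NOT xmpp4r's CallbackList).
class SketchCallbackList
  Entry = Struct.new(:priority, :ref, :block)

  def initialize
    @list = []
  end

  # Registers a block; higher priorities are called first,
  # equal priorities keep their insertion order.
  def add(priority = 0, ref = nil, &block)
    @list << Entry.new(priority, ref, block)
    @list = @list.each_with_index.sort_by { |e, i| [-e.priority, i] }.map(&:first)
    self
  end

  # Removes every callback registered under the given reference.
  def delete(ref)
    @list.delete_if { |e| e.ref == ref }
    self
  end

  # Calls the blocks in order and stops at the first one that returns true.
  def process(stanza)
    @list.dup.each { |e| return true if e.block.call(stanza) == true }
    false
  end
end

calls = []
cbs = SketchCallbackList.new
cbs.add(0, 'logger')    { |s| calls << [:logger, s]; false }
cbs.add(10, 'consumer') { |s| calls << [:consumer, s]; s == 'ping' }
cbs.add(5, 'fallback')  { |s| calls << [:fallback, s]; true }

handled_ping  = cbs.process('ping')    # consumed by 'consumer'
handled_other = cbs.process('other')   # falls through to 'fallback'
```

Because 'fallback' always returns true, the priority 0 'logger' block is never reached until 'fallback' is deleted by its reference.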
Adds a callback block to process received Messages
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 486
def add_message_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @messagecbs.add(priority, ref, block)
  end
end
Adds a callback block to process received Presences
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 530
def add_presence_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @presencecbs.add(priority, ref, block)
  end
end
Adds a callback block to process received Stanzas
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 508
def add_stanza_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @stanzacbs.add(priority, ref, block)
  end
end
Adds a callback block to process received XML messages. These callbacks are handled before any blocks given to Stream#send and before all other callbacks.
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 464
def add_xml_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize do
    @xmlcbs.add(priority, ref, block)
  end
end
# File lib/xmpp4r/stream.rb, line 574
def close!
  pr = 1
  n = 0
  # In some cases, we might lose count of some stanzas
  # (for example, if the handler raises an exception)
  # so we can't block forever.
  while pr > 0 and n <= 1000
    @tbcbmutex.synchronize { pr = @processing }
    if pr > 0
      n += 1
      Jabber::debuglog("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS")
      #puts("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS")
      Thread::pass
    end
  end

  # Order Matters here! If this method is called from within
  # @parser_thread then killing @parser_thread first would
  # mean the other parts of the method fail to execute.
  # That would be bad. So kill parser_thread last
  @fd.close if @fd and !@fd.closed?
  @status = DISCONNECTED
  @parser_thread.kill if @parser_thread
end
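The first half of close! is a bounded wait: it polls the count of stanzas still being processed and gives up after a fixed number of attempts instead of blocking forever. The pattern can be sketched on its own as below; `wait_until_idle` is a hypothetical helper written for this example and does not exist in xmpp4r.

```ruby
require 'thread'

# Hypothetical helper (not xmpp4r API): polls the counter returned by the
# block until it drops to zero, giving up after max_tries unsuccessful polls.
def wait_until_idle(mutex, max_tries = 1000)
  tries = 0
  loop do
    pending = mutex.synchronize { yield }
    return true if pending <= 0            # nothing left in flight
    return false if tries >= max_tries     # stop waiting, close anyway
    tries += 1
    Thread.pass                            # let other threads make progress
  end
end

mutex = Mutex.new

# A counter that drains after a few polls.
remaining = 5
drained = wait_until_idle(mutex, 50) { remaining -= 1 }

# A counter that never drains, as when a handler raised before decrementing.
stuck = wait_until_idle(mutex, 50) { 3 }
```

The upper bound is what keeps a lost decrement from hanging the caller; the cost is that a stream may be closed while a slow handler is still running.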
Delete a Stanza callback
ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 518
def delete_stanza_callback(ref)
  @tbcbmutex.synchronize do
    @stanzacbs.delete(ref)
  end
end
Delete an XML-messages callback
ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 474
def delete_xml_callback(ref)
  @tbcbmutex.synchronize do
    @xmlcbs.delete(ref)
  end
end
Get the list of iq callbacks.
# File lib/xmpp4r/stream.rb, line 289
def iq_callbacks
  @iqcbs
end
Returns whether this connection is connected to a Jabber service
return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 160
def is_connected?
  return @status == CONNECTED
end
Returns whether this connection is NOT connected to a Jabber service
return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 168
def is_disconnected?
  return @status == DISCONNECTED
end
Get the list of message callbacks.
# File lib/xmpp4r/stream.rb, line 295
def message_callbacks
  @messagecbs
end
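Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.
The block has to take three arguments: the exception (nil when the stream simply ended), the Jabber::Stream instance itself, and a symbol naming where the failure happened (the code on this page passes :start, :parser, :sending, :close and :disconnected).
# File lib/xmpp4r/stream.rb, line 117
def on_exception(&block)
  @exception_block = block
end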
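A handler for on_exception receives three values in every call visible in the source listings here: the exception (or nil), the stream, and a symbol. The sketch below exercises such a handler without a live connection. The handler body and the plain `Object` standing in for the stream are assumptions made for illustration; with a real stream the same block would be passed to on_exception.

```ruby
events = []

# Hypothetical handler body; only the three-argument shape is taken
# from the calls to @exception_block in the source listings.
handler = lambda do |exception, stream, where|
  case where
  when :sending
    events << "send failed: #{exception.message}"
  when :close, :disconnected
    events << 'stream ended'              # exception is nil in these cases
  else
    events << "error in #{where}: #{exception.class}"
  end
end

stream = Object.new   # stand-in for the Jabber::Stream instance

# Simulate the three kinds of notification.
handler.call(IOError.new('closed stream'), stream, :sending)
handler.call(nil, stream, :close)
handler.call(RuntimeError.new('bad xml'), stream, :parser)
```

Checking the symbol before touching the exception matters because the exception argument is nil when the stream ends normally.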
This method is called by the parser when a failure occurs
# File lib/xmpp4r/stream.rb, line 123
def parse_failure(e)
  Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

  # A new thread has to be created because close will cause the thread
  # to commit suicide(???)
  if @exception_block
    # New thread, because close will kill the current thread
    Thread.new do
      Thread.current.abort_on_exception = true
      close
      @exception_block.call(e, self, :parser)
    end
  else
    Jabber::warnlog "Stream#parse_failure was called by XML parser. Dumping " +
    "backtrace...\n" + e.exception + "\n#{e.backtrace.join("\n")}"
    close
    raise
  end
end
This method is called by the parser upon receiving </stream:stream>
# File lib/xmpp4r/stream.rb, line 145
def parser_end
  if @exception_block
    Thread.new do
      Thread.current.abort_on_exception = true
      close
      @exception_block.call(nil, self, :close)
    end
  else
    close
  end
end
Get the list of presence callbacks.
# File lib/xmpp4r/stream.rb, line 301
def presence_callbacks
  @presencecbs
end
Processes a received REXML::Element and executes registered thread blocks and filters against it.
element: | [REXML::Element] The received element |
# File lib/xmpp4r/stream.rb, line 177
def receive(element)
  @tbcbmutex.synchronize { @processing += 1 }
  Jabber::debuglog("RECEIVED:\n#{element.to_s}")

  if element.namespace('').to_s == '' # REXML namespaces are always strings
    element.add_namespace(@streamns)
  end

  case element.prefix
  when 'stream'
    case element.name
    when 'stream'
      stanza = element
      @streamid = element.attributes['id']
      @streamns = element.namespace('') if element.namespace('')

      # Hack: component streams are basically client streams.
      # Someday we may want to create special stanza classes
      # for components/s2s deriving from normal stanzas but
      # possessing these namespaces
      @streamns = 'jabber:client' if @streamns == 'jabber:component:accept'

      unless element.attributes['version'] # isn't XMPP compliant, so
        Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
        @features_sem.run # don't wait for <stream:features/>
      end
    when 'features'
      stanza = element
      element.each { |e|
        if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
          e.each_element('mechanism') { |mech|
            @stream_mechanisms.push(mech.text)
          }
        else
          @stream_features[e.name] = e.namespace
        end
      }
      Jabber::debuglog("FEATURES: received")
      @features_sem.run
    else
      stanza = element
    end
  else
    # Any stanza, classes are registered by XMPPElement::name_xmlns
    begin
      stanza = XMPPStanza::import(element)
    rescue NoNameXmlnsRegistered
      stanza = element
    end
  end

  if @xmlcbs.process(stanza)
    @tbcbmutex.synchronize { @processing -= 1 }
    return true
  end

  # Iterate through blocked threads (= waiting for an answer)
  #
  # We're dup'ping the @threadblocks here, so that we won't end up in an
  # endless loop if Stream#send is being nested. That means, the nested
  # threadblock won't receive the stanza currently processed, but the next
  # one.
  threadblocks = nil
  @tbcbmutex.synchronize do
    threadblocks = @threadblocks.dup
  end
  threadblocks.each { |threadblock|
    exception = nil
    r = false
    begin
      r = threadblock.call(stanza)
    rescue Exception => e
      exception = e
    end

    if r == true
      @tbcbmutex.synchronize do
        @threadblocks.delete(threadblock)
      end
      threadblock.wakeup
      @tbcbmutex.synchronize { @processing -= 1 }
      return true
    elsif exception
      @tbcbmutex.synchronize do
        @threadblocks.delete(threadblock)
      end
      threadblock.raise(exception)
    end
  }

  Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
  Jabber::debuglog("TRYING stanzacbs...")
  if @stanzacbs.process(stanza)
    @tbcbmutex.synchronize { @processing -= 1 }
    return true
  end
  r = false
  Jabber::debuglog("TRYING message/iq/presence/cbs...")
  case stanza
  when Message
    r = @messagecbs.process(stanza)
  when Iq
    r = @iqcbs.process(stanza)
  when Presence
    r = @presencecbs.process(stanza)
  end
  @tbcbmutex.synchronize { @processing -= 1 }
  return r
end
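The order in which receive offers a stanza to its consumers (XML callbacks, then blocks waiting in Stream#send, then stanza callbacks, then the message/iq/presence callbacks) can be sketched as a chain of stages in which the first stage returning true ends processing. The `dispatch` helper, the `Stanza` struct and the stage names are hypothetical and exist only in this example.

```ruby
Stanza = Struct.new(:kind, :id)

# Hypothetical dispatcher: tries each stage in insertion order and
# returns the name of the first stage that consumed the stanza.
def dispatch(stanza, stages)
  stages.each do |name, handler|
    return name if handler.call(stanza) == true
  end
  nil   # nobody consumed the stanza
end

# Ruby hashes keep insertion order, which mirrors the order of checks in receive.
stages = {
  xml_callbacks:    ->(s) { false },
  waiting_threads:  ->(s) { s.id == 'req-1' },
  stanza_callbacks: ->(s) { s.kind == :presence },
  typed_callbacks:  ->(s) { s.kind == :message }
}

reply    = dispatch(Stanza.new(:iq, 'req-1'), stages)
presence = dispatch(Stanza.new(:presence, nil), stages)
message  = dispatch(Stanza.new(:message, nil), stages)
unknown  = dispatch(Stanza.new(:iq, 'other'), stages)
```

One consequence of this ordering is that a reply awaited by a sending thread never reaches the type-specific callbacks.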
Sends XML data to the socket and (optionally) waits to process received data.
Do not invoke this in a callback but in a separate thread, because the parser thread (in whose context callbacks are executed) must not be suspended.
xml: | [String] The xml data to send |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 361
def send(xml, &block)
  Jabber::debuglog("SENDING:\n#{xml}")
  if block
    threadblock = ThreadBlock.new(block)
    @tbcbmutex.synchronize do
      @threadblocks.unshift(threadblock)
    end
  end
  begin
    # Temporarily remove stanza's namespace to
    # reduce bandwidth consumption
    if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client' and
      xml.prefix != 'stream' and xml.name != 'stream'
      xml.delete_namespace
      send_data(xml.to_s)
      xml.add_namespace(@streamns)
    else
      send_data(xml.to_s)
    end
  rescue Exception => e
    Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

    if @exception_block
      Thread.new do
        Thread.current.abort_on_exception = true
        close!
        @exception_block.call(e, self, :sending)
      end
    else
      Jabber::warnlog "Exception caught while sending! (#{e.class})\n#{e.backtrace.join("\n")}"
      close!
      raise
    end
  end
  # The parser thread might be running this (think of a callback running send())
  # If this is the case, we mustn't stop (or we would cause a deadlock)
  if block and Thread.current != @parser_thread
    threadblock.wait
  elsif block
    Jabber::warnlog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!")
  end
end
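The waiting behaviour of send with a block can be sketched with standard threads: the sender registers a matcher and sleeps, and the receiving thread wakes it once a stanza matches. `SketchWaiter` is a hypothetical stand-in for the ThreadBlock used in the listing, built on a Queue for simplicity; the stanza strings are made up.

```ruby
require 'thread'

# Hypothetical stand-in for a thread block: holds a matcher and lets
# one thread sleep until another thread offers a matching stanza.
class SketchWaiter
  def initialize(&matcher)
    @matcher = matcher
    @inbox = Queue.new
  end

  # Called by the receiving thread; true means the stanza was consumed.
  def offer(stanza)
    return false unless @matcher.call(stanza) == true
    @inbox << stanza
    true
  end

  # Called by the sending thread; blocks until a match was offered.
  def wait
    @inbox.pop
  end
end

waiters = []
lock = Mutex.new

waiter = SketchWaiter.new { |s| s.start_with?('<iq') }
lock.synchronize { waiters.unshift(waiter) }

# Plays the role of the parser thread delivering incoming stanzas.
receiver = Thread.new do
  ['<presence/>', "<iq type='result'/>", '<message/>'].each do |stanza|
    pending = lock.synchronize { waiters.dup }   # iterate over a copy
    pending.each do |w|
      if w.offer(stanza)
        lock.synchronize { waiters.delete(w) }
        break
      end
    end
  end
end

answer = waiter.wait   # the "sending" thread sleeps here
receiver.join
```

If the receiving thread itself called `wait`, nobody would be left to deliver the answer, which is the deadlock the warning in send guards against.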
# File lib/xmpp4r/stream.rb, line 343
def send_data(data)
  @send_lock.synchronize do
    @last_send = Time.now
    @fd << data
    @fd.flush
  end
end
Send an XMPP stanza with a Jabber::XMPPStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned. This is a direct result of unique request/response stanza identification via the id attribute.
The block may be omitted. Then, the result will be the response stanza.
Be aware that if a stanza with type='error' is received, the function does not yield but raises a ServerError with the corresponding error element.
Please see Stream#send for some implementation details.
Please read the note about nesting at Stream#send
xml: | [XMPPStanza] |
# File lib/xmpp4r/stream.rb, line 425
def send_with_id(xml, &block)
  if xml.id.nil?
    xml.id = Jabber::IdGenerator.instance.generate_id
  end

  res = nil
  error = nil
  send(xml) do |received|
    if received.kind_of? XMPPStanza and received.id == xml.id
      if received.type == :error
        error = (received.error ? received.error : ErrorResponse.new)
        true
      elsif block_given?
        res = yield(received)
        true
      else
        res = received
        true
      end
    else
      false
    end
  end

  unless error.nil?
    raise ServerError.new(error)
  end

  res
end
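The id-matching rules described for send_with_id (ignore stanzas with another id, raise on an error reply, otherwise return the block's value or the reply itself) can be sketched without a connection. `await_reply`, `Reply` and `SketchServerError` are hypothetical names used only in this example; the real method waits on the stream instead of scanning an array.

```ruby
class SketchServerError < StandardError; end

Reply = Struct.new(:id, :type, :payload)

# Hypothetical helper: scans incoming replies for the one whose id
# matches the request and applies the rules described for send_with_id.
def await_reply(request_id, incoming)
  incoming.each do |reply|
    next unless reply.id == request_id                          # someone else's answer
    raise SketchServerError, reply.payload if reply.type == :error
    return block_given? ? yield(reply) : reply                  # block value or raw reply
  end
  nil
end

incoming = [
  Reply.new('a1', :result, 'other'),
  Reply.new('a2', :result, 'roster')
]

plain  = await_reply('a2', incoming)
mapped = await_reply('a2', incoming) { |r| r.payload.upcase }

failed = begin
  await_reply('a3', incoming + [Reply.new('a3', :error, 'not-authorized')])
  false
rescue SketchServerError => e
  e.message
end
```

Matching on the id rather than on stanza content is what lets the block's own return value be passed back to the caller.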
Get the list of stanza callbacks.
# File lib/xmpp4r/stream.rb, line 307
def stanza_callbacks
  @stanzacbs
end
Start the XML parser on the fd
# File lib/xmpp4r/stream.rb, line 66
def start(fd)
  @stream_mechanisms = []
  @stream_features = {}

  @fd = fd
  @parser = StreamParser.new(@fd, self)
  @parser_thread = Thread.new do
    Thread.current.abort_on_exception = true
    begin
      @parser.parse
      Jabber::debuglog("DISCONNECTED\n")

      if @exception_block
        Thread.new { close!; @exception_block.call(nil, self, :disconnected) }
      else
        close!
      end
    rescue Exception => e
      Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      if @exception_block
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(e, self, :start)
        end
      else
        Jabber::warnlog "Exception caught in Parser thread! (#{e.class})\n#{e.backtrace.join("\n")}"
        close!
        raise
      end
    end
  end

  @status = CONNECTED
end
# File lib/xmpp4r/stream.rb, line 103
def stop
  @parser_thread.kill
  @parser = nil
end