Class Jabber::Bytestreams::IBB
In: lib/xmpp4r/bytestreams/helper/ibb/base.rb
Parent: Object

In-Band Bytestreams (JEP-0047, since renamed XEP-0047) implementation

Don't use directly; use IBBInitiator and IBBTarget instead.

In-Band Bytestreams should only be used for transferring very small amounts of binary data, because they are slow and increase server load drastically.

Note that the constructor takes several arguments. In-Band Bytestreams do not specify how a stream is initiated; that should be done via Stream Initiation.

Methods

activate   active?   close   deactivate   flush   new   read   send_data   write  

Constants

NS_IBB = 'http://jabber.org/protocol/ibb'

Public Class methods

Create a new bytestream

Once the stream is activated, a <message/> callback is registered to intercept data belonging to this stream. That data is buffered; retrieve it with read.

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 30
    def initialize(stream, session_id, my_jid, peer_jid)
      @stream = stream
      @session_id = session_id
      @my_jid = (my_jid.kind_of?(String) ? JID.new(my_jid) : my_jid)
      @peer_jid = (peer_jid.kind_of?(String) ? JID.new(peer_jid) : peer_jid)

      @active = false
      @seq_send = 0
      @seq_recv = 0
      @queue = []
      @queue_lock = Mutex.new
      @pending = Semaphore.new
      @sendbuf = ''
      @sendbuf_lock = Mutex.new

      @block_size = 4096  # Recommended by JEP0047
    end

Public Instance methods

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 48
    def active?
      @active
    end

Close the stream

Flushes the send buffer, then waits for an acknowledgement from the peer. May raise ErrorException.

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 128
    def close
      if active?
        flush
        deactivate

        iq = Iq.new(:set, @peer_jid)
        close = iq.add REXML::Element.new('close')
        close.add_namespace IBB::NS_IBB
        close.attributes['sid'] = @session_id

        @stream.send_with_id(iq) { |answer|
          answer.type == :result
        }
      end
    end

Empty the send-buffer by sending remaining data

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 72
    def flush
      @sendbuf_lock.synchronize {
        while @sendbuf.size > 0
          send_data(@sendbuf[0..@block_size-1])
          @sendbuf = @sendbuf[@block_size..-1].to_s
        end
      }
    end

Receive data

Blocks until the Message with the next sequence number is in the stanza queue. Returns nil if the stream is not active or has been closed by the peer.

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 86
    def read
      if active?
        res = nil

        while res.nil?
          @queue_lock.synchronize {
            @queue.each { |item|
              # Find next data
              if item.type == :data and item.seq == @seq_recv.to_s
                res = item
                break
              # No data? Find close
              elsif item.type == :close and res.nil?
                res = item
              end
            }

            @queue.delete_if { |item| item == res }
          }

          # No data? Wait for next to arrive...
          @pending.wait unless res
        end

        if res.type == :data
          @seq_recv += 1
          @seq_recv = 0 if @seq_recv > 65535
          res.data
        elsif res.type == :close
          deactivate
          nil # Closed
        end
      else
        nil
      end
    end
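
The queue lookup in read can be illustrated without an XMPP connection. The following is a minimal sketch, not the library's code: Item and next_item are hypothetical names, and where read would block on an empty queue this version simply returns nil. It shows that a data item with the expected sequence number is preferred over a queued close, even when packets arrive out of order.

```ruby
# Hypothetical model of the lookup in IBB#read. Item and next_item are
# illustrative names, not part of xmpp4r.
Item = Struct.new(:type, :seq, :data)

# Return the queued :data item whose seq matches expected_seq, otherwise
# the first queued :close item, otherwise nil. The chosen item is removed.
def next_item(queue, expected_seq)
  res = queue.find { |i| i.type == :data && i.seq == expected_seq.to_s }
  res ||= queue.find { |i| i.type == :close }
  queue.delete_if { |i| i.equal?(res) } if res
  res
end

# Packets queued out of order, with a close in between.
queue = [
  Item.new(:data, '1', 'world'),
  Item.new(:close),
  Item.new(:data, '0', 'hello ')
]

seq = 0
out = String.new
while (item = next_item(queue, seq)) && item.type == :data
  out << item.data
  seq = (seq + 1) % 65536   # same wrap-around as @seq_recv
end

puts out
```

The close item is only returned once no data item with the expected sequence number is left in the queue.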

Send data

Data is buffered to match block_size in each packet. If you need the data to be sent immediately, use flush afterwards.

buf:[String]

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 59
    def write(buf)
      @sendbuf_lock.synchronize {
        @sendbuf += buf

        while @sendbuf.size >= @block_size
          send_data(@sendbuf[0..@block_size-1])
          @sendbuf = @sendbuf[@block_size..-1].to_s
        end
      }
    end
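
The interplay of write and flush can be tried in isolation. The following is a minimal sketch under stated assumptions: ChunkBuffer and its sent array are hypothetical stand-ins for the send buffer and for send_data, locking is omitted, and a block size of 4 is used instead of 4096 to keep the example short.

```ruby
# Hypothetical stand-in for the buffering in IBB#write and IBB#flush.
# ChunkBuffer and `sent` are illustrative names, not part of xmpp4r.
class ChunkBuffer
  attr_reader :sent

  def initialize(block_size)
    @block_size = block_size
    @sendbuf = ''
    @sent = []   # stands in for the packets handed to send_data
  end

  # Append data and emit every complete block.
  def write(buf)
    @sendbuf += buf
    while @sendbuf.size >= @block_size
      @sent << @sendbuf[0, @block_size]
      @sendbuf = @sendbuf[@block_size..-1].to_s
    end
  end

  # Emit whatever is left, even if it is shorter than a block.
  def flush
    until @sendbuf.empty?
      @sent << @sendbuf[0, @block_size]
      @sendbuf = @sendbuf[@block_size..-1].to_s
    end
  end
end

buffer = ChunkBuffer.new(4)
buffer.write('abcdef')   # one full block goes out, 'ef' stays buffered
buffer.write('g')        # only 3 bytes buffered, nothing is sent
buffer.flush             # the remaining 'efg' goes out as a short block
p buffer.sent
```

Without the final flush, the trailing 'efg' would stay in the buffer until enough further data arrived to fill a block.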

Private Instance methods

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 182
    def activate
      unless active?
        @stream.add_message_callback(200, self) { |msg|
          data = msg.first_element('data')
          if msg.from == @peer_jid and msg.to == @my_jid and data and data.attributes['sid'] == @session_id
            if msg.type == nil
              @queue_lock.synchronize {
                @queue.push IBBQueueItem.new(:data, data.attributes['seq'], data.text.to_s)
                @pending.run
              }
            elsif msg.type == :error
              @queue_lock.synchronize {
                @queue << IBBQueueItem.new(:close)
                @pending.run
              }
            end
            true
          else
            false
          end
        }

        @stream.add_iq_callback(200, self) { |iq|
          close = iq.first_element('close')
          if iq.type == :set and close and close.attributes['sid'] == @session_id
            answer = iq.answer(false)
            answer.type = :result
            @stream.send(answer)

            @queue_lock.synchronize {
              @queue << IBBQueueItem.new(:close)
              @pending.run
            }
            true
          else
            false
          end
        }

        @active = true
      end
    end

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 225
    def deactivate
      if active?
        @stream.delete_message_callback(self)
        @stream.delete_iq_callback(self)

        @active = false
      end
    end

Send data directly

databuf:[String]

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 149
    def send_data(databuf)
      if active?
        msg = Message.new
        msg.from = @my_jid
        msg.to = @peer_jid

        data = msg.add REXML::Element.new('data')
        data.add_namespace NS_IBB
        data.attributes['sid'] = @session_id
        data.attributes['seq'] = @seq_send.to_s
        data.text = Base64::encode64 databuf

        # TODO: Implement AMP correctly
        amp = msg.add REXML::Element.new('amp')
        amp.add_namespace 'http://jabber.org/protocol/amp'
        deliver_at = amp.add REXML::Element.new('rule')
        deliver_at.attributes['condition'] = 'deliver-at'
        deliver_at.attributes['value'] = 'stored'
        deliver_at.attributes['action'] = 'error'
        match_resource = amp.add REXML::Element.new('rule')
        match_resource.attributes['condition'] = 'match-resource'
        match_resource.attributes['value'] = 'exact'
        match_resource.attributes['action'] = 'error'

        @stream.send(msg)

        @seq_send += 1
        @seq_send = 0 if @seq_send > 65535
      else
        raise 'Attempt to send data when not activated'
      end
    end
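
The sequencing and payload encoding in send_data can be sketched without building a stanza. This is an illustration under assumptions, not the library's code: DataPacket and PacketSequencer are hypothetical names, the XML and AMP rules are left out, and [databuf].pack('m') is used in place of Base64::encode64 (it produces the same Base64 text) so that the example needs no library.

```ruby
# Hypothetical helper mirroring the sequencing in send_data. DataPacket and
# PacketSequencer are illustrative names, not part of xmpp4r.
DataPacket = Struct.new(:sid, :seq, :text)

class PacketSequencer
  def initialize(sid, start = 0)
    @sid = sid
    @seq = start
  end

  # Build the next packet and advance the 16-bit sequence counter.
  def next_packet(databuf)
    packet = DataPacket.new(@sid, @seq.to_s, [databuf].pack('m'))
    @seq += 1
    @seq = 0 if @seq > 65535   # wrap around as send_data does
    packet
  end
end

# Start at the highest sequence number to show the wrap-around.
sequencer = PacketSequencer.new('s1', 65535)
a = sequencer.next_packet('hi')
b = sequencer.next_packet('yo')

puts a.seq                  # 65535
puts b.seq                  # 0
puts a.text.unpack1('m')    # hi
```

The receiving side applies the same wrap-around to its own counter, which is why read compares each incoming seq attribute against @seq_recv.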
