1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.spdy;
17
18 import org.jboss.netty.channel.MessageEvent;
19
20 import java.util.Comparator;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.TreeSet;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
29
30 final class SpdySession {
31
32 private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
33
34 private final AtomicInteger activeLocalStreams = new AtomicInteger();
35 private final AtomicInteger activeRemoteStreams = new AtomicInteger();
36 private final Map<Integer, StreamState> activeStreams =
37 new ConcurrentHashMap<Integer, StreamState>();
38
39 private final AtomicInteger sendWindowSize;
40 private final AtomicInteger receiveWindowSize;
41
42 public SpdySession(int sendWindowSize, int receiveWindowSize) {
43 this.sendWindowSize = new AtomicInteger(sendWindowSize);
44 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
45 }
46
47 int numActiveStreams(boolean remote) {
48 if (remote) {
49 return activeRemoteStreams.get();
50 } else {
51 return activeLocalStreams.get();
52 }
53 }
54
55 boolean noActiveStreams() {
56 return activeStreams.isEmpty();
57 }
58
59 boolean isActiveStream(int streamId) {
60 return activeStreams.containsKey(streamId);
61 }
62
63
64 Set<Integer> getActiveStreams() {
65 TreeSet<Integer> StreamIds = new TreeSet<Integer>(new PriorityComparator());
66 StreamIds.addAll(activeStreams.keySet());
67 return StreamIds;
68 }
69
70 void acceptStream(
71 int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
72 int sendWindowSize, int receiveWindowSize, boolean remote) {
73 if (!remoteSideClosed || !localSideClosed) {
74 StreamState state = activeStreams.put(
75 streamId,
76 new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
77 if (state == null) {
78 if (remote) {
79 activeRemoteStreams.incrementAndGet();
80 } else {
81 activeLocalStreams.incrementAndGet();
82 }
83 }
84 }
85 }
86
87 private StreamState removeActiveStream(int streamId, boolean remote) {
88 StreamState state = activeStreams.remove(streamId);
89 if (state != null) {
90 if (remote) {
91 activeRemoteStreams.decrementAndGet();
92 } else {
93 activeLocalStreams.decrementAndGet();
94 }
95 }
96 return state;
97 }
98
99 void removeStream(int streamId, boolean remote) {
100 StreamState state = removeActiveStream(streamId, remote);
101 if (state != null) {
102 MessageEvent e = state.removePendingWrite();
103 while (e != null) {
104 e.getFuture().setFailure(STREAM_CLOSED);
105 e = state.removePendingWrite();
106 }
107 }
108 }
109
110 boolean isRemoteSideClosed(int streamId) {
111 StreamState state = activeStreams.get(streamId);
112 return state == null || state.isRemoteSideClosed();
113 }
114
115 void closeRemoteSide(int streamId, boolean remote) {
116 StreamState state = activeStreams.get(streamId);
117 if (state != null) {
118 state.closeRemoteSide();
119 if (state.isLocalSideClosed()) {
120 removeActiveStream(streamId, remote);
121 }
122 }
123 }
124
125 boolean isLocalSideClosed(int streamId) {
126 StreamState state = activeStreams.get(streamId);
127 return state == null || state.isLocalSideClosed();
128 }
129
130 void closeLocalSide(int streamId, boolean remote) {
131 StreamState state = activeStreams.get(streamId);
132 if (state != null) {
133 state.closeLocalSide();
134 if (state.isRemoteSideClosed()) {
135 removeActiveStream(streamId, remote);
136 }
137 }
138 }
139
140
141
142
143
144
145 boolean hasReceivedReply(int streamId) {
146 StreamState state = activeStreams.get(streamId);
147 return state != null && state.hasReceivedReply();
148 }
149
150 void receivedReply(int streamId) {
151 StreamState state = activeStreams.get(streamId);
152 if (state != null) {
153 state.receivedReply();
154 }
155 }
156
157 int getSendWindowSize(int streamId) {
158 if (streamId == SPDY_SESSION_STREAM_ID) {
159 return sendWindowSize.get();
160 }
161
162 StreamState state = activeStreams.get(streamId);
163 return state != null ? state.getSendWindowSize() : -1;
164 }
165
166 int updateSendWindowSize(int streamId, int deltaWindowSize) {
167 if (streamId == SPDY_SESSION_STREAM_ID) {
168 return sendWindowSize.addAndGet(deltaWindowSize);
169 }
170
171 StreamState state = activeStreams.get(streamId);
172 return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
173 }
174
175 int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
176 if (streamId == SPDY_SESSION_STREAM_ID) {
177 return receiveWindowSize.addAndGet(deltaWindowSize);
178 }
179
180 StreamState state = activeStreams.get(streamId);
181 if (deltaWindowSize > 0) {
182 state.setReceiveWindowSizeLowerBound(0);
183 }
184 return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
185 }
186
187 int getReceiveWindowSizeLowerBound(int streamId) {
188 if (streamId == SPDY_SESSION_STREAM_ID) {
189 return 0;
190 }
191
192 StreamState state = activeStreams.get(streamId);
193 return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
194 }
195
196 void updateAllSendWindowSizes(int deltaWindowSize) {
197 for (StreamState state: activeStreams.values()) {
198 state.updateSendWindowSize(deltaWindowSize);
199 }
200 }
201
202 void updateAllReceiveWindowSizes(int deltaWindowSize) {
203 for (StreamState state: activeStreams.values()) {
204 state.updateReceiveWindowSize(deltaWindowSize);
205 if (deltaWindowSize < 0) {
206 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
207 }
208 }
209 }
210
211 boolean putPendingWrite(int streamId, MessageEvent evt) {
212 StreamState state = activeStreams.get(streamId);
213 return state != null && state.putPendingWrite(evt);
214 }
215
216 MessageEvent getPendingWrite(int streamId) {
217 if (streamId == SPDY_SESSION_STREAM_ID) {
218 for (Integer id : getActiveStreams()) {
219 StreamState state = activeStreams.get(id);
220 if (state.getSendWindowSize() > 0) {
221 MessageEvent e = state.getPendingWrite();
222 if (e != null) {
223 return e;
224 }
225 }
226 }
227 return null;
228 }
229
230 StreamState state = activeStreams.get(streamId);
231 return state != null ? state.getPendingWrite() : null;
232 }
233
234 MessageEvent removePendingWrite(int streamId) {
235 StreamState state = activeStreams.get(streamId);
236 return state != null ? state.removePendingWrite() : null;
237 }
238
239 private static final class StreamState {
240
241 private final byte priority;
242 private volatile boolean remoteSideClosed;
243 private volatile boolean localSideClosed;
244 private boolean receivedReply;
245 private final AtomicInteger sendWindowSize;
246 private final AtomicInteger receiveWindowSize;
247 private volatile int receiveWindowSizeLowerBound;
248 private final ConcurrentLinkedQueue<MessageEvent> pendingWriteQueue =
249 new ConcurrentLinkedQueue<MessageEvent>();
250
251 StreamState(
252 byte priority, boolean remoteSideClosed, boolean localSideClosed,
253 int sendWindowSize, int receiveWindowSize) {
254 this.priority = priority;
255 this.remoteSideClosed = remoteSideClosed;
256 this.localSideClosed = localSideClosed;
257 this.sendWindowSize = new AtomicInteger(sendWindowSize);
258 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
259 }
260
261 byte getPriority() {
262 return priority;
263 }
264
265 boolean isRemoteSideClosed() {
266 return remoteSideClosed;
267 }
268
269 void closeRemoteSide() {
270 remoteSideClosed = true;
271 }
272
273 boolean isLocalSideClosed() {
274 return localSideClosed;
275 }
276
277 void closeLocalSide() {
278 localSideClosed = true;
279 }
280
281 boolean hasReceivedReply() {
282 return receivedReply;
283 }
284
285 void receivedReply() {
286 receivedReply = true;
287 }
288
289 int getSendWindowSize() {
290 return sendWindowSize.get();
291 }
292
293 int updateSendWindowSize(int deltaWindowSize) {
294 return sendWindowSize.addAndGet(deltaWindowSize);
295 }
296
297 int updateReceiveWindowSize(int deltaWindowSize) {
298 return receiveWindowSize.addAndGet(deltaWindowSize);
299 }
300
301 int getReceiveWindowSizeLowerBound() {
302 return receiveWindowSizeLowerBound;
303 }
304
305 void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
306 this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
307 }
308
309 boolean putPendingWrite(MessageEvent evt) {
310 return pendingWriteQueue.offer(evt);
311 }
312
313 MessageEvent getPendingWrite() {
314 return pendingWriteQueue.peek();
315 }
316
317 MessageEvent removePendingWrite() {
318 return pendingWriteQueue.poll();
319 }
320 }
321
322 private final class PriorityComparator implements Comparator<Integer> {
323
324 PriorityComparator() {
325 }
326
327 public int compare(Integer id1, Integer id2) {
328 StreamState state1 = activeStreams.get(id1);
329 StreamState state2 = activeStreams.get(id2);
330 return state1.getPriority() - state2.getPriority();
331 }
332 }
333 }