View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.spdy;
17  
18  import org.jboss.netty.channel.MessageEvent;
19  
20  import java.util.Comparator;
21  import java.util.Map;
22  import java.util.Set;
23  import java.util.TreeSet;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
29  
30  final class SpdySession {
31  
32      private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
33  
34      private final AtomicInteger activeLocalStreams  = new AtomicInteger();
35      private final AtomicInteger activeRemoteStreams = new AtomicInteger();
36      private final Map<Integer, StreamState> activeStreams =
37          new ConcurrentHashMap<Integer, StreamState>();
38  
39      private final AtomicInteger sendWindowSize;
40      private final AtomicInteger receiveWindowSize;
41  
42      public SpdySession(int sendWindowSize, int receiveWindowSize) {
43          this.sendWindowSize = new AtomicInteger(sendWindowSize);
44          this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
45      }
46  
47      int numActiveStreams(boolean remote) {
48          if (remote) {
49              return activeRemoteStreams.get();
50          } else {
51              return activeLocalStreams.get();
52          }
53      }
54  
55      boolean noActiveStreams() {
56          return activeStreams.isEmpty();
57      }
58  
59      boolean isActiveStream(int streamId) {
60          return activeStreams.containsKey(streamId);
61      }
62  
63      // Stream-IDs should be iterated in priority order
64      Set<Integer> getActiveStreams() {
65          TreeSet<Integer> StreamIds = new TreeSet<Integer>(new PriorityComparator());
66          StreamIds.addAll(activeStreams.keySet());
67          return StreamIds;
68      }
69  
70      void acceptStream(
71              int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
72              int sendWindowSize, int receiveWindowSize, boolean remote) {
73          if (!remoteSideClosed || !localSideClosed) {
74              StreamState state = activeStreams.put(
75                      streamId,
76                      new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
77              if (state == null) {
78                  if (remote) {
79                      activeRemoteStreams.incrementAndGet();
80                  } else {
81                      activeLocalStreams.incrementAndGet();
82                  }
83              }
84          }
85      }
86  
87      private StreamState removeActiveStream(int streamId, boolean remote) {
88          StreamState state = activeStreams.remove(streamId);
89          if (state != null) {
90              if (remote) {
91                  activeRemoteStreams.decrementAndGet();
92              } else {
93                  activeLocalStreams.decrementAndGet();
94              }
95          }
96          return state;
97      }
98  
99      void removeStream(int streamId, boolean remote) {
100         StreamState state = removeActiveStream(streamId, remote);
101         if (state != null) {
102             MessageEvent e = state.removePendingWrite();
103             while (e != null) {
104                 e.getFuture().setFailure(STREAM_CLOSED);
105                 e = state.removePendingWrite();
106             }
107         }
108     }
109 
110     boolean isRemoteSideClosed(int streamId) {
111         StreamState state = activeStreams.get(streamId);
112         return state == null || state.isRemoteSideClosed();
113     }
114 
115     void closeRemoteSide(int streamId, boolean remote) {
116         StreamState state = activeStreams.get(streamId);
117         if (state != null) {
118             state.closeRemoteSide();
119             if (state.isLocalSideClosed()) {
120                 removeActiveStream(streamId, remote);
121             }
122         }
123     }
124 
125     boolean isLocalSideClosed(int streamId) {
126         StreamState state = activeStreams.get(streamId);
127         return state == null || state.isLocalSideClosed();
128     }
129 
130     void closeLocalSide(int streamId, boolean remote) {
131         StreamState state = activeStreams.get(streamId);
132         if (state != null) {
133             state.closeLocalSide();
134             if (state.isRemoteSideClosed()) {
135                 removeActiveStream(streamId, remote);
136             }
137         }
138     }
139 
140     /*
141      * hasReceivedReply and receivedReply are only called from messageReceived
142      * no need to synchronize access to the StreamState
143      */
144 
145     boolean hasReceivedReply(int streamId) {
146         StreamState state = activeStreams.get(streamId);
147         return state != null && state.hasReceivedReply();
148     }
149 
150     void receivedReply(int streamId) {
151         StreamState state = activeStreams.get(streamId);
152         if (state != null) {
153             state.receivedReply();
154         }
155     }
156 
157     int getSendWindowSize(int streamId) {
158         if (streamId == SPDY_SESSION_STREAM_ID) {
159             return sendWindowSize.get();
160         }
161 
162         StreamState state = activeStreams.get(streamId);
163         return state != null ? state.getSendWindowSize() : -1;
164     }
165 
166     int updateSendWindowSize(int streamId, int deltaWindowSize) {
167         if (streamId == SPDY_SESSION_STREAM_ID) {
168             return sendWindowSize.addAndGet(deltaWindowSize);
169         }
170 
171         StreamState state = activeStreams.get(streamId);
172         return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
173     }
174 
175     int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
176         if (streamId == SPDY_SESSION_STREAM_ID) {
177             return receiveWindowSize.addAndGet(deltaWindowSize);
178         }
179 
180         StreamState state = activeStreams.get(streamId);
181         if (deltaWindowSize > 0) {
182             state.setReceiveWindowSizeLowerBound(0);
183         }
184         return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
185     }
186 
187     int getReceiveWindowSizeLowerBound(int streamId) {
188         if (streamId == SPDY_SESSION_STREAM_ID) {
189             return 0;
190         }
191 
192         StreamState state = activeStreams.get(streamId);
193         return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
194     }
195 
196     void updateAllSendWindowSizes(int deltaWindowSize) {
197         for (StreamState state: activeStreams.values()) {
198             state.updateSendWindowSize(deltaWindowSize);
199         }
200     }
201 
202     void updateAllReceiveWindowSizes(int deltaWindowSize) {
203         for (StreamState state: activeStreams.values()) {
204             state.updateReceiveWindowSize(deltaWindowSize);
205             if (deltaWindowSize < 0) {
206                 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
207             }
208         }
209     }
210 
211     boolean putPendingWrite(int streamId, MessageEvent evt) {
212         StreamState state = activeStreams.get(streamId);
213         return state != null && state.putPendingWrite(evt);
214     }
215 
216     MessageEvent getPendingWrite(int streamId) {
217         if (streamId == SPDY_SESSION_STREAM_ID) {
218             for (Integer id : getActiveStreams()) {
219                 StreamState state = activeStreams.get(id);
220                 if (state.getSendWindowSize() > 0) {
221                     MessageEvent e = state.getPendingWrite();
222                     if (e != null) {
223                         return e;
224                     }
225                 }
226             }
227             return null;
228         }
229 
230         StreamState state = activeStreams.get(streamId);
231         return state != null ? state.getPendingWrite() : null;
232     }
233 
234     MessageEvent removePendingWrite(int streamId) {
235         StreamState state = activeStreams.get(streamId);
236         return state != null ? state.removePendingWrite() : null;
237     }
238 
239     private static final class StreamState {
240 
241         private final byte priority;
242         private volatile boolean remoteSideClosed;
243         private volatile boolean localSideClosed;
244         private boolean receivedReply;
245         private final AtomicInteger sendWindowSize;
246         private final AtomicInteger receiveWindowSize;
247         private volatile int receiveWindowSizeLowerBound;
248         private final ConcurrentLinkedQueue<MessageEvent> pendingWriteQueue =
249                 new ConcurrentLinkedQueue<MessageEvent>();
250 
251         StreamState(
252                 byte priority, boolean remoteSideClosed, boolean localSideClosed,
253                 int sendWindowSize, int receiveWindowSize) {
254             this.priority = priority;
255             this.remoteSideClosed = remoteSideClosed;
256             this.localSideClosed = localSideClosed;
257             this.sendWindowSize = new AtomicInteger(sendWindowSize);
258             this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
259         }
260 
261         byte getPriority() {
262             return priority;
263         }
264 
265         boolean isRemoteSideClosed() {
266             return remoteSideClosed;
267         }
268 
269         void closeRemoteSide() {
270             remoteSideClosed = true;
271         }
272 
273         boolean isLocalSideClosed() {
274             return localSideClosed;
275         }
276 
277         void closeLocalSide() {
278             localSideClosed = true;
279         }
280 
281         boolean hasReceivedReply() {
282             return receivedReply;
283         }
284 
285         void receivedReply() {
286             receivedReply = true;
287         }
288 
289         int getSendWindowSize() {
290             return sendWindowSize.get();
291         }
292 
293         int updateSendWindowSize(int deltaWindowSize) {
294             return sendWindowSize.addAndGet(deltaWindowSize);
295         }
296 
297         int updateReceiveWindowSize(int deltaWindowSize) {
298             return receiveWindowSize.addAndGet(deltaWindowSize);
299         }
300 
301         int getReceiveWindowSizeLowerBound() {
302             return receiveWindowSizeLowerBound;
303         }
304 
305         void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
306             this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
307         }
308 
309         boolean putPendingWrite(MessageEvent evt) {
310             return pendingWriteQueue.offer(evt);
311         }
312 
313         MessageEvent getPendingWrite() {
314             return pendingWriteQueue.peek();
315         }
316 
317         MessageEvent removePendingWrite() {
318             return pendingWriteQueue.poll();
319         }
320     }
321 
322     private final class PriorityComparator implements Comparator<Integer> {
323 
324         PriorityComparator() {
325         }
326 
327         public int compare(Integer id1, Integer id2) {
328             StreamState state1 = activeStreams.get(id1);
329             StreamState state2 = activeStreams.get(id2);
330             return state1.getPriority() - state2.getPriority();
331         }
332     }
333 }