View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBufferFactory;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelException;
22  import org.jboss.netty.channel.ChannelFuture;
23  import org.jboss.netty.channel.MessageEvent;
24  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
25  
26  import java.io.IOException;
27  import java.net.SocketAddress;
28  import java.nio.ByteBuffer;
29  import java.nio.channels.AsynchronousCloseException;
30  import java.nio.channels.ClosedChannelException;
31  import java.nio.channels.DatagramChannel;
32  import java.nio.channels.SelectionKey;
33  import java.nio.channels.Selector;
34  import java.util.Queue;
35  import java.util.concurrent.Executor;
36  
37  import static org.jboss.netty.channel.Channels.*;
38  
39  /**
40   * A class responsible for registering channels with {@link Selector}.
41   * It also implements the {@link Selector} loop.
42   */
43  public class NioDatagramWorker extends AbstractNioWorker {
44  
45      private final SocketReceiveBufferAllocator bufferAllocator = new SocketReceiveBufferAllocator();
46  
47      /**
48       * Sole constructor.
49       *
50       * @param executor the {@link Executor} used to execute {@link Runnable}s
51       *                 such as {@link ChannelRegistionTask}
52       */
53      NioDatagramWorker(final Executor executor) {
54          super(executor);
55      }
56  
57      @Override
58      protected boolean read(final SelectionKey key) {
59          final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
60          ReceiveBufferSizePredictor predictor =
61              channel.getConfig().getReceiveBufferSizePredictor();
62          final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
63          final DatagramChannel nioChannel = (DatagramChannel) key.channel();
64          final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
65  
66          final ByteBuffer byteBuffer = bufferAllocator.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
67  
68          boolean failure = true;
69          SocketAddress remoteAddress = null;
70          try {
71              // Receive from the channel in a non blocking mode. We have already been notified that
72              // the channel is ready to receive.
73              remoteAddress = nioChannel.receive(byteBuffer);
74              failure = false;
75          } catch (ClosedChannelException e) {
76              // Can happen, and does not need a user attention.
77          } catch (Throwable t) {
78              fireExceptionCaught(channel, t);
79          }
80  
81          if (remoteAddress != null) {
82              // Flip the buffer so that we can wrap it.
83              byteBuffer.flip();
84  
85              int readBytes = byteBuffer.remaining();
86              if (readBytes > 0) {
87                  // Update the predictor.
88                  predictor.previousReceiveBufferSize(readBytes);
89  
90                  final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
91                  buffer.setBytes(0, byteBuffer);
92                  buffer.writerIndex(readBytes);
93  
94                  // Update the predictor.
95                  predictor.previousReceiveBufferSize(readBytes);
96  
97                  // Notify the interested parties about the newly arrived message.
98                  fireMessageReceived(
99                          channel, buffer, remoteAddress);
100             }
101         }
102 
103         if (failure) {
104             key.cancel(); // Some JDK implementations run into an infinite loop without this.
105             close(channel, succeededFuture(channel));
106             return false;
107         }
108 
109         return true;
110     }
111 
112     @Override
113     protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
114         final Thread workerThread = thread;
115         if (workerThread == null || Thread.currentThread() != workerThread) {
116             if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
117                 // "add" the channels writeTask to the writeTaskQueue.
118                 registerTask(channel.writeTask);
119             }
120             return true;
121         }
122 
123         return false;
124     }
125 
126     static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
127         boolean connected = channel.isConnected();
128         boolean iothread = isIoThread(channel);
129         try {
130             channel.getDatagramChannel().disconnect();
131             future.setSuccess();
132             if (connected) {
133                 if (iothread) {
134                     fireChannelDisconnected(channel);
135                 } else {
136                     fireChannelDisconnectedLater(channel);
137                 }
138             }
139         } catch (Throwable t) {
140             future.setFailure(t);
141             if (iothread) {
142                 fireExceptionCaught(channel, t);
143             } else {
144                 fireExceptionCaughtLater(channel, t);
145             }
146         }
147     }
148 
149     @Override
150     protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
151         return new ChannelRegistionTask((NioDatagramChannel) channel, future);
152     }
153 
154     /**
155      * RegisterTask is a task responsible for registering a channel with a
156      * selector.
157      */
158     private final class ChannelRegistionTask implements Runnable {
159         private final NioDatagramChannel channel;
160 
161         private final ChannelFuture future;
162 
163         ChannelRegistionTask(final NioDatagramChannel channel,
164                 final ChannelFuture future) {
165             this.channel = channel;
166             this.future = future;
167         }
168 
169         /**
170          * This runnable's task. Does the actual registering by calling the
171          * underlying DatagramChannels peer DatagramSocket register method.
172          */
173         public void run() {
174             final SocketAddress localAddress = channel.getLocalAddress();
175             if (localAddress == null) {
176                 if (future != null) {
177                     future.setFailure(new ClosedChannelException());
178                 }
179                 close(channel, succeededFuture(channel));
180                 return;
181             }
182 
183             try {
184                 channel.getDatagramChannel().register(
185                         selector, channel.getRawInterestOps(), channel);
186 
187                 if (future != null) {
188                     future.setSuccess();
189                 }
190             } catch (final IOException e) {
191                 if (future != null) {
192                     future.setFailure(e);
193                 }
194                 close(channel, succeededFuture(channel));
195 
196                 if (!(e instanceof ClosedChannelException)) {
197                     throw new ChannelException(
198                             "Failed to register a socket to the selector.", e);
199                 }
200             }
201         }
202     }
203 
204     @Override
205     public void writeFromUserCode(final AbstractNioChannel<?> channel) {
206         /*
207          * Note that we are not checking if the channel is connected. Connected
208          * has a different meaning in UDP and means that the channels socket is
209          * configured to only send and receive from a given remote peer.
210          */
211         if (!channel.isBound()) {
212             cleanUpWriteBuffer(channel);
213             return;
214         }
215 
216         if (scheduleWriteIfNecessary(channel)) {
217             return;
218         }
219 
220         // From here, we are sure Thread.currentThread() == workerThread.
221 
222         if (channel.writeSuspended) {
223             return;
224         }
225 
226         if (channel.inWriteNowLoop) {
227             return;
228         }
229 
230         write0(channel);
231     }
232 
    /**
     * Writes the channel's queued messages to its underlying
     * {@link DatagramChannel} while holding {@code channel.writeLock}.
     *
     * <p>Messages are taken one at a time from {@code channel.writeBufferQueue}
     * (or resumed from {@code channel.currentWriteEvent}). Each one is sent
     * with {@code transferTo(ch)} when the event has no remote address and
     * with {@code transferTo(ch, raddr)} otherwise, retrying up to the
     * configured write spin count. The loop ends when the queue is empty
     * (the write interest is then cleared) or when a message could not be
     * written at all (the write interest is then set and the channel is
     * marked as write-suspended).</p>
     *
     * <p>After the lock is released, {@code fireWriteComplete} is called with
     * the total number of bytes written by this invocation.</p>
     *
     * @param channel the channel to flush; it is cast to {@link NioDatagramChannel}
     */
    @Override
    protected void write0(final AbstractNioChannel<?> channel) {

        boolean addOpWrite = false;
        boolean removeOpWrite = false;

        // Total bytes written during this call, reported at the end.
        long writtenBytes = 0;

        final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
        final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
        final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
        final int writeSpinCount = channel.getConfig().getWriteSpinCount();
        synchronized (channel.writeLock) {
            // inform the channel that write is in-progress
            channel.inWriteNowLoop = true;

            // Loop until the queue is drained or a write makes no progress.
            for (;;) {
                MessageEvent evt = channel.currentWriteEvent;
                SocketSendBufferPool.SendBuffer buf;
                if (evt == null) {
                    // No event in flight: take the next one from the queue.
                    if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                        // Queue is empty - nothing left to write.
                        removeOpWrite = true;
                        channel.writeSuspended = false;
                        break;
                    }

                    channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                } else {
                    // Resume the event left over from an earlier pass.
                    buf = channel.currentWriteBuffer;
                }

                try {
                    long localWrittenBytes = 0;
                    SocketAddress raddr = evt.getRemoteAddress();
                    if (raddr == null) {
                        // No explicit destination on the event: use the
                        // address-less transfer. Stop spinning as soon as
                        // something was written or the buffer reports finished.
                        for (int i = writeSpinCount; i > 0; i --) {
                            localWrittenBytes = buf.transferTo(ch);
                            if (localWrittenBytes != 0) {
                                writtenBytes += localWrittenBytes;
                                break;
                            }
                            if (buf.finished()) {
                                break;
                            }
                        }
                    } else {
                        // Same spin loop, sending to the event's remote address.
                        for (int i = writeSpinCount; i > 0; i --) {
                            localWrittenBytes = buf.transferTo(ch, raddr);
                            if (localWrittenBytes != 0) {
                                writtenBytes += localWrittenBytes;
                                break;
                            }
                            if (buf.finished()) {
                                break;
                            }
                        }
                    }

                    if (localWrittenBytes > 0 || buf.finished()) {
                        // Successful write - proceed to the next message.
                        buf.release();
                        ChannelFuture future = evt.getFuture();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {
                        // Not written at all - perhaps the kernel buffer is full.
                        addOpWrite = true;
                        channel.writeSuspended = true;
                        break;
                    }
                } catch (final AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                    // The current event is left in place, so the next pass retries it.
                } catch (final Throwable t) {
                    // Any other failure: drop this event, fail its future,
                    // report the exception, and continue with the next event.
                    buf.release();
                    ChannelFuture future = evt.getFuture();
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    // Mark the event object for garbage collection.
                    //noinspection UnusedAssignment
                    buf = null;
                    //noinspection UnusedAssignment
                    evt = null;
                    future.setFailure(t);
                    fireExceptionCaught(channel, t);
                }
            }
            channel.inWriteNowLoop = false;

            // Initially, the following block was executed after releasing
            // the writeLock, but there was a race condition, and it has to be
            // executed before releasing the writeLock:
            //
            // https://issues.jboss.org/browse/NETTY-410
            //
            if (addOpWrite) {
                setOpWrite(channel);
            } else if (removeOpWrite) {
                clearOpWrite(channel);
            }
        }

        // Reported outside the lock, with the total written by this call.
        fireWriteComplete(channel, writtenBytes);
    }
340 
341     @Override
342     public void run() {
343         super.run();
344         bufferAllocator.releaseExternalResources();
345     }
346 }