1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBufferFactory;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelException;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.MessageEvent;
24 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
25
26 import java.io.IOException;
27 import java.net.SocketAddress;
28 import java.nio.ByteBuffer;
29 import java.nio.channels.AsynchronousCloseException;
30 import java.nio.channels.ClosedChannelException;
31 import java.nio.channels.DatagramChannel;
32 import java.nio.channels.SelectionKey;
33 import java.nio.channels.Selector;
34 import java.util.Queue;
35 import java.util.concurrent.Executor;
36
37 import static org.jboss.netty.channel.Channels.*;
38
39
40
41
42
43 public class NioDatagramWorker extends AbstractNioWorker {
44
45 private final SocketReceiveBufferAllocator bufferAllocator = new SocketReceiveBufferAllocator();
46
47
48
49
50
51
52
53 NioDatagramWorker(final Executor executor) {
54 super(executor);
55 }
56
57 @Override
58 protected boolean read(final SelectionKey key) {
59 final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
60 ReceiveBufferSizePredictor predictor =
61 channel.getConfig().getReceiveBufferSizePredictor();
62 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
63 final DatagramChannel nioChannel = (DatagramChannel) key.channel();
64 final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
65
66 final ByteBuffer byteBuffer = bufferAllocator.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
67
68 boolean failure = true;
69 SocketAddress remoteAddress = null;
70 try {
71
72
73 remoteAddress = nioChannel.receive(byteBuffer);
74 failure = false;
75 } catch (ClosedChannelException e) {
76
77 } catch (Throwable t) {
78 fireExceptionCaught(channel, t);
79 }
80
81 if (remoteAddress != null) {
82
83 byteBuffer.flip();
84
85 int readBytes = byteBuffer.remaining();
86 if (readBytes > 0) {
87
88 predictor.previousReceiveBufferSize(readBytes);
89
90 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
91 buffer.setBytes(0, byteBuffer);
92 buffer.writerIndex(readBytes);
93
94
95 predictor.previousReceiveBufferSize(readBytes);
96
97
98 fireMessageReceived(
99 channel, buffer, remoteAddress);
100 }
101 }
102
103 if (failure) {
104 key.cancel();
105 close(channel, succeededFuture(channel));
106 return false;
107 }
108
109 return true;
110 }
111
112 @Override
113 protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
114 final Thread workerThread = thread;
115 if (workerThread == null || Thread.currentThread() != workerThread) {
116 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
117
118 registerTask(channel.writeTask);
119 }
120 return true;
121 }
122
123 return false;
124 }
125
126 static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
127 boolean connected = channel.isConnected();
128 boolean iothread = isIoThread(channel);
129 try {
130 channel.getDatagramChannel().disconnect();
131 future.setSuccess();
132 if (connected) {
133 if (iothread) {
134 fireChannelDisconnected(channel);
135 } else {
136 fireChannelDisconnectedLater(channel);
137 }
138 }
139 } catch (Throwable t) {
140 future.setFailure(t);
141 if (iothread) {
142 fireExceptionCaught(channel, t);
143 } else {
144 fireExceptionCaughtLater(channel, t);
145 }
146 }
147 }
148
149 @Override
150 protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
151 return new ChannelRegistionTask((NioDatagramChannel) channel, future);
152 }
153
154
155
156
157
158 private final class ChannelRegistionTask implements Runnable {
159 private final NioDatagramChannel channel;
160
161 private final ChannelFuture future;
162
163 ChannelRegistionTask(final NioDatagramChannel channel,
164 final ChannelFuture future) {
165 this.channel = channel;
166 this.future = future;
167 }
168
169
170
171
172
173 public void run() {
174 final SocketAddress localAddress = channel.getLocalAddress();
175 if (localAddress == null) {
176 if (future != null) {
177 future.setFailure(new ClosedChannelException());
178 }
179 close(channel, succeededFuture(channel));
180 return;
181 }
182
183 try {
184 channel.getDatagramChannel().register(
185 selector, channel.getRawInterestOps(), channel);
186
187 if (future != null) {
188 future.setSuccess();
189 }
190 } catch (final IOException e) {
191 if (future != null) {
192 future.setFailure(e);
193 }
194 close(channel, succeededFuture(channel));
195
196 if (!(e instanceof ClosedChannelException)) {
197 throw new ChannelException(
198 "Failed to register a socket to the selector.", e);
199 }
200 }
201 }
202 }
203
204 @Override
205 public void writeFromUserCode(final AbstractNioChannel<?> channel) {
206
207
208
209
210
211 if (!channel.isBound()) {
212 cleanUpWriteBuffer(channel);
213 return;
214 }
215
216 if (scheduleWriteIfNecessary(channel)) {
217 return;
218 }
219
220
221
222 if (channel.writeSuspended) {
223 return;
224 }
225
226 if (channel.inWriteNowLoop) {
227 return;
228 }
229
230 write0(channel);
231 }
232
233 @Override
234 protected void write0(final AbstractNioChannel<?> channel) {
235
236 boolean addOpWrite = false;
237 boolean removeOpWrite = false;
238
239 long writtenBytes = 0;
240
241 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
242 final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
243 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
244 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
245 synchronized (channel.writeLock) {
246
247 channel.inWriteNowLoop = true;
248
249
250 for (;;) {
251 MessageEvent evt = channel.currentWriteEvent;
252 SocketSendBufferPool.SendBuffer buf;
253 if (evt == null) {
254 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
255 removeOpWrite = true;
256 channel.writeSuspended = false;
257 break;
258 }
259
260 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
261 } else {
262 buf = channel.currentWriteBuffer;
263 }
264
265 try {
266 long localWrittenBytes = 0;
267 SocketAddress raddr = evt.getRemoteAddress();
268 if (raddr == null) {
269 for (int i = writeSpinCount; i > 0; i --) {
270 localWrittenBytes = buf.transferTo(ch);
271 if (localWrittenBytes != 0) {
272 writtenBytes += localWrittenBytes;
273 break;
274 }
275 if (buf.finished()) {
276 break;
277 }
278 }
279 } else {
280 for (int i = writeSpinCount; i > 0; i --) {
281 localWrittenBytes = buf.transferTo(ch, raddr);
282 if (localWrittenBytes != 0) {
283 writtenBytes += localWrittenBytes;
284 break;
285 }
286 if (buf.finished()) {
287 break;
288 }
289 }
290 }
291
292 if (localWrittenBytes > 0 || buf.finished()) {
293
294 buf.release();
295 ChannelFuture future = evt.getFuture();
296 channel.currentWriteEvent = null;
297 channel.currentWriteBuffer = null;
298 evt = null;
299 buf = null;
300 future.setSuccess();
301 } else {
302
303 addOpWrite = true;
304 channel.writeSuspended = true;
305 break;
306 }
307 } catch (final AsynchronousCloseException e) {
308
309 } catch (final Throwable t) {
310 buf.release();
311 ChannelFuture future = evt.getFuture();
312 channel.currentWriteEvent = null;
313 channel.currentWriteBuffer = null;
314
315
316 buf = null;
317
318 evt = null;
319 future.setFailure(t);
320 fireExceptionCaught(channel, t);
321 }
322 }
323 channel.inWriteNowLoop = false;
324
325
326
327
328
329
330
331 if (addOpWrite) {
332 setOpWrite(channel);
333 } else if (removeOpWrite) {
334 clearOpWrite(channel);
335 }
336 }
337
338 fireWriteComplete(channel, writtenBytes);
339 }
340
341 @Override
342 public void run() {
343 super.run();
344 bufferAllocator.releaseExternalResources();
345 }
346 }