1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.DataOutputStream;
26 import java.io.IOException;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.ServerSocket;
31 import java.net.Socket;
32 import java.net.SocketException;
33 import java.net.UnknownHostException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.Channels;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.GatheringByteChannel;
39 import java.nio.channels.ReadableByteChannel;
40 import java.nio.channels.SelectionKey;
41 import java.nio.channels.Selector;
42 import java.nio.channels.ServerSocketChannel;
43 import java.nio.channels.SocketChannel;
44 import java.nio.channels.WritableByteChannel;
45 import java.security.PrivilegedExceptionAction;
46 import java.util.ArrayList;
47 import java.util.Collections;
48 import java.util.HashMap;
49 import java.util.Iterator;
50 import java.util.LinkedList;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.Random;
54 import java.util.concurrent.ExecutorService;
55 import java.util.concurrent.Executors;
56 import java.util.concurrent.atomic.AtomicInteger;
57
58 import javax.security.sasl.Sasl;
59 import javax.security.sasl.SaslException;
60 import javax.security.sasl.SaslServer;
61
62 import org.apache.commons.logging.Log;
63 import org.apache.commons.logging.LogFactory;
64 import org.apache.hadoop.classification.InterfaceAudience;
65 import org.apache.hadoop.classification.InterfaceStability;
66 import org.apache.hadoop.conf.Configuration;
67 import org.apache.hadoop.hbase.CellScanner;
68 import org.apache.hadoop.hbase.DoNotRetryIOException;
69 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
70 import org.apache.hadoop.hbase.HConstants;
71 import org.apache.hadoop.hbase.HRegionInfo;
72 import org.apache.hadoop.hbase.Server;
73 import org.apache.hadoop.hbase.TableName;
74 import org.apache.hadoop.hbase.client.Operation;
75 import org.apache.hadoop.hbase.codec.Codec;
76 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
77 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
78 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
79 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
80 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
81 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
82 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
83 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
84 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
85 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
86 import org.apache.hadoop.hbase.regionserver.HRegionServer;
87 import org.apache.hadoop.hbase.security.AuthMethod;
88 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
89 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
90 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
91 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
92 import org.apache.hadoop.hbase.security.SaslStatus;
93 import org.apache.hadoop.hbase.security.SaslUtil;
94 import org.apache.hadoop.hbase.security.UserProvider;
95 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
96 import org.apache.hadoop.hbase.util.Bytes;
97 import org.apache.hadoop.hbase.util.Pair;
98 import org.apache.hadoop.io.BytesWritable;
99 import org.apache.hadoop.io.IntWritable;
100 import org.apache.hadoop.io.Writable;
101 import org.apache.hadoop.io.WritableUtils;
102 import org.apache.hadoop.io.compress.CompressionCodec;
103 import org.apache.hadoop.security.AccessControlException;
104 import org.apache.hadoop.security.UserGroupInformation;
105 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
106 import org.apache.hadoop.security.authorize.AuthorizationException;
107 import org.apache.hadoop.security.authorize.PolicyProvider;
108 import org.apache.hadoop.security.authorize.ProxyUsers;
109 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
110 import org.apache.hadoop.security.token.SecretManager;
111 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
112 import org.apache.hadoop.security.token.TokenIdentifier;
113 import org.apache.hadoop.util.StringUtils;
114 import org.cliffc.high_scale_lib.Counter;
115 import org.cloudera.htrace.TraceInfo;
116 import org.codehaus.jackson.map.ObjectMapper;
117
118 import com.google.common.util.concurrent.ThreadFactoryBuilder;
119 import com.google.protobuf.BlockingService;
120 import com.google.protobuf.CodedInputStream;
121 import com.google.protobuf.Descriptors.MethodDescriptor;
122 import com.google.protobuf.Message;
123 import com.google.protobuf.Message.Builder;
124 import com.google.protobuf.ServiceException;
125 import com.google.protobuf.TextFormat;
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
149 @InterfaceStability.Evolving
150 public class RpcServer implements RpcServerInterface {
151
152
// Server-wide logger. The logger name is the literal string below, not this class's name.
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcServer");

// Security switches; both are assigned outside this section of the file.
private final boolean authorize;
private boolean isSecurityEnabled;

public static final byte CURRENT_VERSION = 0;

// Default call-queue length per handler.
// NOTE(review): the name suggests it is scaled by the handler count -- confirm at the use site.
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;

// Default upper bound for the call queue: 1024 * 1024 * 1024 (presumably bytes -- confirm).
private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;

static final int BUFFER_INITIAL_SIZE = 1024;

// Configuration key and default for the delayed-call warning threshold.
private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";

private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;

// Call.startDelay() logs a warning once the number of delayed calls exceeds this value.
private final int warnDelayedCalls;

// Number of calls currently delayed; incremented by Call.startDelay() and decremented by
// Call.endDelay(Object).
private AtomicInteger delayedCalls;
// Used by Call.setResponse() to build the cell block of a response.
private final IPCUtil ipcUtil;

// Message prefixes and logger for security audit logging (the use sites are outside this section).
private static final String AUTH_FAILED_FOR = "Auth failed for ";
private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
  Server.class.getName());
protected SecretManager<TokenIdentifier> secretManager;
protected ServiceAuthorizationManager authManager;

// Per-thread slot holding a Call.
// NOTE(review): presumably the call the current handler thread is servicing -- confirm.
protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();

// Per-thread slot holding a MonitoredRPCHandler (set and read outside this section).
static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
  = new ThreadLocal<MonitoredRPCHandler>();

// Address the Listener binds its accept channel to.
protected final InetSocketAddress isa;
// Local port actually bound; assigned in the Listener constructor.
protected int port;
// Number of Reader tasks the Listener creates.
private int readThreads;
// A connection with no outstanding RPCs times out once it has been out of contact for more
// than this long (see Connection.timedOut; compared against System.currentTimeMillis values).
protected int maxIdleTime;

// Listener.cleanupConnections only sweeps (unless forced) when the connection count exceeds
// this value.
protected int thresholdIdleConnections;

// Maximum number of connections closed by one non-forced cleanup sweep.
int maxConnectionsToNuke;

protected MetricsHBaseServer metrics;

protected final Configuration conf;

private int maxQueueSize;
// Socket send-buffer size applied to each new Connection; 0 leaves the socket default.
protected int socketSendBufferSize;
// Applied to every accepted socket in Listener.doAccept.
protected final boolean tcpNoDelay;
protected final boolean tcpKeepAlive;
// Used by the Responder both as its select timeout and as the age after which a queued
// response causes its connection to be closed.
protected final long purgeTimeout;

// Loop condition of the Listener, Reader and Responder threads; they exit once it is false.
volatile boolean running = true;

// NOTE(review): not read in this section; presumably set once the server has been started.
volatile boolean started = false;

protected final Counter callQueueSize = new Counter();

// All open connections. The list is synchronized, but callers also lock it explicitly when
// they combine an access with an update of numConnections.
protected final List<Connection> connectionList =
  Collections.synchronizedList(new LinkedList<Connection>());

private Listener listener = null;
protected Responder responder = null;
// Count of entries in connectionList; incremented in Listener.doAccept.
protected int numConnections = 0;

// Optional handler consulted on OutOfMemoryError by the Listener and Responder loops.
protected HBaseRPCErrorHandler errorHandler = null;

// Configuration keys for slow/large response warnings.
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";

// Defaults for the two keys above: 10000 (presumably ms -- confirm) and 100 * 1024 * 1024
// (presumably bytes -- confirm).
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000;
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;

private static final ObjectMapper MAPPER = new ObjectMapper();

private final int warnResponseTime;
private final int warnResponseSize;
private final Object serverInstance;
private final List<BlockingServiceAndInterface> services;

private final RpcScheduler scheduler;

private UserProvider userProvider;
266
267
268
269
270
271 class Call implements RpcCallContext {
272 protected int id;
273 protected BlockingService service;
274 protected MethodDescriptor md;
275 protected RequestHeader header;
276 protected Message param;
277
278 protected CellScanner cellScanner;
279 protected Connection connection;
280 protected long timestamp;
281
282
283
284
285 protected BufferChain response;
286 protected boolean delayResponse;
287 protected Responder responder;
288 protected boolean delayReturnValue;
289
290 protected long size;
291 protected boolean isError;
292 protected TraceInfo tinfo;
293
294 Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
295 Message param, CellScanner cellScanner, Connection connection, Responder responder,
296 long size, TraceInfo tinfo) {
297 this.id = id;
298 this.service = service;
299 this.md = md;
300 this.header = header;
301 this.param = param;
302 this.cellScanner = cellScanner;
303 this.connection = connection;
304 this.timestamp = System.currentTimeMillis();
305 this.response = null;
306 this.delayResponse = false;
307 this.responder = responder;
308 this.isError = false;
309 this.size = size;
310 this.tinfo = tinfo;
311 }
312
313 @Override
314 public String toString() {
315 return toShortString() + " param: " +
316 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
317 " connection: " + connection.toString();
318 }
319
320 protected RequestHeader getHeader() {
321 return this.header;
322 }
323
324
325
326
327
328 String toShortString() {
329 String serviceName = this.connection.service != null?
330 this.connection.service.getDescriptorForType().getName() : "null";
331 StringBuilder sb = new StringBuilder();
332 sb.append("callId: ");
333 sb.append(this.id);
334 sb.append(" service: ");
335 sb.append(serviceName);
336 sb.append(" methodName: ");
337 sb.append((this.md != null) ? this.md.getName() : "");
338 sb.append(" size: ");
339 sb.append(StringUtils.humanReadableInt(this.size));
340 sb.append(" connection: ");
341 sb.append(connection.toString());
342 return sb.toString();
343 }
344
345 String toTraceString() {
346 String serviceName = this.connection.service != null ?
347 this.connection.service.getDescriptorForType().getName() : "";
348 String methodName = (this.md != null) ? this.md.getName() : "";
349 String result = serviceName + "." + methodName;
350 return result;
351 }
352
353 protected synchronized void setSaslTokenResponse(ByteBuffer response) {
354 this.response = new BufferChain(response);
355 }
356
357 protected synchronized void setResponse(Object m, final CellScanner cells,
358 Throwable t, String errorMsg) {
359 if (this.isError) return;
360 if (t != null) this.isError = true;
361 BufferChain bc = null;
362 try {
363 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
364
365 Message result = (Message)m;
366
367 headerBuilder.setCallId(this.id);
368 if (t != null) {
369 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
370 exceptionBuilder.setExceptionClassName(t.getClass().getName());
371 exceptionBuilder.setStackTrace(errorMsg);
372 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
373 if (t instanceof RegionMovedException) {
374
375
376
377 RegionMovedException rme = (RegionMovedException)t;
378 exceptionBuilder.setHostname(rme.getHostname());
379 exceptionBuilder.setPort(rme.getPort());
380 }
381
382 headerBuilder.setException(exceptionBuilder.build());
383 }
384 ByteBuffer cellBlock =
385 ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
386 if (cellBlock != null) {
387 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
388
389 cellBlockBuilder.setLength(cellBlock.limit());
390 headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
391 }
392 Message header = headerBuilder.build();
393
394
395
396 ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
397 ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
398 int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
399 (cellBlock == null? 0: cellBlock.limit());
400 ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
401 bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
402 if (connection.useWrap) {
403 bc = wrapWithSasl(bc);
404 }
405 } catch (IOException e) {
406 LOG.warn("Exception while creating response " + e);
407 }
408 this.response = bc;
409 }
410
411 private BufferChain wrapWithSasl(BufferChain bc)
412 throws IOException {
413 if (bc == null) return bc;
414 if (!this.connection.useSasl) return bc;
415
416
417 byte [] responseBytes = bc.getBytes();
418 byte [] token;
419
420
421 synchronized (connection.saslServer) {
422 token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
423 }
424 if (LOG.isDebugEnabled())
425 LOG.debug("Adding saslServer wrapped token of size " + token.length
426 + " as call response.");
427
428 ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
429 ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
430 return new BufferChain(bbTokenLength, bbTokenBytes);
431 }
432
433 @Override
434 public synchronized void endDelay(Object result) throws IOException {
435 assert this.delayResponse;
436 assert this.delayReturnValue || result == null;
437 this.delayResponse = false;
438 delayedCalls.decrementAndGet();
439 if (this.delayReturnValue) {
440 this.setResponse(result, null, null, null);
441 }
442 this.responder.doRespond(this);
443 }
444
445 @Override
446 public synchronized void endDelay() throws IOException {
447 this.endDelay(null);
448 }
449
450 @Override
451 public synchronized void startDelay(boolean delayReturnValue) {
452 assert !this.delayResponse;
453 this.delayResponse = true;
454 this.delayReturnValue = delayReturnValue;
455 int numDelayed = delayedCalls.incrementAndGet();
456 if (numDelayed > warnDelayedCalls) {
457 LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
458 }
459 }
460
461 @Override
462 public synchronized void endDelayThrowing(Throwable t) throws IOException {
463 this.setResponse(null, null, t, StringUtils.stringifyException(t));
464 this.delayResponse = false;
465 this.sendResponseIfReady();
466 }
467
468 @Override
469 public synchronized boolean isDelayed() {
470 return this.delayResponse;
471 }
472
473 @Override
474 public synchronized boolean isReturnValueDelayed() {
475 return this.delayReturnValue;
476 }
477
478 @Override
479 public boolean isClientCellBlockSupport() {
480 return this.connection != null && this.connection.codec != null;
481 }
482
483 @Override
484 public long disconnectSince() {
485 if (!connection.channel.isOpen()) {
486 return System.currentTimeMillis() - timestamp;
487 } else {
488 return -1L;
489 }
490 }
491
492 public long getSize() {
493 return this.size;
494 }
495
496
497
498
499
500
501 public synchronized void sendResponseIfReady() throws IOException {
502 if (!this.delayResponse) {
503 this.responder.doRespond(this);
504 }
505 }
506 }
507
508
509 private class Listener extends Thread {
510
511 private ServerSocketChannel acceptChannel = null;
512 private Selector selector = null;
513 private Reader[] readers = null;
514 private int currentReader = 0;
515 private Random rand = new Random();
516 private long lastCleanupRunTime = 0;
517
518 private long cleanupInterval = 10000;
519
520 private int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size",
521 conf.getInt("ipc.server.listen.queue.size", 128));
522
523 private ExecutorService readPool;
524
525 public Listener(final String name) throws IOException {
526 super(name);
527
528 acceptChannel = ServerSocketChannel.open();
529 acceptChannel.configureBlocking(false);
530
531
532 bind(acceptChannel.socket(), isa, backlogLength);
533 port = acceptChannel.socket().getLocalPort();
534
535 selector= Selector.open();
536
537 readers = new Reader[readThreads];
538 readPool = Executors.newFixedThreadPool(readThreads,
539 new ThreadFactoryBuilder().setNameFormat(
540 "RpcServer.reader=%d,port=" + port).setDaemon(true).build());
541 for (int i = 0; i < readThreads; ++i) {
542 Reader reader = new Reader();
543 readers[i] = reader;
544 readPool.execute(reader);
545 }
546 LOG.info(getName() + ": started " + readThreads + " reader(s).");
547
548
549 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
550 this.setName("RpcServer.listener,port=" + port);
551 this.setDaemon(true);
552 }
553
554
555 private class Reader implements Runnable {
556 private volatile boolean adding = false;
557 private final Selector readSelector;
558
559 Reader() throws IOException {
560 this.readSelector = Selector.open();
561 }
562 public void run() {
563 try {
564 doRunLoop();
565 } finally {
566 try {
567 readSelector.close();
568 } catch (IOException ioe) {
569 LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
570 }
571 }
572 }
573
574 private synchronized void doRunLoop() {
575 while (running) {
576 SelectionKey key = null;
577 try {
578 readSelector.select();
579 while (adding) {
580 this.wait(1000);
581 }
582
583 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
584 while (iter.hasNext()) {
585 key = iter.next();
586 iter.remove();
587 if (key.isValid()) {
588 if (key.isReadable()) {
589 doRead(key);
590 }
591 }
592 key = null;
593 }
594 } catch (InterruptedException e) {
595 if (running) {
596 LOG.info(getName() + ": unexpectedly interrupted: " +
597 StringUtils.stringifyException(e));
598 }
599 } catch (IOException ex) {
600 LOG.error(getName() + ": error in Reader", ex);
601 }
602 }
603 }
604
605
606
607
608
609
610
611
612 public void startAdd() {
613 adding = true;
614 readSelector.wakeup();
615 }
616
617 public synchronized SelectionKey registerChannel(SocketChannel channel)
618 throws IOException {
619 return channel.register(readSelector, SelectionKey.OP_READ);
620 }
621
622 public synchronized void finishAdd() {
623 adding = false;
624 this.notify();
625 }
626 }
627
628
629
630
631
632
633
634
635 private void cleanupConnections(boolean force) {
636 if (force || numConnections > thresholdIdleConnections) {
637 long currentTime = System.currentTimeMillis();
638 if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
639 return;
640 }
641 int start = 0;
642 int end = numConnections - 1;
643 if (!force) {
644 start = rand.nextInt() % numConnections;
645 end = rand.nextInt() % numConnections;
646 int temp;
647 if (end < start) {
648 temp = start;
649 start = end;
650 end = temp;
651 }
652 }
653 int i = start;
654 int numNuked = 0;
655 while (i <= end) {
656 Connection c;
657 synchronized (connectionList) {
658 try {
659 c = connectionList.get(i);
660 } catch (Exception e) {return;}
661 }
662 if (c.timedOut(currentTime)) {
663 if (LOG.isDebugEnabled())
664 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
665 closeConnection(c);
666 numNuked++;
667 end--;
668
669 c = null;
670 if (!force && numNuked == maxConnectionsToNuke) break;
671 }
672 else i++;
673 }
674 lastCleanupRunTime = System.currentTimeMillis();
675 }
676 }
677
678 @Override
679 public void run() {
680 LOG.info(getName() + ": starting");
681 while (running) {
682 SelectionKey key = null;
683 try {
684 selector.select();
685 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
686 while (iter.hasNext()) {
687 key = iter.next();
688 iter.remove();
689 try {
690 if (key.isValid()) {
691 if (key.isAcceptable())
692 doAccept(key);
693 }
694 } catch (IOException ignored) {
695 }
696 key = null;
697 }
698 } catch (OutOfMemoryError e) {
699 if (errorHandler != null) {
700 if (errorHandler.checkOOME(e)) {
701 LOG.info(getName() + ": exiting on OutOfMemoryError");
702 closeCurrentConnection(key, e);
703 cleanupConnections(true);
704 return;
705 }
706 } else {
707
708
709
710 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
711 closeCurrentConnection(key, e);
712 cleanupConnections(true);
713 try { Thread.sleep(60000); } catch (Exception ignored) {}
714 }
715 } catch (Exception e) {
716 closeCurrentConnection(key, e);
717 }
718 cleanupConnections(false);
719 }
720 LOG.info(getName() + ": stopping");
721
722 synchronized (this) {
723 try {
724 acceptChannel.close();
725 selector.close();
726 } catch (IOException ignored) { }
727
728 selector= null;
729 acceptChannel= null;
730
731
732 while (!connectionList.isEmpty()) {
733 closeConnection(connectionList.remove(0));
734 }
735 }
736 }
737
738 private void closeCurrentConnection(SelectionKey key, Throwable e) {
739 if (key != null) {
740 Connection c = (Connection)key.attachment();
741 if (c != null) {
742 if (LOG.isDebugEnabled()) {
743 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
744 (e != null ? " on error " + e.getMessage() : ""));
745 }
746 closeConnection(c);
747 key.attach(null);
748 }
749 }
750 }
751
752 InetSocketAddress getAddress() {
753 return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
754 }
755
756 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
757 Connection c;
758 ServerSocketChannel server = (ServerSocketChannel) key.channel();
759
760 SocketChannel channel;
761 while ((channel = server.accept()) != null) {
762 try {
763 channel.configureBlocking(false);
764 channel.socket().setTcpNoDelay(tcpNoDelay);
765 channel.socket().setKeepAlive(tcpKeepAlive);
766 } catch (IOException ioe) {
767 channel.close();
768 throw ioe;
769 }
770
771 Reader reader = getReader();
772 try {
773 reader.startAdd();
774 SelectionKey readKey = reader.registerChannel(channel);
775 c = getConnection(channel, System.currentTimeMillis());
776 readKey.attach(c);
777 synchronized (connectionList) {
778 connectionList.add(numConnections, c);
779 numConnections++;
780 }
781 if (LOG.isDebugEnabled())
782 LOG.debug(getName() + ": connection from " + c.toString() +
783 "; # active connections: " + numConnections);
784 } finally {
785 reader.finishAdd();
786 }
787 }
788 }
789
790 void doRead(SelectionKey key) throws InterruptedException {
791 int count = 0;
792 Connection c = (Connection)key.attachment();
793 if (c == null) {
794 return;
795 }
796 c.setLastContact(System.currentTimeMillis());
797 try {
798 count = c.readAndProcess();
799 } catch (InterruptedException ieo) {
800 throw ieo;
801 } catch (Exception e) {
802 LOG.warn(getName() + ": count of bytes read: " + count, e);
803 count = -1;
804 }
805 if (count < 0) {
806 if (LOG.isDebugEnabled()) {
807 LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
808 " because read count=" + count +
809 ". Number of active connections: " + numConnections);
810 }
811 closeConnection(c);
812
813 } else {
814 c.setLastContact(System.currentTimeMillis());
815 }
816 }
817
818 synchronized void doStop() {
819 if (selector != null) {
820 selector.wakeup();
821 Thread.yield();
822 }
823 if (acceptChannel != null) {
824 try {
825 acceptChannel.socket().close();
826 } catch (IOException e) {
827 LOG.info(getName() + ": exception in closing listener socket. " + e);
828 }
829 }
830 readPool.shutdownNow();
831 }
832
833
834
835 Reader getReader() {
836 currentReader = (currentReader + 1) % readers.length;
837 return readers[currentReader];
838 }
839 }
840
841
842 protected class Responder extends Thread {
843 private final Selector writeSelector;
844 private int pending;
845
846 Responder() throws IOException {
847 this.setName("RpcServer.responder");
848 this.setDaemon(true);
849 writeSelector = Selector.open();
850 pending = 0;
851 }
852
853 @Override
854 public void run() {
855 LOG.info(getName() + ": starting");
856 try {
857 doRunLoop();
858 } finally {
859 LOG.info(getName() + ": stopping");
860 try {
861 writeSelector.close();
862 } catch (IOException ioe) {
863 LOG.error(getName() + ": couldn't close write selector", ioe);
864 }
865 }
866 }
867
868 private void doRunLoop() {
869 long lastPurgeTime = 0;
870
871 while (running) {
872 try {
873 waitPending();
874 writeSelector.select(purgeTimeout);
875 Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
876 while (iter.hasNext()) {
877 SelectionKey key = iter.next();
878 iter.remove();
879 try {
880 if (key.isValid() && key.isWritable()) {
881 doAsyncWrite(key);
882 }
883 } catch (IOException e) {
884 LOG.info(getName() + ": asyncWrite", e);
885 }
886 }
887 long now = System.currentTimeMillis();
888 if (now < lastPurgeTime + purgeTimeout) {
889 continue;
890 }
891 lastPurgeTime = now;
892
893
894
895
896 if (LOG.isDebugEnabled()) LOG.debug(getName() + ": checking for old call responses.");
897 ArrayList<Call> calls;
898
899
900 synchronized (writeSelector.keys()) {
901 calls = new ArrayList<Call>(writeSelector.keys().size());
902 iter = writeSelector.keys().iterator();
903 while (iter.hasNext()) {
904 SelectionKey key = iter.next();
905 Call call = (Call)key.attachment();
906 if (call != null && key.channel() == call.connection.channel) {
907 calls.add(call);
908 }
909 }
910 }
911
912 for(Call call : calls) {
913 try {
914 doPurge(call, now);
915 } catch (IOException e) {
916 LOG.warn(getName() + ": error in purging old calls " + e);
917 }
918 }
919 } catch (OutOfMemoryError e) {
920 if (errorHandler != null) {
921 if (errorHandler.checkOOME(e)) {
922 LOG.info(getName() + ": exiting on OutOfMemoryError");
923 return;
924 }
925 } else {
926
927
928
929
930
931 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
932 try { Thread.sleep(60000); } catch (Exception ignored) {}
933 }
934 } catch (Exception e) {
935 LOG.warn(getName() + ": exception in Responder " +
936 StringUtils.stringifyException(e));
937 }
938 }
939 LOG.info(getName() + ": stopped");
940 }
941
942 private void doAsyncWrite(SelectionKey key) throws IOException {
943 Call call = (Call)key.attachment();
944 if (call == null) {
945 return;
946 }
947 if (key.channel() != call.connection.channel) {
948 throw new IOException("doAsyncWrite: bad channel");
949 }
950
951 synchronized(call.connection.responseQueue) {
952 if (processResponse(call.connection.responseQueue, false)) {
953 try {
954 key.interestOps(0);
955 } catch (CancelledKeyException e) {
956
957
958
959
960
961 LOG.warn("Exception while changing ops : " + e);
962 }
963 }
964 }
965 }
966
967
968
969
970
971 private void doPurge(Call call, long now) throws IOException {
972 synchronized (call.connection.responseQueue) {
973 Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
974 while (iter.hasNext()) {
975 Call nextCall = iter.next();
976 if (now > nextCall.timestamp + purgeTimeout) {
977 closeConnection(nextCall.connection);
978 break;
979 }
980 }
981 }
982 }
983
984
985
986
987 private boolean processResponse(final LinkedList<Call> responseQueue, boolean inHandler)
988 throws IOException {
989 boolean error = true;
990 boolean done = false;
991 int numElements;
992 Call call = null;
993 try {
994
995 synchronized (responseQueue) {
996
997
998
999 numElements = responseQueue.size();
1000 if (numElements == 0) {
1001 error = false;
1002 return true;
1003 }
1004
1005
1006
1007 call = responseQueue.removeFirst();
1008 SocketChannel channel = call.connection.channel;
1009
1010
1011
1012 long numBytes = channelWrite(channel, call.response);
1013 if (numBytes < 0) {
1014 return true;
1015 }
1016 if (!call.response.hasRemaining()) {
1017 call.connection.decRpcCount();
1018
1019 if (numElements == 1) {
1020 done = true;
1021 } else {
1022 done = false;
1023 }
1024 if (LOG.isDebugEnabled()) {
1025 LOG.debug(getName() + ": callId: " + call.id + " wrote " + numBytes + " bytes.");
1026 }
1027 } else {
1028
1029
1030
1031
1032 call.connection.responseQueue.addFirst(call);
1033
1034 if (inHandler) {
1035
1036 call.timestamp = System.currentTimeMillis();
1037 if (enqueueInSelector(call))
1038 done = true;
1039 }
1040 if (LOG.isDebugEnabled()) {
1041 LOG.debug(getName() + call.toShortString() + " partially sent, wrote " +
1042 numBytes + " bytes.");
1043 }
1044 }
1045 error = false;
1046 }
1047 } finally {
1048 if (error && call != null) {
1049 LOG.warn(getName() + call.toShortString() + ": output error");
1050 done = true;
1051 closeConnection(call.connection);
1052 }
1053 }
1054 return done;
1055 }
1056
1057
1058
1059
1060 private boolean enqueueInSelector(Call call) throws IOException {
1061 boolean done = false;
1062 incPending();
1063 try {
1064
1065
1066 SocketChannel channel = call.connection.channel;
1067 writeSelector.wakeup();
1068 channel.register(writeSelector, SelectionKey.OP_WRITE, call);
1069 } catch (ClosedChannelException e) {
1070
1071 done = true;
1072 } finally {
1073 decPending();
1074 }
1075 return done;
1076 }
1077
1078
1079
1080
1081 void doRespond(Call call) throws IOException {
1082
1083 call.timestamp = System.currentTimeMillis();
1084
1085 boolean doRegister = false;
1086 synchronized (call.connection.responseQueue) {
1087 call.connection.responseQueue.addLast(call);
1088 if (call.connection.responseQueue.size() == 1) {
1089 doRegister = !processResponse(call.connection.responseQueue, false);
1090 }
1091 }
1092 if (doRegister) {
1093 enqueueInSelector(call);
1094 }
1095 }
1096
1097 private synchronized void incPending() {
1098 pending++;
1099 }
1100
1101 private synchronized void decPending() {
1102 pending--;
1103 notify();
1104 }
1105
1106 private synchronized void waitPending() throws InterruptedException {
1107 while (pending > 0) {
1108 wait();
1109 }
1110 }
1111 }
1112
1113 @SuppressWarnings("serial")
1114 public static class CallQueueTooBigException extends IOException {
1115 CallQueueTooBigException() {
1116 super();
1117 }
1118 }
1119
1120
1121 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1122 value="VO_VOLATILE_INCREMENT",
1123 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1124 public class Connection {
1125
// Progress flags for the connection setup; both start false and are updated outside this
// section of the class.
private boolean connectionPreambleRead = false;

private boolean connectionHeaderRead = false;
// Channel to the client; responses are written to it by the Responder.
protected SocketChannel channel;
private ByteBuffer data;
// 4-byte buffer allocated in the constructor.
// NOTE(review): presumably holds the length prefix of the next request -- confirm.
private ByteBuffer dataLengthBuffer;
// Calls whose responses are waiting to be written; accesses are synchronized on this list.
protected final LinkedList<Call> responseQueue;
// Outstanding-RPC counter; the connection counts as idle when it is zero.
private Counter rpcCount = new Counter();
// Time of the last contact, as passed to the constructor or setLastContact().
private long lastContact;
private InetAddress addr;
protected Socket socket;

// Remote address as text ("*Unknown*" when the socket reports no address) and remote port.
protected String hostAddress;
protected int remotePort;
ConnectionHeader connectionHeader;

// Codec used by Call.setResponse to encode cell blocks; Call.isClientCellBlockSupport treats
// a non-null codec as cell-block support.
private Codec codec;

// Compression codec passed along with 'codec' when building cell blocks.
private CompressionCodec compressionCodec;
BlockingService service;
protected UserGroupInformation user = null;
private AuthMethod authMethod;
private boolean saslContextEstablished;
private boolean skipInitialSaslHandshake;
private ByteBuffer unwrappedData;

private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
// When useWrap is set, responses are passed to wrapWithSasl, which wraps them with
// saslServer only if useSasl is also set.
boolean useSasl;
SaslServer saslServer;
private boolean useWrap = false;

// Placeholder call (id -1) created per connection; by its name, used to report authorization
// failures. 'service' is still unset when this initializer runs.
private static final int AUTHROIZATION_FAILED_CALLID = -1;
private final Call authFailedCall =
  new Call(AUTHROIZATION_FAILED_CALLID, this.service, null,
    null, null, null, this, null, 0, null);
private ByteArrayOutputStream authFailedResponse =
    new ByteArrayOutputStream();

// Placeholder call (id -33) created per connection; by its name, used for SASL responses.
private static final int SASL_CALLID = -33;
private final Call saslCall =
  new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null);

public UserGroupInformation attemptingUser = null;
1174
/**
 * Creates the per-client state for a channel.
 *
 * @param channel the client channel; its socket supplies the remote address and port
 * @param lastContact initial value of the last-contact timestamp
 */
public Connection(SocketChannel channel, long lastContact) {
  this.channel = channel;
  this.lastContact = lastContact;
  this.data = null;
  this.dataLengthBuffer = ByteBuffer.allocate(4);
  this.socket = channel.socket();
  this.addr = socket.getInetAddress();
  // Fall back to a placeholder when the socket reports no remote address, so that
  // toString() and log messages remain readable.
  this.hostAddress = (addr == null) ? "*Unknown*" : addr.getHostAddress();
  this.remotePort = socket.getPort();
  this.responseQueue = new LinkedList<Call>();
  // A configured size of 0 leaves the socket's send buffer untouched.
  if (socketSendBufferSize != 0) {
    try {
      socket.setSendBufferSize(socketSendBufferSize);
    } catch (IOException e) {
      // Best effort: the connection stays usable with its current buffer size. Pass the
      // exception along so the reason for the failure ends up in the log.
      LOG.warn("Connection: unable to set socket send buffer size to " +
          socketSendBufferSize, e);
    }
  }
}
1198
1199 @Override
1200 public String toString() {
1201 return getHostAddress() + ":" + remotePort;
1202 }
1203
1204 public String getHostAddress() {
1205 return hostAddress;
1206 }
1207
1208 public InetAddress getHostInetAddress() {
1209 return addr;
1210 }
1211
1212 public int getRemotePort() {
1213 return remotePort;
1214 }
1215
1216 public void setLastContact(long lastContact) {
1217 this.lastContact = lastContact;
1218 }
1219
1220 public long getLastContact() {
1221 return lastContact;
1222 }
1223
1224
1225 private boolean isIdle() {
1226 return rpcCount.get() == 0;
1227 }
1228
1229
1230 protected void decRpcCount() {
1231 rpcCount.decrement();
1232 }
1233
1234
1235 protected void incRpcCount() {
1236 rpcCount.increment();
1237 }
1238
1239 protected boolean timedOut(long currentTime) {
1240 return isIdle() && currentTime - lastContact > maxIdleTime;
1241 }
1242
1243 private UserGroupInformation getAuthorizedUgi(String authorizedId)
1244 throws IOException {
1245 if (authMethod == AuthMethod.DIGEST) {
1246 TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1247 secretManager);
1248 UserGroupInformation ugi = tokenId.getUser();
1249 if (ugi == null) {
1250 throw new AccessControlException(
1251 "Can't retrieve username from tokenIdentifier.");
1252 }
1253 ugi.addTokenIdentifier(tokenId);
1254 return ugi;
1255 } else {
1256 return UserGroupInformation.createRemoteUser(authorizedId);
1257 }
1258 }
1259
/**
 * Handles one frame received on a SASL connection.
 *
 * <p>Until the SASL context is established the frame is treated as a handshake token: the
 * {@link SaslServer} is created lazily on the first token, the token is evaluated, any reply
 * token is sent back, and once the server reports completion the authenticated user and the
 * negotiated quality of protection are recorded. After that point each frame is RPC payload,
 * which is unwrapped first when {@code useWrap} is set.
 *
 * @param saslToken the raw bytes of the frame
 * @throws IOException if the handshake fails (an error reply is sent to the client first) or
 *         if processing the payload fails
 * @throws InterruptedException propagated from RPC processing
 */
private void saslReadAndProcess(byte[] saslToken) throws IOException,
    InterruptedException {
  if (saslContextEstablished) {
    if (LOG.isDebugEnabled())
      LOG.debug("Have read input token of size " + saslToken.length
          + " for processing by saslServer.unwrap()");

    if (!useWrap) {
      // Authentication-only QOP: the frame already is the plain RPC.
      processOneRpc(saslToken);
    } else {
      // Integrity/privacy QOP: unwrap, then split the plaintext into RPCs.
      byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
      processUnwrappedData(plaintextData);
    }
  } else {
    byte[] replyToken = null;
    try {
      if (saslServer == null) {
        // First handshake token: create the SASL server for the requested mechanism.
        switch (authMethod) {
        case DIGEST:
          if (secretManager == null) {
            throw new AccessControlException(
              "Server is not configured to do DIGEST authentication.");
          }
          saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
              .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
              SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
                  secretManager, this));
          break;
        default:
          // Every other method is handled as Kerberos, using the current user's principal.
          UserGroupInformation current = UserGroupInformation
          .getCurrentUser();
          String fullName = current.getUserName();
          if (LOG.isDebugEnabled()) {
            LOG.debug("Kerberos principal name is " + fullName);
          }
          final String names[] = SaslUtil.splitKerberosName(fullName);
          if (names.length != 3) {
            throw new AccessControlException(
                "Kerberos principal name does NOT have the expected "
                    + "hostname part: " + fullName);
          }
          // Create the server while running as the current user.
          current.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws SaslException {
              saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                  .getMechanismName(), names[0], names[1],
                  SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
              return null;
            }
          });
        }
        // Sasl.createSaslServer returns null when no provider supports the mechanism.
        if (saslServer == null)
          throw new AccessControlException(
              "Unable to find SASL server implementation for "
                  + authMethod.getMechanismName());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Have read input token of size " + saslToken.length
            + " for processing by saslServer.evaluateResponse()");
      }
      replyToken = saslServer.evaluateResponse(saslToken);
    } catch (IOException e) {
      // Report an InvalidToken found anywhere in the cause chain in preference to the
      // outer exception; otherwise report the exception itself.
      IOException sendToClient = e;
      Throwable cause = e;
      while (cause != null) {
        if (cause instanceof InvalidToken) {
          sendToClient = (InvalidToken) cause;
          break;
        }
        cause = cause.getCause();
      }
      doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
        sendToClient.getLocalizedMessage());
      metrics.authenticationFailure();
      String clientIP = this.toString();

      AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
      // The original exception, not the unwrapped one, propagates to the caller.
      throw e;
    }
    if (replyToken != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Will send token of size " + replyToken.length
            + " from saslServer.");
      }
      doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
          null);
    }
    if (saslServer.isComplete()) {
      // Handshake finished: any negotiated QOP other than "auth" requires wrapping.
      String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
      useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
      user = getAuthorizedUgi(saslServer.getAuthorizationID());
      if (LOG.isDebugEnabled()) {
        LOG.debug("SASL server context established. Authenticated client: "
          + user + ". Negotiated QoP is "
          + saslServer.getNegotiatedProperty(Sasl.QOP));
      }
      metrics.authenticationSuccess();
      AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
      saslContextEstablished = true;
    }
  }
}
1365
1366
1367
1368
1369 private void doRawSaslReply(SaslStatus status, Writable rv,
1370 String errorClass, String error) throws IOException {
1371 ByteBufferOutputStream saslResponse = null;
1372 DataOutputStream out = null;
1373 try {
1374
1375
1376 saslResponse = new ByteBufferOutputStream(256);
1377 out = new DataOutputStream(saslResponse);
1378 out.writeInt(status.state);
1379 if (status == SaslStatus.SUCCESS) {
1380 rv.write(out);
1381 } else {
1382 WritableUtils.writeString(out, errorClass);
1383 WritableUtils.writeString(out, error);
1384 }
1385 saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1386 saslCall.responder = responder;
1387 saslCall.sendResponseIfReady();
1388 } finally {
1389 if (saslResponse != null) {
1390 saslResponse.close();
1391 }
1392 if (out != null) {
1393 out.close();
1394 }
1395 }
1396 }
1397
1398 private void disposeSasl() {
1399 if (saslServer != null) {
1400 try {
1401 saslServer.dispose();
1402 saslServer = null;
1403 } catch (SaslException ignored) {
1404 }
1405 }
1406 }
1407
1408
1409
1410
1411
1412
1413
1414
/**
 * Reads available bytes from the client channel and advances this connection's protocol state:
 * first the connection preamble (4 magic bytes, a version byte, an auth-method byte), then a
 * series of frames, each a 4-byte length followed by that many payload bytes. A completed
 * frame is dispatched to {@link #saslReadAndProcess(byte[])} when SASL is in use and to
 * {@link #processOneRpc(byte[])} otherwise.
 *
 * @return the result of the last channel read (negative when the channel reported
 *         end-of-stream), {@code -1} when the preamble was rejected, or {@code 0} after a ping
 * @throws IOException on read or processing failure, including an
 *         {@link AccessControlException} when a SIMPLE client connects to a secured server
 * @throws InterruptedException propagated from frame processing
 */
public int readAndProcess() throws IOException, InterruptedException {
  while (true) {
    // Fill the 4-byte buffer: it holds the magic bytes while the preamble is pending and the
    // frame length afterwards. Return to the caller if it cannot be filled yet.
    int count;
    if (this.dataLengthBuffer.remaining() > 0) {
      count = channelRead(channel, this.dataLengthBuffer);
      if (count < 0 || this.dataLengthBuffer.remaining() > 0) {
        return count;
      }
    }

    if (!connectionPreambleRead) {
      // The 4 bytes just read must equal the RPC magic.
      this.dataLengthBuffer.flip();
      if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
        return doBadPreambleHandling("Expected HEADER=" +
          Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
          " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
          " from " + toString());
      }
      // Next two bytes: protocol version and auth method.
      // NOTE(review): this buffer is a local, so if fewer than 2 bytes are available the
      // partially read bytes are not kept for the next invocation — confirm whether that
      // case can occur in practice.
      ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
      count = channelRead(channel, versionAndAuthBytes);
      if (count < 0 || versionAndAuthBytes.remaining() > 0) {
        return count;
      }
      int version = versionAndAuthBytes.get(0);
      byte authbyte = versionAndAuthBytes.get(1);
      this.authMethod = AuthMethod.valueOf(authbyte);
      if (version != CURRENT_VERSION) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new WrongVersionException(msg));
      }
      if (authMethod == null) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new BadAuthException(msg));
      }
      // Secured server, unauthenticated client: reply with the failure and abort.
      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
        AccessControlException ae = new AccessControlException("Authentication is required");
        setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
        responder.doRespond(authFailedCall);
        throw ae;
      }
      // Unsecured server, client asking for SASL: tell it to switch to SIMPLE and remember
      // to discard the next frame it sends (see skipInitialSaslHandshake below).
      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
            SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
        authMethod = AuthMethod.SIMPLE;



        skipInitialSaslHandshake = true;
      }
      if (authMethod != AuthMethod.SIMPLE) {
        useSasl = true;
      }
      connectionPreambleRead = true;
      // Reuse the 4-byte buffer for the length of the first frame.
      dataLengthBuffer.clear();
      continue;
    }

    // A full length prefix is available; allocate the payload buffer unless a frame is
    // already partially read.
    if (data == null) {
      dataLengthBuffer.flip();
      int dataLength = dataLengthBuffer.getInt();
      if (dataLength == RpcClient.PING_CALL_ID) {
        if (!useWrap) {
          // Ping: there is no payload, just reset for the next frame.
          dataLengthBuffer.clear();
          return 0;
        }
      }
      if (dataLength < 0) {
        throw new IllegalArgumentException("Unexpected data length "
            + dataLength + "!! from " + getHostAddress());
      }
      data = ByteBuffer.allocate(dataLength);
      incRpcCount();
    }
    count = channelRead(channel, data);
    if (count < 0) {
      return count;
    } else if (data.remaining() == 0) {
      // Frame complete: reset the length buffer and dispatch the payload.
      dataLengthBuffer.clear();
      data.flip();
      if (skipInitialSaslHandshake) {
        // Drop the frame that follows the switch-to-SIMPLE reply.
        data = null;
        skipInitialSaslHandshake = false;
        continue;
      }
      // Captured before dispatch, because dispatching may be what reads the header.
      boolean headerRead = connectionHeaderRead;
      if (useSasl) {
        saslReadAndProcess(data.array());
      } else {
        processOneRpc(data.array());
      }
      this.data = null;
      // While the connection header had not been read yet, keep going in this invocation.
      if (!headerRead) {
        continue;
      }
    } else if (count > 0) {
      // Partial frame but the read made progress: try to read the rest right away.
      if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining());
      continue;
    }
    return count;
  }
}
1525
1526 private String getFatalConnectionString(final int version, final byte authByte) {
1527 return "serverVersion=" + CURRENT_VERSION +
1528 ", clientVersion=" + version + ", authMethod=" + authByte +
1529 ", authSupported=" + (authMethod != null) + " from " + toString();
1530 }
1531
1532 private int doBadPreambleHandling(final String msg) throws IOException {
1533 return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1534 }
1535
1536 private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1537 LOG.warn(msg);
1538 Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null);
1539 setupResponse(null, fakeCall, e, msg);
1540 responder.doRespond(fakeCall);
1541
1542 return -1;
1543 }
1544
1545
1546 private void processConnectionHeader(byte[] buf) throws IOException {
1547 this.connectionHeader = ConnectionHeader.parseFrom(buf);
1548 String serviceName = connectionHeader.getServiceName();
1549 if (serviceName == null) throw new EmptyServiceNameException();
1550 this.service = getService(services, serviceName);
1551 if (this.service == null) throw new UnknownServiceException(serviceName);
1552 setupCellBlockCodecs(this.connectionHeader);
1553 UserGroupInformation protocolUser = createUser(connectionHeader);
1554 if (!useSasl) {
1555 user = protocolUser;
1556 if (user != null) {
1557 user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1558 }
1559 } else {
1560
1561 user.setAuthenticationMethod(authMethod.authenticationMethod);
1562
1563
1564
1565 if ((protocolUser != null)
1566 && (!protocolUser.getUserName().equals(user.getUserName()))) {
1567 if (authMethod == AuthMethod.DIGEST) {
1568
1569 throw new AccessControlException("Authenticated user (" + user
1570 + ") doesn't match what the client claims to be ("
1571 + protocolUser + ")");
1572 } else {
1573
1574
1575
1576 UserGroupInformation realUser = user;
1577 user = UserGroupInformation.createProxyUser(protocolUser
1578 .getUserName(), realUser);
1579
1580 user.setAuthenticationMethod(AuthenticationMethod.PROXY);
1581 }
1582 }
1583 }
1584 }
1585
1586
1587
1588
1589
1590
1591 private void setupCellBlockCodecs(final ConnectionHeader header)
1592 throws FatalConnectionException {
1593
1594 if (!header.hasCellBlockCodecClass()) return;
1595 String className = header.getCellBlockCodecClass();
1596 if (className == null || className.length() == 0) return;
1597 try {
1598 this.codec = (Codec)Class.forName(className).newInstance();
1599 } catch (Exception e) {
1600 throw new UnsupportedCellCodecException(className, e);
1601 }
1602 if (!header.hasCellBlockCompressorClass()) return;
1603 className = header.getCellBlockCompressorClass();
1604 try {
1605 this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1606 } catch (Exception e) {
1607 throw new UnsupportedCompressionCodecException(className, e);
1608 }
1609 }
1610
/**
 * Splits SASL-unwrapped plaintext into length-prefixed RPCs and processes each complete one.
 *
 * <p>An RPC may span several invocations: a partially read length or payload is kept in
 * {@code unwrappedDataLengthBuffer} / {@code unwrappedData} and completed by the next call.
 *
 * @param inBuf plaintext produced by unwrapping one SASL frame
 * @throws IOException if reading or processing an RPC fails
 * @throws InterruptedException propagated from RPC processing
 */
private void processUnwrappedData(byte[] inBuf) throws IOException,
    InterruptedException {
  // Expose the byte array as a channel so channelRead can be reused.
  ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));

  while (true) {
    int count = -1;
    // Complete the 4-byte length prefix; stop when the input runs out first.
    if (unwrappedDataLengthBuffer.remaining() > 0) {
      count = channelRead(ch, unwrappedDataLengthBuffer);
      if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
        return;
    }

    if (unwrappedData == null) {
      unwrappedDataLengthBuffer.flip();
      int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();

      // A ping carries no payload; move on to the next length prefix.
      if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
        if (LOG.isDebugEnabled())
          LOG.debug("Received ping message");
        unwrappedDataLengthBuffer.clear();
        continue;
      }
      unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
    }

    // Read payload; stop when the input is exhausted or the payload is still incomplete.
    count = channelRead(ch, unwrappedData);
    if (count <= 0 || unwrappedData.remaining() > 0)
      return;

    if (unwrappedData.remaining() == 0) {
      // Payload complete: reset for the next RPC and process this one.
      unwrappedDataLengthBuffer.clear();
      unwrappedData.flip();
      processOneRpc(unwrappedData.array());
      unwrappedData = null;
    }
  }
}
1648
1649 private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1650 if (connectionHeaderRead) {
1651 processRequest(buf);
1652 } else {
1653 processConnectionHeader(buf);
1654 this.connectionHeaderRead = true;
1655 if (!authorizeConnection()) {
1656
1657
1658 throw new AccessControlException("Connection from " + this + " for service " +
1659 connectionHeader.getServiceName() + " is unauthorized for user: " + user);
1660 }
1661 }
1662 }
1663
1664
1665
1666
1667
1668
1669
/**
 * Decodes one request and hands it to the scheduler.
 *
 * <p>The buffer is read as: a varint header size, the {@code RequestHeader}, then (when the
 * header says a parameter is present) a varint parameter size and the parameter message, and
 * finally (when the header carries cell block metadata) the cell block.
 *
 * <p>If the request would push the call queue past {@code maxQueueSize}, or the parameter or
 * cell block cannot be decoded, an error response is sent for this call id and the request is
 * not dispatched.
 *
 * @param buf the complete request bytes
 * @throws IOException if the header cannot be parsed or a response cannot be sent
 * @throws InterruptedException declared for the dispatch to the scheduler
 */
protected void processRequest(byte[] buf) throws IOException, InterruptedException {
  long totalRequestSize = buf.length;
  int offset = 0;
  // Read the varint-encoded header size, then the header itself.
  // NOTE(review): CodedInputStream.newInstance takes (buf, off, len); buf.length is passed
  // as the length here and again below where offset > 0 — confirm this is intended.
  CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
  int headerSize = cis.readRawVarint32();
  offset = cis.getTotalBytesRead();
  RequestHeader header = RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
  offset += headerSize;
  int id = header.getCallId();
  if (LOG.isTraceEnabled()) {
    LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
      " totalRequestSize: " + totalRequestSize + " bytes");
  }
  // Reject the call up front when the queue is full; the client gets a
  // CallQueueTooBigException for this call id.
  if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
    final Call callTooBig =
      new Call(id, this.service, null, null, null, null, this,
        responder, totalRequestSize, null);
    ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
    setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
      "Call queue is full, is ipc.server.max.callqueue.size too small?");
    responder.doRespond(callTooBig);
    return;
  }
  MethodDescriptor md = null;
  Message param = null;
  CellScanner cellScanner = null;
  try {
    if (header.hasRequestParam() && header.getRequestParam()) {
      md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
      if (md == null) throw new UnsupportedOperationException(header.getMethodName());
      Builder builder = this.service.getRequestPrototype(md).newBuilderForType();
      // Read the varint-encoded parameter size, then the parameter message.
      cis = CodedInputStream.newInstance(buf, offset, buf.length);
      int paramSize = cis.readRawVarint32();
      offset += cis.getTotalBytesRead();
      if (builder != null) {
        param = builder.mergeFrom(buf, offset, paramSize).build();
      }
      offset += paramSize;
    }
    if (header.hasCellBlockMeta()) {
      cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
        buf, offset, buf.length);
    }
  } catch (Throwable t) {
    String msg = "Unable to read call parameter from client " + getHostAddress();
    LOG.warn(msg, t);

    // Linkage errors and unknown methods are wrapped so the client does not retry.
    if (t instanceof LinkageError) {
      t = new DoNotRetryIOException(t);
    }
    // If the method is not present on the server, do not retry.
    if (t instanceof UnsupportedOperationException) {
      t = new DoNotRetryIOException(t);
    }

    final Call readParamsFailedCall =
      new Call(id, this.service, null, null, null, null, this,
        responder, totalRequestSize, null);
    ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
    setupResponse(responseBuffer, readParamsFailedCall, t,
      msg + "; " + t.getMessage());
    responder.doRespond(readParamsFailedCall);
    return;
  }

  // Carry trace ids along with the call when the client sent them.
  TraceInfo traceInfo = header.hasTraceInfo()
      ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
      : null;
  Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
          totalRequestSize,
          traceInfo);
  scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
}
1749
1750 private boolean authorizeConnection() throws IOException {
1751 try {
1752
1753
1754
1755
1756 if (user != null && user.getRealUser() != null
1757 && (authMethod != AuthMethod.DIGEST)) {
1758 ProxyUsers.authorize(user, this.getHostAddress(), conf);
1759 }
1760 authorize(user, connectionHeader, getHostInetAddress());
1761 if (LOG.isDebugEnabled()) {
1762 LOG.debug("Authorized " + TextFormat.shortDebugString(connectionHeader));
1763 }
1764 metrics.authorizationSuccess();
1765 } catch (AuthorizationException ae) {
1766 LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1767 metrics.authorizationFailure();
1768 setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1769 responder.doRespond(authFailedCall);
1770 return false;
1771 }
1772 return true;
1773 }
1774
1775 protected synchronized void close() {
1776 disposeSasl();
1777 data = null;
1778 this.dataLengthBuffer = null;
1779 if (!channel.isOpen())
1780 return;
1781 try {socket.shutdownOutput();} catch(Exception ignored) {}
1782 if (channel.isOpen()) {
1783 try {channel.close();} catch(Exception ignored) {}
1784 }
1785 try {socket.close();} catch(Exception ignored) {}
1786 }
1787
1788 private UserGroupInformation createUser(ConnectionHeader head) {
1789 UserGroupInformation ugi = null;
1790
1791 if (!head.hasUserInfo()) {
1792 return null;
1793 }
1794 UserInformation userInfoProto = head.getUserInfo();
1795 String effectiveUser = null;
1796 if (userInfoProto.hasEffectiveUser()) {
1797 effectiveUser = userInfoProto.getEffectiveUser();
1798 }
1799 String realUser = null;
1800 if (userInfoProto.hasRealUser()) {
1801 realUser = userInfoProto.getRealUser();
1802 }
1803 if (effectiveUser != null) {
1804 if (realUser != null) {
1805 UserGroupInformation realUserUgi =
1806 UserGroupInformation.createRemoteUser(realUser);
1807 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1808 } else {
1809 ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1810 }
1811 }
1812 return ugi;
1813 }
1814 }
1815
1816
1817
1818
1819
1820
1821
1822 public static class BlockingServiceAndInterface {
1823 private final BlockingService service;
1824 private final Class<?> serviceInterface;
1825 public BlockingServiceAndInterface(final BlockingService service,
1826 final Class<?> serviceInterface) {
1827 this.service = service;
1828 this.serviceInterface = serviceInterface;
1829 }
1830 public Class<?> getServiceInterface() {
1831 return this.serviceInterface;
1832 }
1833 public BlockingService getBlockingService() {
1834 return this.service;
1835 }
1836 }
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
/**
 * Constructs the server: reads its tunables from {@code conf}, creates the listener and the
 * responder, sets up metrics and security, and initializes the scheduler. No threads are
 * started here; see {@link #startThreads()}.
 *
 * <p>Most settings are looked up under an {@code hbase.}-prefixed key first and fall back to
 * the unprefixed {@code ipc.*} key, then to a built-in default.
 *
 * @param serverInstance the hosting server object; stored for later use
 * @param name name passed to the listener and to the metrics source
 * @param services the services this server exposes
 * @param isa the socket address stored for this server
 * @param conf configuration to read settings from
 * @param scheduler scheduler that incoming calls are dispatched to
 * @throws IOException declared for failures while constructing the server's components
 */
public RpcServer(final Server serverInstance, final String name,
    final List<BlockingServiceAndInterface> services,
    final InetSocketAddress isa, Configuration conf,
    RpcScheduler scheduler)
throws IOException {
  this.serverInstance = serverInstance;
  this.services = services;
  this.isa = isa;
  this.conf = conf;
  // 0 leaves the socket send buffer size unchanged; see setSocketSendBufSize.
  this.socketSendBufferSize = 0;
  this.maxQueueSize = conf.getInt("hbase.ipc.server.max.callqueue.size",
    conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE));
  this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size",
    conf.getInt("ipc.server.read.threadpool.size", 10));
  // The server-side idle limit is twice the configured client max idle time.
  this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime",
    conf.getInt("ipc.client.connection.maxidletime", 1000));
  this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max",
    conf.getInt("ipc.client.kill.max", 10));
  this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold",
    conf.getInt("ipc.client.idlethreshold", 4000));
  this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
    conf.getLong("ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
  this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
  this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);

  // Create the listener; the port is then taken from the address it reports.
  listener = new Listener(name);
  this.port = listener.getAddress().getPort();

  this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
  this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay",
    conf.getBoolean("ipc.server.tcpnodelay", true));
  this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive",
    conf.getBoolean("ipc.server.tcpkeepalive", true));

  this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
  this.delayedCalls = new AtomicInteger(0);
  this.ipcUtil = new IPCUtil(conf);


  // Create the responder here; it is started in startThreads().
  responder = new Responder();
  this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
  this.userProvider = UserProvider.instantiate(conf);
  this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
  if (isSecurityEnabled) {
    HBaseSaslRpcServer.init(conf);
  }
  this.scheduler = scheduler;
  this.scheduler.init(new RpcSchedulerContext(this));
}
1900
1901
1902
1903
1904
1905 protected Connection getConnection(SocketChannel channel, long time) {
1906 return new Connection(channel, time);
1907 }
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918 private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
1919 throws IOException {
1920 if (response != null) response.reset();
1921 call.setResponse(null, null, t, error);
1922 }
1923
1924 protected void closeConnection(Connection connection) {
1925 synchronized (connectionList) {
1926 if (connectionList.remove(connection)) {
1927 numConnections--;
1928 }
1929 }
1930 connection.close();
1931 }
1932
1933 Configuration getConf() {
1934 return conf;
1935 }
1936
1937
1938
1939
1940 @Override
1941 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
1942
1943
1944 @Override
1945 public void start() {
1946 startThreads();
1947 openServer();
1948 }
1949
1950
1951
1952
1953 @Override
1954 public void openServer() {
1955 this.started = true;
1956 }
1957
1958 @Override
1959 public boolean isStarted() {
1960 return this.started;
1961 }
1962
1963
1964
1965
1966
1967 @Override
1968 public synchronized void startThreads() {
1969 AuthenticationTokenSecretManager mgr = createSecretManager();
1970 if (mgr != null) {
1971 setSecretManager(mgr);
1972 mgr.start();
1973 }
1974 this.authManager = new ServiceAuthorizationManager();
1975 HBasePolicyProvider.init(conf, authManager);
1976 responder.start();
1977 listener.start();
1978 scheduler.start();
1979 }
1980
1981 @Override
1982 public void refreshAuthManager(PolicyProvider pp) {
1983
1984
1985 this.authManager.refresh(this.conf, pp);
1986 }
1987
1988 private AuthenticationTokenSecretManager createSecretManager() {
1989 if (!isSecurityEnabled) return null;
1990 if (serverInstance == null) return null;
1991 if (!(serverInstance instanceof org.apache.hadoop.hbase.Server)) return null;
1992 org.apache.hadoop.hbase.Server server = (org.apache.hadoop.hbase.Server)serverInstance;
1993 Configuration conf = server.getConfiguration();
1994 long keyUpdateInterval =
1995 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
1996 long maxAge =
1997 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
1998 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
1999 server.getServerName().toString(), keyUpdateInterval, maxAge);
2000 }
2001
2002 public SecretManager<? extends TokenIdentifier> getSecretManager() {
2003 return this.secretManager;
2004 }
2005
2006 @SuppressWarnings("unchecked")
2007 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2008 this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2009 }
2010
2011
2012
2013
2014
2015
2016 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2017 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2018 throws IOException {
2019 try {
2020 status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2021
2022 status.setRPCPacket(param);
2023 status.resume("Servicing call");
2024
2025 long startTime = System.currentTimeMillis();
2026 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2027 Message result = service.callBlockingMethod(md, controller, param);
2028 int processingTime = (int) (System.currentTimeMillis() - startTime);
2029 int qTime = (int) (startTime - receiveTime);
2030 if (LOG.isTraceEnabled()) {
2031 LOG.trace(CurCall.get().toString() +
2032 ", response " + TextFormat.shortDebugString(result) +
2033 " queueTime: " + qTime +
2034 " processingTime: " + processingTime);
2035 }
2036 metrics.dequeuedCall(qTime);
2037 metrics.processedCall(processingTime);
2038 long responseSize = result.getSerializedSize();
2039
2040
2041 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2042 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2043 if (tooSlow || tooLarge) {
2044
2045
2046 StringBuilder buffer = new StringBuilder(256);
2047 buffer.append(md.getName());
2048 buffer.append("(");
2049 buffer.append(param.getClass().getName());
2050 buffer.append(")");
2051 logResponse(new Object[]{param},
2052 md.getName(), buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
2053 status.getClient(), startTime, processingTime, qTime,
2054 responseSize);
2055 }
2056 return new Pair<Message, CellScanner>(result,
2057 controller != null? controller.cellScanner(): null);
2058 } catch (Throwable e) {
2059
2060
2061
2062 if (e instanceof ServiceException) e = e.getCause();
2063 if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2064 if (e instanceof IOException) throw (IOException)e;
2065 LOG.error("Unexpected throwable object ", e);
2066 throw new IOException(e.getMessage(), e);
2067 }
2068 }
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084 void logResponse(Object[] params, String methodName, String call, String tag,
2085 String clientAddress, long startTime, int processingTime, int qTime,
2086 long responseSize)
2087 throws IOException {
2088
2089 Map<String, Object> responseInfo = new HashMap<String, Object>();
2090 responseInfo.put("starttimems", startTime);
2091 responseInfo.put("processingtimems", processingTime);
2092 responseInfo.put("queuetimems", qTime);
2093 responseInfo.put("responsesize", responseSize);
2094 responseInfo.put("client", clientAddress);
2095 responseInfo.put("class", serverInstance == null? "": serverInstance.getClass().getSimpleName());
2096 responseInfo.put("method", methodName);
2097 if (params.length == 2 && serverInstance instanceof HRegionServer &&
2098 params[0] instanceof byte[] &&
2099 params[1] instanceof Operation) {
2100
2101
2102 TableName tableName = TableName.valueOf(
2103 HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2104 responseInfo.put("table", tableName.getNameAsString());
2105
2106 responseInfo.putAll(((Operation) params[1]).toMap());
2107
2108 LOG.warn("(operation" + tag + "): " +
2109 MAPPER.writeValueAsString(responseInfo));
2110 } else if (params.length == 1 && serverInstance instanceof HRegionServer &&
2111 params[0] instanceof Operation) {
2112
2113 responseInfo.putAll(((Operation) params[0]).toMap());
2114
2115 LOG.warn("(operation" + tag + "): " +
2116 MAPPER.writeValueAsString(responseInfo));
2117 } else {
2118
2119
2120 responseInfo.put("call", call);
2121 LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2122 }
2123 }
2124
2125
2126 @Override
2127 public synchronized void stop() {
2128 LOG.info("Stopping server on " + port);
2129 running = false;
2130 listener.interrupt();
2131 listener.doStop();
2132 responder.interrupt();
2133 scheduler.stop();
2134 notifyAll();
2135 }
2136
2137
2138
2139
2140
2141
2142 @Override
2143 public synchronized void join() throws InterruptedException {
2144 while (running) {
2145 wait();
2146 }
2147 }
2148
2149
2150
2151
2152
2153 @Override
2154 public synchronized InetSocketAddress getListenerAddress() {
2155 return listener.getAddress();
2156 }
2157
2158
2159
2160
2161
2162 @Override
2163 public void setErrorHandler(HBaseRPCErrorHandler handler) {
2164 this.errorHandler = handler;
2165 }
2166
2167 @Override
2168 public HBaseRPCErrorHandler getErrorHandler() {
2169 return this.errorHandler;
2170 }
2171
2172
2173
2174
2175 public MetricsHBaseServer getMetrics() {
2176 return metrics;
2177 }
2178
2179 @Override
2180 public void addCallSize(final long diff) {
2181 this.callQueueSize.add(diff);
2182 }
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192 @SuppressWarnings("static-access")
2193 public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
2194 throws AuthorizationException {
2195 if (authorize) {
2196 Class<?> c = getServiceInterface(services, connection.getServiceName());
2197 this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2198 }
2199 }
2200
2201
2202
2203
2204
2205
// Size threshold (64 KB) for channel I/O: channelRead hands buffers with more than this many
// bytes remaining to channelIO, which transfers them in chunks of at most this size, and
// channelWrite passes it to BufferChain.write.
// NOTE(review): declared non-final although no assignment is visible in this part of the file.
private static int NIO_BUFFER_LIMIT = 64 * 1024;
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2223 throws IOException {
2224 long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
2225 if (count > 0) this.metrics.sentBytes(count);
2226 return count;
2227 }
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241 protected int channelRead(ReadableByteChannel channel,
2242 ByteBuffer buffer) throws IOException {
2243
2244 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2245 channel.read(buffer) : channelIO(channel, null, buffer);
2246 if (count > 0) {
2247 metrics.receivedBytes(count);
2248 }
2249 return count;
2250 }
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265 private static int channelIO(ReadableByteChannel readCh,
2266 WritableByteChannel writeCh,
2267 ByteBuffer buf) throws IOException {
2268
2269 int originalLimit = buf.limit();
2270 int initialRemaining = buf.remaining();
2271 int ret = 0;
2272
2273 while (buf.remaining() > 0) {
2274 try {
2275 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2276 buf.limit(buf.position() + ioSize);
2277
2278 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2279
2280 if (ret < ioSize) {
2281 break;
2282 }
2283
2284 } finally {
2285 buf.limit(originalLimit);
2286 }
2287 }
2288
2289 int nBytes = initialRemaining - buf.remaining();
2290 return (nBytes > 0) ? nBytes : ret;
2291 }
2292
2293
2294
2295
2296
2297
2298
2299 public static RpcCallContext getCurrentCall() {
2300 return CurCall.get();
2301 }
2302
2303
2304
2305
2306
2307
2308 static BlockingServiceAndInterface getServiceAndInterface(
2309 final List<BlockingServiceAndInterface> services, final String serviceName) {
2310 for (BlockingServiceAndInterface bs : services) {
2311 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2312 return bs;
2313 }
2314 }
2315 return null;
2316 }
2317
2318
2319
2320
2321
2322
2323 static Class<?> getServiceInterface(
2324 final List<BlockingServiceAndInterface> services,
2325 final String serviceName) {
2326 BlockingServiceAndInterface bsasi =
2327 getServiceAndInterface(services, serviceName);
2328 return bsasi == null? null: bsasi.getServiceInterface();
2329 }
2330
2331
2332
2333
2334
2335
2336 static BlockingService getService(
2337 final List<BlockingServiceAndInterface> services,
2338 final String serviceName) {
2339 BlockingServiceAndInterface bsasi =
2340 getServiceAndInterface(services, serviceName);
2341 return bsasi == null? null: bsasi.getBlockingService();
2342 }
2343
2344
2345
2346
2347
2348 public static InetAddress getRemoteIp() {
2349 Call call = CurCall.get();
2350 if (call != null && call.connection.socket != null) {
2351 return call.connection.socket.getInetAddress();
2352 }
2353 return null;
2354 }
2355
2356
2357
2358
2359
2360 public static String getRemoteAddress() {
2361 Call call = CurCall.get();
2362 if (call != null) {
2363 return call.connection.getHostAddress();
2364 }
2365 return null;
2366 }
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378 public static void bind(ServerSocket socket, InetSocketAddress address,
2379 int backlog) throws IOException {
2380 try {
2381 socket.bind(address, backlog);
2382 } catch (BindException e) {
2383 BindException bindException =
2384 new BindException("Problem binding to " + address + " : " +
2385 e.getMessage());
2386 bindException.initCause(e);
2387 throw bindException;
2388 } catch (SocketException e) {
2389
2390
2391 if ("Unresolved address".equals(e.getMessage())) {
2392 throw new UnknownHostException("Invalid hostname for server: " +
2393 address.getHostName());
2394 }
2395 throw e;
2396 }
2397 }
2398
2399 public RpcScheduler getScheduler() {
2400 return scheduler;
2401 }
2402 }