1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.DataOutputStream;
26 import java.io.IOException;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.ServerSocket;
31 import java.net.Socket;
32 import java.net.SocketException;
33 import java.net.UnknownHostException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.Channels;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.GatheringByteChannel;
39 import java.nio.channels.ReadableByteChannel;
40 import java.nio.channels.SelectionKey;
41 import java.nio.channels.Selector;
42 import java.nio.channels.ServerSocketChannel;
43 import java.nio.channels.SocketChannel;
44 import java.nio.channels.WritableByteChannel;
45 import java.security.PrivilegedExceptionAction;
46 import java.util.ArrayList;
47 import java.util.Arrays;
48 import java.util.Collections;
49 import java.util.HashMap;
50 import java.util.Iterator;
51 import java.util.LinkedList;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Random;
55 import java.util.Set;
56 import java.util.concurrent.ConcurrentHashMap;
57 import java.util.concurrent.ConcurrentLinkedDeque;
58 import java.util.concurrent.ExecutorService;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.locks.Lock;
62 import java.util.concurrent.locks.ReentrantLock;
63
64 import javax.security.sasl.Sasl;
65 import javax.security.sasl.SaslException;
66 import javax.security.sasl.SaslServer;
67
68 import org.apache.commons.logging.Log;
69 import org.apache.commons.logging.LogFactory;
70 import org.apache.hadoop.hbase.CallQueueTooBigException;
71 import org.apache.hadoop.hbase.classification.InterfaceAudience;
72 import org.apache.hadoop.hbase.classification.InterfaceStability;
73 import org.apache.hadoop.conf.Configuration;
74 import org.apache.hadoop.hbase.CellScanner;
75 import org.apache.hadoop.hbase.DoNotRetryIOException;
76 import org.apache.hadoop.hbase.HBaseIOException;
77 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
78 import org.apache.hadoop.hbase.HConstants;
79 import org.apache.hadoop.hbase.HRegionInfo;
80 import org.apache.hadoop.hbase.Server;
81 import org.apache.hadoop.hbase.TableName;
82 import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
83 import org.apache.hadoop.hbase.client.Operation;
84 import org.apache.hadoop.hbase.client.VersionInfoUtil;
85 import org.apache.hadoop.hbase.codec.Codec;
86 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
87 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
88 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
90 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
91 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
92 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
94 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
95 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
96 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
97 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
98 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
99 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
100 import org.apache.hadoop.hbase.regionserver.HRegionServer;
101 import org.apache.hadoop.hbase.security.AccessDeniedException;
102 import org.apache.hadoop.hbase.security.AuthMethod;
103 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
104 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
105 import org.apache.hadoop.hbase.security.User;
106 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
107 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
108 import org.apache.hadoop.hbase.security.SaslStatus;
109 import org.apache.hadoop.hbase.security.SaslUtil;
110 import org.apache.hadoop.hbase.security.UserProvider;
111 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
112 import org.apache.hadoop.hbase.util.Bytes;
113 import org.apache.hadoop.hbase.util.Counter;
114 import org.apache.hadoop.hbase.util.Pair;
115 import org.apache.hadoop.io.BytesWritable;
116 import org.apache.hadoop.io.IntWritable;
117 import org.apache.hadoop.io.Writable;
118 import org.apache.hadoop.io.WritableUtils;
119 import org.apache.hadoop.io.compress.CompressionCodec;
120 import org.apache.hadoop.security.UserGroupInformation;
121 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
122 import org.apache.hadoop.security.authorize.AuthorizationException;
123 import org.apache.hadoop.security.authorize.PolicyProvider;
124 import org.apache.hadoop.security.authorize.ProxyUsers;
125 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
126 import org.apache.hadoop.security.token.SecretManager;
127 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
128 import org.apache.hadoop.security.token.TokenIdentifier;
129 import org.apache.hadoop.util.StringUtils;
130 import org.codehaus.jackson.map.ObjectMapper;
131 import org.apache.htrace.TraceInfo;
132
133 import com.google.common.util.concurrent.ThreadFactoryBuilder;
134 import com.google.protobuf.BlockingService;
135 import com.google.protobuf.CodedInputStream;
136 import com.google.protobuf.Descriptors.MethodDescriptor;
137 import com.google.protobuf.Message;
138 import com.google.protobuf.ServiceException;
139 import com.google.protobuf.TextFormat;
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
162 @InterfaceStability.Evolving
163 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
164
165 public static final Log LOG = LogFactory.getLog(RpcServer.class);
166 private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
167 = new CallQueueTooBigException();
168
169 private final boolean authorize;
170 private boolean isSecurityEnabled;
171
172 public static final byte CURRENT_VERSION = 0;
173
174
175
176
177 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
178 "hbase.ipc.server.fallback-to-simple-auth-allowed";
179
180
181
182
183 static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
184
185
186
187
188 private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
189
190 private final IPCUtil ipcUtil;
191
192 private static final String AUTH_FAILED_FOR = "Auth failed for ";
193 private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
194 private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
195 Server.class.getName());
196 protected SecretManager<TokenIdentifier> secretManager;
197 protected ServiceAuthorizationManager authManager;
198
199
200
201
202 protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
203
204
205 static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
206 = new ThreadLocal<MonitoredRPCHandler>();
207
208 protected final InetSocketAddress bindAddress;
209 protected int port;
210 private int readThreads;
211 protected int maxIdleTime;
212
213
214 protected int thresholdIdleConnections;
215
216
217
218 int maxConnectionsToNuke;
219
220
221
222 protected MetricsHBaseServer metrics;
223
224 protected final Configuration conf;
225
226 private int maxQueueSize;
227 protected int socketSendBufferSize;
228 protected final boolean tcpNoDelay;
229 protected final boolean tcpKeepAlive;
230 protected final long purgeTimeout;
231
232
233
234
235
236
237 volatile boolean running = true;
238
239
240
241
242
243 volatile boolean started = false;
244
245
246
247
248 protected final Counter callQueueSize = new Counter();
249
250 protected final List<Connection> connectionList =
251 Collections.synchronizedList(new LinkedList<Connection>());
252
253
254 private Listener listener = null;
255 protected Responder responder = null;
256 protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
257 protected int numConnections = 0;
258
259 protected HBaseRPCErrorHandler errorHandler = null;
260
261 private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
262 private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
263
264
265 private static final int DEFAULT_WARN_RESPONSE_TIME = 10000;
266 private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
267
268 private static final ObjectMapper MAPPER = new ObjectMapper();
269
270 private final int warnResponseTime;
271 private final int warnResponseSize;
272 private final Server server;
273 private final List<BlockingServiceAndInterface> services;
274
275 private final RpcScheduler scheduler;
276
277 private UserProvider userProvider;
278
279 private final BoundedByteBufferPool reservoir;
280
281 private volatile boolean allowFallbackToSimpleAuth;
282
283
284
285
286
287 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
288 @InterfaceStability.Evolving
289 public class Call implements RpcCallContext {
290 protected int id;
291 protected BlockingService service;
292 protected MethodDescriptor md;
293 protected RequestHeader header;
294 protected Message param;
295
296 protected CellScanner cellScanner;
297 protected Connection connection;
298 protected long timestamp;
299
300
301
302
303 protected BufferChain response;
304 protected Responder responder;
305
306 protected long size;
307 protected boolean isError;
308 protected TraceInfo tinfo;
309 private ByteBuffer cellBlock = null;
310
311 private User user;
312 private InetAddress remoteAddress;
313
314 private long responseCellSize = 0;
315 private long responseBlockSize = 0;
316 private boolean retryImmediatelySupported;
317
318 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
319 justification="Can't figure why this complaint is happening... see below")
320 Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
321 Message param, CellScanner cellScanner, Connection connection, Responder responder,
322 long size, TraceInfo tinfo, final InetAddress remoteAddress) {
323 this.id = id;
324 this.service = service;
325 this.md = md;
326 this.header = header;
327 this.param = param;
328 this.cellScanner = cellScanner;
329 this.connection = connection;
330 this.timestamp = System.currentTimeMillis();
331 this.response = null;
332 this.responder = responder;
333 this.isError = false;
334 this.size = size;
335 this.tinfo = tinfo;
336 this.user = connection == null? null: connection.user;
337 this.remoteAddress = remoteAddress;
338 this.retryImmediatelySupported =
339 connection == null? null: connection.retryImmediatelySupported;
340 }
341
342
343
344
345
346 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
347 justification="Presume the lock on processing request held by caller is protection enough")
348 void done() {
349 if (this.cellBlock != null && reservoir != null) {
350
351 reservoir.putBuffer(this.cellBlock);
352 this.cellBlock = null;
353 }
354 this.connection.decRpcCount();
355 }
356
357 @Override
358 public String toString() {
359 return toShortString() + " param: " +
360 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
361 " connection: " + connection.toString();
362 }
363
364 protected RequestHeader getHeader() {
365 return this.header;
366 }
367
368 public boolean hasPriority() {
369 return this.header.hasPriority();
370 }
371
372 public int getPriority() {
373 return this.header.getPriority();
374 }
375
376
377
378
379
380 String toShortString() {
381 String serviceName = this.connection.service != null ?
382 this.connection.service.getDescriptorForType().getName() : "null";
383 return "callId: " + this.id + " service: " + serviceName +
384 " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
385 " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
386 " connection: " + connection.toString();
387 }
388
389 String toTraceString() {
390 String serviceName = this.connection.service != null ?
391 this.connection.service.getDescriptorForType().getName() : "";
392 String methodName = (this.md != null) ? this.md.getName() : "";
393 return serviceName + "." + methodName;
394 }
395
396 protected synchronized void setSaslTokenResponse(ByteBuffer response) {
397 this.response = new BufferChain(response);
398 }
399
400 protected synchronized void setResponse(Object m, final CellScanner cells,
401 Throwable t, String errorMsg) {
402 if (this.isError) return;
403 if (t != null) this.isError = true;
404 BufferChain bc = null;
405 try {
406 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
407
408 Message result = (Message)m;
409
410 headerBuilder.setCallId(this.id);
411 if (t != null) {
412 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
413 exceptionBuilder.setExceptionClassName(t.getClass().getName());
414 exceptionBuilder.setStackTrace(errorMsg);
415 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException ||
416 t instanceof NeedUnmanagedConnectionException);
417 if (t instanceof RegionMovedException) {
418
419
420
421 RegionMovedException rme = (RegionMovedException)t;
422 exceptionBuilder.setHostname(rme.getHostname());
423 exceptionBuilder.setPort(rme.getPort());
424 }
425
426 headerBuilder.setException(exceptionBuilder.build());
427 }
428
429
430
431 this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
432 this.connection.compressionCodec, cells, reservoir);
433 if (this.cellBlock != null) {
434 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
435
436 cellBlockBuilder.setLength(this.cellBlock.limit());
437 headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
438 }
439 Message header = headerBuilder.build();
440
441
442
443 ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
444 ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
445 int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
446 (this.cellBlock == null? 0: this.cellBlock.limit());
447 ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
448 bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
449 if (connection.useWrap) {
450 bc = wrapWithSasl(bc);
451 }
452 } catch (IOException e) {
453 LOG.warn("Exception while creating response " + e);
454 }
455 this.response = bc;
456 }
457
458 private BufferChain wrapWithSasl(BufferChain bc)
459 throws IOException {
460 if (!this.connection.useSasl) return bc;
461
462
463 byte [] responseBytes = bc.getBytes();
464 byte [] token;
465
466
467 synchronized (connection.saslServer) {
468 token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
469 }
470 if (LOG.isTraceEnabled()) {
471 LOG.trace("Adding saslServer wrapped token of size " + token.length
472 + " as call response.");
473 }
474
475 ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
476 ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
477 return new BufferChain(bbTokenLength, bbTokenBytes);
478 }
479
480 @Override
481 public boolean isClientCellBlockSupported() {
482 return this.connection != null && this.connection.codec != null;
483 }
484
485 @Override
486 public long disconnectSince() {
487 if (!connection.channel.isOpen()) {
488 return System.currentTimeMillis() - timestamp;
489 } else {
490 return -1L;
491 }
492 }
493
494 public long getSize() {
495 return this.size;
496 }
497
498 public long getResponseCellSize() {
499 return responseCellSize;
500 }
501
502 public void incrementResponseCellSize(long cellSize) {
503 responseCellSize += cellSize;
504 }
505
506 @Override
507 public long getResponseBlockSize() {
508 return responseBlockSize;
509 }
510
511 @Override
512 public void incrementResponseBlockSize(long blockSize) {
513 responseBlockSize += blockSize;
514 }
515
516 public synchronized void sendResponseIfReady() throws IOException {
517 this.responder.doRespond(this);
518 }
519
520 public UserGroupInformation getRemoteUser() {
521 return connection.ugi;
522 }
523
524 @Override
525 public User getRequestUser() {
526 return user;
527 }
528
529 @Override
530 public String getRequestUserName() {
531 User user = getRequestUser();
532 return user == null? null: user.getShortName();
533 }
534
535 @Override
536 public InetAddress getRemoteAddress() {
537 return remoteAddress;
538 }
539
540 @Override
541 public VersionInfo getClientVersionInfo() {
542 return connection.getVersionInfo();
543 }
544
545 @Override
546 public boolean isRetryImmediatelySupported() {
547 return retryImmediatelySupported;
548 }
549 }
550
551
552 private class Listener extends Thread {
553
554 private ServerSocketChannel acceptChannel = null;
555 private Selector selector = null;
556 private Reader[] readers = null;
557 private int currentReader = 0;
558 private Random rand = new Random();
559 private long lastCleanupRunTime = 0;
560
561 private long cleanupInterval = 10000;
562
563 private int backlogLength;
564
565 private ExecutorService readPool;
566
567 public Listener(final String name) throws IOException {
568 super(name);
569 backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
570
571 acceptChannel = ServerSocketChannel.open();
572 acceptChannel.configureBlocking(false);
573
574
575 bind(acceptChannel.socket(), bindAddress, backlogLength);
576 port = acceptChannel.socket().getLocalPort();
577
578 selector= Selector.open();
579
580 readers = new Reader[readThreads];
581 readPool = Executors.newFixedThreadPool(readThreads,
582 new ThreadFactoryBuilder().setNameFormat(
583 "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
584 ",port=" + port).setDaemon(true).build());
585 for (int i = 0; i < readThreads; ++i) {
586 Reader reader = new Reader();
587 readers[i] = reader;
588 readPool.execute(reader);
589 }
590 LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
591
592
593 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
594 this.setName("RpcServer.listener,port=" + port);
595 this.setDaemon(true);
596 }
597
598
599 private class Reader implements Runnable {
600 private volatile boolean adding = false;
601 private final Selector readSelector;
602
603 Reader() throws IOException {
604 this.readSelector = Selector.open();
605 }
606 @Override
607 public void run() {
608 try {
609 doRunLoop();
610 } finally {
611 try {
612 readSelector.close();
613 } catch (IOException ioe) {
614 LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
615 }
616 }
617 }
618
619 private synchronized void doRunLoop() {
620 while (running) {
621 try {
622 readSelector.select();
623 while (adding) {
624 this.wait(1000);
625 }
626
627 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
628 while (iter.hasNext()) {
629 SelectionKey key = iter.next();
630 iter.remove();
631 if (key.isValid()) {
632 if (key.isReadable()) {
633 doRead(key);
634 }
635 }
636 }
637 } catch (InterruptedException e) {
638 LOG.debug("Interrupted while sleeping");
639 return;
640 } catch (IOException ex) {
641 LOG.info(getName() + ": IOException in Reader", ex);
642 }
643 }
644 }
645
646
647
648
649
650
651
652
653 public void startAdd() {
654 adding = true;
655 readSelector.wakeup();
656 }
657
658 public synchronized SelectionKey registerChannel(SocketChannel channel)
659 throws IOException {
660 return channel.register(readSelector, SelectionKey.OP_READ);
661 }
662
663 public synchronized void finishAdd() {
664 adding = false;
665 this.notify();
666 }
667 }
668
669
670
671
672
673
674
675
676 private void cleanupConnections(boolean force) {
677 if (force || numConnections > thresholdIdleConnections) {
678 long currentTime = System.currentTimeMillis();
679 if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
680 return;
681 }
682 int start = 0;
683 int end = numConnections - 1;
684 if (!force) {
685 start = rand.nextInt() % numConnections;
686 end = rand.nextInt() % numConnections;
687 int temp;
688 if (end < start) {
689 temp = start;
690 start = end;
691 end = temp;
692 }
693 }
694 int i = start;
695 int numNuked = 0;
696 while (i <= end) {
697 Connection c;
698 synchronized (connectionList) {
699 try {
700 c = connectionList.get(i);
701 } catch (Exception e) {return;}
702 }
703 if (c.timedOut(currentTime)) {
704 if (LOG.isDebugEnabled())
705 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
706 closeConnection(c);
707 numNuked++;
708 end--;
709
710 c = null;
711 if (!force && numNuked == maxConnectionsToNuke) break;
712 }
713 else i++;
714 }
715 lastCleanupRunTime = System.currentTimeMillis();
716 }
717 }
718
719 @Override
720 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
721 justification="selector access is not synchronized; seems fine but concerned changing " +
722 "it will have per impact")
723 public void run() {
724 LOG.info(getName() + ": starting");
725 while (running) {
726 SelectionKey key = null;
727 try {
728 selector.select();
729 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
730 while (iter.hasNext()) {
731 key = iter.next();
732 iter.remove();
733 try {
734 if (key.isValid()) {
735 if (key.isAcceptable())
736 doAccept(key);
737 }
738 } catch (IOException ignored) {
739 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
740 }
741 key = null;
742 }
743 } catch (OutOfMemoryError e) {
744 if (errorHandler != null) {
745 if (errorHandler.checkOOME(e)) {
746 LOG.info(getName() + ": exiting on OutOfMemoryError");
747 closeCurrentConnection(key, e);
748 cleanupConnections(true);
749 return;
750 }
751 } else {
752
753
754
755 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
756 closeCurrentConnection(key, e);
757 cleanupConnections(true);
758 try {
759 Thread.sleep(60000);
760 } catch (InterruptedException ex) {
761 LOG.debug("Interrupted while sleeping");
762 return;
763 }
764 }
765 } catch (Exception e) {
766 closeCurrentConnection(key, e);
767 }
768 cleanupConnections(false);
769 }
770
771 LOG.info(getName() + ": stopping");
772
773 synchronized (this) {
774 try {
775 acceptChannel.close();
776 selector.close();
777 } catch (IOException ignored) {
778 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
779 }
780
781 selector= null;
782 acceptChannel= null;
783
784
785 while (!connectionList.isEmpty()) {
786 closeConnection(connectionList.remove(0));
787 }
788 }
789 }
790
791 private void closeCurrentConnection(SelectionKey key, Throwable e) {
792 if (key != null) {
793 Connection c = (Connection)key.attachment();
794 if (c != null) {
795 if (LOG.isDebugEnabled()) {
796 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
797 (e != null ? " on error " + e.getMessage() : ""));
798 }
799 closeConnection(c);
800 key.attach(null);
801 }
802 }
803 }
804
805 InetSocketAddress getAddress() {
806 return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
807 }
808
809 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
810 Connection c;
811 ServerSocketChannel server = (ServerSocketChannel) key.channel();
812
813 SocketChannel channel;
814 while ((channel = server.accept()) != null) {
815 try {
816 channel.configureBlocking(false);
817 channel.socket().setTcpNoDelay(tcpNoDelay);
818 channel.socket().setKeepAlive(tcpKeepAlive);
819 } catch (IOException ioe) {
820 channel.close();
821 throw ioe;
822 }
823
824 Reader reader = getReader();
825 try {
826 reader.startAdd();
827 SelectionKey readKey = reader.registerChannel(channel);
828 c = getConnection(channel, System.currentTimeMillis());
829 readKey.attach(c);
830 synchronized (connectionList) {
831 connectionList.add(numConnections, c);
832 numConnections++;
833 }
834 if (LOG.isDebugEnabled())
835 LOG.debug(getName() + ": connection from " + c.toString() +
836 "; # active connections: " + numConnections);
837 } finally {
838 reader.finishAdd();
839 }
840 }
841 }
842
843 void doRead(SelectionKey key) throws InterruptedException {
844 int count;
845 Connection c = (Connection) key.attachment();
846 if (c == null) {
847 return;
848 }
849 c.setLastContact(System.currentTimeMillis());
850 try {
851 count = c.readAndProcess();
852
853 if (count > 0) {
854 c.setLastContact(System.currentTimeMillis());
855 }
856
857 } catch (InterruptedException ieo) {
858 throw ieo;
859 } catch (Exception e) {
860 if (LOG.isDebugEnabled()) {
861 LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
862 }
863 count = -1;
864 }
865 if (count < 0) {
866 if (LOG.isDebugEnabled()) {
867 LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
868 " because read count=" + count +
869 ". Number of active connections: " + numConnections);
870 }
871 closeConnection(c);
872 }
873 }
874
875 synchronized void doStop() {
876 if (selector != null) {
877 selector.wakeup();
878 Thread.yield();
879 }
880 if (acceptChannel != null) {
881 try {
882 acceptChannel.socket().close();
883 } catch (IOException e) {
884 LOG.info(getName() + ": exception in closing listener socket. " + e);
885 }
886 }
887 readPool.shutdownNow();
888 }
889
890
891
892 Reader getReader() {
893 currentReader = (currentReader + 1) % readers.length;
894 return readers[currentReader];
895 }
896 }
897
898
899 protected class Responder extends Thread {
900 private final Selector writeSelector;
901 private final Set<Connection> writingCons =
902 Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
903
904 Responder() throws IOException {
905 this.setName("RpcServer.responder");
906 this.setDaemon(true);
907 writeSelector = Selector.open();
908 }
909
910 @Override
911 public void run() {
912 LOG.info(getName() + ": starting");
913 try {
914 doRunLoop();
915 } finally {
916 LOG.info(getName() + ": stopping");
917 try {
918 writeSelector.close();
919 } catch (IOException ioe) {
920 LOG.error(getName() + ": couldn't close write selector", ioe);
921 }
922 }
923 }
924
925
926
927
928
929 private void registerWrites() {
930 Iterator<Connection> it = writingCons.iterator();
931 while (it.hasNext()) {
932 Connection c = it.next();
933 it.remove();
934 SelectionKey sk = c.channel.keyFor(writeSelector);
935 try {
936 if (sk == null) {
937 try {
938 c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
939 } catch (ClosedChannelException e) {
940
941 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
942 }
943 } else {
944 sk.interestOps(SelectionKey.OP_WRITE);
945 }
946 } catch (CancelledKeyException e) {
947
948 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
949 }
950 }
951 }
952
953
954
955
956 public void registerForWrite(Connection c) {
957 if (writingCons.add(c)) {
958 writeSelector.wakeup();
959 }
960 }
961
962 private void doRunLoop() {
963 long lastPurgeTime = 0;
964 while (running) {
965 try {
966 registerWrites();
967 int keyCt = writeSelector.select(purgeTimeout);
968 if (keyCt == 0) {
969 continue;
970 }
971
972 Set<SelectionKey> keys = writeSelector.selectedKeys();
973 Iterator<SelectionKey> iter = keys.iterator();
974 while (iter.hasNext()) {
975 SelectionKey key = iter.next();
976 iter.remove();
977 try {
978 if (key.isValid() && key.isWritable()) {
979 doAsyncWrite(key);
980 }
981 } catch (IOException e) {
982 LOG.debug(getName() + ": asyncWrite", e);
983 }
984 }
985
986 lastPurgeTime = purge(lastPurgeTime);
987
988 } catch (OutOfMemoryError e) {
989 if (errorHandler != null) {
990 if (errorHandler.checkOOME(e)) {
991 LOG.info(getName() + ": exiting on OutOfMemoryError");
992 return;
993 }
994 } else {
995
996
997
998
999
1000 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1001 try {
1002 Thread.sleep(60000);
1003 } catch (InterruptedException ex) {
1004 LOG.debug("Interrupted while sleeping");
1005 return;
1006 }
1007 }
1008 } catch (Exception e) {
1009 LOG.warn(getName() + ": exception in Responder " +
1010 StringUtils.stringifyException(e), e);
1011 }
1012 }
1013 LOG.info(getName() + ": stopped");
1014 }
1015
1016
1017
1018
1019
1020
1021 private long purge(long lastPurgeTime) {
1022 long now = System.currentTimeMillis();
1023 if (now < lastPurgeTime + purgeTimeout) {
1024 return lastPurgeTime;
1025 }
1026
1027 ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1028
1029 synchronized (writeSelector.keys()) {
1030 for (SelectionKey key : writeSelector.keys()) {
1031 Connection connection = (Connection) key.attachment();
1032 if (connection == null) {
1033 throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1034 }
1035 Call call = connection.responseQueue.peekFirst();
1036 if (call != null && now > call.timestamp + purgeTimeout) {
1037 conWithOldCalls.add(call.connection);
1038 }
1039 }
1040 }
1041
1042
1043 for (Connection connection : conWithOldCalls) {
1044 closeConnection(connection);
1045 }
1046
1047 return now;
1048 }
1049
1050 private void doAsyncWrite(SelectionKey key) throws IOException {
1051 Connection connection = (Connection) key.attachment();
1052 if (connection == null) {
1053 throw new IOException("doAsyncWrite: no connection");
1054 }
1055 if (key.channel() != connection.channel) {
1056 throw new IOException("doAsyncWrite: bad channel");
1057 }
1058
1059 if (processAllResponses(connection)) {
1060 try {
1061
1062
1063 key.interestOps(0);
1064 } catch (CancelledKeyException e) {
1065
1066
1067
1068
1069
1070 LOG.warn("Exception while changing ops : " + e);
1071 }
1072 }
1073 }
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083 private boolean processResponse(final Call call) throws IOException {
1084 boolean error = true;
1085 try {
1086
1087 long numBytes = channelWrite(call.connection.channel, call.response);
1088 if (numBytes < 0) {
1089 throw new HBaseIOException("Error writing on the socket " +
1090 "for the call:" + call.toShortString());
1091 }
1092 error = false;
1093 } finally {
1094 if (error) {
1095 LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1096 closeConnection(call.connection);
1097 }
1098 }
1099
1100 if (!call.response.hasRemaining()) {
1101 call.done();
1102 return true;
1103 } else {
1104 return false;
1105 }
1106 }
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116 private boolean processAllResponses(final Connection connection) throws IOException {
1117
1118 connection.responseWriteLock.lock();
1119 try {
1120 for (int i = 0; i < 20; i++) {
1121
1122 Call call = connection.responseQueue.pollFirst();
1123 if (call == null) {
1124 return true;
1125 }
1126 if (!processResponse(call)) {
1127 connection.responseQueue.addFirst(call);
1128 return false;
1129 }
1130 }
1131 } finally {
1132 connection.responseWriteLock.unlock();
1133 }
1134
1135 return connection.responseQueue.isEmpty();
1136 }
1137
1138
1139
1140
1141 void doRespond(Call call) throws IOException {
1142 boolean added = false;
1143
1144
1145
1146 if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1147 try {
1148 if (call.connection.responseQueue.isEmpty()) {
1149
1150
1151 if (processResponse(call)) {
1152 return;
1153 }
1154
1155 call.connection.responseQueue.addFirst(call);
1156 added = true;
1157 }
1158 } finally {
1159 call.connection.responseWriteLock.unlock();
1160 }
1161 }
1162
1163 if (!added) {
1164 call.connection.responseQueue.addLast(call);
1165 }
1166 call.responder.registerForWrite(call.connection);
1167
1168
1169 call.timestamp = System.currentTimeMillis();
1170 }
1171 }
1172
1173
1174 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1175 value="VO_VOLATILE_INCREMENT",
1176 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1177 public class Connection {
1178
// Set once readPreamble() has accepted the connection preamble.
private boolean connectionPreambleRead = false;

// Set by processOneRpc() after the first message (the connection header) is parsed.
private boolean connectionHeaderRead = false;
protected SocketChannel channel;
// Payload buffer for the message currently being read; null between messages.
private ByteBuffer data;
// 4-byte buffer: first holds the preamble header, later each message's length prefix.
private ByteBuffer dataLengthBuffer;
// Responses waiting to be written to this connection.
protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
// Held while responses for this connection are being written.
private final Lock responseWriteLock = new ReentrantLock();
// Incremented when a new message payload starts being read; zero means idle (see isIdle()).
private Counter rpcCount = new Counter();
// Compared against maxIdleTime in timedOut().
private long lastContact;
private InetAddress addr;
protected Socket socket;


// Remote host address string, or "*Unknown*" when the socket has no InetAddress.
protected String hostAddress;
protected int remotePort;
// Parsed connection header; null until processConnectionHeader() runs.
ConnectionHeader connectionHeader;



// Cell block codec instantiated from the class name in the connection header, if any.
private Codec codec;



// Cell block compression codec instantiated from the connection header, if any.
private CompressionCodec compressionCodec;
// Service selected by the service name in the connection header.
BlockingService service;

// Authentication method decoded from the preamble's auth byte.
private AuthMethod authMethod;
private boolean saslContextEstablished;
// When set, process() discards the next message instead of handling it.
private boolean skipInitialSaslHandshake;
// Buffers used by processUnwrappedData() to reassemble messages from unwrapped SASL data.
private ByteBuffer unwrappedData;

private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
boolean useSasl;
SaslServer saslServer;
// True when the negotiated SASL QOP is something other than "auth".
private boolean useWrap = false;

// Fake call used to send an authorization/authentication failure to the client.
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
    null, null, this, null, 0, null, null);
private ByteArrayOutputStream authFailedResponse =
    new ByteArrayOutputStream();

// Fake call used to send raw SASL replies.
private static final int SASL_CALLID = -33;
private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
    0, null, null);


// Set when a SIMPLE-auth client was admitted although security is enabled.
private boolean authenticatedWithFallback;

// Set from the client's version info (minimum version 1.2) in processConnectionHeader().
private boolean retryImmediatelySupported = false;

public UserGroupInformation attemptingUser = null;
protected User user = null;
protected UserGroupInformation ugi = null;
1234
/**
 * Creates the server-side state for one accepted client channel.
 *
 * @param channel the accepted socket channel
 * @param lastContact timestamp to record as the connection's last contact
 */
public Connection(SocketChannel channel, long lastContact) {
  this.channel = channel;
  this.lastContact = lastContact;
  this.data = null;
  this.dataLengthBuffer = ByteBuffer.allocate(4);
  this.socket = channel.socket();
  this.addr = socket.getInetAddress();
  if (addr == null) {
    this.hostAddress = "*Unknown*";
  } else {
    this.hostAddress = addr.getHostAddress();
  }
  this.remotePort = socket.getPort();
  // A configured size of 0 means "leave the socket default untouched".
  if (socketSendBufferSize != 0) {
    try {
      socket.setSendBufferSize(socketSendBufferSize);
    } catch (IOException e) {
      // Best effort only, but keep the cause so the failure can be diagnosed.
      LOG.warn("Connection: unable to set socket send buffer size to " +
          socketSendBufferSize, e);
    }
  }
}
1257
1258 @Override
1259 public String toString() {
1260 return getHostAddress() + ":" + remotePort;
1261 }
1262
1263 public String getHostAddress() {
1264 return hostAddress;
1265 }
1266
1267 public InetAddress getHostInetAddress() {
1268 return addr;
1269 }
1270
1271 public int getRemotePort() {
1272 return remotePort;
1273 }
1274
1275 public void setLastContact(long lastContact) {
1276 this.lastContact = lastContact;
1277 }
1278
1279 public VersionInfo getVersionInfo() {
1280 if (connectionHeader.hasVersionInfo()) {
1281 return connectionHeader.getVersionInfo();
1282 }
1283 return null;
1284 }
1285
1286
1287 private boolean isIdle() {
1288 return rpcCount.get() == 0;
1289 }
1290
1291
1292 protected void decRpcCount() {
1293 rpcCount.decrement();
1294 }
1295
1296
1297 protected void incRpcCount() {
1298 rpcCount.increment();
1299 }
1300
1301 protected boolean timedOut(long currentTime) {
1302 return isIdle() && currentTime - lastContact > maxIdleTime;
1303 }
1304
1305 private UserGroupInformation getAuthorizedUgi(String authorizedId)
1306 throws IOException {
1307 UserGroupInformation authorizedUgi;
1308 if (authMethod == AuthMethod.DIGEST) {
1309 TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1310 secretManager);
1311 authorizedUgi = tokenId.getUser();
1312 if (authorizedUgi == null) {
1313 throw new AccessDeniedException(
1314 "Can't retrieve username from tokenIdentifier.");
1315 }
1316 authorizedUgi.addTokenIdentifier(tokenId);
1317 } else {
1318 authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1319 }
1320 authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1321 return authorizedUgi;
1322 }
1323
/**
 * Handles one message received on a SASL connection.
 * <p>
 * Once the SASL context is established the token is either processed directly
 * as an RPC (no wrapping negotiated) or unwrapped first. Before that, the token
 * is fed to the SASL server (created lazily on the first token) and any reply
 * token is sent back to the client.
 *
 * @param saslToken the bytes of the message just read
 * @throws IOException on SASL failure (after an error reply has been sent to
 *         the client) or on RPC processing failure
 * @throws InterruptedException propagated from RPC processing
 */
private void saslReadAndProcess(byte[] saslToken) throws IOException,
    InterruptedException {
  if (saslContextEstablished) {
    if (LOG.isTraceEnabled())
      LOG.trace("Have read input token of size " + saslToken.length
          + " for processing by saslServer.unwrap()");

    if (!useWrap) {
      // QOP is plain "auth": payload is not wrapped.
      processOneRpc(saslToken);
    } else {
      byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
      processUnwrappedData(plaintextData);
    }
  } else {
    byte[] replyToken;
    try {
      if (saslServer == null) {
        // First token on this connection: create the SASL server for the auth method.
        switch (authMethod) {
        case DIGEST:
          if (secretManager == null) {
            throw new AccessDeniedException(
                "Server is not configured to do DIGEST authentication.");
          }
          saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
              .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
              SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
                  secretManager, this));
          break;
        default:
          // Any non-DIGEST method is handled as Kerberos using the current user's principal.
          UserGroupInformation current = UserGroupInformation.getCurrentUser();
          String fullName = current.getUserName();
          if (LOG.isDebugEnabled()) {
            LOG.debug("Kerberos principal name is " + fullName);
          }
          final String names[] = SaslUtil.splitKerberosName(fullName);
          if (names.length != 3) {
            throw new AccessDeniedException(
                "Kerberos principal name does NOT have the expected "
                    + "hostname part: " + fullName);
          }
          // Create the server under the current user's identity.
          current.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws SaslException {
              saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                  .getMechanismName(), names[0], names[1],
                  SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
              return null;
            }
          });
        }
        if (saslServer == null)
          throw new AccessDeniedException(
              "Unable to find SASL server implementation for "
                  + authMethod.getMechanismName());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Have read input token of size " + saslToken.length
            + " for processing by saslServer.evaluateResponse()");
      }
      replyToken = saslServer.evaluateResponse(saslToken);
    } catch (IOException e) {
      // Prefer reporting an InvalidToken found anywhere in the cause chain to the client.
      IOException sendToClient = e;
      Throwable cause = e;
      while (cause != null) {
        if (cause instanceof InvalidToken) {
          sendToClient = (InvalidToken) cause;
          break;
        }
        cause = cause.getCause();
      }
      doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
          sendToClient.getLocalizedMessage());
      metrics.authenticationFailure();
      String clientIP = this.toString();
      // Audit the failure, then rethrow the original exception to the caller.
      AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
      throw e;
    }
    if (replyToken != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Will send token of size " + replyToken.length
            + " from saslServer.");
      }
      doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
          null);
    }
    if (saslServer.isComplete()) {
      String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
      // Wrapping is needed for any negotiated QOP other than "auth".
      useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
      ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
      if (LOG.isDebugEnabled()) {
        LOG.debug("SASL server context established. Authenticated client: "
            + ugi + ". Negotiated QoP is "
            + saslServer.getNegotiatedProperty(Sasl.QOP));
      }
      metrics.authenticationSuccess();
      AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
      saslContextEstablished = true;
    }
  }
}
1428
1429
1430
1431
1432 private void doRawSaslReply(SaslStatus status, Writable rv,
1433 String errorClass, String error) throws IOException {
1434 ByteBufferOutputStream saslResponse = null;
1435 DataOutputStream out = null;
1436 try {
1437
1438
1439 saslResponse = new ByteBufferOutputStream(256);
1440 out = new DataOutputStream(saslResponse);
1441 out.writeInt(status.state);
1442 if (status == SaslStatus.SUCCESS) {
1443 rv.write(out);
1444 } else {
1445 WritableUtils.writeString(out, errorClass);
1446 WritableUtils.writeString(out, error);
1447 }
1448 saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1449 saslCall.responder = responder;
1450 saslCall.sendResponseIfReady();
1451 } finally {
1452 if (saslResponse != null) {
1453 saslResponse.close();
1454 }
1455 if (out != null) {
1456 out.close();
1457 }
1458 }
1459 }
1460
1461 private void disposeSasl() {
1462 if (saslServer != null) {
1463 try {
1464 saslServer.dispose();
1465 saslServer = null;
1466 } catch (SaslException ignored) {
1467
1468 }
1469 }
1470 }
1471
/**
 * Validates the connection preamble: the 4 header bytes already sitting in
 * dataLengthBuffer, followed by one version byte and one auth-method byte read
 * from the channel here. On success sets connectionPreambleRead and decides
 * whether SASL is used for the rest of the connection.
 *
 * @return the count from reading the version/auth bytes, or -1 after a bad
 *         preamble response has been sent
 * @throws IOException if reading fails or authentication is required but the
 *         client asked for SIMPLE auth without fallback being allowed
 */
private int readPreamble() throws IOException {
  int count;
  // Compare the 4 bytes read by the caller against the expected RPC header.
  this.dataLengthBuffer.flip();
  if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
    return doBadPreambleHandling("Expected HEADER=" +
        Bytes.toStringBinary(HConstants.RPC_HEADER) +
        " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
        " from " + toString());
  }
  // NOTE(review): this buffer is local, so if fewer than 2 bytes are available
  // the bytes read so far are not kept for the next call, and dataLengthBuffer
  // has already been flipped; it looks like a later call would not resume
  // cleanly. Confirm against channelRead()/caller behavior.
  ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
  count = channelRead(channel, versionAndAuthBytes);
  if (count < 0 || versionAndAuthBytes.remaining() > 0) {
    return count;
  }
  int version = versionAndAuthBytes.get(0);
  byte authbyte = versionAndAuthBytes.get(1);
  this.authMethod = AuthMethod.valueOf(authbyte);
  if (version != CURRENT_VERSION) {
    String msg = getFatalConnectionString(version, authbyte);
    return doBadPreambleHandling(msg, new WrongVersionException(msg));
  }
  // A null auth method means the auth byte was not recognized.
  if (authMethod == null) {
    String msg = getFatalConnectionString(version, authbyte);
    return doBadPreambleHandling(msg, new BadAuthException(msg));
  }
  if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
    if (allowFallbackToSimpleAuth) {
      metrics.authenticationFallback();
      authenticatedWithFallback = true;
    } else {
      // Secure server, insecure client, no fallback: reply with the failure and abort.
      AccessDeniedException ae = new AccessDeniedException("Authentication is required");
      setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
      responder.doRespond(authFailedCall);
      throw ae;
    }
  }
  if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
    // Insecure server, secure client: tell the client to switch to SIMPLE auth.
    doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
        SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
    authMethod = AuthMethod.SIMPLE;
    // process() will discard the next message the client sends.
    skipInitialSaslHandshake = true;
  }
  if (authMethod != AuthMethod.SIMPLE) {
    useSasl = true;
  }

  // Make dataLengthBuffer ready to receive the first message length.
  dataLengthBuffer.clear();
  connectionPreambleRead = true;
  return count;
}
1527
1528 private int read4Bytes() throws IOException {
1529 if (this.dataLengthBuffer.remaining() > 0) {
1530 return channelRead(channel, this.dataLengthBuffer);
1531 } else {
1532 return 0;
1533 }
1534 }
1535
1536
1537
1538
1539
1540
1541
1542
1543
/**
 * Reads from the channel and advances this connection's read state: preamble
 * first, then repeated length-prefixed messages. When a message payload has
 * been fully read it is handed to process(). The method may return before a
 * unit is complete and resumes on the next call using the buffer state.
 *
 * @return the count returned by the last channel read performed (0 for a ping);
 *         presumably a negative value signals end of stream, which depends on
 *         channelRead() and should be confirmed there
 * @throws IOException if reading or processing fails
 * @throws InterruptedException propagated from message processing
 */
public int readAndProcess() throws IOException, InterruptedException {
  // Fill the 4-byte buffer (preamble header or message length); returns 0
  // immediately if it is already full from an earlier call.
  int count = read4Bytes();
  if (count < 0 || dataLengthBuffer.remaining() > 0) {
    return count;
  }

  // First complete 4 bytes on a new connection are the preamble header.
  if (!connectionPreambleRead) {
    count = readPreamble();
    if (!connectionPreambleRead) {
      return count;
    }
    // Preamble accepted: now read the length of the first message.
    count = read4Bytes();
    if (count < 0 || dataLengthBuffer.remaining() > 0) {
      return count;
    }
  }

  // data == null means a new message starts here; otherwise continue filling
  // the payload buffer from a previous call.
  if (data == null) {
    dataLengthBuffer.flip();
    int dataLength = dataLengthBuffer.getInt();
    if (dataLength == RpcClient.PING_CALL_ID) {
      if (!useWrap) {
        // Ping on a non-wrapped connection: nothing to process.
        dataLengthBuffer.clear();
        return 0;
      }
    }
    if (dataLength < 0) {
      throw new IllegalArgumentException("Unexpected data length "
        + dataLength + "!! from " + getHostAddress());
    }
    // NOTE(review): no upper bound on dataLength is checked in this method
    // before allocating; confirm whether a limit is enforced elsewhere.
    data = ByteBuffer.allocate(dataLength);

    // Count the rpc as in flight from the moment its payload starts being read.
    incRpcCount();
  }

  count = channelRead(channel, data);

  if (count >= 0 && data.remaining() == 0) {
    process();
  }

  return count;
}
1598
1599
1600
1601
1602 private void process() throws IOException, InterruptedException {
1603 data.flip();
1604 try {
1605 if (skipInitialSaslHandshake) {
1606 skipInitialSaslHandshake = false;
1607 return;
1608 }
1609
1610 if (useSasl) {
1611 saslReadAndProcess(data.array());
1612 } else {
1613 processOneRpc(data.array());
1614 }
1615
1616 } finally {
1617 dataLengthBuffer.clear();
1618 data = null;
1619 }
1620 }
1621
1622 private String getFatalConnectionString(final int version, final byte authByte) {
1623 return "serverVersion=" + CURRENT_VERSION +
1624 ", clientVersion=" + version + ", authMethod=" + authByte +
1625 ", authSupported=" + (authMethod != null) + " from " + toString();
1626 }
1627
1628 private int doBadPreambleHandling(final String msg) throws IOException {
1629 return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1630 }
1631
1632 private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1633 LOG.warn(msg);
1634 Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1635 setupResponse(null, fakeCall, e, msg);
1636 responder.doRespond(fakeCall);
1637
1638 return -1;
1639 }
1640
1641
1642 private void processConnectionHeader(byte[] buf) throws IOException {
1643 this.connectionHeader = ConnectionHeader.parseFrom(buf);
1644 String serviceName = connectionHeader.getServiceName();
1645 if (serviceName == null) throw new EmptyServiceNameException();
1646 this.service = getService(services, serviceName);
1647 if (this.service == null) throw new UnknownServiceException(serviceName);
1648 setupCellBlockCodecs(this.connectionHeader);
1649 UserGroupInformation protocolUser = createUser(connectionHeader);
1650 if (!useSasl) {
1651 ugi = protocolUser;
1652 if (ugi != null) {
1653 ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1654 }
1655
1656 if (authenticatedWithFallback) {
1657 LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1658 + " connecting from " + getHostAddress());
1659 }
1660 AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1661 } else {
1662
1663 ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1664
1665
1666
1667 if ((protocolUser != null)
1668 && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1669 if (authMethod == AuthMethod.DIGEST) {
1670
1671 throw new AccessDeniedException("Authenticated user (" + ugi
1672 + ") doesn't match what the client claims to be ("
1673 + protocolUser + ")");
1674 } else {
1675
1676
1677
1678 UserGroupInformation realUser = ugi;
1679 ugi = UserGroupInformation.createProxyUser(protocolUser
1680 .getUserName(), realUser);
1681
1682 ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1683 }
1684 }
1685 }
1686 if (connectionHeader.hasVersionInfo()) {
1687
1688 retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1689
1690 AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1691 + " with version info: "
1692 + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1693 } else {
1694 AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1695 + " with unknown version info");
1696 }
1697
1698
1699 }
1700
1701
1702
1703
1704
1705 private void setupCellBlockCodecs(final ConnectionHeader header)
1706 throws FatalConnectionException {
1707
1708 if (!header.hasCellBlockCodecClass()) return;
1709 String className = header.getCellBlockCodecClass();
1710 if (className == null || className.length() == 0) return;
1711 try {
1712 this.codec = (Codec)Class.forName(className).newInstance();
1713 } catch (Exception e) {
1714 throw new UnsupportedCellCodecException(className, e);
1715 }
1716 if (!header.hasCellBlockCompressorClass()) return;
1717 className = header.getCellBlockCompressorClass();
1718 try {
1719 this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1720 } catch (Exception e) {
1721 throw new UnsupportedCompressionCodecException(className, e);
1722 }
1723 }
1724
1725 private void processUnwrappedData(byte[] inBuf) throws IOException,
1726 InterruptedException {
1727 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1728
1729 while (true) {
1730 int count;
1731 if (unwrappedDataLengthBuffer.remaining() > 0) {
1732 count = channelRead(ch, unwrappedDataLengthBuffer);
1733 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1734 return;
1735 }
1736
1737 if (unwrappedData == null) {
1738 unwrappedDataLengthBuffer.flip();
1739 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1740
1741 if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1742 if (LOG.isDebugEnabled())
1743 LOG.debug("Received ping message");
1744 unwrappedDataLengthBuffer.clear();
1745 continue;
1746 }
1747 unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1748 }
1749
1750 count = channelRead(ch, unwrappedData);
1751 if (count <= 0 || unwrappedData.remaining() > 0)
1752 return;
1753
1754 if (unwrappedData.remaining() == 0) {
1755 unwrappedDataLengthBuffer.clear();
1756 unwrappedData.flip();
1757 processOneRpc(unwrappedData.array());
1758 unwrappedData = null;
1759 }
1760 }
1761 }
1762
1763 private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1764 if (connectionHeaderRead) {
1765 processRequest(buf);
1766 } else {
1767 processConnectionHeader(buf);
1768 this.connectionHeaderRead = true;
1769 if (!authorizeConnection()) {
1770
1771
1772 throw new AccessDeniedException("Connection from " + this + " for service " +
1773 connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1774 }
1775 this.user = userProvider.create(this.ugi);
1776 }
1777 }
1778
1779
1780
1781
1782
1783
1784
1785 protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1786 long totalRequestSize = buf.length;
1787 int offset = 0;
1788
1789
1790 CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1791 int headerSize = cis.readRawVarint32();
1792 offset = cis.getTotalBytesRead();
1793 Message.Builder builder = RequestHeader.newBuilder();
1794 ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
1795 RequestHeader header = (RequestHeader) builder.build();
1796 offset += headerSize;
1797 int id = header.getCallId();
1798 if (LOG.isTraceEnabled()) {
1799 LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1800 " totalRequestSize: " + totalRequestSize + " bytes");
1801 }
1802
1803
1804 if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1805 final Call callTooBig =
1806 new Call(id, this.service, null, null, null, null, this,
1807 responder, totalRequestSize, null, null);
1808 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1809 metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1810 InetSocketAddress address = getListenerAddress();
1811 setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1812 "Call queue is full on " + (address != null ? address : "(channel closed)") +
1813 ", is hbase.ipc.server.max.callqueue.size too small?");
1814 responder.doRespond(callTooBig);
1815 return;
1816 }
1817 MethodDescriptor md = null;
1818 Message param = null;
1819 CellScanner cellScanner = null;
1820 try {
1821 if (header.hasRequestParam() && header.getRequestParam()) {
1822 md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1823 if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1824 builder = this.service.getRequestPrototype(md).newBuilderForType();
1825
1826 cis = CodedInputStream.newInstance(buf, offset, buf.length);
1827 int paramSize = cis.readRawVarint32();
1828 offset += cis.getTotalBytesRead();
1829 if (builder != null) {
1830 ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
1831 param = builder.build();
1832 }
1833 offset += paramSize;
1834 }
1835 if (header.hasCellBlockMeta()) {
1836 cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1837 buf, offset, buf.length);
1838 }
1839 } catch (Throwable t) {
1840 InetSocketAddress address = getListenerAddress();
1841 String msg = (address != null ? address : "(channel closed)") +
1842 " is unable to read call parameter from client " + getHostAddress();
1843 LOG.warn(msg, t);
1844
1845 metrics.exception(t);
1846
1847
1848 if (t instanceof LinkageError) {
1849 t = new DoNotRetryIOException(t);
1850 }
1851
1852 if (t instanceof UnsupportedOperationException) {
1853 t = new DoNotRetryIOException(t);
1854 }
1855
1856 final Call readParamsFailedCall =
1857 new Call(id, this.service, null, null, null, null, this,
1858 responder, totalRequestSize, null, null);
1859 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1860 setupResponse(responseBuffer, readParamsFailedCall, t,
1861 msg + "; " + t.getMessage());
1862 responder.doRespond(readParamsFailedCall);
1863 return;
1864 }
1865
1866 TraceInfo traceInfo = header.hasTraceInfo()
1867 ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1868 : null;
1869 Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1870 totalRequestSize, traceInfo, this.addr);
1871
1872 if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
1873 callQueueSize.add(-1 * call.getSize());
1874
1875 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1876 metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1877 InetSocketAddress address = getListenerAddress();
1878 setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1879 "Call queue is full on " + (address != null ? address : "(channel closed)") +
1880 ", too many items queued ?");
1881 responder.doRespond(call);
1882 }
1883 }
1884
1885 private boolean authorizeConnection() throws IOException {
1886 try {
1887
1888
1889
1890
1891 if (ugi != null && ugi.getRealUser() != null
1892 && (authMethod != AuthMethod.DIGEST)) {
1893 ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1894 }
1895 authorize(ugi, connectionHeader, getHostInetAddress());
1896 metrics.authorizationSuccess();
1897 } catch (AuthorizationException ae) {
1898 if (LOG.isDebugEnabled()) {
1899 LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1900 }
1901 metrics.authorizationFailure();
1902 setupResponse(authFailedResponse, authFailedCall,
1903 new AccessDeniedException(ae), ae.getMessage());
1904 responder.doRespond(authFailedCall);
1905 return false;
1906 }
1907 return true;
1908 }
1909
1910 protected synchronized void close() {
1911 disposeSasl();
1912 data = null;
1913 if (!channel.isOpen())
1914 return;
1915 try {socket.shutdownOutput();} catch(Exception ignored) {
1916 if (LOG.isTraceEnabled()) {
1917 LOG.trace(ignored);
1918 }
1919 }
1920 if (channel.isOpen()) {
1921 try {channel.close();} catch(Exception ignored) {
1922 if (LOG.isTraceEnabled()) {
1923 LOG.trace(ignored);
1924 }
1925 }
1926 }
1927 try {socket.close();} catch(Exception ignored) {
1928 if (LOG.isTraceEnabled()) {
1929 LOG.trace(ignored);
1930 }
1931 }
1932 }
1933
1934 private UserGroupInformation createUser(ConnectionHeader head) {
1935 UserGroupInformation ugi = null;
1936
1937 if (!head.hasUserInfo()) {
1938 return null;
1939 }
1940 UserInformation userInfoProto = head.getUserInfo();
1941 String effectiveUser = null;
1942 if (userInfoProto.hasEffectiveUser()) {
1943 effectiveUser = userInfoProto.getEffectiveUser();
1944 }
1945 String realUser = null;
1946 if (userInfoProto.hasRealUser()) {
1947 realUser = userInfoProto.getRealUser();
1948 }
1949 if (effectiveUser != null) {
1950 if (realUser != null) {
1951 UserGroupInformation realUserUgi =
1952 UserGroupInformation.createRemoteUser(realUser);
1953 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1954 } else {
1955 ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1956 }
1957 }
1958 return ugi;
1959 }
1960 }
1961
1962
1963
1964
1965
1966
1967
1968 public static class BlockingServiceAndInterface {
1969 private final BlockingService service;
1970 private final Class<?> serviceInterface;
1971 public BlockingServiceAndInterface(final BlockingService service,
1972 final Class<?> serviceInterface) {
1973 this.service = service;
1974 this.serviceInterface = serviceInterface;
1975 }
1976 public Class<?> getServiceInterface() {
1977 return this.serviceInterface;
1978 }
1979 public BlockingService getBlockingService() {
1980 return this.service;
1981 }
1982 }
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
/**
 * Constructs the RPC server: reads its settings from the configuration,
 * creates the listener and responder, and initializes the scheduler. Nothing
 * is started here; see start().
 *
 * @param server the hosting server
 * @param name name passed to the listener and the metrics source
 * @param services the services this server exposes
 * @param bindAddress the address to bind to
 * @param conf the configuration to read settings from
 * @param scheduler the scheduler that calls are dispatched to
 * @throws IOException declared for failures during construction
 */
public RpcServer(final Server server, final String name,
    final List<BlockingServiceAndInterface> services,
    final InetSocketAddress bindAddress, Configuration conf,
    RpcScheduler scheduler)
    throws IOException {

  // Optional pool of byte buffers; disabled means reservoir stays null.
  if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
    this.reservoir = new BoundedByteBufferPool(
      conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
      conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
      // Third argument defaults to twice the configured handler count.
      conf.getInt("hbase.ipc.server.reservoir.initial.max",
        conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
  } else {
    reservoir = null;
  }

  this.server = server;
  this.services = services;
  this.bindAddress = bindAddress;
  this.conf = conf;
  // 0 leaves the socket's send buffer size untouched (see Connection constructor).
  this.socketSendBufferSize = 0;
  this.maxQueueSize =
    this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
  this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
  // Server-side idle limit is twice the configured client value.
  this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
  this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
  this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
  this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
    2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
  this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);

  // Create the listener; the actual port is taken from the listener's address.
  listener = new Listener(name);
  this.port = listener.getAddress().getPort();

  this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
  this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
  this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

  this.ipcUtil = new IPCUtil(conf);


  // Create the responder that writes responses back to clients.
  responder = new Responder();
  this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
  this.userProvider = UserProvider.instantiate(conf);
  this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
  if (isSecurityEnabled) {
    HBaseSaslRpcServer.init(conf);
  }
  // Load settings that can also be changed later via onConfigurationChange().
  initReconfigurable(conf);

  this.scheduler = scheduler;
  this.scheduler.init(new RpcSchedulerContext(this));
}
2052
2053 @Override
2054 public void onConfigurationChange(Configuration newConf) {
2055 initReconfigurable(newConf);
2056 }
2057
2058 private void initReconfigurable(Configuration confToLoad) {
2059 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2060 if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2061 LOG.warn("********* WARNING! *********");
2062 LOG.warn("This server is configured to allow connections from INSECURE clients");
2063 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2064 LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2065 LOG.warn("impersonation is possible!");
2066 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2067 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2068 LOG.warn("****************************");
2069 }
2070 }
2071
2072
2073
2074
2075
2076 protected Connection getConnection(SocketChannel channel, long time) {
2077 return new Connection(channel, time);
2078 }
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088 private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2089 throws IOException {
2090 if (response != null) response.reset();
2091 call.setResponse(null, null, t, error);
2092 }
2093
2094 protected void closeConnection(Connection connection) {
2095 synchronized (connectionList) {
2096 if (connectionList.remove(connection)) {
2097 numConnections--;
2098 }
2099 }
2100 connection.close();
2101 }
2102
2103 Configuration getConf() {
2104 return conf;
2105 }
2106
2107
2108
2109
2110 @Override
2111 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2112
2113 @Override
2114 public boolean isStarted() {
2115 return this.started;
2116 }
2117
2118
2119 @Override
2120 public synchronized void start() {
2121 if (started) return;
2122 authTokenSecretMgr = createSecretManager();
2123 if (authTokenSecretMgr != null) {
2124 setSecretManager(authTokenSecretMgr);
2125 authTokenSecretMgr.start();
2126 }
2127 this.authManager = new ServiceAuthorizationManager();
2128 HBasePolicyProvider.init(conf, authManager);
2129 responder.start();
2130 listener.start();
2131 scheduler.start();
2132 started = true;
2133 }
2134
2135 @Override
2136 public synchronized void refreshAuthManager(PolicyProvider pp) {
2137
2138
2139 this.authManager.refresh(this.conf, pp);
2140 }
2141
2142 private AuthenticationTokenSecretManager createSecretManager() {
2143 if (!isSecurityEnabled) return null;
2144 if (server == null) return null;
2145 Configuration conf = server.getConfiguration();
2146 long keyUpdateInterval =
2147 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2148 long maxAge =
2149 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2150 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2151 server.getServerName().toString(), keyUpdateInterval, maxAge);
2152 }
2153
2154 public SecretManager<? extends TokenIdentifier> getSecretManager() {
2155 return this.secretManager;
2156 }
2157
2158 @SuppressWarnings("unchecked")
2159 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2160 this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2161 }
2162
2163
2164
2165
2166
2167
2168 @Override
2169 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2170 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2171 throws IOException {
2172 try {
2173 status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2174
2175 status.setRPCPacket(param);
2176 status.resume("Servicing call");
2177
2178 long startTime = System.currentTimeMillis();
2179 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2180 Message result = service.callBlockingMethod(md, controller, param);
2181 long endTime = System.currentTimeMillis();
2182 int processingTime = (int) (endTime - startTime);
2183 int qTime = (int) (startTime - receiveTime);
2184 int totalTime = (int) (endTime - receiveTime);
2185 if (LOG.isTraceEnabled()) {
2186 LOG.trace(CurCall.get().toString() +
2187 ", response " + TextFormat.shortDebugString(result) +
2188 " queueTime: " + qTime +
2189 " processingTime: " + processingTime +
2190 " totalTime: " + totalTime);
2191 }
2192 long requestSize = param.getSerializedSize();
2193 long responseSize = result.getSerializedSize();
2194 metrics.dequeuedCall(qTime);
2195 metrics.processedCall(processingTime);
2196 metrics.totalCall(totalTime);
2197 metrics.receivedRequest(requestSize);
2198 metrics.sentResponse(responseSize);
2199
2200
2201 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2202 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2203 if (tooSlow || tooLarge) {
2204
2205
2206 logResponse(new Object[]{param},
2207 md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2208 (tooLarge ? "TooLarge" : "TooSlow"),
2209 status.getClient(), startTime, processingTime, qTime,
2210 responseSize);
2211 }
2212 return new Pair<Message, CellScanner>(result, controller.cellScanner());
2213 } catch (Throwable e) {
2214
2215
2216
2217 if (e instanceof ServiceException) e = e.getCause();
2218
2219
2220 metrics.exception(e);
2221
2222 if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2223 if (e instanceof IOException) throw (IOException)e;
2224 LOG.error("Unexpected throwable object ", e);
2225 throw new IOException(e.getMessage(), e);
2226 }
2227 }
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243 void logResponse(Object[] params, String methodName, String call, String tag,
2244 String clientAddress, long startTime, int processingTime, int qTime,
2245 long responseSize)
2246 throws IOException {
2247
2248 Map<String, Object> responseInfo = new HashMap<String, Object>();
2249 responseInfo.put("starttimems", startTime);
2250 responseInfo.put("processingtimems", processingTime);
2251 responseInfo.put("queuetimems", qTime);
2252 responseInfo.put("responsesize", responseSize);
2253 responseInfo.put("client", clientAddress);
2254 responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2255 responseInfo.put("method", methodName);
2256 if (params.length == 2 && server instanceof HRegionServer &&
2257 params[0] instanceof byte[] &&
2258 params[1] instanceof Operation) {
2259
2260
2261 TableName tableName = TableName.valueOf(
2262 HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2263 responseInfo.put("table", tableName.getNameAsString());
2264
2265 responseInfo.putAll(((Operation) params[1]).toMap());
2266
2267 LOG.warn("(operation" + tag + "): " +
2268 MAPPER.writeValueAsString(responseInfo));
2269 } else if (params.length == 1 && server instanceof HRegionServer &&
2270 params[0] instanceof Operation) {
2271
2272 responseInfo.putAll(((Operation) params[0]).toMap());
2273
2274 LOG.warn("(operation" + tag + "): " +
2275 MAPPER.writeValueAsString(responseInfo));
2276 } else {
2277
2278
2279 responseInfo.put("call", call);
2280 LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2281 }
2282 }
2283
2284
2285 @Override
2286 public synchronized void stop() {
2287 LOG.info("Stopping server on " + port);
2288 running = false;
2289 if (authTokenSecretMgr != null) {
2290 authTokenSecretMgr.stop();
2291 authTokenSecretMgr = null;
2292 }
2293 listener.interrupt();
2294 listener.doStop();
2295 responder.interrupt();
2296 scheduler.stop();
2297 notifyAll();
2298 }
2299
2300
2301
2302
2303
2304
2305 @Override
2306 public synchronized void join() throws InterruptedException {
2307 while (running) {
2308 wait();
2309 }
2310 }
2311
2312
2313
2314
2315
2316
2317
2318 @Override
2319 public synchronized InetSocketAddress getListenerAddress() {
2320 if (listener == null) {
2321 return null;
2322 }
2323 return listener.getAddress();
2324 }
2325
2326
2327
2328
2329
2330 @Override
2331 public void setErrorHandler(HBaseRPCErrorHandler handler) {
2332 this.errorHandler = handler;
2333 }
2334
2335 @Override
2336 public HBaseRPCErrorHandler getErrorHandler() {
2337 return this.errorHandler;
2338 }
2339
2340
2341
2342
2343 @Override
2344 public MetricsHBaseServer getMetrics() {
2345 return metrics;
2346 }
2347
2348 @Override
2349 public void addCallSize(final long diff) {
2350 this.callQueueSize.add(diff);
2351 }
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362 public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2363 InetAddress addr)
2364 throws AuthorizationException {
2365 if (authorize) {
2366 Class<?> c = getServiceInterface(services, connection.getServiceName());
2367 this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2368 }
2369 }
2370
2371
2372
2373
2374
2375
2376 private static int NIO_BUFFER_LIMIT = 64 * 1024;
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2393 throws IOException {
2394 long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
2395 if (count > 0) this.metrics.sentBytes(count);
2396 return count;
2397 }
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411 protected int channelRead(ReadableByteChannel channel,
2412 ByteBuffer buffer) throws IOException {
2413
2414 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2415 channel.read(buffer) : channelIO(channel, null, buffer);
2416 if (count > 0) {
2417 metrics.receivedBytes(count);
2418 }
2419 return count;
2420 }
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435 private static int channelIO(ReadableByteChannel readCh,
2436 WritableByteChannel writeCh,
2437 ByteBuffer buf) throws IOException {
2438
2439 int originalLimit = buf.limit();
2440 int initialRemaining = buf.remaining();
2441 int ret = 0;
2442
2443 while (buf.remaining() > 0) {
2444 try {
2445 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2446 buf.limit(buf.position() + ioSize);
2447
2448 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2449
2450 if (ret < ioSize) {
2451 break;
2452 }
2453
2454 } finally {
2455 buf.limit(originalLimit);
2456 }
2457 }
2458
2459 int nBytes = initialRemaining - buf.remaining();
2460 return (nBytes > 0) ? nBytes : ret;
2461 }
2462
2463
2464
2465
2466
2467
2468
2469 public static RpcCallContext getCurrentCall() {
2470 return CurCall.get();
2471 }
2472
2473 public static boolean isInRpcCallContext() {
2474 return CurCall.get() != null;
2475 }
2476
2477
2478
2479
2480
2481
2482 public static User getRequestUser() {
2483 RpcCallContext ctx = getCurrentCall();
2484 return ctx == null? null: ctx.getRequestUser();
2485 }
2486
2487
2488
2489
2490
2491 public static String getRequestUserName() {
2492 User user = getRequestUser();
2493 return user == null? null: user.getShortName();
2494 }
2495
2496
2497
2498
2499 public static InetAddress getRemoteAddress() {
2500 RpcCallContext ctx = getCurrentCall();
2501 return ctx == null? null: ctx.getRemoteAddress();
2502 }
2503
2504
2505
2506
2507
2508
2509 static BlockingServiceAndInterface getServiceAndInterface(
2510 final List<BlockingServiceAndInterface> services, final String serviceName) {
2511 for (BlockingServiceAndInterface bs : services) {
2512 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2513 return bs;
2514 }
2515 }
2516 return null;
2517 }
2518
2519
2520
2521
2522
2523
2524 static Class<?> getServiceInterface(
2525 final List<BlockingServiceAndInterface> services,
2526 final String serviceName) {
2527 BlockingServiceAndInterface bsasi =
2528 getServiceAndInterface(services, serviceName);
2529 return bsasi == null? null: bsasi.getServiceInterface();
2530 }
2531
2532
2533
2534
2535
2536
2537 static BlockingService getService(
2538 final List<BlockingServiceAndInterface> services,
2539 final String serviceName) {
2540 BlockingServiceAndInterface bsasi =
2541 getServiceAndInterface(services, serviceName);
2542 return bsasi == null? null: bsasi.getBlockingService();
2543 }
2544
2545 static MonitoredRPCHandler getStatus() {
2546
2547 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2548 if (status != null) {
2549 return status;
2550 }
2551 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2552 status.pause("Waiting for a call");
2553 RpcServer.MONITORED_RPC.set(status);
2554 return status;
2555 }
2556
2557
2558
2559
2560
2561 public static InetAddress getRemoteIp() {
2562 Call call = CurCall.get();
2563 if (call != null && call.connection != null && call.connection.socket != null) {
2564 return call.connection.socket.getInetAddress();
2565 }
2566 return null;
2567 }
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580 public static void bind(ServerSocket socket, InetSocketAddress address,
2581 int backlog) throws IOException {
2582 try {
2583 socket.bind(address, backlog);
2584 } catch (BindException e) {
2585 BindException bindException =
2586 new BindException("Problem binding to " + address + " : " +
2587 e.getMessage());
2588 bindException.initCause(e);
2589 throw bindException;
2590 } catch (SocketException e) {
2591
2592
2593 if ("Unresolved address".equals(e.getMessage())) {
2594 throw new UnknownHostException("Invalid hostname for server: " +
2595 address.getHostName());
2596 }
2597 throw e;
2598 }
2599 }
2600
2601 @Override
2602 public RpcScheduler getScheduler() {
2603 return scheduler;
2604 }
2605 }