1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.ipc;
21
22 import java.io.BufferedInputStream;
23 import java.io.BufferedOutputStream;
24 import java.io.DataInputStream;
25 import java.io.DataOutputStream;
26 import java.io.FilterInputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.OutputStream;
30 import java.net.ConnectException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.SocketException;
35 import java.net.SocketTimeoutException;
36 import java.net.UnknownHostException;
37 import java.nio.ByteBuffer;
38 import java.security.PrivilegedExceptionAction;
39 import java.util.HashMap;
40 import java.util.Iterator;
41 import java.util.LinkedList;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Random;
45 import java.util.concurrent.ConcurrentSkipListMap;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicLong;
48
49 import javax.net.SocketFactory;
50 import javax.security.sasl.SaslException;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.classification.InterfaceAudience;
55 import org.apache.hadoop.classification.InterfaceStability;
56 import org.apache.hadoop.conf.Configuration;
57 import org.apache.hadoop.hbase.CellScanner;
58 import org.apache.hadoop.hbase.DoNotRetryIOException;
59 import org.apache.hadoop.hbase.HBaseIOException;
60 import org.apache.hadoop.hbase.HConstants;
61 import org.apache.hadoop.hbase.ServerName;
62 import org.apache.hadoop.hbase.codec.Codec;
63 import org.apache.hadoop.hbase.codec.KeyValueCodec;
64 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
65 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
66 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
67 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
68 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
69 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
70 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
71 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
72 import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
73 import org.apache.hadoop.hbase.security.AuthMethod;
74 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
75 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
76 import org.apache.hadoop.hbase.security.SecurityInfo;
77 import org.apache.hadoop.hbase.security.User;
78 import org.apache.hadoop.hbase.security.UserProvider;
79 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
80 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
81 import org.apache.hadoop.hbase.util.ExceptionUtil;
82 import org.apache.hadoop.hbase.util.Pair;
83 import org.apache.hadoop.hbase.util.PoolMap;
84 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
85 import org.apache.hadoop.io.IOUtils;
86 import org.apache.hadoop.io.Text;
87 import org.apache.hadoop.io.compress.CompressionCodec;
88 import org.apache.hadoop.ipc.RemoteException;
89 import org.apache.hadoop.net.NetUtils;
90 import org.apache.hadoop.security.SecurityUtil;
91 import org.apache.hadoop.security.UserGroupInformation;
92 import org.apache.hadoop.security.token.Token;
93 import org.apache.hadoop.security.token.TokenIdentifier;
94 import org.apache.hadoop.security.token.TokenSelector;
95 import org.cloudera.htrace.Span;
96 import org.cloudera.htrace.Trace;
97
98 import com.google.common.annotations.VisibleForTesting;
99 import com.google.protobuf.BlockingRpcChannel;
100 import com.google.protobuf.Descriptors.MethodDescriptor;
101 import com.google.protobuf.Message;
102 import com.google.protobuf.Message.Builder;
103 import com.google.protobuf.RpcController;
104 import com.google.protobuf.ServiceException;
105 import com.google.protobuf.TextFormat;
106
107
108
109
110
111
112 @InterfaceAudience.Private
113 public class RpcClient {
114
115
116 public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient");
117 protected final PoolMap<ConnectionId, Connection> connections;
118
119 protected int counter;
120 protected final AtomicBoolean running = new AtomicBoolean(true);
121 final protected Configuration conf;
122 final protected int maxIdleTime;
123
124 final protected int maxRetries;
125 final protected long failureSleep;
126 protected final boolean tcpNoDelay;
127 protected final boolean tcpKeepAlive;
128 protected int pingInterval;
129 protected FailedServers failedServers;
130 private final Codec codec;
131 private final CompressionCodec compressor;
132 private final IPCUtil ipcUtil;
133
134 protected final SocketFactory socketFactory;
135 protected String clusterId;
136 protected final SocketAddress localAddr;
137
138 private final boolean fallbackAllowed;
139 private UserProvider userProvider;
140
141 final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
142 final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
143 final static int DEFAULT_PING_INTERVAL = 60000;
144 final static int DEFAULT_SOCKET_TIMEOUT = 20000;
145 final static int PING_CALL_ID = -1;
146
147 public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
148 public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
149
150 public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY =
151 "hbase.ipc.client.fallback-to-simple-auth-allowed";
152 public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
153
154
155
156
157
158
159 private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
160 @Override
161 protected Integer initialValue() {
162 return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
163 }
164 };
165
166
167
168
169 static class FailedServers {
170 private final LinkedList<Pair<Long, String>> failedServers = new
171 LinkedList<Pair<Long, java.lang.String>>();
172 private final int recheckServersTimeout;
173
174 FailedServers(Configuration conf) {
175 this.recheckServersTimeout = conf.getInt(
176 FAILED_SERVER_EXPIRY_KEY, FAILED_SERVER_EXPIRY_DEFAULT);
177 }
178
179
180
181
182 public synchronized void addToFailedServers(InetSocketAddress address) {
183 final long expiry = EnvironmentEdgeManager.currentTimeMillis() + recheckServersTimeout;
184 failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
185 }
186
187
188
189
190
191
192 public synchronized boolean isFailedServer(final InetSocketAddress address) {
193 if (failedServers.isEmpty()) {
194 return false;
195 }
196
197 final String lookup = address.toString();
198 final long now = EnvironmentEdgeManager.currentTimeMillis();
199
200
201 Iterator<Pair<Long, String>> it = failedServers.iterator();
202 while (it.hasNext()) {
203 Pair<Long, String> cur = it.next();
204 if (cur.getFirst() < now) {
205 it.remove();
206 } else {
207 if (lookup.equals(cur.getSecond())) {
208 return true;
209 }
210 }
211 }
212
213 return false;
214 }
215 }
216
217 @SuppressWarnings("serial")
218 @InterfaceAudience.Public
219 @InterfaceStability.Evolving
220
221 public static class FailedServerException extends HBaseIOException {
222 public FailedServerException(String s) {
223 super(s);
224 }
225 }
226
227
228
229
230
231
232
233
234
235 public static void setPingInterval(Configuration conf, int pingInterval) {
236 conf.setInt(PING_INTERVAL_NAME, pingInterval);
237 }
238
239
240
241
242
243
244
245
246 static int getPingInterval(Configuration conf) {
247 return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
248 }
249
250
251
252
253
254
255 public static void setSocketTimeout(Configuration conf, int socketTimeout) {
256 conf.setInt(SOCKET_TIMEOUT, socketTimeout);
257 }
258
259
260
261
262 static int getSocketTimeout(Configuration conf) {
263 return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
264 }
265
266
267 protected class Call {
268 final int id;
269 final Message param;
270
271
272
273
274 CellScanner cells;
275 Message response;
276
277 Message responseDefaultType;
278 IOException error;
279 boolean done;
280 long startTime;
281 final MethodDescriptor md;
282
283 protected Call(final MethodDescriptor md, Message param, final CellScanner cells,
284 final Message responseDefaultType) {
285 this.param = param;
286 this.md = md;
287 this.cells = cells;
288 this.startTime = System.currentTimeMillis();
289 this.responseDefaultType = responseDefaultType;
290 synchronized (RpcClient.this) {
291 this.id = counter++;
292 }
293 }
294
295 @Override
296 public String toString() {
297 return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" +
298 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}";
299 }
300
301
302
303 protected synchronized void callComplete() {
304 this.done = true;
305 notify();
306 }
307
308
309
310
311
312
313 public void setException(IOException error) {
314 this.error = error;
315 callComplete();
316 }
317
318
319
320
321
322
323
324
325 public void setResponse(Message response, final CellScanner cells) {
326 this.response = response;
327 this.cells = cells;
328 callComplete();
329 }
330
331 public long getStartTime() {
332 return this.startTime;
333 }
334 }
335
336 protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
337 TokenSelector<? extends TokenIdentifier>> tokenHandlers =
338 new HashMap<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends TokenIdentifier>>();
339 static {
340 tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
341 new AuthenticationTokenSelector());
342 }
343
344
345
346
347
348 protected Connection createConnection(ConnectionId remoteId, final Codec codec,
349 final CompressionCodec compressor)
350 throws IOException {
351 return new Connection(remoteId, codec, compressor);
352 }
353
354
355
356
357 protected class Connection extends Thread {
358 private ConnectionHeader header;
359 protected ConnectionId remoteId;
360 protected Socket socket = null;
361 protected DataInputStream in;
362 protected DataOutputStream out;
363 private InetSocketAddress server;
364 private String serverPrincipal;
365 private AuthMethod authMethod;
366 private boolean useSasl;
367 private Token<? extends TokenIdentifier> token;
368 private HBaseSaslRpcClient saslRpcClient;
369 private int reloginMaxBackoff;
370 private final Codec codec;
371 private final CompressionCodec compressor;
372
373
374 protected final ConcurrentSkipListMap<Integer, Call> calls =
375 new ConcurrentSkipListMap<Integer, Call>();
376 protected final AtomicLong lastActivity =
377 new AtomicLong();
378 protected final AtomicBoolean shouldCloseConnection =
379 new AtomicBoolean();
380 protected IOException closeException;
381
382 Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
383 throws IOException {
384 if (remoteId.getAddress().isUnresolved()) {
385 throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
386 }
387 this.server = remoteId.getAddress();
388 this.codec = codec;
389 this.compressor = compressor;
390
391 UserGroupInformation ticket = remoteId.getTicket().getUGI();
392 SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
393 this.useSasl = userProvider.isHBaseSecurityEnabled();
394 if (useSasl && securityInfo != null) {
395 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
396 if (tokenKind != null) {
397 TokenSelector<? extends TokenIdentifier> tokenSelector =
398 tokenHandlers.get(tokenKind);
399 if (tokenSelector != null) {
400 token = tokenSelector.selectToken(new Text(clusterId),
401 ticket.getTokens());
402 } else if (LOG.isDebugEnabled()) {
403 LOG.debug("No token selector found for type "+tokenKind);
404 }
405 }
406 String serverKey = securityInfo.getServerPrincipal();
407 if (serverKey == null) {
408 throw new IOException(
409 "Can't obtain server Kerberos config key from SecurityInfo");
410 }
411 serverPrincipal = SecurityUtil.getServerPrincipal(
412 conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
413 if (LOG.isDebugEnabled()) {
414 LOG.debug("RPC Server Kerberos principal name for service="
415 + remoteId.getServiceName() + " is " + serverPrincipal);
416 }
417 }
418
419 if (!useSasl) {
420 authMethod = AuthMethod.SIMPLE;
421 } else if (token != null) {
422 authMethod = AuthMethod.DIGEST;
423 } else {
424 authMethod = AuthMethod.KERBEROS;
425 }
426
427 if (LOG.isDebugEnabled()) {
428 LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
429 ", sasl=" + useSasl);
430 }
431 reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
432 this.remoteId = remoteId;
433
434 ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
435 builder.setServiceName(remoteId.getServiceName());
436 UserInformation userInfoPB;
437 if ((userInfoPB = getUserInfo(ticket)) != null) {
438 builder.setUserInfo(userInfoPB);
439 }
440 if (this.codec != null) {
441 builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
442 }
443 if (this.compressor != null) {
444 builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
445 }
446 this.header = builder.build();
447
448 this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
449 remoteId.getAddress().toString() +
450 ((ticket==null)?" from an unknown user": (" from "
451 + ticket.getUserName())));
452 this.setDaemon(true);
453 }
454
455 private UserInformation getUserInfo(UserGroupInformation ugi) {
456 if (ugi == null || authMethod == AuthMethod.DIGEST) {
457
458 return null;
459 }
460 UserInformation.Builder userInfoPB = UserInformation.newBuilder();
461 if (authMethod == AuthMethod.KERBEROS) {
462
463 userInfoPB.setEffectiveUser(ugi.getUserName());
464 } else if (authMethod == AuthMethod.SIMPLE) {
465
466 userInfoPB.setEffectiveUser(ugi.getUserName());
467 if (ugi.getRealUser() != null) {
468 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
469 }
470 }
471 return userInfoPB.build();
472 }
473
474
475 protected void touch() {
476 lastActivity.set(System.currentTimeMillis());
477 }
478
479
480
481
482
483
484
485
486
487 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
488 justification="Notify because new call available for processing")
489 protected synchronized void addCall(Call call) {
490
491
492
493 if (this.shouldCloseConnection.get()) {
494 if (this.closeException == null) {
495 call.setException(new IOException(
496 "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
497 } else {
498 call.setException(this.closeException);
499 }
500 synchronized (call) {
501 call.notifyAll();
502 }
503 } else {
504 calls.put(call.id, call);
505 synchronized (call) {
506 notify();
507 }
508 }
509 }
510
511
512
513
514
515 protected class PingInputStream extends FilterInputStream {
516
517 protected PingInputStream(InputStream in) {
518 super(in);
519 }
520
521
522
523
524
525 private void handleTimeout(SocketTimeoutException e) throws IOException {
526 if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) {
527 throw e;
528 }
529 sendPing();
530 }
531
532
533
534
535
536
537 @Override
538 public int read() throws IOException {
539 do {
540 try {
541 return super.read();
542 } catch (SocketTimeoutException e) {
543 handleTimeout(e);
544 }
545 } while (true);
546 }
547
548
549
550
551
552
553
554 @Override
555 public int read(byte[] buf, int off, int len) throws IOException {
556 do {
557 try {
558 return super.read(buf, off, len);
559 } catch (SocketTimeoutException e) {
560 handleTimeout(e);
561 }
562 } while (true);
563 }
564 }
565
566 protected synchronized void setupConnection() throws IOException {
567 short ioFailures = 0;
568 short timeoutFailures = 0;
569 while (true) {
570 try {
571 this.socket = socketFactory.createSocket();
572 this.socket.setTcpNoDelay(tcpNoDelay);
573 this.socket.setKeepAlive(tcpKeepAlive);
574 if (localAddr != null) {
575 this.socket.bind(localAddr);
576 }
577
578 NetUtils.connect(this.socket, remoteId.getAddress(),
579 getSocketTimeout(conf));
580 if (remoteId.rpcTimeout > 0) {
581 pingInterval = remoteId.rpcTimeout;
582 }
583 this.socket.setSoTimeout(pingInterval);
584 return;
585 } catch (SocketTimeoutException toe) {
586
587
588
589 handleConnectionFailure(timeoutFailures++, maxRetries, toe);
590 } catch (IOException ie) {
591 handleConnectionFailure(ioFailures++, maxRetries, ie);
592 }
593 }
594 }
595
596 protected void closeConnection() {
597 if (socket == null) {
598 return;
599 }
600
601
602 try {
603 if (socket.getOutputStream() != null) {
604 socket.getOutputStream().close();
605 }
606 } catch (IOException ignored) {
607 }
608 try {
609 if (socket.getInputStream() != null) {
610 socket.getInputStream().close();
611 }
612 } catch (IOException ignored) {
613 }
614 try {
615 if (socket.getChannel() != null) {
616 socket.getChannel().close();
617 }
618 } catch (IOException ignored) {
619 }
620 try {
621 socket.close();
622 } catch (IOException e) {
623 LOG.warn("Not able to close a socket", e);
624 }
625
626
627
628 socket = null;
629 }
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646 private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
647 throws IOException {
648 closeConnection();
649
650
651 if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
652 throw ioe;
653 }
654
655
656 try {
657 Thread.sleep(failureSleep);
658 } catch (InterruptedException ie) {
659 ExceptionUtil.rethrowIfInterrupt(ie);
660 }
661
662 LOG.info("Retrying connect to server: " + remoteId.getAddress() +
663 " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
664 " time(s).");
665 }
666
667
668
669
670
671
672
673 protected synchronized boolean waitForWork() {
674 if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
675 long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get());
676 if (timeout>0) {
677 try {
678 wait(timeout);
679 } catch (InterruptedException ie) {
680 Thread.currentThread().interrupt();
681 }
682 }
683 }
684
685 if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
686 return true;
687 } else if (shouldCloseConnection.get()) {
688 return false;
689 } else if (calls.isEmpty()) {
690 markClosed(null);
691 return false;
692 } else {
693 markClosed((IOException)new IOException().initCause(
694 new InterruptedException()));
695 return false;
696 }
697 }
698
699 public InetSocketAddress getRemoteAddress() {
700 return remoteId.getAddress();
701 }
702
703
704
705
706 protected synchronized void sendPing() throws IOException {
707
708 long curTime = System.currentTimeMillis();
709 if ( curTime - lastActivity.get() >= pingInterval) {
710 lastActivity.set(curTime);
711
712 synchronized (this.out) {
713 out.writeInt(PING_CALL_ID);
714 out.flush();
715 }
716 }
717 }
718
719 @Override
720 public void run() {
721 if (LOG.isDebugEnabled()) {
722 LOG.debug(getName() + ": starting, connections " + connections.size());
723 }
724
725 try {
726 while (waitForWork()) {
727 readResponse();
728 }
729 } catch (Throwable t) {
730 LOG.warn(getName() + ": unexpected exception receiving call responses", t);
731 markClosed(new IOException("Unexpected exception receiving call responses", t));
732 }
733
734 close();
735
736 if (LOG.isDebugEnabled())
737 LOG.debug(getName() + ": stopped, connections " + connections.size());
738 }
739
740 private synchronized void disposeSasl() {
741 if (saslRpcClient != null) {
742 try {
743 saslRpcClient.dispose();
744 saslRpcClient = null;
745 } catch (IOException ioe) {
746 LOG.error("Error disposing of SASL client", ioe);
747 }
748 }
749 }
750
751 private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
752 UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
753 UserGroupInformation currentUser =
754 UserGroupInformation.getCurrentUser();
755 UserGroupInformation realUser = currentUser.getRealUser();
756 return authMethod == AuthMethod.KERBEROS &&
757 loginUser != null &&
758
759 loginUser.hasKerberosCredentials() &&
760
761
762 (loginUser.equals(currentUser) || loginUser.equals(realUser));
763 }
764
765 private synchronized boolean setupSaslConnection(final InputStream in2,
766 final OutputStream out2) throws IOException {
767 saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
768 conf.get("hbase.rpc.protection",
769 QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
770 return saslRpcClient.saslConnect(in2, out2);
771 }
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791 private synchronized void handleSaslConnectionFailure(
792 final int currRetries,
793 final int maxRetries, final Exception ex, final Random rand,
794 final UserGroupInformation user)
795 throws IOException, InterruptedException{
796 user.doAs(new PrivilegedExceptionAction<Object>() {
797 public Object run() throws IOException, InterruptedException {
798 closeConnection();
799 if (shouldAuthenticateOverKrb()) {
800 if (currRetries < maxRetries) {
801 LOG.debug("Exception encountered while connecting to " +
802 "the server : " + ex);
803
804 if (UserGroupInformation.isLoginKeytabBased()) {
805 UserGroupInformation.getLoginUser().reloginFromKeytab();
806 } else {
807 UserGroupInformation.getLoginUser().reloginFromTicketCache();
808 }
809 disposeSasl();
810
811
812
813
814 Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
815 return null;
816 } else {
817 String msg = "Couldn't setup connection for " +
818 UserGroupInformation.getLoginUser().getUserName() +
819 " to " + serverPrincipal;
820 LOG.warn(msg);
821 throw (IOException) new IOException(msg).initCause(ex);
822 }
823 } else {
824 LOG.warn("Exception encountered while connecting to " +
825 "the server : " + ex);
826 }
827 if (ex instanceof RemoteException) {
828 throw (RemoteException)ex;
829 }
830 if (ex instanceof SaslException) {
831 String msg = "SASL authentication failed." +
832 " The most likely cause is missing or invalid credentials." +
833 " Consider 'kinit'.";
834 LOG.fatal(msg, ex);
835 throw new RuntimeException(msg, ex);
836 }
837 throw new IOException(ex);
838 }
839 });
840 }
841
842 protected synchronized void setupIOstreams()
843 throws IOException, InterruptedException {
844 if (socket != null || shouldCloseConnection.get()) {
845 return;
846 }
847
848 if (failedServers.isFailedServer(remoteId.getAddress())) {
849 if (LOG.isDebugEnabled()) {
850 LOG.debug("Not trying to connect to " + server +
851 " this server is in the failed servers list");
852 }
853 IOException e = new FailedServerException(
854 "This server is in the failed servers list: " + server);
855 markClosed(e);
856 close();
857 throw e;
858 }
859
860 try {
861 if (LOG.isDebugEnabled()) {
862 LOG.debug("Connecting to " + server);
863 }
864 short numRetries = 0;
865 final short MAX_RETRIES = 5;
866 Random rand = null;
867 while (true) {
868 setupConnection();
869 InputStream inStream = NetUtils.getInputStream(socket);
870
871
872
873 OutputStream outStream = NetUtils.getOutputStream(socket, pingInterval);
874
875 writeConnectionHeaderPreamble(outStream);
876 if (useSasl) {
877 final InputStream in2 = inStream;
878 final OutputStream out2 = outStream;
879 UserGroupInformation ticket = remoteId.getTicket().getUGI();
880 if (authMethod == AuthMethod.KERBEROS) {
881 if (ticket != null && ticket.getRealUser() != null) {
882 ticket = ticket.getRealUser();
883 }
884 }
885 boolean continueSasl = false;
886 if (ticket == null) throw new FatalConnectionException("ticket/user is null");
887 try {
888 continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
889 @Override
890 public Boolean run() throws IOException {
891 return setupSaslConnection(in2, out2);
892 }
893 });
894 } catch (Exception ex) {
895 if (rand == null) {
896 rand = new Random();
897 }
898 handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
899 continue;
900 }
901 if (continueSasl) {
902
903 inStream = saslRpcClient.getInputStream(inStream);
904 outStream = saslRpcClient.getOutputStream(outStream);
905 } else {
906
907 authMethod = AuthMethod.SIMPLE;
908 useSasl = false;
909 }
910 }
911 this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
912 this.out = new DataOutputStream(new BufferedOutputStream(outStream));
913
914 writeConnectionHeader();
915
916
917 touch();
918
919
920 start();
921 return;
922 }
923 } catch (Throwable t) {
924 failedServers.addToFailedServers(remoteId.address);
925 IOException e = null;
926 if (t instanceof LinkageError) {
927
928 e = new DoNotRetryIOException(t);
929 markClosed(e);
930 } else if (t instanceof IOException) {
931 e = (IOException)t;
932 markClosed(e);
933 } else {
934 e = new IOException("Could not set up IO Streams", t);
935 markClosed(e);
936 }
937 close();
938 throw e;
939 }
940 }
941
942
943
944
945 private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
946
947
948
949
950
951 int rpcHeaderLen = HConstants.RPC_HEADER.array().length;
952 byte [] preamble = new byte [rpcHeaderLen + 2];
953 System.arraycopy(HConstants.RPC_HEADER.array(), 0, preamble, 0, rpcHeaderLen);
954 preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
955 preamble[rpcHeaderLen + 1] = authMethod.code;
956 outStream.write(preamble);
957 outStream.flush();
958 }
959
960
961
962
963
964 private void writeConnectionHeader() throws IOException {
965 synchronized (this.out) {
966 this.out.writeInt(this.header.getSerializedSize());
967 this.header.writeTo(this.out);
968 this.out.flush();
969 }
970 }
971
972
973 protected synchronized void close() {
974 if (!shouldCloseConnection.get()) {
975 LOG.error(getName() + ": the connection is not in the closed state");
976 return;
977 }
978
979
980
981 synchronized (connections) {
982 connections.removeValue(remoteId, this);
983 }
984
985
986 if (this.out != null) {
987 synchronized(this.out) {
988 IOUtils.closeStream(out);
989 this.out = null;
990 }
991 }
992 IOUtils.closeStream(in);
993 this.in = null;
994 disposeSasl();
995
996
997 if (closeException == null) {
998 if (!calls.isEmpty()) {
999 LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. " +
1000 "#Calls: " + calls.size());
1001
1002
1003 closeException = new IOException("Unexpected closed connection");
1004 cleanupCalls();
1005 }
1006 } else {
1007
1008 if (LOG.isDebugEnabled()) {
1009 LOG.debug(getName() + ": closing ipc connection to " + server + ": " +
1010 closeException.getMessage(), closeException);
1011 }
1012
1013
1014 cleanupCalls();
1015 }
1016 if (LOG.isDebugEnabled())
1017 LOG.debug(getName() + ": closed");
1018 }
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028 protected void writeRequest(Call call, final int priority) {
1029 if (shouldCloseConnection.get()) return;
1030 try {
1031 RequestHeader.Builder builder = RequestHeader.newBuilder();
1032 builder.setCallId(call.id);
1033 if (Trace.isTracing()) {
1034 Span s = Trace.currentSpan();
1035 builder.setTraceInfo(RPCTInfo.newBuilder().
1036 setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
1037 }
1038 builder.setMethodName(call.md.getName());
1039 builder.setRequestParam(call.param != null);
1040 ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
1041 if (cellBlock != null) {
1042 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
1043 cellBlockBuilder.setLength(cellBlock.limit());
1044 builder.setCellBlockMeta(cellBlockBuilder.build());
1045 }
1046
1047 if (priority != 0) builder.setPriority(priority);
1048
1049 RequestHeader header = builder.build();
1050 synchronized (this.out) {
1051 IPCUtil.write(this.out, header, call.param, cellBlock);
1052 }
1053 if (LOG.isDebugEnabled()) {
1054 LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
1055 }
1056 } catch(IOException e) {
1057 markClosed(e);
1058 }
1059 }
1060
1061
1062
1063
1064 protected void readResponse() {
1065 if (shouldCloseConnection.get()) return;
1066 touch();
1067 int totalSize = -1;
1068 try {
1069
1070
1071 totalSize = in.readInt();
1072
1073
1074 ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
1075 int id = responseHeader.getCallId();
1076 if (LOG.isDebugEnabled()) {
1077 LOG.debug(getName() + ": got response header " +
1078 TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
1079 }
1080 Call call = calls.get(id);
1081 if (call == null) {
1082
1083
1084
1085
1086
1087 int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
1088 int whatIsLeftToRead = totalSize - readSoFar;
1089 LOG.debug("Unknown callId: " + id + ", skipping over this response of " +
1090 whatIsLeftToRead + " bytes");
1091 IOUtils.skipFully(in, whatIsLeftToRead);
1092 }
1093 if (responseHeader.hasException()) {
1094 ExceptionResponse exceptionResponse = responseHeader.getException();
1095 RemoteException re = createRemoteException(exceptionResponse);
1096 if (isFatalConnectionException(exceptionResponse)) {
1097 markClosed(re);
1098 } else {
1099 if (call != null) call.setException(re);
1100 }
1101 } else {
1102 Message value = null;
1103
1104 if (call != null && call.responseDefaultType != null) {
1105 Builder builder = call.responseDefaultType.newBuilderForType();
1106 builder.mergeDelimitedFrom(in);
1107 value = builder.build();
1108 }
1109 CellScanner cellBlockScanner = null;
1110 if (responseHeader.hasCellBlockMeta()) {
1111 int size = responseHeader.getCellBlockMeta().getLength();
1112 byte [] cellBlock = new byte[size];
1113 IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
1114 cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
1115 }
1116
1117
1118 if (call != null) call.setResponse(value, cellBlockScanner);
1119 }
1120 if (call != null) calls.remove(id);
1121 } catch (IOException e) {
1122 if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
1123
1124
1125
1126 closeException = e;
1127 } else {
1128
1129 markClosed(e);
1130 }
1131 } finally {
1132 if (remoteId.rpcTimeout > 0) {
1133 cleanupCalls(remoteId.rpcTimeout);
1134 }
1135 }
1136 }
1137
1138
1139
1140
1141
1142 private boolean isFatalConnectionException(final ExceptionResponse e) {
1143 return e.getExceptionClassName().
1144 equals(FatalConnectionException.class.getName());
1145 }
1146
1147
1148
1149
1150
1151 private RemoteException createRemoteException(final ExceptionResponse e) {
1152 String innerExceptionClassName = e.getExceptionClassName();
1153 boolean doNotRetry = e.getDoNotRetry();
1154 return e.hasHostname()?
1155
1156 new RemoteWithExtrasException(innerExceptionClassName,
1157 e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1158 new RemoteWithExtrasException(innerExceptionClassName,
1159 e.getStackTrace(), doNotRetry);
1160 }
1161
1162 protected synchronized void markClosed(IOException e) {
1163 if (shouldCloseConnection.compareAndSet(false, true)) {
1164 closeException = e;
1165 notifyAll();
1166 }
1167 }
1168
1169
1170 protected void cleanupCalls() {
1171 cleanupCalls(0);
1172 }
1173
1174 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
1175 justification="Notify because timedout")
1176 protected void cleanupCalls(long rpcTimeout) {
1177 Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
1178 while (itor.hasNext()) {
1179 Call c = itor.next().getValue();
1180 long waitTime = System.currentTimeMillis() - c.getStartTime();
1181 if (waitTime >= rpcTimeout) {
1182 if (this.closeException == null) {
1183
1184
1185
1186
1187
1188
1189 this.closeException = new CallTimeoutException("Call id=" + c.id +
1190 ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
1191 }
1192 c.setException(this.closeException);
1193 synchronized (c) {
1194 c.notifyAll();
1195 }
1196 itor.remove();
1197 } else {
1198 break;
1199 }
1200 }
1201 try {
1202 if (!calls.isEmpty()) {
1203 Call firstCall = calls.get(calls.firstKey());
1204 long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
1205 if (maxWaitTime < rpcTimeout) {
1206 rpcTimeout -= maxWaitTime;
1207 }
1208 }
1209 if (!shouldCloseConnection.get()) {
1210 closeException = null;
1211 setSocketTimeout(socket, (int) rpcTimeout);
1212 }
1213 } catch (SocketException e) {
1214 LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
1215 }
1216 }
1217 }
1218
1219 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
1220 justification="Presume sync not needed setting socket timeout")
1221 private static void setSocketTimeout(final Socket socket, final int rpcTimeout)
1222 throws java.net.SocketException {
1223 if (socket == null) return;
1224 socket.setSoTimeout(rpcTimeout);
1225 }
1226
1227
1228
1229
1230 @SuppressWarnings("serial")
1231 @InterfaceAudience.Public
1232 @InterfaceStability.Evolving
1233 public static class CallTimeoutException extends IOException {
1234 public CallTimeoutException(final String msg) {
1235 super(msg);
1236 }
1237 }
1238
1239
1240
1241
1242
1243
1244
1245 RpcClient(Configuration conf, String clusterId, SocketFactory factory) {
1246 this(conf, clusterId, factory, null);
1247 }
1248
1249
1250
1251
1252
1253
1254
1255
1256 RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) {
1257 this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
1258 this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
1259 this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1260 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1261 this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
1262 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
1263 this.pingInterval = getPingInterval(conf);
1264 this.ipcUtil = new IPCUtil(conf);
1265 this.conf = conf;
1266 this.codec = getCodec();
1267 this.compressor = getCompressor(conf);
1268 this.socketFactory = factory;
1269 this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
1270 this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
1271 this.failedServers = new FailedServers(conf);
1272 this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
1273 IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
1274 this.localAddr = localAddr;
1275 this.userProvider = UserProvider.instantiate(conf);
1276
1277 if (LOG.isDebugEnabled()) {
1278 LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
1279 ", tcpKeepAlive=" + this.tcpKeepAlive +
1280 ", tcpNoDelay=" + this.tcpNoDelay +
1281 ", maxIdleTime=" + this.maxIdleTime +
1282 ", maxRetries=" + this.maxRetries +
1283 ", fallbackAllowed=" + this.fallbackAllowed +
1284 ", ping interval=" + this.pingInterval + "ms" +
1285 ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
1286 }
1287 }
1288
1289
1290
1291
1292
1293
1294 public RpcClient(Configuration conf, String clusterId) {
1295 this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
1296 }
1297
1298
1299
1300
1301
1302
1303
1304 public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
1305 this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr);
1306 }
1307
1308
1309
1310
1311
1312 Codec getCodec() {
1313
1314
1315 String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
1316 if (className == null || className.length() == 0) return null;
1317 try {
1318 return (Codec)Class.forName(className).newInstance();
1319 } catch (Exception e) {
1320 throw new RuntimeException("Failed getting codec " + className, e);
1321 }
1322 }
1323
1324 @VisibleForTesting
1325 public static String getDefaultCodec(final Configuration c) {
1326
1327
1328
1329 return c.get("hbase.client.default.rpc.codec", KeyValueCodec.class.getCanonicalName());
1330 }
1331
1332
1333
1334
1335
1336
1337 private static CompressionCodec getCompressor(final Configuration conf) {
1338 String className = conf.get("hbase.client.rpc.compressor", null);
1339 if (className == null || className.isEmpty()) return null;
1340 try {
1341 return (CompressionCodec)Class.forName(className).newInstance();
1342 } catch (Exception e) {
1343 throw new RuntimeException("Failed getting compressor " + className, e);
1344 }
1345 }
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362 protected static PoolType getPoolType(Configuration config) {
1363 return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
1364 PoolType.RoundRobin, PoolType.ThreadLocal);
1365 }
1366
1367
1368
1369
1370
1371
1372
1373
1374 protected static int getPoolSize(Configuration config) {
1375 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
1376 }
1377
1378
1379
1380
1381
1382 SocketFactory getSocketFactory() {
1383 return socketFactory;
1384 }
1385
1386
1387
1388 public void stop() {
1389 if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
1390 if (!running.compareAndSet(true, false)) return;
1391
1392
1393 synchronized (connections) {
1394 for (Connection conn : connections.values()) {
1395 conn.interrupt();
1396 }
1397 }
1398
1399
1400 while (!connections.isEmpty()) {
1401 try {
1402 Thread.sleep(100);
1403 } catch (InterruptedException ignored) {
1404 }
1405 }
1406 }
1407
1408 Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
1409 Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout)
1410 throws InterruptedException, IOException {
1411 return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS);
1412 }
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432 Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
1433 Message returnType, User ticket, InetSocketAddress addr,
1434 int rpcTimeout, int priority)
1435 throws InterruptedException, IOException {
1436 Call call = new Call(md, param, cells, returnType);
1437 Connection connection =
1438 getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
1439 connection.writeRequest(call, priority);
1440
1441
1442 synchronized (call) {
1443 while (!call.done) {
1444 if (connection.shouldCloseConnection.get()) {
1445 throw new IOException("Unexpected closed connection");
1446 }
1447 call.wait(1000);
1448 }
1449
1450 if (call.error != null) {
1451 if (call.error instanceof RemoteException) {
1452 call.error.fillInStackTrace();
1453 throw call.error;
1454 }
1455
1456 throw wrapException(addr, call.error);
1457 }
1458 return new Pair<Message, CellScanner>(call.response, call.cells);
1459 }
1460 }
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474 protected IOException wrapException(InetSocketAddress addr,
1475 IOException exception) {
1476 if (exception instanceof ConnectException) {
1477
1478 return (ConnectException)new ConnectException(
1479 "Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
1480 } else if (exception instanceof SocketTimeoutException) {
1481 return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
1482 " failed because " + exception).initCause(exception);
1483 } else {
1484 return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
1485 exception).initCause(exception);
1486 }
1487 }
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497 public void cancelConnections(String hostname, int port, IOException ioe) {
1498 synchronized (connections) {
1499 for (Connection connection : connections.values()) {
1500 if (connection.isAlive() &&
1501 connection.getRemoteAddress().getPort() == port &&
1502 connection.getRemoteAddress().getHostName().equals(hostname)) {
1503 LOG.info("The server on " + hostname + ":" + port +
1504 " is dead - stopping the connection " + connection.remoteId);
1505 connection.closeConnection();
1506
1507
1508 }
1509 }
1510 }
1511 }
1512
1513
1514
1515 protected Connection getConnection(User ticket, Call call, InetSocketAddress addr,
1516 int rpcTimeout, final Codec codec, final CompressionCodec compressor)
1517 throws IOException, InterruptedException {
1518 if (!running.get()) throw new StoppedRpcClientException();
1519 Connection connection;
1520 ConnectionId remoteId =
1521 new ConnectionId(ticket, call.md.getService().getName(), addr, rpcTimeout);
1522 synchronized (connections) {
1523 connection = connections.get(remoteId);
1524 if (connection == null) {
1525 connection = createConnection(remoteId, this.codec, this.compressor);
1526 connections.put(remoteId, connection);
1527 }
1528 }
1529 connection.addCall(call);
1530
1531
1532
1533
1534
1535
1536
1537
1538 connection.setupIOstreams();
1539 return connection;
1540 }
1541
1542
1543
1544
1545
1546 protected static class ConnectionId {
1547 final InetSocketAddress address;
1548 final User ticket;
1549 final int rpcTimeout;
1550 private static final int PRIME = 16777619;
1551 final String serviceName;
1552
1553 ConnectionId(User ticket,
1554 String serviceName,
1555 InetSocketAddress address,
1556 int rpcTimeout) {
1557 this.address = address;
1558 this.ticket = ticket;
1559 this.rpcTimeout = rpcTimeout;
1560 this.serviceName = serviceName;
1561 }
1562
1563 String getServiceName() {
1564 return this.serviceName;
1565 }
1566
1567 InetSocketAddress getAddress() {
1568 return address;
1569 }
1570
1571 User getTicket() {
1572 return ticket;
1573 }
1574
1575 @Override
1576 public String toString() {
1577 return this.address.toString() + "/" + this.serviceName + "/" + this.ticket + "/" +
1578 this.rpcTimeout;
1579 }
1580
1581 @Override
1582 public boolean equals(Object obj) {
1583 if (obj instanceof ConnectionId) {
1584 ConnectionId id = (ConnectionId) obj;
1585 return address.equals(id.address) &&
1586 ((ticket != null && ticket.equals(id.ticket)) ||
1587 (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout &&
1588 this.serviceName == id.serviceName;
1589 }
1590 return false;
1591 }
1592
1593 @Override
1594 public int hashCode() {
1595 int hashcode = (address.hashCode() +
1596 PRIME * (PRIME * this.serviceName.hashCode() ^
1597 (ticket == null ? 0 : ticket.hashCode()) )) ^
1598 rpcTimeout;
1599 return hashcode;
1600 }
1601 }
1602
1603 public static void setRpcTimeout(int t) {
1604 rpcTimeout.set(t);
1605 }
1606
1607 public static int getRpcTimeout() {
1608 return rpcTimeout.get();
1609 }
1610
1611
1612
1613
1614
1615 public static int getRpcTimeout(int defaultTimeout) {
1616 return Math.min(defaultTimeout, rpcTimeout.get());
1617 }
1618
1619 public static void resetRpcTimeout() {
1620 rpcTimeout.remove();
1621 }
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639 Message callBlockingMethod(MethodDescriptor md, RpcController controller,
1640 Message param, Message returnType, final User ticket, final InetSocketAddress isa,
1641 final int rpcTimeout)
1642 throws ServiceException {
1643 long startTime = 0;
1644 if (LOG.isTraceEnabled()) {
1645 startTime = System.currentTimeMillis();
1646 }
1647 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
1648 CellScanner cells = null;
1649 if (pcrc != null) {
1650 cells = pcrc.cellScanner();
1651
1652 pcrc.setCellScanner(null);
1653 }
1654 Pair<Message, CellScanner> val = null;
1655 try {
1656 val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
1657 pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
1658 if (pcrc != null) {
1659
1660 if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
1661 } else if (val.getSecond() != null) {
1662 throw new ServiceException("Client dropping data on the floor!");
1663 }
1664
1665 if (LOG.isTraceEnabled()) {
1666 long callTime = System.currentTimeMillis() - startTime;
1667 if (LOG.isTraceEnabled()) {
1668 LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
1669 }
1670 }
1671 return val.getFirst();
1672 } catch (Throwable e) {
1673 throw new ServiceException(e);
1674 }
1675 }
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
1686 final User ticket, final int rpcTimeout) {
1687 return new BlockingRpcChannelImplementation(this, sn, ticket, rpcTimeout);
1688 }
1689
1690
1691
1692
1693
1694 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
1695 private final InetSocketAddress isa;
1696 private volatile RpcClient rpcClient;
1697 private final int rpcTimeout;
1698 private final User ticket;
1699
1700 protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn,
1701 final User ticket, final int rpcTimeout) {
1702 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
1703 this.rpcClient = rpcClient;
1704
1705
1706 this.rpcTimeout = getRpcTimeout(rpcTimeout);
1707 this.ticket = ticket;
1708 }
1709
1710 @Override
1711 public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
1712 Message param, Message returnType)
1713 throws ServiceException {
1714 return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket,
1715 this.isa, this.rpcTimeout);
1716 }
1717 }
1718 }