1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.lang.reflect.Constructor;
25 import java.lang.reflect.UndeclaredThrowableException;
26 import java.net.SocketException;
27 import java.util.ArrayList;
28 import java.util.Date;
29 import java.util.HashSet;
30 import java.util.LinkedHashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.NavigableMap;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentMap;
38 import java.util.concurrent.ConcurrentSkipListMap;
39 import java.util.concurrent.ConcurrentSkipListSet;
40 import java.util.concurrent.CopyOnWriteArraySet;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.ThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicInteger;
47
48 import org.apache.commons.logging.Log;
49 import org.apache.commons.logging.LogFactory;
50 import org.apache.hadoop.classification.InterfaceAudience;
51 import org.apache.hadoop.classification.InterfaceStability;
52 import org.apache.hadoop.conf.Configuration;
53 import org.apache.hadoop.hbase.Chore;
54 import org.apache.hadoop.hbase.HBaseConfiguration;
55 import org.apache.hadoop.hbase.HConstants;
56 import org.apache.hadoop.hbase.HRegionInfo;
57 import org.apache.hadoop.hbase.HRegionLocation;
58 import org.apache.hadoop.hbase.HTableDescriptor;
59 import org.apache.hadoop.hbase.MasterNotRunningException;
60 import org.apache.hadoop.hbase.RegionTooBusyException;
61 import org.apache.hadoop.hbase.ServerName;
62 import org.apache.hadoop.hbase.Stoppable;
63 import org.apache.hadoop.hbase.TableName;
64 import org.apache.hadoop.hbase.TableNotEnabledException;
65 import org.apache.hadoop.hbase.TableNotFoundException;
66 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
67 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
68 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
69 import org.apache.hadoop.hbase.client.coprocessor.Batch;
70 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
71 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
72 import org.apache.hadoop.hbase.ipc.RpcClient;
73 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
74 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
75 import org.apache.hadoop.hbase.protobuf.RequestConverter;
76 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
77 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
78 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
79 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
80 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
81 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
82 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
83 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
84 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
85 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
86 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
87 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
88 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
89 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
90 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
91 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
92 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
93 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
94 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
95 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
96 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
97 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
98 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
99 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*;
117 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
118 import org.apache.hadoop.hbase.security.User;
119 import org.apache.hadoop.hbase.security.UserProvider;
120 import org.apache.hadoop.hbase.util.Bytes;
121 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
122 import org.apache.hadoop.hbase.util.ExceptionUtil;
123 import org.apache.hadoop.hbase.util.Threads;
124 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
125 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
126 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
127 import org.apache.hadoop.ipc.RemoteException;
128 import org.apache.zookeeper.KeeperException;
129
130 import com.google.common.annotations.VisibleForTesting;
131 import com.google.protobuf.BlockingRpcChannel;
132 import com.google.protobuf.RpcController;
133 import com.google.protobuf.ServiceException;
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197 @SuppressWarnings("serial")
198 @InterfaceAudience.Public
199 @InterfaceStability.Evolving
200 public class HConnectionManager {
static final Log LOG = LogFactory.getLog(HConnectionManager.class);

/** Configuration key: track retry counts per server. */
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
/** Configuration key: enable/disable client-side operation nonces (on by default). */
private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";

// Cache of managed connections keyed by their configuration; built as an
// access-ordered (LRU) map in the static initializer below.
static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;

// Bound on CONNECTION_INSTANCES; derived from the ZooKeeper
// max-client-connections setting in the static initializer.
public static final int MAX_CACHED_CONNECTION_INSTANCES;

// Process-wide nonce generator shared by every connection that enables nonces;
// created at most once under nonceGeneratorCreateLock (see the
// HConnectionImplementation constructor).
private static volatile NonceGenerator nonceGenerator = null;

// Guards the one-time creation of nonceGenerator.
private static Object nonceGeneratorCreateLock = new Object();
220
static {
  // The cache can hold one connection per distinct HConnectionKey, bounded by
  // the configured ZooKeeper max-client-connections plus one slot of headroom.
  MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
      HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
  // Access-ordered LinkedHashMap: the least recently used connection is
  // evicted once the cache grows past MAX_CACHED_CONNECTION_INSTANCES.
  CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
      (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
    @Override
    protected boolean removeEldestEntry(
        Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
      return size() > MAX_CACHED_CONNECTION_INSTANCES;
    }
  };
}
237
238
239
240
/** Non-instantiable: this class exposes only static methods. */
private HConnectionManager() {
  super();
}
244
245
246
247
248
249
250 @VisibleForTesting
251 public static NonceGenerator injectNonceGeneratorForTesting(
252 HConnection conn, NonceGenerator cnm) {
253 NonceGenerator ng = conn.getNonceGenerator();
254 LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
255 ((HConnectionImplementation)conn).nonceGenerator = cnm;
256 return ng;
257 }
258
259
260
261
262
263
264
265
266
267
268 @Deprecated
269 public static HConnection getConnection(final Configuration conf)
270 throws IOException {
271 HConnectionKey connectionKey = new HConnectionKey(conf);
272 synchronized (CONNECTION_INSTANCES) {
273 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
274 if (connection == null) {
275 connection = (HConnectionImplementation)createConnection(conf, true);
276 CONNECTION_INSTANCES.put(connectionKey, connection);
277 } else if (connection.isClosed()) {
278 HConnectionManager.deleteConnection(connectionKey, true);
279 connection = (HConnectionImplementation)createConnection(conf, true);
280 CONNECTION_INSTANCES.put(connectionKey, connection);
281 }
282 connection.incCount();
283 return connection;
284 }
285 }
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307 public static HConnection createConnection(Configuration conf)
308 throws IOException {
309 UserProvider provider = UserProvider.instantiate(conf);
310 return createConnection(conf, false, null, provider.getCurrent());
311 }
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333 public static HConnection createConnection(Configuration conf, ExecutorService pool)
334 throws IOException {
335 UserProvider provider = UserProvider.instantiate(conf);
336 return createConnection(conf, false, pool, provider.getCurrent());
337 }
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
/**
 * Creates a new, unmanaged connection on behalf of the given user.
 *
 * @param conf configuration identifying the cluster
 * @param user user the connection acts as
 * @return a new {@link HConnection}; the caller must close it
 * @throws IOException if the connection cannot be established
 */
public static HConnection createConnection(Configuration conf, User user)
throws IOException {
  return createConnection(conf, false, null, user);
}
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
/**
 * Creates a new, unmanaged connection on behalf of the given user, using the
 * supplied thread pool for batch operations.
 *
 * @param conf configuration identifying the cluster
 * @param pool executor for batch operations
 * @param user user the connection acts as
 * @return a new {@link HConnection}; the caller must close it
 * @throws IOException if the connection cannot be established
 */
public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
throws IOException {
  return createConnection(conf, false, pool, user);
}
389
390 @Deprecated
391 static HConnection createConnection(final Configuration conf, final boolean managed)
392 throws IOException {
393 UserProvider provider = UserProvider.instantiate(conf);
394 return createConnection(conf, managed, null, provider.getCurrent());
395 }
396
397 @Deprecated
398 static HConnection createConnection(final Configuration conf, final boolean managed,
399 final ExecutorService pool, final User user)
400 throws IOException {
401 String className = conf.get("hbase.client.connection.impl",
402 HConnectionManager.HConnectionImplementation.class.getName());
403 Class<?> clazz = null;
404 try {
405 clazz = Class.forName(className);
406 } catch (ClassNotFoundException e) {
407 throw new IOException(e);
408 }
409 try {
410
411 Constructor<?> constructor =
412 clazz.getDeclaredConstructor(Configuration.class,
413 boolean.class, ExecutorService.class, User.class);
414 constructor.setAccessible(true);
415 return (HConnection) constructor.newInstance(conf, managed, pool, user);
416 } catch (Exception e) {
417 throw new IOException(e);
418 }
419 }
420
421
422
423
424
425
426
427
428
429 public static void deleteConnection(Configuration conf) {
430 deleteConnection(new HConnectionKey(conf), false);
431 }
432
433
434
435
436
437
438
439
/**
 * Unconditionally closes and evicts the given managed connection, regardless
 * of its remaining reference count (used when the connection is known stale).
 *
 * @param connection the stale connection to discard
 */
public static void deleteStaleConnection(HConnection connection) {
  deleteConnection(connection, true);
}
443
444
445
446
447
448
449
450 public static void deleteAllConnections(boolean staleConnection) {
451 synchronized (CONNECTION_INSTANCES) {
452 Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
453 connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
454 for (HConnectionKey connectionKey : connectionKeys) {
455 deleteConnection(connectionKey, staleConnection);
456 }
457 CONNECTION_INSTANCES.clear();
458 }
459 }
460
461
462
463
464
/**
 * Closes and evicts every cached managed connection whose reference count
 * drops to zero.
 *
 * @deprecated use {@link #deleteAllConnections(boolean)} instead
 */
@Deprecated
public static void deleteAllConnections() {
  deleteAllConnections(false);
}
469
470
471 @Deprecated
472 private static void deleteConnection(HConnection connection, boolean staleConnection) {
473 synchronized (CONNECTION_INSTANCES) {
474 for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
475 if (e.getValue() == connection) {
476 deleteConnection(e.getKey(), staleConnection);
477 break;
478 }
479 }
480 }
481 }
482
483 @Deprecated
484 private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
485 synchronized (CONNECTION_INSTANCES) {
486 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
487 if (connection != null) {
488 connection.decCount();
489 if (connection.isZeroReference() || staleConnection) {
490 CONNECTION_INSTANCES.remove(connectionKey);
491 connection.internalClose();
492 }
493 } else {
494 LOG.error("Connection not found in the list, can't delete it "+
495 "(connection key=" + connectionKey + "). May be the key was modified?", new Exception());
496 }
497 }
498 }
499
500
501
502
503
504
505
506 static int getCachedRegionCount(Configuration conf, final TableName tableName)
507 throws IOException {
508 return execute(new HConnectable<Integer>(conf) {
509 @Override
510 public Integer connect(HConnection connection) {
511 return ((HConnectionImplementation)connection).getNumberOfCachedRegionLocations(tableName);
512 }
513 });
514 }
515
516
517
518
519
520
521
522 static boolean isRegionCached(Configuration conf,
523 final TableName tableName,
524 final byte[] row)
525 throws IOException {
526 return execute(new HConnectable<Boolean>(conf) {
527 @Override
528 public Boolean connect(HConnection connection) {
529 return ((HConnectionImplementation) connection).isRegionCached(tableName, row);
530 }
531 });
532 }
533
534
535
536
537
538
539
540
541
542
543
/**
 * Obtains a managed connection for the connectable's configuration, runs its
 * {@code connect} callback, and always releases the connection afterwards.
 *
 * @param connectable callback plus configuration; if it or its configuration
 *   is null, this method returns null without doing anything
 * @return whatever the callback returned (null for a null/incomplete argument)
 * @throws IOException from the callback itself, or from closing the
 *   connection when the callback had already succeeded
 */
@InterfaceAudience.Private
public static <T> T execute(HConnectable<T> connectable) throws IOException {
  if (connectable == null || connectable.conf == null) {
    return null;
  }
  Configuration conf = connectable.conf;
  HConnection connection = HConnectionManager.getConnection(conf);
  boolean connectSucceeded = false;
  try {
    T returnValue = connectable.connect(connection);
    connectSucceeded = true;
    return returnValue;
  } finally {
    try {
      connection.close();
    } catch (Exception e) {
      // Interrupts always propagate.
      ExceptionUtil.rethrowIfInterrupt(e);
      if (connectSucceeded) {
        // Only surface the close failure when the callback succeeded;
        // otherwise the exception already leaving connect() is the one to keep.
        throw new IOException("The connection to " + connection
            + " could not be deleted.", e);
      }
    }
  }
}
568
569
570 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
571 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
572 justification="Access to the conncurrent hash map is under a lock so should be fine.")
573 static class HConnectionImplementation implements HConnection, Closeable {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
// Base sleep time (ms) between client retries.
private final long pause;
// Maximum number of attempts for region lookups.
private final int numTries;
// RPC timeout (ms) from hbase.rpc.timeout.
final int rpcTimeout;
// Nonce generator: the process-wide shared instance, or a no-op generator
// when nonces are disabled (chosen in the constructor).
private NonceGenerator nonceGenerator = null;
// Upper bound on rows fetched per hbase:meta prefetch (see prefetchRegionCache).
private final int prefetchRegionLimit;

private volatile boolean closed;
private volatile boolean aborted;

// Receives cluster status (dead-server) notifications; stays null when status
// publishing is disabled or no listener class is configured.
ClusterStatusListener clusterStatusListener;

// Lock object handed to locateRegionInMeta for user-region lookups.
private final Object userRegionLock = new Object();

// Guards state shared between master and ZooKeeper keep-alive handling.
private final Object masterAndZKLock = new Object();

private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
// Presumably closes the keep-alive ZooKeeper watcher after idle time
// (see DelayedClosing — not shown in this chunk).
private final DelayedClosing delayedClosing =
    DelayedClosing.createAndStart(this);

// Shared pool for batch/table operations; created lazily by getBatchPool()
// unless one was supplied to the constructor.
private volatile ExecutorService batchPool = null;
// True once the pool was created internally and must be shut down on close.
private volatile boolean cleanupPool = false;

private final Configuration conf;

// Client RPC machinery; initialized in the constructor after cluster-id lookup.
private RpcClient rpcClient;

// Region location cache: table -> (region start key -> location), the inner
// map sorted by start key.
private final ConcurrentMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>
    cachedRegionLocations =
    new ConcurrentHashMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>();

// Servers referenced by cachedRegionLocations; presumably consulted when
// clearing caches for a server — confirm in clearCaches (not shown here).
private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();

// Tables (as Integer identifiers, presumably a name hash) whose region-cache
// prefetching has been disabled — confirm where entries are added.
private final Set<Integer> regionCachePrefetchDisabledTables =
    new CopyOnWriteArraySet<Integer>();

// Reference count for managed connections; updated via incCount()/decCount()
// under HConnectionManager's CONNECTION_INSTANCES lock.
private int refCount;

// True when this connection's lifecycle is managed by HConnectionManager.
private boolean managed;

private User user;

// Source of cluster bootstrap information (cluster id, meta region location).
Registry registry;
640
/**
 * Convenience constructor: no explicit thread pool or user (both null; the
 * pool is then created lazily by getBatchPool()).
 */
HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
  this(conf, managed, null, null);
}
644
645
646
647
648
649
650
651
652
653
654
655
/**
 * Full constructor: wires up the registry, resolves the cluster id, creates
 * the RPC client, and (when enabled) subscribes to cluster status so dead
 * servers can be purged from caches eagerly.
 *
 * @param conf configuration to read settings from
 * @param managed whether the lifecycle is managed by HConnectionManager
 * @param pool batch-operation executor; null means create one lazily
 * @param user user this connection acts as; may be null
 * @throws IOException if registry setup or listener creation fails
 */
HConnectionImplementation(Configuration conf, boolean managed,
    ExecutorService pool, User user) throws IOException {
  this(conf);
  this.user = user;
  this.batchPool = pool;
  this.managed = managed;
  this.registry = setupRegistry();
  retrieveClusterId();

  // The RPC client is keyed to the cluster id resolved just above.
  this.rpcClient = new RpcClient(this.conf, this.clusterId);

  // Optionally listen to the cluster's published status.
  boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
      HConstants.STATUS_PUBLISHED_DEFAULT);
  Class<? extends ClusterStatusListener.Listener> listenerClass =
      conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
          ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
          ClusterStatusListener.Listener.class);
  if (shouldListen) {
    if (listenerClass == null) {
      LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
          ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
    } else {
      clusterStatusListener = new ClusterStatusListener(
          new ClusterStatusListener.DeadServerHandler() {
            @Override
            public void newDead(ServerName sn) {
              // A server died: drop its cached locations and fail any
              // in-flight RPCs to it instead of waiting for timeouts.
              clearCaches(sn);
              rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
                  new SocketException(sn.getServerName() +
                      " is dead: closing its connection."));
            }
          }, conf, listenerClass);
    }
  }
}
692
693
/**
 * Nonce generator used when client nonces are disabled: both the group and
 * every generated nonce are {@link HConstants#NO_NONCE}.
 */
private static class NoNonceGenerator implements NonceGenerator {
  @Override
  public long getNonceGroup() {
    return HConstants.NO_NONCE;
  }
  @Override
  public long newNonce() {
    return HConstants.NO_NONCE;
  }
}
704
705
706
707
/**
 * Base constructor: reads client tuning knobs from the configuration and
 * selects the nonce generator. Opens no network resources.
 *
 * @param conf configuration to read from (retained by reference)
 */
protected HConnectionImplementation(Configuration conf) {
  this.conf = conf;
  this.closed = false;
  this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
  this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.rpcTimeout = conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
    // Share one nonce generator across every connection in the process;
    // create it at most once under the lock.
    synchronized (HConnectionManager.nonceGeneratorCreateLock) {
      if (HConnectionManager.nonceGenerator == null) {
        HConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
      }
      this.nonceGenerator = HConnectionManager.nonceGenerator;
    }
  } else {
    this.nonceGenerator = new NoNonceGenerator();
  }

  this.prefetchRegionLimit = conf.getInt(
      HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
      HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
}
733
734 @Override
735 public HTableInterface getTable(String tableName) throws IOException {
736 return getTable(TableName.valueOf(tableName));
737 }
738
739 @Override
740 public HTableInterface getTable(byte[] tableName) throws IOException {
741 return getTable(TableName.valueOf(tableName));
742 }
743
744 @Override
745 public HTableInterface getTable(TableName tableName) throws IOException {
746 return getTable(tableName, getBatchPool());
747 }
748
749 @Override
750 public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
751 return getTable(TableName.valueOf(tableName), pool);
752 }
753
754 @Override
755 public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
756 return getTable(TableName.valueOf(tableName), pool);
757 }
758
759 @Override
760 public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
761 if (managed) {
762 throw new IOException("The connection has to be unmanaged.");
763 }
764 return new HTable(tableName, this, pool);
765 }
766
/**
 * Lazily creates the shared batch pool with double-checked locking on the
 * volatile {@link #batchPool} field. The pool is a daemon-thread
 * ThreadPoolExecutor with a bounded queue; core threads may time out.
 *
 * @return the shared batch pool (never null after this call)
 */
private ExecutorService getBatchPool() {
  if (batchPool == null) {
    // shared HTable thread executor not yet initialized
    synchronized (this) {
      if (batchPool == null) {
        int maxThreads = conf.getInt("hbase.hconnection.threads.max", 256);
        int coreThreads = conf.getInt("hbase.hconnection.threads.core", 256);
        // Zero means "size from the hardware": 8 threads per available core.
        if (maxThreads == 0) {
          maxThreads = Runtime.getRuntime().availableProcessors() * 8;
        }
        if (coreThreads == 0) {
          coreThreads = Runtime.getRuntime().availableProcessors() * 8;
        }
        long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
        // Bounded queue: maxThreads * max-total-tasks pending runnables.
        LinkedBlockingQueue<Runnable> workQueue =
            new LinkedBlockingQueue<Runnable>(maxThreads *
                conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
                    HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
            coreThreads,
            maxThreads,
            keepAliveTime,
            TimeUnit.SECONDS,
            workQueue,
            Threads.newDaemonThreadFactory(toString() + "-shared-"));
        // Let idle core threads die off too.
        tpe.allowCoreThreadTimeOut(true);
        this.batchPool = tpe;
      }
      // We created (or raced on creating) the pool, so we own its shutdown.
      this.cleanupPool = true;
    }
  }
  return this.batchPool;
}
800
/** @return the current batch pool, or null if none was created or supplied yet. */
protected ExecutorService getCurrentBatchPool() {
  return batchPool;
}
804
805 private void shutdownBatchPool() {
806 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
807 this.batchPool.shutdown();
808 try {
809 if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
810 this.batchPool.shutdownNow();
811 }
812 } catch (InterruptedException e) {
813 this.batchPool.shutdownNow();
814 }
815 }
816 }
817
818
819
820
821
822 private Registry setupRegistry() throws IOException {
823 String registryClass = this.conf.get("hbase.client.registry.impl",
824 ZooKeeperRegistry.class.getName());
825 Registry registry = null;
826 try {
827 registry = (Registry)Class.forName(registryClass).newInstance();
828 } catch (Throwable t) {
829 throw new IOException(t);
830 }
831 registry.init(this);
832 return registry;
833 }
834
835
836
837
838
839
840 RpcClient setRpcClient(final RpcClient rpcClient) {
841 RpcClient oldRpcClient = this.rpcClient;
842 this.rpcClient = rpcClient;
843 return oldRpcClient;
844 }
845
846
847
848
849
850 public String toString(){
851 return "hconnection-0x" + Integer.toHexString(hashCode());
852 }
853
854 protected String clusterId = null;
855
856 void retrieveClusterId() {
857 if (clusterId != null) return;
858 this.clusterId = this.registry.getClusterId();
859 if (clusterId == null) {
860 clusterId = HConstants.CLUSTER_ID_DEFAULT;
861 LOG.debug("clusterid came back null, using default " + clusterId);
862 }
863 }
864
/** @return the configuration this connection was created with. */
@Override
public Configuration getConfiguration() {
  return this.conf;
}
869
870 private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
871 throws MasterNotRunningException {
872 String errorMsg;
873 try {
874 if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
875 errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
876 + "It should have been written by the master. "
877 + "Check the value configured in 'zookeeper.znode.parent'. "
878 + "There could be a mismatch with the one configured in the master.";
879 LOG.error(errorMsg);
880 throw new MasterNotRunningException(errorMsg);
881 }
882 } catch (KeeperException e) {
883 errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
884 LOG.error(errorMsg);
885 throw new MasterNotRunningException(errorMsg, e);
886 }
887 }
888
889
890
891
892
893
/**
 * @return true if the master is running; failure to reach a master surfaces
 *   as the declared exceptions rather than a false return
 * @throws MasterNotRunningException if no master is reachable
 * @throws ZooKeeperConnectionException if ZooKeeper cannot be contacted
 */
@Override
public boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException {
  // Obtaining the keep-alive master service presumably fails with the
  // declared exceptions when no master is available — see
  // getKeepAliveMasterService (not shown in this chunk); reaching the
  // return below therefore implies a running master.
  MasterKeepAliveConnection m = getKeepAliveMasterService();
  m.close();
  return true;
}
904
905 @Override
906 public HRegionLocation getRegionLocation(final TableName tableName,
907 final byte [] row, boolean reload)
908 throws IOException {
909 return reload? relocateRegion(tableName, row): locateRegion(tableName, row);
910 }
911
912 @Override
913 public HRegionLocation getRegionLocation(final byte[] tableName,
914 final byte [] row, boolean reload)
915 throws IOException {
916 return getRegionLocation(TableName.valueOf(tableName), row, reload);
917 }
918
/** @return true when the registry reports the table's state as enabled. */
@Override
public boolean isTableEnabled(TableName tableName) throws IOException {
  return this.registry.isTableOnlineState(tableName, true);
}
923
924 @Override
925 public boolean isTableEnabled(byte[] tableName) throws IOException {
926 return isTableEnabled(TableName.valueOf(tableName));
927 }
928
/** @return true when the registry reports the table's state as disabled. */
@Override
public boolean isTableDisabled(TableName tableName) throws IOException {
  return this.registry.isTableOnlineState(tableName, false);
}
933
934 @Override
935 public boolean isTableDisabled(byte[] tableName) throws IOException {
936 return isTableDisabled(TableName.valueOf(tableName));
937 }
938
/**
 * Scans hbase:meta to decide whether the table is fully available.
 *
 * @return true when the table has at least one region and every region row
 *   seen in meta carries a server assignment
 * @throws IOException if the meta scan fails
 */
@Override
public boolean isTableAvailable(final TableName tableName) throws IOException {
  final AtomicBoolean available = new AtomicBoolean(true);
  final AtomicInteger regionCount = new AtomicInteger(0);
  MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
    @Override
    public boolean processRow(Result row) throws IOException {
      HRegionInfo info = MetaScanner.getHRegionInfo(row);
      if (info != null && !info.isSplitParent()) {
        if (tableName.equals(info.getTable())) {
          ServerName server = HRegionInfo.getServerName(row);
          if (server == null) {
            // A region with no assigned server: table not fully available.
            available.set(false);
            return false;
          }
          regionCount.incrementAndGet();
        } else if (tableName.compareTo(info.getTable()) < 0) {
          // Rows now sort after our table: past its range, stop scanning.
          return false;
        }
      }
      return true;
    }
  };
  MetaScanner.metaScan(conf, this, visitor, tableName);
  return available.get() && (regionCount.get() > 0);
}
966
967 @Override
968 public boolean isTableAvailable(final byte[] tableName) throws IOException {
969 return isTableAvailable(TableName.valueOf(tableName));
970 }
971
972 @Override
973 public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
974 throws IOException {
975 final AtomicBoolean available = new AtomicBoolean(true);
976 final AtomicInteger regionCount = new AtomicInteger(0);
977 MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
978 @Override
979 public boolean processRow(Result row) throws IOException {
980 HRegionInfo info = MetaScanner.getHRegionInfo(row);
981 if (info != null && !info.isSplitParent()) {
982 if (tableName.equals(info.getTable())) {
983 ServerName server = HRegionInfo.getServerName(row);
984 if (server == null) {
985 available.set(false);
986 return false;
987 }
988 if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
989 for (byte[] splitKey : splitKeys) {
990
991 if (Bytes.equals(info.getStartKey(), splitKey)) {
992 regionCount.incrementAndGet();
993 break;
994 }
995 }
996 } else {
997
998 regionCount.incrementAndGet();
999 }
1000 } else if (tableName.compareTo(info.getTable()) < 0) {
1001
1002 return false;
1003 }
1004 }
1005 return true;
1006 }
1007 };
1008 MetaScanner.metaScan(conf, this, visitor, tableName);
1009
1010 return available.get() && (regionCount.get() == splitKeys.length + 1);
1011 }
1012
1013 @Override
1014 public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
1015 throws IOException {
1016 return isTableAvailable(TableName.valueOf(tableName), splitKeys);
1017 }
1018
1019 @Override
1020 public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
1021 return locateRegion(HRegionInfo.getTable(regionName),
1022 HRegionInfo.getStartKey(regionName), false, true);
1023 }
1024
1025 @Override
1026 public boolean isDeadServer(ServerName sn) {
1027 if (clusterStatusListener == null) {
1028 return false;
1029 } else {
1030 return clusterStatusListener.isDeadServer(sn);
1031 }
1032 }
1033
/**
 * Lists the locations of all regions of the table, bypassing the location
 * cache and including offlined regions.
 */
@Override
public List<HRegionLocation> locateRegions(final TableName tableName)
throws IOException {
  return locateRegions (tableName, false, true);
}
1039
1040 @Override
1041 public List<HRegionLocation> locateRegions(final byte[] tableName)
1042 throws IOException {
1043 return locateRegions(TableName.valueOf(tableName));
1044 }
1045
1046 @Override
1047 public List<HRegionLocation> locateRegions(final TableName tableName,
1048 final boolean useCache, final boolean offlined) throws IOException {
1049 NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(conf, this,
1050 tableName, offlined);
1051 final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
1052 for (HRegionInfo regionInfo : regions.keySet()) {
1053 locations.add(locateRegion(tableName, regionInfo.getStartKey(), useCache, true));
1054 }
1055 return locations;
1056 }
1057
1058 @Override
1059 public List<HRegionLocation> locateRegions(final byte[] tableName,
1060 final boolean useCache, final boolean offlined) throws IOException {
1061 return locateRegions(TableName.valueOf(tableName), useCache, offlined);
1062 }
1063
/**
 * Locates the region of the table containing the row, consulting the cache
 * first and retrying on failure.
 */
@Override
public HRegionLocation locateRegion(final TableName tableName,
    final byte [] row)
throws IOException{
  return locateRegion(tableName, row, true, true);
}
1070
1071 @Override
1072 public HRegionLocation locateRegion(final byte[] tableName,
1073 final byte [] row)
1074 throws IOException{
1075 return locateRegion(TableName.valueOf(tableName), row);
1076 }
1077
1078 @Override
1079 public HRegionLocation relocateRegion(final TableName tableName,
1080 final byte [] row) throws IOException{
1081
1082
1083
1084 if (isTableDisabled(tableName)) {
1085 throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
1086 }
1087
1088 return locateRegion(tableName, row, false, true);
1089 }
1090
1091 @Override
1092 public HRegionLocation relocateRegion(final byte[] tableName,
1093 final byte [] row) throws IOException {
1094 return relocateRegion(TableName.valueOf(tableName), row);
1095 }
1096
1097
/**
 * Core lookup: resolves the location of the region of {@code tableName}
 * containing {@code row}.
 *
 * @param useCache whether a cached location may be returned
 * @param retry whether the meta lookup may be retried on failure
 * @throws IOException if the connection is closed
 * @throws IllegalArgumentException on a null/empty table name
 */
private HRegionLocation locateRegion(final TableName tableName,
    final byte [] row, boolean useCache, boolean retry)
throws IOException {
  if (this.closed) throw new IOException(toString() + " closed");
  if (tableName== null || tableName.getName().length == 0) {
    throw new IllegalArgumentException(
        "table name cannot be null or zero length");
  }

  if (tableName.equals(TableName.META_TABLE_NAME)) {
    // hbase:meta's own location comes from the registry, not from meta itself.
    return this.registry.getMetaRegionLocation();
  } else {
    // Region not in the cache - have to go to the meta RS
    return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row,
        useCache, userRegionLock, retry);
  }
}
1115
1116
1117
1118
1119
1120
/**
 * Best-effort warm-up of the region location cache: scans hbase:meta (up to
 * {@link #prefetchRegionLimit} rows, starting near {@code row}) and caches
 * every online, assigned region of {@code tableName}. Failures are logged
 * rather than thrown; an interrupt re-asserts the thread's interrupt status.
 */
private void prefetchRegionCache(final TableName tableName,
    final byte[] row) {
  // Visitor that caches each qualifying region row it sees.
  MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
    public boolean processRow(Result result) throws IOException {
      try {
        HRegionInfo regionInfo = MetaScanner.getHRegionInfo(result);
        if (regionInfo == null) {
          return true; // not a region row; keep scanning
        }

        // Stop once rows belong to a different table.
        if (!regionInfo.getTable().equals(tableName)) {
          return false;
        }
        if (regionInfo.isOffline()) {
          // Don't cache offline regions.
          return true;
        }

        ServerName serverName = HRegionInfo.getServerName(result);
        if (serverName == null) {
          return true; // unassigned; nothing useful to cache
        }

        long seqNum = HRegionInfo.getSeqNumDuringOpen(result);
        HRegionLocation loc = new HRegionLocation(regionInfo, serverName, seqNum);
        // Cache this meta entry.
        cacheLocation(tableName, null, loc);
        return true;
      } catch (RuntimeException e) {
        throw new IOException(e);
      }
    }
  };
  try {
    // Pre-fetch a bounded number of region infos into the region cache.
    MetaScanner.metaScan(conf, this, visitor, tableName, row,
        this.prefetchRegionLimit, TableName.META_TABLE_NAME);
  } catch (IOException e) {
    if (ExceptionUtil.isInterrupt(e)) {
      Thread.currentThread().interrupt();
    } else {
      LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
    }
  }
}
1169
1170
1171
1172
1173
1174 private HRegionLocation locateRegionInMeta(final TableName parentTable,
1175 final TableName tableName, final byte [] row, boolean useCache,
1176 Object regionLockObject, boolean retry)
1177 throws IOException {
1178 HRegionLocation location;
1179
1180
1181 if (useCache) {
1182 location = getCachedLocation(tableName, row);
1183 if (location != null) {
1184 return location;
1185 }
1186 }
1187 int localNumRetries = retry ? numTries : 1;
1188
1189
1190
1191 byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
1192 HConstants.NINES, false);
1193 for (int tries = 0; true; tries++) {
1194 if (tries >= localNumRetries) {
1195 throw new NoServerForRegionException("Unable to find region for "
1196 + Bytes.toStringBinary(row) + " after " + numTries + " tries.");
1197 }
1198
1199 HRegionLocation metaLocation = null;
1200 try {
1201
1202 metaLocation = locateRegion(parentTable, metaKey, true, false);
1203
1204 if (metaLocation == null) continue;
1205 ClientService.BlockingInterface service = getClient(metaLocation.getServerName());
1206
1207 Result regionInfoRow;
1208
1209
1210
1211 if (useCache) {
1212 if (TableName.META_TABLE_NAME.equals(parentTable) &&
1213 getRegionCachePrefetch(tableName)) {
1214 synchronized (regionLockObject) {
1215
1216
1217 location = getCachedLocation(tableName, row);
1218 if (location != null) {
1219 return location;
1220 }
1221
1222
1223 prefetchRegionCache(tableName, row);
1224 }
1225 }
1226 location = getCachedLocation(tableName, row);
1227 if (location != null) {
1228 return location;
1229 }
1230 } else {
1231
1232
1233 forceDeleteCachedLocation(tableName, row);
1234 }
1235
1236
1237 regionInfoRow =
1238 ProtobufUtil.getRowOrBefore(service, metaLocation.getRegionInfo().getRegionName(),
1239 metaKey, HConstants.CATALOG_FAMILY);
1240
1241 if (regionInfoRow == null) {
1242 throw new TableNotFoundException(tableName);
1243 }
1244
1245
1246 HRegionInfo regionInfo = MetaScanner.getHRegionInfo(regionInfoRow);
1247 if (regionInfo == null) {
1248 throw new IOException("HRegionInfo was null or empty in " +
1249 parentTable + ", row=" + regionInfoRow);
1250 }
1251
1252
1253 if (!regionInfo.getTable().equals(tableName)) {
1254 throw new TableNotFoundException(
1255 "Table '" + tableName + "' was not found, got: " +
1256 regionInfo.getTable() + ".");
1257 }
1258 if (regionInfo.isSplit()) {
1259 throw new RegionOfflineException("the only available region for" +
1260 " the required row is a split parent," +
1261 " the daughters should be online soon: " +
1262 regionInfo.getRegionNameAsString());
1263 }
1264 if (regionInfo.isOffline()) {
1265 throw new RegionOfflineException("the region is offline, could" +
1266 " be caused by a disable table call: " +
1267 regionInfo.getRegionNameAsString());
1268 }
1269
1270 ServerName serverName = HRegionInfo.getServerName(regionInfoRow);
1271 if (serverName == null) {
1272 throw new NoServerForRegionException("No server address listed " +
1273 "in " + parentTable + " for region " +
1274 regionInfo.getRegionNameAsString() + " containing row " +
1275 Bytes.toStringBinary(row));
1276 }
1277
1278 if (isDeadServer(serverName)){
1279 throw new RegionServerStoppedException("hbase:meta says the region "+
1280 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
1281 ", but it is dead.");
1282 }
1283
1284
1285 location = new HRegionLocation(regionInfo, serverName,
1286 HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
1287 cacheLocation(tableName, null, location);
1288 return location;
1289 } catch (TableNotFoundException e) {
1290
1291
1292
1293 throw e;
1294 } catch (IOException e) {
1295 ExceptionUtil.rethrowIfInterrupt(e);
1296
1297 if (e instanceof RemoteException) {
1298 e = ((RemoteException)e).unwrapRemoteException();
1299 }
1300 if (tries < numTries - 1) {
1301 if (LOG.isDebugEnabled()) {
1302 LOG.debug("locateRegionInMeta parentTable=" +
1303 parentTable + ", metaLocation=" +
1304 ((metaLocation == null)? "null": "{" + metaLocation + "}") +
1305 ", attempt=" + tries + " of " +
1306 this.numTries + " failed; retrying after sleep of " +
1307 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
1308 }
1309 } else {
1310 throw e;
1311 }
1312
1313 if(!(e instanceof RegionOfflineException ||
1314 e instanceof NoServerForRegionException)) {
1315 relocateRegion(parentTable, metaKey);
1316 }
1317 }
1318 try{
1319 Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1320 } catch (InterruptedException e) {
1321 throw new InterruptedIOException("Giving up trying to location region in " +
1322 "meta: thread is interrupted.");
1323 }
1324 }
1325 }
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335 HRegionLocation getCachedLocation(final TableName tableName,
1336 final byte [] row) {
1337 ConcurrentSkipListMap<byte[], HRegionLocation> tableLocations =
1338 getTableLocations(tableName);
1339
1340 Entry<byte[], HRegionLocation> e = tableLocations.floorEntry(row);
1341 if (e == null) {
1342 return null;
1343 }
1344 HRegionLocation possibleRegion = e.getValue();
1345
1346
1347
1348
1349
1350
1351 byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
1352 if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
1353 tableName.getRowComparator().compareRows(
1354 endKey, 0, endKey.length, row, 0, row.length) > 0) {
1355 return possibleRegion;
1356 }
1357
1358
1359 return null;
1360 }
1361
1362
1363
1364
1365
1366
1367 void forceDeleteCachedLocation(final TableName tableName, final byte [] row) {
1368 HRegionLocation rl = null;
1369 Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
1370
1371
1372 rl = getCachedLocation(tableName, row);
1373 if (rl != null) {
1374 tableLocations.remove(rl.getRegionInfo().getStartKey());
1375 }
1376 if ((rl != null) && LOG.isDebugEnabled()) {
1377 LOG.debug("Removed " + rl.getHostname() + ":" + rl.getPort()
1378 + " as a location of " + rl.getRegionInfo().getRegionNameAsString() +
1379 " for tableName=" + tableName + " from cache");
1380 }
1381 }
1382
1383
1384
1385
  /**
   * Drop every cached region location that points at {@code serverName};
   * called when a server is known to be dead or restarted.
   */
  @Override
  public void clearCaches(final ServerName serverName) {
    // Cheap unsynchronized pre-check; cachedServers tolerates stale members.
    if (!this.cachedServers.contains(serverName)) {
      return;
    }

    boolean deletedSomething = false;
    synchronized (this.cachedServers) {
      // Re-check under the lock: another thread may have scrubbed this
      // server already — the full scan below is expensive, do it at most once.
      if (!this.cachedServers.contains(serverName)) {
        return;
      }
      for (Map<byte[], HRegionLocation> tableLocations : cachedRegionLocations.values()) {
        for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
          HRegionLocation value = e.getValue();
          if (value != null
              && serverName.equals(value.getServerName())) {
            // NOTE(review): remove-by-key could discard an entry a concurrent
            // writer just replaced with a different server; the cost is only
            // an extra cache miss, but worth confirming this is intended.
            tableLocations.remove(e.getKey());
            deletedSomething = true;
          }
        }
      }
      this.cachedServers.remove(serverName);
    }
    if (deletedSomething && LOG.isDebugEnabled()) {
      LOG.debug("Removed all cached region locations that map to " + serverName);
    }
  }
1417
1418
1419
1420
1421
1422 private ConcurrentSkipListMap<byte[], HRegionLocation> getTableLocations(
1423 final TableName tableName) {
1424
1425 ConcurrentSkipListMap<byte[], HRegionLocation> result;
1426 result = this.cachedRegionLocations.get(tableName);
1427
1428 if (result == null) {
1429 result = new ConcurrentSkipListMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR);
1430 ConcurrentSkipListMap<byte[], HRegionLocation> old =
1431 this.cachedRegionLocations.putIfAbsent(tableName, result);
1432 if (old != null) {
1433 return old;
1434 }
1435 }
1436 return result;
1437 }
1438
  /** Drop every cached region location and the cached-server index together. */
  @Override
  public void clearRegionCache() {
    this.cachedRegionLocations.clear();
    this.cachedServers.clear();
  }
1444
  /** Drop all cached locations of one table in a single operation. */
  @Override
  public void clearRegionCache(final TableName tableName) {
    this.cachedRegionLocations.remove(tableName);
  }
1449
1450 @Override
1451 public void clearRegionCache(final byte[] tableName) {
1452 clearRegionCache(TableName.valueOf(tableName));
1453 }
1454
1455
1456
1457
1458
1459
1460
  /**
   * Put a newly discovered location into the cache, refusing stale updates.
   * @param source the server that told us about the new location, or null
   *   when the record was read directly from hbase:meta
   * @param location the location to cache
   */
  private void cacheLocation(final TableName tableName, final HRegionLocation source,
    final HRegionLocation location) {
    boolean isFromMeta = (source == null);
    byte [] startKey = location.getRegionInfo().getStartKey();
    ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
    HRegionLocation oldLocation = tableLocations.putIfAbsent(startKey, location);
    boolean isNewCacheEntry = (oldLocation == null);
    if (isNewCacheEntry) {
      cachedServers.add(location.getServerName());
      return;
    }
    boolean updateCache;
    // If the cached value is exactly the server that redirected us here, it
    // is stale by definition: always replace it.
    if (oldLocation.equals(source)) {
      updateCache = true;
    } else {
      long newLocationSeqNum = location.getSeqNum();
      // A meta record may lag behind a redirect we already cached: reject a
      // from-meta record whose open seqnum is older than what we have.
      boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
      // A redirect with an equal seqnum adds no information, so non-meta
      // updates must carry a strictly newer seqnum.
      boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
      boolean isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
      updateCache = (!isStaleUpdate);
    }
    if (updateCache) {
      // Conditional replace: quietly lose the race if a concurrent update
      // already swapped the entry away from oldLocation.
      tableLocations.replace(startKey, oldLocation, location);
      cachedServers.add(location.getServerName());
    }
  }
1495
1496
  // Cache of RPC stubs, keyed by getStubKey(serviceName, host:port).
  private final ConcurrentHashMap<String, Object> stubs =
    new ConcurrentHashMap<String, Object>();
  // Interned per-stub-key lock objects; creation of a stub for a given key
  // is serialized by synchronizing on connectionLock.get(key).
  private final ConcurrentHashMap<String, String> connectionLock =
    new ConcurrentHashMap<String, String>();
1502
1503
1504
1505
1506 static class MasterServiceState {
1507 HConnection connection;
1508 MasterService.BlockingInterface stub;
1509 int userCount;
1510 long keepAliveUntil = Long.MAX_VALUE;
1511
1512 MasterServiceState (final HConnection connection) {
1513 super();
1514 this.connection = connection;
1515 }
1516
1517 @Override
1518 public String toString() {
1519 return "MasterService";
1520 }
1521
1522 Object getStub() {
1523 return this.stub;
1524 }
1525
1526 void clearStub() {
1527 this.stub = null;
1528 }
1529
1530 boolean isMasterRunning() throws ServiceException {
1531 IsMasterRunningResponse response =
1532 this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1533 return response != null? response.getIsMasterRunning(): false;
1534 }
1535 }
1536
1537
1538
1539
1540
1541
  /**
   * Builds master RPC stubs: resolves the active master through ZooKeeper,
   * opens an RPC channel to it, wraps it in a service-specific stub, verifies
   * the master answers, and caches the stub. Retries with exponential
   * backoff. Subclasses supply the concrete service (see
   * MasterServiceStubMaker).
   */
  abstract class StubMaker {
    /**
     * Name of the service this maker builds stubs for; used in the stub
     * cache key.
     */
    protected abstract String getServiceName();

    /** Wrap the given RPC channel in a service-specific blocking stub. */
    protected abstract Object makeStub(final BlockingRpcChannel channel);

    /**
     * Ping the master through the freshly created stub; throws
     * ServiceException if the stub is unusable.
     */
    protected abstract void isMasterRunning() throws ServiceException;

    /**
     * Single connection attempt: find the active master in ZooKeeper, build
     * (or reuse) the cached stub for it, and verify it answers.
     * @throws MasterNotRunningException when no active master is registered
     *   in ZooKeeper or the registered one is known dead
     */
    private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
      ZooKeeperKeepAliveConnection zkw;
      try {
        zkw = getKeepAliveZooKeeperWatcher();
      } catch (IOException e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
      }
      try {
        checkIfBaseNodeAvailable(zkw);
        ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
        if (sn == null) {
          String msg = "ZooKeeper available but no active master location found";
          LOG.info(msg);
          throw new MasterNotRunningException(msg);
        }
        if (isDeadServer(sn)) {
          throw new MasterNotRunningException(sn + " is dead.");
        }
        // Serialize per-key stub creation via the interned lock object.
        String key = getStubKey(getServiceName(), sn.getHostAndPort());
        connectionLock.putIfAbsent(key, key);
        Object stub = null;
        synchronized (connectionLock.get(key)) {
          stub = stubs.get(key);
          if (stub == null) {
            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
              user, rpcTimeout);
            stub = makeStub(channel);
            // Only cache the stub once it has answered a ping.
            isMasterRunning();
            stubs.put(key, stub);
          }
        }
        return stub;
      } finally {
        // Always give back our reference on the shared ZK watcher.
        zkw.close();
      }
    }

    /**
     * Create the stub, retrying up to numTries times with exponential
     * backoff. Runs entirely under masterAndZKLock, hence the
     * sleep-with-lock-held findbugs suppression.
     * @throws MasterNotRunningException on exhaustion, interrupt, or when
     *   the connection is closed mid-attempt
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD")
    Object makeStub() throws MasterNotRunningException {
      synchronized (masterAndZKLock) {
        Exception exceptionCaught = null;
        Object stub = null;
        int tries = 0;
        while (!closed && stub == null) {
          tries++;
          try {
            stub = makeStubNoRetries();
          } catch (IOException e) {
            exceptionCaught = e;
          } catch (KeeperException e) {
            exceptionCaught = e;
          } catch (ServiceException e) {
            exceptionCaught = e;
          }

          // NOTE(review): the unbraced 'if' below owns the entire following
          // if/else — the retry/give-up decision only runs after a failed
          // attempt. With no exception and a null stub the loop would spin
          // without sleeping, but makeStubNoRetries either returns a stub or
          // throws, so that path is presumably unreachable — confirm.
          if (exceptionCaught != null)

          if (tries < numTries && !ExceptionUtil.isInterrupt(exceptionCaught)) {
            long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1);
            LOG.info("getMaster attempt " + tries + " of " + numTries +
              " failed; retrying after sleep of " + pauseTime + ", exception=" +
              exceptionCaught);

            try {
              Thread.sleep(pauseTime);
            } catch (InterruptedException e) {
              throw new MasterNotRunningException(
                "Thread was interrupted while trying to connect to master.", e);
            }
          } else {
            LOG.info("getMaster attempt " + tries + " of " + numTries +
              " failed; no more retrying.", exceptionCaught);
            throw new MasterNotRunningException(exceptionCaught);
          }
        }

        if (stub == null) {
          // Only way out of the loop with a null stub is a closed connection.
          throw new MasterNotRunningException("Connection was closed while trying to get master");
        }
        return stub;
      }
    }
  }
1663
1664
1665
1666
1667 class MasterServiceStubMaker extends StubMaker {
1668 private MasterService.BlockingInterface stub;
1669 @Override
1670 protected String getServiceName() {
1671 return MasterService.getDescriptor().getName();
1672 }
1673
1674 @Override
1675 @edu.umd.cs.findbugs.annotations.SuppressWarnings("SWL_SLEEP_WITH_LOCK_HELD")
1676 MasterService.BlockingInterface makeStub() throws MasterNotRunningException {
1677 return (MasterService.BlockingInterface)super.makeStub();
1678 }
1679
1680 @Override
1681 protected Object makeStub(BlockingRpcChannel channel) {
1682 this.stub = MasterService.newBlockingStub(channel);
1683 return this.stub;
1684 }
1685
1686 @Override
1687 protected void isMasterRunning() throws ServiceException {
1688 this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1689 }
1690 }
1691
  /** Get an AdminService stub for a region server (master = false). */
  @Override
  public AdminService.BlockingInterface getAdmin(final ServerName serverName)
      throws IOException {
    return getAdmin(serverName, false);
  }
1697
  /**
   * Get (creating and caching on first use) an AdminService stub for the
   * given server.
   * @param master accepted for interface compatibility but not consulted
   *   anywhere in this implementation
   * @throws RegionServerStoppedException if the server is known dead
   */
  @Override
  public AdminService.BlockingInterface getAdmin(final ServerName serverName,
      final boolean master)
  throws IOException {
    if (isDeadServer(serverName)) {
      throw new RegionServerStoppedException(serverName + " is dead.");
    }
    // Serialize stub creation per key via the interned lock object.
    String key = getStubKey(AdminService.BlockingInterface.class.getName(),
      serverName.getHostAndPort());
    this.connectionLock.putIfAbsent(key, key);
    AdminService.BlockingInterface stub = null;
    synchronized (this.connectionLock.get(key)) {
      stub = (AdminService.BlockingInterface)this.stubs.get(key);
      if (stub == null) {
        BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName,
          user, this.rpcTimeout);
        stub = AdminService.newBlockingStub(channel);
        this.stubs.put(key, stub);
      }
    }
    return stub;
  }
1721
  /**
   * Get (creating and caching on first use) a ClientService stub for the
   * given region server. Same caching/locking scheme as getAdmin.
   * @throws RegionServerStoppedException if the server is known dead
   */
  @Override
  public ClientService.BlockingInterface getClient(final ServerName sn)
  throws IOException {
    if (isDeadServer(sn)) {
      throw new RegionServerStoppedException(sn + " is dead.");
    }
    String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostAndPort());
    this.connectionLock.putIfAbsent(key, key);
    ClientService.BlockingInterface stub = null;
    synchronized (this.connectionLock.get(key)) {
      stub = (ClientService.BlockingInterface)this.stubs.get(key);
      if (stub == null) {
        BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
          user, this.rpcTimeout);
        stub = ClientService.newBlockingStub(channel);
        this.stubs.put(key, stub);
      }
    }
    return stub;
  }
1744
1745 static String getStubKey(final String serviceName, final String rsHostnamePort) {
1746 return serviceName + "@" + rsHostnamePort;
1747 }
1748
  // Shared keep-alive ZooKeeper watcher; created lazily, guarded by masterAndZKLock.
  private ZooKeeperKeepAliveConnection keepAliveZookeeper;
  // Number of outstanding users of keepAliveZookeeper.
  private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
  // When false, the DelayedClosing chore must not close the watcher.
  private boolean canCloseZKW = true;

  // Idle grace period, in ms, before the chore closes ZK/master resources.
  private static final long keepAlive = 5 * 60 * 1000;
1755
1756
1757
1758
1759
  /**
   * Get the shared keep-alive ZooKeeper watcher, creating it on first use,
   * and take one reference on it. Callers must pair this with
   * {@link #releaseZooKeeperWatcher(ZooKeeperWatcher)}.
   * @throws IOException if this connection is already closed
   */
  ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
    throws IOException {
    synchronized (masterAndZKLock) {
      if (keepAliveZookeeper == null) {
        if (this.closed) {
          throw new IOException(toString() + " closed");
        }
        // One watcher is shared by all users of this connection.
        keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
      }
      keepAliveZookeeperUserCount.incrementAndGet();
      // A live user cancels any pending delayed close.
      keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
      return keepAliveZookeeper;
    }
  }
1776
  /**
   * Release one use of the shared ZooKeeper watcher. When the last user
   * releases, the watcher is not closed at once: a deadline is set and the
   * DelayedClosing chore closes it after the keep-alive period elapses.
   * @param zkw the watcher being released; only checked for null — the
   *   shared instance is what is actually refcounted
   */
  void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
    if (zkw == null){
      return;
    }
    synchronized (masterAndZKLock) {
      if (keepAliveZookeeperUserCount.decrementAndGet() <= 0 ){
        keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
      }
    }
  }
1787
1788
1789
1790
1791
1792
1793
1794
1795 private static class DelayedClosing extends Chore implements Stoppable {
1796 private HConnectionImplementation hci;
1797 Stoppable stoppable;
1798
1799 private DelayedClosing(
1800 HConnectionImplementation hci, Stoppable stoppable){
1801 super(
1802 "ZooKeeperWatcher and Master delayed closing for connection "+hci,
1803 60*1000,
1804 stoppable);
1805 this.hci = hci;
1806 this.stoppable = stoppable;
1807 }
1808
1809 static DelayedClosing createAndStart(HConnectionImplementation hci){
1810 Stoppable stoppable = new Stoppable() {
1811 private volatile boolean isStopped = false;
1812 @Override public void stop(String why) { isStopped = true;}
1813 @Override public boolean isStopped() {return isStopped;}
1814 };
1815
1816 return new DelayedClosing(hci, stoppable);
1817 }
1818
1819 protected void closeMasterProtocol(MasterServiceState protocolState) {
1820 if (System.currentTimeMillis() > protocolState.keepAliveUntil) {
1821 hci.closeMasterService(protocolState);
1822 protocolState.keepAliveUntil = Long.MAX_VALUE;
1823 }
1824 }
1825
1826 @Override
1827 protected void chore() {
1828 synchronized (hci.masterAndZKLock) {
1829 if (hci.canCloseZKW) {
1830 if (System.currentTimeMillis() >
1831 hci.keepZooKeeperWatcherAliveUntil) {
1832
1833 hci.closeZooKeeperWatcher();
1834 hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1835 }
1836 }
1837 closeMasterProtocol(hci.masterServiceState);
1838 closeMasterProtocol(hci.masterServiceState);
1839 }
1840 }
1841
1842 @Override
1843 public void stop(String why) {
1844 stoppable.stop(why);
1845 }
1846
1847 @Override
1848 public boolean isStopped() {
1849 return stoppable.isStopped();
1850 }
1851 }
1852
  /** Close the shared ZooKeeper watcher immediately and reset the refcount. */
  private void closeZooKeeperWatcher() {
    synchronized (masterAndZKLock) {
      if (keepAliveZookeeper != null) {
        LOG.info("Closing zookeeper sessionid=0x" +
          Long.toHexString(
            keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
        // internalClose performs the real close (the wrapper's public
        // close() is presumably the refcounted release — confirm).
        keepAliveZookeeper.internalClose();
        keepAliveZookeeper = null;
      }
      keepAliveZookeeperUserCount.set(0);
    }
  }
1865
  /** Shared state for the keep-alive master service connection. */
  final MasterServiceState masterServiceState = new MasterServiceState(this);
1867
  /**
   * Get the master stub directly. NOTE(review): this shortcut takes a
   * keep-alive user reference (inside getKeepAliveMasterService) but returns
   * the plain interface, so callers have no close() to release it — confirm
   * this is intended.
   */
  @Override
  public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
    return getKeepAliveMasterService();
  }
1872
  /**
   * Register one more user of the master service and cancel any pending
   * delayed close. (Despite the name, nothing is cleared: the user count is
   * incremented and the keep-alive deadline reset.)
   */
  private void resetMasterServiceState(final MasterServiceState mss) {
    mss.userCount++;
    mss.keepAliveUntil = Long.MAX_VALUE;
  }
1877
  /**
   * Get a keep-alive wrapper around the shared master service stub, creating
   * and verifying the stub first if needed. Each returned wrapper counts as
   * one user of the stub; callers must call close() on it so the stub can be
   * released (and eventually closed by the DelayedClosing chore when idle).
   */
  @Override
  public MasterKeepAliveConnection getKeepAliveMasterService()
  throws MasterNotRunningException {
    synchronized (masterAndZKLock) {
      // (Re)build the stub when absent or no longer answering pings.
      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
        this.masterServiceState.stub = stubMaker.makeStub();
      }
      resetMasterServiceState(this.masterServiceState);
    }

    // Capture the stub so this wrapper keeps using the stub it was built
    // with even if the shared state is rebuilt later. Everything below is
    // pure delegation to that stub.
    final MasterService.BlockingInterface stub = this.masterServiceState.stub;
    return new MasterKeepAliveConnection() {
      MasterServiceState mss = masterServiceState;
      @Override
      public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
      throws ServiceException {
        return stub.addColumn(controller, request);
      }

      @Override
      public DeleteColumnResponse deleteColumn(RpcController controller,
          DeleteColumnRequest request)
      throws ServiceException {
        return stub.deleteColumn(controller, request);
      }

      @Override
      public ModifyColumnResponse modifyColumn(RpcController controller,
          ModifyColumnRequest request)
      throws ServiceException {
        return stub.modifyColumn(controller, request);
      }

      @Override
      public MoveRegionResponse moveRegion(RpcController controller,
          MoveRegionRequest request) throws ServiceException {
        return stub.moveRegion(controller, request);
      }

      @Override
      public DispatchMergingRegionsResponse dispatchMergingRegions(
          RpcController controller, DispatchMergingRegionsRequest request)
          throws ServiceException {
        return stub.dispatchMergingRegions(controller, request);
      }

      @Override
      public AssignRegionResponse assignRegion(RpcController controller,
          AssignRegionRequest request) throws ServiceException {
        return stub.assignRegion(controller, request);
      }

      @Override
      public UnassignRegionResponse unassignRegion(RpcController controller,
          UnassignRegionRequest request) throws ServiceException {
        return stub.unassignRegion(controller, request);
      }

      @Override
      public OfflineRegionResponse offlineRegion(RpcController controller,
          OfflineRegionRequest request) throws ServiceException {
        return stub.offlineRegion(controller, request);
      }

      @Override
      public DeleteTableResponse deleteTable(RpcController controller,
          DeleteTableRequest request) throws ServiceException {
        return stub.deleteTable(controller, request);
      }

      @Override
      public EnableTableResponse enableTable(RpcController controller,
          EnableTableRequest request) throws ServiceException {
        return stub.enableTable(controller, request);
      }

      @Override
      public DisableTableResponse disableTable(RpcController controller,
          DisableTableRequest request) throws ServiceException {
        return stub.disableTable(controller, request);
      }

      @Override
      public ModifyTableResponse modifyTable(RpcController controller,
          ModifyTableRequest request) throws ServiceException {
        return stub.modifyTable(controller, request);
      }

      @Override
      public CreateTableResponse createTable(RpcController controller,
          CreateTableRequest request) throws ServiceException {
        return stub.createTable(controller, request);
      }

      @Override
      public ShutdownResponse shutdown(RpcController controller,
          ShutdownRequest request) throws ServiceException {
        return stub.shutdown(controller, request);
      }

      @Override
      public StopMasterResponse stopMaster(RpcController controller,
          StopMasterRequest request) throws ServiceException {
        return stub.stopMaster(controller, request);
      }

      @Override
      public BalanceResponse balance(RpcController controller,
          BalanceRequest request) throws ServiceException {
        return stub.balance(controller, request);
      }

      @Override
      public SetBalancerRunningResponse setBalancerRunning(
          RpcController controller, SetBalancerRunningRequest request)
          throws ServiceException {
        return stub.setBalancerRunning(controller, request);
      }

      @Override
      public RunCatalogScanResponse runCatalogScan(RpcController controller,
          RunCatalogScanRequest request) throws ServiceException {
        return stub.runCatalogScan(controller, request);
      }

      @Override
      public EnableCatalogJanitorResponse enableCatalogJanitor(
          RpcController controller, EnableCatalogJanitorRequest request)
          throws ServiceException {
        return stub.enableCatalogJanitor(controller, request);
      }

      @Override
      public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
          RpcController controller, IsCatalogJanitorEnabledRequest request)
          throws ServiceException {
        return stub.isCatalogJanitorEnabled(controller, request);
      }

      @Override
      public CoprocessorServiceResponse execMasterService(
          RpcController controller, CoprocessorServiceRequest request)
          throws ServiceException {
        return stub.execMasterService(controller, request);
      }

      @Override
      public SnapshotResponse snapshot(RpcController controller,
          SnapshotRequest request) throws ServiceException {
        return stub.snapshot(controller, request);
      }

      @Override
      public GetCompletedSnapshotsResponse getCompletedSnapshots(
          RpcController controller, GetCompletedSnapshotsRequest request)
          throws ServiceException {
        return stub.getCompletedSnapshots(controller, request);
      }

      @Override
      public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
          DeleteSnapshotRequest request) throws ServiceException {
        return stub.deleteSnapshot(controller, request);
      }

      @Override
      public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
          IsSnapshotDoneRequest request) throws ServiceException {
        return stub.isSnapshotDone(controller, request);
      }

      @Override
      public RestoreSnapshotResponse restoreSnapshot(
          RpcController controller, RestoreSnapshotRequest request)
          throws ServiceException {
        return stub.restoreSnapshot(controller, request);
      }

      @Override
      public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
          RpcController controller, IsRestoreSnapshotDoneRequest request)
          throws ServiceException {
        return stub.isRestoreSnapshotDone(controller, request);
      }

      @Override
      public ExecProcedureResponse execProcedure(
          RpcController controller, ExecProcedureRequest request)
          throws ServiceException {
        return stub.execProcedure(controller, request);
      }

      @Override
      public IsProcedureDoneResponse isProcedureDone(RpcController controller,
          IsProcedureDoneRequest request) throws ServiceException {
        return stub.isProcedureDone(controller, request);
      }

      @Override
      public IsMasterRunningResponse isMasterRunning(
          RpcController controller, IsMasterRunningRequest request)
          throws ServiceException {
        return stub.isMasterRunning(controller, request);
      }

      @Override
      public ModifyNamespaceResponse modifyNamespace(RpcController controller,
          ModifyNamespaceRequest request)
      throws ServiceException {
        return stub.modifyNamespace(controller, request);
      }

      @Override
      public CreateNamespaceResponse createNamespace(RpcController controller, CreateNamespaceRequest request) throws ServiceException {
        return stub.createNamespace(controller, request);
      }

      @Override
      public DeleteNamespaceResponse deleteNamespace(RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
        return stub.deleteNamespace(controller, request);
      }

      @Override
      public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller, GetNamespaceDescriptorRequest request) throws ServiceException {
        return stub.getNamespaceDescriptor(controller, request);
      }

      @Override
      public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller, ListNamespaceDescriptorsRequest request) throws ServiceException {
        return stub.listNamespaceDescriptors(controller, request);
      }

      @Override
      public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(RpcController controller, ListTableDescriptorsByNamespaceRequest request) throws ServiceException {
        return stub.listTableDescriptorsByNamespace(controller, request);
      }

      @Override
      public ListTableNamesByNamespaceResponse listTableNamesByNamespace(RpcController controller,
          ListTableNamesByNamespaceRequest request) throws ServiceException {
        return stub.listTableNamesByNamespace(controller, request);
      }

      // close() is the keep-alive release: drops this wrapper's user
      // reference on the shared master service state.
      @Override
      public void close() {
        release(this.mss);
      }

      @Override
      public GetSchemaAlterStatusResponse getSchemaAlterStatus(
          RpcController controller, GetSchemaAlterStatusRequest request)
          throws ServiceException {
        return stub.getSchemaAlterStatus(controller, request);
      }

      @Override
      public GetTableDescriptorsResponse getTableDescriptors(
          RpcController controller, GetTableDescriptorsRequest request)
          throws ServiceException {
        return stub.getTableDescriptors(controller, request);
      }

      @Override
      public GetTableNamesResponse getTableNames(
          RpcController controller, GetTableNamesRequest request)
          throws ServiceException {
        return stub.getTableNames(controller, request);
      }

      @Override
      public GetClusterStatusResponse getClusterStatus(
          RpcController controller, GetClusterStatusRequest request)
          throws ServiceException {
        return stub.getClusterStatus(controller, request);
      }
    };
  }
2156
2157
  /** Release one user of the given master service state, if it has a connection. */
  private static void release(MasterServiceState mss) {
    if (mss != null && mss.connection != null) {
      ((HConnectionImplementation)mss.connection).releaseMaster(mss);
    }
  }
2163
  /**
   * @return true if the cached master stub exists and the master still
   *   answers an isMasterRunning ping; false (never throws) on any failure.
   */
  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
    if (mss.getStub() == null){
      return false;
    }
    try {
      return mss.isMasterRunning();
    } catch (UndeclaredThrowableException e) {
      // The RPC proxy surfaced an undeclared checked exception; treat the
      // connection as gone.
      LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
      return false;
    } catch (ServiceException se) {
      LOG.warn("Checking master connection", se);
      return false;
    }
  }
2180
  /**
   * Release one user of the shared master stub. When the last user releases,
   * set the keep-alive deadline so the DelayedClosing chore can close the
   * idle stub later.
   */
  void releaseMaster(MasterServiceState mss) {
    if (mss.getStub() == null) return;
    synchronized (masterAndZKLock) {
      --mss.userCount;
      if (mss.userCount <= 0) {
        mss.keepAliveUntil = System.currentTimeMillis() + keepAlive;
      }
    }
  }
2190
  /**
   * Drop the cached master stub (if any). The next
   * getKeepAliveMasterService() call rebuilds it.
   */
  private void closeMasterService(MasterServiceState mss) {
    if (mss.getStub() != null) {
      LOG.info("Closing master protocol: " + mss);
      mss.clearStub();
    }
    // Reset unconditionally: any outstanding users are abandoned.
    mss.userCount = 0;
  }
2198
2199
2200
2201
2202
  /**
   * Immediately closes the master connection, regardless of its user count.
   * Synchronized on masterAndZKLock like all master/ZK lifecycle operations.
   */
  private void closeMaster() {
    synchronized (masterAndZKLock) {
      closeMasterService(masterServiceState);
    }
  }
2208
2209 void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
2210 ServerName serverName, long seqNum) {
2211 HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2212 cacheLocation(hri.getTable(), source, newHrl);
2213 }
2214
2215
2216
2217
2218
2219
2220 void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
2221 ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
2222 tableLocations.remove(hri.getStartKey(), source);
2223 }
2224
2225 @Override
2226 public void deleteCachedRegionLocation(final HRegionLocation location) {
2227 if (location == null) {
2228 return;
2229 }
2230
2231 HRegionLocation removedLocation;
2232 TableName tableName = location.getRegionInfo().getTable();
2233 Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
2234 removedLocation = tableLocations.remove(location.getRegionInfo().getStartKey());
2235 if (LOG.isDebugEnabled() && removedLocation != null) {
2236 LOG.debug("Removed " +
2237 location.getRegionInfo().getRegionNameAsString() +
2238 " for tableName=" + tableName +
2239 " from cache");
2240 }
2241 }
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251 @Override
2252 public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2253 final Object exception, final HRegionLocation source) {
2254 if (rowkey == null || tableName == null) {
2255 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
2256 ", tableName=" + (tableName == null ? "null" : tableName));
2257 return;
2258 }
2259
2260 if (source == null || source.getServerName() == null){
2261
2262 return;
2263 }
2264
2265
2266 final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey);
2267 if (oldLocation == null || !source.getServerName().equals(oldLocation.getServerName())) {
2268
2269
2270 return;
2271 }
2272
2273 HRegionInfo regionInfo = oldLocation.getRegionInfo();
2274 Throwable cause = findException(exception);
2275 if (cause != null) {
2276 if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException) {
2277
2278 return;
2279 }
2280
2281 if (cause instanceof RegionMovedException) {
2282 RegionMovedException rme = (RegionMovedException) cause;
2283 if (LOG.isTraceEnabled()) {
2284 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2285 rme.getHostname() + ":" + rme.getPort() +
2286 " according to " + source.getHostnamePort());
2287 }
2288
2289
2290 updateCachedLocation(
2291 regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2292 return;
2293 }
2294 }
2295
2296
2297
2298 deleteCachedLocation(regionInfo, source);
2299 }
2300
  /**
   * byte[] table-name convenience overload; delegates to the
   * {@link TableName} variant.
   */
  @Override
  public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
    final Object exception, final HRegionLocation source) {
    updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
  }
2306
2307 @Override
2308 @Deprecated
2309 public void processBatch(List<? extends Row> list,
2310 final TableName tableName,
2311 ExecutorService pool,
2312 Object[] results) throws IOException, InterruptedException {
2313
2314
2315
2316 if (results.length != list.size()) {
2317 throw new IllegalArgumentException(
2318 "argument results must be the same size as argument list");
2319 }
2320 processBatchCallback(list, tableName, pool, results, null);
2321 }
2322
  /**
   * byte[] table-name convenience overload; delegates to the
   * {@link TableName} variant.
   */
  @Override
  @Deprecated
  public void processBatch(List<? extends Row> list,
      final byte[] tableName,
      ExecutorService pool,
      Object[] results) throws IOException, InterruptedException {
    processBatch(list, TableName.valueOf(tableName), pool, results);
  }
2331
2332
2333
2334
2335
2336
2337
2338
2339 @Override
2340 @Deprecated
2341 public <R> void processBatchCallback(
2342 List<? extends Row> list,
2343 TableName tableName,
2344 ExecutorService pool,
2345 Object[] results,
2346 Batch.Callback<R> callback)
2347 throws IOException, InterruptedException {
2348
2349
2350
2351 ObjectResultFiller<R> cb = new ObjectResultFiller<R>(results, callback);
2352 AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
2353
2354
2355 asyncProcess.submitAll(list);
2356 asyncProcess.waitUntilDone();
2357
2358 if (asyncProcess.hasError()) {
2359 throw asyncProcess.getErrors();
2360 }
2361 }
2362
  /**
   * byte[] table-name convenience overload; delegates to the
   * {@link TableName} variant.
   */
  @Override
  @Deprecated
  public <R> void processBatchCallback(
      List<? extends Row> list,
      byte[] tableName,
      ExecutorService pool,
      Object[] results,
      Batch.Callback<R> callback)
      throws IOException, InterruptedException {
    processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
  }
2374
2375
  /**
   * Factory for the AsyncProcess used by the batch methods; overridable for
   * tests. NOTE(review): the raw AsyncProcess return type is kept as-is since
   * parameterizing it would change the visible signature for subclasses.
   */
  protected <R> AsyncProcess createAsyncProcess(TableName tableName, ExecutorService pool,
      AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
    return new AsyncProcess<R>(this, tableName, pool, callback, conf,
        RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf));
  }
2381
2382
2383
2384
2385
2386 private static class ObjectResultFiller<Res>
2387 implements AsyncProcess.AsyncProcessCallback<Res> {
2388
2389 private final Object[] results;
2390 private Batch.Callback<Res> callback;
2391
2392 ObjectResultFiller(Object[] results, Batch.Callback<Res> callback) {
2393 this.results = results;
2394 this.callback = callback;
2395 }
2396
2397 @Override
2398 public void success(int pos, byte[] region, Row row, Res result) {
2399 assert pos < results.length;
2400 results[pos] = result;
2401 if (callback != null) {
2402 callback.update(region, row.getRow(), result);
2403 }
2404 }
2405
2406 @Override
2407 public boolean failure(int pos, byte[] region, Row row, Throwable t) {
2408 assert pos < results.length;
2409 results[pos] = t;
2410
2411 return true;
2412 }
2413
2414 @Override
2415 public boolean retriableFailure(int originalIndex, Row row, byte[] region,
2416 Throwable exception) {
2417 return true;
2418 }
2419 }
2420
2421
2422
2423
2424
2425
2426 int getNumberOfCachedRegionLocations(final TableName tableName) {
2427 Map<byte[], HRegionLocation> tableLocs = this.cachedRegionLocations.get(tableName);
2428 if (tableLocs == null) {
2429 return 0;
2430 }
2431 return tableLocs.values().size();
2432 }
2433
2434
2435
2436
2437
2438
2439
2440
2441 boolean isRegionCached(TableName tableName, final byte[] row) {
2442 HRegionLocation location = getCachedLocation(tableName, row);
2443 return location != null;
2444 }
2445
2446 @Override
2447 public void setRegionCachePrefetch(final TableName tableName,
2448 final boolean enable) {
2449 if (!enable) {
2450 regionCachePrefetchDisabledTables.add(Bytes.mapKey(tableName.getName()));
2451 }
2452 else {
2453 regionCachePrefetchDisabledTables.remove(Bytes.mapKey(tableName.getName()));
2454 }
2455 }
2456
  /**
   * byte[] table-name convenience overload; delegates to the
   * {@link TableName} variant.
   */
  @Override
  public void setRegionCachePrefetch(final byte[] tableName,
      final boolean enable) {
    setRegionCachePrefetch(TableName.valueOf(tableName), enable);
  }
2462
  /**
   * Returns whether region-location prefetch is enabled for the table, i.e.
   * the table is not in the disabled set.
   */
  @Override
  public boolean getRegionCachePrefetch(TableName tableName) {
    return !regionCachePrefetchDisabledTables.contains(Bytes.mapKey(tableName.getName()));
  }
2467
  /**
   * byte[] table-name convenience overload; delegates to the
   * {@link TableName} variant.
   */
  @Override
  public boolean getRegionCachePrefetch(byte[] tableName) {
    return getRegionCachePrefetch(TableName.valueOf(tableName));
  }
2472
  /**
   * Aborts this connection. A ZooKeeper session expiry only tears down the
   * ZK watcher (it is recreated on demand by the next user); any other cause
   * marks the connection aborted and closes it for good.
   */
  @Override
  public void abort(final String msg, Throwable t) {
    if (t instanceof KeeperException.SessionExpiredException
      && keepAliveZookeeper != null) {
      synchronized (masterAndZKLock) {
        // Re-check under the lock: another thread may have closed it already.
        if (keepAliveZookeeper != null) {
          LOG.warn("This client just lost it's session with ZooKeeper," +
            " closing it." +
            " It will be recreated next time someone needs it", t);
          closeZooKeeperWatcher();
        }
      }
    } else {
      if (t != null) {
        LOG.fatal(msg, t);
      } else {
        LOG.fatal(msg);
      }
      // Mark aborted before close() so close() can take the stale-connection path.
      this.aborted = true;
      close();
      this.closed = true;
    }
  }
2496
  /** @return true once this connection has been closed. */
  @Override
  public boolean isClosed() {
    return this.closed;
  }
2501
  /** @return true once this connection has been aborted via {@link #abort}. */
  @Override
  public boolean isAborted(){
    return this.aborted;
  }
2506
  /**
   * Returns the current number of region servers, as reported by the
   * cluster registry.
   */
  @Override
  public int getCurrentNrHRS() throws IOException {
    return this.registry.getCurrentNrHRS();
  }
2511
2512
2513
2514
  // Increments the reference count of users sharing this managed connection.
  // NOTE(review): refCount updates are not synchronized here — presumably the
  // connection manager serializes callers; confirm before relying on it.
  void incCount() {
    ++refCount;
  }
2518
2519
2520
2521
  // Decrements the reference count, never letting it go below zero.
  void decCount() {
    if (refCount > 0) {
      --refCount;
    }
  }
2527
2528
2529
2530
2531
2532
  /** @return true when no callers hold a reference to this connection. */
  boolean isZeroReference() {
    return refCount == 0;
  }
2536
  /**
   * Shuts this connection down unconditionally: stops the delayed-closing
   * chore, closes the master connection and batch pool, drops cached stubs,
   * and stops the ZK watcher, cluster-status listener and RPC client.
   * Idempotent — a second call is a no-op. The shutdown order mirrors the
   * dependency order (users of the RPC client go first).
   */
  void internalClose() {
    if (this.closed) {
      return;
    }
    delayedClosing.stop("Closing connection");
    closeMaster();
    shutdownBatchPool();
    this.closed = true;
    closeZooKeeperWatcher();
    this.stubs.clear();
    if (clusterStatusListener != null) {
      clusterStatusListener.close();
    }
    if (rpcClient != null) {
      rpcClient.stop();
    }
  }
2554
  /**
   * Closes this connection. Managed connections are handed back to
   * HConnectionManager (which deletes stale/aborted ones outright and
   * reference-counts the rest); unmanaged connections are torn down directly.
   */
  @Override
  public void close() {
    if (managed) {
      if (aborted) {
        HConnectionManager.deleteStaleConnection(this);
      } else {
        HConnectionManager.deleteConnection(this, false);
      }
    } else {
      internalClose();
    }
  }
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
  /**
   * Safety net: if a caller leaked this connection without closing it, force
   * the reference count to one so {@link #close()} actually releases the
   * underlying resources during finalization.
   */
  @Override
  protected void finalize() throws Throwable {
    super.finalize();
    // Pretend there is exactly one outstanding user so close() tears us down.
    refCount = 1;
    close();
  }
2586
2587 @Override
2588 public HTableDescriptor[] listTables() throws IOException {
2589 MasterKeepAliveConnection master = getKeepAliveMasterService();
2590 try {
2591 GetTableDescriptorsRequest req =
2592 RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2593 return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2594 } catch (ServiceException se) {
2595 throw ProtobufUtil.getRemoteException(se);
2596 } finally {
2597 master.close();
2598 }
2599 }
2600
2601 @Override
2602 public String[] getTableNames() throws IOException {
2603 TableName[] tableNames = listTableNames();
2604 String result[] = new String[tableNames.length];
2605 for (int i = 0; i < tableNames.length; i++) {
2606 result[i] = tableNames[i].getNameAsString();
2607 }
2608 return result;
2609 }
2610
  /**
   * Fetches all table names from the master over a keep-alive connection,
   * which is released in all cases.
   */
  @Override
  public TableName[] listTableNames() throws IOException {
    MasterKeepAliveConnection master = getKeepAliveMasterService();
    try {
      return ProtobufUtil.getTableNameArray(master.getTableNames(null,
        GetTableNamesRequest.newBuilder().build())
        .getTableNamesList());
    } catch (ServiceException se) {
      // Unwrap the protobuf service exception into the underlying IOException.
      throw ProtobufUtil.getRemoteException(se);
    } finally {
      master.close();
    }
  }
2624
  /**
   * Fetches the descriptors for the given tables from the master. Returns an
   * empty array for a null or empty input list.
   */
  @Override
  public HTableDescriptor[] getHTableDescriptorsByTableName(
      List<TableName> tableNames) throws IOException {
    if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
    MasterKeepAliveConnection master = getKeepAliveMasterService();
    try {
      GetTableDescriptorsRequest req =
        RequestConverter.buildGetTableDescriptorsRequest(tableNames);
      return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    } finally {
      // Always release the keep-alive master connection.
      master.close();
    }
  }
2640
2641 @Override
2642 public HTableDescriptor[] getHTableDescriptors(
2643 List<String> names) throws IOException {
2644 List<TableName> tableNames = new ArrayList(names.size());
2645 for(String name : names) {
2646 tableNames.add(TableName.valueOf(name));
2647 }
2648
2649 return getHTableDescriptorsByTableName(tableNames);
2650 }
2651
  /** @return the nonce generator associated with this connection. */
  @Override
  public NonceGenerator getNonceGenerator() {
    return this.nonceGenerator;
  }
2656
2657
2658
2659
2660
2661
2662
2663
  /**
   * Fetches the descriptor for a single table. META is answered from the
   * static descriptor without a master round-trip; a null name returns null.
   *
   * @throws TableNotFoundException if the master knows no such table
   */
  @Override
  public HTableDescriptor getHTableDescriptor(final TableName tableName)
  throws IOException {
    if (tableName == null) return null;
    if (tableName.equals(TableName.META_TABLE_NAME)) {
      return HTableDescriptor.META_TABLEDESC;
    }
    MasterKeepAliveConnection master = getKeepAliveMasterService();
    GetTableDescriptorsResponse htds;
    try {
      GetTableDescriptorsRequest req =
        RequestConverter.buildGetTableDescriptorsRequest(tableName);
      htds = master.getTableDescriptors(null, req);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    } finally {
      master.close();
    }
    if (!htds.getTableSchemaList().isEmpty()) {
      // The master returns at most one schema for a single-table request.
      return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
    }
    throw new TableNotFoundException(tableName.getNameAsString());
  }
2687
  /**
   * byte[] table-name convenience overload; delegates to the
   * {@link TableName} variant.
   */
  @Override
  public HTableDescriptor getHTableDescriptor(final byte[] tableName)
  throws IOException {
    return getHTableDescriptor(TableName.valueOf(tableName));
  }
2693 }
2694
2695
2696
2697
  /**
   * Tracks per-server error counts so retries against a flaky server can be
   * backed off progressively, and enforces an overall retry/time budget.
   * Thread-safe: state lives in a ConcurrentHashMap of atomic counters.
   */
  static class ServerErrorTracker {

    private final ConcurrentMap<HRegionLocation, ServerErrors> errorsByServer =
        new ConcurrentHashMap<HRegionLocation, ServerErrors>();
    // Absolute deadline (ms since epoch) after which retries stop.
    private final long canRetryUntil;
    private final int maxRetries;
    // Human-readable creation time, used in error reporting.
    private final String startTrackingTime;

    /**
     * @param timeout   how long (ms from now) retries may keep going
     * @param maxRetries maximum number of retries before only the deadline applies
     */
    public ServerErrorTracker(long timeout, int maxRetries) {
      this.maxRetries = maxRetries;
      this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
      this.startTrackingTime = new Date().toString();
    }

    /**
     * True while another retry is allowed: either under the retry count, or
     * (when retries are enabled at all) still inside the time budget.
     */
    boolean canRetryMore(int numRetry) {
      // If there is a single try, no need to sleep
      return numRetry < maxRetries || (maxRetries > 1 &&
          EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil);
    }

    /**
     * Computes the backoff before retrying against {@code server}: a pause
     * scaled by that server's recorded error count, or zero for a server with
     * no recorded errors.
     */
    long calculateBackoffTime(HRegionLocation server, long basePause) {
      long result;
      ServerErrors errorStats = errorsByServer.get(server);
      if (errorStats != null) {
        result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
      } else {
        result = 0; // yes, if the server is not in our list we don't wait before retrying.
      }
      return result;
    }

    /**
     * Records one more error against {@code server}.
     * NOTE(review): when this thread wins the putIfAbsent race, addError() is
     * NOT called, so the very first error leaves the counter at zero — the
     * first backoff for a server is therefore always 0. Presumably intentional
     * (start backing off only from the second error); confirm before changing.
     */
    void reportServerError(HRegionLocation server) {
      ServerErrors errors = errorsByServer.get(server);
      if (errors != null) {
        errors.addError();
      } else {
        errors = errorsByServer.putIfAbsent(server, new ServerErrors());
        if (errors != null){
          // Lost the race: another thread inserted first; count on its entry.
          errors.addError();
        }
      }
    }

    /** @return the timestamp string captured when tracking began. */
    String getStartTrackingTime() {
      return startTrackingTime;
    }

    /** Per-server atomic error counter. */
    private static class ServerErrors {
      public final AtomicInteger retries = new AtomicInteger(0);

      public void addError() {
        retries.incrementAndGet();
      }
    }
  }
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780 public static Throwable findException(Object exception) {
2781 if (exception == null || !(exception instanceof Throwable)) {
2782 return null;
2783 }
2784 Throwable cur = (Throwable) exception;
2785 while (cur != null) {
2786 if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
2787 || cur instanceof RegionTooBusyException) {
2788 return cur;
2789 }
2790 if (cur instanceof RemoteException) {
2791 RemoteException re = (RemoteException) cur;
2792 cur = re.unwrapRemoteException(
2793 RegionOpeningException.class, RegionMovedException.class,
2794 RegionTooBusyException.class);
2795 if (cur == null) {
2796 cur = re.unwrapRemoteException();
2797 }
2798
2799
2800
2801 if (cur == re) {
2802 return null;
2803 }
2804 } else {
2805 cur = cur.getCause();
2806 }
2807 }
2808
2809 return null;
2810 }
2811
2812
2813
2814
2815
2816
2817
2818
2819 public static void setServerSideHConnectionRetries(final Configuration c, final String sn,
2820 final Log log) {
2821 int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2822 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2823
2824
2825 int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
2826 int retries = hcRetries * serversideMultiplier;
2827 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
2828 log.debug(sn + " HConnection server-to-server retries=" + retries);
2829 }
2830 }