1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase;
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.lang.reflect.Field;
24 import java.lang.reflect.Modifier;
25 import java.net.InetAddress;
26 import java.net.ServerSocket;
27 import java.net.Socket;
28 import java.net.UnknownHostException;
29 import java.security.MessageDigest;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.NavigableSet;
38 import java.util.Random;
39 import java.util.Set;
40 import java.util.TreeSet;
41 import java.util.UUID;
42 import java.util.concurrent.TimeUnit;
43
44 import org.apache.commons.lang.RandomStringUtils;
45 import org.apache.commons.logging.Log;
46 import org.apache.commons.logging.LogFactory;
47 import org.apache.commons.logging.impl.Jdk14Logger;
48 import org.apache.commons.logging.impl.Log4JLogger;
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.fs.FileSystem;
51 import org.apache.hadoop.fs.Path;
52 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
53 import org.apache.hadoop.hbase.Waiter.Predicate;
54 import org.apache.hadoop.hbase.classification.InterfaceAudience;
55 import org.apache.hadoop.hbase.classification.InterfaceStability;
56 import org.apache.hadoop.hbase.client.Admin;
57 import org.apache.hadoop.hbase.client.Connection;
58 import org.apache.hadoop.hbase.client.ConnectionFactory;
59 import org.apache.hadoop.hbase.client.Consistency;
60 import org.apache.hadoop.hbase.client.Delete;
61 import org.apache.hadoop.hbase.client.Durability;
62 import org.apache.hadoop.hbase.client.Get;
63 import org.apache.hadoop.hbase.client.HBaseAdmin;
64 import org.apache.hadoop.hbase.client.HConnection;
65 import org.apache.hadoop.hbase.client.HTable;
66 import org.apache.hadoop.hbase.client.Put;
67 import org.apache.hadoop.hbase.client.RegionLocator;
68 import org.apache.hadoop.hbase.client.Result;
69 import org.apache.hadoop.hbase.client.ResultScanner;
70 import org.apache.hadoop.hbase.client.Scan;
71 import org.apache.hadoop.hbase.client.Table;
72 import org.apache.hadoop.hbase.fs.HFileSystem;
73 import org.apache.hadoop.hbase.io.compress.Compression;
74 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
75 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
76 import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
77 import org.apache.hadoop.hbase.io.hfile.HFile;
78 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
79 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
80 import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
81 import org.apache.hadoop.hbase.master.HMaster;
82 import org.apache.hadoop.hbase.master.AssignmentManager;
83 import org.apache.hadoop.hbase.master.RegionStates;
84 import org.apache.hadoop.hbase.master.ServerManager;
85 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
86 import org.apache.hadoop.hbase.regionserver.BloomType;
87 import org.apache.hadoop.hbase.regionserver.HRegion;
88 import org.apache.hadoop.hbase.regionserver.HRegionServer;
89 import org.apache.hadoop.hbase.regionserver.HStore;
90 import org.apache.hadoop.hbase.regionserver.InternalScanner;
91 import org.apache.hadoop.hbase.regionserver.Region;
92 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
93 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
94 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
95 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
96 import org.apache.hadoop.hbase.security.User;
97 import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
98 import org.apache.hadoop.hbase.tool.Canary;
99 import org.apache.hadoop.hbase.util.Bytes;
100 import org.apache.hadoop.hbase.util.FSTableDescriptors;
101 import org.apache.hadoop.hbase.util.FSUtils;
102 import org.apache.hadoop.hbase.util.JVMClusterUtil;
103 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
104 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
105 import org.apache.hadoop.hbase.util.Pair;
106 import org.apache.hadoop.hbase.util.RegionSplitter;
107 import org.apache.hadoop.hbase.util.RetryCounter;
108 import org.apache.hadoop.hbase.util.Threads;
109 import org.apache.hadoop.hbase.wal.WAL;
110 import org.apache.hadoop.hbase.wal.WALFactory;
111 import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
112 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
113 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
114 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
115 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
116 import org.apache.hadoop.hdfs.DFSClient;
117 import org.apache.hadoop.hdfs.DistributedFileSystem;
118 import org.apache.hadoop.hdfs.MiniDFSCluster;
119 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
120 import org.apache.hadoop.mapred.JobConf;
121 import org.apache.hadoop.mapred.MiniMRCluster;
122 import org.apache.hadoop.mapred.TaskLog;
123 import org.apache.zookeeper.KeeperException;
124 import org.apache.zookeeper.KeeperException.NodeExistsException;
125 import org.apache.zookeeper.WatchedEvent;
126 import org.apache.zookeeper.ZooKeeper;
127 import org.apache.zookeeper.ZooKeeper.States;
128
129 import static org.junit.Assert.assertEquals;
130 import static org.junit.Assert.assertTrue;
131 import static org.junit.Assert.fail;
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 @InterfaceAudience.Public
148 @InterfaceStability.Evolving
149 @SuppressWarnings("deprecation")
150 public class HBaseTestingUtility extends HBaseCommonTestingUtility {
// Mini ZooKeeper cluster managed by this utility; null until started.
private MiniZooKeeperCluster zkCluster = null;

// Config key for the number of regions per region server in test tables
// (read by presplitting helpers elsewhere in this class — TODO confirm).
public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";

// Default used when REGIONS_PER_SERVER_KEY is not set.
public static final int DEFAULT_REGIONS_PER_SERVER = 3;

// Key/default controlling whether test tables are created pre-split.
public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
public static final boolean PRESPLIT_TEST_TABLE = true;

// When this boolean conf key is true, the cluster's default filesystem is forced
// back to file:/// after the mini DFS starts (see setFs()).
public static final String USE_LOCAL_FILESYSTEM = "hbase.test.local.fileSystem";

// True when the zk cluster was supplied by the caller, in which case
// shutdownMiniCluster() must not shut it down.
private boolean passedZkCluster = false;
private MiniDFSCluster dfsCluster = null;

// The running cluster; volatile because it is read/written across test threads.
private volatile HBaseCluster hbaseCluster = null;
private MiniMRCluster mrCluster = null;

// Guards against two concurrent mini clusters from the same utility instance.
private volatile boolean miniClusterRunning;

private String hadoopLogDir;

// Local-filesystem directory backing the mini dfs/zk clusters; lazily created.
private File clusterTestDir = null;

// Directory on the test filesystem (local fs or mini DFS) for test data; lazily set.
private Path dataTestDirOnTestFS = null;

// Shared Connection, closed in shutdownMiniCluster().
private volatile Connection connection;

// Legacy hadoop test directory property; still set for hadoop code that reads it.
@Deprecated
private static final String TEST_DIRECTORY_KEY = "test.build.data";

private static String FS_URI;

// Ports already handed out, so a random port is not offered twice.
private static final Set<Integer> takenRandomPorts = new HashSet<Integer>();

// Compression algorithms to run parameterized tests against.
public static final List<Object[]> COMPRESSION_ALGORITHMS_PARAMETERIZED =
Arrays.asList(new Object[][] {
{ Compression.Algorithm.NONE },
{ Compression.Algorithm.GZ }
});
215
216 public static final List<Object[]> BOOLEAN_PARAMETERIZED =
217 Arrays.asList(new Object[][] {
218 { new Boolean(false) },
219 { new Boolean(true) }
220 });
221
// All four memstoreTS/tags boolean combinations, computed once at class init.
public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination() ;

// Compression algorithms exercised by bloomAndCompressionCombinations().
public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS ={
Compression.Algorithm.NONE, Compression.Algorithm.GZ
};
229
230
231
232
233 private static List<Object[]> bloomAndCompressionCombinations() {
234 List<Object[]> configurations = new ArrayList<Object[]>();
235 for (Compression.Algorithm comprAlgo :
236 HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
237 for (BloomType bloomType : BloomType.values()) {
238 configurations.add(new Object[] { comprAlgo, bloomType });
239 }
240 }
241 return Collections.unmodifiableList(configurations);
242 }
243
244
245
246
247 private static List<Object[]> memStoreTSAndTagsCombination() {
248 List<Object[]> configurations = new ArrayList<Object[]>();
249 configurations.add(new Object[] { false, false });
250 configurations.add(new Object[] { false, true });
251 configurations.add(new Object[] { true, false });
252 configurations.add(new Object[] { true, true });
253 return Collections.unmodifiableList(configurations);
254 }
255
// Bloom-type x compression cross product, computed once at class init.
public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
bloomAndCompressionCombinations();
258
/** Creates a utility backed by a fresh {@link HBaseConfiguration}. */
public HBaseTestingUtility() {
this(HBaseConfiguration.create());
}

/**
 * Creates a utility backed by the given configuration.
 * @param conf configuration shared with the clusters this utility starts
 */
public HBaseTestingUtility(Configuration conf) {
super(conf);

// Make checksum failures throw in tests instead of being silently tolerated.
ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
}
269
270
271
272
273
274
275
276 public static HBaseTestingUtility createLocalHTU() {
277 Configuration c = HBaseConfiguration.create();
278 return createLocalHTU(c);
279 }
280
281
282
283
284
285
286
287
288 public static HBaseTestingUtility createLocalHTU(Configuration c) {
289 HBaseTestingUtility htu = new HBaseTestingUtility(c);
290 String dataTestDir = htu.getDataTestDir().toString();
291 htu.getConfiguration().set(HConstants.HBASE_DIR, dataTestDir);
292 LOG.debug("Setting " + HConstants.HBASE_DIR + " to " + dataTestDir);
293 return htu;
294 }
295
296
297
298
299 public static void closeRegion(final Region r) throws IOException {
300 if (r != null) {
301 ((HRegion)r).close();
302 }
303 }
304
305
306
307
308
309
310
311
312
313
314
315
/** Returns this utility's configuration (shared with any clusters it starts). */
@Override
public Configuration getConfiguration() {
return super.getConfiguration();
}

/** Installs an externally-managed cluster for this utility to operate on. */
public void setHBaseCluster(HBaseCluster hbaseCluster) {
this.hbaseCluster = hbaseCluster;
}
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341 @Override
342 protected Path setupDataTestDir() {
343 Path testPath = super.setupDataTestDir();
344 if (null == testPath) {
345 return null;
346 }
347
348 createSubDirAndSystemProperty(
349 "hadoop.log.dir",
350 testPath, "hadoop-log-dir");
351
352
353
354 createSubDirAndSystemProperty(
355 "hadoop.tmp.dir",
356 testPath, "hadoop-tmp-dir");
357
358
359 createSubDir(
360 "mapreduce.cluster.local.dir",
361 testPath, "mapred-local-dir");
362
363 return testPath;
364 }
365
/**
 * Configures jobs to run without a mini MR cluster: staging goes to the test
 * filesystem and the local filesystem is used as the default fs.
 */
public void setJobWithoutMRCluster() throws IOException {
conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
conf.setBoolean(HBaseTestingUtility.USE_LOCAL_FILESYSTEM, true);
}
370
371 private void createSubDirAndSystemProperty(
372 String propertyName, Path parent, String subDirName){
373
374 String sysValue = System.getProperty(propertyName);
375
376 if (sysValue != null) {
377
378
379 LOG.info("System.getProperty(\""+propertyName+"\") already set to: "+
380 sysValue + " so I do NOT create it in " + parent);
381 String confValue = conf.get(propertyName);
382 if (confValue != null && !confValue.endsWith(sysValue)){
383 LOG.warn(
384 propertyName + " property value differs in configuration and system: "+
385 "Configuration="+confValue+" while System="+sysValue+
386 " Erasing configuration value by system value."
387 );
388 }
389 conf.set(propertyName, sysValue);
390 } else {
391
392 createSubDir(propertyName, parent, subDirName);
393 System.setProperty(propertyName, conf.get(propertyName));
394 }
395 }
396
397
398
399
400
401
402
403 private Path getBaseTestDirOnTestFS() throws IOException {
404 FileSystem fs = getTestFileSystem();
405 return new Path(fs.getWorkingDirectory(), "test-data");
406 }
407
408
409
410
/**
 * Returns the descriptor of the hbase:meta table, read via FSTableDescriptors.
 * @throws RuntimeException wrapping any IOException from the descriptor read
 */
public HTableDescriptor getMetaTableDescriptor() {
try {
return new FSTableDescriptors(conf).get(TableName.META_TABLE_NAME);
} catch (IOException e) {
throw new RuntimeException("Unable to create META table descriptor", e);
}
}
418
419
420
421
422
423
/**
 * Returns (lazily creating) the local directory used by the mini clusters.
 * @return absolute path of the cluster test directory
 */
Path getClusterTestDir() {
if (clusterTestDir == null){
setupClusterTestDir();
}
return new Path(clusterTestDir.getAbsolutePath());
}
430
431
432
433
434 private void setupClusterTestDir() {
435 if (clusterTestDir != null) {
436 return;
437 }
438
439
440
441 Path testDir = getDataTestDir("dfscluster_" + UUID.randomUUID().toString());
442 clusterTestDir = new File(testDir.toString()).getAbsoluteFile();
443
444 boolean b = deleteOnExit();
445 if (b) clusterTestDir.deleteOnExit();
446 conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
447 LOG.info("Created new mini-cluster data directory: " + clusterTestDir + ", deleteOnExit=" + b);
448 }
449
450
451
452
453
454
455
/**
 * Returns (lazily creating) the data test directory on the test filesystem,
 * which may be the local fs or a running mini DFS.
 */
public Path getDataTestDirOnTestFS() throws IOException {
if (dataTestDirOnTestFS == null) {
setupDataTestDirOnTestFS();
}

return dataTestDirOnTestFS;
}

/**
 * Returns a sub-path of the data test directory on the test filesystem.
 * @param subdirName name of the sub directory (not created on disk here)
 */
public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
return new Path(getDataTestDirOnTestFS(), subdirName);
}
474
475
476
477
478
479 private void setupDataTestDirOnTestFS() throws IOException {
480 if (dataTestDirOnTestFS != null) {
481 LOG.warn("Data test on test fs dir already setup in "
482 + dataTestDirOnTestFS.toString());
483 return;
484 }
485 dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
486 }
487
488
489
490
491 private Path getNewDataTestDirOnTestFS() throws IOException {
492
493
494
495
496 FileSystem fs = getTestFileSystem();
497 Path newDataTestDir = null;
498 if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
499 File dataTestDir = new File(getDataTestDir().toString());
500 if (deleteOnExit()) dataTestDir.deleteOnExit();
501 newDataTestDir = new Path(dataTestDir.getAbsolutePath());
502 } else {
503 Path base = getBaseTestDirOnTestFS();
504 String randomStr = UUID.randomUUID().toString();
505 newDataTestDir = new Path(base, randomStr);
506 if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
507 }
508 return newDataTestDir;
509 }
510
511
512
513
514
515
516 public boolean cleanupDataTestDirOnTestFS() throws IOException {
517 boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
518 if (ret)
519 dataTestDirOnTestFS = null;
520 return ret;
521 }
522
523
524
525
526
527
/**
 * Deletes a named sub-directory of the data test directory on the test filesystem.
 * @param subdirName sub directory to delete recursively
 * @return true if the delete succeeded
 */
public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
Path cpath = getDataTestDirOnTestFS(subdirName);
return getTestFileSystem().delete(cpath, true);
}
532
533
534
535
536
537
538
539
/**
 * Starts a mini DFS cluster with the given number of datanodes on default hosts.
 * @param servers number of datanodes
 */
public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
return startMiniDFSCluster(servers, null);
}
543
544
545
546
547
548
549
550
551
552
553
554
555 public MiniDFSCluster startMiniDFSCluster(final String hosts[])
556 throws Exception {
557 if ( hosts != null && hosts.length != 0) {
558 return startMiniDFSCluster(hosts.length, hosts);
559 } else {
560 return startMiniDFSCluster(1, null);
561 }
562 }
563
564
565
566
567
568
569
570
571
572
/**
 * Starts a mini DFS cluster with {@code servers} datanodes on the given hosts,
 * installs it as this utility's default filesystem, and waits for it to come up.
 *
 * @param servers number of datanodes
 * @param hosts datanode hostnames; may be null
 * @return the running mini DFS cluster
 */
public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
throws Exception {
// Must run before the cluster starts: hadoop reads these dirs as system properties.
createDirsAndSetProperties();
// Skip fsync on edit-log writes; much faster and safe for tests.
EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

// Quiet noisy hadoop metrics loggers during startup.
org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class).
setLevel(org.apache.log4j.Level.ERROR);
org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class).
setLevel(org.apache.log4j.Level.ERROR);


this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
true, null, null, hosts, null);

// Point conf's default fs at the new cluster (or file:/// if USE_LOCAL_FILESYSTEM).
setFs();

// Block until the cluster is fully up before handing it to callers.
this.dfsCluster.waitClusterUp();

// The test dir must be recomputed against the new filesystem.
dataTestDirOnTestFS = null;

return this.dfsCluster;
}
599
/**
 * Sets conf's default filesystem to the running mini DFS cluster's URI, unless
 * USE_LOCAL_FILESYSTEM is set, in which case file:/// wins. No-op when no
 * dfs cluster is running.
 */
private void setFs() throws IOException {
if(this.dfsCluster == null){
LOG.info("Skipping setting fs because dfsCluster is null");
return;
}
FileSystem fs = this.dfsCluster.getFileSystem();
FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
// Local-fs override takes precedence over the mini DFS URI set just above.
if (this.conf.getBoolean(USE_LOCAL_FILESYSTEM, false)) {
FSUtils.setFsDefault(this.conf, new Path("file:///"));
}
}
611
612 public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
613 throws Exception {
614 createDirsAndSetProperties();
615 this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
616 true, null, racks, hosts, null);
617
618
619 FileSystem fs = this.dfsCluster.getFileSystem();
620 FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
621
622
623 this.dfsCluster.waitClusterUp();
624
625
626 dataTestDirOnTestFS = null;
627
628 return this.dfsCluster;
629 }
630
/**
 * Starts a 5-datanode mini DFS cluster on a fixed namenode port for WAL tests.
 * Does NOT wait for the cluster to come up or reset the default filesystem —
 * callers handle that themselves.
 *
 * @param namenodePort port for the namenode to bind
 */
public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
createDirsAndSetProperties();
dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
null, null, null);
return dfsCluster;
}
637
638
/**
 * Creates the scratch directories hadoop needs (cache, tmp, logs, mapred dirs)
 * and publishes each as both a system property and a conf value, then wires the
 * mapreduce staging/output dirs onto the test filesystem. Must run before any
 * mini DFS or MR cluster starts.
 */
private void createDirsAndSetProperties() throws IOException {
setupClusterTestDir();
System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
createDirAndSetProperty("cache_data", "test.cache.data");
createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
createDirAndSetProperty("mapred_local", "mapreduce.cluster.local.dir");
createDirAndSetProperty("mapred_temp", "mapreduce.cluster.temp.dir");
// Turn on short-circuit reads when the test configuration asks for them.
enableShortCircuit();

Path root = getDataTestDirOnTestFS("hadoop");
conf.set(MapreduceTestingShim.getMROutputDirProp(),
new Path(root, "mapred-output-dir").toString());
conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
conf.set("mapreduce.jobtracker.staging.root.dir",
new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
}
657
658
659
660
661
662
663
664 public boolean isReadShortCircuitOn(){
665 final String propName = "hbase.tests.use.shortcircuit.reads";
666 String readOnProp = System.getProperty(propName);
667 if (readOnProp != null){
668 return Boolean.parseBoolean(readOnProp);
669 } else {
670 return conf.getBoolean(propName, false);
671 }
672 }
673
674
675
676
677 private void enableShortCircuit() {
678 if (isReadShortCircuitOn()) {
679 String curUser = System.getProperty("user.name");
680 LOG.info("read short circuit is ON for user " + curUser);
681
682 conf.set("dfs.block.local-path-access.user", curUser);
683
684 conf.setBoolean("dfs.client.read.shortcircuit", true);
685
686 conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
687 } else {
688 LOG.info("read short circuit is OFF");
689 }
690 }
691
692 private String createDirAndSetProperty(final String relPath, String property) {
693 String path = getDataTestDir(relPath).toString();
694 System.setProperty(property, path);
695 conf.set(property, path);
696 new File(path).mkdirs();
697 LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
698 return path;
699 }
700
701
702
703
704
705
706 public void shutdownMiniDFSCluster() throws IOException {
707 if (this.dfsCluster != null) {
708
709 this.dfsCluster.shutdown();
710 dfsCluster = null;
711 dataTestDirOnTestFS = null;
712 FSUtils.setFsDefault(this.conf, new Path("file:///"));
713 }
714 }
715
716
717
718
719
720
721
722
/** Starts a single-server mini ZooKeeper cluster in the cluster test dir. */
public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
return startMiniZKCluster(1);
}

/**
 * Starts a mini ZooKeeper cluster with the given number of servers and optional
 * explicit client ports.
 *
 * @param zooKeeperServerNum number of zk servers to start
 * @param clientPortList explicit client ports, one per server; extras ignored
 */
public MiniZooKeeperCluster startMiniZKCluster(
final int zooKeeperServerNum,
final int ... clientPortList)
throws Exception {
setupClusterTestDir();
return startMiniZKCluster(clusterTestDir, zooKeeperServerNum, clientPortList);
}

/** Starts a single-server mini ZooKeeper cluster in the given directory. */
private MiniZooKeeperCluster startMiniZKCluster(final File dir)
throws Exception {
return startMiniZKCluster(dir, 1, null);
}
747
748
749
750
751
752 private MiniZooKeeperCluster startMiniZKCluster(final File dir,
753 final int zooKeeperServerNum,
754 final int [] clientPortList)
755 throws Exception {
756 if (this.zkCluster != null) {
757 throw new IOException("Cluster already running at " + dir);
758 }
759 this.passedZkCluster = false;
760 this.zkCluster = new MiniZooKeeperCluster(this.getConfiguration());
761 final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0);
762 if (defPort > 0){
763
764 this.zkCluster.setDefaultClientPort(defPort);
765 }
766
767 if (clientPortList != null) {
768
769 int clientPortListSize = (clientPortList.length <= zooKeeperServerNum) ?
770 clientPortList.length : zooKeeperServerNum;
771 for (int i=0; i < clientPortListSize; i++) {
772 this.zkCluster.addClientPort(clientPortList[i]);
773 }
774 }
775 int clientPort = this.zkCluster.startup(dir,zooKeeperServerNum);
776 this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,
777 Integer.toString(clientPort));
778 return this.zkCluster;
779 }
780
781
782
783
784
785
786
787 public void shutdownMiniZKCluster() throws IOException {
788 if (this.zkCluster != null) {
789 this.zkCluster.shutdown();
790 this.zkCluster = null;
791 }
792 }
793
794
795
796
797
798
799
/** Starts a mini cluster with one master and one region server. */
public MiniHBaseCluster startMiniCluster() throws Exception {
return startMiniCluster(1, 1);
}

/**
 * Starts a mini cluster with one master and {@code numSlaves} region servers.
 * @param create when true, use a fresh root dir on the test filesystem
 */
public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create)
throws Exception {
return startMiniCluster(1, numSlaves, create);
}

/**
 * Starts a mini cluster with one master and {@code numSlaves} region servers,
 * reusing an existing root dir if present.
 */
public MiniHBaseCluster startMiniCluster(final int numSlaves)
throws Exception {
return startMiniCluster(1, numSlaves, false);
}
834
835
836
837
838
839
840
841
/**
 * Starts a mini cluster with the given master and region server counts.
 * @param create when true, use a fresh root dir on the test filesystem
 */
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves, boolean create)
throws Exception {
return startMiniCluster(numMasters, numSlaves, null, create);
}

/** Starts a mini cluster, reusing an existing root dir if present. */
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves)
throws Exception {
return startMiniCluster(numMasters, numSlaves, null, false);
}

/**
 * Starts a mini cluster with one datanode per region server, on the given hosts.
 * @param dataNodeHosts datanode hostnames; may be null
 */
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves, final String[] dataNodeHosts, boolean create)
throws Exception {
return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
null, null, create);
}
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
/**
 * Starts a mini cluster with one datanode per region server on the given hosts,
 * reusing an existing root dir if present.
 */
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves, final String[] dataNodeHosts) throws Exception {
return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
null, null);
}

/** Starts a mini cluster with independent datanode and region server counts. */
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves, final int numDataNodes) throws Exception {
return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
}
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
/**
 * Starts a mini cluster using custom master and region server implementations,
 * with one datanode per region server.
 */
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws Exception {
return startMiniCluster(
numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
}

/**
 * Starts a mini cluster using custom master and region server implementations,
 * reusing an existing root dir if present.
 */
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws Exception {
return startMiniCluster(numMasters, numSlaves, numDataNodes, dataNodeHosts,
masterClass, regionserverClass, false);
}
949
950
951
952
953
954
955
956
/**
 * Fully-parameterized mini cluster start: brings up (in order) the cluster test
 * dir, a mini DFS cluster, a mini ZooKeeper cluster, and finally the HBase
 * cluster. If dataNodeHosts is non-empty it overrides numDataNodes.
 *
 * @param numMasters number of masters
 * @param numSlaves number of region servers
 * @param numDataNodes number of datanodes (overridden by dataNodeHosts length)
 * @param dataNodeHosts datanode hostnames; may be null
 * @param masterClass custom master implementation; null for default
 * @param regionserverClass custom region server implementation; null for default
 * @param create when true, use a fresh root dir on the test filesystem
 * @throws IllegalStateException if a mini cluster is already running
 */
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
boolean create)
throws Exception {
// Explicit hostnames win over the requested datanode count.
if (dataNodeHosts != null && dataNodeHosts.length != 0) {
numDataNodes = dataNodeHosts.length;
}

LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");

// Only one mini cluster per utility instance at a time.
if (miniClusterRunning) {
throw new IllegalStateException("A mini-cluster is already running");
}
miniClusterRunning = true;

setupClusterTestDir();
System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());

// Bring up mini dfs cluster first (unless the caller already started one).
// This spews a bunch of warnings about missing scheme.
if(this.dfsCluster == null) {
dfsCluster = startMiniDFSCluster(numDataNodes, dataNodeHosts);
}

// Start up a zk cluster, unless the caller supplied one.
if (this.zkCluster == null) {
startMiniZKCluster(clusterTestDir);
}

// Start the MiniHBaseCluster last, on top of dfs and zk.
return startMiniHBaseCluster(numMasters, numSlaves, masterClass,
regionserverClass, create);
}
994
/**
 * Starts only the HBase portion of the mini cluster (dfs and zk must already be
 * up), with default master/regionserver classes and an existing root dir.
 */
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
throws IOException, InterruptedException{
return startMiniHBaseCluster(numMasters, numSlaves, null, null, false);
}
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013 public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
1014 final int numSlaves, Class<? extends HMaster> masterClass,
1015 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
1016 boolean create)
1017 throws IOException, InterruptedException {
1018
1019 createRootDir(create);
1020
1021
1022
1023 if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1024 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, numSlaves);
1025 }
1026 if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1027 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, numSlaves);
1028 }
1029
1030 Configuration c = new Configuration(this.conf);
1031 this.hbaseCluster =
1032 new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
1033
1034 Table t = new HTable(c, TableName.META_TABLE_NAME);
1035 ResultScanner s = t.getScanner(new Scan());
1036 while (s.next() != null) {
1037 continue;
1038 }
1039 s.close();
1040 t.close();
1041
1042 getHBaseAdmin();
1043 LOG.info("Minicluster is up");
1044
1045
1046
1047 setHBaseFsTmpDir();
1048
1049 return (MiniHBaseCluster)this.hbaseCluster;
1050 }
1051
1052
1053
1054
1055
1056
1057
1058 public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1059 this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
1060
1061 Table t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
1062 ResultScanner s = t.getScanner(new Scan());
1063 while (s.next() != null) {
1064
1065 }
1066 LOG.info("HBase has been restarted");
1067 s.close();
1068 t.close();
1069 }
1070
1071
1072
1073
1074
1075
1076 public MiniHBaseCluster getMiniHBaseCluster() {
1077 if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1078 return (MiniHBaseCluster)this.hbaseCluster;
1079 }
1080 throw new RuntimeException(hbaseCluster + " not an instance of " +
1081 MiniHBaseCluster.class.getName());
1082 }
1083
1084
1085
1086
1087
1088
/**
 * Stops everything started by startMiniCluster, in reverse order: the shared
 * connection, the HBase cluster, zk (unless supplied by the caller), dfs, and
 * finally the test directories.
 */
public void shutdownMiniCluster() throws Exception {
LOG.info("Shutting down minicluster");
if (this.connection != null && !this.connection.isClosed()) {
this.connection.close();
this.connection = null;
}
shutdownMiniHBaseCluster();
// Only shut down zk if we started it ourselves.
if (!this.passedZkCluster){
shutdownMiniZKCluster();
}
shutdownMiniDFSCluster();

cleanupTestDir();
miniClusterRunning = false;
LOG.info("Minicluster is down");
}
1105
1106
1107
1108
1109
1110 @Override
1111 public boolean cleanupTestDir() throws IOException {
1112 boolean ret = super.cleanupTestDir();
1113 if (deleteDir(this.clusterTestDir)) {
1114 this.clusterTestDir = null;
1115 return ret & true;
1116 }
1117 return false;
1118 }
1119
1120
1121
1122
1123
/**
 * Shuts down only the HBase portion of the mini cluster: closes the cached admin,
 * resets the region-server wait bounds, stops the cluster and waits for it to go
 * down, then closes the zk watcher.
 */
public void shutdownMiniHBaseCluster() throws IOException {
if (hbaseAdmin != null) {
hbaseAdmin.close0();
hbaseAdmin = null;
}

// Restore "unset" so the next cluster start recomputes them from its own slave count.
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
if (this.hbaseCluster != null) {
this.hbaseCluster.shutdown();
// Block until the cluster has fully stopped.
this.hbaseCluster.waitUntilShutDown();
this.hbaseCluster = null;
}

if (zooKeeperWatcher != null) {
zooKeeperWatcher.close();
zooKeeperWatcher = null;
}
}
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154 public Path getDefaultRootDirPath(boolean create) throws IOException {
1155 if (!create) {
1156 return getDataTestDirOnTestFS();
1157 } else {
1158 return getNewDataTestDirOnTestFS();
1159 }
1160 }
1161
1162
1163
1164
1165
1166
1167
1168
/** Returns the hbase.rootdir path on the test filesystem, reusing the current one. */
public Path getDefaultRootDirPath() throws IOException {
return getDefaultRootDirPath(false);
}
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
/**
 * Creates the hbase root directory on the default filesystem, records it in the
 * conf as hbase.rootdir, and writes the hbase version file into it.
 *
 * @param create when true, use a brand-new directory
 * @return the root directory path
 */
public Path createRootDir(boolean create) throws IOException {
FileSystem fs = FileSystem.get(this.conf);
Path hbaseRootdir = getDefaultRootDirPath(create);
FSUtils.setRootDir(this.conf, hbaseRootdir);
fs.mkdirs(hbaseRootdir);
// The version file must exist for the master to accept the root dir.
FSUtils.setVersion(fs, hbaseRootdir);
return hbaseRootdir;
}

/** Creates (reusing the current data test dir) the hbase root directory. */
public Path createRootDir() throws IOException {
return createRootDir(false);
}
1203
1204
1205 private void setHBaseFsTmpDir() throws IOException {
1206 String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1207 if (hbaseFsTmpDirInString == null) {
1208 this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1209 LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1210 } else {
1211 LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1212 }
1213 }
1214
1215
1216
1217
1218
/** Flushes all region caches in the mini cluster. */
public void flush() throws IOException {
getMiniHBaseCluster().flushcache();
}

/** Flushes the region caches of the named table. */
public void flush(TableName tableName) throws IOException {
getMiniHBaseCluster().flushcache(tableName);
}

/**
 * Compacts all regions in the mini cluster.
 * @param major true for a major compaction
 */
public void compact(boolean major) throws IOException {
getMiniHBaseCluster().compact(major);
}

/**
 * Compacts all regions of the named table.
 * @param major true for a major compaction
 */
public void compact(TableName tableName, boolean major) throws IOException {
getMiniHBaseCluster().compact(tableName, major);
}
1246
1247
1248
1249
1250
1251
1252
1253
/**
 * Creates a table with a single column family.
 * @return a Table for the new table
 */
public Table createTable(TableName tableName, String family)
throws IOException{
return createTable(tableName, new String[]{family});
}

/**
 * Creates a table with a single column family.
 * @return an HTable for the new table
 */
public HTable createTable(byte[] tableName, byte[] family)
throws IOException{
return createTable(TableName.valueOf(tableName), new byte[][]{family});
}
1270
1271
1272
1273
1274
1275
1276
1277
1278 public Table createTable(TableName tableName, String[] families)
1279 throws IOException {
1280 List<byte[]> fams = new ArrayList<byte[]>(families.length);
1281 for (String family : families) {
1282 fams.add(Bytes.toBytes(family));
1283 }
1284 return createTable(tableName, fams.toArray(new byte[0][]));
1285 }
1286
1287
1288
1289
1290
1291
1292
1293
/**
 * Creates a table with a single column family.
 * @return an HTable for the new table
 */
public HTable createTable(TableName tableName, byte[] family)
throws IOException{
return createTable(tableName, new byte[][]{family});
}
1298
1299
1300
1301
1302
1303
1304
1305
1306
/**
 * Creates a table pre-split into {@code numRegions} regions over the key range
 * "aaaaa".."zzzzz" (plus the first/last boundary regions).
 *
 * @param numRegions must be at least 3
 * @throws IOException if numRegions is less than 3 or the create fails
 */
public HTable createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
throws IOException {
if (numRegions < 3) throw new IOException("Must create at least 3 regions");
byte[] startKey = Bytes.toBytes("aaaaa");
byte[] endKey = Bytes.toBytes("zzzzz");
// numRegions - 3 interior split points + the two boundary regions + first region.
byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

return createTable(tableName, new byte[][] { family }, splitKeys);
}
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325 public HTable createTable(byte[] tableName, byte[][] families)
1326 throws IOException {
1327 return createTable(tableName, families,
1328 new Configuration(getConfiguration()));
1329 }
1330
1331
1332
1333
1334
1335
1336
1337
1338 public HTable createTable(TableName tableName, byte[][] families)
1339 throws IOException {
1340 return createTable(tableName, families, (byte[][]) null);
1341 }
1342
1343
1344
1345
1346
1347
1348
1349
1350 public HTable createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1351 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1352 }
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362 public HTable createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1363 throws IOException {
1364 return createTable(tableName, families, splitKeys, new Configuration(getConfiguration()));
1365 }
1366
1367 public HTable createTable(byte[] tableName, byte[][] families,
1368 int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1369 return createTable(TableName.valueOf(tableName), families, numVersions,
1370 startKey, endKey, numRegions);
1371 }
1372
1373 public HTable createTable(String tableName, byte[][] families,
1374 int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1375 return createTable(TableName.valueOf(tableName), families, numVersions,
1376 startKey, endKey, numRegions);
1377 }
1378
1379 public HTable createTable(TableName tableName, byte[][] families,
1380 int numVersions, byte[] startKey, byte[] endKey, int numRegions)
1381 throws IOException{
1382 HTableDescriptor desc = new HTableDescriptor(tableName);
1383 for (byte[] family : families) {
1384 HColumnDescriptor hcd = new HColumnDescriptor(family)
1385 .setMaxVersions(numVersions);
1386 desc.addFamily(hcd);
1387 }
1388 getHBaseAdmin().createTable(desc, startKey, endKey, numRegions);
1389
1390 waitUntilAllRegionsAssigned(tableName);
1391 return new HTable(getConfiguration(), tableName);
1392 }
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402 public HTable createTable(HTableDescriptor htd, byte[][] families, Configuration c)
1403 throws IOException {
1404 return createTable(htd, families, (byte[][]) null, c);
1405 }
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
  /**
   * Create a pre-split table from the given descriptor, adding one column family per entry
   * of {@code families}, then wait until every region is assigned before returning.
   * @param htd descriptor to add families to and create
   * @param families column family names to add
   * @param splitKeys pre-split points, may be null for a single region
   * @param c configuration (unused here; the table handle comes from getConnection())
   * @return an HTable instance for the created table
   */
  public HTable createTable(HTableDescriptor htd, byte[][] families, byte[][] splitKeys,
      Configuration c) throws IOException {
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family);
      // NOTE(review): blooms explicitly disabled here, presumably so tests that assert
      // on exact block-cache/IO behavior are deterministic -- confirm before changing.
      hcd.setBloomFilterType(BloomType.NONE);
      htd.addFamily(hcd);
    }
    getHBaseAdmin().createTable(htd, splitKeys);
    // createTable only waits for regions to show up in hbase:meta; wait for actual
    // assignment so callers can use the table immediately.
    waitUntilAllRegionsAssigned(htd.getTableName());
    return (HTable) getConnection().getTable(htd.getTableName());
  }
1432
1433
1434
1435
1436
1437
1438
1439
1440 public HTable createTable(HTableDescriptor htd, byte[][] splitRows)
1441 throws IOException {
1442 getHBaseAdmin().createTable(htd, splitRows);
1443
1444 waitUntilAllRegionsAssigned(htd.getTableName());
1445 return new HTable(getConfiguration(), htd.getTableName());
1446 }
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456 public HTable createTable(TableName tableName, byte[][] families,
1457 final Configuration c)
1458 throws IOException {
1459 return createTable(tableName, families, (byte[][]) null, c);
1460 }
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471 public HTable createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1472 final Configuration c) throws IOException {
1473 return createTable(new HTableDescriptor(tableName), families, splitKeys, c);
1474 }
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484 public HTable createTable(byte[] tableName, byte[][] families,
1485 final Configuration c)
1486 throws IOException {
1487 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1488 for(byte[] family : families) {
1489 HColumnDescriptor hcd = new HColumnDescriptor(family);
1490
1491
1492
1493 hcd.setBloomFilterType(BloomType.NONE);
1494 desc.addFamily(hcd);
1495 }
1496 getHBaseAdmin().createTable(desc);
1497 return new HTable(c, desc.getTableName());
1498 }
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509 public HTable createTable(TableName tableName, byte[][] families,
1510 final Configuration c, int numVersions)
1511 throws IOException {
1512 HTableDescriptor desc = new HTableDescriptor(tableName);
1513 for(byte[] family : families) {
1514 HColumnDescriptor hcd = new HColumnDescriptor(family)
1515 .setMaxVersions(numVersions);
1516 desc.addFamily(hcd);
1517 }
1518 getHBaseAdmin().createTable(desc);
1519
1520 waitUntilAllRegionsAssigned(tableName);
1521 return new HTable(c, tableName);
1522 }
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533 public HTable createTable(byte[] tableName, byte[][] families,
1534 final Configuration c, int numVersions)
1535 throws IOException {
1536 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1537 for(byte[] family : families) {
1538 HColumnDescriptor hcd = new HColumnDescriptor(family)
1539 .setMaxVersions(numVersions);
1540 desc.addFamily(hcd);
1541 }
1542 getHBaseAdmin().createTable(desc);
1543 return new HTable(c, desc.getTableName());
1544 }
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554 public HTable createTable(byte[] tableName, byte[] family, int numVersions)
1555 throws IOException {
1556 return createTable(tableName, new byte[][]{family}, numVersions);
1557 }
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567 public HTable createTable(TableName tableName, byte[] family, int numVersions)
1568 throws IOException {
1569 return createTable(tableName, new byte[][]{family}, numVersions);
1570 }
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580 public HTable createTable(byte[] tableName, byte[][] families,
1581 int numVersions)
1582 throws IOException {
1583 return createTable(TableName.valueOf(tableName), families, numVersions);
1584 }
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594 public HTable createTable(TableName tableName, byte[][] families,
1595 int numVersions)
1596 throws IOException {
1597 return createTable(tableName, families, numVersions, (byte[][]) null);
1598 }
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609 public HTable createTable(TableName tableName, byte[][] families, int numVersions,
1610 byte[][] splitKeys) throws IOException {
1611 HTableDescriptor desc = new HTableDescriptor(tableName);
1612 for (byte[] family : families) {
1613 HColumnDescriptor hcd = new HColumnDescriptor(family).setMaxVersions(numVersions);
1614 desc.addFamily(hcd);
1615 }
1616 getHBaseAdmin().createTable(desc, splitKeys);
1617
1618 waitUntilAllRegionsAssigned(tableName);
1619 return new HTable(new Configuration(getConfiguration()), tableName);
1620 }
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630 public HTable createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1631 throws IOException {
1632 return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1633 }
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644 public HTable createTable(byte[] tableName, byte[][] families,
1645 int numVersions, int blockSize) throws IOException {
1646 return createTable(TableName.valueOf(tableName),
1647 families, numVersions, blockSize);
1648 }
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659 public HTable createTable(TableName tableName, byte[][] families,
1660 int numVersions, int blockSize) throws IOException {
1661 HTableDescriptor desc = new HTableDescriptor(tableName);
1662 for (byte[] family : families) {
1663 HColumnDescriptor hcd = new HColumnDescriptor(family)
1664 .setMaxVersions(numVersions)
1665 .setBlocksize(blockSize);
1666 desc.addFamily(hcd);
1667 }
1668 getHBaseAdmin().createTable(desc);
1669
1670 waitUntilAllRegionsAssigned(tableName);
1671 return new HTable(new Configuration(getConfiguration()), tableName);
1672 }
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682 public HTable createTable(byte[] tableName, byte[][] families,
1683 int[] numVersions)
1684 throws IOException {
1685 return createTable(TableName.valueOf(tableName), families, numVersions);
1686 }
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696 public HTable createTable(TableName tableName, byte[][] families,
1697 int[] numVersions)
1698 throws IOException {
1699 HTableDescriptor desc = new HTableDescriptor(tableName);
1700 int i = 0;
1701 for (byte[] family : families) {
1702 HColumnDescriptor hcd = new HColumnDescriptor(family)
1703 .setMaxVersions(numVersions[i]);
1704 desc.addFamily(hcd);
1705 i++;
1706 }
1707 getHBaseAdmin().createTable(desc);
1708
1709 waitUntilAllRegionsAssigned(tableName);
1710 return new HTable(new Configuration(getConfiguration()), tableName);
1711 }
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721 public HTable createTable(byte[] tableName, byte[] family, byte[][] splitRows)
1722 throws IOException{
1723 return createTable(TableName.valueOf(tableName), family, splitRows);
1724 }
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734 public HTable createTable(TableName tableName, byte[] family, byte[][] splitRows)
1735 throws IOException {
1736 HTableDescriptor desc = new HTableDescriptor(tableName);
1737 HColumnDescriptor hcd = new HColumnDescriptor(family);
1738 desc.addFamily(hcd);
1739 getHBaseAdmin().createTable(desc, splitRows);
1740
1741 waitUntilAllRegionsAssigned(tableName);
1742 return new HTable(getConfiguration(), tableName);
1743 }
1744
1745
1746
1747
1748
1749
1750
1751
1752 public HTable createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1753 return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1754 }
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764 public HTable createTable(byte[] tableName, byte[][] families, byte[][] splitRows)
1765 throws IOException {
1766 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1767 for(byte[] family:families) {
1768 HColumnDescriptor hcd = new HColumnDescriptor(family);
1769 desc.addFamily(hcd);
1770 }
1771 getHBaseAdmin().createTable(desc, splitRows);
1772
1773 waitUntilAllRegionsAssigned(desc.getTableName());
1774 return new HTable(getConfiguration(), desc.getTableName());
1775 }
1776
1777
1778
1779
1780 public static WAL createWal(final Configuration conf, final Path rootDir, final HRegionInfo hri)
1781 throws IOException {
1782
1783
1784 Configuration confForWAL = new Configuration(conf);
1785 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
1786 return (new WALFactory(confForWAL,
1787 Collections.<WALActionsListener>singletonList(new MetricsWAL()),
1788 "hregion-" + RandomStringUtils.randomNumeric(8))).
1789 getWAL(hri.getEncodedNameAsBytes());
1790 }
1791
1792
1793
1794
1795
1796 public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
1797 final Configuration conf, final HTableDescriptor htd) throws IOException {
1798 return createRegionAndWAL(info, rootDir, conf, htd, true);
1799 }
1800
1801
1802
1803
1804
1805 public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
1806 final Configuration conf, final HTableDescriptor htd, boolean initialize)
1807 throws IOException {
1808 WAL wal = createWal(conf, rootDir, info);
1809 return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
1810 }
1811
1812
1813
1814
1815 public static void closeRegionAndWAL(final Region r) throws IOException {
1816 closeRegionAndWAL((HRegion)r);
1817 }
1818
1819
1820
1821
1822 public static void closeRegionAndWAL(final HRegion r) throws IOException {
1823 if (r == null) return;
1824 r.close();
1825 if (r.getWAL() == null) return;
1826 r.getWAL().close();
1827 }
1828
1829
1830
1831
1832 @SuppressWarnings("serial")
1833 public static void modifyTableSync(Admin admin, HTableDescriptor desc)
1834 throws IOException, InterruptedException {
1835 admin.modifyTable(desc.getTableName(), desc);
1836 Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
1837 setFirst(0);
1838 setSecond(0);
1839 }};
1840 int i = 0;
1841 do {
1842 status = admin.getAlterStatus(desc.getTableName());
1843 if (status.getSecond() != 0) {
1844 LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
1845 + " regions updated.");
1846 Thread.sleep(1 * 1000l);
1847 } else {
1848 LOG.debug("All regions updated.");
1849 break;
1850 }
1851 } while (status.getFirst() != 0 && i++ < 500);
1852 if (status.getFirst() != 0) {
1853 throw new IOException("Failed to update all regions even after 500 seconds.");
1854 }
1855 }
1856
1857
1858
1859
1860 public static void setReplicas(Admin admin, TableName table, int replicaCount)
1861 throws IOException, InterruptedException {
1862 admin.disableTable(table);
1863 HTableDescriptor desc = admin.getTableDescriptor(table);
1864 desc.setRegionReplication(replicaCount);
1865 admin.modifyTable(desc.getTableName(), desc);
1866 admin.enableTable(table);
1867 }
1868
1869
1870
1871
1872
1873 public void deleteTable(String tableName) throws IOException {
1874 deleteTable(TableName.valueOf(tableName));
1875 }
1876
1877
1878
1879
1880
1881 public void deleteTable(byte[] tableName) throws IOException {
1882 deleteTable(TableName.valueOf(tableName));
1883 }
1884
1885
1886
1887
1888
1889 public void deleteTable(TableName tableName) throws IOException {
1890 try {
1891 getHBaseAdmin().disableTable(tableName);
1892 } catch (TableNotEnabledException e) {
1893 LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1894 }
1895 getHBaseAdmin().deleteTable(tableName);
1896 }
1897
1898
1899
1900
1901
1902 public void deleteTableIfAny(TableName tableName) throws IOException {
1903 try {
1904 deleteTable(tableName);
1905 } catch (TableNotFoundException e) {
1906
1907 }
1908 }
1909
1910
1911
1912
1913
  /** Column family names shared by many tests. */
  public final static byte [] fam1 = Bytes.toBytes("colfamily11");
  public final static byte [] fam2 = Bytes.toBytes("colfamily21");
  public final static byte [] fam3 = Bytes.toBytes("colfamily31");
  /** All three shared test families, in order. */
  public static final byte[][] COLUMNS = {fam1, fam2, fam3};
  // Default max versions used by createTableDescriptor(String).
  private static final int MAXVERSIONS = 3;

  // The generated test row space is ['aaa' .. 'zzz'] (see ROWS below).
  public static final char FIRST_CHAR = 'a';
  public static final char LAST_CHAR = 'z';
  public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1924
1925
1926
1927
1928
1929
1930
1931
1932 public HTableDescriptor createTableDescriptor(final String name,
1933 final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1934 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
1935 for (byte[] cfName : new byte[][]{ fam1, fam2, fam3 }) {
1936 htd.addFamily(new HColumnDescriptor(cfName)
1937 .setMinVersions(minVersions)
1938 .setMaxVersions(versions)
1939 .setKeepDeletedCells(keepDeleted)
1940 .setBlockCacheEnabled(false)
1941 .setTimeToLive(ttl)
1942 );
1943 }
1944 return htd;
1945 }
1946
1947
1948
1949
1950
1951
1952
1953 public HTableDescriptor createTableDescriptor(final String name) {
1954 return createTableDescriptor(name, HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1955 MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1956 }
1957
1958
1959
1960
1961
1962 public HRegion createHRegion(
1963 final HRegionInfo info,
1964 final Path rootDir,
1965 final Configuration conf,
1966 final HTableDescriptor htd) throws IOException {
1967 return HRegion.createHRegion(info, rootDir, conf, htd);
1968 }
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978 public HRegion createLocalHRegion(HTableDescriptor desc, byte [] startKey,
1979 byte [] endKey)
1980 throws IOException {
1981 HRegionInfo hri = new HRegionInfo(desc.getTableName(), startKey, endKey);
1982 return createLocalHRegion(hri, desc);
1983 }
1984
1985
1986
1987
1988
1989
1990
1991
1992 public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc) throws IOException {
1993 return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc);
1994 }
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004 public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc, WAL wal)
2005 throws IOException {
2006 return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
2007 }
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021 public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
2022 String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
2023 WAL wal, byte[]... families) throws IOException {
2024 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
2025 htd.setReadOnly(isReadOnly);
2026 for (byte[] family : families) {
2027 HColumnDescriptor hcd = new HColumnDescriptor(family);
2028
2029 hcd.setMaxVersions(Integer.MAX_VALUE);
2030 htd.addFamily(hcd);
2031 }
2032 htd.setDurability(durability);
2033 HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
2034 return createLocalHRegion(info, htd, wal);
2035 }
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046 public HTable deleteTableData(byte[] tableName) throws IOException {
2047 return deleteTableData(TableName.valueOf(tableName));
2048 }
2049
2050
2051
2052
2053
2054
2055
2056
2057 public HTable deleteTableData(TableName tableName) throws IOException {
2058 HTable table = new HTable(getConfiguration(), tableName);
2059 Scan scan = new Scan();
2060 ResultScanner resScan = table.getScanner(scan);
2061 for(Result res : resScan) {
2062 Delete del = new Delete(res.getRow());
2063 table.delete(del);
2064 }
2065 resScan = table.getScanner(scan);
2066 resScan.close();
2067 return table;
2068 }
2069
2070
2071
2072
2073
2074
2075
2076
2077 public HTable truncateTable(final TableName tableName, final boolean preserveRegions)
2078 throws IOException {
2079 Admin admin = getHBaseAdmin();
2080 if (!admin.isTableDisabled(tableName)) {
2081 admin.disableTable(tableName);
2082 }
2083 admin.truncateTable(tableName, preserveRegions);
2084 return new HTable(getConfiguration(), tableName);
2085 }
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096 public HTable truncateTable(final TableName tableName) throws IOException {
2097 return truncateTable(tableName, false);
2098 }
2099
2100
2101
2102
2103
2104
2105
2106
2107 public HTable truncateTable(final byte[] tableName, final boolean preserveRegions)
2108 throws IOException {
2109 return truncateTable(TableName.valueOf(tableName), preserveRegions);
2110 }
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121 public HTable truncateTable(final byte[] tableName) throws IOException {
2122 return truncateTable(tableName, false);
2123 }
2124
2125
2126
2127
2128
2129
2130
2131
2132 public int loadTable(final Table t, final byte[] f) throws IOException {
2133 return loadTable(t, new byte[][] {f});
2134 }
2135
2136
2137
2138
2139
2140
2141
2142
2143 public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2144 return loadTable(t, new byte[][] {f}, null, writeToWAL);
2145 }
2146
2147
2148
2149
2150
2151
2152
2153
2154 public int loadTable(final Table t, final byte[][] f) throws IOException {
2155 return loadTable(t, f, null);
2156 }
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166 public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2167 return loadTable(t, f, value, true);
2168 }
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178 public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) throws IOException {
2179 List<Put> puts = new ArrayList<>();
2180 for (byte[] row : HBaseTestingUtility.ROWS) {
2181 Put put = new Put(row);
2182 put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2183 for (int i = 0; i < f.length; i++) {
2184 put.add(f[i], null, value != null ? value : row);
2185 }
2186 puts.add(put);
2187 }
2188 t.put(puts);
2189 return puts.size();
2190 }
2191
2192
2193
2194
  /**
   * Tracks how many times each three-letter row key ('aaa'..'zzz') has been seen,
   * then validates that exactly the rows in [startRow, stopRow) were seen once.
   */
  public static class SeenRowTracker {
    // Size of the per-character alphabet: 'a'..'z'.
    int dim = 'z' - 'a' + 1;
    // seenRows[c1][c2][c3] = number of times row {c1,c2,c3} was added.
    int[][][] seenRows = new int[dim][dim][dim];
    byte[] startRow;
    byte[] stopRow;

    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
      this.startRow = startRow;
      this.stopRow = stopRow;
    }

    // Zero out the counters for every generated row key.
    void reset() {
      for (byte[] row : ROWS) {
        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
      }
    }

    // Map a row-key byte ('a'..'z') to an array index (0..25).
    int i(byte b) {
      return b - 'a';
    }

    /** Record one sighting of the given three-byte row. */
    public void addRow(byte[] row) {
      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
    }

    /**
     * Validate that every row in [startRow, stopRow) was seen exactly once,
     * and rows outside that range not at all.
     * @throws RuntimeException on the first row whose count is wrong
     */
    public void validate() {
      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
            int count = seenRows[i(b1)][i(b2)][i(b3)];
            int expectedCount = 0;
            if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
                && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
              expectedCount = 1;
            }
            if (count != expectedCount) {
              String row = new String(new byte[] {b1,b2,b3});
              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
            }
          }
        }
      }
    }
  }
2242
2243 public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2244 return loadRegion(r, f, false);
2245 }
2246
2247 public int loadRegion(final Region r, final byte[] f) throws IOException {
2248 return loadRegion((HRegion)r, f);
2249 }
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259 public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
2260 throws IOException {
2261 byte[] k = new byte[3];
2262 int rowCount = 0;
2263 for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2264 for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2265 for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2266 k[0] = b1;
2267 k[1] = b2;
2268 k[2] = b3;
2269 Put put = new Put(k);
2270 put.setDurability(Durability.SKIP_WAL);
2271 put.add(f, null, k);
2272 if (r.getWAL() == null) {
2273 put.setDurability(Durability.SKIP_WAL);
2274 }
2275 int preRowCount = rowCount;
2276 int pause = 10;
2277 int maxPause = 1000;
2278 while (rowCount == preRowCount) {
2279 try {
2280 r.put(put);
2281 rowCount++;
2282 } catch (RegionTooBusyException e) {
2283 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2284 Threads.sleep(pause);
2285 }
2286 }
2287 }
2288 }
2289 if (flush) {
2290 r.flush(true);
2291 }
2292 }
2293 return rowCount;
2294 }
2295
2296 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2297 throws IOException {
2298 for (int i = startRow; i < endRow; i++) {
2299 byte[] data = Bytes.toBytes(String.valueOf(i));
2300 Put put = new Put(data);
2301 put.add(f, null, data);
2302 t.put(put);
2303 }
2304 }
2305
2306 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2307 int replicaId)
2308 throws IOException {
2309 for (int i = startRow; i < endRow; i++) {
2310 String failMsg = "Failed verification of row :" + i;
2311 byte[] data = Bytes.toBytes(String.valueOf(i));
2312 Get get = new Get(data);
2313 get.setReplicaId(replicaId);
2314 get.setConsistency(Consistency.TIMELINE);
2315 Result result = table.get(get);
2316 assertTrue(failMsg, result.containsColumn(f, null));
2317 assertEquals(failMsg, result.getColumnCells(f, null).size(), 1);
2318 Cell cell = result.getColumnLatestCell(f, null);
2319 assertTrue(failMsg,
2320 Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2321 cell.getValueLength()));
2322 }
2323 }
2324
2325 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2326 throws IOException {
2327 verifyNumericRows((HRegion)region, f, startRow, endRow);
2328 }
2329
2330 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2331 throws IOException {
2332 verifyNumericRows(region, f, startRow, endRow, true);
2333 }
2334
2335 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2336 final boolean present) throws IOException {
2337 verifyNumericRows((HRegion)region, f, startRow, endRow, present);
2338 }
2339
2340 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2341 final boolean present) throws IOException {
2342 for (int i = startRow; i < endRow; i++) {
2343 String failMsg = "Failed verification of row :" + i;
2344 byte[] data = Bytes.toBytes(String.valueOf(i));
2345 Result result = region.get(new Get(data));
2346
2347 boolean hasResult = result != null && !result.isEmpty();
2348 assertEquals(failMsg + result, present, hasResult);
2349 if (!present) continue;
2350
2351 assertTrue(failMsg, result.containsColumn(f, null));
2352 assertEquals(failMsg, result.getColumnCells(f, null).size(), 1);
2353 Cell cell = result.getColumnLatestCell(f, null);
2354 assertTrue(failMsg,
2355 Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2356 cell.getValueLength()));
2357 }
2358 }
2359
2360 public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow)
2361 throws IOException {
2362 for (int i = startRow; i < endRow; i++) {
2363 byte[] data = Bytes.toBytes(String.valueOf(i));
2364 Delete delete = new Delete(data);
2365 delete.deleteFamily(f);
2366 t.delete(delete);
2367 }
2368 }
2369
2370
2371
2372
2373 public int countRows(final Table table) throws IOException {
2374 Scan scan = new Scan();
2375 ResultScanner results = table.getScanner(scan);
2376 int count = 0;
2377 for (@SuppressWarnings("unused") Result res : results) {
2378 count++;
2379 }
2380 results.close();
2381 return count;
2382 }
2383
2384 public int countRows(final Table table, final byte[]... families) throws IOException {
2385 Scan scan = new Scan();
2386 for (byte[] family: families) {
2387 scan.addFamily(family);
2388 }
2389 ResultScanner results = table.getScanner(scan);
2390 int count = 0;
2391 for (@SuppressWarnings("unused") Result res : results) {
2392 count++;
2393 }
2394 results.close();
2395 return count;
2396 }
2397
2398
2399
2400
2401 public int countRows(final TableName tableName) throws IOException {
2402 Table table = getConnection().getTable(tableName);
2403 try {
2404 return countRows(table);
2405 } finally {
2406 table.close();
2407 }
2408 }
2409
2410
2411
2412
2413 public String checksumRows(final Table table) throws Exception {
2414 Scan scan = new Scan();
2415 ResultScanner results = table.getScanner(scan);
2416 MessageDigest digest = MessageDigest.getInstance("MD5");
2417 for (Result res : results) {
2418 digest.update(res.getRow());
2419 }
2420 results.close();
2421 return digest.toString();
2422 }
2423
2424
  /** All 26^3 three-letter row keys, 'aaa' through 'zzz', in sorted order. */
  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3];
  static {
    // Fill ROWS with every combination of three lower-case letters.
    int i = 0;
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          ROWS[i][0] = b1;
          ROWS[i][1] = b2;
          ROWS[i][2] = b3;
          i++;
        }
      }
    }
  }
2439
  /**
   * Region start keys 'bbb'..'yyy' (plus the empty start key), used to lay out
   * 25-region test tables.
   */
  public static final byte[][] KEYS = {
    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
    Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
    Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
    Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
    Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
    Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
    Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
  };
2451
  /**
   * Split keys 'bbb'..'zzz' passed to Admin.createTable for pre-split test tables
   * (no leading empty key, since createTable supplies the open-ended first region).
   */
  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
      Bytes.toBytes("bbb"),
      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
      Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
      Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
      Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
      Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
      Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
      Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
  };
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474 public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
2475 final HTableDescriptor htd, byte [][] startKeys)
2476 throws IOException {
2477 Table meta = new HTable(conf, TableName.META_TABLE_NAME);
2478 Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2479 List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
2480
2481 for (int i = 0; i < startKeys.length; i++) {
2482 int j = (i + 1) % startKeys.length;
2483 HRegionInfo hri = new HRegionInfo(htd.getTableName(), startKeys[i],
2484 startKeys[j]);
2485 MetaTableAccessor.addRegionToMeta(meta, hri);
2486 newRegions.add(hri);
2487 }
2488
2489 meta.close();
2490 return newRegions;
2491 }
2492
2493
2494
2495
2496
2497
2498 public List<byte[]> getMetaTableRows() throws IOException {
2499
2500 Table t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2501 List<byte[]> rows = new ArrayList<byte[]>();
2502 ResultScanner s = t.getScanner(new Scan());
2503 for (Result result : s) {
2504 LOG.info("getMetaTableRows: row -> " +
2505 Bytes.toStringBinary(result.getRow()));
2506 rows.add(result.getRow());
2507 }
2508 s.close();
2509 t.close();
2510 return rows;
2511 }
2512
2513
2514
2515
2516
2517
2518 public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2519
2520 Table t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2521 List<byte[]> rows = new ArrayList<byte[]>();
2522 ResultScanner s = t.getScanner(new Scan());
2523 for (Result result : s) {
2524 HRegionInfo info = HRegionInfo.getHRegionInfo(result);
2525 if (info == null) {
2526 LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2527
2528 continue;
2529 }
2530
2531 if (info.getTable().equals(tableName)) {
2532 LOG.info("getMetaTableRows: row -> " +
2533 Bytes.toStringBinary(result.getRow()) + info);
2534 rows.add(result.getRow());
2535 }
2536 }
2537 s.close();
2538 t.close();
2539 return rows;
2540 }
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553 public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2554 throws IOException, InterruptedException {
2555 List<byte[]> metaRows = getMetaTableRows(tableName);
2556 if (metaRows == null || metaRows.isEmpty()) {
2557 return null;
2558 }
2559 LOG.debug("Found " + metaRows.size() + " rows for table " +
2560 tableName);
2561 byte [] firstrow = metaRows.get(0);
2562 LOG.debug("FirstRow=" + Bytes.toString(firstrow));
2563 long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2564 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2565 int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2566 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2567 RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MICROSECONDS);
2568 while(retrier.shouldRetry()) {
2569 int index = getMiniHBaseCluster().getServerWith(firstrow);
2570 if (index != -1) {
2571 return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2572 }
2573
2574 retrier.sleepUntilNextRetry();
2575 }
2576 return null;
2577 }
2578
2579
2580
2581
2582
2583
2584
2585 public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2586 startMiniMapReduceCluster(2);
2587 return mrCluster;
2588 }
2589
2590
2591
2592
2593
2594 private void forceChangeTaskLogDir() {
2595 Field logDirField;
2596 try {
2597 logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2598 logDirField.setAccessible(true);
2599
2600 Field modifiersField = Field.class.getDeclaredField("modifiers");
2601 modifiersField.setAccessible(true);
2602 modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2603
2604 logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2605 } catch (SecurityException e) {
2606 throw new RuntimeException(e);
2607 } catch (NoSuchFieldException e) {
2608
2609 throw new RuntimeException(e);
2610 } catch (IllegalArgumentException e) {
2611 throw new RuntimeException(e);
2612 } catch (IllegalAccessException e) {
2613 throw new RuntimeException(e);
2614 }
2615 }
2616
2617
2618
2619
2620
2621
2622
/**
 * Starts a <code>MiniMRCluster</code> with the given number of
 * <code>TaskTracker</code>s, then copies the addresses the mini cluster
 * chose (job tracker, resource manager, history server, scheduler) back
 * into this utility's conf so MR jobs submitted by tests can find it.
 *
 * @param servers number of <code>TaskTracker</code>s to start
 * @throws IllegalStateException if a mini MR cluster is already running
 * @throws IOException when starting the cluster fails
 */
private void startMiniMapReduceCluster(final int servers) throws IOException {
  if (mrCluster != null) {
    throw new IllegalStateException("MiniMRCluster is already running");
  }
  LOG.info("Starting mini mapreduce cluster...");
  setupClusterTestDir();
  createDirsAndSetProperties();

  forceChangeTaskLogDir();

  // Raise the allowed virtual/physical memory ratio so the NodeManager does
  // not kill test containers for (harmless) high virtual memory usage.
  conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);

  // Disable speculative execution: duplicate task attempts make test
  // output nondeterministic.
  conf.setBoolean("mapreduce.map.speculative", false);
  conf.setBoolean("mapreduce.reduce.speculative", false);

  // FS_URI, when set, lets a test point the MR cluster at a specific
  // filesystem; otherwise use the default FS from conf.
  mrCluster = new MiniMRCluster(servers,
    FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
    null, null, new JobConf(this.conf));
  JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
  if (jobConf == null) {
    jobConf = mrCluster.createJobConf();
  }
  // Re-apply the local dir: the mini cluster overwrites it on start.
  jobConf.set("mapreduce.cluster.local.dir",
    conf.get("mapreduce.cluster.local.dir"));
  LOG.info("Mini mapreduce cluster started");

  // The mini cluster picked its own ports/addresses in jobConf; copy the
  // ones jobs need back into our shared conf.
  conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
  // Run against YARN (mrv2) in mini-cluster mode.
  conf.set("mapreduce.framework.name", "yarn");
  conf.setBoolean("yarn.is.minicluster", true);
  String rmAddress = jobConf.get("yarn.resourcemanager.address");
  if (rmAddress != null) {
    conf.set("yarn.resourcemanager.address", rmAddress);
  }
  String historyAddress = jobConf.get("mapreduce.jobhistory.address");
  if (historyAddress != null) {
    conf.set("mapreduce.jobhistory.address", historyAddress);
  }
  String schedulerAddress =
    jobConf.get("yarn.resourcemanager.scheduler.address");
  if (schedulerAddress != null) {
    conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
  }
}
2678
2679
2680
2681
2682 public void shutdownMiniMapReduceCluster() {
2683 if (mrCluster != null) {
2684 LOG.info("Stopping mini mapreduce cluster...");
2685 mrCluster.shutdown();
2686 mrCluster = null;
2687 LOG.info("Mini mapreduce cluster stopped");
2688 }
2689
2690 conf.set("mapreduce.jobtracker.address", "local");
2691 }
2692
2693
2694
2695
2696 public RegionServerServices createMockRegionServerService() throws IOException {
2697 return createMockRegionServerService((ServerName)null);
2698 }
2699
2700
2701
2702
2703
2704 public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws IOException {
2705 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2706 rss.setFileSystem(getTestFileSystem());
2707 rss.setRpcServer(rpc);
2708 return rss;
2709 }
2710
2711
2712
2713
2714
2715 public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2716 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2717 rss.setFileSystem(getTestFileSystem());
2718 return rss;
2719 }
2720
2721
2722
2723
2724
2725
2726 public void enableDebug(Class<?> clazz) {
2727 Log l = LogFactory.getLog(clazz);
2728 if (l instanceof Log4JLogger) {
2729 ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
2730 } else if (l instanceof Jdk14Logger) {
2731 ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
2732 }
2733 }
2734
2735
2736
2737
2738
2739 public void expireMasterSession() throws Exception {
2740 HMaster master = getMiniHBaseCluster().getMaster();
2741 expireSession(master.getZooKeeper(), false);
2742 }
2743
2744
2745
2746
2747
2748
2749 public void expireRegionServerSession(int index) throws Exception {
2750 HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2751 expireSession(rs.getZooKeeper(), false);
2752 decrementMinRegionServerCount();
2753 }
2754
2755 private void decrementMinRegionServerCount() {
2756
2757
2758 decrementMinRegionServerCount(getConfiguration());
2759
2760
2761 for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2762 decrementMinRegionServerCount(master.getMaster().getConfiguration());
2763 }
2764 }
2765
2766 private void decrementMinRegionServerCount(Configuration conf) {
2767 int currentCount = conf.getInt(
2768 ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2769 if (currentCount != -1) {
2770 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
2771 Math.max(currentCount - 1, 1));
2772 }
2773 }
2774
2775 public void expireSession(ZooKeeperWatcher nodeZK) throws Exception {
2776 expireSession(nodeZK, false);
2777 }
2778
/**
 * Expire the given watcher's ZooKeeper session.
 *
 * @param nodeZK the watcher whose session to expire
 * @param server unused; kept only for source compatibility with old callers
 * @deprecated Use {@link #expireSession(ZooKeeperWatcher, boolean)} instead;
 *   the {@code server} argument is ignored.
 */
@Deprecated
public void expireSession(ZooKeeperWatcher nodeZK, Server server)
  throws Exception {
  expireSession(nodeZK, false);
}
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
/**
 * Expire a ZooKeeper session by connecting a second client with the same
 * session id and password and then closing it (the technique recommended in
 * the ZooKeeper FAQ for forcing an expiry from tests).
 *
 * @param nodeZK the watcher whose session to expire
 * @param checkStatus if true, verify afterwards that hbase:meta is still
 *        reachable with a fresh connection
 * @throws Exception on any ZK or connection failure
 */
public void expireSession(ZooKeeperWatcher nodeZK, boolean checkStatus)
  throws Exception {
  Configuration c = new Configuration(this.conf);
  String quorumServers = ZKConfig.getZKQuorumServersString(c);
  ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
  byte[] password = zk.getSessionPasswd();
  long sessionID = zk.getSessionId();

  // Expiry notification is asynchronous. Open a "monitor" client on the
  // same quorum first: closing it at the end gives the server a chance to
  // push the expiry event out before this method returns. NOTE(review):
  // this relies on event-delivery ordering observed in practice — confirm
  // against the ZK version in use.
  ZooKeeper monitor = new ZooKeeper(quorumServers,
    1000, new org.apache.zookeeper.Watcher(){
    @Override
    public void process(WatchedEvent watchedEvent) {
      LOG.info("Monitor ZKW received event="+watchedEvent);
    }
  } , sessionID, password);

  // Connect a second client with the SAME session id/password; closing it
  // below kills the shared session.
  ZooKeeper newZK = new ZooKeeper(quorumServers,
    1000, EmptyWatcher.instance, sessionID, password);

  // Ensure the duplicate client actually connected before closing it,
  // otherwise the close-session request may never reach the server.
  // Bounded to ~1s so a broken quorum can't hang the test.
  long start = System.currentTimeMillis();
  while (newZK.getState() != States.CONNECTED
      && System.currentTimeMillis() - start < 1000) {
    Thread.sleep(1);
  }
  newZK.close();
  LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));

  // Close the monitor last so expiry events have been flushed (see above).
  monitor.close();

  if (checkStatus) {
    // Sanity check: meta should still be reachable with a fresh connection.
    new HTable(new Configuration(conf), TableName.META_TABLE_NAME).close();
  }
}
2840
2841
2842
2843
2844
2845
2846
2847 public MiniHBaseCluster getHBaseCluster() {
2848 return getMiniHBaseCluster();
2849 }
2850
2851
2852
2853
2854
2855
2856
2857
2858
/**
 * Returns the HBaseCluster instance held by this utility, typed as the
 * {@code HBaseCluster} interface. Prefer this accessor when only
 * interface-level operations are needed.
 * @return the cluster instance (may be null before startup)
 */
public HBaseCluster getHBaseClusterInterface() {
  //TODO: we shouldn't expose this field directly; consider a defensive copy
  // or narrowing wrapper.
  return hbaseCluster;
}
2864
2865
2866
2867
2868
2869
2870
/**
 * Get a shared Connection to the cluster, created lazily on first call.
 * Callers should not close it; the utility owns its lifecycle.
 * NOTE(review): unlike getHBaseAdmin(), this lazy init is NOT synchronized,
 * so concurrent first calls could race and create two connections —
 * confirm callers only invoke this from single-threaded setup code.
 *
 * @return a shared Connection; don't close it
 * @throws IOException if the connection cannot be created
 */
public Connection getConnection() throws IOException {
  if (this.connection == null) {
    this.connection = ConnectionFactory.createConnection(this.conf);
  }
  return this.connection;
}
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
/**
 * Returns the shared Admin instance, creating it lazily on first call.
 * Synchronized so concurrent callers always get the same instance. The
 * returned admin's close() is a no-op (see HBaseAdminForTests), so tests
 * cannot close it out from under each other.
 *
 * @return a shared HBaseAdmin; don't close it
 * @throws IOException if the admin cannot be created
 */
public synchronized HBaseAdmin getHBaseAdmin()
throws IOException {
  if (hbaseAdmin == null){
    this.hbaseAdmin = new HBaseAdminForTests(getConnection());
  }
  return hbaseAdmin;
}
2894
// Lazily created by getHBaseAdmin(); lifecycle is owned by this utility.
private HBaseAdminForTests hbaseAdmin = null;

/**
 * An HBaseAdmin whose public {@link #close()} is a warning-only no-op, so
 * the instance shared via {@code getHBaseAdmin()} cannot be closed by one
 * test while others still use it. Actual closing happens through the
 * private {@link #close0()}, which — being private — only the enclosing
 * utility class can invoke.
 */
private static class HBaseAdminForTests extends HBaseAdmin {
  public HBaseAdminForTests(Connection connection) throws MasterNotRunningException,
      ZooKeeperConnectionException, IOException {
    super(connection);
  }

  @Override
  public synchronized void close() throws IOException {
    // Intentionally NOT closing; warn so callers learn the shared admin is
    // managed by the testing utility.
    LOG.warn("close() called on HBaseAdmin instance returned from " +
      "HBaseTestingUtility.getHBaseAdmin()");
  }

  // Really closes the underlying admin; for the owning utility only.
  private synchronized void close0() throws IOException {
    super.close();
  }
}
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
/**
 * Returns the utility's shared ZooKeeperWatcher, creating it lazily on
 * first call. Synchronized so concurrent callers get the same instance.
 * The watcher's Abortable converts any ZK abort into a RuntimeException so
 * the test fails loudly instead of hanging.
 *
 * @return a shared ZooKeeperWatcher
 * @throws IOException if the watcher cannot be created
 */
public synchronized ZooKeeperWatcher getZooKeeperWatcher()
  throws IOException {
  if (zooKeeperWatcher == null) {
    zooKeeperWatcher = new ZooKeeperWatcher(conf, "testing utility",
      new Abortable() {
      @Override public void abort(String why, Throwable e) {
        throw new RuntimeException("Unexpected abort in HBaseTestingUtility:"+why, e);
      }
      @Override public boolean isAborted() {return false;}
    });
  }
  return zooKeeperWatcher;
}
// Backing field for the lazily-created shared watcher above.
private ZooKeeperWatcher zooKeeperWatcher;
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945 public void closeRegion(String regionName) throws IOException {
2946 closeRegion(Bytes.toBytes(regionName));
2947 }
2948
2949
2950
2951
2952
2953
2954
2955 public void closeRegion(byte[] regionName) throws IOException {
2956 getHBaseAdmin().closeRegion(regionName, null);
2957 }
2958
2959
2960
2961
2962
2963
2964
2965
2966 public void closeRegionByRow(String row, RegionLocator table) throws IOException {
2967 closeRegionByRow(Bytes.toBytes(row), table);
2968 }
2969
2970
2971
2972
2973
2974
2975
2976
2977 public void closeRegionByRow(byte[] row, RegionLocator table) throws IOException {
2978 HRegionLocation hrl = table.getRegionLocation(row);
2979 closeRegion(hrl.getRegionInfo().getRegionName());
2980 }
2981
2982
2983
2984
2985
2986
2987
2988
2989 public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2990 List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2991 int regCount = regions.size();
2992 Set<Integer> attempted = new HashSet<Integer>();
2993 int idx;
2994 int attempts = 0;
2995 do {
2996 regions = getHBaseCluster().getRegions(tableName);
2997 if (regCount != regions.size()) {
2998
2999 attempted.clear();
3000 }
3001 regCount = regions.size();
3002
3003
3004 if (regCount > 0) {
3005 idx = random.nextInt(regCount);
3006
3007 if (attempted.contains(idx))
3008 continue;
3009 try {
3010 regions.get(idx).checkSplit();
3011 return regions.get(idx);
3012 } catch (Exception ex) {
3013 LOG.warn("Caught exception", ex);
3014 attempted.add(idx);
3015 }
3016 }
3017 attempts++;
3018 } while (maxAttempts == -1 || attempts < maxAttempts);
3019 return null;
3020 }
3021
/**
 * Returns the mini ZooKeeper cluster, or null if none was started or set.
 */
public MiniZooKeeperCluster getZkCluster() {
  return zkCluster;
}
3025
3026 public void setZkCluster(MiniZooKeeperCluster zkCluster) {
3027 this.passedZkCluster = true;
3028 this.zkCluster = zkCluster;
3029 conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkCluster.getClientPort());
3030 }
3031
/**
 * Returns the mini DFS cluster, or null if none was started or set.
 */
public MiniDFSCluster getDFSCluster() {
  return dfsCluster;
}
3035
3036 public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3037 setDFSCluster(cluster, true);
3038 }
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048 public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3049 throws IllegalStateException, IOException {
3050 if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3051 throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3052 }
3053 this.dfsCluster = cluster;
3054 this.setFs();
3055 }
3056
3057 public FileSystem getTestFileSystem() throws IOException {
3058 return HFileSystem.get(conf);
3059 }
3060
3061
3062
3063
3064
3065
3066
3067
3068 public void waitTableAvailable(TableName table)
3069 throws InterruptedException, IOException {
3070 waitTableAvailable(table.getName(), 30000);
3071 }
3072
/**
 * Waits up to {@code timeoutMillis} for the table-available predicate to
 * hold (see predicateTableAvailable for the exact condition checked).
 * @param table the table to wait on
 * @param timeoutMillis wait budget in milliseconds
 */
public void waitTableAvailable(TableName table, long timeoutMillis)
  throws InterruptedException, IOException {
  waitFor(timeoutMillis, predicateTableAvailable(table));
}
3077
3078 public String explainTableAvailability(TableName tableName) throws IOException {
3079 String msg = explainTableState(tableName) + ",";
3080 if (getHBaseCluster().getMaster().isAlive()) {
3081 Map<HRegionInfo, ServerName> assignments =
3082 getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
3083 .getRegionAssignments();
3084 final List<Pair<HRegionInfo, ServerName>> metaLocations =
3085 MetaTableAccessor
3086 .getTableRegionsAndLocations(getZooKeeperWatcher(), connection, tableName);
3087 for (Pair<HRegionInfo, ServerName> metaLocation : metaLocations) {
3088 HRegionInfo hri = metaLocation.getFirst();
3089 ServerName sn = metaLocation.getSecond();
3090 if (!assignments.containsKey(hri)) {
3091 msg += ", region " + hri
3092 + " not assigned, but found in meta, it expected to be on " + sn;
3093
3094 } else if (sn == null) {
3095 msg += ", region " + hri
3096 + " assigned, but has no server in meta";
3097 } else if (!sn.equals(assignments.get(hri))) {
3098 msg += ", region " + hri
3099 + " assigned, but has different servers in meta and AM ( " +
3100 sn + " <> " + assignments.get(hri);
3101 }
3102 }
3103 }
3104 return msg;
3105 }
3106
3107 public String explainTableState(TableName tableName) throws IOException {
3108 try {
3109 if (getHBaseAdmin().isTableEnabled(tableName))
3110 return "table enabled in zk";
3111 else if (getHBaseAdmin().isTableDisabled(tableName))
3112 return "table disabled in zk";
3113 else
3114 return "table in uknown state";
3115 } catch (TableNotFoundException e) {
3116 return "table not exists";
3117 }
3118 }
3119
3120
3121
3122
3123
3124
3125
3126
3127 public void waitTableAvailable(byte[] table, long timeoutMillis)
3128 throws InterruptedException, IOException {
3129 waitTableAvailable(getHBaseAdmin(), table, timeoutMillis);
3130 }
3131
3132 public void waitTableAvailable(Admin admin, byte[] table, long timeoutMillis)
3133 throws InterruptedException, IOException {
3134 long startWait = System.currentTimeMillis();
3135 while (!admin.isTableAvailable(TableName.valueOf(table))) {
3136 assertTrue("Timed out waiting for table to become available " +
3137 Bytes.toStringBinary(table),
3138 System.currentTimeMillis() - startWait < timeoutMillis);
3139 Thread.sleep(200);
3140 }
3141 }
3142
3143
3144
3145
3146
3147
3148
3149
3150
3151
3152 public void waitTableEnabled(TableName table)
3153 throws InterruptedException, IOException {
3154 waitTableEnabled(table, 30000);
3155 }
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166 public void waitTableEnabled(byte[] table, long timeoutMillis)
3167 throws InterruptedException, IOException {
3168 waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3169 }
3170
/**
 * Waits up to {@code timeoutMillis} for the table-enabled predicate to hold
 * (see predicateTableEnabled for the exact condition checked).
 * @param table the table to wait on
 * @param timeoutMillis wait budget in milliseconds
 */
public void waitTableEnabled(TableName table, long timeoutMillis)
  throws IOException {
  waitFor(timeoutMillis, predicateTableEnabled(table));
}
3175
3176
3177
3178
3179
3180
3181
3182
3183 public void waitTableDisabled(byte[] table)
3184 throws InterruptedException, IOException {
3185 waitTableDisabled(getHBaseAdmin(), table, 30000);
3186 }
3187
3188 public void waitTableDisabled(Admin admin, byte[] table)
3189 throws InterruptedException, IOException {
3190 waitTableDisabled(admin, table, 30000);
3191 }
3192
3193
3194
3195
3196
3197
3198
3199
3200 public void waitTableDisabled(byte[] table, long timeoutMillis)
3201 throws InterruptedException, IOException {
3202 waitTableDisabled(getHBaseAdmin(), table, timeoutMillis);
3203 }
3204
3205 public void waitTableDisabled(Admin admin, byte[] table, long timeoutMillis)
3206 throws InterruptedException, IOException {
3207 TableName tableName = TableName.valueOf(table);
3208 long startWait = System.currentTimeMillis();
3209 while (!admin.isTableDisabled(tableName)) {
3210 assertTrue("Timed out waiting for table to become disabled " +
3211 Bytes.toStringBinary(table),
3212 System.currentTimeMillis() - startWait < timeoutMillis);
3213 Thread.sleep(200);
3214 }
3215 }
3216
3217
3218
3219
3220
3221
3222
3223
3224 public boolean ensureSomeRegionServersAvailable(final int num)
3225 throws IOException {
3226 boolean startedServer = false;
3227 MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3228 for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
3229 LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3230 startedServer = true;
3231 }
3232
3233 return startedServer;
3234 }
3235
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245 public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
3246 throws IOException {
3247 boolean startedServer = ensureSomeRegionServersAvailable(num);
3248
3249 int nonStoppedServers = 0;
3250 for (JVMClusterUtil.RegionServerThread rst :
3251 getMiniHBaseCluster().getRegionServerThreads()) {
3252
3253 HRegionServer hrs = rst.getRegionServer();
3254 if (hrs.isStopping() || hrs.isStopped()) {
3255 LOG.info("A region server is stopped or stopping:"+hrs);
3256 } else {
3257 nonStoppedServers++;
3258 }
3259 }
3260 for (int i=nonStoppedServers; i<num; ++i) {
3261 LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3262 startedServer = true;
3263 }
3264 return startedServer;
3265 }
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275
3276
3277 public static User getDifferentUser(final Configuration c,
3278 final String differentiatingSuffix)
3279 throws IOException {
3280 FileSystem currentfs = FileSystem.get(c);
3281 if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3282 return User.getCurrent();
3283 }
3284
3285
3286 String username = User.getCurrent().getName() +
3287 differentiatingSuffix;
3288 User user = User.createUserForTesting(c, username,
3289 new String[]{"supergroup"});
3290 return user;
3291 }
3292
/**
 * Returns the names of all regions currently online, gathered from every
 * live region server and every live master of the given mini cluster.
 * Servers that are stopped or not yet running are silently skipped — this
 * is a best-effort snapshot.
 *
 * @param cluster the mini cluster to survey
 * @return sorted set of region names
 * @throws IOException on RPC failures other than stopped/not-yet-running
 */
public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
    throws IOException {
  NavigableSet<String> online = new TreeSet<String>();
  for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
    try {
      for (HRegionInfo region :
          ProtobufUtil.getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
        online.add(region.getRegionNameAsString());
      }
    } catch (RegionServerStoppedException e) {
      // Intentionally ignored: server went down mid-survey; its regions
      // are no longer "online" anyway.
    }
  }
  for (MasterThread mt : cluster.getLiveMasterThreads()) {
    try {
      for (HRegionInfo region :
          ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
        online.add(region.getRegionNameAsString());
      }
    } catch (RegionServerStoppedException e) {
      // Intentionally ignored: master's region service stopped mid-survey.
    } catch (ServerNotRunningYetException e) {
      // Intentionally ignored: master not serving regions yet.
    }
  }
  return online;
}
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
/**
 * Sets the {@code maxRecoveryErrorCount} field on a DFSClient output stream
 * via reflection, so recovery retries don't make tests linger.
 * Best-effort: if the stream is not a DFSOutputStream or the field is
 * absent in this Hadoop version, the failure is logged and ignored.
 *
 * @param stream expected to be a DFSClient.DFSOutputStream
 * @param max the new maximum recovery error count
 */
public static void setMaxRecoveryErrorCount(final OutputStream stream,
    final int max) {
  try {
    // The class is a private nested class of DFSClient, so locate it by
    // simple name among DFSClient's declared classes.
    Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
    for (Class<?> clazz: clazzes) {
      String className = clazz.getSimpleName();
      if (className.equals("DFSOutputStream")) {
        if (clazz.isInstance(stream)) {
          Field maxRecoveryErrorCountField =
            stream.getClass().getDeclaredField("maxRecoveryErrorCount");
          maxRecoveryErrorCountField.setAccessible(true);
          maxRecoveryErrorCountField.setInt(stream, max);
          break;
        }
      }
    }
  } catch (Exception e) {
    // Reflection into Hadoop internals is inherently fragile; a miss is
    // not fatal to the test, so just log it.
    LOG.info("Could not set max recovery field", e);
  }
}
3354
3355
3356
3357
3358
3359
3360
3361
3362
3363 public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3364 waitUntilAllRegionsAssigned(tableName, 60000);
3365 }
3366
3367
3368
3369
3370
3371
3372
3373
3374
3375
/**
 * Waits until every region of {@code tableName} has a server recorded in
 * hbase:meta, then — mini clusters only — additionally waits until the
 * master's AssignmentManager reports regions for the table.
 * Note: {@code timeout} applies to each wait phase separately.
 *
 * @param tableName the table to wait on
 * @param timeout per-phase wait budget in milliseconds
 * @throws IOException if scanning meta fails or a wait times out
 */
public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
    throws IOException {
  final Table meta = new HTable(getConfiguration(), TableName.META_TABLE_NAME);
  try {
    // Phase 1: every catalog row of this table must carry a server column.
    waitFor(timeout, 200, true, new Predicate<IOException>() {
      @Override
      public boolean evaluate() throws IOException {
        boolean allRegionsAssigned = true;
        Scan scan = new Scan();
        scan.addFamily(HConstants.CATALOG_FAMILY);
        ResultScanner s = meta.getScanner(scan);
        try {
          Result r;
          while ((r = s.next()) != null) {
            byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
            HRegionInfo info = HRegionInfo.parseFromOrNull(b);
            if (info != null && info.getTable().equals(tableName)) {
              // A region is "assigned" here iff its server column is set.
              b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
              allRegionsAssigned &= (b != null);
            }
          }
        } finally {
          s.close();
        }
        return allRegionsAssigned;
      }
    });
  } finally {
    meta.close();
  }

  // Phase 2 (mini cluster only): the local master's AssignmentManager must
  // also know regions for the table.
  if (!getHBaseClusterInterface().isDistributedCluster()) {
    // On a distributed cluster we cannot reach into the master process.
    HMaster master = getHBaseCluster().getMaster();
    final RegionStates states = master.getAssignmentManager().getRegionStates();
    waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableAvailability(tableName);
      }

      @Override
      public boolean evaluate() throws IOException {
        List<HRegionInfo> hris = states.getRegionsOfTable(tableName);
        return hris != null && !hris.isEmpty();
      }
    });
  }
}
3427
3428
3429
3430
3431
/**
 * Does a small get against a single store by driving its scanner directly
 * (a store has no query methods of its own).
 *
 * @param store the store to read from
 * @param get the get to convert into a single-row scan
 * @return cells for the requested row; empty when the scanner landed on a
 *         different row
 * @throws IOException on read failure
 */
public static List<Cell> getFromStoreFile(HStore store,
                                Get get) throws IOException {
  Scan scan = new Scan(get);
  InternalScanner scanner = (InternalScanner) store.getScanner(scan,
      scan.getFamilyMap().get(store.getFamily().getName()),
      // Read point 0; assumes the caller does not need MVCC-visible recent
      // writes — TODO confirm this matches all test usages.
      0);

  List<Cell> result = new ArrayList<Cell>();
  scanner.next(result);
  if (!result.isEmpty()) {
    // Verify that we are on the row we asked for; the scanner may return
    // the next row when the requested one is absent.
    Cell kv = result.get(0);
    if (!CellUtil.matchingRow(kv, get.getRow())) {
      result.clear();
    }
  }
  scanner.close();
  return result;
}
3453
3454
3455
3456
3457
3458
3459
3460
3461
3462 public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions){
3463 assertTrue(numRegions>3);
3464 byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3465 byte [][] result = new byte[tmpSplitKeys.length+1][];
3466 System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3467 result[0] = HConstants.EMPTY_BYTE_ARRAY;
3468 return result;
3469 }
3470
3471
3472
3473
3474
3475 public static List<Cell> getFromStoreFile(HStore store,
3476 byte [] row,
3477 NavigableSet<byte[]> columns
3478 ) throws IOException {
3479 Get get = new Get(row);
3480 Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3481 s.put(store.getFamily().getName(), columns);
3482
3483 return getFromStoreFile(store,get);
3484 }
3485
3486
3487
3488
3489
3490 public static ZooKeeperWatcher getZooKeeperWatcher(
3491 HBaseTestingUtility TEST_UTIL) throws ZooKeeperConnectionException,
3492 IOException {
3493 ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
3494 "unittest", new Abortable() {
3495 boolean aborted = false;
3496
3497 @Override
3498 public void abort(String why, Throwable e) {
3499 aborted = true;
3500 throw new RuntimeException("Fatal ZK error, why=" + why, e);
3501 }
3502
3503 @Override
3504 public boolean isAborted() {
3505 return aborted;
3506 }
3507 });
3508 return zkw;
3509 }
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
3522 public static ZooKeeperWatcher createAndForceNodeToOpenedState(
3523 HBaseTestingUtility TEST_UTIL, Region region,
3524 ServerName serverName) throws ZooKeeperConnectionException,
3525 IOException, KeeperException, NodeExistsException {
3526 return createAndForceNodeToOpenedState(TEST_UTIL, (HRegion)region, serverName);
3527 }
3528
3529
3530
3531
3532
3533
3534
3535
3536
3537
3538
3539
/**
 * Creates a znode for the region and walks it through the full open
 * sequence OFFLINE -> OPENING -> OPENED, mimicking what a region server
 * does when it opens a region. The order matters: the OPENING transition
 * returns the znode version that the OPENED transition must present.
 *
 * @return the watcher used for the transitions
 */
public static ZooKeeperWatcher createAndForceNodeToOpenedState(
    HBaseTestingUtility TEST_UTIL, HRegion region,
    ServerName serverName) throws ZooKeeperConnectionException,
    IOException, KeeperException, NodeExistsException {
  ZooKeeperWatcher zkw = getZooKeeperWatcher(TEST_UTIL);
  ZKAssign.createNodeOffline(zkw, region.getRegionInfo(), serverName);
  // Capture the version from OPENING; OPENED requires it.
  int version = ZKAssign.transitionNodeOpening(zkw, region
      .getRegionInfo(), serverName);
  ZKAssign.transitionNodeOpened(zkw, region.getRegionInfo(), serverName,
      version);
  return zkw;
}
3552
3553 public static void assertKVListsEqual(String additionalMsg,
3554 final List<? extends Cell> expected,
3555 final List<? extends Cell> actual) {
3556 final int eLen = expected.size();
3557 final int aLen = actual.size();
3558 final int minLen = Math.min(eLen, aLen);
3559
3560 int i;
3561 for (i = 0; i < minLen
3562 && KeyValue.COMPARATOR.compare(expected.get(i), actual.get(i)) == 0;
3563 ++i) {}
3564
3565 if (additionalMsg == null) {
3566 additionalMsg = "";
3567 }
3568 if (!additionalMsg.isEmpty()) {
3569 additionalMsg = ". " + additionalMsg;
3570 }
3571
3572 if (eLen != aLen || i != minLen) {
3573 throw new AssertionError(
3574 "Expected and actual KV arrays differ at position " + i + ": " +
3575 safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3576 safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3577 }
3578 }
3579
3580 public static <T> String safeGetAsStr(List<T> lst, int i) {
3581 if (0 <= i && i < lst.size()) {
3582 return lst.get(i).toString();
3583 } else {
3584 return "<out_of_range>";
3585 }
3586 }
3587
3588 public String getClusterKey() {
3589 return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3590 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3591 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3592 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3593 }
3594
3595
/**
 * Creates a pre-split table and loads it with pseudo-random data: for each
 * of {@code numFlushes} rounds, writes {@code numRowsPerFlush} rows where
 * every column is randomly either a put, a single-version delete, or an
 * all-versions delete; flushes after each round when a mini cluster runs.
 * The RNG is seeded from the table name, so the data is deterministic per
 * table name.
 *
 * @return the created (and populated) table
 */
public HTable createRandomTable(String tableName,
    final Collection<String> families,
    final int maxVersions,
    final int numColsPerRow,
    final int numFlushes,
    final int numRegions,
    final int numRowsPerFlush)
    throws IOException, InterruptedException {

  LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
      " regions, " + numFlushes + " storefiles per region, " +
      numRowsPerFlush + " rows per flush, maxVersions=" + maxVersions +
      "\n");

  // Deterministic per table name; reruns produce identical data.
  final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
  final int numCF = families.size();
  final byte[][] cfBytes = new byte[numCF][];
  {
    int cfIndex = 0;
    for (String cf : families) {
      cfBytes[cfIndex++] = Bytes.toBytes(cf);
    }
  }

  // Split keys cover [splitStartKey, splitEndKey], keeping one region's
  // worth of key space on each side of the split range.
  final int actualStartKey = 0;
  final int actualEndKey = Integer.MAX_VALUE;
  final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
  final int splitStartKey = actualStartKey + keysPerRegion;
  final int splitEndKey = actualEndKey - keysPerRegion;
  // Fixed-width hex so lexicographic row order matches numeric order.
  final String keyFormat = "%08x";
  final HTable table = createTable(tableName, cfBytes,
      maxVersions,
      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
      Bytes.toBytes(String.format(keyFormat, splitEndKey)),
      numRegions);

  if (hbaseCluster != null) {
    getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
  }

  for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
    for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
      final byte[] row = Bytes.toBytes(String.format(keyFormat,
          actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));

      Put put = new Put(row);
      Delete del = new Delete(row);
      for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
        final byte[] cf = cfBytes[rand.nextInt(numCF)];
        final long ts = rand.nextInt();
        final byte[] qual = Bytes.toBytes("col" + iCol);
        if (rand.nextBoolean()) {
          // 50%: put a value that encodes its own coordinates for debugging.
          final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
              "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
              ts + "_random_" + rand.nextLong());
          put.add(cf, qual, ts, value);
        } else if (rand.nextDouble() < 0.8) {
          // 40%: delete one version of the column.
          del.deleteColumn(cf, qual, ts);
        } else {
          // 10%: delete all versions of the column up to ts.
          del.deleteColumns(cf, qual, ts);
        }
      }

      if (!put.isEmpty()) {
        table.put(put);
      }

      if (!del.isEmpty()) {
        table.delete(del);
      }
    }
    LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
    table.flushCommits();
    if (hbaseCluster != null) {
      // Flush to create one store file per round.
      getMiniHBaseCluster().flushcache(table.getName());
    }
  }

  return table;
}
3676
// Bounds for randomPort(): 0xc000-0xfffe (49152-65534), i.e. the dynamic
// port range, so random test ports avoid well-known and registered ports.
private static final int MIN_RANDOM_PORT = 0xc000;
private static final int MAX_RANDOM_PORT = 0xfffe;
// Shared RNG for the port/address helpers below and getSplittableRegion.
private static Random random = new Random();
3680
3681
3682
3683
3684
3685 public static int randomPort() {
3686 return MIN_RANDOM_PORT
3687 + random.nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
3688 }
3689
3690
3691
3692
3693
3694 public static int randomFreePort() {
3695 int port = 0;
3696 do {
3697 port = randomPort();
3698 if (takenRandomPorts.contains(port)) {
3699 port = 0;
3700 continue;
3701 }
3702 takenRandomPorts.add(port);
3703
3704 try {
3705 ServerSocket sock = new ServerSocket(port);
3706 sock.close();
3707 } catch (IOException ex) {
3708 port = 0;
3709 }
3710 } while (port == 0);
3711 return port;
3712 }
3713
3714
3715 public static String randomMultiCastAddress() {
3716 return "226.1.1." + random.nextInt(254);
3717 }
3718
3719
3720
3721 public static void waitForHostPort(String host, int port)
3722 throws IOException {
3723 final int maxTimeMs = 10000;
3724 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3725 IOException savedException = null;
3726 LOG.info("Waiting for server at " + host + ":" + port);
3727 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3728 try {
3729 Socket sock = new Socket(InetAddress.getByName(host), port);
3730 sock.close();
3731 savedException = null;
3732 LOG.info("Server at " + host + ":" + port + " is available");
3733 break;
3734 } catch (UnknownHostException e) {
3735 throw new IOException("Failed to look up " + host, e);
3736 } catch (IOException e) {
3737 savedException = e;
3738 }
3739 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3740 }
3741
3742 if (savedException != null) {
3743 throw savedException;
3744 }
3745 }
3746
3747
3748
3749
3750
3751
3752 public static int createPreSplitLoadTestTable(Configuration conf,
3753 TableName tableName, byte[] columnFamily, Algorithm compression,
3754 DataBlockEncoding dataBlockEncoding) throws IOException {
3755 return createPreSplitLoadTestTable(conf, tableName,
3756 columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
3757 Durability.USE_DEFAULT);
3758 }
3759
3760
3761
3762
3763
3764 public static int createPreSplitLoadTestTable(Configuration conf,
3765 TableName tableName, byte[] columnFamily, Algorithm compression,
3766 DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3767 Durability durability)
3768 throws IOException {
3769 HTableDescriptor desc = new HTableDescriptor(tableName);
3770 desc.setDurability(durability);
3771 desc.setRegionReplication(regionReplication);
3772 HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
3773 hcd.setDataBlockEncoding(dataBlockEncoding);
3774 hcd.setCompressionType(compression);
3775 return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer);
3776 }
3777
3778
3779
3780
3781
3782
3783 public static int createPreSplitLoadTestTable(Configuration conf,
3784 TableName tableName, byte[][] columnFamilies, Algorithm compression,
3785 DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3786 Durability durability)
3787 throws IOException {
3788 HTableDescriptor desc = new HTableDescriptor(tableName);
3789 desc.setDurability(durability);
3790 desc.setRegionReplication(regionReplication);
3791 HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length];
3792 for (int i = 0; i < columnFamilies.length; i++) {
3793 HColumnDescriptor hcd = new HColumnDescriptor(columnFamilies[i]);
3794 hcd.setDataBlockEncoding(dataBlockEncoding);
3795 hcd.setCompressionType(compression);
3796 hcds[i] = hcd;
3797 }
3798 return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer);
3799 }
3800
3801
3802
3803
3804
3805
  /**
   * Creates a pre-split table for load testing from a prepared table
   * descriptor and a single column family descriptor, using
   * {@link #DEFAULT_REGIONS_PER_SERVER} regions per server.
   *
   * @param conf the cluster configuration to connect with
   * @param desc table descriptor (the family is added to it if missing)
   * @param hcd descriptor of the single column family
   * @return the number of regions the table was split into
   * @throws IOException if table creation fails
   */
  public static int createPreSplitLoadTestTable(Configuration conf,
      HTableDescriptor desc, HColumnDescriptor hcd) throws IOException {
    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
  }
3810
3811
3812
3813
3814
3815
  /**
   * Creates a pre-split table for load testing from a prepared table
   * descriptor and a single column family descriptor.
   *
   * @param conf the cluster configuration to connect with
   * @param desc table descriptor (the family is added to it if missing)
   * @param hcd descriptor of the single column family
   * @param numRegionsPerServer number of regions to create per live region server
   * @return the number of regions the table was split into
   * @throws IOException if table creation fails
   */
  public static int createPreSplitLoadTestTable(Configuration conf,
      HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException {
    return createPreSplitLoadTestTable(conf, desc, new HColumnDescriptor[] {hcd},
        numRegionsPerServer);
  }
3821
3822
3823
3824
3825
3826
3827 public static int createPreSplitLoadTestTable(Configuration conf,
3828 HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3829 for (HColumnDescriptor hcd : hcds) {
3830 if (!desc.hasFamily(hcd.getName())) {
3831 desc.addFamily(hcd);
3832 }
3833 }
3834
3835 int totalNumberOfRegions = 0;
3836 Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3837 Admin admin = unmanagedConnection.getAdmin();
3838
3839 try {
3840
3841
3842
3843 int numberOfServers = admin.getClusterStatus().getServers().size();
3844 if (numberOfServers == 0) {
3845 throw new IllegalStateException("No live regionservers");
3846 }
3847
3848 totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3849 LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3850 "pre-splitting table into " + totalNumberOfRegions + " regions " +
3851 "(regions per server: " + numRegionsPerServer + ")");
3852
3853 byte[][] splits = new RegionSplitter.HexStringSplit().split(
3854 totalNumberOfRegions);
3855
3856 admin.createTable(desc, splits);
3857 } catch (MasterNotRunningException e) {
3858 LOG.error("Master not running", e);
3859 throw new IOException(e);
3860 } catch (TableExistsException e) {
3861 LOG.warn("Table " + desc.getTableName() +
3862 " already exists, continuing");
3863 } finally {
3864 admin.close();
3865 unmanagedConnection.close();
3866 }
3867 return totalNumberOfRegions;
3868 }
3869
3870 public static int getMetaRSPort(Configuration conf) throws IOException {
3871 try (Connection c = ConnectionFactory.createConnection();
3872 RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) {
3873 return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3874 }
3875 }
3876
3877
3878
3879
3880
3881
3882
3883 public void assertRegionOnServer(
3884 final HRegionInfo hri, final ServerName server,
3885 final long timeout) throws IOException, InterruptedException {
3886 long timeoutTime = System.currentTimeMillis() + timeout;
3887 while (true) {
3888 List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
3889 if (regions.contains(hri)) return;
3890 long now = System.currentTimeMillis();
3891 if (now > timeoutTime) break;
3892 Thread.sleep(10);
3893 }
3894 fail("Could not find region " + hri.getRegionNameAsString()
3895 + " on server " + server);
3896 }
3897
3898
3899
3900
3901
3902 public void assertRegionOnlyOnServer(
3903 final HRegionInfo hri, final ServerName server,
3904 final long timeout) throws IOException, InterruptedException {
3905 long timeoutTime = System.currentTimeMillis() + timeout;
3906 while (true) {
3907 List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
3908 if (regions.contains(hri)) {
3909 List<JVMClusterUtil.RegionServerThread> rsThreads =
3910 getHBaseCluster().getLiveRegionServerThreads();
3911 for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
3912 HRegionServer rs = rsThread.getRegionServer();
3913 if (server.equals(rs.getServerName())) {
3914 continue;
3915 }
3916 Collection<Region> hrs = rs.getOnlineRegionsLocalContext();
3917 for (Region r: hrs) {
3918 assertTrue("Region should not be double assigned",
3919 r.getRegionInfo().getRegionId() != hri.getRegionId());
3920 }
3921 }
3922 return;
3923 }
3924 long now = System.currentTimeMillis();
3925 if (now > timeoutTime) break;
3926 Thread.sleep(10);
3927 }
3928 fail("Could not find region " + hri.getRegionNameAsString()
3929 + " on server " + server);
3930 }
3931
3932 public HRegion createTestRegion(String tableName, HColumnDescriptor hcd)
3933 throws IOException {
3934 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
3935 htd.addFamily(hcd);
3936 HRegionInfo info =
3937 new HRegionInfo(TableName.valueOf(tableName), null, null, false);
3938 HRegion region =
3939 HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), htd);
3940 return region;
3941 }
3942
  /**
   * Sets the file system URI used by this testing utility.
   *
   * @param fsURI the file system URI, e.g. an hdfs:// address
   */
  public void setFileSystemURI(String fsURI) {
    FS_URI = fsURI;
  }
3946
3947
3948
3949
  /**
   * Wrapper method for {@link Waiter#waitFor(Configuration, long, Predicate)}
   * using this utility's configuration.
   *
   * @param timeout maximum time to wait, in milliseconds
   * @param predicate condition to wait for
   * @return time spent waiting, in milliseconds, or -1 on timeout
   */
  public <E extends Exception> long waitFor(long timeout, Predicate<E> predicate)
      throws E {
    return Waiter.waitFor(this.conf, timeout, predicate);
  }
3954
3955
3956
3957
  /**
   * Wrapper method for
   * {@link Waiter#waitFor(Configuration, long, long, Predicate)} using this
   * utility's configuration.
   *
   * @param timeout maximum time to wait, in milliseconds
   * @param interval polling interval, in milliseconds
   * @param predicate condition to wait for
   * @return time spent waiting, in milliseconds, or -1 on timeout
   */
  public <E extends Exception> long waitFor(long timeout, long interval, Predicate<E> predicate)
      throws E {
    return Waiter.waitFor(this.conf, timeout, interval, predicate);
  }
3962
3963
3964
3965
  /**
   * Wrapper method for
   * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)}
   * using this utility's configuration.
   *
   * @param timeout maximum time to wait, in milliseconds
   * @param interval polling interval, in milliseconds
   * @param failIfTimeout whether to fail the test on timeout
   * @param predicate condition to wait for
   * @return time spent waiting, in milliseconds, or -1 on timeout
   */
  public <E extends Exception> long waitFor(long timeout, long interval,
      boolean failIfTimeout, Predicate<E> predicate) throws E {
    return Waiter.waitFor(this.conf, timeout, interval, failIfTimeout, predicate);
  }
3970
3971
3972
3973
3974 public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3975 return new ExplainingPredicate<IOException>() {
3976 @Override
3977 public String explainFailure() throws IOException {
3978 final RegionStates regionStates = getMiniHBaseCluster().getMaster()
3979 .getAssignmentManager().getRegionStates();
3980 return "found in transition: " + regionStates.getRegionsInTransition().toString();
3981 }
3982
3983 @Override
3984 public boolean evaluate() throws IOException {
3985 HMaster master = getMiniHBaseCluster().getMaster();
3986 if (master == null) return false;
3987 AssignmentManager am = master.getAssignmentManager();
3988 if (am == null) return false;
3989 final RegionStates regionStates = am.getRegionStates();
3990 return !regionStates.isRegionsInTransition();
3991 }
3992 };
3993 }
3994
3995
3996
3997
3998 public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3999 return new ExplainingPredicate<IOException>() {
4000 @Override
4001 public String explainFailure() throws IOException {
4002 return explainTableState(tableName);
4003 }
4004
4005 @Override
4006 public boolean evaluate() throws IOException {
4007 return getHBaseAdmin().tableExists(tableName) && getHBaseAdmin().isTableEnabled(tableName);
4008 }
4009 };
4010 }
4011
4012
4013
4014
  /**
   * Returns a predicate that is satisfied once the given table is disabled.
   * <p>
   * NOTE(review): unlike {@code predicateTableEnabled}, this does not check
   * that the table exists first — presumably callers only use it on tables
   * they have already created; verify against call sites.
   *
   * @param tableName the table to check
   * @return an {@link ExplainingPredicate} for use with waitFor
   */
  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName);
      }

      @Override
      public boolean evaluate() throws IOException {
        return getHBaseAdmin().isTableDisabled(tableName);
      }
    };
  }
4028
4029
4030
4031
4032 public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
4033 return new ExplainingPredicate<IOException>() {
4034 @Override
4035 public String explainFailure() throws IOException {
4036 return explainTableAvailability(tableName);
4037 }
4038
4039 @Override
4040 public boolean evaluate() throws IOException {
4041 boolean tableAvailable = getHBaseAdmin().isTableAvailable(tableName);
4042 if (tableAvailable) {
4043 try {
4044 Canary.sniff(getHBaseAdmin(), tableName);
4045 } catch (Exception e) {
4046 throw new IOException("Canary sniff failed for table " + tableName, e);
4047 }
4048 }
4049 return tableAvailable;
4050 }
4051 };
4052 }
4053
4054
4055
4056
4057
4058
  /**
   * Waits until no regions are in transition on the mini cluster's master.
   *
   * @param timeout maximum time to wait, in milliseconds
   * @throws Exception if the wait fails or times out
   */
  public void waitUntilNoRegionsInTransition(
      final long timeout) throws Exception {
    waitFor(timeout, predicateNoRegionsInTransition());
  }
4063
4064
4065
4066
4067
4068
4069 public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4070 final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4071 waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4072
4073 @Override
4074 public boolean evaluate() {
4075 for (String label : labels) {
4076 if (labelsCache.getLabelOrdinal(label) == 0) {
4077 return false;
4078 }
4079 }
4080 return true;
4081 }
4082
4083 @Override
4084 public String explainFailure() {
4085 for (String label : labels) {
4086 if (labelsCache.getLabelOrdinal(label) == 0) {
4087 return label + " is not available yet";
4088 }
4089 }
4090 return "";
4091 }
4092 });
4093 }
4094
4095
4096
4097
4098
4099
  /**
   * Creates column family descriptors covering every combination of
   * supported compression algorithm, data block encoding and bloom filter
   * type, with no name prefix.
   *
   * @return a list of generated column family descriptors
   */
  public static List<HColumnDescriptor> generateColumnDescriptors() {
    return generateColumnDescriptors("");
  }
4103
4104
4105
4106
4107
4108
4109
4110 public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
4111 List<HColumnDescriptor> htds = new ArrayList<HColumnDescriptor>();
4112 long familyId = 0;
4113 for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
4114 for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
4115 for (BloomType bloomType: BloomType.values()) {
4116 String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4117 HColumnDescriptor htd = new HColumnDescriptor(name);
4118 htd.setCompressionType(compressionType);
4119 htd.setDataBlockEncoding(encodingType);
4120 htd.setBloomFilterType(bloomType);
4121 htds.add(htd);
4122 familyId++;
4123 }
4124 }
4125 }
4126 return htds;
4127 }
4128
4129
4130
4131
4132
4133 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4134 String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4135 List<Compression.Algorithm> supportedAlgos = new ArrayList<Compression.Algorithm>();
4136 for (String algoName : allAlgos) {
4137 try {
4138 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4139 algo.getCompressor();
4140 supportedAlgos.add(algo);
4141 } catch (Throwable t) {
4142
4143 }
4144 }
4145 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4146 }
4147 }