1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase;
19  
20  import java.io.File;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.lang.reflect.Field;
24  import java.lang.reflect.Modifier;
25  import java.net.InetAddress;
26  import java.net.ServerSocket;
27  import java.net.Socket;
28  import java.net.UnknownHostException;
29  import java.security.MessageDigest;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Collection;
33  import java.util.Collections;
34  import java.util.HashSet;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NavigableSet;
38  import java.util.Random;
39  import java.util.Set;
40  import java.util.TreeSet;
41  import java.util.UUID;
42  import java.util.concurrent.TimeUnit;
43  
44  import org.apache.commons.lang.RandomStringUtils;
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.commons.logging.impl.Jdk14Logger;
48  import org.apache.commons.logging.impl.Log4JLogger;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.FileSystem;
51  import org.apache.hadoop.fs.Path;
52  import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
53  import org.apache.hadoop.hbase.Waiter.Predicate;
54  import org.apache.hadoop.hbase.classification.InterfaceAudience;
55  import org.apache.hadoop.hbase.classification.InterfaceStability;
56  import org.apache.hadoop.hbase.client.Admin;
57  import org.apache.hadoop.hbase.client.Connection;
58  import org.apache.hadoop.hbase.client.ConnectionFactory;
59  import org.apache.hadoop.hbase.client.Consistency;
60  import org.apache.hadoop.hbase.client.Delete;
61  import org.apache.hadoop.hbase.client.Durability;
62  import org.apache.hadoop.hbase.client.Get;
63  import org.apache.hadoop.hbase.client.HBaseAdmin;
64  import org.apache.hadoop.hbase.client.HConnection;
65  import org.apache.hadoop.hbase.client.HTable;
66  import org.apache.hadoop.hbase.client.Put;
67  import org.apache.hadoop.hbase.client.RegionLocator;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.client.ResultScanner;
70  import org.apache.hadoop.hbase.client.Scan;
71  import org.apache.hadoop.hbase.client.Table;
72  import org.apache.hadoop.hbase.fs.HFileSystem;
73  import org.apache.hadoop.hbase.io.compress.Compression;
74  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
75  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
76  import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
77  import org.apache.hadoop.hbase.io.hfile.HFile;
78  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
79  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
80  import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
81  import org.apache.hadoop.hbase.master.HMaster;
82  import org.apache.hadoop.hbase.master.AssignmentManager;
83  import org.apache.hadoop.hbase.master.RegionStates;
84  import org.apache.hadoop.hbase.master.ServerManager;
85  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
86  import org.apache.hadoop.hbase.regionserver.BloomType;
87  import org.apache.hadoop.hbase.regionserver.HRegion;
88  import org.apache.hadoop.hbase.regionserver.HRegionServer;
89  import org.apache.hadoop.hbase.regionserver.HStore;
90  import org.apache.hadoop.hbase.regionserver.InternalScanner;
91  import org.apache.hadoop.hbase.regionserver.Region;
92  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
93  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
94  import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
95  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
96  import org.apache.hadoop.hbase.security.User;
97  import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
98  import org.apache.hadoop.hbase.tool.Canary;
99  import org.apache.hadoop.hbase.util.Bytes;
100 import org.apache.hadoop.hbase.util.FSTableDescriptors;
101 import org.apache.hadoop.hbase.util.FSUtils;
102 import org.apache.hadoop.hbase.util.JVMClusterUtil;
103 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
104 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
105 import org.apache.hadoop.hbase.util.Pair;
106 import org.apache.hadoop.hbase.util.RegionSplitter;
107 import org.apache.hadoop.hbase.util.RetryCounter;
108 import org.apache.hadoop.hbase.util.Threads;
109 import org.apache.hadoop.hbase.wal.WAL;
110 import org.apache.hadoop.hbase.wal.WALFactory;
111 import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
112 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
113 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
114 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
115 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
116 import org.apache.hadoop.hdfs.DFSClient;
117 import org.apache.hadoop.hdfs.DistributedFileSystem;
118 import org.apache.hadoop.hdfs.MiniDFSCluster;
119 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
120 import org.apache.hadoop.mapred.JobConf;
121 import org.apache.hadoop.mapred.MiniMRCluster;
122 import org.apache.hadoop.mapred.TaskLog;
123 import org.apache.zookeeper.KeeperException;
124 import org.apache.zookeeper.KeeperException.NodeExistsException;
125 import org.apache.zookeeper.WatchedEvent;
126 import org.apache.zookeeper.ZooKeeper;
127 import org.apache.zookeeper.ZooKeeper.States;
128 
129 import static org.junit.Assert.assertEquals;
130 import static org.junit.Assert.assertTrue;
131 import static org.junit.Assert.fail;
132 
133 /**
134  * Facility for testing HBase. Replacement for
135  * old HBaseTestCase and HBaseClusterTestCase functionality.
136  * Create an instance and keep it around for the duration of your HBase testing.  This class is
137  * meant to be your one-stop shop for anything you might need for testing.  Manages
138  * one cluster at a time only. Managed cluster can be an in-process
139  * {@link MiniHBaseCluster}, or a deployed cluster of type {@link HBaseCluster}.
140  * Not all methods work with the real cluster.
141  * Depends on log4j being on classpath and
142  * hbase-site.xml for logging and test-run configuration.  It does not set
143  * logging levels nor make changes to configuration parameters.
144  * <p>To preserve test data directories, set the system property "hbase.testing.preserve.testdir"
145  * to true.
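 *
 * <p>A minimal usage sketch (the table and family names are illustrative):
 * <pre>
 *   HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 *   TEST_UTIL.startMiniCluster();
 *   Table table = TEST_UTIL.createTable(TableName.valueOf("test"), "f");
 *   // ... exercise the cluster ...
 *   TEST_UTIL.shutdownMiniCluster();
 * </pre>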
146  */
147 @InterfaceAudience.Public
148 @InterfaceStability.Evolving
149 @SuppressWarnings("deprecation")
150 public class HBaseTestingUtility extends HBaseCommonTestingUtility {
151   private MiniZooKeeperCluster zkCluster = null;
152 
153   public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
154   /**
155    * The default number of regions per regionserver when creating a pre-split
156    * table.
157    */
158   public static final int DEFAULT_REGIONS_PER_SERVER = 3;
159 
160 
161   public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
162   public static final boolean PRESPLIT_TEST_TABLE = true;
163 
164   public static final String USE_LOCAL_FILESYSTEM = "hbase.test.local.fileSystem";
165   /**
166    * Set if we were passed a zkCluster.  If so, we won't shut down zk as
167    * part of general shutdown.
168    */
169   private boolean passedZkCluster = false;
170   private MiniDFSCluster dfsCluster = null;
171 
172   private volatile HBaseCluster hbaseCluster = null;
173   private MiniMRCluster mrCluster = null;
174 
175   /** If there is a mini cluster running for this testing utility instance. */
176   private volatile boolean miniClusterRunning;
177 
178   private String hadoopLogDir;
179 
180   /** Directory (a subdirectory of dataTestDir) used by the dfs cluster if any */
181   private File clusterTestDir = null;
182 
183   /** Directory on test filesystem where we put the data for this instance of
184     * HBaseTestingUtility*/
185   private Path dataTestDirOnTestFS = null;
186 
187   /**
188    * Shared cluster connection.
189    */
190   private volatile Connection connection;
191 
192   /**
193    * System property key to get test directory value.
194    * Name is as it is because mini dfs has hard-codings to put test data here.
195    * It should NOT be used directly in HBase, as it's a property used in
196    *  mini dfs.
197    *  @deprecated can be used only with mini dfs
198    */
199   @Deprecated
200   private static final String TEST_DIRECTORY_KEY = "test.build.data";
201 
202   /** Filesystem URI used for map-reduce mini-cluster setup */
203   private static String FS_URI;
204 
205   /** A set of ports that have been claimed using {@link #randomFreePort()}. */
206   private static final Set<Integer> takenRandomPorts = new HashSet<Integer>();
207 
208   /** Compression algorithms to use in parameterized JUnit 4 tests */
209   public static final List<Object[]> COMPRESSION_ALGORITHMS_PARAMETERIZED =
210     Arrays.asList(new Object[][] {
211       { Compression.Algorithm.NONE },
212       { Compression.Algorithm.GZ }
213     });
214 
215   /** This is for unit tests parameterized with a single boolean. */
216   public static final List<Object[]> BOOLEAN_PARAMETERIZED =
217       Arrays.asList(new Object[][] {
218           { Boolean.FALSE },
219           { Boolean.TRUE }
220       });
221 
222   /** This is for unit tests parameterized with two booleans (memstore TS and tags). */
223   public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
224   /** Compression algorithms to use in testing */
225   public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
226       Compression.Algorithm.NONE, Compression.Algorithm.GZ
227     };
228 
229   /**
230    * Create all combinations of Bloom filters and compression algorithms for
231    * testing.
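   * <p>Typically consumed via {@link #BLOOM_AND_COMPRESSION_COMBINATIONS} from a JUnit
   * {@code @Parameters} method; for example (a sketch, assuming a test run with
   * {@code Parameterized.class}):
   * <pre>
   *   &#64;Parameters
   *   public static Collection&lt;Object[]&gt; data() {
   *     return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
   *   }
   * </pre>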
232    */
233   private static List<Object[]> bloomAndCompressionCombinations() {
234     List<Object[]> configurations = new ArrayList<Object[]>();
235     for (Compression.Algorithm comprAlgo :
236          HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
237       for (BloomType bloomType : BloomType.values()) {
238         configurations.add(new Object[] { comprAlgo, bloomType });
239       }
240     }
241     return Collections.unmodifiableList(configurations);
242   }
243 
244   /**
245    * Create all combinations of the memstoreTS and tags flags.
246    */
247   private static List<Object[]> memStoreTSAndTagsCombination() {
248     List<Object[]> configurations = new ArrayList<Object[]>();
249     configurations.add(new Object[] { false, false });
250     configurations.add(new Object[] { false, true });
251     configurations.add(new Object[] { true, false });
252     configurations.add(new Object[] { true, true });
253     return Collections.unmodifiableList(configurations);
254   }
255 
256   public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
257       bloomAndCompressionCombinations();
258 
259   public HBaseTestingUtility() {
260     this(HBaseConfiguration.create());
261   }
262 
263   public HBaseTestingUtility(Configuration conf) {
264     super(conf);
265 
266     // an hbase checksum verification failure will cause unit tests to fail
267     ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
268   }
269 
270   /**
271    * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
272    * It is needed to properly base FSUtil.getRootDirs so that they drop temp files in the proper
273    * test dir.  Use this when you aren't using a Mini HDFS cluster.
274    * @return HBaseTestingUtility that uses local fs for temp files.
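   * <p>For example (a sketch; the subdir name is illustrative):
   * <pre>
   *   HBaseTestingUtility htu = HBaseTestingUtility.createLocalHTU();
   *   Path testDir = htu.getDataTestDir("my-test");  // lands under the local test data dir
   * </pre>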
275    */
276   public static HBaseTestingUtility createLocalHTU() {
277     Configuration c = HBaseConfiguration.create();
278     return createLocalHTU(c);
279   }
280 
281   /**
282    * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
283    * It is needed to properly base FSUtil.getRootDirs so that they drop temp files in the proper
284    * test dir.  Use this when you aren't using a Mini HDFS cluster.
285    * @param c Configuration (will be modified)
286    * @return HBaseTestingUtility that uses local fs for temp files.
287    */
288   public static HBaseTestingUtility createLocalHTU(Configuration c) {
289     HBaseTestingUtility htu = new HBaseTestingUtility(c);
290     String dataTestDir = htu.getDataTestDir().toString();
291     htu.getConfiguration().set(HConstants.HBASE_DIR, dataTestDir);
292     LOG.debug("Setting " + HConstants.HBASE_DIR + " to " + dataTestDir);
293     return htu;
294   }
295 
296   /**
297    * Close the Region {@code r}. For use in tests.
298    */
299   public static void closeRegion(final Region r) throws IOException {
300     if (r != null) {
301       ((HRegion)r).close();
302     }
303   }
304 
305   /**
306    * Returns this class's instance of {@link Configuration}.  Be careful how
307    * you use the returned Configuration since {@link HConnection} instances
308    * can be shared.  The Map of HConnections is keyed by the Configuration.  If
309    * say, a Connection was being used against a cluster that had been shutdown,
310    * see {@link #shutdownMiniCluster()}, then the Connection will no longer
311    * be wholesome.  Rather than use the returned Configuration directly, it's usually best to
312    * make a copy and use that.  Do
313    * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
314    * @return Instance of Configuration.
315    */
316   @Override
317   public Configuration getConfiguration() {
318     return super.getConfiguration();
319   }
320 
321   public void setHBaseCluster(HBaseCluster hbaseCluster) {
322     this.hbaseCluster = hbaseCluster;
323   }
324 
325   /**
326    * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
327    * Give it a random name so we can have many concurrent tests running if
328    * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
329    * System property, as it's what minidfscluster bases
330    * its data dir on.  Modifying a System property is not the way to do concurrent
331    * instances -- another instance could grab the temporary
332    * value unintentionally -- but there is nothing we can do about it at the moment;
333    * the minidfscluster only works as a single instance.
334    *
335    * We also create the underlying directory for
336    * hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values
337    * in the conf; hadoop.tmp.dir is also set as a system property.
338    *
339    * @return The calculated data test build directory, if newly-created.
340    */
341   @Override
342   protected Path setupDataTestDir() {
343     Path testPath = super.setupDataTestDir();
344     if (null == testPath) {
345       return null;
346     }
347 
348     createSubDirAndSystemProperty(
349       "hadoop.log.dir",
350       testPath, "hadoop-log-dir");
351 
352     // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
353     //  we want our own value to ensure uniqueness on the same machine
354     createSubDirAndSystemProperty(
355       "hadoop.tmp.dir",
356       testPath, "hadoop-tmp-dir");
357 
358     // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
359     createSubDir(
360       "mapreduce.cluster.local.dir",
361       testPath, "mapred-local-dir");
362 
363     return testPath;
364   }
365 
366   public void setJobWithoutMRCluster() throws IOException {
367     conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
368     conf.setBoolean(HBaseTestingUtility.USE_LOCAL_FILESYSTEM, true);
369   }
370 
371   private void createSubDirAndSystemProperty(
372     String propertyName, Path parent, String subDirName){
373 
374     String sysValue = System.getProperty(propertyName);
375 
376     if (sysValue != null) {
377       // There is already a value set. So we do nothing but hope
378       //  that there will be no conflicts
379       LOG.info("System.getProperty(\""+propertyName+"\") already set to: "+
380         sysValue + " so I do NOT create it in " + parent);
381       String confValue = conf.get(propertyName);
382       if (confValue != null && !confValue.endsWith(sysValue)){
383        LOG.warn(
384          propertyName + " property value differs in configuration and system: "+
385          "Configuration="+confValue+" while System="+sysValue+
386          " Overriding the configuration value with the system value."
387        );
388       }
389       conf.set(propertyName, sysValue);
390     } else {
391       // Ok, it's not set, so we create it as a subdirectory
392       createSubDir(propertyName, parent, subDirName);
393       System.setProperty(propertyName, conf.get(propertyName));
394     }
395   }
396 
397   /**
398    * @return Where to write test data on the test filesystem; returns the working directory
399    * of the test filesystem by default
400    * @see #setupDataTestDirOnTestFS()
401    * @see #getTestFileSystem()
402    */
403   private Path getBaseTestDirOnTestFS() throws IOException {
404     FileSystem fs = getTestFileSystem();
405     return new Path(fs.getWorkingDirectory(), "test-data");
406   }
407 
408   /**
409    * @return META table descriptor
410    */
411   public HTableDescriptor getMetaTableDescriptor() {
412     try {
413       return new FSTableDescriptors(conf).get(TableName.META_TABLE_NAME);
414     } catch (IOException e) {
415       throw new RuntimeException("Unable to create META table descriptor", e);
416     }
417   }
418 
419   /**
420    * @return Where the DFS cluster will write data on the local filesystem.
421    * Creates it if it does not exist already.  A subdir of {@link #getBaseTestDir()}
422    * @see #getTestFileSystem()
423    */
424   Path getClusterTestDir() {
425     if (clusterTestDir == null){
426       setupClusterTestDir();
427     }
428     return new Path(clusterTestDir.getAbsolutePath());
429   }
430 
431   /**
432    * Creates a directory for the DFS cluster, under the test data
433    */
434   private void setupClusterTestDir() {
435     if (clusterTestDir != null) {
436       return;
437     }
438 
439     // Using randomUUID ensures that multiple clusters can be launched by
440     //  the same test, if it stops & restarts them
441     Path testDir = getDataTestDir("dfscluster_" + UUID.randomUUID().toString());
442     clusterTestDir = new File(testDir.toString()).getAbsoluteFile();
443     // Have it cleaned up on exit
444     boolean b = deleteOnExit();
445     if (b) clusterTestDir.deleteOnExit();
446     conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
447     LOG.info("Created new mini-cluster data directory: " + clusterTestDir + ", deleteOnExit=" + b);
448   }
449 
450   /**
451    * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
452    * to write temporary test data. Call this method after setting up the mini dfs cluster
453    * if the test relies on it.
454    * @return a unique path in the test filesystem
455    */
456   public Path getDataTestDirOnTestFS() throws IOException {
457     if (dataTestDirOnTestFS == null) {
458       setupDataTestDirOnTestFS();
459     }
460 
461     return dataTestDirOnTestFS;
462   }
463 
464   /**
465    * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
466    * to write temporary test data. Call this method after setting up the mini dfs cluster
467    * if the test relies on it.
468    * @return a unique path in the test filesystem
469    * @param subdirName name of the subdir to create under the base test dir
470    */
471   public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
472     return new Path(getDataTestDirOnTestFS(), subdirName);
473   }
474 
475   /**
476    * Sets up a path in test filesystem to be used by tests.
477    * Creates a new directory if not already setup.
478    */
479   private void setupDataTestDirOnTestFS() throws IOException {
480     if (dataTestDirOnTestFS != null) {
481       LOG.warn("Data test dir on test fs already set up in "
482           + dataTestDirOnTestFS.toString());
483       return;
484     }
485     dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
486   }
487 
488   /**
489    * Sets up a new path in test filesystem to be used by tests.
490    */
491   private Path getNewDataTestDirOnTestFS() throws IOException {
492     //The file system can be either local, mini dfs, or if the configuration
493     //is supplied externally, it can be an external cluster FS. If it is a local
494     //file system, the tests should use getBaseTestDir, otherwise, we can use
495     //the working directory, and create a unique sub dir there
496     FileSystem fs = getTestFileSystem();
497     Path newDataTestDir = null;
498     if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
499       File dataTestDir = new File(getDataTestDir().toString());
500       if (deleteOnExit()) dataTestDir.deleteOnExit();
501       newDataTestDir = new Path(dataTestDir.getAbsolutePath());
502     } else {
503       Path base = getBaseTestDirOnTestFS();
504       String randomStr = UUID.randomUUID().toString();
505       newDataTestDir = new Path(base, randomStr);
506       if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
507     }
508     return newDataTestDir;
509   }
510 
511   /**
512    * Cleans the test data directory on the test filesystem.
513    * @return True if we removed the test dirs
514    * @throws IOException
515    */
516   public boolean cleanupDataTestDirOnTestFS() throws IOException {
517     boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
518     if (ret)
519       dataTestDirOnTestFS = null;
520     return ret;
521   }
522 
523   /**
524    * Cleans a subdirectory under the test data directory on the test filesystem.
525    * @return True if we removed child
526    * @throws IOException
527    */
528   public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
529     Path cpath = getDataTestDirOnTestFS(subdirName);
530     return getTestFileSystem().delete(cpath, true);
531   }
532 
533   /**
534    * Start a minidfscluster.
535    * @param servers How many DNs to start.
536    * @throws Exception
537    * @see {@link #shutdownMiniDFSCluster()}
538    * @return The mini dfs cluster created.
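   * <p>For example (a sketch), for HDFS-only testing:
   * <pre>
   *   MiniDFSCluster dfs = TEST_UTIL.startMiniDFSCluster(3);
   *   FileSystem fs = dfs.getFileSystem();
   *   // ... exercise HDFS directly ...
   *   TEST_UTIL.shutdownMiniDFSCluster();
   * </pre>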
539    */
540   public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
541     return startMiniDFSCluster(servers, null);
542   }
543 
544   /**
545    * Start a minidfscluster.
546    * This is useful if you want to run datanodes on distinct hosts for things
547    * like HDFS block location verification.
548    * If you start MiniDFSCluster without host names, all instances of the
549    * datanodes will have the same host name.
550    * @param hosts hostnames for the DNs to run on.
551    * @throws Exception
552    * @see {@link #shutdownMiniDFSCluster()}
553    * @return The mini dfs cluster created.
554    */
555   public MiniDFSCluster startMiniDFSCluster(final String hosts[])
556   throws Exception {
557     if ( hosts != null && hosts.length != 0) {
558       return startMiniDFSCluster(hosts.length, hosts);
559     } else {
560       return startMiniDFSCluster(1, null);
561     }
562   }
563 
564   /**
565    * Start a minidfscluster.
566    * Can only create one.
567    * @param servers How many DNs to start.
568    * @param hosts hostnames for the DNs to run on.
569    * @throws Exception
570    * @see {@link #shutdownMiniDFSCluster()}
571    * @return The mini dfs cluster created.
572    */
573   public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
574   throws Exception {
575     createDirsAndSetProperties();
576     EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
577 
578     // Error level to skip some warnings specific to the minicluster. See HBASE-4709
579     org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class).
580         setLevel(org.apache.log4j.Level.ERROR);
581     org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class).
582         setLevel(org.apache.log4j.Level.ERROR);
583 
584 
585     this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
586       true, null, null, hosts, null);
587 
588     // Set this just-started cluster as our filesystem.
589     setFs();
590 
591     // Wait for the cluster to be totally up
592     this.dfsCluster.waitClusterUp();
593 
594     //reset the test directory for test file system
595     dataTestDirOnTestFS = null;
596 
597     return this.dfsCluster;
598   }
599 
600   private void setFs() throws IOException {
601     if(this.dfsCluster == null){
602       LOG.info("Skipping setting fs because dfsCluster is null");
603       return;
604     }
605     FileSystem fs = this.dfsCluster.getFileSystem();
606     FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
607     if (this.conf.getBoolean(USE_LOCAL_FILESYSTEM, false)) {
608       FSUtils.setFsDefault(this.conf, new Path("file:///"));
609     }
610   }
611 
612   public MiniDFSCluster startMiniDFSCluster(int servers, final  String racks[], String hosts[])
613       throws Exception {
614     createDirsAndSetProperties();
615     this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
616         true, null, racks, hosts, null);
617 
618     // Set this just-started cluster as our filesystem.
619     FileSystem fs = this.dfsCluster.getFileSystem();
620     FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
621 
622     // Wait for the cluster to be totally up
623     this.dfsCluster.waitClusterUp();
624 
625     //reset the test directory for test file system
626     dataTestDirOnTestFS = null;
627 
628     return this.dfsCluster;
629   }
630 
631   public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
632     createDirsAndSetProperties();
633     dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
634         null, null, null);
635     return dfsCluster;
636   }
637 
638   /** This is used before starting HDFS and map-reduce mini-clusters */
639   private void createDirsAndSetProperties() throws IOException {
640     setupClusterTestDir();
641     System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
642     createDirAndSetProperty("cache_data", "test.cache.data");
643     createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
644     hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
645     createDirAndSetProperty("mapred_local", "mapreduce.cluster.local.dir");
646     createDirAndSetProperty("mapred_temp", "mapreduce.cluster.temp.dir");
647     enableShortCircuit();
648 
649     Path root = getDataTestDirOnTestFS("hadoop");
650     conf.set(MapreduceTestingShim.getMROutputDirProp(),
651       new Path(root, "mapred-output-dir").toString());
652     conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
653     conf.set("mapreduce.jobtracker.staging.root.dir",
654       new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
655     conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
656   }
657 
658 
659   /**
660    *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
661    *  This allows specifying this parameter on the command line.
662    *  If not set, the default is false.
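   *  <p>For example, from the command line (a sketch):
   *  <pre>mvn test -Dhbase.tests.use.shortcircuit.reads=true</pre>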
663    */
664   public boolean isReadShortCircuitOn(){
665     final String propName = "hbase.tests.use.shortcircuit.reads";
666     String readOnProp = System.getProperty(propName);
667     if (readOnProp != null){
668       return  Boolean.parseBoolean(readOnProp);
669     } else {
670       return conf.getBoolean(propName, false);
671     }
672   }
673 
674   /** Enable the short circuit read, unless configured differently.
675    * Set both HBase and HDFS settings, including skipping the hdfs checksum checks.
676    */
677   private void enableShortCircuit() {
678     if (isReadShortCircuitOn()) {
679       String curUser = System.getProperty("user.name");
680       LOG.info("read short circuit is ON for user " + curUser);
681       // read short circuit, for hdfs
682       conf.set("dfs.block.local-path-access.user", curUser);
683       // read short circuit, for hbase
684       conf.setBoolean("dfs.client.read.shortcircuit", true);
685       // Skip checking checksum, for the hdfs client and the datanode
686       conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
687     } else {
688       LOG.info("read short circuit is OFF");
689     }
690   }
691 
692   private String createDirAndSetProperty(final String relPath, String property) {
693     String path = getDataTestDir(relPath).toString();
694     System.setProperty(property, path);
695     conf.set(property, path);
696     new File(path).mkdirs();
697     LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
698     return path;
699   }
700 
701   /**
702    * Shuts down instance created by call to {@link #startMiniDFSCluster(int)}
703    * or does nothing.
704    * @throws IOException
705    */
706   public void shutdownMiniDFSCluster() throws IOException {
707     if (this.dfsCluster != null) {
708       // The below throws an exception per dn, AsynchronousCloseException.
709       this.dfsCluster.shutdown();
710       dfsCluster = null;
711       dataTestDirOnTestFS = null;
712       FSUtils.setFsDefault(this.conf, new Path("file:///"));
713     }
714   }
715 
716   /**
717    * Call this if you only want a zk cluster.
718    * @see #startMiniCluster() if you want zk + dfs + hbase mini cluster.
719    * @throws Exception
720    * @see #shutdownMiniZKCluster()
721    * @return zk cluster started.
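   * <p>For example (a sketch):
   * <pre>
   *   MiniZooKeeperCluster zk = TEST_UTIL.startMiniZKCluster();
   *   // ... ZooKeeper-only testing ...
   *   TEST_UTIL.shutdownMiniZKCluster();
   * </pre>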
722    */
723   public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
724     return startMiniZKCluster(1);
725   }
726 
727   /**
728    * Call this if you only want a zk cluster.
729    * @param zooKeeperServerNum
730    * @see #startMiniCluster() if you want zk + dfs + hbase mini cluster.
731    * @throws Exception
732    * @see #shutdownMiniZKCluster()
733    * @return zk cluster started.
734    */
735   public MiniZooKeeperCluster startMiniZKCluster(
736       final int zooKeeperServerNum,
737       final int ... clientPortList)
738       throws Exception {
739     setupClusterTestDir();
740     return startMiniZKCluster(clusterTestDir, zooKeeperServerNum, clientPortList);
741   }
742 
743   private MiniZooKeeperCluster startMiniZKCluster(final File dir)
744     throws Exception {
745     return startMiniZKCluster(dir, 1, null);
746   }
747 
748   /**
749    * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set,
750    * the specified port is used as the default port for ZooKeeper.
751    */
752   private MiniZooKeeperCluster startMiniZKCluster(final File dir,
753       final int zooKeeperServerNum,
754       final int [] clientPortList)
755   throws Exception {
756     if (this.zkCluster != null) {
757       throw new IOException("Cluster already running at " + dir);
758     }
759     this.passedZkCluster = false;
760     this.zkCluster = new MiniZooKeeperCluster(this.getConfiguration());
761     final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0);
762     if (defPort > 0){
763       // If there is a port in the config file, we use it.
764       this.zkCluster.setDefaultClientPort(defPort);
765     }
766 
767     if (clientPortList != null) {
768       // Ignore extra client ports
769       int clientPortListSize = (clientPortList.length <= zooKeeperServerNum) ?
770           clientPortList.length : zooKeeperServerNum;
771       for (int i=0; i < clientPortListSize; i++) {
772         this.zkCluster.addClientPort(clientPortList[i]);
773       }
774     }
775     int clientPort = this.zkCluster.startup(dir, zooKeeperServerNum);
776     this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,
777       Integer.toString(clientPort));
778     return this.zkCluster;
779   }
780 
781   /**
782    * Shuts down zk cluster created by call to {@link #startMiniZKCluster(File)}
783    * or does nothing.
784    * @throws IOException
785    * @see #startMiniZKCluster()
786    */
787   public void shutdownMiniZKCluster() throws IOException {
788     if (this.zkCluster != null) {
789       this.zkCluster.shutdown();
790       this.zkCluster = null;
791     }
792   }
793 
794   /**
795    * Start up a minicluster of hbase, dfs, and zookeeper.
796    * @throws Exception
797    * @return Mini hbase cluster instance created.
798    * @see {@link #shutdownMiniCluster()}
799    */
800   public MiniHBaseCluster startMiniCluster() throws Exception {
801     return startMiniCluster(1, 1);
802   }
803 
804   /**
805    * Start up a minicluster of hbase, dfs, and zookeeper.
806    * Set the <code>create</code> flag to control whether the root or data directory path is
807    * created anew (it will be overwritten if the dir already exists).
808    * @throws Exception
809    * @return Mini hbase cluster instance created.
810    * @see {@link #shutdownMiniCluster()}
811    */
812   public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create)
813   throws Exception {
814     return startMiniCluster(1, numSlaves, create);
815   }
816 
817   /**
818    * Start up a minicluster of hbase, optionally dfs, and zookeeper.
819    * Modifies Configuration.  Homes the cluster data directory under a random
820    * subdirectory in a directory under System property test.build.data.
821    * Directory is cleaned up on exit.
822    * @param numSlaves Number of slaves to start up.  We'll start this many
823    * datanodes and regionservers.  If numSlaves is > 1, then make sure
824    * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver); otherwise
825    * you will get bind errors.
826    * @throws Exception
827    * @see {@link #shutdownMiniCluster()}
828    * @return Mini hbase cluster instance created.
829    */
830   public MiniHBaseCluster startMiniCluster(final int numSlaves)
831   throws Exception {
832     return startMiniCluster(1, numSlaves, false);
833   }
834 
835   /**
836    * Start the minicluster. The flag <code>create</code> decides whether a new root or data
837    * dir path is created even if such a path has been created earlier.
838    * @throws Exception
839    * @see {@link #shutdownMiniCluster()}
840    * @return Mini hbase cluster instance created.
841    */
842   public MiniHBaseCluster startMiniCluster(final int numMasters,
843       final int numSlaves, boolean create)
844     throws Exception {
845       return startMiniCluster(numMasters, numSlaves, null, create);
846   }
847 
848   /**
849    * Start the minicluster.
850    * @throws Exception
851    * @see {@link #shutdownMiniCluster()}
852    * @return Mini hbase cluster instance created.
853    */
854   public MiniHBaseCluster startMiniCluster(final int numMasters,
855     final int numSlaves)
856   throws Exception {
857     return startMiniCluster(numMasters, numSlaves, null, false);
858   }
859 
860   public MiniHBaseCluster startMiniCluster(final int numMasters,
861       final int numSlaves, final String[] dataNodeHosts, boolean create)
862       throws Exception {
863     return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
864         null, null, create);
865   }
866 
867   /**
868    * Start up a minicluster of hbase, optionally dfs, and zookeeper.
869    * Modifies Configuration.  Homes the cluster data directory under a random
870    * subdirectory in a directory under System property test.build.data.
871    * Directory is cleaned up on exit.
872    * @param numMasters Number of masters to start up.  We'll start this many
873    * hbase masters.  If numMasters > 1, you can find the active/primary master
874    * with {@link MiniHBaseCluster#getMaster()}.
875    * @param numSlaves Number of slaves to start up.  We'll start this many
876    * regionservers. If dataNodeHosts == null, this also indicates the number of
877    * datanodes to start. If dataNodeHosts != null, the number of datanodes is
878    * based on dataNodeHosts.length.
879    * If numSlaves is > 1, then make sure
880    * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver); otherwise
881    * you will get bind errors.
882    * @param dataNodeHosts hostnames for the DNs to run on.
883    * This is useful if you want to run datanodes on distinct hosts for things
884    * like HDFS block location verification.
885    * If you start MiniDFSCluster without host names,
886    * all instances of the datanodes will have the same host name.
887    * @throws Exception
888    * @see {@link #shutdownMiniCluster()}
889    * @return Mini hbase cluster instance created.
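   * <p>For example (a sketch; the host names are illustrative):
   * <pre>
   *   TEST_UTIL.startMiniCluster(1, 3, new String[] { "host1", "host2", "host3" });
   * </pre>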
890    */
891   public MiniHBaseCluster startMiniCluster(final int numMasters,
892       final int numSlaves, final String[] dataNodeHosts) throws Exception {
893     return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
894         null, null);
895   }
896 
897   /**
898    * Same as {@link #startMiniCluster(int, int)}, but with custom number of datanodes.
899    * @param numDataNodes Number of data nodes.
900    */
901   public MiniHBaseCluster startMiniCluster(final int numMasters,
902       final int numSlaves, final int numDataNodes) throws Exception {
903     return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
904   }
905 
906   /**
907    * Start up a minicluster of hbase, optionally dfs, and zookeeper.
908    * Modifies Configuration.  Homes the cluster data directory under a random
909    * subdirectory in a directory under System property test.build.data.
910    * Directory is cleaned up on exit.
911    * @param numMasters Number of masters to start up.  We'll start this many
912    * hbase masters.  If numMasters > 1, you can find the active/primary master
913    * with {@link MiniHBaseCluster#getMaster()}.
914    * @param numSlaves Number of slaves to start up.  We'll start this many
915    * regionservers. If dataNodeHosts == null, this also indicates the number of
916    * datanodes to start. If dataNodeHosts != null, the number of datanodes is
917    * based on dataNodeHosts.length.
918    * If numSlaves is > 1, then make sure
919    * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver); otherwise
920    * you will get bind errors.
921    * @param dataNodeHosts hostnames for the DNs to run on.
922    * This is useful if you want to run datanodes on distinct hosts for things
923    * like HDFS block location verification.
924    * If you start MiniDFSCluster without host names,
925    * all instances of the datanodes will have the same host name.
926    * @param masterClass The class to use as HMaster, or null for default
927    * @param regionserverClass The class to use as HRegionServer, or null for
928    * default
929    * @throws Exception
930    * @see {@link #shutdownMiniCluster()}
931    * @return Mini hbase cluster instance created.
932    */
933   public MiniHBaseCluster startMiniCluster(final int numMasters,
934       final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
935       Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
936           throws Exception {
937     return startMiniCluster(
938         numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
939   }
940 
941   public MiniHBaseCluster startMiniCluster(final int numMasters,
942       final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
943       Class<? extends HMaster> masterClass,
944       Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
945     throws Exception {
946     return startMiniCluster(numMasters, numSlaves, numDataNodes, dataNodeHosts,
947         masterClass, regionserverClass, false);
948   }
949 
950   /**
951    * Same as {@link #startMiniCluster(int, int, String[], Class, Class)}, but with custom
952    * number of datanodes.
953    * @param numDataNodes Number of data nodes.
954    * @param create Set this flag to create a new
955    * root or data directory path (existing directories will be overwritten).
956    */
957   public MiniHBaseCluster startMiniCluster(final int numMasters,
958     final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
959     Class<? extends HMaster> masterClass,
960     Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
961     boolean create)
962   throws Exception {
963     if (dataNodeHosts != null && dataNodeHosts.length != 0) {
964       numDataNodes = dataNodeHosts.length;
965     }
966 
967     LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
968         numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");
969 
970     // If we already put up a cluster, fail.
971     if (miniClusterRunning) {
972       throw new IllegalStateException("A mini-cluster is already running");
973     }
974     miniClusterRunning = true;
975 
976     setupClusterTestDir();
977     System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
978 
979     // Bring up mini dfs cluster. This spews a bunch of warnings about missing
980     // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
981     if(this.dfsCluster == null) {
982       dfsCluster = startMiniDFSCluster(numDataNodes, dataNodeHosts);
983     }
984 
985     // Start up a zk cluster.
986     if (this.zkCluster == null) {
987       startMiniZKCluster(clusterTestDir);
988     }
989 
990     // Start the MiniHBaseCluster
991     return startMiniHBaseCluster(numMasters, numSlaves, masterClass,
992       regionserverClass, create);
993   }
994 
995   public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
996       throws IOException, InterruptedException{
997     return startMiniHBaseCluster(numMasters, numSlaves, null, null, false);
998   }
999 
1000   /**
1001    * Starts up the mini hbase cluster.  Usually used after a call to
1002    * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
1003    * Most of the time you won't want this; you'll want {@link #startMiniCluster()} instead.
1004    * @param numMasters
1005    * @param numSlaves
1006    * @param create Whether to create a
1007    * root or data directory path or not; will overwrite if exists already.
1008    * @return Reference to the hbase mini hbase cluster.
1009    * @throws IOException
1010    * @throws InterruptedException
1011    * @see {@link #startMiniCluster()}
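   * <p>Stepped startup, for example (a sketch):
   * <pre>
   *   TEST_UTIL.startMiniDFSCluster(1);
   *   TEST_UTIL.startMiniZKCluster();
   *   TEST_UTIL.startMiniHBaseCluster(1, 1);
   * </pre>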
1012    */
1013   public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
1014         final int numSlaves, Class<? extends HMaster> masterClass,
1015         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
1016         boolean create)
1017   throws IOException, InterruptedException {
1018     // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
1019     createRootDir(create);
1020 
1021     // These settings will make the master wait until this exact number of
1022     // region servers are connected.
1023     if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1024       conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, numSlaves);
1025     }
1026     if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1027       conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, numSlaves);
1028     }
1029 
1030     Configuration c = new Configuration(this.conf);
1031     this.hbaseCluster =
1032         new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
1033     // Don't leave here till we've done a successful scan of the hbase:meta
1034     Table t = new HTable(c, TableName.META_TABLE_NAME);
1035     ResultScanner s = t.getScanner(new Scan());
1036     while (s.next() != null) {
1037       continue;
1038     }
1039     s.close();
1040     t.close();
1041 
1042     getHBaseAdmin(); // create the hbaseAdmin immediately
1043     LOG.info("Minicluster is up");
1044 
1045     // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1046     // for tests that do not read hbase-defaults.xml
1047     setHBaseFsTmpDir();
1048 
1049     return (MiniHBaseCluster)this.hbaseCluster;
1050   }
1051 
1052   /**
1053    * Starts the hbase cluster up again after shutting it down previously in a
1054    * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
1055    * @param servers number of region servers
1056    * @throws IOException
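   * <p>For example (a sketch):
   * <pre>
   *   TEST_UTIL.shutdownMiniHBaseCluster();  // dfs and zk stay up
   *   TEST_UTIL.restartHBaseCluster(2);      // restart hbase with two region servers
   * </pre>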
1057    */
1058   public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1059     this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
1060     // Don't leave here till we've done a successful scan of the hbase:meta
1061     Table t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
1062     ResultScanner s = t.getScanner(new Scan());
1063     while (s.next() != null) {
1064       // do nothing
1065     }
1066     LOG.info("HBase has been restarted");
1067     s.close();
1068     t.close();
1069   }
1070 
1071   /**
1072    * @return Current mini hbase cluster. Only has something in it after a call
1073    * to {@link #startMiniCluster()}.
1074    * @see #startMiniCluster()
1075    */
1076   public MiniHBaseCluster getMiniHBaseCluster() {
1077     if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1078       return (MiniHBaseCluster)this.hbaseCluster;
1079     }
1080     throw new RuntimeException(hbaseCluster + " not an instance of " +
1081                                MiniHBaseCluster.class.getName());
1082   }
1083 
1084   /**
1085    * Stops mini hbase, zk, and hdfs clusters.
1086    * @throws IOException
1087    * @see {@link #startMiniCluster(int)}
1088    */
1089   public void shutdownMiniCluster() throws Exception {
1090     LOG.info("Shutting down minicluster");
1091     if (this.connection != null && !this.connection.isClosed()) {
1092       this.connection.close();
1093       this.connection = null;
1094     }
1095     shutdownMiniHBaseCluster();
1096     if (!this.passedZkCluster){
1097       shutdownMiniZKCluster();
1098     }
1099     shutdownMiniDFSCluster();
1100 
1101     cleanupTestDir();
1102     miniClusterRunning = false;
1103     LOG.info("Minicluster is down");
1104   }
1105 
1106   /**
1107    * @return True if we removed the test dirs
1108    * @throws IOException
1109    */
1110   @Override
1111   public boolean cleanupTestDir() throws IOException {
1112     boolean ret = super.cleanupTestDir();
1113     if (deleteDir(this.clusterTestDir)) {
1114       this.clusterTestDir = null;
1115       return ret;
1116     }
1117     return false;
1118   }
1119 
1120   /**
1121    * Shutdown HBase mini cluster.  Does not shutdown zk or dfs if running.
1122    * @throws IOException
1123    */
1124   public void shutdownMiniHBaseCluster() throws IOException {
1125     if (hbaseAdmin != null) {
1126       hbaseAdmin.close0();
1127       hbaseAdmin = null;
1128     }
1129 
1130     // unset the configuration for MIN and MAX RS to start
1131     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1132     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1133     if (this.hbaseCluster != null) {
1134       this.hbaseCluster.shutdown();
1135       // Wait till hbase is down before going on to shutdown zk.
1136       this.hbaseCluster.waitUntilShutDown();
1137       this.hbaseCluster = null;
1138     }
1139 
1140     if (zooKeeperWatcher != null) {
1141       zooKeeperWatcher.close();
1142       zooKeeperWatcher = null;
1143     }
1144   }
1145 
1146   /**
1147    * Returns the path to the default root dir the minicluster uses. If <code>create</code>
1148    * is true, a new root directory path is fetched irrespective of whether it has been fetched
1149    * before or not. If false, the previous path is used.
1150    * Note: this does not cause the root dir to be created.
1151    * @return Fully qualified path for the default hbase root dir
1152    * @throws IOException
1153    */
1154   public Path getDefaultRootDirPath(boolean create) throws IOException {
1155     if (!create) {
1156       return getDataTestDirOnTestFS();
1157     } else {
1158       return getNewDataTestDirOnTestFS();
1159     }
1160   }
1161 
1162   /**
1163    * Same as {@link HBaseTestingUtility#getDefaultRootDirPath(boolean)}
1164    * except that the <code>create</code> flag is false.
1165    * Note: this does not cause the root dir to be created.
1166    * @return Fully qualified path for the default hbase root dir
1167    * @throws IOException
1168    */
1169   public Path getDefaultRootDirPath() throws IOException {
1170     return getDefaultRootDirPath(false);
1171   }
1172 
1173   /**
1174    * Creates an hbase rootdir in user home directory.  Also creates hbase
1175    * version file.  Normally you won't make use of this method.  Root hbasedir
1176    * is created for you as part of mini cluster startup.  You'd only use this
1177    * method if you were doing manual operation.
1178    * @param create This flag decides whether to get a new
1179    * root or data directory path or not, if it has been fetched already.
1180    * Note: the directory will be created irrespective of whether the path has been fetched or not.
1181    * If the directory already exists, it will be overwritten.
1182    * @return Fully qualified path to hbase root dir
1183    * @throws IOException
1184    */
1185   public Path createRootDir(boolean create) throws IOException {
1186     FileSystem fs = FileSystem.get(this.conf);
1187     Path hbaseRootdir = getDefaultRootDirPath(create);
1188     FSUtils.setRootDir(this.conf, hbaseRootdir);
1189     fs.mkdirs(hbaseRootdir);
1190     FSUtils.setVersion(fs, hbaseRootdir);
1191     return hbaseRootdir;
1192   }
1193 
1194   /**
1195    * Same as {@link HBaseTestingUtility#createRootDir(boolean)}
1196    * except that the <code>create</code> flag is false.
1197    * @return Fully qualified path to hbase root dir
1198    * @throws IOException
1199    */
1200   public Path createRootDir() throws IOException {
1201     return createRootDir(false);
1202   }
1203 
1204 
1205   private void setHBaseFsTmpDir() throws IOException {
1206     String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1207     if (hbaseFsTmpDirInString == null) {
1208       this.conf.set("hbase.fs.tmp.dir",  getDataTestDirOnTestFS("hbase-staging").toString());
1209       LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1210     } else {
1211       LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1212     }
1213   }
1214 
1215   /**
1216    * Flushes all caches in the mini hbase cluster
1217    * @throws IOException
1218    */
1219   public void flush() throws IOException {
1220     getMiniHBaseCluster().flushcache();
1221   }
1222 
1223   /**
1224    * Flushes all caches of the given table in the mini hbase cluster
1225    * @throws IOException
1226    */
1227   public void flush(TableName tableName) throws IOException {
1228     getMiniHBaseCluster().flushcache(tableName);
1229   }
1230 
1231   /**
1232    * Compact all regions in the mini hbase cluster
1233    * @throws IOException
1234    */
1235   public void compact(boolean major) throws IOException {
1236     getMiniHBaseCluster().compact(major);
1237   }
1238 
1239   /**
1240    * Compact all of a table's regions in the mini hbase cluster
1241    * @throws IOException
1242    */
1243   public void compact(TableName tableName, boolean major) throws IOException {
1244     getMiniHBaseCluster().compact(tableName, major);
1245   }
1246 
1247   /**
1248    * Create a table.
1249    * @param tableName
1250    * @param family
1251    * @return A Table instance for the created table.
1252    * @throws IOException
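   * <p>For example (a sketch; the names are illustrative):
   * <pre>
   *   Table t = TEST_UTIL.createTable(TableName.valueOf("myTable"), "myFamily");
   * </pre>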
1253    */
1254   public Table createTable(TableName tableName, String family)
1255   throws IOException{
1256     return createTable(tableName, new String[]{family});
1257   }
1258 
1259   /**
1260    * Create a table.
1261    * @param tableName
1262    * @param family
1263    * @return An HTable instance for the created table.
1264    * @throws IOException
1265    */
1266   public HTable createTable(byte[] tableName, byte[] family)
1267   throws IOException{
1268     return createTable(TableName.valueOf(tableName), new byte[][]{family});
1269   }
1270 
1271   /**
1272    * Create a table.
1273    * @param tableName
1274    * @param families
1275    * @return A Table instance for the created table.
1276    * @throws IOException
1277    */
1278   public Table createTable(TableName tableName, String[] families)
1279   throws IOException {
1280     List<byte[]> fams = new ArrayList<byte[]>(families.length);
1281     for (String family : families) {
1282       fams.add(Bytes.toBytes(family));
1283     }
1284     return createTable(tableName, fams.toArray(new byte[0][]));
1285   }
1286 
1287   /**
1288    * Create a table.
1289    * @param tableName
1290    * @param family
1291    * @return An HTable instance for the created table.
1292    * @throws IOException
1293    */
1294   public HTable createTable(TableName tableName, byte[] family)
1295   throws IOException{
1296     return createTable(tableName, new byte[][]{family});
1297   }
1298 
1299   /**
1300    * Create a table with multiple regions.
1301    * @param tableName
1302    * @param family
1303    * @param numRegions
1304    * @return An HTable instance for the created table.
1305    * @throws IOException
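   * <p>For example (a sketch):
   * <pre>
   *   HTable t = TEST_UTIL.createMultiRegionTable(TableName.valueOf("tn"),
   *       Bytes.toBytes("f"), 10);
   * </pre>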
1306    */
1307   public HTable createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1308       throws IOException {
1309     if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1310     byte[] startKey = Bytes.toBytes("aaaaa");
1311     byte[] endKey = Bytes.toBytes("zzzzz");
1312     byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1313 
1314     return createTable(tableName, new byte[][] { family }, splitKeys);
1315   }
1316 
1317 
1318   /**
1319    * Create a table.
1320    * @param tableName
1321    * @param families
1322    * @return An HTable instance for the created table.
1323    * @throws IOException
1324    */
1325   public HTable createTable(byte[] tableName, byte[][] families)
1326   throws IOException {
1327     return createTable(tableName, families,
1328         new Configuration(getConfiguration()));
1329   }
1330 
1331   /**
1332    * Create a table.
1333    * @param tableName
1334    * @param families
1335    * @return An HTable instance for the created table.
1336    * @throws IOException
1337    */
1338   public HTable createTable(TableName tableName, byte[][] families)
1339   throws IOException {
1340     return createTable(tableName, families, (byte[][]) null);
1341   }
1342 
1343   /**
1344    * Create a table with multiple regions.
1345    * @param tableName
1346    * @param families
1347    * @return An HTable instance for the created table.
1348    * @throws IOException
1349    */
1350   public HTable createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1351     return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1352   }
1353 
1354   /**
1355    * Create a table.
1356    * @param tableName
1357    * @param families
1358    * @param splitKeys
1359    * @return An HTable instance for the created table.
1360    * @throws IOException
1361    */
1362   public HTable createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1363       throws IOException {
1364     return createTable(tableName, families, splitKeys, new Configuration(getConfiguration()));
1365   }
1366 
1367   public HTable createTable(byte[] tableName, byte[][] families,
1368       int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1369     return createTable(TableName.valueOf(tableName), families, numVersions,
1370         startKey, endKey, numRegions);
1371   }
1372 
1373   public HTable createTable(String tableName, byte[][] families,
1374       int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1375     return createTable(TableName.valueOf(tableName), families, numVersions,
1376         startKey, endKey, numRegions);
1377   }
1378 
1379   public HTable createTable(TableName tableName, byte[][] families,
1380       int numVersions, byte[] startKey, byte[] endKey, int numRegions)
1381   throws IOException{
1382     HTableDescriptor desc = new HTableDescriptor(tableName);
1383     for (byte[] family : families) {
1384       HColumnDescriptor hcd = new HColumnDescriptor(family)
1385           .setMaxVersions(numVersions);
1386       desc.addFamily(hcd);
1387     }
1388     getHBaseAdmin().createTable(desc, startKey, endKey, numRegions);
1389     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1390     waitUntilAllRegionsAssigned(tableName);
1391     return new HTable(getConfiguration(), tableName);
1392   }
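  // Usage sketch (illustrative names; assumes a running mini cluster). The
  // admin derives numRegions - 1 split keys, startKey and endKey included,
  // so at least 3 regions must be requested:
  //
  //   HTable t = createTable(TableName.valueOf("exampleSpan"),
  //       new byte[][] { Bytes.toBytes("f") }, 3 /* versions */,
  //       Bytes.toBytes("0000"), Bytes.toBytes("9999"), 16 /* regions */);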
1393 
1394   /**
1395    * Create a table.
1396    * @param htd
1397    * @param families
1398    * @param c Configuration to use
1399    * @return An HTable instance for the created table.
1400    * @throws IOException
1401    */
1402   public HTable createTable(HTableDescriptor htd, byte[][] families, Configuration c)
1403   throws IOException {
1404     return createTable(htd, families, (byte[][]) null, c);
1405   }
1406 
1407   /**
1408    * Create a table.
1409    * @param htd
1410    * @param families
1411    * @param splitKeys
1412    * @param c Configuration to use
1413    * @return An HTable instance for the created table.
1414    * @throws IOException
1415    */
1416   public HTable createTable(HTableDescriptor htd, byte[][] families, byte[][] splitKeys,
1417       Configuration c) throws IOException {
1418     for (byte[] family : families) {
1419       HColumnDescriptor hcd = new HColumnDescriptor(family);
1420       // Disable blooms (they are on by default as of 0.95) but we disable them here because
1421       // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1422       // on is interfering.
1423       hcd.setBloomFilterType(BloomType.NONE);
1424       htd.addFamily(hcd);
1425     }
1426     getHBaseAdmin().createTable(htd, splitKeys);
1427     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
1428     // assigned
1429     waitUntilAllRegionsAssigned(htd.getTableName());
1430     return (HTable) getConnection().getTable(htd.getTableName());
1431   }
1432 
1433   /**
1434    * Create a table.
1435    * @param htd
1436    * @param splitRows
1437    * @return An HTable instance for the created table.
1438    * @throws IOException
1439    */
1440   public HTable createTable(HTableDescriptor htd, byte[][] splitRows)
1441       throws IOException {
1442     getHBaseAdmin().createTable(htd, splitRows);
1443     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1444     waitUntilAllRegionsAssigned(htd.getTableName());
1445     return new HTable(getConfiguration(), htd.getTableName());
1446   }
1447 
1448   /**
1449    * Create a table.
1450    * @param tableName
1451    * @param families
1452    * @param c Configuration to use
1453    * @return An HTable instance for the created table.
1454    * @throws IOException
1455    */
1456   public HTable createTable(TableName tableName, byte[][] families,
1457       final Configuration c)
1458   throws IOException {
1459     return createTable(tableName, families, (byte[][]) null, c);
1460   }
1461 
1462   /**
1463    * Create a table.
1464    * @param tableName
1465    * @param families
1466    * @param splitKeys
1467    * @param c Configuration to use
1468    * @return An HTable instance for the created table.
1469    * @throws IOException
1470    */
1471   public HTable createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1472       final Configuration c) throws IOException {
1473     return createTable(new HTableDescriptor(tableName), families, splitKeys, c);
1474   }
1475 
1476   /**
1477    * Create a table.
1478    * @param tableName
1479    * @param families
1480    * @param c Configuration to use
1481    * @return An HTable instance for the created table.
1482    * @throws IOException
1483    */
1484   public HTable createTable(byte[] tableName, byte[][] families,
1485       final Configuration c)
1486   throws IOException {
1487     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1488     for(byte[] family : families) {
1489       HColumnDescriptor hcd = new HColumnDescriptor(family);
1490       // Disable blooms (they are on by default as of 0.95) but we disable them here because
1491       // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1492       // on is interfering.
1493       hcd.setBloomFilterType(BloomType.NONE);
1494       desc.addFamily(hcd);
1495     }
1496     getHBaseAdmin().createTable(desc);
1497     return new HTable(c, desc.getTableName());
1498   }
1499 
1500   /**
1501    * Create a table.
1502    * @param tableName
1503    * @param families
1504    * @param c Configuration to use
1505    * @param numVersions
1506    * @return An HTable instance for the created table.
1507    * @throws IOException
1508    */
1509   public HTable createTable(TableName tableName, byte[][] families,
1510       final Configuration c, int numVersions)
1511   throws IOException {
1512     HTableDescriptor desc = new HTableDescriptor(tableName);
1513     for(byte[] family : families) {
1514       HColumnDescriptor hcd = new HColumnDescriptor(family)
1515           .setMaxVersions(numVersions);
1516       desc.addFamily(hcd);
1517     }
1518     getHBaseAdmin().createTable(desc);
1519     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1520     waitUntilAllRegionsAssigned(tableName);
1521     return new HTable(c, tableName);
1522   }
1523 
1524   /**
1525    * Create a table.
1526    * @param tableName
1527    * @param families
1528    * @param c Configuration to use
1529    * @param numVersions
1530    * @return An HTable instance for the created table.
1531    * @throws IOException
1532    */
1533   public HTable createTable(byte[] tableName, byte[][] families,
1534       final Configuration c, int numVersions)
1535   throws IOException {
1536     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1537     for(byte[] family : families) {
1538       HColumnDescriptor hcd = new HColumnDescriptor(family)
1539           .setMaxVersions(numVersions);
1540       desc.addFamily(hcd);
1541     }
1542     getHBaseAdmin().createTable(desc);
1543     return new HTable(c, desc.getTableName());
1544   }
1545 
1546   /**
1547    * Create a table.
1548    * @param tableName
1549    * @param family
1550    * @param numVersions
1551    * @return An HTable instance for the created table.
1552    * @throws IOException
1553    */
1554   public HTable createTable(byte[] tableName, byte[] family, int numVersions)
1555   throws IOException {
1556     return createTable(tableName, new byte[][]{family}, numVersions);
1557   }
1558 
1559   /**
1560    * Create a table.
1561    * @param tableName
1562    * @param family
1563    * @param numVersions
1564    * @return An HTable instance for the created table.
1565    * @throws IOException
1566    */
1567   public HTable createTable(TableName tableName, byte[] family, int numVersions)
1568   throws IOException {
1569     return createTable(tableName, new byte[][]{family}, numVersions);
1570   }
1571 
1572   /**
1573    * Create a table.
1574    * @param tableName
1575    * @param families
1576    * @param numVersions
1577    * @return An HTable instance for the created table.
1578    * @throws IOException
1579    */
1580   public HTable createTable(byte[] tableName, byte[][] families,
1581       int numVersions)
1582   throws IOException {
1583     return createTable(TableName.valueOf(tableName), families, numVersions);
1584   }
1585 
1586   /**
1587    * Create a table.
1588    * @param tableName
1589    * @param families
1590    * @param numVersions
1591    * @return An HTable instance for the created table.
1592    * @throws IOException
1593    */
1594   public HTable createTable(TableName tableName, byte[][] families,
1595       int numVersions)
1596   throws IOException {
1597     return createTable(tableName, families, numVersions, (byte[][]) null);
1598   }
1599 
1600   /**
1601    * Create a table.
1602    * @param tableName
1603    * @param families
1604    * @param numVersions
1605    * @param splitKeys
1606    * @return An HTable instance for the created table.
1607    * @throws IOException
1608    */
1609   public HTable createTable(TableName tableName, byte[][] families, int numVersions,
1610       byte[][] splitKeys) throws IOException {
1611     HTableDescriptor desc = new HTableDescriptor(tableName);
1612     for (byte[] family : families) {
1613       HColumnDescriptor hcd = new HColumnDescriptor(family).setMaxVersions(numVersions);
1614       desc.addFamily(hcd);
1615     }
1616     getHBaseAdmin().createTable(desc, splitKeys);
1617     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1618     waitUntilAllRegionsAssigned(tableName);
1619     return new HTable(new Configuration(getConfiguration()), tableName);
1620   }
1621 
1622   /**
1623    * Create a table with multiple regions.
1624    * @param tableName
1625    * @param families
1626    * @param numVersions
1627    * @return An HTable instance for the created table.
1628    * @throws IOException
1629    */
1630   public HTable createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1631       throws IOException {
1632     return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1633   }
1634 
1635   /**
1636    * Create a table.
1637    * @param tableName
1638    * @param families
1639    * @param numVersions
1640    * @param blockSize
1641    * @return An HTable instance for the created table.
1642    * @throws IOException
1643    */
1644   public HTable createTable(byte[] tableName, byte[][] families,
1645     int numVersions, int blockSize) throws IOException {
1646     return createTable(TableName.valueOf(tableName),
1647         families, numVersions, blockSize);
1648   }
1649 
1650   /**
1651    * Create a table.
1652    * @param tableName
1653    * @param families
1654    * @param numVersions
1655    * @param blockSize
1656    * @return An HTable instance for the created table.
1657    * @throws IOException
1658    */
1659   public HTable createTable(TableName tableName, byte[][] families,
1660     int numVersions, int blockSize) throws IOException {
1661     HTableDescriptor desc = new HTableDescriptor(tableName);
1662     for (byte[] family : families) {
1663       HColumnDescriptor hcd = new HColumnDescriptor(family)
1664           .setMaxVersions(numVersions)
1665           .setBlocksize(blockSize);
1666       desc.addFamily(hcd);
1667     }
1668     getHBaseAdmin().createTable(desc);
1669     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1670     waitUntilAllRegionsAssigned(tableName);
1671     return new HTable(new Configuration(getConfiguration()), tableName);
1672   }
1673 
1674   /**
1675    * Create a table.
1676    * @param tableName
1677    * @param families
1678    * @param numVersions
1679    * @return An HTable instance for the created table.
1680    * @throws IOException
1681    */
1682   public HTable createTable(byte[] tableName, byte[][] families,
1683       int[] numVersions)
1684   throws IOException {
1685     return createTable(TableName.valueOf(tableName), families, numVersions);
1686   }
1687 
1688   /**
1689    * Create a table.
1690    * @param tableName
1691    * @param families
1692    * @param numVersions
1693    * @return An HTable instance for the created table.
1694    * @throws IOException
1695    */
1696   public HTable createTable(TableName tableName, byte[][] families,
1697       int[] numVersions)
1698   throws IOException {
1699     HTableDescriptor desc = new HTableDescriptor(tableName);
1700     int i = 0;
1701     for (byte[] family : families) {
1702       HColumnDescriptor hcd = new HColumnDescriptor(family)
1703           .setMaxVersions(numVersions[i]);
1704       desc.addFamily(hcd);
1705       i++;
1706     }
1707     getHBaseAdmin().createTable(desc);
1708     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1709     waitUntilAllRegionsAssigned(tableName);
1710     return new HTable(new Configuration(getConfiguration()), tableName);
1711   }
1712 
1713   /**
1714    * Create a table.
1715    * @param tableName
1716    * @param family
1717    * @param splitRows
1718    * @return An HTable instance for the created table.
1719    * @throws IOException
1720    */
1721   public HTable createTable(byte[] tableName, byte[] family, byte[][] splitRows)
1722     throws IOException{
1723     return createTable(TableName.valueOf(tableName), family, splitRows);
1724   }
1725 
1726   /**
1727    * Create a table.
1728    * @param tableName
1729    * @param family
1730    * @param splitRows
1731    * @return An HTable instance for the created table.
1732    * @throws IOException
1733    */
1734   public HTable createTable(TableName tableName, byte[] family, byte[][] splitRows)
1735       throws IOException {
1736     HTableDescriptor desc = new HTableDescriptor(tableName);
1737     HColumnDescriptor hcd = new HColumnDescriptor(family);
1738     desc.addFamily(hcd);
1739     getHBaseAdmin().createTable(desc, splitRows);
1740     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1741     waitUntilAllRegionsAssigned(tableName);
1742     return new HTable(getConfiguration(), tableName);
1743   }
1744 
1745   /**
1746    * Create a table with multiple regions.
1747    * @param tableName
1748    * @param family
1749    * @return An HTable instance for the created table.
1750    * @throws IOException
1751    */
1752   public HTable createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1753     return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1754   }
1755 
1756   /**
1757    * Create a table.
1758    * @param tableName
1759    * @param families
1760    * @param splitRows
1761    * @return An HTable instance for the created table.
1762    * @throws IOException
1763    */
1764   public HTable createTable(byte[] tableName, byte[][] families, byte[][] splitRows)
1765       throws IOException {
1766     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1767     for(byte[] family:families) {
1768       HColumnDescriptor hcd = new HColumnDescriptor(family);
1769       desc.addFamily(hcd);
1770     }
1771     getHBaseAdmin().createTable(desc, splitRows);
1772     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1773     waitUntilAllRegionsAssigned(desc.getTableName());
1774     return new HTable(getConfiguration(), desc.getTableName());
1775   }
1776 
1777   /**
1778    * Create an unmanaged WAL. Be sure to close it when you're through.
1779    */
1780   public static WAL createWal(final Configuration conf, final Path rootDir, final HRegionInfo hri)
1781       throws IOException {
1782     // The WAL subsystem will use the default rootDir rather than the passed-in rootDir
1783     // unless we pass it along via the conf.
1784     Configuration confForWAL = new Configuration(conf);
1785     confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
1786     return (new WALFactory(confForWAL,
1787         Collections.<WALActionsListener>singletonList(new MetricsWAL()),
1788         "hregion-" + RandomStringUtils.randomNumeric(8))).
1789         getWAL(hri.getEncodedNameAsBytes());
1790   }
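  // Usage sketch (illustrative names; the caller owns the returned WAL and
  // must close it):
  //
  //   HRegionInfo hri = new HRegionInfo(TableName.valueOf("exampleWal"));
  //   WAL wal = createWal(conf, getDataTestDir("exampleWal"), hri);
  //   try {
  //     // append edits or hand the WAL to a region
  //   } finally {
  //     wal.close();
  //   }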
1791 
1792   /**
1793    * Create a region with its own WAL. Be sure to call
1794    * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
1795    */
1796   public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
1797       final Configuration conf, final HTableDescriptor htd) throws IOException {
1798     return createRegionAndWAL(info, rootDir, conf, htd, true);
1799   }
1800 
1801   /**
1802    * Create a region with its own WAL. Be sure to call
1803    * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
1804    */
1805   public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
1806       final Configuration conf, final HTableDescriptor htd, boolean initialize)
1807       throws IOException {
1808     WAL wal = createWal(conf, rootDir, info);
1809     return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
1810   }
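  // Usage sketch: createRegionAndWAL and closeRegionAndWAL are meant to be
  // paired (illustrative names; "util" stands for an instance of this class):
  //
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("ex"));
  //   htd.addFamily(new HColumnDescriptor(Bytes.toBytes("f")));
  //   HRegionInfo hri = new HRegionInfo(htd.getTableName());
  //   HRegion region = createRegionAndWAL(hri, util.getDataTestDir("ex"),
  //       util.getConfiguration(), htd);
  //   try {
  //     // exercise the region
  //   } finally {
  //     closeRegionAndWAL(region);
  //   }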
1811 
1812   /**
1813    * Close both the region {@code r} and its underlying WAL. For use in tests.
1814    */
1815   public static void closeRegionAndWAL(final Region r) throws IOException {
1816     closeRegionAndWAL((HRegion)r);
1817   }
1818 
1819   /**
1820    * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
1821    */
1822   public static void closeRegionAndWAL(final HRegion r) throws IOException {
1823     if (r == null) return;
1824     r.close();
1825     if (r.getWAL() == null) return;
1826     r.getWAL().close();
1827   }
1828 
1829   /**
1830    * Modify a table synchronously. Waiting logic is similar to that of {@code admin.rb#alter_status}.
1831    */
1832   @SuppressWarnings("serial")
1833   public static void modifyTableSync(Admin admin, HTableDescriptor desc)
1834       throws IOException, InterruptedException {
1835     admin.modifyTable(desc.getTableName(), desc);
1836     Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
1837       setFirst(0);
1838       setSecond(0);
1839     }};
1840     int i = 0;
1841     do {
1842       status = admin.getAlterStatus(desc.getTableName());
1843       if (status.getSecond() != 0) {
1844         LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
1845           + " regions updated.");
1846         Thread.sleep(1000L);
1847       } else {
1848         LOG.debug("All regions updated.");
1849         break;
1850       }
1851     } while (status.getFirst() != 0 && i++ < 500);
1852     if (status.getFirst() != 0) {
1853       throw new IOException("Failed to update all regions even after 500 seconds.");
1854     }
1855   }
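  // Usage sketch (assumes an existing enabled table named by tn):
  //
  //   HTableDescriptor desc = admin.getTableDescriptor(tn);
  //   desc.setMaxFileSize(256L * 1024 * 1024);
  //   modifyTableSync(admin, desc);  // returns only once all regions report
  //                                  // the updated schema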
1856 
1857   /**
1858    * Set the number of Region replicas.
1859    */
1860   public static void setReplicas(Admin admin, TableName table, int replicaCount)
1861       throws IOException, InterruptedException {
1862     admin.disableTable(table);
1863     HTableDescriptor desc = admin.getTableDescriptor(table);
1864     desc.setRegionReplication(replicaCount);
1865     admin.modifyTable(desc.getTableName(), desc);
1866     admin.enableTable(table);
1867   }
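  // Usage sketch: setReplicas disables and re-enables the table, so follow it
  // with an assignment wait before reading from replicas (tn illustrative):
  //
  //   setReplicas(getHBaseAdmin(), tn, 3);
  //   waitUntilAllRegionsAssigned(tn);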
1868 
1869   /**
1870    * Drop an existing table
1871    * @param tableName existing table
1872    */
1873   public void deleteTable(String tableName) throws IOException {
1874     deleteTable(TableName.valueOf(tableName));
1875   }
1876 
1877   /**
1878    * Drop an existing table
1879    * @param tableName existing table
1880    */
1881   public void deleteTable(byte[] tableName) throws IOException {
1882     deleteTable(TableName.valueOf(tableName));
1883   }
1884 
1885   /**
1886    * Drop an existing table
1887    * @param tableName existing table
1888    */
1889   public void deleteTable(TableName tableName) throws IOException {
1890     try {
1891       getHBaseAdmin().disableTable(tableName);
1892     } catch (TableNotEnabledException e) {
1893       LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1894     }
1895     getHBaseAdmin().deleteTable(tableName);
1896   }
1897 
1898   /**
1899    * Drop an existing table
1900    * @param tableName existing table
1901    */
1902   public void deleteTableIfAny(TableName tableName) throws IOException {
1903     try {
1904       deleteTable(tableName);
1905     } catch (TableNotFoundException e) {
1906       // ignore
1907     }
1908   }
1909 
1910   // ==========================================================================
1911   // Canned table and table descriptor creation
1912   // TODO replace HBaseTestCase
1913 
1914   public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1915   public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1916   public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1917   public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1918   private static final int MAXVERSIONS = 3;
1919 
1920   public static final char FIRST_CHAR = 'a';
1921   public static final char LAST_CHAR = 'z';
1922   public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1923   public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1924 
1925   /**
1926    * Create a table descriptor for a table of name <code>name</code> with
1927    * {@link #COLUMNS} for families.
1928    * @param name Name to give table.
1929    * @param versions How many versions to allow per column.
1930    * @return Table descriptor.
1931    */
1932   public HTableDescriptor createTableDescriptor(final String name,
1933       final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1934     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
1935     for (byte[] cfName : new byte[][]{ fam1, fam2, fam3 }) {
1936       htd.addFamily(new HColumnDescriptor(cfName)
1937           .setMinVersions(minVersions)
1938           .setMaxVersions(versions)
1939           .setKeepDeletedCells(keepDeleted)
1940           .setBlockCacheEnabled(false)
1941           .setTimeToLive(ttl)
1942       );
1943     }
1944     return htd;
1945   }
1946 
1947   /**
1948    * Create a table descriptor for a table of name <code>name</code> with
1949    * {@link #COLUMNS} for families.
1950    * @param name Name to give table.
1951    * @return Table descriptor.
1952    */
1953   public HTableDescriptor createTableDescriptor(final String name) {
1954     return createTableDescriptor(name,  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1955         MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1956   }
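  // Usage sketch: descriptors built here carry the canned families
  // fam1/fam2/fam3 and are typically handed to createLocalHRegion below
  // (name illustrative; null start/end keys mean the full key range):
  //
  //   HTableDescriptor htd = createTableDescriptor("exampleDescriptor");
  //   HRegion r = createLocalHRegion(htd, null, null);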
1957 
1958   /**
1959    * Create an HRegion. Be sure to call {@link HBaseTestingUtility#closeRegion(Region)}
1960    * when you're finished with it.
1961    */
1962   public HRegion createHRegion(
1963       final HRegionInfo info,
1964       final Path rootDir,
1965       final Configuration conf,
1966       final HTableDescriptor htd) throws IOException {
1967     return HRegion.createHRegion(info, rootDir, conf, htd);
1968   }
1969 
1970   /**
1971    * Create an HRegion that writes to the local tmp dirs
1972    * @param desc
1973    * @param startKey
1974    * @param endKey
1975    * @return created HRegion
1976    * @throws IOException
1977    */
1978   public HRegion createLocalHRegion(HTableDescriptor desc, byte [] startKey,
1979       byte [] endKey)
1980   throws IOException {
1981     HRegionInfo hri = new HRegionInfo(desc.getTableName(), startKey, endKey);
1982     return createLocalHRegion(hri, desc);
1983   }
1984 
1985   /**
1986    * Create an HRegion that writes to the local tmp dirs
1987    * @param info
1988    * @param desc
1989    * @return created HRegion
1990    * @throws IOException
1991    */
1992   public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc) throws IOException {
1993     return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc);
1994   }
1995 
1996   /**
1997    * Create an HRegion that writes to the local tmp dirs with specified wal
1998    * @param info regioninfo
1999    * @param desc table descriptor
2000    * @param wal wal for this region.
2001    * @return created hregion
2002    * @throws IOException
2003    */
2004   public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc, WAL wal)
2005       throws IOException {
2006     return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
2007   }
2008 
2009   /**
2010    * @param tableName
2011    * @param startKey
2012    * @param stopKey
2013    * @param callingMethod
2014    * @param conf
2015    * @param isReadOnly
2016    * @param families
2017    * @throws IOException
2018    * @return A region on which you must call
2019    *         {@link HRegion#closeHRegion(HRegion)} when done.
2020    */
2021   public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
2022       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
2023       WAL wal, byte[]... families) throws IOException {
2024     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
2025     htd.setReadOnly(isReadOnly);
2026     for (byte[] family : families) {
2027       HColumnDescriptor hcd = new HColumnDescriptor(family);
2028       // Set default to be three versions.
2029       hcd.setMaxVersions(Integer.MAX_VALUE);
2030       htd.addFamily(hcd);
2031     }
2032     htd.setDurability(durability);
2033     HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
2034     return createLocalHRegion(info, htd, wal);
2035   }
2036   //
2037   // ==========================================================================
2038 
2039   /**
2040    * Provide an existing table name to truncate.
2041    * Scans the table and issues a delete for each row read.
2042    * @param tableName existing table
2043    * @return HTable to the (now empty) table
2044    * @throws IOException
2045    */
2046   public HTable deleteTableData(byte[] tableName) throws IOException {
2047     return deleteTableData(TableName.valueOf(tableName));
2048   }
2049 
2050   /**
2051    * Provide an existing table name to truncate.
2052    * Scans the table and issues a delete for each row read.
2053    * @param tableName existing table
2054    * @return HTable to the (now empty) table
2055    * @throws IOException
2056    */
2057   public HTable deleteTableData(TableName tableName) throws IOException {
2058     HTable table = new HTable(getConfiguration(), tableName);
2059     Scan scan = new Scan();
2060     ResultScanner resScan = table.getScanner(scan);
2061     for(Result res : resScan) {
2062       Delete del = new Delete(res.getRow());
2063       table.delete(del);
2064     }
2065     // Close the scanner used for the deletes so it is not leaked.
2066     resScan.close();
2067     return table;
2068   }
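  // Usage sketch contrasting the two ways to empty a table (illustrative
  // names): row-by-row deletes keep the table definition untouched but cost
  // one RPC per row, while the admin truncate below is faster for big tables.
  //
  //   deleteTableData(TableName.valueOf("smallTable"));
  //   truncateTable(TableName.valueOf("bigTable"), true);  // keeps splits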
2069 
2070   /**
2071    * Truncate a table using the admin command.
2072    * Effectively disables, deletes, and recreates the table.
2073    * @param tableName table which must exist.
2074    * @param preserveRegions keep the existing split points
2075    * @return HTable for the new table
2076    */
2077   public HTable truncateTable(final TableName tableName, final boolean preserveRegions)
2078       throws IOException {
2079     Admin admin = getHBaseAdmin();
2080     if (!admin.isTableDisabled(tableName)) {
2081       admin.disableTable(tableName);
2082     }
2083     admin.truncateTable(tableName, preserveRegions);
2084     return new HTable(getConfiguration(), tableName);
2085   }
2086 
2087   /**
2088    * Truncate a table using the admin command.
2089    * Effectively disables, deletes, and recreates the table.
2090    * For previous behavior of issuing row deletes, see
2091    * deleteTableData.
2092    * Expressly does not preserve regions of existing table.
2093    * @param tableName table which must exist.
2094    * @return HTable for the new table
2095    */
2096   public HTable truncateTable(final TableName tableName) throws IOException {
2097     return truncateTable(tableName, false);
2098   }
2099 
2100   /**
2101    * Truncate a table using the admin command.
2102    * Effectively disables, deletes, and recreates the table.
2103    * @param tableName table which must exist.
2104    * @param preserveRegions keep the existing split points
2105    * @return HTable for the new table
2106    */
2107   public HTable truncateTable(final byte[] tableName, final boolean preserveRegions)
2108       throws IOException {
2109     return truncateTable(TableName.valueOf(tableName), preserveRegions);
2110   }
2111 
2112   /**
2113    * Truncate a table using the admin command.
2114    * Effectively disables, deletes, and recreates the table.
2115    * For previous behavior of issuing row deletes, see
2116    * deleteTableData.
2117    * Expressly does not preserve regions of existing table.
2118    * @param tableName table which must exist.
2119    * @return HTable for the new table
2120    */
2121   public HTable truncateTable(final byte[] tableName) throws IOException {
2122     return truncateTable(tableName, false);
2123   }
2124 
2125   /**
2126    * Load table with rows from 'aaa' to 'zzz'.
2127    * @param t Table
2128    * @param f Family
2129    * @return Count of rows loaded.
2130    * @throws IOException
2131    */
2132   public int loadTable(final Table t, final byte[] f) throws IOException {
2133     return loadTable(t, new byte[][] {f});
2134   }
2135 
2136   /**
2137    * Load table with rows from 'aaa' to 'zzz'.
2138    * @param t Table
2139    * @param f Family
2140    * @return Count of rows loaded.
2141    * @throws IOException
2142    */
2143   public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2144     return loadTable(t, new byte[][] {f}, null, writeToWAL);
2145   }
2146 
2147   /**
2148    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2149    * @param t Table
2150    * @param f Array of Families to load
2151    * @return Count of rows loaded.
2152    * @throws IOException
2153    */
2154   public int loadTable(final Table t, final byte[][] f) throws IOException {
2155     return loadTable(t, f, null);
2156   }
2157 
2158   /**
2159    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2160    * @param t Table
2161    * @param f Array of Families to load
2162    * @param value the values of the cells. If null is passed, the row key is used as value
2163    * @return Count of rows loaded.
2164    * @throws IOException
2165    */
2166   public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2167     return loadTable(t, f, value, true);
2168   }
2169 
2170   /**
2171    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2172    * @param t Table
2173    * @param f Array of Families to load
2174    * @param value the values of the cells. If null is passed, the row key is used as value
2175    * @return Count of rows loaded.
2176    * @throws IOException
2177    */
2178   public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) throws IOException {
2179     List<Put> puts = new ArrayList<>();
2180     for (byte[] row : HBaseTestingUtility.ROWS) {
2181       Put put = new Put(row);
2182       put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2183       for (int i = 0; i < f.length; i++) {
2184         put.add(f[i], null, value != null ? value : row);
2185       }
2186       puts.add(put);
2187     }
2188     t.put(puts);
2189     return puts.size();
2190   }
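  // Usage sketch: loadTable writes one Put per entry of ROWS, so the returned
  // count is always 26^3 = 17576 (table and family illustrative):
  //
  //   int loaded = loadTable(t, Bytes.toBytes("f"));
  //   assertEquals(ROWS.length, loaded);
  //   assertEquals(loaded, countRows(t));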
2191 
2192   /** Tracks and validates table rows
2193    * generated with {@link HBaseTestingUtility#loadTable(Table, byte[])}
2194    */
2195   public static class SeenRowTracker {
2196     int dim = 'z' - 'a' + 1;
2197     int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
2198     byte[] startRow;
2199     byte[] stopRow;
2200 
2201     public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2202       this.startRow = startRow;
2203       this.stopRow = stopRow;
2204     }
2205 
2206     void reset() {
2207       for (byte[] row : ROWS) {
2208         seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2209       }
2210     }
2211 
2212     int i(byte b) {
2213       return b - 'a';
2214     }
2215 
2216     public void addRow(byte[] row) {
2217       seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2218     }
2219 
2220     /** Validate that all the rows between startRow (inclusive) and stopRow
2221      * (exclusive) are seen exactly once, and all other rows not at all.
2222      */
2223     public void validate() {
2224       for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2225         for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2226           for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2227             int count = seenRows[i(b1)][i(b2)][i(b3)];
2228             int expectedCount = 0;
2229             if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
2230                 && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
2231               expectedCount = 1;
2232             }
2233             if (count != expectedCount) {
2234               String row = new String(new byte[] {b1,b2,b3});
2235               throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
2236             }
2237           }
2238         }
2239       }
2240     }
2241   }
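  // Usage sketch for SeenRowTracker; stopRow is exclusive, matching the
  // comparison in validate() (scan bounds illustrative):
  //
  //   SeenRowTracker tracker =
  //       new SeenRowTracker(Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
  //   for (Result r : t.getScanner(new Scan(Bytes.toBytes("bbb"),
  //       Bytes.toBytes("ccc")))) {
  //     tracker.addRow(r.getRow());
  //   }
  //   tracker.validate();  // throws RuntimeException on any count mismatch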
2242 
2243   public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2244     return loadRegion(r, f, false);
2245   }
2246 
2247   public int loadRegion(final Region r, final byte[] f) throws IOException {
2248     return loadRegion((HRegion)r, f);
2249   }
2250 
2251   /**
2252    * Load region with rows from 'aaa' to 'zzz'.
2253    * @param r Region
2254    * @param f Family
2255    * @param flush flush the cache if true
2256    * @return Count of rows loaded.
2257    * @throws IOException
2258    */
2259   public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
2260   throws IOException {
2261     byte[] k = new byte[3];
2262     int rowCount = 0;
2263     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2264       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2265         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2266           k[0] = b1;
2267           k[1] = b2;
2268           k[2] = b3;
2269           Put put = new Put(k);
2270           // Skip the WAL whether or not the region has one (it can be
2271           // null for locally created regions); this fixture load needs
2272           // no durability.
2273           put.setDurability(Durability.SKIP_WAL);
2274           put.add(f, null, k);
2275           int preRowCount = rowCount;
2276           int pause = 10;
2277           int maxPause = 1000;
2278           while (rowCount == preRowCount) {
2279             try {
2280               r.put(put);
2281               rowCount++;
2282             } catch (RegionTooBusyException e) {
2283               pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2284               Threads.sleep(pause);
2285             }
2286           }
2287         }
2288       }
2289       if (flush) {
2290         r.flush(true);
2291       }
2292     }
2293     return rowCount;
2294   }
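  // Note on the retry loop above: a RegionTooBusyException (for example, a
  // memstore above the blocking threshold) is absorbed by sleeping with
  // exponential backoff, 10ms doubling up to a 1s cap, until the put lands.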
2295 
2296   public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2297       throws IOException {
2298     for (int i = startRow; i < endRow; i++) {
2299       byte[] data = Bytes.toBytes(String.valueOf(i));
2300       Put put = new Put(data);
2301       put.add(f, null, data);
2302       t.put(put);
2303     }
2304   }
2305 
2306   public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2307       int replicaId)
2308       throws IOException {
2309     for (int i = startRow; i < endRow; i++) {
2310       String failMsg = "Failed verification of row :" + i;
2311       byte[] data = Bytes.toBytes(String.valueOf(i));
2312       Get get = new Get(data);
2313       get.setReplicaId(replicaId);
2314       get.setConsistency(Consistency.TIMELINE);
2315       Result result = table.get(get);
2316       assertTrue(failMsg, result.containsColumn(f, null));
2317       assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2318       Cell cell = result.getColumnLatestCell(f, null);
2319       assertTrue(failMsg,
2320         Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2321           cell.getValueLength()));
2322     }
2323   }
2324 
2325   public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2326       throws IOException {
2327     verifyNumericRows((HRegion)region, f, startRow, endRow);
2328   }
2329 
2330   public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2331       throws IOException {
2332     verifyNumericRows(region, f, startRow, endRow, true);
2333   }
2334 
2335   public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2336       final boolean present) throws IOException {
2337     verifyNumericRows((HRegion)region, f, startRow, endRow, present);
2338   }
2339 
2340   public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2341       final boolean present) throws IOException {
2342     for (int i = startRow; i < endRow; i++) {
2343       String failMsg = "Failed verification of row :" + i;
2344       byte[] data = Bytes.toBytes(String.valueOf(i));
2345       Result result = region.get(new Get(data));
2346 
2347       boolean hasResult = result != null && !result.isEmpty();
2348       assertEquals(failMsg + result, present, hasResult);
2349       if (!present) continue;
2350 
2351       assertTrue(failMsg, result.containsColumn(f, null));
2352       assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2353       Cell cell = result.getColumnLatestCell(f, null);
2354       assertTrue(failMsg,
2355         Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2356           cell.getValueLength()));
2357     }
2358   }
2359 
2360   public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow)
2361       throws IOException {
2362     for (int i = startRow; i < endRow; i++) {
2363       byte[] data = Bytes.toBytes(String.valueOf(i));
2364       Delete delete = new Delete(data);
2365       delete.deleteFamily(f);
2366       t.delete(delete);
2367     }
2368   }
2369 
2370   /**
2371    * Return the number of rows in the given table.
2372    */
2373   public int countRows(final Table table) throws IOException {
2374     Scan scan = new Scan();
2375     ResultScanner results = table.getScanner(scan);
2376     int count = 0;
2377     for (@SuppressWarnings("unused") Result res : results) {
2378       count++;
2379     }
2380     results.close();
2381     return count;
2382   }
2383 
2384   public int countRows(final Table table, final byte[]... families) throws IOException {
2385     Scan scan = new Scan();
2386     for (byte[] family: families) {
2387       scan.addFamily(family);
2388     }
2389     ResultScanner results = table.getScanner(scan);
2390     int count = 0;
2391     for (@SuppressWarnings("unused") Result res : results) {
2392       count++;
2393     }
2394     results.close();
2395     return count;
2396   }
2397 
2398   /**
2399    * Return the number of rows in the given table.
2400    */
2401   public int countRows(final TableName tableName) throws IOException {
2402     Table table = getConnection().getTable(tableName);
2403     try {
2404       return countRows(table);
2405     } finally {
2406       table.close();
2407     }
2408   }
2409 
2410   /**
2411    * Return an MD5 digest computed over all the row keys of a table.
2412    */
2413   public String checksumRows(final Table table) throws Exception {
2414     Scan scan = new Scan();
2415     ResultScanner results = table.getScanner(scan);
2416     MessageDigest digest = MessageDigest.getInstance("MD5");
2417     for (Result res : results) {
2418       digest.update(res.getRow());
2419     }
2420     results.close();
2421     return new java.math.BigInteger(1, digest.digest()).toString(16); // hex of the digest
2422   }
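  // Usage sketch: because only row keys are digested, two tables with the
  // same keys but different values checksum identically (t1/t2 illustrative):
  //
  //   assertEquals(checksumRows(t1), checksumRows(t2));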
2423 
2424   /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2425   public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2426   static {
2427     int i = 0;
2428     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2429       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2430         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2431           ROWS[i][0] = b1;
2432           ROWS[i][1] = b2;
2433           ROWS[i][2] = b3;
2434           i++;
2435         }
2436       }
2437     }
2438   }
2439 
2440   public static final byte[][] KEYS = {
2441     HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2442     Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2443     Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2444     Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2445     Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2446     Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2447     Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2448     Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2449     Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
2450   };
2451 
2452   public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
2453       Bytes.toBytes("bbb"),
2454       Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2455       Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2456       Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2457       Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2458       Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2459       Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2460       Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2461       Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
2462   };
2463 
2464   /**
2465    * Create rows in hbase:meta for regions of the specified table with the specified
2466    * start keys.  The first startKey should be a 0 length byte array if you
2467    * want to form a proper range of regions.
2468    * @param conf
2469    * @param htd
2470    * @param startKeys
2471    * @return list of region info for regions added to meta
2472    * @throws IOException
2473    */
2474   public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
2475       final HTableDescriptor htd, byte [][] startKeys)
2476   throws IOException {
2477     Table meta = new HTable(conf, TableName.META_TABLE_NAME);
2478     Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2479     List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
2480     // add custom ones
2481     for (int i = 0; i < startKeys.length; i++) {
2482       int j = (i + 1) % startKeys.length;
2483       HRegionInfo hri = new HRegionInfo(htd.getTableName(), startKeys[i],
2484           startKeys[j]);
2485       MetaTableAccessor.addRegionToMeta(meta, hri);
2486       newRegions.add(hri);
2487     }
2488 
2489     meta.close();
2490     return newRegions;
2491   }
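  // Note on the loop above: the (i + 1) % startKeys.length end key makes the
  // last region wrap around to the first start key, which is why the first
  // start key should be the empty byte array when a properly terminated key
  // space is wanted.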
2492 
2493   /**
2494    * Returns all rows from the hbase:meta table.
2495    *
2496    * @throws IOException When reading the rows fails.
2497    */
2498   public List<byte[]> getMetaTableRows() throws IOException {
2499     // TODO: Redo using MetaTableAccessor class
2500     Table t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2501     List<byte[]> rows = new ArrayList<byte[]>();
2502     ResultScanner s = t.getScanner(new Scan());
2503     for (Result result : s) {
2504       LOG.info("getMetaTableRows: row -> " +
2505         Bytes.toStringBinary(result.getRow()));
2506       rows.add(result.getRow());
2507     }
2508     s.close();
2509     t.close();
2510     return rows;
2511   }
2512 
2513   /**
2514    * Returns all rows from the hbase:meta table for a given user table
2515    *
2516    * @throws IOException When reading the rows fails.
2517    */
2518   public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2519     // TODO: Redo using MetaTableAccessor.
2520     Table t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2521     List<byte[]> rows = new ArrayList<byte[]>();
2522     ResultScanner s = t.getScanner(new Scan());
2523     for (Result result : s) {
2524       HRegionInfo info = HRegionInfo.getHRegionInfo(result);
2525       if (info == null) {
2526         LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2527         // TODO figure out what to do for this new hosed case.
2528         continue;
2529       }
2530 
2531       if (info.getTable().equals(tableName)) {
2532         LOG.info("getMetaTableRows: row -> " +
2533             Bytes.toStringBinary(result.getRow()) + info);
2534         rows.add(result.getRow());
2535       }
2536     }
2537     s.close();
2538     t.close();
2539     return rows;
2540   }
2541 
2542   /**
2543    * Tool to get the reference to the region server object that holds the
2544    * region of the specified user table.
2545    * It first searches for the meta rows that contain the region of the
2546    * specified table, then gets the index of that RS, and finally retrieves
2547    * the RS's reference.
2548    * @param tableName user table to lookup in hbase:meta
2549    * @return region server that holds the first region of the table, or null if not found
2550    * @throws IOException
2551    * @throws InterruptedException
2552    */
2553   public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2554       throws IOException, InterruptedException {
2555     List<byte[]> metaRows = getMetaTableRows(tableName);
2556     if (metaRows == null || metaRows.isEmpty()) {
2557       return null;
2558     }
2559     LOG.debug("Found " + metaRows.size() + " rows for table " +
2560       tableName);
2561     byte [] firstrow = metaRows.get(0);
2562     LOG.debug("FirstRow=" + Bytes.toString(firstrow));
2563     long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2564       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2565     int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2566       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2567     RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MILLISECONDS);
2568     while(retrier.shouldRetry()) {
2569       int index = getMiniHBaseCluster().getServerWith(firstrow);
2570       if (index != -1) {
2571         return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2572       }
2573       // Came back -1.  Region may not be online yet.  Sleep a while.
2574       retrier.sleepUntilNextRetry();
2575     }
2576     return null;
2577   }
2578 
2579   /**
2580    * Starts a <code>MiniMRCluster</code> with a default number of
2581    * <code>TaskTracker</code>s.
2582    *
2583    * @throws IOException When starting the cluster fails.
2584    */
2585   public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2586     startMiniMapReduceCluster(2);
2587     return mrCluster;
2588   }
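  // Usage sketch (TEST_UTIL stands for an instance of this class; the MR
  // cluster reuses the mini DFS unless setFileSystemURI() was called first):
  //
  //   TEST_UTIL.startMiniMapReduceCluster();
  //   try {
  //     // run MapReduce jobs against TEST_UTIL.getConfiguration()
  //   } finally {
  //     TEST_UTIL.shutdownMiniMapReduceCluster();
  //   }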
2589 
2590   /**
2591    * Tasktracker has a bug where changing the hadoop.log.dir system property
2592    * will not change its internal static LOG_DIR variable.
2593    */
2594   private void forceChangeTaskLogDir() {
2595     Field logDirField;
2596     try {
2597       logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2598       logDirField.setAccessible(true);
2599 
2600       Field modifiersField = Field.class.getDeclaredField("modifiers");
2601       modifiersField.setAccessible(true);
2602       modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2603 
2604       logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2605     } catch (SecurityException e) {
2606       throw new RuntimeException(e);
2607     } catch (NoSuchFieldException e) {
2608       // A missing LOG_DIR field means this workaround cannot be applied.
2609       throw new RuntimeException(e);
2610     } catch (IllegalArgumentException e) {
2611       throw new RuntimeException(e);
2612     } catch (IllegalAccessException e) {
2613       throw new RuntimeException(e);
2614     }
2615   }
2616 
2617   /**
2618    * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2619    * filesystem.
2620    * @param servers  The number of <code>TaskTracker</code>s to start.
2621    * @throws IOException When starting the cluster fails.
2622    */
2623   private void startMiniMapReduceCluster(final int servers) throws IOException {
2624     if (mrCluster != null) {
2625       throw new IllegalStateException("MiniMRCluster is already running");
2626     }
2627     LOG.info("Starting mini mapreduce cluster...");
2628     setupClusterTestDir();
2629     createDirsAndSetProperties();
2630 
2631     forceChangeTaskLogDir();
2632 
2633     //// hadoop2 specific settings
2634     // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2635     // We raise the usable VM limit so that processes don't get killed.
2636     conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2637 
2638     // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2639     // this avoids the problem by disabling speculative task execution in tests.
2640     conf.setBoolean("mapreduce.map.speculative", false);
2641     conf.setBoolean("mapreduce.reduce.speculative", false);
2642     ////
2643 
2644     // Allow the user to override FS URI for this map-reduce cluster to use.
2645     mrCluster = new MiniMRCluster(servers,
2646       FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2647       null, null, new JobConf(this.conf));
2648     JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2649     if (jobConf == null) {
2650       jobConf = mrCluster.createJobConf();
2651     }
2652 
2653     jobConf.set("mapreduce.cluster.local.dir",
2654       conf.get("mapreduce.cluster.local.dir")); //Hadoop MiniMR overwrites this while it should not
2655     LOG.info("Mini mapreduce cluster started");
2656 
2657     // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2658     // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2659     // necessary config properties here.  YARN-129 required adding a few properties.
2660     conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2661     // this is for mrv2 support; mr1 ignores this
2662     conf.set("mapreduce.framework.name", "yarn");
2663     conf.setBoolean("yarn.is.minicluster", true);
2664     String rmAddress = jobConf.get("yarn.resourcemanager.address");
2665     if (rmAddress != null) {
2666       conf.set("yarn.resourcemanager.address", rmAddress);
2667     }
2668     String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2669     if (historyAddress != null) {
2670       conf.set("mapreduce.jobhistory.address", historyAddress);
2671     }
2672     String schedulerAddress =
2673       jobConf.get("yarn.resourcemanager.scheduler.address");
2674     if (schedulerAddress != null) {
2675       conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2676     }
2677   }
2678 
2679   /**
2680    * Stops the previously started <code>MiniMRCluster</code>.
2681    */
2682   public void shutdownMiniMapReduceCluster() {
2683     if (mrCluster != null) {
2684       LOG.info("Stopping mini mapreduce cluster...");
2685       mrCluster.shutdown();
2686       mrCluster = null;
2687       LOG.info("Mini mapreduce cluster stopped");
2688     }
2689     // Restore configuration to point to local jobtracker
2690     conf.set("mapreduce.jobtracker.address", "local");
2691   }
2692 
2693   /**
2694    * Create a stubbed out RegionServerService, mainly for getting FS.
2695    */
2696   public RegionServerServices createMockRegionServerService() throws IOException {
2697     return createMockRegionServerService((ServerName)null);
2698   }
2699 
2700   /**
2701    * Create a stubbed out RegionServerService, mainly for getting FS.
2702    * This version is used by TestTokenAuthentication
2703    */
2704   public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws IOException {
2705     final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2706     rss.setFileSystem(getTestFileSystem());
2707     rss.setRpcServer(rpc);
2708     return rss;
2709   }
2710 
2711   /**
2712    * Create a stubbed out RegionServerService, mainly for getting FS.
2713    * This version is used by TestOpenRegionHandler
2714    */
2715   public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2716     final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2717     rss.setFileSystem(getTestFileSystem());
2718     return rss;
2719   }
2720 
2721   /**
2722    * Switches the logger for the given class to DEBUG level.
2723    *
2724    * @param clazz  The class for which to switch to debug logging.
2725    */
2726   public void enableDebug(Class<?> clazz) {
2727     Log l = LogFactory.getLog(clazz);
2728     if (l instanceof Log4JLogger) {
2729       ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
2730     } else if (l instanceof Jdk14Logger) {
2731       ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
2732     }
2733   }
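  // Usage sketch (class name illustrative; this is a no-op for logging
  // backends other than log4j and java.util.logging):
  //
  //   TEST_UTIL.enableDebug(org.apache.hadoop.hbase.regionserver.HRegion.class);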
2734 
2735   /**
2736    * Expire the Master's session
2737    * @throws Exception
2738    */
2739   public void expireMasterSession() throws Exception {
2740     HMaster master = getMiniHBaseCluster().getMaster();
2741     expireSession(master.getZooKeeper(), false);
2742   }
2743 
2744   /**
2745    * Expire a region server's session
2746    * @param index which RS
2747    * @throws Exception
2748    */
2749   public void expireRegionServerSession(int index) throws Exception {
2750     HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2751     expireSession(rs.getZooKeeper(), false);
2752     decrementMinRegionServerCount();
2753   }
2754 
2755   private void decrementMinRegionServerCount() {
2756     // decrement the count for this.conf, for newly spawned master
2757     // this.hbaseCluster shares this configuration too
2758     decrementMinRegionServerCount(getConfiguration());
2759 
2760     // each master thread keeps a copy of configuration
2761     for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2762       decrementMinRegionServerCount(master.getMaster().getConfiguration());
2763     }
2764   }
2765 
2766   private void decrementMinRegionServerCount(Configuration conf) {
2767     int currentCount = conf.getInt(
2768         ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2769     if (currentCount != -1) {
2770       conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
2771           Math.max(currentCount - 1, 1));
2772     }
2773   }
2774 
2775   public void expireSession(ZooKeeperWatcher nodeZK) throws Exception {
2776    expireSession(nodeZK, false);
2777   }
2778 
2779   @Deprecated
2780   public void expireSession(ZooKeeperWatcher nodeZK, Server server)
2781     throws Exception {
2782     expireSession(nodeZK, false);
2783   }
2784 
2785   /**
2786    * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2787    * http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
2788    * There are issues when doing this:
2789    * [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
2790    * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2791    *
2792    * @param nodeZK - the ZK watcher to expire
2793    * @param checkStatus - true to check if we can create an HTable with the
2794    *                    current configuration.
2795    */
2796   public void expireSession(ZooKeeperWatcher nodeZK, boolean checkStatus)
2797     throws Exception {
2798     Configuration c = new Configuration(this.conf);
2799     String quorumServers = ZKConfig.getZKQuorumServersString(c);
2800     ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2801     byte[] password = zk.getSessionPasswd();
2802     long sessionID = zk.getSessionId();
2803 
2804     // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2805     //  so we create a first watcher to be sure that the
2806     //  event was sent. We expect that if our watcher receives the event,
2807     //  other watchers on the same machine will get it as well.
2808     // When we ask to close the connection, ZK does not close it before
2809     //  we receive all the events, so we don't have to capture the event;
2810     //  just closing the connection should be enough.
2811     ZooKeeper monitor = new ZooKeeper(quorumServers,
2812       1000, new org.apache.zookeeper.Watcher(){
2813       @Override
2814       public void process(WatchedEvent watchedEvent) {
2815         LOG.info("Monitor ZKW received event="+watchedEvent);
2816       }
2817     } , sessionID, password);
2818 
2819     // Making it expire
2820     ZooKeeper newZK = new ZooKeeper(quorumServers,
2821         1000, EmptyWatcher.instance, sessionID, password);
2822 
2823     // ensure that we have a connection to the server before closing down; otherwise
2824     // the close-session event may be swallowed before we reach the CONNECTING state
2825     long start = System.currentTimeMillis();
2826     while (newZK.getState() != States.CONNECTED
2827          && System.currentTimeMillis() - start < 1000) {
2828        Thread.sleep(1);
2829     }
2830     newZK.close();
2831     LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2832 
2833     // Now closing & waiting to be sure that the clients get it.
2834     monitor.close();
2835 
2836     if (checkStatus) {
2837       new HTable(new Configuration(conf), TableName.META_TABLE_NAME).close();
2838     }
2839   }
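
  /*
   * Usage sketch (illustrative only): force a region server to lose its ZK
   * session, then verify the cluster still serves hbase:meta. Assumes a mini
   * cluster started earlier via startMiniCluster().
   *
   *   HRegionServer rs = util.getMiniHBaseCluster().getRegionServer(0);
   *   util.expireSession(rs.getZooKeeper(), true); // checkStatus=true probes hbase:meta
   */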
2840 
2841   /**
2842    * Get the Mini HBase cluster.
2843    *
2844    * @return hbase cluster
2845    * @see #getHBaseClusterInterface()
2846    */
2847   public MiniHBaseCluster getHBaseCluster() {
2848     return getMiniHBaseCluster();
2849   }
2850 
2851   /**
2852    * Returns the HBaseCluster instance.
2853    * <p>The returned object can be any of the subclasses of HBaseCluster, and the
2854    * tests referring to this should not assume that the cluster is a mini cluster or a
2855    * distributed one. If the test only works on a mini cluster, then the specific
2856    * method {@link #getMiniHBaseCluster()} can be used instead without the
2857    * need to type-cast.
2858    */
2859   public HBaseCluster getHBaseClusterInterface() {
2860     //implementation note: we should rename this method as #getHBaseCluster(),
2861     //but this would require refactoring 90+ calls.
2862     return hbaseCluster;
2863   }
2864 
2865   /**
2866    * Get a Connection to the cluster.
2867    * Not thread-safe (This class needs a lot of work to make it thread-safe).
2868    * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2869    * @throws IOException
2870    */
2871   public Connection getConnection() throws IOException {
2872     if (this.connection == null) {
2873       this.connection = ConnectionFactory.createConnection(this.conf);
2874     }
2875     return this.connection;
2876   }
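
  /*
   * Usage sketch (illustrative only): the returned Connection is shared, so
   * close the Table you obtain from it but never the Connection itself.
   *
   *   Connection conn = util.getConnection();
   *   try (Table t = conn.getTable(TableName.valueOf("test"))) {
   *     t.put(new Put(Bytes.toBytes("row"))
   *         .add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")));
   *   } // the Table is closed here; the shared Connection stays open
   */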
2877 
2878   /**
2879    * Returns an Admin instance.
2880    * This instance is shared between HBaseTestingUtility instance users.
2881    * Closing it has no effect; it will be closed automatically when the
2882    * cluster shuts down.
2883    *
2884    * @return An Admin instance.
2885    * @throws IOException
2886    */
2887   public synchronized HBaseAdmin getHBaseAdmin()
2888   throws IOException {
2889     if (hbaseAdmin == null){
2890       this.hbaseAdmin = new HBaseAdminForTests(getConnection());
2891     }
2892     return hbaseAdmin;
2893   }
2894 
2895   private HBaseAdminForTests hbaseAdmin = null;
2896   private static class HBaseAdminForTests extends HBaseAdmin {
2897     public HBaseAdminForTests(Connection connection) throws MasterNotRunningException,
2898         ZooKeeperConnectionException, IOException {
2899       super(connection);
2900     }
2901 
2902     @Override
2903     public synchronized void close() throws IOException {
2904       LOG.warn("close() called on HBaseAdmin instance returned from " +
2905         "HBaseTestingUtility.getHBaseAdmin()");
2906     }
2907 
2908     private synchronized void close0() throws IOException {
2909       super.close();
2910     }
2911   }
2912 
2913   /**
2914    * Returns a ZooKeeperWatcher instance.
2915    * This instance is shared between HBaseTestingUtility instance users.
2916    * Don't close it; it will be closed automatically when the
2917    * cluster shuts down.
2918    *
2919    * @return The ZooKeeperWatcher instance.
2920    * @throws IOException
2921    */
2922   public synchronized ZooKeeperWatcher getZooKeeperWatcher()
2923     throws IOException {
2924     if (zooKeeperWatcher == null) {
2925       zooKeeperWatcher = new ZooKeeperWatcher(conf, "testing utility",
2926         new Abortable() {
2927         @Override public void abort(String why, Throwable e) {
2928           throw new RuntimeException("Unexpected abort in HBaseTestingUtility:"+why, e);
2929         }
2930         @Override public boolean isAborted() {return false;}
2931       });
2932     }
2933     return zooKeeperWatcher;
2934   }
2935   private ZooKeeperWatcher zooKeeperWatcher;
2936 
2937 
2938 
2939   /**
2940    * Closes the named region.
2941    *
2942    * @param regionName  The region to close.
2943    * @throws IOException
2944    */
2945   public void closeRegion(String regionName) throws IOException {
2946     closeRegion(Bytes.toBytes(regionName));
2947   }
2948 
2949   /**
2950    * Closes the named region.
2951    *
2952    * @param regionName  The region to close.
2953    * @throws IOException
2954    */
2955   public void closeRegion(byte[] regionName) throws IOException {
2956     getHBaseAdmin().closeRegion(regionName, null);
2957   }
2958 
2959   /**
2960    * Closes the region containing the given row.
2961    *
2962    * @param row  The row to find the containing region.
2963    * @param table  The table to find the region.
2964    * @throws IOException
2965    */
2966   public void closeRegionByRow(String row, RegionLocator table) throws IOException {
2967     closeRegionByRow(Bytes.toBytes(row), table);
2968   }
2969 
2970   /**
2971    * Closes the region containing the given row.
2972    *
2973    * @param row  The row to find the containing region.
2974    * @param table  The table to find the region.
2975    * @throws IOException
2976    */
2977   public void closeRegionByRow(byte[] row, RegionLocator table) throws IOException {
2978     HRegionLocation hrl = table.getRegionLocation(row);
2979     closeRegion(hrl.getRegionInfo().getRegionName());
2980   }
2981 
2982   /*
2983    * Retrieves a splittable region randomly from tableName
2984    *
2985    * @param tableName name of table
2986    * @param maxAttempts maximum number of attempts, unlimited for value of -1
2987    * @return the HRegion chosen, null if none was found within limit of maxAttempts
2988    */
2989   public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2990     List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2991     int regCount = regions.size();
2992     Set<Integer> attempted = new HashSet<Integer>();
2993     int idx;
2994     int attempts = 0;
2995     do {
2996       regions = getHBaseCluster().getRegions(tableName);
2997       if (regCount != regions.size()) {
2998         // if there was region movement, clear attempted Set
2999         attempted.clear();
3000       }
3001       regCount = regions.size();
3002       // There is a chance that, before we get the region for the table from an RS, the region
3003       // may be going through CLOSE.  This can happen because online schema change is enabled.
3004       if (regCount > 0) {
3005         idx = random.nextInt(regCount);
3006         // if we have just tried this region, there is no need to try again
3007         if (attempted.contains(idx))
3008           continue;
3009         try {
3010           regions.get(idx).checkSplit();
3011           return regions.get(idx);
3012         } catch (Exception ex) {
3013           LOG.warn("Caught exception", ex);
3014           attempted.add(idx);
3015         }
3016       }
3017       attempts++;
3018     } while (maxAttempts == -1 || attempts < maxAttempts);
3019     return null;
3020   }
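
  /*
   * Usage sketch (illustrative only): pick a splittable region and ask the
   * admin to split it. Assumes the table exists with at least one online
   * region.
   *
   *   HRegion region = util.getSplittableRegion(TableName.valueOf("test"), 10);
   *   if (region != null) {
   *     util.getHBaseAdmin().splitRegion(region.getRegionInfo().getRegionName());
   *   }
   */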
3021 
3022   public MiniZooKeeperCluster getZkCluster() {
3023     return zkCluster;
3024   }
3025 
3026   public void setZkCluster(MiniZooKeeperCluster zkCluster) {
3027     this.passedZkCluster = true;
3028     this.zkCluster = zkCluster;
3029     conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkCluster.getClientPort());
3030   }
3031 
3032   public MiniDFSCluster getDFSCluster() {
3033     return dfsCluster;
3034   }
3035 
3036   public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3037     setDFSCluster(cluster, true);
3038   }
3039 
3040   /**
3041    * Set the MiniDFSCluster
3042    * @param cluster cluster to use
3043    * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before
3044    * it is set.
3045    * @throws IllegalStateException if the passed cluster is up when it is required to be down
3046    * @throws IOException if the FileSystem could not be set from the passed dfs cluster
3047    */
3048   public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3049       throws IllegalStateException, IOException {
3050     if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3051       throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3052     }
3053     this.dfsCluster = cluster;
3054     this.setFs();
3055   }
3056 
3057   public FileSystem getTestFileSystem() throws IOException {
3058     return HFileSystem.get(conf);
3059   }
3060 
3061   /**
3062    * Wait until all regions in a table have been assigned.  Waits up to the default timeout
3063    * (30 seconds) before giving up.
3064    * @param table Table to wait on.
3065    * @throws InterruptedException
3066    * @throws IOException
3067    */
3068   public void waitTableAvailable(TableName table)
3069       throws InterruptedException, IOException {
3070     waitTableAvailable(table.getName(), 30000);
3071   }
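
  /*
   * Usage sketch (illustrative only): create a table and block until all of
   * its regions are assigned before running assertions against it. Assumes
   * the createTable(TableName, byte[]) helper defined earlier in this class.
   *
   *   util.createTable(TableName.valueOf("test"), Bytes.toBytes("f"));
   *   util.waitTableAvailable(TableName.valueOf("test"));
   */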
3072 
3073   public void waitTableAvailable(TableName table, long timeoutMillis)
3074       throws InterruptedException, IOException {
3075     waitFor(timeoutMillis, predicateTableAvailable(table));
3076   }
3077 
3078   public String explainTableAvailability(TableName tableName) throws IOException {
3079     String msg = explainTableState(tableName) + ",";
3080     if (getHBaseCluster().getMaster().isAlive()) {
3081       Map<HRegionInfo, ServerName> assignments =
3082           getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
3083               .getRegionAssignments();
3084       final List<Pair<HRegionInfo, ServerName>> metaLocations =
3085           MetaTableAccessor
3086               .getTableRegionsAndLocations(getZooKeeperWatcher(), connection, tableName);
3087       for (Pair<HRegionInfo, ServerName> metaLocation : metaLocations) {
3088         HRegionInfo hri = metaLocation.getFirst();
3089         ServerName sn = metaLocation.getSecond();
3090         if (!assignments.containsKey(hri)) {
3091           msg += ", region " + hri
3092               + " not assigned, but found in meta; it is expected to be on " + sn;
3093 
3094         } else if (sn == null) {
3095           msg += ",  region " + hri
3096               + " assigned,  but has no server in meta";
3097         } else if (!sn.equals(assignments.get(hri))) {
3098           msg += ",  region " + hri
3099               + " assigned,  but has different servers in meta and AM ( " +
3100               sn + " <> " + assignments.get(hri) + " )";
3101         }
3102       }
3103     }
3104     return msg;
3105   }
3106 
3107   public String explainTableState(TableName tableName) throws IOException {
3108     try {
3109       if (getHBaseAdmin().isTableEnabled(tableName))
3110         return "table enabled in zk";
3111       else if (getHBaseAdmin().isTableDisabled(tableName))
3112         return "table disabled in zk";
3113       else
3114         return "table in unknown state";
3115     } catch (TableNotFoundException e) {
3116       return "table does not exist";
3117     }
3118   }
3119 
3120   /**
3121    * Wait until all regions in a table have been assigned
3122    * @param table Table to wait on.
3123    * @param timeoutMillis Timeout.
3124    * @throws InterruptedException
3125    * @throws IOException
3126    */
3127   public void waitTableAvailable(byte[] table, long timeoutMillis)
3128   throws InterruptedException, IOException {
3129     waitTableAvailable(getHBaseAdmin(), table, timeoutMillis);
3130   }
3131 
3132   public void waitTableAvailable(Admin admin, byte[] table, long timeoutMillis)
3133   throws InterruptedException, IOException {
3134     long startWait = System.currentTimeMillis();
3135     while (!admin.isTableAvailable(TableName.valueOf(table))) {
3136       assertTrue("Timed out waiting for table to become available " +
3137         Bytes.toStringBinary(table),
3138         System.currentTimeMillis() - startWait < timeoutMillis);
3139       Thread.sleep(200);
3140     }
3141   }
3142 
3143   /**
3144    * Waits for a table to be 'enabled'.  Enabled means that the table is set as 'enabled' and
3145    * the regions have all been assigned.  Will timeout after the default period (30 seconds).
3146    * @see #waitTableAvailable(byte[])
3147    * @param table Table to wait on.
3148    *
3149    * @throws InterruptedException
3150    * @throws IOException
3151    */
3152   public void waitTableEnabled(TableName table)
3153       throws InterruptedException, IOException {
3154     waitTableEnabled(table, 30000);
3155   }
3156 
3157   /**
3158    * Waits for a table to be 'enabled'.  Enabled means that the table is set as 'enabled' and
3159    * the regions have all been assigned.
3160    * @see #waitTableAvailable(byte[])
3161    * @param table Table to wait on.
3162    * @param timeoutMillis Time to wait on it being marked enabled.
3163    * @throws InterruptedException
3164    * @throws IOException
3165    */
3166   public void waitTableEnabled(byte[] table, long timeoutMillis)
3167   throws InterruptedException, IOException {
3168     waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3169   }
3170 
3171   public void waitTableEnabled(TableName table, long timeoutMillis)
3172       throws IOException {
3173     waitFor(timeoutMillis, predicateTableEnabled(table));
3174   }
3175 
3176   /**
3177    * Waits for a table to be 'disabled'.  Disabled means that the table is set as 'disabled'.
3178    * Will timeout after the default period (30 seconds).
3179    * @param table Table to wait on.
3180    * @throws InterruptedException
3181    * @throws IOException
3182    */
3183   public void waitTableDisabled(byte[] table)
3184       throws InterruptedException, IOException {
3185     waitTableDisabled(getHBaseAdmin(), table, 30000);
3186   }
3187 
3188   public void waitTableDisabled(Admin admin, byte[] table)
3189       throws InterruptedException, IOException {
3190     waitTableDisabled(admin, table, 30000);
3191   }
3192 
3193   /**
3194    * Waits for a table to be 'disabled'.  Disabled means that the table is set as 'disabled'.
3195    * @param table Table to wait on.
3196    * @param timeoutMillis Time to wait on it being marked disabled.
3197    * @throws InterruptedException
3198    * @throws IOException
3199    */
3200   public void waitTableDisabled(byte[] table, long timeoutMillis)
3201       throws InterruptedException, IOException {
3202     waitTableDisabled(getHBaseAdmin(), table, timeoutMillis);
3203   }
3204 
3205   public void waitTableDisabled(Admin admin, byte[] table, long timeoutMillis)
3206       throws InterruptedException, IOException {
3207     TableName tableName = TableName.valueOf(table);
3208     long startWait = System.currentTimeMillis();
3209     while (!admin.isTableDisabled(tableName)) {
3210       assertTrue("Timed out waiting for table to become disabled " +
3211               Bytes.toStringBinary(table),
3212           System.currentTimeMillis() - startWait < timeoutMillis);
3213       Thread.sleep(200);
3214     }
3215   }
3216 
3217   /**
3218    * Make sure that at least the specified number of region servers
3219    * are running
3220    * @param num minimum number of region servers that should be running
3221    * @return true if we started some servers
3222    * @throws IOException
3223    */
3224   public boolean ensureSomeRegionServersAvailable(final int num)
3225       throws IOException {
3226     boolean startedServer = false;
3227     MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3228     for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
3229       LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3230       startedServer = true;
3231     }
3232 
3233     return startedServer;
3234   }
3235 
3236 
3237   /**
3238    * Make sure that at least the specified number of region servers
3239    * are running. We don't count the ones that are currently stopping or are
3240    * stopped.
3241    * @param num minimum number of region servers that should be running
3242    * @return true if we started some servers
3243    * @throws IOException
3244    */
3245   public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
3246     throws IOException {
3247     boolean startedServer = ensureSomeRegionServersAvailable(num);
3248 
3249     int nonStoppedServers = 0;
3250     for (JVMClusterUtil.RegionServerThread rst :
3251       getMiniHBaseCluster().getRegionServerThreads()) {
3252 
3253       HRegionServer hrs = rst.getRegionServer();
3254       if (hrs.isStopping() || hrs.isStopped()) {
3255         LOG.info("A region server is stopped or stopping:"+hrs);
3256       } else {
3257         nonStoppedServers++;
3258       }
3259     }
3260     for (int i=nonStoppedServers; i<num; ++i) {
3261       LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3262       startedServer = true;
3263     }
3264     return startedServer;
3265   }
3266 
3267 
3268   /**
3269    * This method clones the passed <code>c</code> configuration, setting a new
3270    * user into the clone.  Use it to get new instances of FileSystem.  Only
3271    * works for DistributedFileSystem without Kerberos.
3272    * @param c Initial configuration
3273    * @param differentiatingSuffix Suffix to differentiate this user from others.
3274    * @return A new configuration instance with a different user set into it.
3275    * @throws IOException
3276    */
3277   public static User getDifferentUser(final Configuration c,
3278     final String differentiatingSuffix)
3279   throws IOException {
3280     FileSystem currentfs = FileSystem.get(c);
3281     if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3282       return User.getCurrent();
3283     }
3284     // Else distributed filesystem.  Make a new instance per daemon.  Below
3285     // code is taken from the AppendTestUtil over in hdfs.
3286     String username = User.getCurrent().getName() +
3287       differentiatingSuffix;
3288     User user = User.createUserForTesting(c, username,
3289         new String[]{"supergroup"});
3290     return user;
3291   }
3292 
3293   public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3294       throws IOException {
3295     NavigableSet<String> online = new TreeSet<String>();
3296     for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3297       try {
3298         for (HRegionInfo region :
3299             ProtobufUtil.getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3300           online.add(region.getRegionNameAsString());
3301         }
3302       } catch (RegionServerStoppedException e) {
3303         // That's fine.
3304       }
3305     }
3306     for (MasterThread mt : cluster.getLiveMasterThreads()) {
3307       try {
3308         for (HRegionInfo region :
3309             ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
3310           online.add(region.getRegionNameAsString());
3311         }
3312       } catch (RegionServerStoppedException e) {
3313         // That's fine.
3314       } catch (ServerNotRunningYetException e) {
3315         // That's fine.
3316       }
3317     }
3318     return online;
3319   }
3320 
3321   /**
3322    * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append it is hard-coded to 5 and
3323    * makes tests linger.  Here is the exception you'll see:
3324    * <pre>
3325    * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683 failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3326    * </pre>
3327    * @param stream A DFSClient.DFSOutputStream.
3328    * @param max the value to set maxRecoveryErrorCount to
3329    * @throws NoSuchFieldException
3330    * @throws SecurityException
3331    * @throws IllegalAccessException
3332    * @throws IllegalArgumentException
3333    */
3334   public static void setMaxRecoveryErrorCount(final OutputStream stream,
3335       final int max) {
3336     try {
3337       Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
3338       for (Class<?> clazz: clazzes) {
3339         String className = clazz.getSimpleName();
3340         if (className.equals("DFSOutputStream")) {
3341           if (clazz.isInstance(stream)) {
3342             Field maxRecoveryErrorCountField =
3343               stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3344             maxRecoveryErrorCountField.setAccessible(true);
3345             maxRecoveryErrorCountField.setInt(stream, max);
3346             break;
3347           }
3348         }
3349       }
3350     } catch (Exception e) {
3351       LOG.info("Could not set max recovery field", e);
3352     }
3353   }
3354 
3355   /**
3356    * Wait until all regions for a table in hbase:meta have a non-empty
3357    * info:server, up to 60 seconds. This means all regions have been deployed, the
3358    * master has been informed, and hbase:meta has been updated with each region's
3359    * deployed server.
3360    * @param tableName the table name
3361    * @throws IOException
3362    */
3363   public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3364     waitUntilAllRegionsAssigned(tableName, 60000);
3365   }
3366 
3367   /**
3368    * Wait until all regions for a table in hbase:meta have a non-empty
3369    * info:server, or until timeout.  This means all regions have been deployed, the
3370    * master has been informed, and hbase:meta has been updated with each region's
3371    * deployed server.
3372    * @param tableName the table name
3373    * @param timeout timeout, in milliseconds
3374    * @throws IOException
3375    */
3376   public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3377       throws IOException {
3378     final Table meta = new HTable(getConfiguration(), TableName.META_TABLE_NAME);
3379     try {
3380       waitFor(timeout, 200, true, new Predicate<IOException>() {
3381         @Override
3382         public boolean evaluate() throws IOException {
3383           boolean allRegionsAssigned = true;
3384           Scan scan = new Scan();
3385           scan.addFamily(HConstants.CATALOG_FAMILY);
3386           ResultScanner s = meta.getScanner(scan);
3387           try {
3388             Result r;
3389             while ((r = s.next()) != null) {
3390               byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3391               HRegionInfo info = HRegionInfo.parseFromOrNull(b);
3392               if (info != null && info.getTable().equals(tableName)) {
3393                 b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3394                 allRegionsAssigned &= (b != null);
3395               }
3396             }
3397           } finally {
3398             s.close();
3399           }
3400           return allRegionsAssigned;
3401         }
3402       });
3403     } finally {
3404       meta.close();
3405     }
3406 
3407     // check from the master state if we are using a mini cluster
3408     if (!getHBaseClusterInterface().isDistributedCluster()) {
3409       // So, all regions are in the meta table but make sure master knows of the assignments
3410       // before returning -- sometimes this can lag.
3411       HMaster master = getHBaseCluster().getMaster();
3412       final RegionStates states = master.getAssignmentManager().getRegionStates();
3413       waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3414         @Override
3415         public String explainFailure() throws IOException {
3416           return explainTableAvailability(tableName);
3417         }
3418 
3419         @Override
3420         public boolean evaluate() throws IOException {
3421           List<HRegionInfo> hris = states.getRegionsOfTable(tableName);
3422           return hris != null && !hris.isEmpty();
3423         }
3424       });
3425     }
3426   }
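
  /*
   * Usage sketch (illustrative only): after creating or enabling a table,
   * block until hbase:meta records a server for every one of its regions.
   *
   *   util.waitUntilAllRegionsAssigned(TableName.valueOf("test"), 60000);
   */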
3427 
3428   /**
3429    * Do a small get/scan against one store. This is required because Store
3430    * has no actual methods for querying itself, and relies on StoreScanner.
3431    */
3432   public static List<Cell> getFromStoreFile(HStore store,
3433                                                 Get get) throws IOException {
3434     Scan scan = new Scan(get);
3435     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3436         scan.getFamilyMap().get(store.getFamily().getName()),
3437         // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3438         // readpoint 0.
3439         0);
3440 
3441     List<Cell> result = new ArrayList<Cell>();
3442     scanner.next(result);
3443     if (!result.isEmpty()) {
3444       // verify that we are on the row we want:
3445       Cell kv = result.get(0);
3446       if (!CellUtil.matchingRow(kv, get.getRow())) {
3447         result.clear();
3448       }
3449     }
3450     scanner.close();
3451     return result;
3452   }
3453 
3454   /**
3455    * Create region split keys between startKey and endKey.
3456    *
3457    * @param startKey start key of the key range
3458    * @param endKey end key of the key range
3459    * @param numRegions the number of regions to be created; it has to be greater than 3
3460    * @return the region split start keys, beginning with an empty byte array
3461    */
3462   public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3463     assertTrue(numRegions > 3);
3464     byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3465     byte [][] result = new byte[tmpSplitKeys.length+1][];
3466     System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3467     result[0] = HConstants.EMPTY_BYTE_ARRAY;
3468     return result;
3469   }
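
  /*
   * Worked example (illustrative only): for numRegions = 5, Bytes.split
   * returns (numRegions - 3) + 2 = 4 boundary keys (including startKey and
   * endKey), and prepending HConstants.EMPTY_BYTE_ARRAY yields 5 start keys,
   * one per region.
   *
   *   byte[][] starts = util.getRegionSplitStartKeys(
   *       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 5);
   *   // starts.length == 5; starts[0] is the empty byte array
   */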
3470 
3471   /**
3472    * Do a small get/scan against one store. This is required because Store
3473    * has no actual methods for querying itself, and relies on StoreScanner.
3474    */
3475   public static List<Cell> getFromStoreFile(HStore store,
3476                                                 byte [] row,
3477                                                 NavigableSet<byte[]> columns
3478                                                 ) throws IOException {
3479     Get get = new Get(row);
3480     Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3481     s.put(store.getFamily().getName(), columns);
3482 
3483     return getFromStoreFile(store,get);
3484   }
3485 
3486   /**
3487    * Gets a ZooKeeperWatcher.
3488    * @param TEST_UTIL the testing utility whose configuration is used
3489    */
3490   public static ZooKeeperWatcher getZooKeeperWatcher(
3491       HBaseTestingUtility TEST_UTIL) throws ZooKeeperConnectionException,
3492       IOException {
3493     ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
3494         "unittest", new Abortable() {
3495           boolean aborted = false;
3496 
3497           @Override
3498           public void abort(String why, Throwable e) {
3499             aborted = true;
3500             throw new RuntimeException("Fatal ZK error, why=" + why, e);
3501           }
3502 
3503           @Override
3504           public boolean isAborted() {
3505             return aborted;
3506           }
3507         });
3508     return zkw;
3509   }
3510 
3511   /**
3512    * Creates a znode with OPENED state.
3513    * @param TEST_UTIL the testing utility to get a ZooKeeperWatcher from
3514    * @param region the region to create the znode for
3515    * @param serverName the server the region is deemed open on
3516    * @return the ZooKeeperWatcher used to create the znode
3517    * @throws IOException
3518    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
3519    * @throws KeeperException
3520    * @throws NodeExistsException
3521    */
3522   public static ZooKeeperWatcher createAndForceNodeToOpenedState(
3523       HBaseTestingUtility TEST_UTIL, Region region,
3524       ServerName serverName) throws ZooKeeperConnectionException,
3525       IOException, KeeperException, NodeExistsException {
3526     return createAndForceNodeToOpenedState(TEST_UTIL, (HRegion)region, serverName);
3527   }
3528 
3529   /**
3530    * Creates a znode with OPENED state.
3531    * @param TEST_UTIL the testing utility to get a ZooKeeperWatcher from
3532    * @param region the region to create the znode for
3533    * @param serverName the server the region is deemed open on
3534    * @return the ZooKeeperWatcher used to create the znode
3535    * @throws IOException
3536    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
3537    * @throws KeeperException
3538    * @throws NodeExistsException
3539    */
3540   public static ZooKeeperWatcher createAndForceNodeToOpenedState(
3541       HBaseTestingUtility TEST_UTIL, HRegion region,
3542       ServerName serverName) throws ZooKeeperConnectionException,
3543       IOException, KeeperException, NodeExistsException {
3544     ZooKeeperWatcher zkw = getZooKeeperWatcher(TEST_UTIL);
3545     ZKAssign.createNodeOffline(zkw, region.getRegionInfo(), serverName);
3546     int version = ZKAssign.transitionNodeOpening(zkw, region
3547         .getRegionInfo(), serverName);
3548     ZKAssign.transitionNodeOpened(zkw, region.getRegionInfo(), serverName,
3549         version);
3550     return zkw;
3551   }
3552 
3553   public static void assertKVListsEqual(String additionalMsg,
3554       final List<? extends Cell> expected,
3555       final List<? extends Cell> actual) {
3556     final int eLen = expected.size();
3557     final int aLen = actual.size();
3558     final int minLen = Math.min(eLen, aLen);
3559 
3560     int i;
3561     for (i = 0; i < minLen
3562         && KeyValue.COMPARATOR.compare(expected.get(i), actual.get(i)) == 0;
3563         ++i) {}
3564 
3565     if (additionalMsg == null) {
3566       additionalMsg = "";
3567     }
3568     if (!additionalMsg.isEmpty()) {
3569       additionalMsg = ". " + additionalMsg;
3570     }
3571 
3572     if (eLen != aLen || i != minLen) {
3573       throw new AssertionError(
3574           "Expected and actual KV arrays differ at position " + i + ": " +
3575           safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3576           safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3577     }
3578   }
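
  /*
   * Usage sketch (illustrative only, with hypothetical expectedCells and
   * actualCells lists): the thrown AssertionError pinpoints the first
   * position at which the two KV lists differ.
   *
   *   HBaseTestingUtility.assertKVListsEqual("after flush", expectedCells, actualCells);
   */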
3579 
3580   public static <T> String safeGetAsStr(List<T> lst, int i) {
3581     if (0 <= i && i < lst.size()) {
3582       return lst.get(i).toString();
3583     } else {
3584       return "<out_of_range>";
3585     }
3586   }
3587 
3588   public String getClusterKey() {
3589     return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3590         + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3591         + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3592             HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3593   }
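
  /*
   * Usage sketch (illustrative only): the returned key has the form
   * "<quorum>:<clientPort>:<znodeParent>".
   *
   *   String key = util.getClusterKey();  // e.g. "localhost:21818:/hbase"
   */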
3594 
3595   /** Creates a random table with the given parameters */
3596   public HTable createRandomTable(String tableName,
3597       final Collection<String> families,
3598       final int maxVersions,
3599       final int numColsPerRow,
3600       final int numFlushes,
3601       final int numRegions,
3602       final int numRowsPerFlush)
3603       throws IOException, InterruptedException {
3604 
3605     LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
3606         " regions, " + numFlushes + " storefiles per region, " +
3607         numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
3608         "\n");
3609 
3610     final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3611     final int numCF = families.size();
3612     final byte[][] cfBytes = new byte[numCF][];
3613     {
3614       int cfIndex = 0;
3615       for (String cf : families) {
3616         cfBytes[cfIndex++] = Bytes.toBytes(cf);
3617       }
3618     }
3619 
3620     final int actualStartKey = 0;
3621     final int actualEndKey = Integer.MAX_VALUE;
3622     final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3623     final int splitStartKey = actualStartKey + keysPerRegion;
3624     final int splitEndKey = actualEndKey - keysPerRegion;
3625     final String keyFormat = "%08x";
3626     final HTable table = createTable(tableName, cfBytes,
3627         maxVersions,
3628         Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3629         Bytes.toBytes(String.format(keyFormat, splitEndKey)),
3630         numRegions);
3631 
3632     if (hbaseCluster != null) {
3633       getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3634     }
3635 
3636     for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3637       for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3638         final byte[] row = Bytes.toBytes(String.format(keyFormat,
3639             actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3640 
3641         Put put = new Put(row);
3642         Delete del = new Delete(row);
3643         for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3644           final byte[] cf = cfBytes[rand.nextInt(numCF)];
3645           final long ts = rand.nextInt();
3646           final byte[] qual = Bytes.toBytes("col" + iCol);
3647           if (rand.nextBoolean()) {
3648             final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
3649                 "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
3650                 ts + "_random_" + rand.nextLong());
3651             put.add(cf, qual, ts, value);
3652           } else if (rand.nextDouble() < 0.8) {
3653             del.deleteColumn(cf, qual, ts);
3654           } else {
3655             del.deleteColumns(cf, qual, ts);
3656           }
3657         }
3658 
3659         if (!put.isEmpty()) {
3660           table.put(put);
3661         }
3662 
3663         if (!del.isEmpty()) {
3664           table.delete(del);
3665         }
3666       }
3667       LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3668       table.flushCommits();
3669       if (hbaseCluster != null) {
3670         getMiniHBaseCluster().flushcache(table.getName());
3671       }
3672     }
3673 
3674     return table;
3675   }
3676 
3677   private static final int MIN_RANDOM_PORT = 0xc000;
3678   private static final int MAX_RANDOM_PORT = 0xfffe;
3679   private static Random random = new Random();
3680 
3681   /**
3682    * Returns a random port. These ports cannot be registered with IANA and are
3683    * intended for dynamic allocation (see http://bit.ly/dynports).
3684    */
3685   public static int randomPort() {
3686     return MIN_RANDOM_PORT
3687         + random.nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
3688   }
3689 
3690   /**
3691    * Returns a random free port and marks that port as taken. Not thread-safe. Expected to be
3692    * called from single-threaded test setup code.
3693    */
3694   public static int randomFreePort() {
3695     int port = 0;
3696     do {
3697       port = randomPort();
3698       if (takenRandomPorts.contains(port)) {
3699         port = 0;
3700         continue;
3701       }
3702       takenRandomPorts.add(port);
3703 
3704       try {
3705         ServerSocket sock = new ServerSocket(port);
3706         sock.close();
3707       } catch (IOException ex) {
3708         port = 0;
3709       }
3710     } while (port == 0);
3711     return port;
3712   }
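
  /*
   * Note and usage sketch (illustrative only): 0xc000-0xfffe is 49152-65534,
   * the IANA dynamic/private range, so these ports never collide with
   * registered services. Reserve ports during single-threaded setup:
   *
   *   int rpcPort = HBaseTestingUtility.randomFreePort();
   *   conf.setInt(HConstants.REGIONSERVER_PORT, rpcPort);
   */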
3713 
3714 
3715   public static String randomMultiCastAddress() {
3716     return "226.1.1." + random.nextInt(254);
3717   }
3718 
3719 
3720 
3721   public static void waitForHostPort(String host, int port)
3722       throws IOException {
3723     final int maxTimeMs = 10000;
3724     final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3725     IOException savedException = null;
3726     LOG.info("Waiting for server at " + host + ":" + port);
3727     for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3728       try {
3729         Socket sock = new Socket(InetAddress.getByName(host), port);
3730         sock.close();
3731         savedException = null;
3732         LOG.info("Server at " + host + ":" + port + " is available");
3733         break;
3734       } catch (UnknownHostException e) {
3735         throw new IOException("Failed to look up " + host, e);
3736       } catch (IOException e) {
3737         savedException = e;
3738       }
3739       Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3740     }
3741 
3742     if (savedException != null) {
3743       throw savedException;
3744     }
3745   }
3746 
3747   /**
3748    * Creates a pre-split table for load testing. If the table already exists,
3749    * logs a warning and continues.
3750    * @return the number of regions the table was split into
3751    */
3752   public static int createPreSplitLoadTestTable(Configuration conf,
3753       TableName tableName, byte[] columnFamily, Algorithm compression,
3754       DataBlockEncoding dataBlockEncoding) throws IOException {
3755     return createPreSplitLoadTestTable(conf, tableName,
3756       columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
3757       Durability.USE_DEFAULT);
3758   }
3759   /**
3760    * Creates a pre-split table for load testing. If the table already exists,
3761    * logs a warning and continues.
3762    * @return the number of regions the table was split into
3763    */
3764   public static int createPreSplitLoadTestTable(Configuration conf,
3765       TableName tableName, byte[] columnFamily, Algorithm compression,
3766       DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3767       Durability durability)
3768           throws IOException {
3769     HTableDescriptor desc = new HTableDescriptor(tableName);
3770     desc.setDurability(durability);
3771     desc.setRegionReplication(regionReplication);
3772     HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
3773     hcd.setDataBlockEncoding(dataBlockEncoding);
3774     hcd.setCompressionType(compression);
3775     return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer);
3776   }
3777 
3778   /**
3779    * Creates a pre-split table for load testing. If the table already exists,
3780    * logs a warning and continues.
3781    * @return the number of regions the table was split into
3782    */
3783   public static int createPreSplitLoadTestTable(Configuration conf,
3784       TableName tableName, byte[][] columnFamilies, Algorithm compression,
3785       DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3786       Durability durability)
3787           throws IOException {
3788     HTableDescriptor desc = new HTableDescriptor(tableName);
3789     desc.setDurability(durability);
3790     desc.setRegionReplication(regionReplication);
3791     HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length];
3792     for (int i = 0; i < columnFamilies.length; i++) {
3793       HColumnDescriptor hcd = new HColumnDescriptor(columnFamilies[i]);
3794       hcd.setDataBlockEncoding(dataBlockEncoding);
3795       hcd.setCompressionType(compression);
3796       hcds[i] = hcd;
3797     }
3798     return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer);
3799   }
3800 
3801   /**
3802    * Creates a pre-split table for load testing. If the table already exists,
3803    * logs a warning and continues.
3804    * @return the number of regions the table was split into
3805    */
3806   public static int createPreSplitLoadTestTable(Configuration conf,
3807       HTableDescriptor desc, HColumnDescriptor hcd) throws IOException {
3808     return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3809   }
3810 
3811   /**
3812    * Creates a pre-split table for load testing. If the table already exists,
3813    * logs a warning and continues.
3814    * @return the number of regions the table was split into
3815    */
3816   public static int createPreSplitLoadTestTable(Configuration conf,
3817       HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException {
3818     return createPreSplitLoadTestTable(conf, desc, new HColumnDescriptor[] {hcd},
3819         numRegionsPerServer);
3820   }
3821 
3822   /**
3823    * Creates a pre-split table for load testing. If the table already exists,
3824    * logs a warning and continues.
3825    * @return the number of regions the table was split into
3826    */
3827   public static int createPreSplitLoadTestTable(Configuration conf,
3828       HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3829     for (HColumnDescriptor hcd : hcds) {
3830       if (!desc.hasFamily(hcd.getName())) {
3831         desc.addFamily(hcd);
3832       }
3833     }
3834 
3835     int totalNumberOfRegions = 0;
3836     Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3837     Admin admin = unmanagedConnection.getAdmin();
3838 
3839     try {
3840       // create a table with pre-split regions.
3841       // The number of splits is set as:
3842       //    (region servers * regions per region server).
3843       int numberOfServers = admin.getClusterStatus().getServers().size();
3844       if (numberOfServers == 0) {
3845         throw new IllegalStateException("No live regionservers");
3846       }
3847 
3848       totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3849       LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3850           "pre-splitting table into " + totalNumberOfRegions + " regions " +
3851           "(regions per server: " + numRegionsPerServer + ")");
3852 
3853       byte[][] splits = new RegionSplitter.HexStringSplit().split(
3854           totalNumberOfRegions);
3855 
3856       admin.createTable(desc, splits);
3857     } catch (MasterNotRunningException e) {
3858       LOG.error("Master not running", e);
3859       throw new IOException(e);
3860     } catch (TableExistsException e) {
3861       LOG.warn("Table " + desc.getTableName() +
3862           " already exists, continuing");
3863     } finally {
3864       admin.close();
3865       unmanagedConnection.close();
3866     }
3867     return totalNumberOfRegions;
3868   }
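
  /*
   * Usage sketch (illustrative only): pre-split a load-test table with one
   * column family, GZ compression and no block encoding, using the
   * five-argument overload above.
   *
   *   int regions = HBaseTestingUtility.createPreSplitLoadTestTable(
   *       util.getConfiguration(), TableName.valueOf("loadtest"),
   *       Bytes.toBytes("f"), Compression.Algorithm.GZ, DataBlockEncoding.NONE);
   */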
3869 
3870   public static int getMetaRSPort(Configuration conf) throws IOException {
3871     try (Connection c = ConnectionFactory.createConnection();
3872         RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) {
3873       return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3874     }
3875   }
3876 
3877   /**
3878    *  Due to an async racing issue, a region may not yet be in
3879    *  a region server's online region list, even after
3880    *  the assignment znode is deleted and the new assignment
3881    *  is recorded in the master.
3882    */
3883   public void assertRegionOnServer(
3884       final HRegionInfo hri, final ServerName server,
3885       final long timeout) throws IOException, InterruptedException {
3886     long timeoutTime = System.currentTimeMillis() + timeout;
3887     while (true) {
3888       List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
3889       if (regions.contains(hri)) return;
3890       long now = System.currentTimeMillis();
3891       if (now > timeoutTime) break;
3892       Thread.sleep(10);
3893     }
3894     fail("Could not find region " + hri.getRegionNameAsString()
3895       + " on server " + server);
3896   }
3897 
3898   /**
3899    * Check to make sure the region is open on the specified
3900    * region server, but not on any other one.
3901    */
3902   public void assertRegionOnlyOnServer(
3903       final HRegionInfo hri, final ServerName server,
3904       final long timeout) throws IOException, InterruptedException {
3905     long timeoutTime = System.currentTimeMillis() + timeout;
3906     while (true) {
3907       List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
3908       if (regions.contains(hri)) {
3909         List<JVMClusterUtil.RegionServerThread> rsThreads =
3910           getHBaseCluster().getLiveRegionServerThreads();
3911         for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
3912           HRegionServer rs = rsThread.getRegionServer();
3913           if (server.equals(rs.getServerName())) {
3914             continue;
3915           }
3916           Collection<Region> hrs = rs.getOnlineRegionsLocalContext();
3917           for (Region r: hrs) {
3918             assertTrue("Region should not be double assigned",
3919               r.getRegionInfo().getRegionId() != hri.getRegionId());
3920           }
3921         }
3922         return; // good, we are happy
3923       }
3924       long now = System.currentTimeMillis();
3925       if (now > timeoutTime) break;
3926       Thread.sleep(10);
3927     }
3928     fail("Could not find region " + hri.getRegionNameAsString()
3929       + " on server " + server);
3930   }
3931 
3932   public HRegion createTestRegion(String tableName, HColumnDescriptor hcd)
3933       throws IOException {
3934     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
3935     htd.addFamily(hcd);
3936     HRegionInfo info =
3937         new HRegionInfo(TableName.valueOf(tableName), null, null, false);
3938     HRegion region =
3939         HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), htd);
3940     return region;
3941   }
3942 
3943   public void setFileSystemURI(String fsURI) {
3944     FS_URI = fsURI;
3945   }
3946 
3947   /**
3948    * Wrapper method for {@link Waiter#waitFor(Configuration, long, Predicate)}.
3949    */
3950   public <E extends Exception> long waitFor(long timeout, Predicate<E> predicate)
3951       throws E {
3952     return Waiter.waitFor(this.conf, timeout, predicate);
3953   }
3954 
3955   /**
3956    * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, Predicate)}.
3957    */
3958   public <E extends Exception> long waitFor(long timeout, long interval, Predicate<E> predicate)
3959       throws E {
3960     return Waiter.waitFor(this.conf, timeout, interval, predicate);
3961   }
3962 
3963   /**
3964    * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)}.
3965    */
3966   public <E extends Exception> long waitFor(long timeout, long interval,
3967       boolean failIfTimeout, Predicate<E> predicate) throws E {
3968     return Waiter.waitFor(this.conf, timeout, interval, failIfTimeout, predicate);
3969   }
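
  /*
   * Usage sketch (illustrative only): combine the waitFor wrappers with the
   * predicate factories below, or with an ad-hoc predicate.
   *
   *   util.waitFor(30000, util.predicateTableEnabled(TableName.valueOf("test")));
   *   util.waitFor(30000, 100, new Waiter.Predicate<IOException>() {
   *     @Override public boolean evaluate() throws IOException {
   *       return util.getHBaseAdmin().tableExists(TableName.valueOf("test"));
   *     }
   *   });
   */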
3970 
3971   /**
3972    * Returns a {@link Predicate} for checking that there are no regions in transition in master
3973    */
3974   public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3975     return new ExplainingPredicate<IOException>() {
3976       @Override
3977       public String explainFailure() throws IOException {
3978         final RegionStates regionStates = getMiniHBaseCluster().getMaster()
3979             .getAssignmentManager().getRegionStates();
3980         return "found in transition: " + regionStates.getRegionsInTransition().toString();
3981       }
3982 
3983       @Override
3984       public boolean evaluate() throws IOException {
3985         HMaster master = getMiniHBaseCluster().getMaster();
3986         if (master == null) return false;
3987         AssignmentManager am = master.getAssignmentManager();
3988         if (am == null) return false;
3989         final RegionStates regionStates = am.getRegionStates();
3990         return !regionStates.isRegionsInTransition();
3991       }
3992     };
3993   }
3994 
3995   /**
3996    * Returns a {@link Predicate} for checking that table is enabled
3997    */
3998   public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3999     return new ExplainingPredicate<IOException>() {
4000       @Override
4001       public String explainFailure() throws IOException {
4002         return explainTableState(tableName);
4003       }
4004 
4005       @Override
4006       public boolean evaluate() throws IOException {
4007         return getHBaseAdmin().tableExists(tableName) && getHBaseAdmin().isTableEnabled(tableName);
4008       }
4009     };
4010   }
4011 
4012   /**
4013    * Returns a {@link Predicate} for checking that table is enabled
4014    * Returns a {@link Predicate} for checking that table is disabled
4015   public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
4016     return new ExplainingPredicate<IOException>() {
4017       @Override
4018       public String explainFailure() throws IOException {
4019         return explainTableState(tableName);
4020       }
4021 
4022       @Override
4023       public boolean evaluate() throws IOException {
4024         return getHBaseAdmin().isTableDisabled(tableName);
4025       }
4026     };
4027   }
4028 
4029   /**
4030    * Returns a {@link Predicate} for checking that table is available
4031    */
4032   public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
4033     return new ExplainingPredicate<IOException>() {
4034       @Override
4035       public String explainFailure() throws IOException {
4036         return explainTableAvailability(tableName);
4037       }
4038 
4039       @Override
4040       public boolean evaluate() throws IOException {
4041         boolean tableAvailable = getHBaseAdmin().isTableAvailable(tableName);
4042         if (tableAvailable) {
4043           try {
4044             Canary.sniff(getHBaseAdmin(), tableName);
4045           } catch (Exception e) {
4046             throw new IOException("Canary sniff failed for table " + tableName, e);
4047           }
4048         }
4049         return tableAvailable;
4050       }
4051     };
4052   }
4053 
4054   /**
4055    * Wait until no regions in transition.
4056    * @param timeout How long to wait.
4057    * @throws Exception
4058    */
4059   public void waitUntilNoRegionsInTransition(
4060       final long timeout) throws Exception {
4061     waitFor(timeout, predicateNoRegionsInTransition());
4062   }
4063 
4064   /**
4065    * Wait until labels are ready in VisibilityLabelsCache.
4066    * @param timeoutMillis
4067    * @param labels
4068    */
4069   public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4070     final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4071     waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4072 
4073       @Override
4074       public boolean evaluate() {
4075         for (String label : labels) {
4076           if (labelsCache.getLabelOrdinal(label) == 0) {
4077             return false;
4078           }
4079         }
4080         return true;
4081       }
4082 
4083       @Override
4084       public String explainFailure() {
4085         for (String label : labels) {
4086           if (labelsCache.getLabelOrdinal(label) == 0) {
4087             return label + " is not available yet";
4088           }
4089         }
4090         return "";
4091       }
4092     });
4093   }
4094 
4095   /**
4096    * Create a set of column descriptors covering all combinations of the
4097    * available compression, encoding, and bloom codecs.
4098    * @return the list of column descriptors
4099    */
4100   public static List<HColumnDescriptor> generateColumnDescriptors() {
4101     return generateColumnDescriptors("");
4102   }
4103 
4104   /**
4105    * Create a set of column descriptors covering all combinations of the
4106    * available compression, encoding, and bloom codecs.
4107    * @param prefix family names prefix
4108    * @return the list of column descriptors
4109    */
4110   public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
4111     List<HColumnDescriptor> htds = new ArrayList<HColumnDescriptor>();
4112     long familyId = 0;
4113     for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
4114       for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
4115         for (BloomType bloomType: BloomType.values()) {
4116           String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4117           HColumnDescriptor htd = new HColumnDescriptor(name);
4118           htd.setCompressionType(compressionType);
4119           htd.setDataBlockEncoding(encodingType);
4120           htd.setBloomFilterType(bloomType);
4121           htds.add(htd);
4122           familyId++;
4123         }
4124       }
4125     }
4126     return htds;
4127   }
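
  /*
   * Usage sketch (illustrative only): build a table descriptor that exercises
   * every supported compression/encoding/bloom combination.
   *
   *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("allCodecs"));
   *   for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
   *     htd.addFamily(hcd);
   *   }
   */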
4128 
4129   /**
4130    * Get supported compression algorithms.
4131    * @return supported compression algorithms.
4132    */
4133   public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4134     String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4135     List<Compression.Algorithm> supportedAlgos = new ArrayList<Compression.Algorithm>();
4136     for (String algoName : allAlgos) {
4137       try {
4138         Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4139         algo.getCompressor();
4140         supportedAlgos.add(algo);
4141       } catch (Throwable t) {
4142         // this algo is not available
4143       }
4144     }
4145     return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4146   }
4147 }