1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase;
19  
20  import static org.junit.Assert.assertTrue;
21  import static org.junit.Assert.fail;
22  
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.OutputStream;
26  import java.lang.reflect.Field;
27  import java.lang.reflect.Modifier;
28  import java.net.InetAddress;
29  import java.net.ServerSocket;
30  import java.net.Socket;
31  import java.net.UnknownHostException;
32  import java.security.MessageDigest;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.Collection;
36  import java.util.Collections;
37  import java.util.HashSet;
38  import java.util.List;
39  import java.util.Map;
40  import java.util.NavigableSet;
41  import java.util.Random;
42  import java.util.Set;
43  import java.util.UUID;
44  import java.util.concurrent.TimeUnit;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.commons.logging.impl.Jdk14Logger;
49  import org.apache.commons.logging.impl.Log4JLogger;
50  import org.apache.hadoop.classification.InterfaceAudience;
51  import org.apache.hadoop.classification.InterfaceStability;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.fs.FileSystem;
54  import org.apache.hadoop.fs.Path;
55  import org.apache.hadoop.hbase.Waiter.Predicate;
56  import org.apache.hadoop.hbase.catalog.MetaEditor;
57  import org.apache.hadoop.hbase.client.Delete;
58  import org.apache.hadoop.hbase.client.Durability;
59  import org.apache.hadoop.hbase.client.Get;
60  import org.apache.hadoop.hbase.client.HBaseAdmin;
61  import org.apache.hadoop.hbase.client.HConnection;
62  import org.apache.hadoop.hbase.client.HTable;
63  import org.apache.hadoop.hbase.client.Put;
64  import org.apache.hadoop.hbase.client.Result;
65  import org.apache.hadoop.hbase.client.ResultScanner;
66  import org.apache.hadoop.hbase.client.Scan;
67  import org.apache.hadoop.hbase.fs.HFileSystem;
68  import org.apache.hadoop.hbase.io.compress.Compression;
69  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
70  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
71  import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
72  import org.apache.hadoop.hbase.io.hfile.HFile;
73  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
74  import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
75  import org.apache.hadoop.hbase.master.HMaster;
76  import org.apache.hadoop.hbase.master.RegionStates;
77  import org.apache.hadoop.hbase.master.ServerManager;
78  import org.apache.hadoop.hbase.regionserver.BloomType;
79  import org.apache.hadoop.hbase.regionserver.HRegion;
80  import org.apache.hadoop.hbase.regionserver.HRegionServer;
81  import org.apache.hadoop.hbase.regionserver.HStore;
82  import org.apache.hadoop.hbase.regionserver.InternalScanner;
83  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
84  import org.apache.hadoop.hbase.regionserver.wal.HLog;
85  import org.apache.hadoop.hbase.security.User;
86  import org.apache.hadoop.hbase.tool.Canary;
87  import org.apache.hadoop.hbase.util.Bytes;
88  import org.apache.hadoop.hbase.util.FSUtils;
89  import org.apache.hadoop.hbase.util.JVMClusterUtil;
90  import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
91  import org.apache.hadoop.hbase.util.RegionSplitter;
92  import org.apache.hadoop.hbase.util.RetryCounter;
93  import org.apache.hadoop.hbase.util.Threads;
94  import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
95  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
96  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
97  import org.apache.hadoop.hbase.zookeeper.ZKConfig;
98  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
99  import org.apache.hadoop.hdfs.DFSClient;
100 import org.apache.hadoop.hdfs.DistributedFileSystem;
101 import org.apache.hadoop.hdfs.MiniDFSCluster;
102 import org.apache.hadoop.mapred.JobConf;
103 import org.apache.hadoop.mapred.MiniMRCluster;
104 import org.apache.hadoop.mapred.TaskLog;
105 import org.apache.zookeeper.KeeperException;
106 import org.apache.zookeeper.KeeperException.NodeExistsException;
107 import org.apache.zookeeper.WatchedEvent;
108 import org.apache.zookeeper.ZooKeeper;
109 import org.apache.zookeeper.ZooKeeper.States;
110 
111 /**
112  * Facility for testing HBase. Replacement for
113  * old HBaseTestCase and HBaseClusterTestCase functionality.
114  * Create an instance and keep it around while testing HBase.  This class is
115  * meant to be your one-stop shop for anything you might need when testing.  It manages
116  * only one cluster at a time. The managed cluster can be an in-process
117  * {@link MiniHBaseCluster}, or a deployed cluster of type {@link DistributedHBaseCluster}.
118  * Not all methods work with the real cluster.
119  * Depends on log4j being on the classpath and on
120  * hbase-site.xml for logging and test-run configuration.  It does not set
121  * logging levels nor make changes to configuration parameters.
122  * <p>To preserve test data directories, set the system property
123  * "hbase.testing.preserve.testdir" to true.
124  */
125 @InterfaceAudience.Public
126 @InterfaceStability.Evolving
127 public class HBaseTestingUtility extends HBaseCommonTestingUtility {
128   private MiniZooKeeperCluster zkCluster = null;
129 
130   public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
131   /**
132    * The default number of regions per regionserver when creating a pre-split
133    * table.
134    */
135   public static final int DEFAULT_REGIONS_PER_SERVER = 5;
136 
137   /**
138    * Set if we were passed a zkCluster.  If so, we won't shutdown zk as
139    * part of general shutdown.
140    */
141   private boolean passedZkCluster = false;
142   private MiniDFSCluster dfsCluster = null;
143 
144   private HBaseCluster hbaseCluster = null;
145   private MiniMRCluster mrCluster = null;
146 
147   /** If there is a mini cluster running for this testing utility instance. */
148   private boolean miniClusterRunning;
149 
150   private String hadoopLogDir;
151 
152   /** Directory (a subdirectory of dataTestDir) used by the dfs cluster if any */
153   private File clusterTestDir = null;
154 
155   /** Directory on test filesystem where we put the data for this instance of
156     * HBaseTestingUtility*/
157   private Path dataTestDirOnTestFS = null;
158 
159   /**
160    * System property key to get test directory value.
161    * Name is as it is because mini dfs hard-codes where to put its test data.
162    * It should NOT be used directly in HBase, as it's a property used in
163    *  mini dfs.
164    *  @deprecated can be used only with mini dfs
165    */
166   @Deprecated
167   private static final String TEST_DIRECTORY_KEY = "test.build.data";
168 
169   /** Filesystem URI used for map-reduce mini-cluster setup */
170   private static String FS_URI;
171 
172   /** A set of ports that have been claimed using {@link #randomFreePort()}. */
173   private static final Set<Integer> takenRandomPorts = new HashSet<Integer>();
174 
175   /** Compression algorithms to use in parameterized JUnit 4 tests */
176   public static final List<Object[]> COMPRESSION_ALGORITHMS_PARAMETERIZED =
177     Arrays.asList(new Object[][] {
178       { Compression.Algorithm.NONE },
179       { Compression.Algorithm.GZ }
180     });
181 
182   /** This is for unit tests parameterized with two booleans. */
183   public static final List<Object[]> BOOLEAN_PARAMETERIZED =
184       Arrays.asList(new Object[][] {
185           { Boolean.FALSE },
186           { Boolean.TRUE }
187       });
188 
189   /** This is for unit tests parameterized with the combinations of the memstoreTS and tags flags. */
190   public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
191   /** Compression algorithms to use in testing */
192   public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
193       Compression.Algorithm.NONE, Compression.Algorithm.GZ
194     };
195 
196   /**
197    * Create all combinations of Bloom filters and compression algorithms for
198    * testing.
199    */
200   private static List<Object[]> bloomAndCompressionCombinations() {
201     List<Object[]> configurations = new ArrayList<Object[]>();
202     for (Compression.Algorithm comprAlgo :
203          HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
204       for (BloomType bloomType : BloomType.values()) {
205         configurations.add(new Object[] { comprAlgo, bloomType });
206       }
207     }
208     return Collections.unmodifiableList(configurations);
209   }
210 
211   /**
212    * Create all combinations of the memstoreTS and tags flags.
213    */
214   private static List<Object[]> memStoreTSAndTagsCombination() {
215     List<Object[]> configurations = new ArrayList<Object[]>();
216     configurations.add(new Object[] { false, false });
217     configurations.add(new Object[] { false, true });
218     configurations.add(new Object[] { true, false });
219     configurations.add(new Object[] { true, true });
220     return Collections.unmodifiableList(configurations);
221   }
222 
223   public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
224       bloomAndCompressionCombinations();
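
  // A minimal sketch, not part of the original file, of how a JUnit 4
  // parameterized test can consume one of the lists above; the test class
  // and field names are illustrative assumptions:
  //
  //   @RunWith(Parameterized.class)
  //   public class MyBloomCompressionTest {
  //     @Parameterized.Parameters
  //     public static Collection<Object[]> parameters() {
  //       return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
  //     }
  //     private final Compression.Algorithm comprAlgo;
  //     private final BloomType bloomType;
  //     public MyBloomCompressionTest(Compression.Algorithm comprAlgo, BloomType bloomType) {
  //       this.comprAlgo = comprAlgo;
  //       this.bloomType = bloomType;
  //     }
  //   }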
225 
226   public HBaseTestingUtility() {
227     this(HBaseConfiguration.create());
228   }
229 
230   public HBaseTestingUtility(Configuration conf) {
231     super(conf);
232 
233     // a hbase checksum verification failure will cause unit tests to fail
234     ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
235   }
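
  // A typical lifecycle, sketched as an assumed JUnit 4 usage (the field and
  // method names below are illustrative, not part of this class):
  //
  //   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  //
  //   @BeforeClass
  //   public static void setUpBeforeClass() throws Exception {
  //     TEST_UTIL.startMiniCluster();      // brings up zk, dfs and hbase
  //   }
  //
  //   @AfterClass
  //   public static void tearDownAfterClass() throws Exception {
  //     TEST_UTIL.shutdownMiniCluster();   // tears everything back down
  //   }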
236 
237   /**
238    * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
239    * It is needed to properly base FSUtils.getRootDirs so that they drop temp files in the proper
240    * test dir.  Use this when you aren't using a mini HDFS cluster.
241    * @return HBaseTestingUtility that uses the local fs for temp files.
242    */
243   public static HBaseTestingUtility createLocalHTU() {
244     Configuration c = HBaseConfiguration.create();
245     return createLocalHTU(c);
246   }
247 
248   /**
249    * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
250    * It is needed to properly base FSUtils.getRootDirs so that they drop temp files in the proper
251    * test dir.  Use this when you aren't using a mini HDFS cluster.
252    * @param c Configuration (will be modified)
253    * @return HBaseTestingUtility that uses the local fs for temp files.
254    */
255   public static HBaseTestingUtility createLocalHTU(Configuration c) {
256     HBaseTestingUtility htu = new HBaseTestingUtility(c);
257     String dataTestDir = htu.getDataTestDir().toString();
258     htu.getConfiguration().set(HConstants.HBASE_DIR, dataTestDir);
259     LOG.debug("Setting " + HConstants.HBASE_DIR + " to " + dataTestDir);
260     return htu;
261   }
262 
263   /**
264    * Returns this class's instance of {@link Configuration}.  Be careful how
265    * you use the returned Configuration since {@link HConnection} instances
266    * can be shared.  The Map of HConnections is keyed by the Configuration.  If,
267    * say, a Connection was being used against a cluster that had been shut down,
268    * see {@link #shutdownMiniCluster()}, then the Connection will no longer
269    * be wholesome.  Rather than use the returned Configuration directly, it's
270    * usually best to make a copy and use that.  Do
271    * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
272    * @return Instance of Configuration.
273    */
274   @Override
275   public Configuration getConfiguration() {
276     return super.getConfiguration();
277   }
278 
279   public void setHBaseCluster(HBaseCluster hbaseCluster) {
280     this.hbaseCluster = hbaseCluster;
281   }
282 
283   /**
284    * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
285    * Give it a random name so we can have many concurrent tests running if
286    * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
287    * System property, as it's what minidfscluster bases
288    * its data dir on.  Modifying a System property is not the way to do concurrent
289    * instances -- another instance could grab the temporary
290    * value unintentionally -- but there is nothing we can do about it at the moment;
291    * single instance only is how the minidfscluster works.
292    *
293    * We also create the underlying directories for
294    *  hadoop.log.dir, mapred.local.dir and hadoop.tmp.dir, and set the values
295    *  in the conf, and as a system property for hadoop.tmp.dir.
296    *
297    * @return The calculated data test build directory, if newly-created.
298    */
299   @Override
300   protected Path setupDataTestDir() {
301     Path testPath = super.setupDataTestDir();
302     if (null == testPath) {
303       return null;
304     }
305 
306     createSubDirAndSystemProperty(
307       "hadoop.log.dir",
308       testPath, "hadoop-log-dir");
309 
310     // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
311     //  we want our own value to ensure uniqueness on the same machine
312     createSubDirAndSystemProperty(
313       "hadoop.tmp.dir",
314       testPath, "hadoop-tmp-dir");
315 
316     // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
317     createSubDir(
318       "mapred.local.dir",
319       testPath, "mapred-local-dir");
320 
321     return testPath;
322   }
323 
324   private void createSubDirAndSystemProperty(
325     String propertyName, Path parent, String subDirName){
326 
327     String sysValue = System.getProperty(propertyName);
328 
329     if (sysValue != null) {
330       // There is already a value set. So we do nothing but hope
331       //  that there will be no conflicts
332       LOG.info("System.getProperty(\""+propertyName+"\") already set to: "+
333         sysValue + " so I do NOT create it in " + parent);
334       String confValue = conf.get(propertyName);
335       if (confValue != null && !confValue.endsWith(sysValue)){
336        LOG.warn(
337          propertyName + " property value differs in configuration and system: "+
338          "Configuration="+confValue+" while System="+sysValue+
339          " Erasing configuration value by system value."
340        );
341       }
342       conf.set(propertyName, sysValue);
343     } else {
344       // Ok, it's not set, so we create it as a subdirectory
345       createSubDir(propertyName, parent, subDirName);
346       System.setProperty(propertyName, conf.get(propertyName));
347     }
348   }
349 
350   /**
351    * @return Where to write test data on the test filesystem; the working directory
352    * of the test filesystem by default
353    * @see #setupDataTestDirOnTestFS()
354    * @see #getTestFileSystem()
355    */
356   private Path getBaseTestDirOnTestFS() throws IOException {
357     FileSystem fs = getTestFileSystem();
358     return new Path(fs.getWorkingDirectory(), "test-data");
359   }
360 
361   /**
362    * @return Where the DFS cluster will write data on the local filesystem.
363    * Creates it if it does not exist already.  A subdir of {@link #getBaseTestDir()}
364    * @see #getTestFileSystem()
365    */
366   Path getClusterTestDir() {
367     if (clusterTestDir == null){
368       setupClusterTestDir();
369     }
370     return new Path(clusterTestDir.getAbsolutePath());
371   }
372 
373   /**
374    * Creates a directory for the DFS cluster, under the test data
375    */
376   private void setupClusterTestDir() {
377     if (clusterTestDir != null) {
378       return;
379     }
380 
381     // Using randomUUID ensures that multiple clusters can be launched by
382     //  the same test, if it stops & starts them
383     Path testDir = getDataTestDir("dfscluster_" + UUID.randomUUID().toString());
384     clusterTestDir = new File(testDir.toString()).getAbsoluteFile();
385     // Have it cleaned up on exit
386     boolean b = deleteOnExit();
387     if (b) clusterTestDir.deleteOnExit();
388     conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
389     LOG.info("Created new mini-cluster data directory: " + clusterTestDir + ", deleteOnExit=" + b);
390   }
391 
392   /**
393    * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
394    * to write temporary test data. Call this method after setting up the mini dfs cluster
395    * if the test relies on it.
396    * @return a unique path in the test filesystem
397    */
398   public Path getDataTestDirOnTestFS() throws IOException {
399     if (dataTestDirOnTestFS == null) {
400       setupDataTestDirOnTestFS();
401     }
402 
403     return dataTestDirOnTestFS;
404   }
405 
406   /**
407    * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
408    * to write temporary test data. Call this method after setting up the mini dfs cluster
409    * if the test relies on it.
410    * @return a unique path in the test filesystem
411    * @param subdirName name of the subdir to create under the base test dir
412    */
413   public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
414     return new Path(getDataTestDirOnTestFS(), subdirName);
415   }
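
  // Illustrative sketch (the subdir name and TEST_UTIL handle are assumptions)
  // of staging output on the test filesystem and cleaning it up afterwards:
  //
  //   Path out = TEST_UTIL.getDataTestDirOnTestFS("my-output");
  //   FileSystem fs = TEST_UTIL.getTestFileSystem();
  //   fs.mkdirs(out);                                    // write test files under here
  //   TEST_UTIL.cleanupDataTestDirOnTestFS("my-output"); // remove just this subdir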
416 
417   /**
418    * Sets up a path in test filesystem to be used by tests
419    */
420   private void setupDataTestDirOnTestFS() throws IOException {
421     if (dataTestDirOnTestFS != null) {
422       LOG.warn("Data test on test fs dir already setup in "
423           + dataTestDirOnTestFS.toString());
424       return;
425     }
426 
427     //The file system can be either local, mini dfs, or if the configuration
428     //is supplied externally, it can be an external cluster FS. If it is a local
429     //file system, the tests should use getBaseTestDir, otherwise, we can use
430     //the working directory, and create a unique sub dir there
431     FileSystem fs = getTestFileSystem();
432     if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
433       File dataTestDir = new File(getDataTestDir().toString());
434       if (deleteOnExit()) dataTestDir.deleteOnExit();
435       dataTestDirOnTestFS = new Path(dataTestDir.getAbsolutePath());
436     } else {
437       Path base = getBaseTestDirOnTestFS();
438       String randomStr = UUID.randomUUID().toString();
439       dataTestDirOnTestFS = new Path(base, randomStr);
440       if (deleteOnExit()) fs.deleteOnExit(dataTestDirOnTestFS);
441     }
442   }
443 
444   /**
445    * Cleans the test data directory on the test filesystem.
446    * @return True if we removed the test dirs
447    * @throws IOException
448    */
449   public boolean cleanupDataTestDirOnTestFS() throws IOException {
450     boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
451     if (ret)
452       dataTestDirOnTestFS = null;
453     return ret;
454   }
455 
456   /**
457    * Cleans a subdirectory under the test data directory on the test filesystem.
458    * @return True if we removed child
459    * @throws IOException
460    */
461   public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
462     Path cpath = getDataTestDirOnTestFS(subdirName);
463     return getTestFileSystem().delete(cpath, true);
464   }
465 
466   /**
467    * Start a minidfscluster.
468    * @param servers How many DNs to start.
469    * @throws Exception
470    * @see #shutdownMiniDFSCluster()
471    * @return The mini dfs cluster created.
472    */
473   public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
474     return startMiniDFSCluster(servers, null);
475   }
476 
477   /**
478    * Start a minidfscluster.
479    * This is useful if you want to run datanodes on distinct hosts for things
480    * like HDFS block location verification.
481    * If you start MiniDFSCluster without host names, all instances of the
482    * datanodes will have the same host name.
483    * @param hosts hostnames the DNs are to run on.
484    * @throws Exception
485    * @see #shutdownMiniDFSCluster()
486    * @return The mini dfs cluster created.
487    */
488   public MiniDFSCluster startMiniDFSCluster(final String hosts[])
489   throws Exception {
490     if (hosts != null && hosts.length != 0) {
491       return startMiniDFSCluster(hosts.length, hosts);
492     } else {
493       return startMiniDFSCluster(1, null);
494     }
495   }
496 
497   /**
498    * Start a minidfscluster.
499    * Can only create one.
500    * @param servers How many DNs to start.
501    * @param hosts hostnames DNs to run on.
502    * @throws Exception
503    * @see #shutdownMiniDFSCluster()
504    * @return The mini dfs cluster created.
505    */
506   public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
507   throws Exception {
508     createDirsAndSetProperties();
509 
510     // Error level to skip some warnings specific to the minicluster. See HBASE-4709
511     org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class).
512         setLevel(org.apache.log4j.Level.ERROR);
513     org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class).
514         setLevel(org.apache.log4j.Level.ERROR);
515 
516 
517     this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
518       true, null, null, hosts, null);
519 
520     // Set this just-started cluster as our filesystem.
521     FileSystem fs = this.dfsCluster.getFileSystem();
522     FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
523 
524     // Wait for the cluster to be totally up
525     this.dfsCluster.waitClusterUp();
526 
527     //reset the test directory for test file system
528     dataTestDirOnTestFS = null;
529 
530     return this.dfsCluster;
531   }
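
  // For example (an assumed usage sketch; host names are illustrative),
  // starting three datanodes that report distinct host names so that
  // block-location assertions can tell them apart:
  //
  //   MiniDFSCluster dfs = TEST_UTIL.startMiniDFSCluster(
  //       new String[] { "host1.example.org", "host2.example.org", "host3.example.org" });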
532 
533 
534   public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
535       throws Exception {
536     createDirsAndSetProperties();
537     this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
538         true, null, racks, hosts, null);
539 
540     // Set this just-started cluster as our filesystem.
541     FileSystem fs = this.dfsCluster.getFileSystem();
542     FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
543 
544     // Wait for the cluster to be totally up
545     this.dfsCluster.waitClusterUp();
546 
547     //reset the test directory for test file system
548     dataTestDirOnTestFS = null;
549 
550     return this.dfsCluster;
551   }
552 
553   public MiniDFSCluster startMiniDFSClusterForTestHLog(int namenodePort) throws IOException {
554     createDirsAndSetProperties();
555     dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
556         null, null, null);
557     return dfsCluster;
558   }
559 
560   /** This is used before starting HDFS and map-reduce mini-clusters */
561   private void createDirsAndSetProperties() throws IOException {
562     setupClusterTestDir();
563     System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
564     createDirAndSetProperty("cache_data", "test.cache.data");
565     createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
566     hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
567     createDirAndSetProperty("mapred_local", "mapred.local.dir");
568     createDirAndSetProperty("mapred_temp", "mapred.temp.dir");
569     enableShortCircuit();
570 
571     Path root = getDataTestDirOnTestFS("hadoop");
572     conf.set(MapreduceTestingShim.getMROutputDirProp(),
573       new Path(root, "mapred-output-dir").toString());
574     conf.set("mapred.system.dir", new Path(root, "mapred-system-dir").toString());
575     conf.set("mapreduce.jobtracker.staging.root.dir",
576       new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
577     conf.set("mapred.working.dir", new Path(root, "mapred-working-dir").toString());
578   }
579 
580 
581   /**
582    *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
583    *  This allows specifying this parameter on the command line.
584    *  If not set, the default is false.
585    */
586   public boolean isReadShortCircuitOn(){
587     final String propName = "hbase.tests.use.shortcircuit.reads";
588     String readOnProp = System.getProperty(propName);
589     if (readOnProp != null){
590       return Boolean.parseBoolean(readOnProp);
591     } else {
592       return conf.getBoolean(propName, false);
593     }
594   }
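
  // Because the system property wins over the conf here, a run can force the
  // value from the command line (an illustrative invocation, not upstream docs):
  //
  //   mvn test -Dhbase.tests.use.shortcircuit.reads=true
  //
  // or programmatically, before the mini cluster is started:
  //
  //   System.setProperty("hbase.tests.use.shortcircuit.reads", "true");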
595 
596   /** Enable short-circuit reads, unless configured differently.
597    * Set both HBase and HDFS settings, including skipping the hdfs checksum checks.
598    */
599   private void enableShortCircuit() {
600     if (isReadShortCircuitOn()) {
601       String curUser = System.getProperty("user.name");
602       LOG.info("read short circuit is ON for user " + curUser);
603       // read short circuit, for hdfs
604       conf.set("dfs.block.local-path-access.user", curUser);
605       // read short circuit, for hbase
606       conf.setBoolean("dfs.client.read.shortcircuit", true);
607       // Skip checking checksum, for the hdfs client and the datanode
608       conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
609     } else {
610       LOG.info("read short circuit is OFF");
611     }
612   }
613 
614   private String createDirAndSetProperty(final String relPath, String property) {
615     String path = getDataTestDir(relPath).toString();
616     System.setProperty(property, path);
617     conf.set(property, path);
618     new File(path).mkdirs();
619     LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
620     return path;
621   }
622 
623   /**
624    * Shuts down instance created by call to {@link #startMiniDFSCluster(int)}
625    * or does nothing.
626    * @throws IOException
627    */
628   public void shutdownMiniDFSCluster() throws IOException {
629     if (this.dfsCluster != null) {
630       // The below throws an exception per dn, AsynchronousCloseException.
631       this.dfsCluster.shutdown();
632       dfsCluster = null;
633       dataTestDirOnTestFS = null;
634       FSUtils.setFsDefault(this.conf, new Path("file:///"));
635     }
636   }
637 
638   /**
639    * Call this if you only want a zk cluster.
640    * @see #startMiniCluster() if you want a zk + dfs + hbase mini cluster.
641    * @throws Exception
642    * @see #shutdownMiniZKCluster()
643    * @return zk cluster started.
644    */
645   public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
646     return startMiniZKCluster(1);
647   }
648 
649   /**
650    * Call this if you only want a zk cluster.
651    * @param zooKeeperServerNum
652    * @see #startMiniCluster() if you want a zk + dfs + hbase mini cluster.
653    * @throws Exception
654    * @see #shutdownMiniZKCluster()
655    * @return zk cluster started.
656    */
657   public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum)
658       throws Exception {
659     setupClusterTestDir();
660     return startMiniZKCluster(clusterTestDir, zooKeeperServerNum);
661   }
662 
663   private MiniZooKeeperCluster startMiniZKCluster(final File dir)
664     throws Exception {
665     return startMiniZKCluster(dir, 1);
666   }
667 
668   /**
669    * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set
670    *  the port mentioned is used as the default port for ZooKeeper.
671    */
672   private MiniZooKeeperCluster startMiniZKCluster(final File dir,
673       int zooKeeperServerNum)
674   throws Exception {
675     if (this.zkCluster != null) {
676       throw new IOException("Cluster already running at " + dir);
677     }
678     this.passedZkCluster = false;
679     this.zkCluster = new MiniZooKeeperCluster(this.getConfiguration());
680     final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0);
681     if (defPort > 0){
682       // If there is a port in the config file, we use it.
683       this.zkCluster.setDefaultClientPort(defPort);
684     }
685     int clientPort = this.zkCluster.startup(dir, zooKeeperServerNum);
686     this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,
687       Integer.toString(clientPort));
688     return this.zkCluster;
689   }
690 
691   /**
692    * Shuts down zk cluster created by call to {@link #startMiniZKCluster(File)}
693    * or does nothing.
694    * @throws IOException
695    * @see #startMiniZKCluster()
696    */
697   public void shutdownMiniZKCluster() throws IOException {
698     if (this.zkCluster != null) {
699       this.zkCluster.shutdown();
700       this.zkCluster = null;
701     }
702   }
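
  // Sketch of a zk-only lifecycle (assumed usage) for tests that need
  // ZooKeeper but no dfs or hbase; startup also publishes the client port
  // to the conf under HConstants.ZOOKEEPER_CLIENT_PORT:
  //
  //   MiniZooKeeperCluster zk = TEST_UTIL.startMiniZKCluster();
  //   // ... exercise zk-dependent code ...
  //   TEST_UTIL.shutdownMiniZKCluster();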
703 
704   /**
705    * Start up a minicluster of hbase, dfs, and zookeeper.
706    * @throws Exception
707    * @return Mini hbase cluster instance created.
708    * @see #shutdownMiniCluster()
709    */
710   public MiniHBaseCluster startMiniCluster() throws Exception {
711     return startMiniCluster(1, 1);
712   }
713 
714   /**
715    * Start up a minicluster of hbase, optionally dfs, and zookeeper.
716    * Modifies Configuration.  Homes the cluster data directory under a random
717    * subdirectory in a directory under System property test.build.data.
718    * Directory is cleaned up on exit.
719    * @param numSlaves Number of slaves to start up.  We'll start this many
720    * datanodes and regionservers.  If numSlaves is > 1, then make sure
721    * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver), otherwise
722    * you will get bind errors.
723    * @throws Exception
724    * @see #shutdownMiniCluster()
725    * @return Mini hbase cluster instance created.
726    */
727   public MiniHBaseCluster startMiniCluster(final int numSlaves)
728   throws Exception {
729     return startMiniCluster(1, numSlaves);
730   }
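
  // For instance (an assumed sketch), bringing up three regionservers; the
  // per-regionserver info port is disabled first, per the note above, to
  // avoid bind errors:
  //
  //   TEST_UTIL.getConfiguration().setInt("hbase.regionserver.info.port", -1);
  //   MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(3);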
731 
732 
733   /**
734    * Start a minicluster.
735    * @throws Exception
736    * @see #shutdownMiniCluster()
737    * @return Mini hbase cluster instance created.
738    */
739   public MiniHBaseCluster startMiniCluster(final int numMasters,
740     final int numSlaves)
741   throws Exception {
742     return startMiniCluster(numMasters, numSlaves, null);
743   }
744 
745   /**
746    * Start up a minicluster of hbase, optionally dfs, and zookeeper.
747    * Modifies Configuration.  Homes the cluster data directory under a random
748    * subdirectory in a directory under System property test.build.data.
749    * Directory is cleaned up on exit.
750    * @param numMasters Number of masters to start up.  We'll start this many
751    * hbase masters.  If numMasters > 1, you can find the active/primary master
752    * with {@link MiniHBaseCluster#getMaster()}.
753    * @param numSlaves Number of slaves to start up.  We'll start this many
754    * regionservers. If dataNodeHosts == null, this also indicates the number of
755    * datanodes to start. If dataNodeHosts != null, the number of datanodes is
756    * based on dataNodeHosts.length.
757    * If numSlaves is > 1, then make sure
758    * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver), otherwise
759    * you will get bind errors.
760    * @param dataNodeHosts hostnames the DNs are to run on.
761    * This is useful if you want to run datanodes on distinct hosts for things
762    * like HDFS block location verification.
763    * If you start MiniDFSCluster without host names,
764    * all instances of the datanodes will have the same host name.
765    * @throws Exception
766    * @see #shutdownMiniCluster()
767    * @return Mini hbase cluster instance created.
768    */
769   public MiniHBaseCluster startMiniCluster(final int numMasters,
770       final int numSlaves, final String[] dataNodeHosts) throws Exception {
771     return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts, null, null);
772   }
773 
774   /**
775    * Same as {@link #startMiniCluster(int, int)}, but with custom number of datanodes.
776    * @param numDataNodes Number of data nodes.
777    */
778   public MiniHBaseCluster startMiniCluster(final int numMasters,
779       final int numSlaves, final int numDataNodes) throws Exception {
780     return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
781   }
782 
783   /**
784    * Start up a minicluster of hbase, optionally dfs, and zookeeper.
785    * Modifies Configuration.  Homes the cluster data directory under a random
786    * subdirectory in a directory under System property test.build.data.
787    * Directory is cleaned up on exit.
788    * @param numMasters Number of masters to start up.  We'll start this many
789    * hbase masters.  If numMasters > 1, you can find the active/primary master
790    * with {@link MiniHBaseCluster#getMaster()}.
791    * @param numSlaves Number of slaves to start up.  We'll start this many
792    * regionservers. If dataNodeHosts == null, this also indicates the number of
793    * datanodes to start. If dataNodeHosts != null, the number of datanodes is
794    * based on dataNodeHosts.length.
795    * If numSlaves is > 1, then make sure
796    * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver), otherwise
797    * you will get bind errors.
798    * @param dataNodeHosts hostnames the DNs are to run on.
799    * This is useful if you want to run datanodes on distinct hosts for things
800    * like HDFS block location verification.
801    * If you start MiniDFSCluster without host names,
802    * all instances of the datanodes will have the same host name.
803    * @param masterClass The class to use as HMaster, or null for default
804    * @param regionserverClass The class to use as HRegionServer, or null for
805    * default
806    * @throws Exception
807    * @see #shutdownMiniCluster()
808    * @return Mini hbase cluster instance created.
809    */
810   public MiniHBaseCluster startMiniCluster(final int numMasters,
811       final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
812       Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
813           throws Exception {
814     return startMiniCluster(
815         numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
816   }
817 
818   /**
819    * Same as {@link #startMiniCluster(int, int, String[], Class, Class)}, but with custom
820    * number of datanodes.
821    * @param numDataNodes Number of data nodes.
822    */
823   public MiniHBaseCluster startMiniCluster(final int numMasters,
824     final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
825     Class<? extends HMaster> masterClass,
826     Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
827   throws Exception {
828     if (dataNodeHosts != null && dataNodeHosts.length != 0) {
829       numDataNodes = dataNodeHosts.length;
830     }
831 
832     LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
833         numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");
834 
835     // If we already put up a cluster, fail.
836     if (miniClusterRunning) {
837       throw new IllegalStateException("A mini-cluster is already running");
838     }
839     miniClusterRunning = true;
840 
841     setupClusterTestDir();
842     System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
843 
844     // Bring up mini dfs cluster. This spews a bunch of warnings about missing
845     // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
846     startMiniDFSCluster(numDataNodes, dataNodeHosts);
847 
848     // Start up a zk cluster.
849     if (this.zkCluster == null) {
850       startMiniZKCluster(clusterTestDir);
851     }
852 
853     // Start the MiniHBaseCluster
854     return startMiniHBaseCluster(numMasters, numSlaves, masterClass, regionserverClass);
855   }
856 
857   public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
858       throws IOException, InterruptedException{
859     return startMiniHBaseCluster(numMasters, numSlaves, null, null);
860   }
861 
862   /**
863    * Starts up mini hbase cluster.  Usually used after call to
864    * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
865    * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
866    * @param numMasters
867    * @param numSlaves
868    * @return Reference to the hbase mini hbase cluster.
869    * @throws IOException
870    * @throws InterruptedException
871    * @see #startMiniCluster()
872    */
873   public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
874         final int numSlaves, Class<? extends HMaster> masterClass,
875         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
876   throws IOException, InterruptedException {
877     // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
878     createRootDir();
879 
880     // These settings will make the master wait until this exact number of
881     // region servers has connected.
882     if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
883       conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, numSlaves);
884     }
885     if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
886       conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, numSlaves);
887     }
888 
889     Configuration c = new Configuration(this.conf);
890     this.hbaseCluster =
891         new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
892     // Don't leave here till we've done a successful scan of the hbase:meta
893     HTable t = new HTable(c, TableName.META_TABLE_NAME);
894     ResultScanner s = t.getScanner(new Scan());
895     while (s.next() != null) {
896       continue;
897     }
898     s.close();
899     t.close();
900 
901     getHBaseAdmin(); // create the hbaseAdmin immediately
902     LOG.info("Minicluster is up");
903     return (MiniHBaseCluster)this.hbaseCluster;
904   }
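
  // Stepped startup, sketched (assumed usage): bring dfs and zk up first,
  // then layer hbase on top once both are ready:
  //
  //   TEST_UTIL.startMiniDFSCluster(1);
  //   TEST_UTIL.startMiniZKCluster();
  //   MiniHBaseCluster cluster = TEST_UTIL.startMiniHBaseCluster(1, 1);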
905 
906   /**
907    * Starts the hbase cluster up again after shutting it down previously in a
908    * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
909    * @param servers number of region servers
910    * @throws IOException
911    */
912   public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
913     this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
914     // Don't leave here till we've done a successful scan of the hbase:meta
915     HTable t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
916     ResultScanner s = t.getScanner(new Scan());
917     while (s.next() != null) {
918       // do nothing
919     }
920     LOG.info("HBase has been restarted");
921     s.close();
922     t.close();
923   }
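
  // Sketch (assumed usage) of a stop/start cycle that keeps dfs and zk up:
  //
  //   TEST_UTIL.shutdownMiniHBaseCluster(); // hbase only; zk and dfs stay up
  //   TEST_UTIL.restartHBaseCluster(2);     // come back with two regionservers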
924 
925   /**
926    * @return Current mini hbase cluster. Only has something in it after a call
927    * to {@link #startMiniCluster()}.
928    * @see #startMiniCluster()
929    */
930   public MiniHBaseCluster getMiniHBaseCluster() {
931     if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
932       return (MiniHBaseCluster)this.hbaseCluster;
933     }
934     throw new RuntimeException(hbaseCluster + " not an instance of " +
935                                MiniHBaseCluster.class.getName());
936   }
937 
938   /**
939    * Stops mini hbase, zk, and hdfs clusters.
940    * @throws IOException
941    * @see #startMiniCluster(int)
942    */
943   public void shutdownMiniCluster() throws Exception {
944     LOG.info("Shutting down minicluster");
945     shutdownMiniHBaseCluster();
946     if (!this.passedZkCluster){
947       shutdownMiniZKCluster();
948     }
949     shutdownMiniDFSCluster();
950 
951     cleanupTestDir();
952     miniClusterRunning = false;
953     LOG.info("Minicluster is down");
954   }
955   
956   /**
957    * @return True if we removed the test dirs
958    * @throws IOException
959    */
960   @Override
961   public boolean cleanupTestDir() throws IOException {
962     boolean ret = super.cleanupTestDir();
963     if (deleteDir(this.clusterTestDir)) {
964       this.clusterTestDir = null;
965       return ret;
966     }
967     return false;
968   }
969 
970   /**
971    * Shutdown HBase mini cluster.  Does not shutdown zk or dfs if running.
972    * @throws IOException
973    */
974   public void shutdownMiniHBaseCluster() throws IOException {
975     if (hbaseAdmin != null) {
976       hbaseAdmin.close0();
977       hbaseAdmin = null;
978     }
979 
980     // unset the configuration for MIN and MAX RS to start
981     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
982     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
983     if (this.hbaseCluster != null) {
984       this.hbaseCluster.shutdown();
985       // Wait till hbase is down before going on to shutdown zk.
986       this.hbaseCluster.waitUntilShutDown();
987       this.hbaseCluster = null;
988     }
989 
990     if (zooKeeperWatcher != null) {
991       zooKeeperWatcher.close();
992       zooKeeperWatcher = null;
993     }
994   }
995 
996   /**
997    * Returns the path to the default root dir the minicluster uses.
998    * Note: this does not cause the root dir to be created.
999    * @return Fully qualified path for the default hbase root dir
1000    * @throws IOException
1001    */
1002   public Path getDefaultRootDirPath() throws IOException {
1003     FileSystem fs = FileSystem.get(this.conf);
1004     return new Path(fs.makeQualified(fs.getHomeDirectory()), "hbase");
1005   }
1006 
1007   /**
1008    * Creates an hbase rootdir in user home directory.  Also creates hbase
1009    * version file.  Normally you won't make use of this method.  Root hbasedir
1010    * is created for you as part of mini cluster startup.  You'd only use this
1011    * method if you were doing manual operation.
1012    * @return Fully qualified path to hbase root dir
1013    * @throws IOException
1014    */
1015   public Path createRootDir() throws IOException {
1016     FileSystem fs = FileSystem.get(this.conf);
1017     Path hbaseRootdir = getDefaultRootDirPath();
1018     FSUtils.setRootDir(this.conf, hbaseRootdir);
1019     fs.mkdirs(hbaseRootdir);
1020     FSUtils.setVersion(fs, hbaseRootdir);
1021     return hbaseRootdir;
1022   }
1023 
1024   /**
1025    * Flushes all caches in the mini hbase cluster
1026    * @throws IOException
1027    */
1028   public void flush() throws IOException {
1029     getMiniHBaseCluster().flushcache();
1030   }
1031 
1032   /**
1033    * Flushes all caches in the mini hbase cluster
1034    * @throws IOException
1035    */
1036   public void flush(TableName tableName) throws IOException {
1037     getMiniHBaseCluster().flushcache(tableName);
1038   }
1039 
1040   /**
1041    * Compact all regions in the mini hbase cluster
1042    * @throws IOException
1043    */
1044   public void compact(boolean major) throws IOException {
1045     getMiniHBaseCluster().compact(major);
1046   }
1047 
1048   /**
1049    * Compact all of a table's regions in the mini hbase cluster
1050    * @throws IOException
1051    */
1052   public void compact(TableName tableName, boolean major) throws IOException {
1053     getMiniHBaseCluster().compact(tableName, major);
1054   }
1055 
1056   /**
1057    * Create a table.
1058    * @param tableName
1059    * @param family
1060    * @return An HTable instance for the created table.
1061    * @throws IOException
1062    */
1063   public HTable createTable(String tableName, String family)
1064   throws IOException{
1065     return createTable(TableName.valueOf(tableName), new String[]{family});
1066   }
1067 
1068   /**
1069    * Create a table.
1070    * @param tableName
1071    * @param family
1072    * @return An HTable instance for the created table.
1073    * @throws IOException
1074    */
1075   public HTable createTable(byte[] tableName, byte[] family)
1076   throws IOException{
1077     return createTable(TableName.valueOf(tableName), new byte[][]{family});
1078   }
1079 
1080   /**
1081    * Create a table.
1082    * @param tableName
1083    * @param families
1084    * @return An HTable instance for the created table.
1085    * @throws IOException
1086    */
1087   public HTable createTable(TableName tableName, String[] families)
1088   throws IOException {
1089     List<byte[]> fams = new ArrayList<byte[]>(families.length);
1090     for (String family : families) {
1091       fams.add(Bytes.toBytes(family));
1092     }
1093     return createTable(tableName, fams.toArray(new byte[0][]));
1094   }
1095 
1096   /**
1097    * Create a table.
1098    * @param tableName
1099    * @param family
1100    * @return An HTable instance for the created table.
1101    * @throws IOException
1102    */
1103   public HTable createTable(TableName tableName, byte[] family)
1104   throws IOException{
1105     return createTable(tableName, new byte[][]{family});
1106   }
1107 
1108 
1109   /**
1110    * Create a table.
1111    * @param tableName
1112    * @param families
1113    * @return An HTable instance for the created table.
1114    * @throws IOException
1115    */
1116   public HTable createTable(byte[] tableName, byte[][] families)
1117   throws IOException {
1118     return createTable(tableName, families,
1119         new Configuration(getConfiguration()));
1120   }
1121 
1122   /**
1123    * Create a table.
1124    * @param tableName
1125    * @param families
1126    * @return An HTable instance for the created table.
1127    * @throws IOException
1128    */
1129   public HTable createTable(TableName tableName, byte[][] families)
1130   throws IOException {
1131     return createTable(tableName, families,
1132         new Configuration(getConfiguration()));
1133   }
1134 
1135   public HTable createTable(byte[] tableName, byte[][] families,
1136       int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1137     return createTable(TableName.valueOf(tableName), families, numVersions,
1138         startKey, endKey, numRegions);
1139   }
1140 
1141   public HTable createTable(String tableName, byte[][] families,
1142       int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1143     return createTable(TableName.valueOf(tableName), families, numVersions,
1144         startKey, endKey, numRegions);
1145   }
1146 
1147   public HTable createTable(TableName tableName, byte[][] families,
1148       int numVersions, byte[] startKey, byte[] endKey, int numRegions)
1149   throws IOException{
1150     HTableDescriptor desc = new HTableDescriptor(tableName);
1151     for (byte[] family : families) {
1152       HColumnDescriptor hcd = new HColumnDescriptor(family)
1153           .setMaxVersions(numVersions);
1154       desc.addFamily(hcd);
1155     }
1156     getHBaseAdmin().createTable(desc, startKey, endKey, numRegions);
1157     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1158     waitUntilAllRegionsAssigned(tableName);
1159     return new HTable(getConfiguration(), tableName);
1160   }
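
  // For example (table, family and key names are illustrative), a table
  // pre-split into ten regions over the key range ["aaa", "zzz"], keeping
  // three versions per cell:
  //
  //   HTable t = TEST_UTIL.createTable(TableName.valueOf("testtable"),
  //       new byte[][] { Bytes.toBytes("fam") }, 3,
  //       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 10);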
1161 
1162   /**
1163    * Create a table.
1164    * @param htd
1165    * @param families
1166    * @param c Configuration to use
1167    * @return An HTable instance for the created table.
1168    * @throws IOException
1169    */
1170   public HTable createTable(HTableDescriptor htd, byte[][] families, Configuration c)
1171   throws IOException {
1172     for(byte[] family : families) {
1173       HColumnDescriptor hcd = new HColumnDescriptor(family);
1174       // Disable blooms (they are on by default as of 0.95) but we disable them here because
1175       // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1176       // on is interfering.
1177       hcd.setBloomFilterType(BloomType.NONE);
1178       htd.addFamily(hcd);
1179     }
1180     getHBaseAdmin().createTable(htd);
1181     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1182     waitUntilAllRegionsAssigned(htd.getTableName());
1183     return new HTable(c, htd.getTableName());
1184   }
1185 
1186   /**
1187    * Create a table.
1188    * @param tableName
1189    * @param families
1190    * @param c Configuration to use
1191    * @return An HTable instance for the created table.
1192    * @throws IOException
1193    */
1194   public HTable createTable(TableName tableName, byte[][] families,
1195       final Configuration c)
1196   throws IOException {
1197     return createTable(new HTableDescriptor(tableName), families, c);
1198   }
1199 
1200   /**
1201    * Create a table.
1202    * @param tableName
1203    * @param families
1204    * @param c Configuration to use
1205    * @return An HTable instance for the created table.
1206    * @throws IOException
1207    */
1208   public HTable createTable(byte[] tableName, byte[][] families,
1209       final Configuration c)
1210   throws IOException {
1211     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1212     for(byte[] family : families) {
1213       HColumnDescriptor hcd = new HColumnDescriptor(family);
1214       // Disable blooms (they are on by default as of 0.95) but we disable them here because
1215       // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1216       // on is interfering.
1217       hcd.setBloomFilterType(BloomType.NONE);
1218       desc.addFamily(hcd);
1219     }
1220     getHBaseAdmin().createTable(desc);
1221     return new HTable(c, tableName);
1222   }
1223 
1224   /**
1225    * Create a table.
1226    * @param tableName
1227    * @param families
1228    * @param c Configuration to use
1229    * @param numVersions
1230    * @return An HTable instance for the created table.
1231    * @throws IOException
1232    */
1233   public HTable createTable(TableName tableName, byte[][] families,
1234       final Configuration c, int numVersions)
1235   throws IOException {
1236     HTableDescriptor desc = new HTableDescriptor(tableName);
1237     for(byte[] family : families) {
1238       HColumnDescriptor hcd = new HColumnDescriptor(family)
1239           .setMaxVersions(numVersions);
1240       desc.addFamily(hcd);
1241     }
1242     getHBaseAdmin().createTable(desc);
1243     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1244     waitUntilAllRegionsAssigned(tableName);
1245     return new HTable(c, tableName);
1246   }
1247 
1248   /**
1249    * Create a table.
1250    * @param tableName
1251    * @param families
1252    * @param c Configuration to use
1253    * @param numVersions
1254    * @return An HTable instance for the created table.
1255    * @throws IOException
1256    */
1257   public HTable createTable(byte[] tableName, byte[][] families,
1258       final Configuration c, int numVersions)
1259   throws IOException {
1260     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1261     for(byte[] family : families) {
1262       HColumnDescriptor hcd = new HColumnDescriptor(family)
1263           .setMaxVersions(numVersions);
1264       desc.addFamily(hcd);
1265     }
1266     getHBaseAdmin().createTable(desc);
1267     return new HTable(c, tableName);
1268   }
1269 
1270   /**
1271    * Create a table.
1272    * @param tableName
1273    * @param family
1274    * @param numVersions
1275    * @return An HTable instance for the created table.
1276    * @throws IOException
1277    */
1278   public HTable createTable(byte[] tableName, byte[] family, int numVersions)
1279   throws IOException {
1280     return createTable(tableName, new byte[][]{family}, numVersions);
1281   }
1282 
1283   /**
1284    * Create a table.
1285    * @param tableName
1286    * @param family
1287    * @param numVersions
1288    * @return An HTable instance for the created table.
1289    * @throws IOException
1290    */
1291   public HTable createTable(TableName tableName, byte[] family, int numVersions)
1292   throws IOException {
1293     return createTable(tableName, new byte[][]{family}, numVersions);
1294   }
1295 
1296   /**
1297    * Create a table.
1298    * @param tableName
1299    * @param families
1300    * @param numVersions
1301    * @return An HTable instance for the created table.
1302    * @throws IOException
1303    */
1304   public HTable createTable(byte[] tableName, byte[][] families,
1305       int numVersions)
1306   throws IOException {
1307     return createTable(TableName.valueOf(tableName), families, numVersions);
1308   }
1309 
1310   /**
1311    * Create a table.
1312    * @param tableName
1313    * @param families
1314    * @param numVersions
1315    * @return An HTable instance for the created table.
1316    * @throws IOException
1317    */
1318   public HTable createTable(TableName tableName, byte[][] families,
1319       int numVersions)
1320   throws IOException {
1321     HTableDescriptor desc = new HTableDescriptor(tableName);
1322     for (byte[] family : families) {
1323       HColumnDescriptor hcd = new HColumnDescriptor(family).setMaxVersions(numVersions);
1324       desc.addFamily(hcd);
1325     }
1326     getHBaseAdmin().createTable(desc);
1327     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1328     waitUntilAllRegionsAssigned(tableName);
1329     return new HTable(new Configuration(getConfiguration()), tableName);
1330   }
1331 
1332   /**
1333    * Create a table.
1334    * @param tableName
1335    * @param families
1336    * @param numVersions
1337    * @return An HTable instance for the created table.
1338    * @throws IOException
1339    */
1340   public HTable createTable(byte[] tableName, byte[][] families,
1341     int numVersions, int blockSize) throws IOException {
1342     return createTable(TableName.valueOf(tableName),
1343         families, numVersions, blockSize);
1344   }
1345 
1346   /**
1347    * Create a table.
1348    * @param tableName
1349    * @param families
1350    * @param numVersions
1351    * @return An HTable instance for the created table.
1352    * @throws IOException
1353    */
1354   public HTable createTable(TableName tableName, byte[][] families,
1355     int numVersions, int blockSize) throws IOException {
1356     HTableDescriptor desc = new HTableDescriptor(tableName);
1357     for (byte[] family : families) {
1358       HColumnDescriptor hcd = new HColumnDescriptor(family)
1359           .setMaxVersions(numVersions)
1360           .setBlocksize(blockSize);
1361       desc.addFamily(hcd);
1362     }
1363     getHBaseAdmin().createTable(desc);
1364     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1365     waitUntilAllRegionsAssigned(tableName);
1366     return new HTable(new Configuration(getConfiguration()), tableName);
1367   }
1368 
1369   /**
1370    * Create a table.
1371    * @param tableName
1372    * @param families
1373    * @param numVersions
1374    * @return An HTable instance for the created table.
1375    * @throws IOException
1376    */
1377   public HTable createTable(byte[] tableName, byte[][] families,
1378       int[] numVersions)
1379   throws IOException {
1380     return createTable(TableName.valueOf(tableName), families, numVersions);
1381   }
1382 
1383   /**
1384    * Create a table.
1385    * @param tableName
1386    * @param families
1387    * @param numVersions
1388    * @return An HTable instance for the created table.
1389    * @throws IOException
1390    */
1391   public HTable createTable(TableName tableName, byte[][] families,
1392       int[] numVersions)
1393   throws IOException {
1394     HTableDescriptor desc = new HTableDescriptor(tableName);
1395     int i = 0;
1396     for (byte[] family : families) {
1397       HColumnDescriptor hcd = new HColumnDescriptor(family)
1398           .setMaxVersions(numVersions[i]);
1399       desc.addFamily(hcd);
1400       i++;
1401     }
1402     getHBaseAdmin().createTable(desc);
1403     // HBaseAdmin only waits for regions to appear in hbase:meta; wait until they are also assigned
1404     waitUntilAllRegionsAssigned(tableName);
1405     return new HTable(new Configuration(getConfiguration()), tableName);
1406   }
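
       // Illustrative usage (a sketch, not part of the original source): the version
       // counts line up positionally with the families array, so a table whose family
       // "a" keeps one version and whose family "b" keeps five could be created as:
       //
       //   HTable t = util.createTable(TableName.valueOf("testPerFamilyVersions"),
       //       new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") },
       //       new int[] { 1, 5 });   // `util` is an assumed utility instance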
1407 
1408   /**
1409    * Create a table.
1410    * @param tableName name of the table to create
1411    * @param family column family to create in the table
1412    * @param splitRows split keys defining the initial regions
1413    * @return An HTable instance for the created table.
1414    * @throws IOException if table creation fails
1415    */
1416   public HTable createTable(byte[] tableName, byte[] family, byte[][] splitRows)
1417     throws IOException{
1418     return createTable(TableName.valueOf(tableName), family, splitRows);
1419   }
1420 
1421   /**
1422    * Create a table.
1423    * @param tableName name of the table to create
1424    * @param family column family to create in the table
1425    * @param splitRows split keys defining the initial regions
1426    * @return An HTable instance for the created table.
1427    * @throws IOException if table creation fails
1428    */
1429   public HTable createTable(TableName tableName, byte[] family, byte[][] splitRows)
1430       throws IOException {
1431     HTableDescriptor desc = new HTableDescriptor(tableName);
1432     HColumnDescriptor hcd = new HColumnDescriptor(family);
1433     desc.addFamily(hcd);
1434     getHBaseAdmin().createTable(desc, splitRows);
1435     // HBaseAdmin only waits for regions to appear in hbase:meta; wait until they are also assigned
1436     waitUntilAllRegionsAssigned(tableName);
1437     return new HTable(getConfiguration(), tableName);
1438   }
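
       // Illustrative usage (a sketch, not part of the original source): pre-splitting
       // a single-family table into four regions with three split keys:
       //
       //   byte[][] splits = new byte[][] {
       //       Bytes.toBytes("g"), Bytes.toBytes("n"), Bytes.toBytes("t") };
       //   HTable t = util.createTable(TableName.valueOf("testPresplit"),
       //       Bytes.toBytes("cf"), splits);   // `util` is an assumed utility instance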
1439 
1440   /**
1441    * Create a table.
1442    * @param tableName name of the table to create
1443    * @param families column families to create in the table
1444    * @param splitRows split keys defining the initial regions
1445    * @return An HTable instance for the created table.
1446    * @throws IOException if table creation fails
1447    */
1448   public HTable createTable(byte[] tableName, byte[][] families, byte[][] splitRows)
1449       throws IOException {
1450     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1451     for(byte[] family:families) {
1452       HColumnDescriptor hcd = new HColumnDescriptor(family);
1453       desc.addFamily(hcd);
1454     }
1455     getHBaseAdmin().createTable(desc, splitRows);
1456     // HBaseAdmin only waits for regions to appear in hbase:meta; wait until they are also assigned
1457     waitUntilAllRegionsAssigned(TableName.valueOf(tableName));
1458     return new HTable(getConfiguration(), tableName);
1459   }
1460 
1461   /**
1462    * Drop an existing table
1463    * @param tableName existing table
1464    */
1465   public void deleteTable(String tableName) throws IOException {
1466     deleteTable(TableName.valueOf(tableName));
1467   }
1468 
1469   /**
1470    * Drop an existing table
1471    * @param tableName existing table
1472    */
1473   public void deleteTable(byte[] tableName) throws IOException {
1474     deleteTable(TableName.valueOf(tableName));
1475   }
1476 
1477   /**
1478    * Drop an existing table
1479    * @param tableName existing table
1480    */
1481   public void deleteTable(TableName tableName) throws IOException {
1482     try {
1483       getHBaseAdmin().disableTable(tableName);
1484     } catch (TableNotEnabledException e) {
1485       LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1486     }
1487     getHBaseAdmin().deleteTable(tableName);
1488   }
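
       // Illustrative usage (a sketch, not part of the original source): because the
       // method disables the table first, callers can drop a table regardless of its
       // current state:
       //
       //   util.deleteTable(TableName.valueOf("testVersions"));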
1489 
1490   // ==========================================================================
1491   // Canned table and table descriptor creation
1492   // TODO replace HBaseTestCase
1493 
1494   public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1495   public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1496   public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1497   public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1498   private static final int MAXVERSIONS = 3;
1499 
1500   public static final char FIRST_CHAR = 'a';
1501   public static final char LAST_CHAR = 'z';
1502   public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1503   public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1504 
1505   /**
1506    * Create a table descriptor for a table of name <code>name</code> with
1507    * {@link #COLUMNS} for families.
1508    * @param name Name to give table.
        * @param minVersions Minimum number of versions to keep per column.
1509    * @param versions How many versions to allow per column.
        * @param ttl Time-to-live for cells, in seconds.
        * @param keepDeleted Whether to keep deleted cells.
1510    * @return Table descriptor.
1511    */
1512   public HTableDescriptor createTableDescriptor(final String name,
1513       final int minVersions, final int versions, final int ttl, boolean keepDeleted) {
1514     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
1515     for (byte[] cfName : new byte[][]{ fam1, fam2, fam3 }) {
1516       htd.addFamily(new HColumnDescriptor(cfName)
1517           .setMinVersions(minVersions)
1518           .setMaxVersions(versions)
1519           .setKeepDeletedCells(keepDeleted)
1520           .setBlockCacheEnabled(false)
1521           .setTimeToLive(ttl)
1522       );
1523     }
1524     return htd;
1525   }
1526 
1527   /**
1528    * Create a table descriptor for a table of name <code>name</code> with
1529    * {@link #COLUMNS} for families.
1530    * @param name Name to give table.
1531    * @return Table descriptor.
1532    */
1533   public HTableDescriptor createTableDescriptor(final String name) {
1534     return createTableDescriptor(name,  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1535         MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1536   }
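
       // Illustrative usage (a sketch, not part of the original source): building a
       // descriptor with the canned fam1/fam2/fam3 families and default settings, then
       // creating the table through the shared admin (`util` is an assumed instance):
       //
       //   HTableDescriptor htd = util.createTableDescriptor("testCanned");
       //   util.getHBaseAdmin().createTable(htd);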
1537 
1538   /**
1539    * Create an HRegion that writes to the local tmp dirs
1540    * @param desc table descriptor for the region's table
1541    * @param startKey region start key
1542    * @param endKey region end key
1543    * @return created hregion
1544    * @throws IOException if region creation fails
1545    */
1546   public HRegion createLocalHRegion(HTableDescriptor desc, byte [] startKey,
1547       byte [] endKey)
1548   throws IOException {
1549     HRegionInfo hri = new HRegionInfo(desc.getTableName(), startKey, endKey);
1550     return createLocalHRegion(hri, desc);
1551   }
1552 
1553   /**
1554    * Create an HRegion that writes to the local tmp dirs
1555    * @param info region info for the region to create
1556    * @param desc table descriptor for the region's table
1557    * @return created hregion
1558    * @throws IOException if region creation fails
1559    */
1560   public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc) throws IOException {
1561     return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc);
1562   }
1563 
1564   /**
1565    * Create an HRegion that writes to the local tmp dirs with specified hlog
1566    * @param info regioninfo
1567    * @param desc table descriptor
1568    * @param hlog hlog for this region.
1569    * @return created hregion
1570    * @throws IOException
1571    */
1572   public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc, HLog hlog) throws IOException {
1573     return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, hlog);
1574   }
1575 
1576 
1577   /**
1578    * @param tableName name of the region's table
1579    * @param startKey region start key
1580    * @param stopKey region end key
1581    * @param callingMethod name of the calling method (not used by this implementation)
1582    * @param conf configuration (not used by this implementation)
1583    * @param isReadOnly whether the table should be marked read-only
        * @param durability durability setting for the table
        * @param hlog hlog for this region; may be null
1584    * @param families column families to create
1585    * @throws IOException
1586    * @return A region on which you must call
1587    *         {@link HRegion#closeHRegion(HRegion)} when done.
1588    */
1589   public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1590       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
1591       HLog hlog, byte[]... families) throws IOException {
1592     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1593     htd.setReadOnly(isReadOnly);
1594     for (byte[] family : families) {
1595       HColumnDescriptor hcd = new HColumnDescriptor(family);
1596       // Keep all versions so tests can see every write.
1597       hcd.setMaxVersions(Integer.MAX_VALUE);
1598       htd.addFamily(hcd);
1599     }
1600     htd.setDurability(durability);
1601     HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
1602     return createLocalHRegion(info, htd, hlog);
1603   }
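
       // Illustrative usage (a sketch, not part of the original source): creating a
       // throwaway local region that covers the whole key space, and closing it when
       // done (`util` is an assumed utility instance):
       //
       //   HTableDescriptor htd = util.createTableDescriptor("testLocalRegion");
       //   HRegion region = util.createLocalHRegion(htd,
       //       HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
       //   try {
       //     // ... put/get against the region directly ...
       //   } finally {
       //     HRegion.closeHRegion(region);
       //   }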
1604   //
1605   // ==========================================================================
1606 
1607   /**
1608    * Truncate an existing table by deleting all of its rows.
1609    * @param tableName existing table
1610    * @return HTable to the truncated table
1611    * @throws IOException
1612    */
1613   public HTable truncateTable(byte[] tableName) throws IOException {
1614     return truncateTable(TableName.valueOf(tableName));
1615   }
1616 
1617   /**
1618    * Truncate an existing table by deleting all of its rows.
1619    * @param tableName existing table
1620    * @return HTable to the truncated table
1621    * @throws IOException
1622    */
1623   public HTable truncateTable(TableName tableName) throws IOException {
1624     HTable table = new HTable(getConfiguration(), tableName);
1625     Scan scan = new Scan();
1626     ResultScanner resScan = table.getScanner(scan);
1627     for(Result res : resScan) {
1628       Delete del = new Delete(res.getRow());
1629       table.delete(del);
1630     }
1631     resScan = table.getScanner(scan);
1632     resScan.close();
1633     return table;
1634   }
1635 
1636   /**
1637    * Load table with rows from 'aaa' to 'zzz'.
1638    * @param t Table
1639    * @param f Family
1640    * @return Count of rows loaded.
1641    * @throws IOException
1642    */
1643   public int loadTable(final HTable t, final byte[] f) throws IOException {
1644     return loadTable(t, new byte[][] {f});
1645   }
1646 
1647   /**
1648    * Load table with rows from 'aaa' to 'zzz'.
1649    * @param t Table
1650    * @param f Family
        * @param writeToWAL whether the puts should be written to the WAL
1651    * @return Count of rows loaded.
1652    * @throws IOException
1653    */
1654   public int loadTable(final HTable t, final byte[] f, boolean writeToWAL) throws IOException {
1655     return loadTable(t, new byte[][] {f}, null, writeToWAL);
1656   }
1657 
1658   /**
1659    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1660    * @param t Table
1661    * @param f Array of Families to load
1662    * @return Count of rows loaded.
1663    * @throws IOException
1664    */
1665   public int loadTable(final HTable t, final byte[][] f) throws IOException {
1666     return loadTable(t, f, null);
1667   }
1668 
1669   /**
1670    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1671    * @param t Table
1672    * @param f Array of Families to load
1673    * @param value the values of the cells. If null is passed, the row key is used as value
1674    * @return Count of rows loaded.
1675    * @throws IOException
1676    */
1677   public int loadTable(final HTable t, final byte[][] f, byte[] value) throws IOException {
1678     return loadTable(t, f, value, true);
1679   }
1680 
1681   /**
1682    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1683    * @param t Table
1684    * @param f Array of Families to load
1685    * @param value the values of the cells. If null is passed, the row key is used as value
        * @param writeToWAL whether the puts should be written to the WAL
1686    * @return Count of rows loaded.
1687    * @throws IOException
1688    */
1689   public int loadTable(final HTable t, final byte[][] f, byte[] value, boolean writeToWAL) throws IOException {
1690     t.setAutoFlush(false);
1691     int rowCount = 0;
1692     for (byte[] row : HBaseTestingUtility.ROWS) {
1693       Put put = new Put(row);
1694       put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1695       for (int i = 0; i < f.length; i++) {
1696         put.add(f[i], null, value != null ? value : row);
1697       }
1698       t.put(put);
1699       rowCount++;
1700     }
1701     t.flushCommits();
1702     return rowCount;
1703   }
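
       // Illustrative usage (a sketch, not part of the original source): loadTable
       // writes one row per three-letter key 'aaa'..'zzz', so a freshly loaded
       // single-family table should count 26^3 = 17576 rows:
       //
       //   int loaded = util.loadTable(table, Bytes.toBytes("cf"));
       //   // loaded == util.countRows(table) == 17576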
1704 
1705   /** A tracker for validating table rows
1706    * generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])}
1707    */
1708   public static class SeenRowTracker {
1709     int dim = 'z' - 'a' + 1;
1710     int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
1711     byte[] startRow;
1712     byte[] stopRow;
1713 
1714     public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1715       this.startRow = startRow;
1716       this.stopRow = stopRow;
1717     }
1718 
1719     void reset() {
1720       for (byte[] row : ROWS) {
1721         seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1722       }
1723     }
1724 
1725     int i(byte b) {
1726       return b - 'a';
1727     }
1728 
1729     public void addRow(byte[] row) {
1730       seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1731     }
1732 
1733     /** Validate that all the rows between startRow and stopRow are seen exactly once,
1734      * and that all other rows are not seen at all
1735      */
1736     public void validate() {
1737       for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1738         for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1739           for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1740             int count = seenRows[i(b1)][i(b2)][i(b3)];
1741             int expectedCount = 0;
1742             if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
1743                 && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
1744               expectedCount = 1;
1745             }
1746             if (count != expectedCount) {
1747               String row = new String(new byte[] {b1,b2,b3});
1748               throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
1749             }
1750           }
1751         }
1752       }
1753     }
1754   }
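
       // Illustrative usage (a sketch, not part of the original source): a scan over
       // [bbb, ddd) can be validated by feeding the tracker every row returned:
       //
       //   SeenRowTracker tracker =
       //       new SeenRowTracker(Bytes.toBytes("bbb"), Bytes.toBytes("ddd"));
       //   for (Result r : table.getScanner(new Scan(Bytes.toBytes("bbb"),
       //       Bytes.toBytes("ddd")))) {
       //     tracker.addRow(r.getRow());
       //   }
       //   tracker.validate();   // throws RuntimeException on any miscount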
1755 
1756   public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1757     return loadRegion(r, f, false);
1758   }
1759 
1760   /**
1761    * Load region with rows from 'aaa' to 'zzz'.
1762    * @param r Region
1763    * @param f Family
1764    * @param flush flush the cache if true
1765    * @return Count of rows loaded.
1766    * @throws IOException
1767    */
1768   public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
1769   throws IOException {
1770     byte[] k = new byte[3];
1771     int rowCount = 0;
1772     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1773       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1774         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1775           k[0] = b1;
1776           k[1] = b2;
1777           k[2] = b3;
1778           Put put = new Put(k);
1779           put.setDurability(Durability.SKIP_WAL);
1780           put.add(f, null, k);
1782 
1783           int preRowCount = rowCount;
1784           int pause = 10;
1785           int maxPause = 1000;
1786           while (rowCount == preRowCount) {
1787             try {
1788               r.put(put);
1789               rowCount++;
1790             } catch (RegionTooBusyException e) {
1791               pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1792               Threads.sleep(pause);
1793             }
1794           }
1795         }
1796       }
1797       if (flush) {
1798         r.flushcache();
1799       }
1800     }
1801     return rowCount;
1802   }
1803 
1804   public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
1805     for (int i = startRow; i < endRow; i++) {
1806       byte[] data = Bytes.toBytes(String.valueOf(i));
1807       Put put = new Put(data);
1808       put.add(f, null, data);
1809       t.put(put);
1810     }
1811   }
1812 
1813   /**
1814    * Return the number of rows in the given table.
1815    */
1816   public int countRows(final HTable table) throws IOException {
1817     Scan scan = new Scan();
1818     ResultScanner results = table.getScanner(scan);
1819     int count = 0;
1820     for (@SuppressWarnings("unused") Result res : results) {
1821       count++;
1822     }
1823     results.close();
1824     return count;
1825   }
1826 
1827   public int countRows(final HTable table, final byte[]... families) throws IOException {
1828     Scan scan = new Scan();
1829     for (byte[] family: families) {
1830       scan.addFamily(family);
1831     }
1832     ResultScanner results = table.getScanner(scan);
1833     int count = 0;
1834     for (@SuppressWarnings("unused") Result res : results) {
1835       count++;
1836     }
1837     results.close();
1838     return count;
1839   }
1840 
1841   /**
1842    * Return an MD5 digest computed over all the row keys of a table.
1843    */
1844   public String checksumRows(final HTable table) throws Exception {
1845     Scan scan = new Scan();
1846     ResultScanner results = table.getScanner(scan);
1847     MessageDigest digest = MessageDigest.getInstance("MD5");
1848     for (Result res : results) {
1849       digest.update(res.getRow());
1850     }
1851     results.close();
1852     return Bytes.toStringBinary(digest.digest()); // digest.toString() would not include the hash bytes
1853   }
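
       // Illustrative usage (a sketch, not part of the original source): since the
       // digest is computed over row keys, it gives a quick way to assert that two
       // tables hold the same set of rows:
       //
       //   String expected = util.checksumRows(sourceTable);
       //   String actual = util.checksumRows(copiedTable);
       //   assertTrue(expected.equals(actual));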
1854 
1855   /**
1856    * Creates many regions named "aaa" to "zzz".
1857    *
1858    * @param table  The table to use for the data.
1859    * @param columnFamily  The family to insert the data into.
1860    * @return count of regions created.
1861    * @throws IOException When creating the regions fails.
1862    */
1863   public int createMultiRegions(HTable table, byte[] columnFamily)
1864   throws IOException {
1865     return createMultiRegions(getConfiguration(), table, columnFamily);
1866   }
1867 
1868   /** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */
1869   public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
1870   static {
1871     int i = 0;
1872     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1873       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1874         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1875           ROWS[i][0] = b1;
1876           ROWS[i][1] = b2;
1877           ROWS[i][2] = b3;
1878           i++;
1879         }
1880       }
1881     }
1882   }
1883 
1884   public static final byte[][] KEYS = {
1885     HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
1886     Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
1887     Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
1888     Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
1889     Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
1890     Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
1891     Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
1892     Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
1893     Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
1894   };
1895 
1896   public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
1897       Bytes.toBytes("bbb"),
1898       Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
1899       Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
1900       Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
1901       Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
1902       Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
1903       Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
1904       Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
1905       Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
1906   };
1907 
1908   /**
1909    * Creates many regions named "aaa" to "zzz".
1910    * @param c Configuration to use.
1911    * @param table  The table to use for the data.
1912    * @param columnFamily  The family to insert the data into.
1913    * @return count of regions created.
1914    * @throws IOException When creating the regions fails.
1915    */
1916   public int createMultiRegions(final Configuration c, final HTable table,
1917       final byte[] columnFamily)
1918   throws IOException {
1919     return createMultiRegions(c, table, columnFamily, KEYS);
1920   }
1921 
1922   /**
1923    * Creates the specified number of regions in the specified table.
1924    * @param c Configuration to use.
1925    * @param table The table to create regions in.
1926    * @param family The family to use.
1927    * @param numRegions Number of regions to create; must be at least 3.
1928    * @return count of regions created.
1929    * @throws IOException When creating the regions fails.
1930    */
1931   public int createMultiRegions(final Configuration c, final HTable table,
1932       final byte [] family, int numRegions)
1933   throws IOException {
1934     if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1935     byte [] startKey = Bytes.toBytes("aaaaa");
1936     byte [] endKey = Bytes.toBytes("zzzzz");
1937     byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1938     byte [][] regionStartKeys = new byte[splitKeys.length+1][];
1939     for (int i=0;i<splitKeys.length;i++) {
1940       regionStartKeys[i+1] = splitKeys[i];
1941     }
1942     regionStartKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
1943     return createMultiRegions(c, table, family, regionStartKeys);
1944   }
1945 
1946   @SuppressWarnings("deprecation")
1947   public int createMultiRegions(final Configuration c, final HTable table,
1948       final byte[] columnFamily, byte [][] startKeys)
1949   throws IOException {
1950     Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
1951     HTable meta = new HTable(c, TableName.META_TABLE_NAME);
1952     HTableDescriptor htd = table.getTableDescriptor();
1953     if(!htd.hasFamily(columnFamily)) {
1954       HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
1955       htd.addFamily(hcd);
1956     }
1957     // remove empty region - this is tricky as the mini cluster during the test
1958     // setup already has the "<tablename>,,123456789" row with an empty start
1959     // and end key. Adding the custom regions below adds those blindly,
1960     // including the new start region from empty to "bbb". lg
1961     List<byte[]> rows = getMetaTableRows(htd.getTableName());
1962     String regionToDeleteInFS = table
1963         .getRegionsInRange(Bytes.toBytes(""), Bytes.toBytes("")).get(0)
1964         .getRegionInfo().getEncodedName();
1965     List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
1966     // add custom ones
1967     int count = 0;
1968     for (int i = 0; i < startKeys.length; i++) {
1969       int j = (i + 1) % startKeys.length;
1970       HRegionInfo hri = new HRegionInfo(table.getName(),
1971         startKeys[i], startKeys[j]);
1972       MetaEditor.addRegionToMeta(meta, hri);
1973       newRegions.add(hri);
1974       count++;
1975     }
1976     // see comment above, remove "old" (or previous) single region
1977     for (byte[] row : rows) {
1978       LOG.info("createMultiRegions: deleting meta row -> " +
1979         Bytes.toStringBinary(row));
1980       meta.delete(new Delete(row));
1981     }
1982     // remove the "old" region from FS
1983     Path tableDir = new Path(getDefaultRootDirPath().toString()
1984         + System.getProperty("file.separator") + htd.getTableName()
1985         + System.getProperty("file.separator") + regionToDeleteInFS);
1986     FileSystem.get(c).delete(tableDir);
1987     // flush cache of regions
1988     HConnection conn = table.getConnection();
1989     conn.clearRegionCache();
1990     // assign all the new regions IF table is enabled.
1991     HBaseAdmin admin = getHBaseAdmin();
1992     if (admin.isTableEnabled(table.getTableName())) {
1993       for(HRegionInfo hri : newRegions) {
1994         admin.assign(hri.getRegionName());
1995       }
1996     }
1997 
1998     meta.close();
1999 
2000     return count;
2001   }
2002 
2003   /**
2004    * Create rows in hbase:meta for regions of the specified table with the specified
2005    * start keys.  The first startKey should be a 0 length byte array if you
2006    * want to form a proper range of regions.
2007    * @param conf Configuration to use.
2008    * @param htd Descriptor of the table to add regions for.
2009    * @param startKeys Start keys of the regions to add.
2010    * @return list of region info for regions added to meta
2011    * @throws IOException
2012    */
2013   public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
2014       final HTableDescriptor htd, byte [][] startKeys)
2015   throws IOException {
2016     HTable meta = new HTable(conf, TableName.META_TABLE_NAME);
2017     Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2018     List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
2019     // add custom ones
2020     for (int i = 0; i < startKeys.length; i++) {
2021       int j = (i + 1) % startKeys.length;
2022       HRegionInfo hri = new HRegionInfo(htd.getTableName(), startKeys[i],
2023           startKeys[j]);
2024       MetaEditor.addRegionToMeta(meta, hri);
2025       newRegions.add(hri);
2026     }
2027 
2028     meta.close();
2029     return newRegions;
2030   }
2031 
2032   /**
2033    * Returns all rows from the hbase:meta table.
2034    *
2035    * @throws IOException When reading the rows fails.
2036    */
2037   public List<byte[]> getMetaTableRows() throws IOException {
2038     // TODO: Redo using MetaReader class
2039     HTable t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2040     List<byte[]> rows = new ArrayList<byte[]>();
2041     ResultScanner s = t.getScanner(new Scan());
2042     for (Result result : s) {
2043       LOG.info("getMetaTableRows: row -> " +
2044         Bytes.toStringBinary(result.getRow()));
2045       rows.add(result.getRow());
2046     }
2047     s.close();
2048     t.close();
2049     return rows;
2050   }
2051 
2052   /**
2053    * Returns all rows from the hbase:meta table for a given user table
2054    *
2055    * @throws IOException When reading the rows fails.
2056    */
2057   public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2058     // TODO: Redo using MetaReader.
2059     HTable t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2060     List<byte[]> rows = new ArrayList<byte[]>();
2061     ResultScanner s = t.getScanner(new Scan());
2062     for (Result result : s) {
2063       HRegionInfo info = HRegionInfo.getHRegionInfo(result);
2064       if (info == null) {
2065         LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2066         // TODO figure out what to do for this new hosed case.
2067         continue;
2068       }
2069 
2070       if (info.getTable().equals(tableName)) {
2071         LOG.info("getMetaTableRows: row -> " +
2072             Bytes.toStringBinary(result.getRow()) + info);
2073         rows.add(result.getRow());
2074       }
2075     }
2076     s.close();
2077     t.close();
2078     return rows;
2079   }
2080 
2081   /**
2082    * Tool to get the reference to the region server object that holds the
2083    * region of the specified user table.
2084    * It first searches for the meta rows that contain the region of the
2085    * specified table, then gets the index of that RS, and finally retrieves
2086    * the RS's reference.
2087    * @param tableName user table to lookup in hbase:meta
2088    * @return region server that holds it, null if the row doesn't exist
2089    * @throws IOException
2090    * @throws InterruptedException
2091    */
2092   public HRegionServer getRSForFirstRegionInTable(byte[] tableName)
2093       throws IOException, InterruptedException {
2094     return getRSForFirstRegionInTable(TableName.valueOf(tableName));
2095   }
2096   /**
2097    * Tool to get the reference to the region server object that holds the
2098    * region of the specified user table.
2099    * It first searches for the meta rows that contain the region of the
2100    * specified table, then gets the index of that RS, and finally retrieves
2101    * the RS's reference.
2102    * @param tableName user table to lookup in hbase:meta
2103    * @return region server that holds it, null if the row doesn't exist
2104    * @throws IOException
2105    */
2106   public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2107       throws IOException, InterruptedException {
2108     List<byte[]> metaRows = getMetaTableRows(tableName);
2109     if (metaRows == null || metaRows.isEmpty()) {
2110       return null;
2111     }
2112     LOG.debug("Found " + metaRows.size() + " rows for table " +
2113       tableName);
2114     byte [] firstrow = metaRows.get(0);
2115     LOG.debug("FirstRow=" + Bytes.toString(firstrow));
2116     long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2117       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2118     int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2119       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2120     RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MILLISECONDS); // pause is in milliseconds
2121     while(retrier.shouldRetry()) {
2122       int index = getMiniHBaseCluster().getServerWith(firstrow);
2123       if (index != -1) {
2124         return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2125       }
2126       // Came back -1.  Region may not be online yet.  Sleep a while.
2127       retrier.sleepUntilNextRetry();
2128     }
2129     return null;
2130   }
2131 
2132   /**
2133    * Starts a <code>MiniMRCluster</code> with a default number of
2134    * <code>TaskTracker</code>s.
2135    *
2136    * @throws IOException When starting the cluster fails.
2137    */
2138   public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2139     startMiniMapReduceCluster(2);
2140     return mrCluster;
2141   }
2142 
2143   /**
2144    * TaskTracker has a bug where changing the hadoop.log.dir system property
2145    * will not change its internal static LOG_DIR variable.
2146    */
2147   private void forceChangeTaskLogDir() {
2148     Field logDirField;
2149     try {
2150       logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2151       logDirField.setAccessible(true);
2152 
2153       Field modifiersField = Field.class.getDeclaredField("modifiers");
2154       modifiersField.setAccessible(true);
2155       modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2156 
2157       logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2158     } catch (SecurityException e) {
2159       throw new RuntimeException(e);
2160     } catch (NoSuchFieldException e) {
2162       throw new RuntimeException(e);
2163     } catch (IllegalArgumentException e) {
2164       throw new RuntimeException(e);
2165     } catch (IllegalAccessException e) {
2166       throw new RuntimeException(e);
2167     }
2168   }
2169 
2170   /**
2171    * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2172    * filesystem.
2173    * @param servers  The number of <code>TaskTracker</code>s to start.
2174    * @throws IOException When starting the cluster fails.
2175    */
2176   private void startMiniMapReduceCluster(final int servers) throws IOException {
2177     if (mrCluster != null) {
2178       throw new IllegalStateException("MiniMRCluster is already running");
2179     }
2180     LOG.info("Starting mini mapreduce cluster...");
2181     setupClusterTestDir();
2182     createDirsAndSetProperties();
2183 
2184     forceChangeTaskLogDir();
2185 
2186     //// hadoop2 specific settings
2187     // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2188     // We raise the usable virtual memory so that processes don't get killed.
2189     conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2190 
2191     // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2192     // this avoids the problem by disabling speculative task execution in tests.
2193     conf.setBoolean("mapreduce.map.speculative", false);
2194     conf.setBoolean("mapreduce.reduce.speculative", false);
2195     ////
2196 
2197     // Allow the user to override FS URI for this map-reduce cluster to use.
2198     mrCluster = new MiniMRCluster(servers,
2199       FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2200       null, null, new JobConf(this.conf));
2201     JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2202     if (jobConf == null) {
2203       jobConf = mrCluster.createJobConf();
2204     }
2205 
2206     jobConf.set("mapred.local.dir",
2207       conf.get("mapred.local.dir")); //Hadoop MiniMR overwrites this while it should not
2208     LOG.info("Mini mapreduce cluster started");
2209 
2210     // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2211     // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2212     // necessary config properties here.  YARN-129 required adding a few properties.
2213     conf.set("mapred.job.tracker", jobConf.get("mapred.job.tracker"));
2214     // this for mrv2 support; mr1 ignores this
2215     conf.set("mapreduce.framework.name", "yarn");
2216     conf.setBoolean("yarn.is.minicluster", true);
2217     String rmAddress = jobConf.get("yarn.resourcemanager.address");
2218     if (rmAddress != null) {
2219       conf.set("yarn.resourcemanager.address", rmAddress);
2220     }
2221     String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2222     if (historyAddress != null) {
2223       conf.set("mapreduce.jobhistory.address", historyAddress);
2224     }
2225     String schedulerAddress =
2226       jobConf.get("yarn.resourcemanager.scheduler.address");
2227     if (schedulerAddress != null) {
2228       conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2229     }
2230   }
2231 
2232   /**
2233    * Stops the previously started <code>MiniMRCluster</code>.
2234    */
2235   public void shutdownMiniMapReduceCluster() {
2236     LOG.info("Stopping mini mapreduce cluster...");
2237     if (mrCluster != null) {
2238       mrCluster.shutdown();
2239       mrCluster = null;
2240     }
2241     // Restore configuration to point to local jobtracker
2242     conf.set("mapred.job.tracker", "local");
2243     LOG.info("Mini mapreduce cluster stopped");
2244   }
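
       // Illustrative usage (a sketch, not part of the original source): the usual
       // pairing in a MapReduce-backed test, with the shutdown in a finally or
       // tear-down block (`util` is an assumed utility instance):
       //
       //   util.startMiniMapReduceCluster();
       //   try {
       //     // ... run jobs against util.getConfiguration() ...
       //   } finally {
       //     util.shutdownMiniMapReduceCluster();
       //   }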
2245 
2246   /**
2247    * Create a stubbed out RegionServerService, mainly for getting FS.
2248    */
2249   public RegionServerServices createMockRegionServerService() throws IOException {
2250     return createMockRegionServerService((ServerName)null);
2251   }
2252 
2253   /**
2254    * Create a stubbed out RegionServerService, mainly for getting FS.
2255    * This version is used by TestTokenAuthentication
2256    */
2257   public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws IOException {
2258     final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2259     rss.setFileSystem(getTestFileSystem());
2260     rss.setRpcServer(rpc);
2261     return rss;
2262   }
2263 
2264   /**
2265    * Create a stubbed out RegionServerService, mainly for getting FS.
2266    * This version is used by TestOpenRegionHandler
2267    */
2268   public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2269     final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2270     rss.setFileSystem(getTestFileSystem());
2271     return rss;
2272   }
2273 
2274   /**
2275    * Switches the logger for the given class to DEBUG level.
2276    *
2277    * @param clazz  The class for which to switch to debug logging.
2278    */
2279   public void enableDebug(Class<?> clazz) {
2280     Log l = LogFactory.getLog(clazz);
2281     if (l instanceof Log4JLogger) {
2282       ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
2283     } else if (l instanceof Jdk14Logger) {
2284       ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
2285     }
2286   }
2287 
2288   /**
2289    * Expire the Master's session
2290    * @throws Exception
2291    */
2292   public void expireMasterSession() throws Exception {
2293     HMaster master = getMiniHBaseCluster().getMaster();
2294     expireSession(master.getZooKeeper(), false);
2295   }
2296 
2297   /**
2298    * Expire a region server's session
2299    * @param index which RS
2300    * @throws Exception
2301    */
2302   public void expireRegionServerSession(int index) throws Exception {
2303     HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2304     expireSession(rs.getZooKeeper(), false);
2305     decrementMinRegionServerCount();
2306   }
2307 
2308   private void decrementMinRegionServerCount() {
2309     // decrement the count for this.conf, for a newly spawned master
2310     // this.hbaseCluster shares this configuration too
2311     decrementMinRegionServerCount(getConfiguration());
2312 
2313     // each master thread keeps a copy of configuration
2314     for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2315       decrementMinRegionServerCount(master.getMaster().getConfiguration());
2316     }
2317   }
2318 
2319   private void decrementMinRegionServerCount(Configuration conf) {
2320     int currentCount = conf.getInt(
2321         ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2322     if (currentCount != -1) {
2323       conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
2324           Math.max(currentCount - 1, 1));
2325     }
2326   }
2327 
2328   public void expireSession(ZooKeeperWatcher nodeZK) throws Exception {
2329    expireSession(nodeZK, false);
2330   }
2331 
2332   @Deprecated
2333   public void expireSession(ZooKeeperWatcher nodeZK, Server server)
2334     throws Exception {
2335     expireSession(nodeZK, false);
2336   }
2337 
2338   /**
2339    * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2340    * http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
2341    * There are issues when doing this:
2342    * [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
2343    * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2344    *
2345    * @param nodeZK - the ZK watcher to expire
2346    * @param checkStatus - true to check if we can create an HTable with the
2347    *                    current configuration.
2348    */
2349   public void expireSession(ZooKeeperWatcher nodeZK, boolean checkStatus)
2350     throws Exception {
2351     Configuration c = new Configuration(this.conf);
2352     String quorumServers = ZKConfig.getZKQuorumServersString(c);
2353     ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2354     byte[] password = zk.getSessionPasswd();
2355     long sessionID = zk.getSessionId();
2356 
2357     // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2358     //  so we create a first watcher to be sure that the
2359     //  event was sent. We expect that if our watcher receives the event
2360     //  other watchers on the same machine will get it as well.
2361     // When we ask to close the connection, ZK does not close it before
2362     //  we receive all the events, so we don't have to capture the event;
2363     //  just closing the connection should be enough.
2364     ZooKeeper monitor = new ZooKeeper(quorumServers,
2365       1000, new org.apache.zookeeper.Watcher(){
2366       @Override
2367       public void process(WatchedEvent watchedEvent) {
2368         LOG.info("Monitor ZKW received event="+watchedEvent);
2369       }
2370     } , sessionID, password);
2371 
2372     // Making it expire
2373     ZooKeeper newZK = new ZooKeeper(quorumServers,
2374         1000, EmptyWatcher.instance, sessionID, password);
2375 
2376     //ensure that we have connection to the server before closing down, otherwise
2377     //the close session event will be eaten out before we start CONNECTING state
2378     long start = System.currentTimeMillis();
2379     while (newZK.getState() != States.CONNECTED
2380          && System.currentTimeMillis() - start < 1000) {
2381        Thread.sleep(1);
2382     }
2383     newZK.close();
2384     LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2385 
2386     // Now closing & waiting to be sure that the clients get it.
2387     monitor.close();
2388 
2389     if (checkStatus) {
2390       new HTable(new Configuration(conf), TableName.META_TABLE_NAME).close();
2391     }
2392   }
2393 
2394   /**
2395    * Get the Mini HBase cluster.
2396    *
2397    * @return hbase cluster
2398    * @see #getHBaseClusterInterface()
2399    */
2400   public MiniHBaseCluster getHBaseCluster() {
2401     return getMiniHBaseCluster();
2402   }
2403 
2404   /**
2405    * Returns the HBaseCluster instance.
2406    * <p>Returned object can be any of the subclasses of HBaseCluster, and the
2407    * tests referring to this should not assume that the cluster is a mini cluster or a
2408    * distributed one. If the test only works on a mini cluster, then the specific
2409    * method {@link #getMiniHBaseCluster()} can be used instead without the
2410    * need to type-cast.
2411    */
2412   public HBaseCluster getHBaseClusterInterface() {
2413     //implementation note: we should rename this method as #getHBaseCluster(),
2414     //but this would require refactoring 90+ calls.
2415     return hbaseCluster;
2416   }
2417 
2418   /**
2419    * Returns a HBaseAdmin instance.
2420    * This instance is shared between HBaseTestingUtility instance users.
2421    * Closing it has no effect; it will be closed automatically when the
2422    * cluster shuts down.
2423    *
2424    * @return The HBaseAdmin instance.
2425    * @throws IOException
2426    */
2427   public synchronized HBaseAdmin getHBaseAdmin()
2428   throws IOException {
2429     if (hbaseAdmin == null){
2430       hbaseAdmin = new HBaseAdminForTests(getConfiguration());
2431     }
2432     return hbaseAdmin;
2433   }
2434 
2435   private HBaseAdminForTests hbaseAdmin = null;
2436   private static class HBaseAdminForTests extends HBaseAdmin {
2437     public HBaseAdminForTests(Configuration c) throws MasterNotRunningException,
2438         ZooKeeperConnectionException, IOException {
2439       super(c);
2440     }
2441 
2442     @Override
2443     public synchronized void close() throws IOException {
2444       LOG.warn("close() called on HBaseAdmin instance returned from HBaseTestingUtility.getHBaseAdmin()");
2445     }
2446 
2447     private synchronized void close0() throws IOException {
2448       super.close();
2449     }
2450   }
2451 
2452   /**
2453    * Returns a ZooKeeperWatcher instance.
2454    * This instance is shared between HBaseTestingUtility instance users.
2455    * Don't close it; it will be closed automatically when the
2456    * cluster shuts down.
2457    *
2458    * @return The ZooKeeperWatcher instance.
2459    * @throws IOException
2460    */
2461   public synchronized ZooKeeperWatcher getZooKeeperWatcher()
2462     throws IOException {
2463     if (zooKeeperWatcher == null) {
2464       zooKeeperWatcher = new ZooKeeperWatcher(conf, "testing utility",
2465         new Abortable() {
2466         @Override public void abort(String why, Throwable e) {
2467           throw new RuntimeException("Unexpected abort in HBaseTestingUtility:"+why, e);
2468         }
2469         @Override public boolean isAborted() {return false;}
2470       });
2471     }
2472     return zooKeeperWatcher;
2473   }
2474   private ZooKeeperWatcher zooKeeperWatcher;
2475 
2476 
2477 
2478   /**
2479    * Closes the named region.
2480    *
2481    * @param regionName  The region to close.
2482    * @throws IOException
2483    */
2484   public void closeRegion(String regionName) throws IOException {
2485     closeRegion(Bytes.toBytes(regionName));
2486   }
2487 
2488   /**
2489    * Closes the named region.
2490    *
2491    * @param regionName  The region to close.
2492    * @throws IOException
2493    */
2494   public void closeRegion(byte[] regionName) throws IOException {
2495     getHBaseAdmin().closeRegion(regionName, null);
2496   }
2497 
2498   /**
2499    * Closes the region containing the given row.
2500    *
2501    * @param row  The row to find the containing region.
2502    * @param table  The table to find the region.
2503    * @throws IOException
2504    */
2505   public void closeRegionByRow(String row, HTable table) throws IOException {
2506     closeRegionByRow(Bytes.toBytes(row), table);
2507   }
2508 
2509   /**
2510    * Closes the region containing the given row.
2511    *
2512    * @param row  The row to find the containing region.
2513    * @param table  The table to find the region.
2514    * @throws IOException
2515    */
2516   public void closeRegionByRow(byte[] row, HTable table) throws IOException {
2517     HRegionLocation hrl = table.getRegionLocation(row);
2518     closeRegion(hrl.getRegionInfo().getRegionName());
2519   }
2520 
2521   /**
2522    * Retrieves a splittable region randomly from tableName
2523    *
2524    * @param tableName name of table
2525    * @param maxAttempts maximum number of attempts, unlimited for value of -1
2526    * @return the HRegion chosen, null if none was found within limit of maxAttempts
2527    */
2528   public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2529     List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2530     int regCount = regions.size();
2531     Set<Integer> attempted = new HashSet<Integer>();
2532     int idx;
2533     int attempts = 0;
2534     do {
2535       regions = getHBaseCluster().getRegions(tableName);
2536       if (regCount != regions.size()) {
2537         // if there was region movement, clear attempted Set
2538         attempted.clear();
2539       }
2540       regCount = regions.size();
2541       // The region may be going for CLOSE between the time we fetch the region list
2542       // from an RS and the time we check it, e.g. because online schema change is enabled.
2543       if (regCount > 0) {
2544         idx = random.nextInt(regCount);
2545         // if we have just tried this region, there is no need to try again
2546         if (attempted.contains(idx))
2547           continue;
2548         try {
2549           regions.get(idx).checkSplit();
2550           return regions.get(idx);
2551         } catch (Exception ex) {
2552           LOG.warn("Caught exception", ex);
2553           attempted.add(idx);
2554         }
2555       }
2556       attempts++;
2557     } while (maxAttempts == -1 || attempts < maxAttempts);
2558     return null;
2559   }
2560 
2561   public MiniZooKeeperCluster getZkCluster() {
2562     return zkCluster;
2563   }
2564 
2565   public void setZkCluster(MiniZooKeeperCluster zkCluster) {
2566     this.passedZkCluster = true;
2567     this.zkCluster = zkCluster;
2568     conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkCluster.getClientPort());
2569   }
2570 
2571   public MiniDFSCluster getDFSCluster() {
2572     return dfsCluster;
2573   }
2574 
2575   public void setDFSCluster(MiniDFSCluster cluster) throws IOException {
2576     if (dfsCluster != null && dfsCluster.isClusterUp()) {
2577       throw new IOException("DFSCluster is already running! Shut it down first.");
2578     }
2579     this.dfsCluster = cluster;
2580   }
2581 
2582   public FileSystem getTestFileSystem() throws IOException {
2583     return HFileSystem.get(conf);
2584   }
2585 
2586   /**
2587    * Wait until all regions in a table have been assigned.  Waits the default timeout
2588    * (30 seconds) before giving up.
2589    * @param table Table to wait on.
2590    * @throws InterruptedException
2591    * @throws IOException
2592    */
2593   public void waitTableAvailable(byte[] table)
2594       throws InterruptedException, IOException {
2595     waitTableAvailable(getHBaseAdmin(), table, 30000);
2596   }
2597 
2598   public void waitTableAvailable(HBaseAdmin admin, byte[] table)
2599       throws InterruptedException, IOException {
2600     waitTableAvailable(admin, table, 30000);
2601   }
2602 
2603   /**
2604    * Wait until all regions in a table have been assigned
2605    * @param table Table to wait on.
2606    * @param timeoutMillis Timeout.
2607    * @throws InterruptedException
2608    * @throws IOException
2609    */
2610   public void waitTableAvailable(byte[] table, long timeoutMillis)
2611   throws InterruptedException, IOException {
2612     waitTableAvailable(getHBaseAdmin(), table, timeoutMillis);
2613   }
2614 
2615   public void waitTableAvailable(HBaseAdmin admin, byte[] table, long timeoutMillis)
2616   throws InterruptedException, IOException {
2617     long startWait = System.currentTimeMillis();
2618     while (!admin.isTableAvailable(table)) {
2619       assertTrue("Timed out waiting for table to become available " +
2620         Bytes.toStringBinary(table),
2621         System.currentTimeMillis() - startWait < timeoutMillis);
2622       Thread.sleep(200);
2623     }
2624   }
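
       // Illustrative usage (a sketch, not part of the original source): tests that
       // create tables through a raw HBaseAdmin, rather than the createTable helpers
       // above, can block until the new table's regions are reachable; `admin` and
       // `desc` are assumed to exist in the test:
       //
       //   admin.createTable(desc);
       //   util.waitTableAvailable(desc.getTableName().getName(), 10000);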
2625 
2626   /**
2627    * Waits for a table to be 'enabled'.  Enabled means that the table is set as 'enabled' and
2628    * all of its regions have been assigned.  Will time out after the default period (30 seconds).
2629    * @see #waitTableAvailable(byte[])
2630    * @param table Table to wait on.
2632    * @throws InterruptedException
2633    * @throws IOException
2634    */
2635   public void waitTableEnabled(byte[] table)
2636       throws InterruptedException, IOException {
2637     waitTableEnabled(getHBaseAdmin(), table, 30000);
2638   }
2639 
2640   public void waitTableEnabled(HBaseAdmin admin, byte[] table)
2641       throws InterruptedException, IOException {
2642     waitTableEnabled(admin, table, 30000);
2643   }
2644 
2645   /**
2646    * Waits for a table to be 'enabled'.  Enabled means that the table is set as 'enabled' and
2647    * all of its regions have been assigned.
2648    * @see #waitTableAvailable(byte[])
2649    * @param table Table to wait on.
2650    * @param timeoutMillis Time to wait on it being marked enabled.
2651    * @throws InterruptedException
2652    * @throws IOException
2653    */
2654   public void waitTableEnabled(byte[] table, long timeoutMillis)
2655   throws InterruptedException, IOException {
2656     waitTableEnabled(getHBaseAdmin(), table, timeoutMillis);
2657   }
2658 
2659   public void waitTableEnabled(HBaseAdmin admin, byte[] table, long timeoutMillis)
2660   throws InterruptedException, IOException {
2661     long startWait = System.currentTimeMillis();
2662     waitTableAvailable(admin, table, timeoutMillis);
2663     // Time already spent in waitTableAvailable counts against the same timeout.
2664     while (!admin.isTableEnabled(table)) {
2665       assertTrue("Timed out waiting for table to become available and enabled " +
2666          Bytes.toStringBinary(table),
2667          System.currentTimeMillis() - startWait < timeoutMillis);
2668       Thread.sleep(200);
2669     }
2670     // Finally make sure all regions are fully open and online out on the cluster. Regions may be
2671     // in the hbase:meta table and almost open on all regionservers, but setting the region
2672     // online in the regionserver is the very last thing done and can take a little while.
2673     // Below we do a get.  The get will retry on a NotServingRegionException or a
2674     // RegionOpeningException.  It is crude, but when it is done all regions will be online.
2675     try {
2676       Canary.sniff(admin, TableName.valueOf(table));
2677     } catch (Exception e) {
2678       throw new IOException(e);
2679     }
2680   }
2681 
2682   /**
2683    * Make sure that at least the specified number of region servers
2684    * are running
2685    * @param num minimum number of region servers that should be running
2686    * @return true if we started some servers
2687    * @throws IOException
2688    */
2689   public boolean ensureSomeRegionServersAvailable(final int num)
2690       throws IOException {
2691     boolean startedServer = false;
2692     MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
2693     for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
2694       LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2695       startedServer = true;
2696     }
2697 
2698     return startedServer;
2699   }
2700 
2701 
2702   /**
2703    * Make sure that at least the specified number of region servers
2704    * are running. We don't count the ones that are currently stopping or are
2705    * stopped.
2706    * @param num minimum number of region servers that should be running
2707    * @return true if we started some servers
2708    * @throws IOException
2709    */
2710   public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
2711     throws IOException {
2712     boolean startedServer = ensureSomeRegionServersAvailable(num);
2713 
2714     int nonStoppedServers = 0;
2715     for (JVMClusterUtil.RegionServerThread rst :
2716       getMiniHBaseCluster().getRegionServerThreads()) {
2717 
2718       HRegionServer hrs = rst.getRegionServer();
2719       if (hrs.isStopping() || hrs.isStopped()) {
2720         LOG.info("A region server is stopped or stopping:"+hrs);
2721       } else {
2722         nonStoppedServers++;
2723       }
2724     }
2725     for (int i=nonStoppedServers; i<num; ++i) {
2726       LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2727       startedServer = true;
2728     }
2729     return startedServer;
2730   }
2731 
2732 
2733   /**
2734    * This method clones the passed <code>c</code> configuration, setting a new
2735    * user into the clone.  Use it when getting new instances of FileSystem.  Only
2736    * works for DistributedFileSystem.
2737    * @param c Initial configuration
2738    * @param differentiatingSuffix Suffix to differentiate this user from others.
2739    * @return A new configuration instance with a different user set into it.
2740    * @throws IOException
2741    */
2742   public static User getDifferentUser(final Configuration c,
2743     final String differentiatingSuffix)
2744   throws IOException {
2745     FileSystem currentfs = FileSystem.get(c);
2746     if (!(currentfs instanceof DistributedFileSystem)) {
2747       return User.getCurrent();
2748     }
2749     // Else distributed filesystem.  Make a new instance per daemon.  Below
2750     // code is taken from the AppendTestUtil over in hdfs.
2751     String username = User.getCurrent().getName() +
2752       differentiatingSuffix;
2753     User user = User.createUserForTesting(c, username,
2754         new String[]{"supergroup"});
2755     return user;
2756   }
2757 
2758   /**
2759    * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append it's hard-coded to 5 and
2760    * makes tests linger.  Here is the exception you'll see:
2761    * <pre>
2762    * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/hlog.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683 failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2763    * </pre>
2764    * @param stream A DFSClient.DFSOutputStream.
2765    * @param max The new maximum recovery error count.
2766    * @throws NoSuchFieldException
2767    * @throws SecurityException
2768    * @throws IllegalAccessException
2769    * @throws IllegalArgumentException
2770    */
2771   public static void setMaxRecoveryErrorCount(final OutputStream stream,
2772       final int max) {
2773     try {
2774       Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
2775       for (Class<?> clazz: clazzes) {
2776         String className = clazz.getSimpleName();
2777         if (className.equals("DFSOutputStream")) {
2778           if (clazz.isInstance(stream)) {
2779             Field maxRecoveryErrorCountField =
2780               stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2781             maxRecoveryErrorCountField.setAccessible(true);
2782             maxRecoveryErrorCountField.setInt(stream, max);
2783             break;
2784           }
2785         }
2786       }
2787     } catch (Exception e) {
2788       LOG.info("Could not set max recovery field", e);
2789     }
2790   }
2791 
2792   /**
2793    * Wait until all regions for a table in hbase:meta have a non-empty
2794    * info:server, up to 60 seconds. This means all regions have been deployed,
2795    * master has been informed, and hbase:meta has been updated with each
2796    * region's deployed server.
2797    * @param tableName the table name
2798    * @throws IOException
2799    */
2800   public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
2801     waitUntilAllRegionsAssigned(tableName, 60000);
2802   }
2803 
2804   /**
2805    * Wait until all regions for a table in hbase:meta have a non-empty
2806    * info:server, or until timeout.  This means all regions have been deployed,
2807    * the master has been informed, and hbase:meta has been updated with each
2808    * region's deployed server.
2809    * @param tableName the table name
2810    * @param timeout timeout, in milliseconds
2811    * @throws IOException
2812    */
2813   public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
2814       throws IOException {
2815     final HTable meta = new HTable(getConfiguration(), TableName.META_TABLE_NAME);
2816     try {
2817       waitFor(timeout, 200, true, new Predicate<IOException>() {
2818         @Override
2819         public boolean evaluate() throws IOException {
2820           boolean allRegionsAssigned = true;
2821           Scan scan = new Scan();
2822           scan.addFamily(HConstants.CATALOG_FAMILY);
2823           ResultScanner s = meta.getScanner(scan);
2824           try {
2825             Result r;
2826             while ((r = s.next()) != null) {
2827               byte [] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
2828               HRegionInfo info = HRegionInfo.parseFromOrNull(b);
2829               if (info != null && info.getTable().equals(tableName)) {
2830                 b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
2831                 allRegionsAssigned &= (b != null);
2832               }
2833             }
2834           } finally {
2835             s.close();
2836           }
2837           return allRegionsAssigned;
2838         }
2839       });
2840     } finally {
2841       meta.close();
2842     }
2843   }
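
       /*
        * Illustrative usage sketch: create a table and block until hbase:meta
        * shows a server for every region (table/family names and the 30s budget
        * are hypothetical).
        *
        *   TEST_UTIL.createTable(Bytes.toBytes("testtable"), Bytes.toBytes("f"));
        *   TEST_UTIL.waitUntilAllRegionsAssigned(TableName.valueOf("testtable"), 30000);
        */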
2844 
2845   /**
2846    * Do a small get/scan against one store. This is required because the store
2847    * has no methods for querying itself directly and relies on StoreScanner.
2848    */
2849   public static List<Cell> getFromStoreFile(HStore store,
2850                                                 Get get) throws IOException {
2851     Scan scan = new Scan(get);
2852     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
2853         scan.getFamilyMap().get(store.getFamily().getName()),
2854         // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set
2855         // readpoint 0.
2856         0);
2857 
2858     List<Cell> result = new ArrayList<Cell>();
2859     scanner.next(result);
2860     if (!result.isEmpty()) {
2861       // verify that we are on the row we want:
2862       Cell kv = result.get(0);
2863       if (!CellUtil.matchingRow(kv, get.getRow())) {
2864         result.clear();
2865       }
2866     }
2867     scanner.close();
2868     return result;
2869   }
2870 
2871   /**
2872    * Create region split keys between startKey and endKey.
2873    *
2874    * @param startKey first key of the key range
2875    * @param endKey last key of the key range
2876    * @param numRegions the number of regions to be created; it has to be greater than 3
2877    * @return the region split start keys, beginning with the empty byte array
2878    */
2879   public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
2880     assertTrue(numRegions > 3);
2881     byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2882     byte[][] result = new byte[tmpSplitKeys.length + 1][];
2883     for (int i = 0; i < tmpSplitKeys.length; i++) {
2884       result[i + 1] = tmpSplitKeys[i];
2885     }
2886     result[0] = HConstants.EMPTY_BYTE_ARRAY;
2887     return result;
2888   }
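
       /*
        * Illustrative usage sketch: derive start keys for a ten-region table
        * (the key range is hypothetical).
        *
        *   byte[][] startKeys = TEST_UTIL.getRegionSplitStartKeys(
        *       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 10);
        *   // startKeys[0] is HConstants.EMPTY_BYTE_ARRAY, the first region's start key.
        */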
2889 
2890   /**
2891    * Do a small get/scan against one store. This is required because the store
2892    * has no methods for querying itself directly and relies on StoreScanner.
2893    */
2894   public static List<Cell> getFromStoreFile(HStore store,
2895                                                 byte [] row,
2896                                                 NavigableSet<byte[]> columns
2897                                                 ) throws IOException {
2898     Get get = new Get(row);
2899     Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
2900     s.put(store.getFamily().getName(), columns);
2901 
2902     return getFromStoreFile(store,get);
2903   }
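
       /*
        * Illustrative usage sketch: read one column of one row directly from a
        * store (the store handle and the row/column names are hypothetical).
        *
        *   NavigableSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        *   cols.add(Bytes.toBytes("q"));
        *   List<Cell> cells = getFromStoreFile(store, Bytes.toBytes("row1"), cols);
        */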
2904 
2905   /**
2906    * Gets a ZooKeeperWatcher.
2907    * @param TEST_UTIL the testing utility whose configuration is used to connect
2908    */
2909   public static ZooKeeperWatcher getZooKeeperWatcher(
2910       HBaseTestingUtility TEST_UTIL) throws ZooKeeperConnectionException,
2911       IOException {
2912     ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
2913         "unittest", new Abortable() {
2914           boolean aborted = false;
2915 
2916           @Override
2917           public void abort(String why, Throwable e) {
2918             aborted = true;
2919             throw new RuntimeException("Fatal ZK error, why=" + why, e);
2920           }
2921 
2922           @Override
2923           public boolean isAborted() {
2924             return aborted;
2925           }
2926         });
2927     return zkw;
2928   }
2929 
2930   /**
2931    * Creates a znode with OPENED state.
2932    * @param TEST_UTIL the testing utility used to get a ZooKeeperWatcher
2933    * @param region the region whose assignment znode is created and transitioned
2934    * @param serverName the server the region is assigned to
2935    * @return the ZooKeeperWatcher used to create and transition the znode
2936    * @throws IOException
2937    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
2938    * @throws KeeperException
2939    * @throws NodeExistsException
2940    */
2941   public static ZooKeeperWatcher createAndForceNodeToOpenedState(
2942       HBaseTestingUtility TEST_UTIL, HRegion region,
2943       ServerName serverName) throws ZooKeeperConnectionException,
2944       IOException, KeeperException, NodeExistsException {
2945     ZooKeeperWatcher zkw = getZooKeeperWatcher(TEST_UTIL);
2946     ZKAssign.createNodeOffline(zkw, region.getRegionInfo(), serverName);
2947     int version = ZKAssign.transitionNodeOpening(zkw, region
2948         .getRegionInfo(), serverName);
2949     ZKAssign.transitionNodeOpened(zkw, region.getRegionInfo(), serverName,
2950         version);
2951     return zkw;
2952   }
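
       /*
        * Illustrative usage sketch: drive a region's assignment znode through
        * OFFLINE -> OPENING -> OPENED, as if a region server had just opened it
        * (the region and serverName fixtures are hypothetical).
        *
        *   ZooKeeperWatcher zkw =
        *       createAndForceNodeToOpenedState(TEST_UTIL, region, serverName);
        *   zkw.close();
        */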
2953 
2954   public static void assertKVListsEqual(String additionalMsg,
2955       final List<? extends Cell> expected,
2956       final List<? extends Cell> actual) {
2957     final int eLen = expected.size();
2958     final int aLen = actual.size();
2959     final int minLen = Math.min(eLen, aLen);
2960 
2961     int i;
2962     for (i = 0; i < minLen
2963         && KeyValue.COMPARATOR.compare(expected.get(i), actual.get(i)) == 0;
2964         ++i) {}
2965 
2966     if (additionalMsg == null) {
2967       additionalMsg = "";
2968     }
2969     if (!additionalMsg.isEmpty()) {
2970       additionalMsg = ". " + additionalMsg;
2971     }
2972 
2973     if (eLen != aLen || i != minLen) {
2974       throw new AssertionError(
2975           "Expected and actual KV arrays differ at position " + i + ": " +
2976           safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
2977           safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
2978     }
2979   }
2980 
2981   private static <T> String safeGetAsStr(List<T> lst, int i) {
2982     if (0 <= i && i < lst.size()) {
2983       return lst.get(i).toString();
2984     } else {
2985       return "<out_of_range>";
2986     }
2987   }
2988 
2989   public String getClusterKey() {
2990     return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
2991         + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
2992         + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
2993             HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
2994   }
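
       /*
        * Illustrative note: the returned key has the shape
        * "<quorum>:<clientPort>:<znode.parent>", e.g. "localhost:2181:/hbase",
        * the format expected by cross-cluster tools such as replication setup.
        */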
2995 
2996   /** Creates a random table with the given parameters */
2997   public HTable createRandomTable(String tableName,
2998       final Collection<String> families,
2999       final int maxVersions,
3000       final int numColsPerRow,
3001       final int numFlushes,
3002       final int numRegions,
3003       final int numRowsPerFlush)
3004       throws IOException, InterruptedException {
3005 
3006     LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
3007         " regions, " + numFlushes + " storefiles per region, " +
3008         numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
3009         "\n");
3010 
3011     final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3012     final int numCF = families.size();
3013     final byte[][] cfBytes = new byte[numCF][];
3014     {
3015       int cfIndex = 0;
3016       for (String cf : families) {
3017         cfBytes[cfIndex++] = Bytes.toBytes(cf);
3018       }
3019     }
3020 
3021     final int actualStartKey = 0;
3022     final int actualEndKey = Integer.MAX_VALUE;
3023     final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3024     final int splitStartKey = actualStartKey + keysPerRegion;
3025     final int splitEndKey = actualEndKey - keysPerRegion;
3026     final String keyFormat = "%08x";
3027     final HTable table = createTable(tableName, cfBytes,
3028         maxVersions,
3029         Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3030         Bytes.toBytes(String.format(keyFormat, splitEndKey)),
3031         numRegions);
3032 
3033     if (hbaseCluster != null) {
3034       getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3035     }
3036 
3037     for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3038       for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3039         final byte[] row = Bytes.toBytes(String.format(keyFormat,
3040             actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3041 
3042         Put put = new Put(row);
3043         Delete del = new Delete(row);
3044         for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3045           final byte[] cf = cfBytes[rand.nextInt(numCF)];
3046           final long ts = rand.nextInt();
3047           final byte[] qual = Bytes.toBytes("col" + iCol);
3048           if (rand.nextBoolean()) {
3049             final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
3050                 "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
3051                 ts + "_random_" + rand.nextLong());
3052             put.add(cf, qual, ts, value);
3053           } else if (rand.nextDouble() < 0.8) {
3054             del.deleteColumn(cf, qual, ts);
3055           } else {
3056             del.deleteColumns(cf, qual, ts);
3057           }
3058         }
3059 
3060         if (!put.isEmpty()) {
3061           table.put(put);
3062         }
3063 
3064         if (!del.isEmpty()) {
3065           table.delete(del);
3066         }
3067       }
3068       LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3069       table.flushCommits();
3070       if (hbaseCluster != null) {
3071         getMiniHBaseCluster().flushcache(table.getName());
3072       }
3073     }
3074 
3075     return table;
3076   }
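
       /*
        * Illustrative usage sketch (hypothetical names): two families, up to
        * 3 versions, 10 columns per row, 4 flushes, 8 regions, ~100 rows per
        * flush.
        *
        *   HTable t = TEST_UTIL.createRandomTable("randomTable",
        *       Arrays.asList("cf1", "cf2"), 3, 10, 4, 8, 100);
        */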
3077 
3078   private static final int MIN_RANDOM_PORT = 0xc000;
3079   private static final int MAX_RANDOM_PORT = 0xfffe;
3080   private static Random random = new Random();
3081 
3082   /**
3083    * Returns a random port. These ports cannot be registered with IANA and are
3084    * intended for dynamic allocation (see http://bit.ly/dynports).
3085    */
3086   public static int randomPort() {
3087     return MIN_RANDOM_PORT
3088         + random.nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
3089   }
3090 
3091   /**
3092    * Returns a random free port and marks that port as taken. Not thread-safe. Expected to be
3093    * called from single-threaded test setup code.
3094    */
3095   public static int randomFreePort() {
3096     int port = 0;
3097     do {
3098       port = randomPort();
3099       if (takenRandomPorts.contains(port)) {
3100         continue;
3101       }
3102       takenRandomPorts.add(port);
3103 
3104       try {
3105         ServerSocket sock = new ServerSocket(port);
3106         sock.close();
3107       } catch (IOException ex) {
3108         port = 0;
3109       }
3110     } while (port == 0);
3111     return port;
3112   }
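
       /*
        * Note: the bind-and-close probe above only proves the port was free at
        * probe time; an unrelated process may still claim it before the test
        * binds it for real.
        */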
3113 
3114 
3115   public static String randomMultiCastAddress() {
3116     return "226.1.1." + random.nextInt(254);
3117   }
3118 
3119 
3120 
3121   public static void waitForHostPort(String host, int port)
3122       throws IOException {
3123     final int maxTimeMs = 10000;
3124     final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3125     IOException savedException = null;
3126     LOG.info("Waiting for server at " + host + ":" + port);
3127     for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3128       try {
3129         Socket sock = new Socket(InetAddress.getByName(host), port);
3130         sock.close();
3131         savedException = null;
3132         LOG.info("Server at " + host + ":" + port + " is available");
3133         break;
3134       } catch (UnknownHostException e) {
3135         throw new IOException("Failed to look up " + host, e);
3136       } catch (IOException e) {
3137         savedException = e;
3138       }
3139       Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3140     }
3141 
3142     if (savedException != null) {
3143       throw savedException;
3144     }
3145   }
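
       /*
        * Illustrative usage sketch: block until a server socket is reachable
        * (host and port are hypothetical).
        *
        *   waitForHostPort("127.0.0.1", 60020);
        */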
3146 
3147   /**
3148    * Creates a pre-split table for load testing. If the table already exists,
3149    * logs a warning and continues.
3150    * @return the number of regions the table was split into
3151    */
3152   public static int createPreSplitLoadTestTable(Configuration conf,
3153       TableName tableName, byte[] columnFamily, Algorithm compression,
3154       DataBlockEncoding dataBlockEncoding) throws IOException {
3155     HTableDescriptor desc = new HTableDescriptor(tableName);
3156     HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
3157     hcd.setDataBlockEncoding(dataBlockEncoding);
3158     hcd.setCompressionType(compression);
3159     return createPreSplitLoadTestTable(conf, desc, hcd);
3160   }
3161 
3162   /**
3163    * Creates a pre-split table for load testing. If the table already exists,
3164    * logs a warning and continues.
3165    * @return the number of regions the table was split into
3166    */
3167   public static int createPreSplitLoadTestTable(Configuration conf,
3168       HTableDescriptor desc, HColumnDescriptor hcd) throws IOException {
3169     if (!desc.hasFamily(hcd.getName())) {
3170       desc.addFamily(hcd);
3171     }
3172 
3173     int totalNumberOfRegions = 0;
3174     HBaseAdmin admin = new HBaseAdmin(conf);
3175     try {
3176       // Create a table with pre-split regions.
3177       // The number of splits is set as:
3178       //    (region servers * regions per region server).
3179       int numberOfServers = admin.getClusterStatus().getServers().size();
3180       if (numberOfServers == 0) {
3181         throw new IllegalStateException("No live regionservers");
3182       }
3183 
3184       int regionsPerServer = conf.getInt(REGIONS_PER_SERVER_KEY, DEFAULT_REGIONS_PER_SERVER);
3185       totalNumberOfRegions = numberOfServers * regionsPerServer;
3186       LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3187           "pre-splitting table into " + totalNumberOfRegions + " regions " +
3188           "(default regions per server: " + regionsPerServer + ")");
3189 
3190       byte[][] splits = new RegionSplitter.HexStringSplit().split(
3191           totalNumberOfRegions);
3192 
3193       admin.createTable(desc, splits);
3194     } catch (MasterNotRunningException e) {
3195       LOG.error("Master not running", e);
3196       throw new IOException(e);
3197     } catch (TableExistsException e) {
3198       LOG.warn("Table " + desc.getTableName() +
3199           " already exists, continuing");
3200     } finally {
3201       admin.close();
3202     }
3203     return totalNumberOfRegions;
3204   }
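
       /*
        * Illustrative usage sketch (hypothetical names): pre-split a load test
        * table using GZ compression and FAST_DIFF block encoding.
        *
        *   int regions = createPreSplitLoadTestTable(conf,
        *       TableName.valueOf("loadtest"), Bytes.toBytes("cf"),
        *       Compression.Algorithm.GZ, DataBlockEncoding.FAST_DIFF);
        */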
3205 
3206   public static int getMetaRSPort(Configuration conf) throws IOException {
3207     HTable table = new HTable(conf, TableName.META_TABLE_NAME);
3208     HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(""));
3209     table.close();
3210     return hloc.getPort();
3211   }
3212 
3213   /**
3214    *  Due to an async racing issue, a region may not be in
3215    *  the online region list of a region server yet, even after
3216    *  the assignment znode is deleted and the new assignment
3217    *  is recorded in the master.  So this method retries until the timeout elapses.
3218    */
3219   public void assertRegionOnServer(
3220       final HRegionInfo hri, final ServerName server,
3221       final long timeout) throws IOException, InterruptedException {
3222     long timeoutTime = System.currentTimeMillis() + timeout;
3223     while (true) {
3224       List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
3225       if (regions.contains(hri)) return;
3226       long now = System.currentTimeMillis();
3227       if (now > timeoutTime) break;
3228       Thread.sleep(10);
3229     }
3230     fail("Could not find region " + hri.getRegionNameAsString()
3231       + " on server " + server);
3232   }
3233 
3234   /**
3235    * Check to make sure the region is open on the specified
3236    * region server, but not on any other one.
3237    */
3238   public void assertRegionOnlyOnServer(
3239       final HRegionInfo hri, final ServerName server,
3240       final long timeout) throws IOException, InterruptedException {
3241     long timeoutTime = System.currentTimeMillis() + timeout;
3242     while (true) {
3243       List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
3244       if (regions.contains(hri)) {
3245         List<JVMClusterUtil.RegionServerThread> rsThreads =
3246           getHBaseCluster().getLiveRegionServerThreads();
3247         for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
3248           HRegionServer rs = rsThread.getRegionServer();
3249           if (server.equals(rs.getServerName())) {
3250             continue;
3251           }
3252           Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3253           for (HRegion r: hrs) {
3254             assertTrue("Region should not be double assigned",
3255               r.getRegionId() != hri.getRegionId());
3256           }
3257         }
3258         return; // good, we are happy
3259       }
3260       long now = System.currentTimeMillis();
3261       if (now > timeoutTime) break;
3262       Thread.sleep(10);
3263     }
3264     fail("Could not find region " + hri.getRegionNameAsString()
3265       + " on server " + server);
3266   }
3267 
3268   public HRegion createTestRegion(String tableName, HColumnDescriptor hcd)
3269       throws IOException {
3270     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
3271     htd.addFamily(hcd);
3272     HRegionInfo info =
3273         new HRegionInfo(TableName.valueOf(tableName), null, null, false);
3274     HRegion region =
3275         HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), htd);
3276     return region;
3277   }
3278 
3279   public void setFileSystemURI(String fsURI) {
3280     FS_URI = fsURI;
3281   }
3282 
3283   /**
3284    * Wrapper method for {@link Waiter#waitFor(Configuration, long, Predicate)}.
3285    */
3286   public <E extends Exception> long waitFor(long timeout, Predicate<E> predicate)
3287       throws E {
3288     return Waiter.waitFor(this.conf, timeout, predicate);
3289   }
3290 
3291   /**
3292    * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, Predicate)}.
3293    */
3294   public <E extends Exception> long waitFor(long timeout, long interval, Predicate<E> predicate)
3295       throws E {
3296     return Waiter.waitFor(this.conf, timeout, interval, predicate);
3297   }
3298 
3299   /**
3300    * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)}.
3301    */
3302   public <E extends Exception> long waitFor(long timeout, long interval,
3303       boolean failIfTimeout, Predicate<E> predicate) throws E {
3304     return Waiter.waitFor(this.conf, timeout, interval, failIfTimeout, predicate);
3305   }
3306 
3307   /**
3308    * Returns a {@link Predicate} for checking that there are no regions in transition in the master
3309    */
3310   public Waiter.Predicate<Exception> predicateNoRegionsInTransition() {
3311     return new Waiter.Predicate<Exception>() {
3312       @Override
3313       public boolean evaluate() throws Exception {
3314         final RegionStates regionStates = getMiniHBaseCluster().getMaster()
3315             .getAssignmentManager().getRegionStates();
3316         return !regionStates.isRegionsInTransition();
3317       }
3318     };
3319   }
3320 
3321   /**
3322    * Returns a {@link Predicate} for checking that the given table is enabled
3323    */
3324   public Waiter.Predicate<Exception> predicateTableEnabled(final TableName tableName) {
3325     return new Waiter.Predicate<Exception>() {
3326       @Override
3327       public boolean evaluate() throws Exception {
3328         return getHBaseAdmin().isTableEnabled(tableName);
3329       }
3330     };
3331   }
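
       /*
        * Illustrative usage sketch: the waitFor wrappers above pair with these
        * predicates (timeouts and the table name are hypothetical).
        *
        *   TEST_UTIL.waitFor(60000, TEST_UTIL.predicateNoRegionsInTransition());
        *   TEST_UTIL.waitFor(30000, 250, true,
        *       TEST_UTIL.predicateTableEnabled(TableName.valueOf("t")));
        */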
3332 
3333   /**
3334    * Create a set of column descriptors with every combination of the available
3335    * compression, data block encoding, and bloom filter types.
3336    * @return the list of column descriptors
3337    */
3338   public static List<HColumnDescriptor> generateColumnDescriptors() {
3339     return generateColumnDescriptors("");
3340   }
3341 
3342   /**
3343    * Create a set of column descriptors with every combination of the available
3344    * compression, data block encoding, and bloom filter types.
3345    * @param prefix family names prefix
3346    * @return the list of column descriptors
3347    */
3348   public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
3349     List<HColumnDescriptor> htds = new ArrayList<HColumnDescriptor>();
3350     long familyId = 0;
3351     for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
3352       for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
3353         for (BloomType bloomType: BloomType.values()) {
3354           String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3355           HColumnDescriptor htd = new HColumnDescriptor(name);
3356           htd.setCompressionType(compressionType);
3357           htd.setDataBlockEncoding(encodingType);
3358           htd.setBloomFilterType(bloomType);
3359           htds.add(htd);
3360           familyId++;
3361         }
3362       }
3363     }
3364     return htds;
3365   }
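
       /*
        * Illustrative usage sketch: attach every generated descriptor to one
        * table so a single test covers all codec combinations (the table name
        * and prefix are hypothetical).
        *
        *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("allCodecs"));
        *   for (HColumnDescriptor hcd : generateColumnDescriptors("pre")) {
        *     htd.addFamily(hcd);
        *   }
        */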
3366 
3367   /**
3368    * Get supported compression algorithms.
3369    * @return supported compression algorithms.
3370    */
3371   public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3372     String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3373     List<Compression.Algorithm> supportedAlgos = new ArrayList<Compression.Algorithm>();
3374     for (String algoName : allAlgos) {
3375       try {
3376         Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3377         algo.getCompressor();
3378         supportedAlgos.add(algo);
3379       } catch (Throwable t) {
3380         // this algo is not available
3381       }
3382     }
3383     return supportedAlgos.toArray(new Compression.Algorithm[0]);
3384   }
3385 }