1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.test;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Random;
29  import java.util.Set;
30  import java.util.UUID;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.commons.cli.CommandLine;
34  import org.apache.commons.cli.GnuParser;
35  import org.apache.commons.cli.HelpFormatter;
36  import org.apache.commons.cli.Options;
37  import org.apache.commons.cli.ParseException;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.conf.Configured;
42  import org.apache.hadoop.fs.Path;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HBaseTestingUtility;
45  import org.apache.hadoop.hbase.HColumnDescriptor;
46  import org.apache.hadoop.hbase.HRegionLocation;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.IntegrationTestBase;
49  import org.apache.hadoop.hbase.IntegrationTestingUtility;
50  import org.apache.hadoop.hbase.IntegrationTests;
51  import org.apache.hadoop.hbase.MasterNotRunningException;
52  import org.apache.hadoop.hbase.TableName;
53  import org.apache.hadoop.hbase.client.Get;
54  import org.apache.hadoop.hbase.client.HBaseAdmin;
55  import org.apache.hadoop.hbase.client.HConnection;
56  import org.apache.hadoop.hbase.client.HConnectionManager;
57  import org.apache.hadoop.hbase.client.HTable;
58  import org.apache.hadoop.hbase.client.Put;
59  import org.apache.hadoop.hbase.client.Result;
60  import org.apache.hadoop.hbase.client.ResultScanner;
61  import org.apache.hadoop.hbase.client.Scan;
62  import org.apache.hadoop.hbase.client.ScannerCallable;
63  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
64  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
65  import org.apache.hadoop.hbase.mapreduce.TableMapper;
66  import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
67  import org.apache.hadoop.hbase.util.AbstractHBaseTool;
68  import org.apache.hadoop.hbase.util.Bytes;
69  import org.apache.hadoop.hbase.util.RegionSplitter;
70  import org.apache.hadoop.io.BytesWritable;
71  import org.apache.hadoop.io.NullWritable;
72  import org.apache.hadoop.io.Text;
73  import org.apache.hadoop.io.Writable;
74  import org.apache.hadoop.mapreduce.Counter;
75  import org.apache.hadoop.mapreduce.CounterGroup;
76  import org.apache.hadoop.mapreduce.Counters;
77  import org.apache.hadoop.mapreduce.InputFormat;
78  import org.apache.hadoop.mapreduce.InputSplit;
79  import org.apache.hadoop.mapreduce.Job;
80  import org.apache.hadoop.mapreduce.JobContext;
81  import org.apache.hadoop.mapreduce.Mapper;
82  import org.apache.hadoop.mapreduce.RecordReader;
83  import org.apache.hadoop.mapreduce.Reducer;
84  import org.apache.hadoop.mapreduce.TaskAttemptContext;
85  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
86  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
87  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
88  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
89  import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
90  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
91  import org.apache.hadoop.util.Tool;
92  import org.apache.hadoop.util.ToolRunner;
93  import org.junit.Test;
94  import org.junit.experimental.categories.Category;
95  
96  import com.google.common.collect.Sets;
97  
98  /**
99   * This is an integration test borrowed from goraci, written by Keith Turner,
100  * which is in turn inspired by the Accumulo test called continuous ingest (ci).
101  * The original source code can be found here:
102  * https://github.com/keith-turner/goraci
103  * https://github.com/enis/goraci/
104  *
105  * Apache Accumulo [0] has a simple test suite that verifies that data is not
106  * lost at scale. This test suite is called continuous ingest. This test runs
107  * many ingest clients that continually create linked lists containing 25
108  * million nodes. At some point the clients are stopped and a map reduce job is
109  * run to ensure no linked list has a hole. A hole indicates data was lost.
110  *
111  * The nodes in the linked list are random. This causes each linked list to
112  * spread across the table. Therefore if one part of a table loses data, then it
113  * will be detected by references in another part of the table.
114  *
115  * THE ANATOMY OF THE TEST
116  *
117  * Below is a rough sketch of how data is written. For specific details, look at
118  * the Generator code.
119  *
120  * 1. Write out 1 million nodes. 2. Flush the client. 3. Write out 1 million that
121  * reference the previous million. 4. If this is the 25th set of 1 million nodes,
122  * then update the 1st set of 1 million to point to the last. 5. Goto 1.
123  *
124  * The key is that nodes only reference flushed nodes. Therefore a node should
125  * never reference a missing node, even if the ingest client is killed at any
126  * point in time.
127  *
128  * When running this test suite with Accumulo, a script called the Agitator runs in
129  * parallel, randomly and continuously killing server processes. The outcome was
130  * that many data loss bugs were found in Accumulo by doing this. This test suite
131  * can also help find bugs that impact uptime and stability when run for days or
132  * weeks.
133  *
134  * This test suite consists of the following: a few Java programs, a little
135  * helper script to run the Java programs, and a Maven script to build it.
136  *
137  * When generating data, it is best to have each map task generate a multiple of
138  * 25 million rows. The reason for this is that circular linked lists are generated
139  * every 25M rows. Not generating a multiple of 25M will result in some nodes in the
140  * linked list not having references. The loss of an unreferenced node cannot
141  * be detected.
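     *
     * As a worked example with the defaults in this class (width = 1,000,000 and
     * wrap multiplier = 25), a circular list is closed after every
     * 1,000,000 * 25 = 25,000,000 nodes, which is why each mapper's row count
     * should be a multiple of 25M.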
142  *
143  *
144  * Below is a description of the Java programs
145  *
146  * Generator - A map only job that generates data. As stated previously,
147  * it is best to generate data in multiples of 25M.
148  *
149  * Verify - A map reduce job that looks for holes. Look at the counts after
150  * running. REFERENCED and UNREFERENCED are ok; any UNDEFINED counts are bad. Do
151  * not run at the same time as the Generator.
152  *
153  * Walker - A standalone program that starts following a linked list and emits timing info.
154  *
155  * Print - A standalone program that prints nodes in the linked list
156  *
157  * Delete - A standalone program that deletes a single node
158  *
159  * This class can be run as a unit test, as an integration test, or from the command line.
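     *
     * When run from the command line, the first argument selects one of the programs
     * above and the remaining arguments are passed to it. As a sketch only (it assumes
     * the standard hbase launcher script, that this test class is on its classpath, and
     * an illustrative output path), a single Generator/Verify iteration might be started
     * with:
     *
     *   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList Loop 1 4 25000000 /tmp/ITBLL 4
     *
     * i.e. 1 iteration, 4 mappers, 25M rows per mapper, a temporary output directory,
     * and 4 reducers for the Verify step.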
160  */
161 @Category(IntegrationTests.class)
162 public class IntegrationTestBigLinkedList extends IntegrationTestBase {
163   protected static final byte[] NO_KEY = new byte[1];
164 
165   protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
166 
167   protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
168 
169   protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
170 
171   //link to the id of the prev node in the linked list
172   protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
173 
174   //identifier of the mapred task that generated this row
175   protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
176 
177   //the id of the row within the same client.
178   protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
179 
180   /** How many rows to write per map task. This has to be a multiple of 25M */
181   private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
182     = "IntegrationTestBigLinkedList.generator.num_rows";
183 
184   private static final String GENERATOR_NUM_MAPPERS_KEY
185     = "IntegrationTestBigLinkedList.generator.map.tasks";
186 
187   private static final String GENERATOR_WIDTH_KEY
188     = "IntegrationTestBigLinkedList.generator.width";
189 
190   private static final String GENERATOR_WRAP_KEY
191     = "IntegrationTestBigLinkedList.generator.wrap";
192 
193   protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
194 
195   private static final int MISSING_ROWS_TO_LOG = 50;
196 
197   private static final int WIDTH_DEFAULT = 1000000;
198   private static final int WRAP_DEFAULT = 25;
199   private static final int ROWKEY_LENGTH = 16;
200 
201   protected String toRun;
202   protected String[] otherArgs;
203 
204   static class CINode {
205     byte[] key;
206     byte[] prev;
207     String client;
208     long count;
209   }
210 
211   /**
212    * A Map only job that generates random linked list and stores them.
213    */
214   static class Generator extends Configured implements Tool {
215 
216     private static final Log LOG = LogFactory.getLog(Generator.class);
217 
218     static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
219       static class GeneratorInputSplit extends InputSplit implements Writable {
220         @Override
221         public long getLength() throws IOException, InterruptedException {
222           return 1;
223         }
224         @Override
225         public String[] getLocations() throws IOException, InterruptedException {
226           return new String[0];
227         }
228         @Override
229         public void readFields(DataInput arg0) throws IOException {
230         }
231         @Override
232         public void write(DataOutput arg0) throws IOException {
233         }
234       }
235 
236       static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
237         private long count;
238         private long numNodes;
239         private Random rand;
240 
241         @Override
242         public void close() throws IOException {
243         }
244 
245         @Override
246         public BytesWritable getCurrentKey() throws IOException, InterruptedException {
247           byte[] bytes = new byte[ROWKEY_LENGTH];
248           rand.nextBytes(bytes);
249           return new BytesWritable(bytes);
250         }
251 
252         @Override
253         public NullWritable getCurrentValue() throws IOException, InterruptedException {
254           return NullWritable.get();
255         }
256 
257         @Override
258         public float getProgress() throws IOException, InterruptedException {
259           return (float)(count / (double)numNodes);
260         }
261 
262         @Override
263         public void initialize(InputSplit arg0, TaskAttemptContext context)
264             throws IOException, InterruptedException {
265           numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
266           rand = new Random();
267         }
268 
269         @Override
270         public boolean nextKeyValue() throws IOException, InterruptedException {
271           return count++ < numNodes;
272         }
273 
274       }
275 
276       @Override
277       public RecordReader<BytesWritable,NullWritable> createRecordReader(
278           InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
279         GeneratorRecordReader rr = new GeneratorRecordReader();
280         rr.initialize(split, context);
281         return rr;
282       }
283 
284       @Override
285       public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
286         int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
287 
288         ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);
289 
290         for (int i = 0; i < numMappers; i++) {
291           splits.add(new GeneratorInputSplit());
292         }
293 
294         return splits;
295       }
296     }
297 
298     /** Ensure output files from prev-job go to map inputs for current job */
299     static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
300       @Override
301       protected boolean isSplitable(JobContext context, Path filename) {
302         return false;
303       }
304     }
305 
306     /**
307      * Some ASCII art time:
308      * [ . . . ] represents one batch of random longs of length WIDTH
309      *
310      *                _________________________
311      *               |                  ______ |
312      *               |                 |      ||
313      *             __+_________________+_____ ||
314      *             v v                 v     |||
315      * first   = [ . . . . . . . . . . . ]   |||
316      *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
317      *             | | | | | | | | | | |     |||
318      * prev    = [ . . . . . . . . . . . ]   |||
319      *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
320      *             | | | | | | | | | | |     |||
321      * current = [ . . . . . . . . . . . ]   |||
322      *                                       |||
323      * ...                                   |||
324      *                                       |||
325      * last    = [ . . . . . . . . . . . ]   |||
326      *             | | | | | | | | | | |-----|||
327      *             |                 |--------||
328      *             |___________________________|
329      */
330     static class GeneratorMapper
331       extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
332 
333       byte[][] first = null;
334       byte[][] prev = null;
335       byte[][] current = null;
336       byte[] id;
337       long count = 0;
338       int i;
339       HTable table;
340       long numNodes;
341       long wrap;
342       int width;
343 
344       @Override
345       protected void setup(Context context) throws IOException, InterruptedException {
346         id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
347         Configuration conf = context.getConfiguration();
348         instantiateHTable(conf);
349         this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
350         current = new byte[this.width][];
351         int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
352         this.wrap = (long)wrapMultiplier * width;
353         this.numNodes = context.getConfiguration().getLong(
354             GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
355         if (this.numNodes < this.wrap) {
356           this.wrap = this.numNodes;
357         }
358       }
359 
360       protected void instantiateHTable(Configuration conf) throws IOException {
361         table = new HTable(conf, getTableName(conf));
362         table.setAutoFlush(false, true);
363         table.setWriteBufferSize(4 * 1024 * 1024);
364       }
365 
366       @Override
367       protected void cleanup(Context context) throws IOException, InterruptedException {
368         table.close();
369       }
370 
371       @Override
372       protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
373         current[i] = new byte[key.getLength()];
374         System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
375         if (++i == current.length) {
376           persist(output, count, prev, current, id);
377           i = 0;
378 
379           if (first == null)
380             first = current;
381           prev = current;
382           current = new byte[this.width][];
383 
384           count += current.length;
385           output.setStatus("Count " + count);
386 
387           if (count % wrap == 0) {
388             // this block of code turns the 1 million linked lists of length 25 into one giant
389             // circular linked list of 25 million
390             circularLeftShift(first);
391 
392             persist(output, -1, prev, first, null);
393 
394             first = null;
395             prev = null;
396           }
397         }
398       }
399 
400       private static <T> void circularLeftShift(T[] first) {
401         T ez = first[0];
402         for (int i = 0; i < first.length - 1; i++)
403           first[i] = first[i + 1];
404         first[first.length - 1] = ez;
405       }
406 
407       protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
408           throws IOException {
409         for (int i = 0; i < current.length; i++) {
410           Put put = new Put(current[i]);
411           put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
412 
413           if (count >= 0) {
414             put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
415           }
416           if (id != null) {
417             put.add(FAMILY_NAME, COLUMN_CLIENT, id);
418           }
419           table.put(put);
420 
421           if (i % 1000 == 0) {
422             // Tickle progress every so often else maprunner will think us hung
423             output.progress();
424           }
425         }
426 
427         table.flushCommits();
428       }
429     }
430 
431     @Override
432     public int run(String[] args) throws Exception {
433       if (args.length < 3) {
434         System.out.println("Usage : " + Generator.class.getSimpleName() +
435             " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
436         System.out.println("   where <num nodes per map> should be a multiple of " +
437             "width*wrap multiplier, 25M by default");
438         return 0;
439       }
440 
441       int numMappers = Integer.parseInt(args[0]);
442       long numNodes = Long.parseLong(args[1]);
443       Path tmpOutput = new Path(args[2]);
444       Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
445       Integer wrapMuplitplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
446       return run(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
447     }
448 
449     protected void createSchema() throws IOException {
450       Configuration conf = getConf();
451       HBaseAdmin admin = new HBaseAdmin(conf);
452       TableName tableName = getTableName(conf);
453       try {
454         if (!admin.tableExists(tableName)) {
455           HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
456           htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
457           int numberOfServers = admin.getClusterStatus().getServers().size();
458           if (numberOfServers == 0) {
459             throw new IllegalStateException("No live regionservers");
460           }
461           int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
462                                 HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
463           int totalNumberOfRegions = numberOfServers * regionsPerServer;
464           LOG.info("Number of live regionservers: " + numberOfServers + ", " +
465               "pre-splitting table into " + totalNumberOfRegions + " regions " +
466               "(default regions per server: " + regionsPerServer + ")");
467 
468           byte[][] splits = new RegionSplitter.UniformSplit().split(
469               totalNumberOfRegions);
470 
471           admin.createTable(htd, splits);
472         }
473       } catch (MasterNotRunningException e) {
474         LOG.error("Master not running", e);
475         throw new IOException(e);
476       } finally {
477         admin.close();
478       }
479     }
480 
481     public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
482         Integer width, Integer wrapMuplitplier) throws Exception {
483       LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
484           + ", numNodes=" + numNodes);
485       Job job = new Job(getConf());
486 
487       job.setJobName("Random Input Generator");
488       job.setNumReduceTasks(0);
489       job.setJarByClass(getClass());
490 
491       job.setInputFormatClass(GeneratorInputFormat.class);
492       job.setOutputKeyClass(BytesWritable.class);
493       job.setOutputValueClass(NullWritable.class);
494 
495       setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
496 
497       job.setMapperClass(Mapper.class); //identity mapper
498 
499       FileOutputFormat.setOutputPath(job, tmpOutput);
500       job.setOutputFormatClass(SequenceFileOutputFormat.class);
501 
502       boolean success = jobCompletion(job);
503 
504       return success ? 0 : 1;
505     }
506 
507     public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
508         Integer width, Integer wrapMuplitplier) throws Exception {
509       LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
510       createSchema();
511       Job job = new Job(getConf());
512 
513       job.setJobName("Link Generator");
514       job.setNumReduceTasks(0);
515       job.setJarByClass(getClass());
516 
517       FileInputFormat.setInputPaths(job, tmpOutput);
518       job.setInputFormatClass(OneFilePerMapperSFIF.class);
519       job.setOutputKeyClass(NullWritable.class);
520       job.setOutputValueClass(NullWritable.class);
521 
522       setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
523 
524       setMapperForGenerator(job);
525 
526       job.setOutputFormatClass(NullOutputFormat.class);
527 
528       job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
529       TableMapReduceUtil.addDependencyJars(job);
530       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
531       TableMapReduceUtil.initCredentials(job);
532 
533       boolean success = jobCompletion(job);
534 
535       return success ? 0 : 1;
536     }
537 
538     protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
539         ClassNotFoundException {
540       boolean success = job.waitForCompletion(true);
541       return success;
542     }
543 
544     protected void setMapperForGenerator(Job job) {
545       job.setMapperClass(GeneratorMapper.class);
546     }
547 
548     public int run(int numMappers, long numNodes, Path tmpOutput,
549         Integer width, Integer wrapMuplitplier) throws Exception {
550       int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
551       if (ret > 0) {
552         return ret;
553       }
554       return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
555     }
556   }
557 
558   /**
559    * A Map Reduce job that verifies that the linked lists generated by
560    * {@link Generator} do not have any holes.
561    */
562   static class Verify extends Configured implements Tool {
563 
564     private static final Log LOG = LogFactory.getLog(Verify.class);
565     protected static final BytesWritable DEF = new BytesWritable(NO_KEY);
566 
567     protected Job job;
568 
569     public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
570       private BytesWritable row = new BytesWritable();
571       private BytesWritable ref = new BytesWritable();
572 
573       @Override
574       protected void map(ImmutableBytesWritable key, Result value, Context context)
575           throws IOException, InterruptedException {
576         byte[] rowKey = key.get();
577         row.set(rowKey, 0, rowKey.length);
578         context.write(row, DEF);
579         byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
580         if (prev != null && prev.length > 0) {
581           ref.set(prev, 0, prev.length);
582           context.write(ref, row);
583         } else {
584           LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
585         }
586       }
587     }
588 
589     public static enum Counts {
590       UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
591     }
592 
593     public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
594       private ArrayList<byte[]> refs = new ArrayList<byte[]>();
595 
596       private AtomicInteger rows = new AtomicInteger(0);
597 
598       @Override
599       public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
600           throws IOException, InterruptedException {
601 
602         int defCount = 0;
603 
604         refs.clear();
605         for (BytesWritable type : values) {
606           if (type.getLength() == DEF.getLength()) {
607             defCount++;
608           } else {
609             byte[] bytes = new byte[type.getLength()];
610             System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
611             refs.add(bytes);
612           }
613         }
614 
615         // TODO check for more than one def, should not happen
616 
617         StringBuilder refsSb = null;
618         String keyString = null;
619         if (defCount == 0 || refs.size() != 1) {
620           refsSb = new StringBuilder();
621           String comma = "";
622           for (byte[] ref : refs) {
623             refsSb.append(comma);
624             comma = ",";
625             refsSb.append(Bytes.toStringBinary(ref));
626           }
627           keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
628 
629           LOG.error("Linked List error: Key = " + keyString + " References = " + refsSb.toString());
630         }
631 
632         if (defCount == 0 && refs.size() > 0) {
633           // this is bad, found a node that is referenced but not defined. It must have been
634           // lost, emit some info about this node for debugging purposes.
635           context.write(new Text(keyString), new Text(refsSb.toString()));
636           context.getCounter(Counts.UNDEFINED).increment(1);
637           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
638             context.getCounter("undef", keyString).increment(1);
639           }
640         } else if (defCount > 0 && refs.size() == 0) {
641           // node is defined but not referenced
642           context.write(new Text(keyString), new Text("none"));
643           context.getCounter(Counts.UNREFERENCED).increment(1);
644           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
645             context.getCounter("unref", keyString).increment(1);
646           }
647         } else {
648           if (refs.size() > 1) {
649             if (refsSb != null) {
650               context.write(new Text(keyString), new Text(refsSb.toString()));
651             }
652             context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
653           }
654           // node is defined and referenced
655           context.getCounter(Counts.REFERENCED).increment(1);
656         }
657 
658       }
659     }
660 
661     @Override
662     public int run(String[] args) throws Exception {
663 
664       if (args.length != 2) {
665         System.out.println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
666         return 0;
667       }
668 
669       String outputDir = args[0];
670       int numReducers = Integer.parseInt(args[1]);
671 
672       return run(outputDir, numReducers);
673     }
674 
675     public int run(String outputDir, int numReducers) throws Exception {
676       return run(new Path(outputDir), numReducers);
677     }
678 
679     public int run(Path outputDir, int numReducers) throws Exception {
680       LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);
681 
682       job = new Job(getConf());
683 
684       job.setJobName("Link Verifier");
685       job.setNumReduceTasks(numReducers);
686       job.setJarByClass(getClass());
687 
688       setJobScannerConf(job);
689 
690       Scan scan = new Scan();
691       scan.addColumn(FAMILY_NAME, COLUMN_PREV);
692       scan.setCaching(10000);
693       scan.setCacheBlocks(false);
694 
695       TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
696           VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
697       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
698 
699       job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
700 
701       job.setReducerClass(VerifyReducer.class);
702       job.setOutputFormatClass(TextOutputFormat.class);
703       TextOutputFormat.setOutputPath(job, outputDir);
704 
705       boolean success = job.waitForCompletion(true);
706 
707       return success ? 0 : 1;
708     }
709 
710     @SuppressWarnings("deprecation")
711     public boolean verify(long expectedReferenced) throws Exception {
712       if (job == null) {
713         throw new IllegalStateException("You should call run() first");
714       }
715 
716       Counters counters = job.getCounters();
717 
718       Counter referenced = counters.findCounter(Counts.REFERENCED);
719       Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
720       Counter undefined = counters.findCounter(Counts.UNDEFINED);
721       Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
722 
723       boolean success = true;
724       //assert
725       if (expectedReferenced != referenced.getValue()) {
726         LOG.error("Expected referenced count does not match with actual referenced count. " +
727             "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
728         success = false;
729       }
730 
731       if (unreferenced.getValue() > 0) {
732         boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
733         LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
734             + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
735         success = false;
736       }
737 
738       if (undefined.getValue() > 0) {
739         LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
740         success = false;
741       }
742 
743       if (!success) {
744         handleFailure(counters);
745       }
746       return success;
747     }
748 
749     protected void handleFailure(Counters counters) throws IOException {
750       Configuration conf = job.getConfiguration();
751       HConnection conn = HConnectionManager.getConnection(conf);
752       TableName tableName = getTableName(conf);
753       CounterGroup g = counters.getGroup("undef");
754       Iterator<Counter> it = g.iterator();
755       while (it.hasNext()) {
756         String keyString = it.next().getName();
757         byte[] key = Bytes.toBytes(keyString);
758         HRegionLocation loc = conn.relocateRegion(tableName, key);
759         LOG.error("undefined row " + keyString + ", " + loc);
760       }
761       g = counters.getGroup("unref");
762       it = g.iterator();
763       while (it.hasNext()) {
764         String keyString = it.next().getName();
765         byte[] key = Bytes.toBytes(keyString);
766         HRegionLocation loc = conn.relocateRegion(tableName, key);
767         LOG.error("unreferenced row " + keyString + ", " + loc);
768       }
769     }
770   }
771 
772   /**
773    * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
774    * adds more data.
775    */
776   static class Loop extends Configured implements Tool {
777 
778     private static final Log LOG = LogFactory.getLog(Loop.class);
779 
780     IntegrationTestBigLinkedList it;
781 
782     protected void runGenerator(int numMappers, long numNodes,
783         String outputDir, Integer width, Integer wrapMuplitplier) throws Exception {
784       Path outputPath = new Path(outputDir);
785       UUID uuid = UUID.randomUUID(); //create a random UUID.
786       Path generatorOutput = new Path(outputPath, uuid.toString());
787 
788       Generator generator = new Generator();
789       generator.setConf(getConf());
790       int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
791       if (retCode > 0) {
792         throw new RuntimeException("Generator failed with return code: " + retCode);
793       }
794     }
795 
796     protected void runVerify(String outputDir,
797         int numReducers, long expectedNumNodes) throws Exception {
798       Path outputPath = new Path(outputDir);
799       UUID uuid = UUID.randomUUID(); //create a random UUID.
800       Path iterationOutput = new Path(outputPath, uuid.toString());
801 
802       Verify verify = new Verify();
803       verify.setConf(getConf());
804       int retCode = verify.run(iterationOutput, numReducers);
805       if (retCode > 0) {
806         throw new RuntimeException("Verify.run failed with return code: " + retCode);
807       }
808 
809       if (!verify.verify(expectedNumNodes)) {
810         throw new RuntimeException("Verify.verify failed");
811       }
812 
813       LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
814     }
815 
816     @Override
817     public int run(String[] args) throws Exception {
818       if (args.length < 5) {
819         System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>]");
820         return 1;
821       }
822       LOG.info("Running Loop with args:" + Arrays.deepToString(args));
823 
824       int numIterations = Integer.parseInt(args[0]);
825       int numMappers = Integer.parseInt(args[1]);
826       long numNodes = Long.parseLong(args[2]);
827       String outputDir = args[3];
828       int numReducers = Integer.parseInt(args[4]);
829       Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
830       Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
831 
832       long expectedNumNodes = 0;
833 
834       if (numIterations < 0) {
835         numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
836       }
837 
838       for (int i = 0; i < numIterations; i++) {
839         LOG.info("Starting iteration = " + i);
840         runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
841         expectedNumNodes += numMappers * numNodes;
842 
843         runVerify(outputDir, numReducers, expectedNumNodes);
844       }
845 
846       return 0;
847     }
848   }
849 
850   /**
851    * A stand alone program that prints out portions of a list created by {@link Generator}
852    */
853   private static class Print extends Configured implements Tool {
854     @Override
855     public int run(String[] args) throws Exception {
856       Options options = new Options();
857       options.addOption("s", "start", true, "start key");
858       options.addOption("e", "end", true, "end key");
859       options.addOption("l", "limit", true, "number to print");
860 
861       GnuParser parser = new GnuParser();
862       CommandLine cmd = null;
863       try {
864         cmd = parser.parse(options, args);
865         if (cmd.getArgs().length != 0) {
866           throw new ParseException("Command takes no arguments");
867         }
868       } catch (ParseException e) {
869         System.err.println("Failed to parse command line " + e.getMessage());
870         System.err.println();
871         HelpFormatter formatter = new HelpFormatter();
872         formatter.printHelp(getClass().getSimpleName(), options);
873         System.exit(-1);
874       }
875 
876       HTable table = new HTable(getConf(), getTableName(getConf()));
877 
878       Scan scan = new Scan();
879       scan.setBatch(10000);
880 
881       if (cmd.hasOption("s"))
882         scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
883 
884       if (cmd.hasOption("e"))
885         scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
886 
887       int limit = 0;
888       if (cmd.hasOption("l"))
889         limit = Integer.parseInt(cmd.getOptionValue("l"));
890       else
891         limit = 100;
892 
893       ResultScanner scanner = table.getScanner(scan);
894 
895       CINode node = new CINode();
896       Result result = scanner.next();
897       int count = 0;
898       while (result != null && count++ < limit) {
899         node = getCINode(result, node);
900         System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
901             Bytes.toStringBinary(node.prev), node.count, node.client);
902         result = scanner.next();
903       }
904       scanner.close();
905       table.close();
906 
907       return 0;
908     }
909   }
910 
911   /**
912    * A stand alone program that deletes a single node.
913    */
914   private static class Delete extends Configured implements Tool {
915     @Override
916     public int run(String[] args) throws Exception {
917       if (args.length != 1) {
918         System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
919         return 0;
920       }
921       byte[] val = Bytes.toBytesBinary(args[0]);
922 
923       org.apache.hadoop.hbase.client.Delete delete
924         = new org.apache.hadoop.hbase.client.Delete(val);
925 
926       HTable table = new HTable(getConf(), getTableName(getConf()));
927 
928       table.delete(delete);
929       table.flushCommits();
930       table.close();
931 
932       System.out.println("Delete successful");
933       return 0;
934     }
935   }
936 
937   /**
938    * A stand alone program that follows a linked list created by {@link Generator} and prints timing info.
939    */
940   private static class Walker extends Configured implements Tool {
941     @Override
942     public int run(String[] args) throws IOException {
943       Options options = new Options();
944       options.addOption("n", "num", true, "number of queries");
945       options.addOption("s", "start", true, "key to start at, binary string");
946       options.addOption("l", "logevery", true, "log every N queries");
947 
948       GnuParser parser = new GnuParser();
949       CommandLine cmd = null;
950       try {
951         cmd = parser.parse(options, args);
952         if (cmd.getArgs().length != 0) {
953           throw new ParseException("Command takes no arguments");
954         }
955       } catch (ParseException e) {
956         System.err.println("Failed to parse command line " + e.getMessage());
957         System.err.println();
958         HelpFormatter formatter = new HelpFormatter();
959         formatter.printHelp(getClass().getSimpleName(), options);
960         System.exit(-1);
961       }
962 
963       long maxQueries = Long.MAX_VALUE;
964       if (cmd.hasOption('n')) {
965         maxQueries = Long.parseLong(cmd.getOptionValue("n"));
966       }
967       Random rand = new Random();
968       boolean isSpecificStart = cmd.hasOption('s');
969       byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
970       int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
971 
972       HTable table = new HTable(getConf(), getTableName(getConf()));
973       long numQueries = 0;
974       // If isSpecificStart is set, only walk one list from that particular node.
975       // Note that in case of circular (or P-shaped) list it will walk forever, as is
976       // the case in normal run without startKey.
977       while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
978         if (!isSpecificStart) {
979           startKey = new byte[ROWKEY_LENGTH];
980           rand.nextBytes(startKey);
981         }
982         CINode node = findStartNode(table, startKey);
983         if (node == null && isSpecificStart) {
984           System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
985         }
986         numQueries++;
987         while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
988           byte[] prev = node.prev;
989           long t1 = System.currentTimeMillis();
990           node = getNode(prev, table, node);
991           long t2 = System.currentTimeMillis();
992           if (numQueries % logEvery == 0) {
993             System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
994           }
995           numQueries++;
996           if (node == null) {
997             System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
998           } else if (node.prev.length == NO_KEY.length) {
999             System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1000           }
1001         }
1002       }
1003 
1004       table.close();
1005       return 0;
1006     }
1007 
1008     private static CINode findStartNode(HTable table, byte[] startKey) throws IOException {
1009       Scan scan = new Scan();
1010       scan.setStartRow(startKey);
1011       scan.setBatch(1);
1012       scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1013 
1014       long t1 = System.currentTimeMillis();
1015       ResultScanner scanner = table.getScanner(scan);
1016       Result result = scanner.next();
1017       long t2 = System.currentTimeMillis();
1018       scanner.close();
1019 
1020      if (result != null) {
1021         CINode node = getCINode(result, new CINode());
1022         System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1023         return node;
1024       }
1025 
1026       System.out.println("FSR " + (t2 - t1));
1027 
1028       return null;
1029     }
1030 
1031     private CINode getNode(byte[] row, HTable table, CINode node) throws IOException {
1032       Get get = new Get(row);
1033       get.addColumn(FAMILY_NAME, COLUMN_PREV);
1034       Result result = table.get(get);
1035       return getCINode(result, node);
1036     }
1037   }
1038 
1039   static TableName getTableName(Configuration conf) {
1040     return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1041   }
1042 
1043   private static CINode getCINode(Result result, CINode node) {
1044     node.key = Bytes.copy(result.getRow());
1045     if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1046       node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1047     } else {
1048       node.prev = NO_KEY;
1049     }
1050     if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1051       node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1052     } else {
1053       node.count = -1;
1054     }
1055     if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1056       node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1057     } else {
1058       node.client = "";
1059     }
1060     return node;
1061   }
1062 
1063   protected IntegrationTestingUtility util;
1064 
1065   @Override
1066   public void setUpCluster() throws Exception {
1067     util = getTestingUtil(getConf());
1068     boolean isDistributed = util.isDistributedCluster();
1069     util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1070     if (!isDistributed) {
1071       util.startMiniMapReduceCluster();
1072     }
1073     this.setConf(util.getConfiguration());
1074   }
1075 
1076   @Override
1077   public void cleanUpCluster() throws Exception {
1078     super.cleanUpCluster();
1079     if (util.isDistributedCluster()) {
1080       util.shutdownMiniMapReduceCluster();
1081     }
1082   }
1083 
1084   @Test
1085   public void testContinuousIngest() throws Exception {
1086     //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1087     int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new Loop(),
1088         new String[] {"1", "1", "2000000",
1089                      util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1"});
1090     org.junit.Assert.assertEquals(0, ret);
1091   }
1092 
1093   private void usage() {
1094     System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1095     System.err.println("  where COMMAND is one of:");
1096     System.err.println("");
1097     System.err.println("  Generator                  A map only job that generates data.");
1098     System.err.println("  Verify                     A map reduce job that looks for holes");
1099     System.err.println("                             Look at the counts after running");
1100     System.err.println("                             REFERENCED and UNREFERENCED are ok");
1101     System.err.println("                             any UNDEFINED counts are bad. Do not");
1102     System.err.println("                             run at the same time as the Generator.");
1103     System.err.println("  Walker                     A standalone program that starts ");
1104     System.err.println("                             following a linked list and emits");
1105     System.err.println("                             timing info.");
1106     System.err.println("  Print                      A standalone program that prints nodes");
1107     System.err.println("                             in the linked list.");
1108     System.err.println("  Delete                     A standalone program that deletes a");
1109     System.err.println("                             single node.");
1110     System.err.println("  Loop                       A program to Loop through Generator and");
1111     System.err.println("                             Verify steps");
1112     System.err.println("\t  ");
1113     System.err.flush();
1114   }
1115 
1116   @Override
1117   protected void processOptions(CommandLine cmd) {
1118     super.processOptions(cmd);
1119     String[] args = cmd.getArgs();
1120     //get the class, run with the conf
1121     if (args.length < 1) {
1122       printUsage();
1123       throw new RuntimeException("Incorrect number of args.");
1124     }
1125     toRun = args[0];
1126     otherArgs = Arrays.copyOfRange(args, 1, args.length);
1127   }
1128 
1129   @Override
1130   public int runTestFromCommandLine() throws Exception {
1131 
1132     Tool tool = null;
1133     if (toRun.equals("Generator")) {
1134       tool = new Generator();
1135     } else if (toRun.equals("Verify")) {
1136       tool = new Verify();
1137     } else if (toRun.equals("Loop")) {
1138       Loop loop = new Loop();
1139       loop.it = this;
1140       tool = loop;
1141     } else if (toRun.equals("Walker")) {
1142       tool = new Walker();
1143     } else if (toRun.equals("Print")) {
1144       tool = new Print();
1145     } else if (toRun.equals("Delete")) {
1146       tool = new Delete();
1147     } else {
1148       usage();
1149       throw new RuntimeException("Unknown arg");
1150     }
1151 
1152     return ToolRunner.run(getConf(), tool, otherArgs);
1153   }
1154 
1155   @Override
1156   public String getTablename() {
1157     Configuration c = getConf();
1158     return c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME);
1159   }
1160 
1161   @Override
1162   protected Set<String> getColumnFamilies() {
1163     return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1164   }
1165 
1166   private static void setJobConf(Job job, int numMappers, long numNodes,
1167       Integer width, Integer wrapMultiplier) {
1168     job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1169     job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1170     if (width != null) {
1171       job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1172     }
1173     if (wrapMultiplier != null) {
1174       job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1175     }
1176   }
1177 
1178   public static void setJobScannerConf(Job job) {
1179     // Make sure scanners log something useful to make debugging possible.
1180     job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
1181     job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1182   }
1183 
1184   public static void main(String[] args) throws Exception {
1185     Configuration conf = HBaseConfiguration.create();
1186     IntegrationTestingUtility.setUseDistributedCluster(conf);
1187     int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1188     System.exit(ret);
1189   }
1190 }