1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertTrue;
22  
23  import com.google.common.base.Joiner;
24  
25  import com.google.common.collect.Sets;
26  import org.apache.commons.cli.CommandLine;
27  import org.apache.commons.lang.RandomStringUtils;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.IntegrationTestBase;
38  import org.apache.hadoop.hbase.IntegrationTestingUtility;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.client.Admin;
42  import org.apache.hadoop.hbase.client.Connection;
43  import org.apache.hadoop.hbase.client.ConnectionFactory;
44  import org.apache.hadoop.hbase.client.Consistency;
45  import org.apache.hadoop.hbase.client.RegionLocator;
46  import org.apache.hadoop.hbase.client.Result;
47  import org.apache.hadoop.hbase.client.Scan;
48  import org.apache.hadoop.hbase.client.Table;
49  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
50  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
51  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
52  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
53  import org.apache.hadoop.hbase.regionserver.InternalScanner;
54  import org.apache.hadoop.hbase.regionserver.RegionScanner;
55  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
56  import org.apache.hadoop.hbase.util.Bytes;
57  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58  import org.apache.hadoop.hbase.util.RegionSplitter;
59  import org.apache.hadoop.io.LongWritable;
60  import org.apache.hadoop.io.NullWritable;
61  import org.apache.hadoop.io.Writable;
62  import org.apache.hadoop.io.WritableComparable;
63  import org.apache.hadoop.io.WritableComparator;
64  import org.apache.hadoop.io.WritableUtils;
65  import org.apache.hadoop.mapreduce.InputFormat;
66  import org.apache.hadoop.mapreduce.InputSplit;
67  import org.apache.hadoop.mapreduce.Job;
68  import org.apache.hadoop.mapreduce.JobContext;
69  import org.apache.hadoop.mapreduce.Mapper;
70  import org.apache.hadoop.mapreduce.Partitioner;
71  import org.apache.hadoop.mapreduce.RecordReader;
72  import org.apache.hadoop.mapreduce.Reducer;
73  import org.apache.hadoop.mapreduce.TaskAttemptContext;
74  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
75  import org.apache.hadoop.util.ToolRunner;
76  import org.junit.Test;
77  import org.junit.experimental.categories.Category;
78  
79  import java.io.DataInput;
80  import java.io.DataOutput;
81  import java.io.IOException;
82  import java.util.ArrayList;
83  import java.util.HashSet;
84  import java.util.List;
85  import java.util.Map;
86  import java.util.Random;
87  import java.util.Set;
88  import java.util.concurrent.atomic.AtomicLong;
89  
90  /**
91   * Test Bulk Load and MR on a distributed cluster.
92   * It starts an MR job that creates linked chains.
93   *
94   * The format of rows is like this:
95   * Row Key -> Long
96   *
97   * L:<< Chain Id >> -> Row Key of the next link in the chain
98   * S:<< Chain Id >> -> The step in the chain that this link is at.
99   * D:<< Chain Id >> -> Random Data.
100  *
101  * All chains start on row 0.
102  * All row keys (rk's) are > 0.
103  *
104  * After creating the linked lists, they are walked over using a TableMapper-based MapReduce job.
105  *
106  * There are a few options exposed:
107  *
108  * hbase.IntegrationTestBulkLoad.chainLength
109  * The number of rows that will be part of each and every chain.
110  *
111  * hbase.IntegrationTestBulkLoad.numMaps
112  * The number of mappers that will be run.  Each mapper creates one linked list chain.
113  *
114  * hbase.IntegrationTestBulkLoad.numImportRounds
115  * How many jobs will be run to create linked lists.
116  *
117  * hbase.IntegrationTestBulkLoad.tableName
118  * The name of the table.
119  *
120  * hbase.IntegrationTestBulkLoad.replicaCount
121  * How many region replicas to configure for the table under test.
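 *
 * A rough sketch of how these options are usually supplied (the test is run through ToolRunner,
 * so -D flags work; the values below are hypothetical, not recommendations):
 *
 *   hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
 *       -Dhbase.IntegrationTestBulkLoad.chainLength=1000000 \
 *       -Dhbase.IntegrationTestBulkLoad.numMaps=10 \
 *       -Dhbase.IntegrationTestBulkLoad.numImportRounds=2 \
 *       -Dhbase.IntegrationTestBulkLoad.replicaCount=3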
122  */
123 @Category(IntegrationTests.class)
124 public class IntegrationTestBulkLoad extends IntegrationTestBase {
125 
126   private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);
127 
128   private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
129   private static final byte[] SORT_FAM  = Bytes.toBytes("S");
130   private static final byte[] DATA_FAM  = Bytes.toBytes("D");
131 
132   private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
133   private static int CHAIN_LENGTH = 500000;
134 
135   private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
136   private static int NUM_MAPS = 1;
137 
138   private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
139   private static int NUM_IMPORT_ROUNDS = 1;
140 
141   private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";
142 
143   private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
144   private static String TABLE_NAME = "IntegrationTestBulkLoad";
145 
146   private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
147   private static int NUM_REPLICA_COUNT_DEFAULT = 1;
148 
149   private static final String OPT_LOAD = "load";
150   private static final String OPT_CHECK = "check";
151 
152   private boolean load = false;
153   private boolean check = false;
154 
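  /**
   * Region observer that artificially slows scans on the primary replica (replica id 0).  It is
   * only installed when region replicas are enabled, so that timeline-consistent reads issued by
   * the check job get a chance to fall back to a secondary replica.
   */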
155   public static class SlowMeCoproScanOperations extends BaseRegionObserver {
156     static final AtomicLong sleepTime = new AtomicLong(2000);
157     Random r = new Random();
158     AtomicLong countOfNext = new AtomicLong(0);
159     AtomicLong countOfOpen = new AtomicLong(0);
160     public SlowMeCoproScanOperations() {}
161     @Override
162     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
163         final Scan scan, final RegionScanner s) throws IOException {
164       if (countOfOpen.incrementAndGet() == 2) { //slow down the second openScanner call
165         slowdownCode(e);
166       }
167       return s;
168     }
169 
170     @Override
171     public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
172         final InternalScanner s, final List<Result> results,
173         final int limit, final boolean hasMore) throws IOException {
174       //this will slow down a certain next operation if the conditions are met. The slowness
175       //will allow the call to go to a replica
176       countOfNext.incrementAndGet();
177       if (countOfNext.get() == 0 || countOfNext.get() == 4) {
178         slowdownCode(e);
179       }
180       return true;
181     }
182     protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
183       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
184         try {
185           if (sleepTime.get() > 0) {
186             LOG.info("Sleeping for " + sleepTime.get() + " ms");
187             Thread.sleep(sleepTime.get());
188           }
189         } catch (InterruptedException e1) {
190           LOG.error(e1);
191         }
192       }
193     }
194   }
195 
196   /**
197    * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
198    */
199   private void installSlowingCoproc() throws IOException, InterruptedException {
200     int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
201     if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
202 
203     TableName t = getTablename();
204     Admin admin = util.getHBaseAdmin();
205     HTableDescriptor desc = admin.getTableDescriptor(t);
206     desc.addCoprocessor(SlowMeCoproScanOperations.class.getName());
207     HBaseTestingUtility.modifyTableSync(admin, desc);
208     //sleep for some time. The hope is that the regions are closed/opened before
209     //the sleep returns. TODO: do this better
210     Thread.sleep(30000);
211   }
212 
213   @Test
214   public void testBulkLoad() throws Exception {
215     runLoad();
216     installSlowingCoproc();
217     runCheck();
218   }
219 
220   public void runLoad() throws Exception {
221     setupTable();
222     int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
223     LOG.info("Running load with numIterations:" + numImportRounds);
224     for (int i = 0; i < numImportRounds; i++) {
225       runLinkedListMRJob(i);
226     }
227   }
228 
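  /** Split points that pre-split the table uniformly across the positive long row-key space. */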
229   private byte[][] getSplits(int numRegions) {
230     RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
231     split.setFirstRow(Bytes.toBytes(0L));
232     split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
233     return split.split(numRegions);
234   }
235 
236   private void setupTable() throws IOException, InterruptedException {
237     if (util.getHBaseAdmin().tableExists(getTablename())) {
238       util.deleteTable(getTablename());
239     }
240 
241     util.createTable(
242         getTablename().getName(),
243         new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
244         getSplits(16)
245     );
246 
247     int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
248     if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
249 
250     TableName t = getTablename();
251     HBaseTestingUtility.setReplicas(util.getHBaseAdmin(), t, replicaCount);
252   }
253 
254   private void runLinkedListMRJob(int iteration) throws Exception {
255     String jobName =  IntegrationTestBulkLoad.class.getSimpleName() + " - " +
256         EnvironmentEdgeManager.currentTime();
257     Configuration conf = new Configuration(util.getConfiguration());
258     Path p = null;
259     if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
260       p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
261     } else {
262       p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
263     }
264 
265     conf.setBoolean("mapreduce.map.speculative", false);
266     conf.setBoolean("mapreduce.reduce.speculative", false);
267     conf.setInt(ROUND_NUM_KEY, iteration);
268 
269     Job job = new Job(conf);
270 
271     job.setJobName(jobName);
272 
273     // set the input format so that we can create map tasks with no data input.
274     job.setInputFormatClass(ITBulkLoadInputFormat.class);
275 
276     // Set the mapper classes.
277     job.setMapperClass(LinkedListCreationMapper.class);
278     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
279     job.setMapOutputValueClass(KeyValue.class);
280 
281     // Use the identity reducer
282     // So nothing to do here.
283 
284     // Set this jar.
285     job.setJarByClass(getClass());
286 
287     // Set where to place the hfiles.
288     FileOutputFormat.setOutputPath(job, p);
289     try (Connection conn = ConnectionFactory.createConnection(conf);
290         Admin admin = conn.getAdmin();
291         Table table = conn.getTable(getTablename());
292         RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
293       
294       // Configure the partitioner and other things needed for HFileOutputFormat2.
295       HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
296 
297       // Run the job making sure it works.
298       assertTrue(job.waitForCompletion(true));
299 
300       // Create a new loader.
301       LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
302 
303       // Load the HFiles in.
304       loader.doBulkLoad(p, admin, table, regionLocator);
305     }
306 
307     // Delete the files.
308     util.getTestFileSystem().delete(p, true);
309   }
310 
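  /**
   * InputSplit that carries no data and no locations; it exists only so map tasks can be spawned
   * without reading any real input.
   */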
311   public static class EmptySplit extends InputSplit implements Writable {
312     @Override
313     public void write(DataOutput out) throws IOException { }
314     @Override
315     public void readFields(DataInput in) throws IOException { }
316     @Override
317     public long getLength() { return 0L; }
318     @Override
319     public String[] getLocations() { return new String[0]; }
320   }
321 
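  /**
   * RecordReader that replays a fixed, in-memory array of keys and values supplied at
   * construction time.
   */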
322   public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
323     private int index = -1;
324     private K[] keys;
325     private V[] values;
326 
327     public FixedRecordReader(K[] keys, V[] values) {
328       this.keys = keys;
329       this.values = values;
330     }
331     @Override
332     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
333     InterruptedException { }
334     @Override
335     public boolean nextKeyValue() throws IOException, InterruptedException {
336       return ++index < keys.length;
337     }
338     @Override
339     public K getCurrentKey() throws IOException, InterruptedException {
340       return keys[index];
341     }
342     @Override
343     public V getCurrentValue() throws IOException, InterruptedException {
344       return values[index];
345     }
346     @Override
347     public float getProgress() throws IOException, InterruptedException {
348       return (float)index / keys.length;
349     }
350     @Override
351     public void close() throws IOException {
352     }
353   }
354 
355   public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
356     @Override
357     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
358       int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
359       ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
360       for (int i = 0; i < numSplits; ++i) {
361         ret.add(new EmptySplit());
362       }
363       return ret;
364     }
365 
366     @Override
367     public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
368       TaskAttemptContext context)
369           throws IOException, InterruptedException {
370       int taskId = context.getTaskAttemptID().getTaskID().getId();
371       int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
372       int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
373       int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
374 
375       taskId = taskId + iteration * numMapTasks;
376       numMapTasks = numMapTasks * numIterations;
377 
378       long chainId = Math.abs(new Random().nextLong());
379       chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
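      // Worked example (hypothetical numbers): with numMapTasks=4 per round and numIterations=2
      // the combined id space is 8; a task with taskId=1 in iteration 1 gets effective id
      // 1 + 1*4 = 5, so its random chainId is snapped down to a multiple of 8 and offset by 5.
      // No two tasks across rounds can therefore emit the same chainId.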
380       LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
381 
382       return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
383     }
384   }
385 
386   /**
387    * Mapper that creates a linked list of KeyValues.
388    *
389    * Each map task generates one linked list.
390    * All lists start on row key 0L.
391    * All lists should be CHAIN_LENGTH long.
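   *
   * Illustrative layout (a sketch, not captured output): for chain id C and a chain length of 3,
   * the emitted cells look like
   *   rk=0    L:C -> r1,  S:C -> 0,  D:C -> [50 random chars]
   *   rk=r1   L:C -> r2,  S:C -> 1,  D:C -> [50 random chars]
   *   rk=r2   L:C -> r3,  S:C -> 2,  D:C -> [50 random chars]
   * where r1, r2 and r3 are pseudo-random row keys made unique within the chain by getNextRow().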
392    */
393   public static class LinkedListCreationMapper
394       extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
395 
396     private Random rand = new Random();
397 
398     @Override
399     protected void map(LongWritable key, LongWritable value, Context context)
400         throws IOException, InterruptedException {
401       long chainId = value.get();
402       LOG.info("Starting mapper with chainId:" + chainId);
403 
404       byte[] chainIdArray = Bytes.toBytes(chainId);
405       long currentRow = 0;
406 
407       long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
408       long nextRow = getNextRow(0, chainLength);
409 
410       for (long i = 0; i < chainLength; i++) {
411         byte[] rk = Bytes.toBytes(currentRow);
412 
413         // Next link in the chain.
414         KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
415         // What link in the chain this is.
416         KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
417         // Added data so that large stores are created.
418         KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
419           Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
420         );
421 
422         // Emit the key values.
423         context.write(new ImmutableBytesWritable(rk), linkKv);
424         context.write(new ImmutableBytesWritable(rk), sortKv);
425         context.write(new ImmutableBytesWritable(rk), dataKv);
426         // Move to the next row.
427         currentRow = nextRow;
428         nextRow = getNextRow(i+1, chainLength);
429       }
430     }
431 
432     /** Returns a unique row id within this chain for this index */
433     private long getNextRow(long index, long chainLength) {
434       long nextRow = Math.abs(rand.nextLong());
435       // use significant bits from the random number, but pad with index to ensure it is unique
436       // this also ensures that we do not reuse row = 0
437       // row collisions from multiple mappers are fine, since we guarantee unique chainIds
438       nextRow = nextRow - (nextRow % chainLength) + index;
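      // e.g. (hypothetical numbers) with chainLength=500000 and index=42, a random draw of
      // 1234567890 becomes 1234567890 - 67890 + 42 = 1234500042.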
439       return nextRow;
440     }
441   }
442 
443   /**
444    * Writable class used as the key to group links in the linked list.
445    *
446    * Used as the key emitted from a pass over the table.
447    */
448   public static class LinkKey implements WritableComparable<LinkKey> {
449 
450     private Long chainId;
451 
452     public Long getOrder() {
453       return order;
454     }
455 
456     public Long getChainId() {
457       return chainId;
458     }
459 
460     private Long order;
461 
462     public LinkKey() {}
463 
464     public LinkKey(long chainId, long order) {
465       this.chainId = chainId;
466       this.order = order;
467     }
468 
469     @Override
470     public int compareTo(LinkKey linkKey) {
471       int res = getChainId().compareTo(linkKey.getChainId());
472       if (res == 0) {
473         res = getOrder().compareTo(linkKey.getOrder());
474       }
475       return res;
476     }
477 
478     @Override
479     public void write(DataOutput dataOutput) throws IOException {
480       WritableUtils.writeVLong(dataOutput, chainId);
481       WritableUtils.writeVLong(dataOutput, order);
482     }
483 
484     @Override
485     public void readFields(DataInput dataInput) throws IOException {
486       chainId = WritableUtils.readVLong(dataInput);
487       order = WritableUtils.readVLong(dataInput);
488     }
489   }
490 
491   /**
492    * Writable used as the value emitted from a pass over the hbase table.
493    */
494   public static class LinkChain implements WritableComparable<LinkChain> {
495 
496     public Long getNext() {
497       return next;
498     }
499 
500     public Long getRk() {
501       return rk;
502     }
503 
504     public LinkChain() {}
505 
506     public LinkChain(Long rk, Long next) {
507       this.rk = rk;
508       this.next = next;
509     }
510 
511     private Long rk;
512     private Long next;
513 
514     @Override
515     public int compareTo(LinkChain linkChain) {
516       int res = getRk().compareTo(linkChain.getRk());
517       if (res == 0) {
518         res = getNext().compareTo(linkChain.getNext());
519       }
520       return res;
521     }
522 
523     @Override
524     public void write(DataOutput dataOutput) throws IOException {
525       WritableUtils.writeVLong(dataOutput, rk);
526       WritableUtils.writeVLong(dataOutput, next);
527     }
528 
529     @Override
530     public void readFields(DataInput dataInput) throws IOException {
531       rk = WritableUtils.readVLong(dataInput);
532       next = WritableUtils.readVLong(dataInput);
533     }
534   }
535 
536   /**
537    * Class to figure out what partition to send a link in the chain to.  This is based upon
538    * the linkKey's ChainId.
539    */
540   public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
541     @Override
542     public int getPartition(LinkKey linkKey,
543                             LinkChain linkChain,
544                             int numPartitions) {
545       int hash = linkKey.getChainId().hashCode();
546       return Math.abs(hash % numPartitions);
547     }
548   }
549 
550   /**
551    * Comparator used to figure out whether two linkKeys should be grouped together.  This is based upon the
552    * linkKey's ChainId.
553    */
554   public static class NaturalKeyGroupingComparator extends WritableComparator {
555 
556     protected NaturalKeyGroupingComparator() {
557       super(LinkKey.class, true);
558     }
559 
560     @Override
561     public int compare(WritableComparable w1, WritableComparable w2) {
562       LinkKey k1 = (LinkKey) w1;
563       LinkKey k2 = (LinkKey) w2;
564 
565       return k1.getChainId().compareTo(k2.getChainId());
566     }
567   }
568 
569   /**
570    * Comparator used to order linkKeys so that they are passed to a reducer in order.  This is based
571    * upon linkKey ChainId and Order.
572    */
573   public static class CompositeKeyComparator extends WritableComparator {
574 
575     protected CompositeKeyComparator() {
576       super(LinkKey.class, true);
577     }
578 
579     @Override
580     public int compare(WritableComparable w1, WritableComparable w2) {
581       LinkKey k1 = (LinkKey) w1;
582       LinkKey k2 = (LinkKey) w2;
583 
584       return k1.compareTo(k2);
585     }
586   }
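  // Taken together, NaturalKeyPartitioner, NaturalKeyGroupingComparator and CompositeKeyComparator
  // implement the standard MapReduce secondary-sort pattern: every link of a chain is routed to
  // the same reducer and arrives in a single reduce() call, sorted by its position in the chain.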
587 
588   /**
589    * Mapper to pass over the table.
590    *
591    * For every row there could be multiple chains that landed on that row, so emit a linkKey
592    * and value for each one.
593    */
594   public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
595     @Override
596     protected void map(ImmutableBytesWritable key, Result value, Context context)
597         throws IOException, InterruptedException {
598       long longRk = Bytes.toLong(value.getRow());
599 
600       for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
601         long chainId = Bytes.toLong(entry.getKey());
602         long next = Bytes.toLong(entry.getValue());
603         Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
604         long order = Bytes.toLong(CellUtil.cloneValue(c));
605         context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
606       }
607     }
608   }
609 
610   /**
611    * Class that does the actual checking of the links.
612    *
613    * All links in the chain should be grouped and sorted when sent to this class.  Then the chain
614    * will be traversed making sure that no link is missing and that the chain is the correct length.
615    *
616    * This will throw an exception if anything is not correct.  That causes the job to fail if any
617    * data is corrupt.
618    */
619   public static class LinkedListCheckingReducer
620       extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
621     @Override
622     protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
623         throws java.io.IOException, java.lang.InterruptedException {
624       long next = -1L;
625       long prev = -1L;
626       long count = 0L;
627 
628       for (LinkChain lc : values) {
629 
630         if (next == -1) {
631           if (lc.getRk() != 0L) {
632             String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
633                 + ". Chain:" + key.chainId + ", order:" + key.order;
634             logError(msg, context);
635             throw new RuntimeException(msg);
636           }
637           next = lc.getNext();
638         } else {
639           if (next != lc.getRk()) {
640             String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting "
641                 + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
642                 + ", order:" + key.order;
643             logError(msg, context);
644             throw new RuntimeException(msg);
645           }
646           prev = lc.getRk();
647           next = lc.getNext();
648         }
649         count++;
650       }
651 
652       int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
653       if (count != expectedChainLen) {
654         String msg = "Chain wasn't the correct length.  Expected " + expectedChainLen + " got "
655             + count + ". Chain:" + key.chainId + ", order:" + key.order;
656         logError(msg, context);
657         throw new RuntimeException(msg);
658       }
659     }
660 
661     private static void logError(String msg, Context context) throws IOException {
662       HBaseTestingUtility util = new HBaseTestingUtility(context.getConfiguration());
663       TableName table = getTableName(context.getConfiguration());
664 
665       LOG.error("Failure in chain verification: " + msg);
666       LOG.error("cluster status:\n" + util.getHBaseClusterInterface().getClusterStatus());
667       LOG.error("table regions:\n"
668           + Joiner.on("\n").join(util.getHBaseAdmin().getTableRegions(table)));
669     }
670   }
671 
672   /**
673    * After adding data to the table, start an MR job that verifies every chain is intact.
674    * @throws IOException
675    * @throws ClassNotFoundException
676    * @throws InterruptedException
677    */
678   private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
679     LOG.info("Running check");
680     Configuration conf = getConf();
681     String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
682     Path p = util.getDataTestDirOnTestFS(jobName);
683 
684     Job job = new Job(conf);
685     job.setJarByClass(getClass());
686     job.setJobName(jobName);
687 
688     job.setPartitionerClass(NaturalKeyPartitioner.class);
689     job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
690     job.setSortComparatorClass(CompositeKeyComparator.class);
691 
692     Scan scan = new Scan();
693     scan.addFamily(CHAIN_FAM);
694     scan.addFamily(SORT_FAM);
695     scan.setMaxVersions(1);
696     scan.setCacheBlocks(false);
697     scan.setBatch(1000);
698 
699     int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
700     if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
701       scan.setConsistency(Consistency.TIMELINE);
702     }
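    // With region replicas enabled the scan tolerates stale, timeline-consistent reads, which
    // allows it to fail over to a secondary replica while SlowMeCoproScanOperations delays the
    // primary.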
703 
704     TableMapReduceUtil.initTableMapperJob(
705         getTablename().getName(),
706         scan,
707         LinkedListCheckingMapper.class,
708         LinkKey.class,
709         LinkChain.class,
710         job
711     );
712 
713     job.setReducerClass(LinkedListCheckingReducer.class);
714     job.setOutputKeyClass(NullWritable.class);
715     job.setOutputValueClass(NullWritable.class);
716 
717     FileOutputFormat.setOutputPath(job, p);
718 
719     assertTrue(job.waitForCompletion(true));
720 
721     // Delete the files.
722     util.getTestFileSystem().delete(p, true);
723   }
724 
725   @Override
726   public void setUpCluster() throws Exception {
727     util = getTestingUtil(getConf());
728     util.initializeCluster(1);
729     int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
730     if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
731       LOG.debug("Region Replicas enabled: " + replicaCount);
732     }
733 
734     // Scale this up on a real cluster
735     if (util.isDistributedCluster()) {
736       util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
737           Integer.toString(util.getHBaseAdmin().getClusterStatus().getServersSize() * 10)
738       );
739       util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
740     } else {
741       util.startMiniMapReduceCluster();
742     }
743   }
744 
745   @Override
746   protected void addOptions() {
747     super.addOptions();
748     super.addOptNoArg(OPT_CHECK, "Run check only");
749     super.addOptNoArg(OPT_LOAD, "Run load only");
750   }
751 
752   @Override
753   protected void processOptions(CommandLine cmd) {
754     super.processOptions(cmd);
755     check = cmd.hasOption(OPT_CHECK);
756     load = cmd.hasOption(OPT_LOAD);
757   }
758 
759   @Override
760   public int runTestFromCommandLine() throws Exception {
761     if (load) {
762       runLoad();
763     } else if (check) {
764       installSlowingCoproc();
765       runCheck();
766     } else {
767       testBulkLoad();
768     }
769     return 0;
770   }
771 
772   @Override
773   public TableName getTablename() {
774     return getTableName(getConf());
775   }
776 
777   public static TableName getTableName(Configuration conf) {
778     return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
779   }
780 
781   @Override
782   protected Set<String> getColumnFamilies() {
783     return Sets.newHashSet(Bytes.toString(CHAIN_FAM) , Bytes.toString(DATA_FAM),
784         Bytes.toString(SORT_FAM));
785   }
786 
787   public static void main(String[] args) throws Exception {
788     Configuration conf = HBaseConfiguration.create();
789     IntegrationTestingUtility.setUseDistributedCluster(conf);
790     int status =  ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
791     System.exit(status);
792   }
793 }