1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Random;
30  import java.util.Set;
31  
32  import org.apache.commons.cli.CommandLine;
33  import org.apache.commons.lang.RandomStringUtils;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HBaseConfiguration;
41  import org.apache.hadoop.hbase.IntegrationTestBase;
42  import org.apache.hadoop.hbase.IntegrationTestingUtility;
43  import org.apache.hadoop.hbase.IntegrationTests;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.client.HTable;
46  import org.apache.hadoop.hbase.client.Result;
47  import org.apache.hadoop.hbase.client.Scan;
48  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  import org.apache.hadoop.hbase.util.RegionSplitter;
52  import org.apache.hadoop.io.LongWritable;
53  import org.apache.hadoop.io.NullWritable;
54  import org.apache.hadoop.io.Writable;
55  import org.apache.hadoop.io.WritableComparable;
56  import org.apache.hadoop.io.WritableComparator;
57  import org.apache.hadoop.io.WritableUtils;
58  import org.apache.hadoop.mapreduce.InputFormat;
59  import org.apache.hadoop.mapreduce.InputSplit;
60  import org.apache.hadoop.mapreduce.Job;
61  import org.apache.hadoop.mapreduce.JobContext;
62  import org.apache.hadoop.mapreduce.Mapper;
63  import org.apache.hadoop.mapreduce.Partitioner;
64  import org.apache.hadoop.mapreduce.RecordReader;
65  import org.apache.hadoop.mapreduce.Reducer;
66  import org.apache.hadoop.mapreduce.TaskAttemptContext;
67  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
68  import org.apache.hadoop.util.ToolRunner;
69  import org.junit.Test;
70  import org.junit.experimental.categories.Category;
71  
72  /**
73   * Test Bulk Load and MR on a distributed cluster.
74   * It starts an MR job that creates linked chains.
75   *
76   * The format of rows is like this:
77   * Row Key -> Long
78   *
79   * L:<< Chain Id >> -> Row Key of the next link in the chain
80   * S:<< Chain Id >> -> The step in the chain that this link is.
81   * D:<< Chain Id >> -> Random Data.
82   *
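 * For example (illustrative values), a row in chain 42 at step 3 holds three cells:
 * L:42 -> the row key of the step-4 row, S:42 -> 3, and D:42 -> 50 random alphabetic characters.
 *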
83   * All chains start on row 0.
84   * All row keys are > 0.
85   *
86   * After creating the linked lists, they are walked over using a TableMapper-based MapReduce job.
87   *
88   * There are a few options exposed:
89   *
90   * hbase.IntegrationTestBulkLoad.chainLength
91   * The number of rows that will be part of each and every chain.
92   *
93   * hbase.IntegrationTestBulkLoad.numMaps
94   * The number of mappers that will be run.  Each mapper creates one linked list chain.
95   *
96   * hbase.IntegrationTestBulkLoad.numImportRounds
97   * How many jobs will be run to create linked lists.
98   *
99   * hbase.IntegrationTestBulkLoad.tableName
100  * The name of the table.
101  *
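 * A minimal sketch of overriding these options programmatically (values are illustrative;
 * main() below is the real entry point, and the same properties can also be passed as -D
 * options, which ToolRunner applies to the configuration):
 *
 *   Configuration conf = HBaseConfiguration.create();
 *   conf.setInt("hbase.IntegrationTestBulkLoad.chainLength", 1000000);
 *   conf.setInt("hbase.IntegrationTestBulkLoad.numMaps", 20);
 *   int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), new String[]{"-load"});
 *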
102  */
103 @Category(IntegrationTests.class)
104 public class IntegrationTestBulkLoad extends IntegrationTestBase {
105 
106   private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);
107 
108   private static byte[] CHAIN_FAM = Bytes.toBytes("L");
109   private static byte[] SORT_FAM  = Bytes.toBytes("S");
110   private static byte[] DATA_FAM  = Bytes.toBytes("D");
111 
112   private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
113   private static int CHAIN_LENGTH = 500000;
114 
115   private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
116   private static int NUM_MAPS = 1;
117 
118   private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
119   private static int NUM_IMPORT_ROUNDS = 1;
120 
121   private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";
122 
123   private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
124   private static String TABLE_NAME = "IntegrationTestBulkLoad";
125 
126   @Test
127   public void testBulkLoad() throws Exception {
128     runLoad();
129     runCheck();
130   }
131 
132   public void runLoad() throws Exception {
133     setupTable();
134     int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
135     LOG.info("Running load with numIterations:" + numImportRounds);
136     for (int i = 0; i < numImportRounds; i++) {
137       runLinkedListMRJob(i);
138     }
139   }
140 
141   private byte[][] getSplits(int numRegions) {
142     RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
143     split.setFirstRow(Bytes.toBytes(0L));
144     split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
145     return split.split(numRegions);
146   }
147 
148   private void setupTable() throws IOException {
149     if (util.getHBaseAdmin().tableExists(getTablename())) {
150       util.deleteTable(getTablename());
151     }
152 
153     util.createTable(
154         Bytes.toBytes(getTablename()),
155         new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
156         getSplits(16)
157     );
158   }
159 
160   private void runLinkedListMRJob(int iteration) throws Exception {
161     String jobName =  IntegrationTestBulkLoad.class.getSimpleName() + " - " +
162         EnvironmentEdgeManager.currentTimeMillis();
163     Configuration conf = new Configuration(util.getConfiguration());
164     Path p = util.getDataTestDirOnTestFS(getTablename() +  "-" + iteration);
165     HTable table = new HTable(conf, getTablename());
166 
167     conf.setBoolean("mapreduce.map.speculative", false);
168     conf.setBoolean("mapreduce.reduce.speculative", false);
169     conf.setInt(ROUND_NUM_KEY, iteration);
170 
171     Job job = new Job(conf);
172 
173     job.setJobName(jobName);
174 
175     // set the input format so that we can create map tasks with no data input.
176     job.setInputFormatClass(ITBulkLoadInputFormat.class);
177 
178     // Set the mapper classes.
179     job.setMapperClass(LinkedListCreationMapper.class);
180     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
181     job.setMapOutputValueClass(KeyValue.class);
182 
183     // Use the identity reducer
184     // So nothing to do here.
185 
186     // Set this jar.
187     job.setJarByClass(getClass());
188 
189     // Set where to place the hfiles.
190     FileOutputFormat.setOutputPath(job, p);
191 
192     // Configure the partitioner, sorting reducer, and output format needed by HFileOutputFormat.
193     HFileOutputFormat.configureIncrementalLoad(job, table);
194 
195     // Run the job making sure it works.
196     assertEquals(true, job.waitForCompletion(true));
197 
198     // Create a new loader.
199     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
200 
201     // Load the HFiles in.
202     loader.doBulkLoad(p, table);
203 
204     // Delete the files.
205     util.getTestFileSystem().delete(p, true);
206   }
207 
208   public static class EmptySplit extends InputSplit implements Writable {
209     @Override
210     public void write(DataOutput out) throws IOException { }
211     @Override
212     public void readFields(DataInput in) throws IOException { }
213     @Override
214     public long getLength() { return 0L; }
215     @Override
216     public String[] getLocations() { return new String[0]; }
217   }
218 
219   public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
220     private int index = -1;
221     private K[] keys;
222     private V[] values;
223 
224     public FixedRecordReader(K[] keys, V[] values) {
225       this.keys = keys;
226       this.values = values;
227     }
228     @Override
229     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
230     InterruptedException { }
231     @Override
232     public boolean nextKeyValue() throws IOException, InterruptedException {
233       return ++index < keys.length;
234     }
235     @Override
236     public K getCurrentKey() throws IOException, InterruptedException {
237       return keys[index];
238     }
239     @Override
240     public V getCurrentValue() throws IOException, InterruptedException {
241       return values[index];
242     }
243     @Override
244     public float getProgress() throws IOException, InterruptedException {
245       return (float)index / keys.length;
246     }
247     @Override
248     public void close() throws IOException {
249     }
250   }
251 
252   public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
253     @Override
254     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
255       int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
256       ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
257       for (int i = 0; i < numSplits; ++i) {
258         ret.add(new EmptySplit());
259       }
260       return ret;
261     }
262 
263     @Override
264     public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
265       TaskAttemptContext context)
266           throws IOException, InterruptedException {
267       int taskId = context.getTaskAttemptID().getTaskID().getId();
268       int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
269       int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
270       int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
271 
272       taskId = taskId + iteration * numMapTasks;
273       numMapTasks = numMapTasks * numIterations;
274 
275       long chainId = Math.abs(new Random().nextLong());
276       chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
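      // i.e. chainId ends up congruent to the overall task id modulo the total task count; for
      // example (illustrative numbers), with 4 maps x 2 rounds = 8 tasks and overall task id 5,
      // chainId % 8 == 5, so no two tasks in any round can generate the same chainId.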
277       LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
278 
279       return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
280     }
281   }
282 
283   /**
284    * Mapper that creates a linked list of KeyValues.
285    *
286    * Each map task generates one linked list.
287    * All lists start on row key 0L.
288    * All lists should be CHAIN_LENGTH long.
289    */
290   public static class LinkedListCreationMapper
291       extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
292 
293     private Random rand = new Random();
294 
295     @Override
296     protected void map(LongWritable key, LongWritable value, Context context)
297         throws IOException, InterruptedException {
298       long chainId = value.get();
299       LOG.info("Starting mapper with chainId:" + chainId);
300 
301       byte[] chainIdArray = Bytes.toBytes(chainId);
302       long currentRow = 0;
303 
304       long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
305       long nextRow = getNextRow(0, chainLength);
306 
307       for (long i = 0; i < chainLength; i++) {
308         byte[] rk = Bytes.toBytes(currentRow);
309 
310         // Next link in the chain.
311         KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
312         // What link in the chain this is.
313         KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
314         // Random data so that large stores are created.
315         KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
316           Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
317         );
318 
319         // Emit the key values.
320         context.write(new ImmutableBytesWritable(rk), linkKv);
321         context.write(new ImmutableBytesWritable(rk), sortKv);
322         context.write(new ImmutableBytesWritable(rk), dataKv);
323         // Move to the next row.
324         currentRow = nextRow;
325         nextRow = getNextRow(i+1, chainLength);
326       }
327     }
328 
329     /** Returns a unique row id within this chain for this index */
330     private long getNextRow(long index, long chainLength) {
331       long nextRow = Math.abs(rand.nextLong());
332       // use significant bits from the random number, but pad with index to ensure it is unique
333       // this also ensures that we do not reuse row = 0
334       // row collisions from multiple mappers are fine, since we guarantee unique chainIds
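      // e.g. (illustrative) with chainLength = 500000 and index = 42, nextRow is k * 500000 + 42
      // for some random k >= 0, so every index within a chain maps to a distinct row.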
335       nextRow = nextRow - (nextRow % chainLength) + index;
336       return nextRow;
337     }
338   }
339 
340   /**
341    * Writable class used as the key to group links in the linked list.
342    *
343    * Used as the key emitted from a pass over the table.
344    */
345   public static class LinkKey implements WritableComparable<LinkKey> {
346 
347     private Long chainId;
348 
349     public Long getOrder() {
350       return order;
351     }
352 
353     public Long getChainId() {
354       return chainId;
355     }
356 
357     private Long order;
358 
359     public LinkKey() {}
360 
361     public LinkKey(long chainId, long order) {
362       this.chainId = chainId;
363       this.order = order;
364     }
365 
366     @Override
367     public int compareTo(LinkKey linkKey) {
368       int res = getChainId().compareTo(linkKey.getChainId());
369       if (res == 0) {
370         res = getOrder().compareTo(linkKey.getOrder());
371       }
372       return res;
373     }
374 
375     @Override
376     public void write(DataOutput dataOutput) throws IOException {
377       WritableUtils.writeVLong(dataOutput, chainId);
378       WritableUtils.writeVLong(dataOutput, order);
379     }
380 
381     @Override
382     public void readFields(DataInput dataInput) throws IOException {
383       chainId = WritableUtils.readVLong(dataInput);
384       order = WritableUtils.readVLong(dataInput);
385     }
386   }
387 
388   /**
389    * Writable used as the value emitted from a pass over the hbase table.
390    */
391   public static class LinkChain implements WritableComparable<LinkChain> {
392 
393     public Long getNext() {
394       return next;
395     }
396 
397     public Long getRk() {
398       return rk;
399     }
400 
401     public LinkChain() {}
402 
403     public LinkChain(Long rk, Long next) {
404       this.rk = rk;
405       this.next = next;
406     }
407 
408     private Long rk;
409     private Long next;
410 
411     @Override
412     public int compareTo(LinkChain linkChain) {
413       int res = getRk().compareTo(linkChain.getRk());
414       if (res == 0) {
415         res = getNext().compareTo(linkChain.getNext());
416       }
417       return res;
418     }
419 
420     @Override
421     public void write(DataOutput dataOutput) throws IOException {
422       WritableUtils.writeVLong(dataOutput, rk);
423       WritableUtils.writeVLong(dataOutput, next);
424     }
425 
426     @Override
427     public void readFields(DataInput dataInput) throws IOException {
428       rk = WritableUtils.readVLong(dataInput);
429       next = WritableUtils.readVLong(dataInput);
430     }
431   }
432 
433   /**
434    * Class to figure out what partition to send a link in the chain to.  This is based upon
435    * the linkKey's ChainId.
436    */
437   public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
438     @Override
439     public int getPartition(LinkKey linkKey,
440                             LinkChain linkChain,
441                             int numPartitions) {
442       int hash = linkKey.getChainId().hashCode();
443       return Math.abs(hash % numPartitions); // hashCode() may be negative
444     }
445   }
446 
447   /**
448    * Comparator used to figure out which LinkKeys should be grouped together.  This is based upon the
449    * linkKey's ChainId.
450    */
451   public static class NaturalKeyGroupingComparator extends WritableComparator {
452 
453     protected NaturalKeyGroupingComparator() {
454       super(LinkKey.class, true);
455     }
456 
457     @Override
458     public int compare(WritableComparable w1, WritableComparable w2) {
459       LinkKey k1 = (LinkKey) w1;
460       LinkKey k2 = (LinkKey) w2;
461 
462       return k1.getChainId().compareTo(k2.getChainId());
463     }
464   }
465 
466   /**
467    * Comparator used to order linkKeys so that they are passed to a reducer in order.  This is based
468    * upon linkKey ChainId and Order.
469    */
470   public static class CompositeKeyComparator extends WritableComparator {
471 
472     protected CompositeKeyComparator() {
473       super(LinkKey.class, true);
474     }
475 
476     @Override
477     public int compare(WritableComparable w1, WritableComparable w2) {
478       LinkKey k1 = (LinkKey) w1;
479       LinkKey k2 = (LinkKey) w2;
480 
481       return k1.compareTo(k2);
482     }
483   }
484 
485   /**
486    * Mapper to pass over the table.
487    *
488    * For every row there can be multiple chains that landed on it, so emit a LinkKey
489    * and LinkChain value for each.
490    */
491   public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
492     @Override
493     protected void map(ImmutableBytesWritable key, Result value, Context context)
494         throws IOException, InterruptedException {
495       long longRk = Bytes.toLong(value.getRow());
496 
497       for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
498         long chainId = Bytes.toLong(entry.getKey());
499         long next = Bytes.toLong(entry.getValue());
500         Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
501         long order = Bytes.toLong(CellUtil.cloneValue(c));
502         context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
503       }
504     }
505   }
506 
507   /**
508    * Class that does the actual checking of the links.
509    *
510    * All links in the chain should be grouped and sorted when sent to this class.  Then the chain
511    * will be traversed making sure that no link is missing and that the chain is the correct length.
512    *
513    * This will throw an exception if anything is not correct.  That causes the job to fail if any
514    * data is corrupt.
515    */
516   public static class LinkedListCheckingReducer
517       extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
518     @Override
519     protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
520         throws java.io.IOException, java.lang.InterruptedException {
521       long next = -1L;
522       long count = 0L;
523 
524       for (LinkChain lc : values) {
525 
526         if (next == -1) {
527           if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at 0 rk"
528             + ". Chain:" + key.chainId + ", order:" + key.order);
529           next = lc.getNext();
530         } else {
531           if (next != lc.getRk())
532             throw new RuntimeException("Missing a link in the chain. Expecting " +
533                 next + " got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order);
534           next = lc.getNext();
535         }
536         count++;
537       }
538 
539       int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
540       if (count != expectedChainLen)
541         throw new RuntimeException("Chain wasn't the correct length.  Expected " +
542             expectedChainLen + " got " + count + ". Chain:" + key.chainId + ", order:" + key.order);
543     }
544   }
545 
546   /**
547    * After adding data to the table, starts an MR job that checks that each chain is complete.
548    * @throws IOException
549    * @throws ClassNotFoundException
550    * @throws InterruptedException
551    */
552   private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
553     LOG.info("Running check");
554     Configuration conf = getConf();
555     String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTimeMillis();
556     Path p = util.getDataTestDirOnTestFS(jobName);
557 
558     Job job = new Job(conf);
559 
560     job.setJarByClass(getClass());
561 
562     job.setPartitionerClass(NaturalKeyPartitioner.class);
563     job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
564     job.setSortComparatorClass(CompositeKeyComparator.class);
565 
566     Scan s = new Scan();
567     s.addFamily(CHAIN_FAM);
568     s.addFamily(SORT_FAM);
569     s.setMaxVersions(1);
570     s.setCacheBlocks(false);
571     s.setBatch(1000);
572 
573     TableMapReduceUtil.initTableMapperJob(
574         Bytes.toBytes(getTablename()),
575         s,
576         LinkedListCheckingMapper.class,
577         LinkKey.class,
578         LinkChain.class,
579         job
580     );
581 
582     job.setReducerClass(LinkedListCheckingReducer.class);
583     job.setOutputKeyClass(NullWritable.class);
584     job.setOutputValueClass(NullWritable.class);
585 
586     FileOutputFormat.setOutputPath(job, p);
587 
588     assertEquals(true, job.waitForCompletion(true));
589 
590     // Delete the files.
591     util.getTestFileSystem().delete(p, true);
592   }
593 
594   @Override
595   public void setUpCluster() throws Exception {
596     util = getTestingUtil(getConf());
597     util.initializeCluster(1);
598 
599     // Scale this up on a real cluster
600     if (util.isDistributedCluster()) {
601       util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
602           Integer.toString(util.getHBaseAdmin().getClusterStatus().getServersSize() * 10)
603       );
604       util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
605     } else {
606       util.startMiniMapReduceCluster();
607     }
608   }
609 
610   private static final String OPT_LOAD = "load";
611   private static final String OPT_CHECK = "check";
612 
613   private boolean load = false;
614   private boolean check = false;
615 
616   @Override
617   protected void addOptions() {
618     super.addOptions();
619     super.addOptNoArg(OPT_CHECK, "Run check only");
620     super.addOptNoArg(OPT_LOAD, "Run load only");
621   }
622 
623   @Override
624   protected void processOptions(CommandLine cmd) {
625     super.processOptions(cmd);
626     check = cmd.hasOption(OPT_CHECK);
627     load = cmd.hasOption(OPT_LOAD);
628   }
629 
630   @Override
631   public int runTestFromCommandLine() throws Exception {
632     if (load) {
633       runLoad();
634     } else if (check) {
635       runCheck();
636     } else {
637       testBulkLoad();
638     }
639     return 0;
640   }
641 
642   @Override
643   public String getTablename() {
644     return getConf().get(TABLE_NAME_KEY, TABLE_NAME);
645   }
646 
647   @Override
648   protected Set<String> getColumnFamilies() {
649     return null;
650   }
651 
652   public static void main(String[] args) throws Exception {
653     Configuration conf = HBaseConfiguration.create();
654     IntegrationTestingUtility.setUseDistributedCluster(conf);
655     int status =  ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
656     System.exit(status);
657   }
658 
659 }