/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

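/**
 * Integration test for HBase bulk load over MapReduce.
 *
 * The load phase runs one or more MR jobs whose mappers each generate a linked list of rows.
 * Every chain starts at row key 0 and, by default, contains 500,000 links; each row stores the
 * next row key in family L, its position in the chain in family S, and random padding in
 * family D. The map output is written as HFiles and loaded into the table with
 * {@link LoadIncrementalHFiles}.
 *
 * The check phase runs a second MR job that scans the table, regroups the links by chain id,
 * and verifies that every chain starts at row 0, is unbroken, and has the expected length.
 *
 * The test can also be driven from the command line through {@link ToolRunner} (see
 * {@link #main(String[])}); the {@code -load} and {@code -check} options run only one of the
 * two phases.
 */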
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM = Bytes.toBytes("S");
  private static final byte[] DATA_FAM = Bytes.toBytes("D");

  private static final String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static final int CHAIN_LENGTH = 500000;

  private static final String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static final int NUM_MAPS = 1;

  private static final String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static final int NUM_IMPORT_ROUNDS = 1;

  private static final String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static final String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static final String TABLE_NAME = "IntegrationTestBulkLoad";
  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    runCheck();
  }

  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

  private void setupTable() throws IOException {
    if (util.getHBaseAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        Bytes.toBytes(getTablename()),
        new byte[][] { CHAIN_FAM, SORT_FAM, DATA_FAM },
        getSplits(16)
    );
  }

  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTimeMillis();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    HTable table = new HTable(conf, getTablename());

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // The synthetic input format hands each mapper a single chain id to generate.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Mappers emit the KeyValues that make up one linked list per chain.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    job.setJarByClass(getClass());

    FileOutputFormat.setOutputPath(job, p);

    // Configure reducers and partitioner so the output HFiles line up with the table's regions.
    HFileOutputFormat.configureIncrementalLoad(job, table);

    assertEquals(true, job.waitForCompletion(true));

    // Bulk load the generated HFiles into the table, then clean up the staging directory.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(p, table);
    table.close();

    util.getTestFileSystem().delete(p, true);
  }

  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }

  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException { }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }
    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }
    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float) index / keys.length;
    }
    @Override
    public void close() throws IOException {
    }
  }

  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context)
        throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations =
          context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      // Treat every (round, task) pair as a distinct slot so chain ids never repeat
      // across import rounds.
      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      // Pick a random chain id congruent to this task's slot, which keeps the ids
      // generated by different tasks disjoint.
      long chainId = Math.abs(new Random().nextLong());
      chainId = chainId - (chainId % numMapTasks) + taskId;
      LongWritable[] keys = new LongWritable[] { new LongWritable(chainId) };

      return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
    }
  }

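  /**
   * Mapper that builds one linked list per chain id. Every list starts at row 0 and has
   * chainLength links. For each link the mapper emits three KeyValues keyed by the current
   * row: the next row key (family L), the link's position in the chain (family S), and
   * fifty random characters of padding (family D).
   */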
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Link to the next row in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // This link's position within the chain.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Random data to give the HFiles some bulk.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
            Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);

        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /**
     * Returns a random row key that is congruent to {@code index} modulo the chain length,
     * so rows at different positions in a chain can never collide.
     */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }

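  /**
   * Composite writable key used by the check job: sorts by chain id first and then by the
   * link's position within the chain.
   */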
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;
    private Long order;

    public Long getChainId() {
      return chainId;
    }

    public Long getOrder() {
      return order;
    }

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }

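  /**
   * Writable value used by the check job: one link of a chain, holding the row key it was
   * read from and the row key it points to next.
   */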
  public static class LinkChain implements WritableComparable<LinkChain> {

    private Long rk;
    private Long next;

    public Long getRk() {
      return rk;
    }

    public Long getNext() {
      return next;
    }

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }

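  /**
   * Partitioner that routes on the chain id alone, so that every link of one chain lands in
   * the same reducer.
   */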
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
                            LinkChain linkChain,
                            int numPartitions) {
      // Mask off the sign bit so a negative hashCode can't produce an illegal partition.
      int hash = linkKey.getChainId().hashCode();
      return (hash & Integer.MAX_VALUE) % numPartitions;
    }
  }

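  /**
   * Grouping comparator that groups reducer input by chain id only, so a single reduce call
   * sees every link of one chain.
   */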
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }

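  /**
   * Sort comparator that orders reducer input by the full composite key, chain id then link
   * order, so the links of a chain arrive at the reducer already sorted by position.
   */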
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }

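  /**
   * Mapper for the check job. For every row it emits one (LinkKey, LinkChain) pair per chain
   * passing through that row, so the reducers can reassemble and verify each chain.
   */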
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }

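  /**
   * Reducer for the check job. Walks the links of a single chain in order and fails the job
   * if the chain does not start at row 0, has a broken link, or is not the expected length.
   */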
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws IOException, InterruptedException {
      long next = -1L;
      long count = 0L;

      for (LinkChain lc : values) {
        if (next == -1) {
          // The first link of every chain must be row key 0.
          if (lc.getRk() != 0L) {
            throw new RuntimeException("Chains should all start at 0 rk"
                + ". Chain:" + key.chainId + ", order:" + key.order);
          }
          next = lc.getNext();
        } else {
          // Each subsequent link must start where the previous one pointed.
          if (next != lc.getRk()) {
            throw new RuntimeException("Missing a link in the chain. Expecting " +
                next + " got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order);
          }
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        throw new RuntimeException("Chain wasn't the correct length. Expected " +
            expectedChainLen + " got " + count + ". Chain:" + key.chainId + ", order:" + key.order);
      }
    }
  }

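  /**
   * Runs the verification MapReduce job over the table: scans the link and sort families,
   * regroups the links by chain id, and checks every chain in the reducers.
   */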
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTimeMillis();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);

    job.setJarByClass(getClass());

    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    // Only the link and sort families are needed for verification.
    Scan s = new Scan();
    s.addFamily(CHAIN_FAM);
    s.addFamily(SORT_FAM);
    s.setMaxVersions(1);
    s.setCacheBlocks(false);
    s.setBatch(1000);

    TableMapReduceUtil.initTableMapperJob(
        Bytes.toBytes(getTablename()),
        s,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);

    // On a distributed cluster, size the load to the cluster; locally, spin up a mini MR cluster.
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getHBaseAdmin().getClusterStatus().getServersSize() * 10)
      );
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      runCheck();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public String getTablename() {
    return getConf().get(TABLE_NAME_KEY, TABLE_NAME);
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return null;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }

}