/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.hadoop.hbase.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Sets;

/**
 * An integration test that builds a very large circularly-linked list in an HBase table and then
 * verifies that no links were lost.
 *
 * Each Generator mapper buffers batches of "width" random 16-byte row keys. Every row stores, in
 * its "prev" column, the key written at the same position of the previous batch, so each batch
 * extends "width" parallel linked lists by one node. After "wrap multiplier" batches, the first
 * batch is rewritten (with a circular shift, see GeneratorMapper) to point at the last batch,
 * joining the parallel lists into a single circular list of width * wrap nodes. Because a node
 * only ever references rows that have already been flushed, a reference to a missing key found
 * later by verification indicates lost data.
 *
 * The Verify map-reduce job scans the table and emits, for every row, its own key (a
 * "definition") and the key it references. The reducer then classifies each key (see
 * Verify.Counts): REFERENCED is the healthy case; verification fails if the REFERENCED count
 * does not match the number of generated nodes, or if any UNREFERENCED or UNDEFINED keys are
 * found, UNDEFINED meaning a referenced row was never written, i.e. data loss.
 *
 * The available commands are listed in usage(). The Loop command alternates Generator and
 * Verify steps, so the table keeps growing while repeatedly being checked.
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
  // Placeholder written to COLUMN_PREV while a node's predecessor is still unknown
  protected static final byte[] NO_KEY = new byte[1];

  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";

  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";

  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");

  // Link to the key of the previous node in the linked list
  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");

  // Identifier of the map task that generated this row
  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");

  // Sequence number of the row within the generating client
  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");

  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
      = "IntegrationTestBigLinkedList.generator.num_rows";

  private static final String GENERATOR_NUM_MAPPERS_KEY
      = "IntegrationTestBigLinkedList.generator.map.tasks";

  private static final String GENERATOR_WIDTH_KEY
      = "IntegrationTestBigLinkedList.generator.width";

  private static final String GENERATOR_WRAP_KEY
      = "IntegrationTestBigLinkedList.generator.wrap";

  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster

  private static final int MISSING_ROWS_TO_LOG = 50;

  private static final int WIDTH_DEFAULT = 1000000;
  private static final int WRAP_DEFAULT = 25;
  private static final int ROWKEY_LENGTH = 16;

  protected String toRun;
  protected String[] otherArgs;

  static class CINode {
    byte[] key;
    byte[] prev;
    String client;
    long count;
  }
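
  /*
   * CINode is a mutable holder for one row's fields; getCINode(Result, CINode) at the bottom of
   * this file refills the same instance, so scan loops (see Print and Walker below) can reuse
   * one object instead of allocating per row.
   */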

  /**
   * A map-only job that generates random linked lists and stores them.
   */
  static class Generator extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Generator.class);

    static class GeneratorInputFormat extends InputFormat<BytesWritable, NullWritable> {
      static class GeneratorInputSplit extends InputSplit implements Writable {
        @Override
        public long getLength() throws IOException, InterruptedException {
          return 1;
        }
        @Override
        public String[] getLocations() throws IOException, InterruptedException {
          return new String[0];
        }
        @Override
        public void readFields(DataInput arg0) throws IOException {
        }
        @Override
        public void write(DataOutput arg0) throws IOException {
        }
      }

      static class GeneratorRecordReader extends RecordReader<BytesWritable, NullWritable> {
        private long count;
        private long numNodes;
        private Random rand;

        @Override
        public void close() throws IOException {
        }

        @Override
        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
          byte[] bytes = new byte[ROWKEY_LENGTH];
          rand.nextBytes(bytes);
          return new BytesWritable(bytes);
        }

        @Override
        public NullWritable getCurrentValue() throws IOException, InterruptedException {
          return NullWritable.get();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return (float)(count / (double)numNodes);
        }

        @Override
        public void initialize(InputSplit arg0, TaskAttemptContext context)
            throws IOException, InterruptedException {
          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
          rand = new Random();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return count++ < numNodes;
        }
      }

      @Override
      public RecordReader<BytesWritable, NullWritable> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        GeneratorRecordReader rr = new GeneratorRecordReader();
        rr.initialize(split, context);
        return rr;
      }

      @Override
      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);

        ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);

        for (int i = 0; i < numMappers; i++) {
          splits.add(new GeneratorInputSplit());
        }

        return splits;
      }
    }
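
    /*
     * Note on the input format above: it fabricates load rather than reading anything.
     * getSplits() returns one empty split per configured mapper (GENERATOR_NUM_MAPPERS_KEY),
     * and each mapper's record reader hands out GENERATOR_NUM_ROWS_PER_MAP_KEY random 16-byte
     * keys, so the job size is controlled entirely by configuration.
     */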

    /** Ensures output files from the previous job each feed exactly one mapper of this job. */
    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
      @Override
      protected boolean isSplitable(JobContext context, Path filename) {
        return false;
      }
    }

    /**
     * The mapper that builds the linked lists. It buffers incoming random keys in batches of
     * "width" (GENERATOR_WIDTH_KEY, default 1,000,000). Every full batch is persisted with each
     * row's "prev" column pointing at the key in the same position of the previous batch, so
     * the batches form "width" parallel lists that grow by one node per batch. After
     * "wrap multiplier" (GENERATOR_WRAP_KEY, default 25) batches, the first batch is rewritten
     * with a circular left shift so that it points at the last batch, stitching the parallel
     * lists into a single circular list of width * wrap nodes (see circularLeftShift below).
     * Nodes only ever reference rows that have already been flushed, so a dangling reference
     * found later by Verify means data was lost.
     */
    static class GeneratorMapper
        extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {

      byte[][] first = null;
      byte[][] prev = null;
      byte[][] current = null;
      byte[] id;
      long count = 0;
      int i;
      HTable table;
      long numNodes;
      long wrap;
      int width;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID());
        Configuration conf = context.getConfiguration();
        instantiateHTable(conf);
        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
        current = new byte[this.width][];
        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
        this.wrap = (long)wrapMultiplier * width;
        this.numNodes = context.getConfiguration().getLong(
            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
        if (this.numNodes < this.wrap) {
          this.wrap = this.numNodes;
        }
      }

      protected void instantiateHTable(Configuration conf) throws IOException {
        table = new HTable(conf, getTableName(conf));
        // Buffer writes client-side; they are flushed once per batch in persist()
        table.setAutoFlush(false, true);
        table.setWriteBufferSize(4 * 1024 * 1024);
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        table.close();
      }

      @Override
      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
        current[i] = new byte[key.getLength()];
        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
        if (++i == current.length) {
          persist(output, count, prev, current, id);
          i = 0;

          // Remember the first batch so it can be linked back to the last one at the wrap point
          if (first == null)
            first = current;
          prev = current;
          current = new byte[this.width][];

          count += current.length;
          output.setStatus("Count " + count);

          if (count % wrap == 0) {
            // This block turns the "width" (by default one million) linked lists of length
            // "wrap" (25) into one giant circular linked list of width * wrap (25M) nodes.
            circularLeftShift(first);

            persist(output, -1, prev, first, null);

            first = null;
            prev = null;
          }
        }
      }

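      /*
       * Worked example of the shift below (hypothetical width 4): the column-wise lists start
       * at first[0..3] and end at last[0..3]. Shifting first to [first[1], first[2], first[3],
       * first[0]] and persisting it with prev = last[i] sets first[1].prev = last[0],
       * first[2].prev = last[1], ..., and first[0].prev = last[3]. Each list thus continues
       * into its neighbour, producing one cycle of width * wrap nodes instead of "width"
       * separate cycles of length "wrap".
       */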
      private static <T> void circularLeftShift(T[] first) {
        T ez = first[0];
        for (int i = 0; i < first.length - 1; i++)
          first[i] = first[i + 1];
        first[first.length - 1] = ez;
      }

      protected void persist(Context output, long count, byte[][] prev, byte[][] current,
          byte[] id) throws IOException {
        for (int i = 0; i < current.length; i++) {
          Put put = new Put(current[i]);
          put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);

          if (count >= 0) {
            put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
          }
          if (id != null) {
            put.add(FAMILY_NAME, COLUMN_CLIENT, id);
          }
          table.put(put);

          if (i % 1000 == 0) {
            // Tickle progress every so often, else the framework will think the task is hung
            output.progress();
          }
        }

        table.flushCommits();
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 3) {
        System.out.println("Usage : " + Generator.class.getSimpleName() +
            " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
        System.out.println("  where <num nodes per map> should be a multiple of" +
            " width*wrap multiplier, 25M by default");
        return 0;
      }

      int numMappers = Integer.parseInt(args[0]);
      long numNodes = Long.parseLong(args[1]);
      Path tmpOutput = new Path(args[2]);
      Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
      Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
      return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
    }
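
    /*
     * One plausible invocation of the Generator above (sizes and paths are hypothetical; the
     * command is dispatched through the enclosing test's main()):
     *
     *   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList Generator \
     *       4 25000000 /tmp/biglinkedlist-tmp
     *
     * runs 4 mappers, each inserting 25M nodes, staging the random keys under the given dir.
     */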

    protected void createSchema() throws IOException {
      Configuration conf = getConf();
      HBaseAdmin admin = new HBaseAdmin(conf);
      TableName tableName = getTableName(conf);
      try {
        if (!admin.tableExists(tableName)) {
          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
          int numberOfServers = admin.getClusterStatus().getServers().size();
          if (numberOfServers == 0) {
            throw new IllegalStateException("No live regionservers");
          }
          int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
              HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
          int totalNumberOfRegions = numberOfServers * regionsPerServer;
          LOG.info("Number of live regionservers: " + numberOfServers + ", " +
              "pre-splitting table into " + totalNumberOfRegions + " regions " +
              "(default regions per server: " + regionsPerServer + ")");

          byte[][] splits = new RegionSplitter.UniformSplit().split(
              totalNumberOfRegions);

          admin.createTable(htd, splits);
        }
      } catch (MasterNotRunningException e) {
        LOG.error("Master not running", e);
        throw new IOException(e);
      } finally {
        admin.close();
      }
    }
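
    /*
     * Design note on createSchema() above: pre-splitting spreads the initial load across all
     * regionservers instead of hammering the single region a fresh table would have.
     * UniformSplit divides the full byte keyspace evenly, which matches the uniformly random
     * 16-byte row keys this test writes.
     */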

    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
          + ", numNodes=" + numNodes);
      Job job = new Job(getConf());

      job.setJobName("Random Input Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      job.setInputFormatClass(GeneratorInputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier);

      // The identity mapper suffices: the input format already produces the random keys
      job.setMapperClass(Mapper.class);

      FileOutputFormat.setOutputPath(job, tmpOutput);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
      createSchema();
      Job job = new Job(getConf());

      job.setJobName("Link Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      FileInputFormat.setInputPaths(job, tmpOutput);
      job.setInputFormatClass(OneFilePerMapperSFIF.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier);

      setMapperForGenerator(job);

      job.setOutputFormatClass(NullOutputFormat.class);

      // Duplicate task attempts must not write conflicting links
      job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
        ClassNotFoundException {
      boolean success = job.waitForCompletion(true);
      return success;
    }

    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(GeneratorMapper.class);
    }

    public int run(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
      if (ret > 0) {
        return ret;
      }
      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
    }
  }

  /**
   * A map-reduce job that verifies that the linked lists generated by Generator do not have
   * any holes.
   */
  static class Verify extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Verify.class);
    protected static final BytesWritable DEF = new BytesWritable(NO_KEY);

    protected Job job;

    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
      private BytesWritable row = new BytesWritable();
      private BytesWritable ref = new BytesWritable();

      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
        byte[] rowKey = key.get();
        row.set(rowKey, 0, rowKey.length);
        context.write(row, DEF);
        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
        if (prev != null && prev.length > 0) {
          ref.set(prev, 0, prev.length);
          context.write(ref, row);
        } else {
          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
        }
      }
    }

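    /*
     * Meaning of the counters below, as derived from the reducer logic: REFERENCED is the
     * healthy case (a row exists and at least one row points at it); UNREFERENCED means a row
     * exists but nothing points at it; UNDEFINED means some row points at a key that was never
     * written, i.e. data loss; EXTRAREFERENCES counts surplus referrers for keys referenced
     * more than once. CORRUPT is declared but not incremented in this version.
     */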
    public static enum Counts {
      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
    }

    public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
      private ArrayList<byte[]> refs = new ArrayList<byte[]>();

      private AtomicInteger rows = new AtomicInteger(0);

      @Override
      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
          throws IOException, InterruptedException {

        int defCount = 0;

        refs.clear();
        for (BytesWritable type : values) {
          if (type.getLength() == DEF.getLength()) {
            defCount++;
          } else {
            byte[] bytes = new byte[type.getLength()];
            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
            refs.add(bytes);
          }
        }

        // A healthy key is defined and referenced exactly once; anything else is logged with
        // the full list of its referrers.
        StringBuilder refsSb = null;
        String keyString = null;
        if (defCount == 0 || refs.size() != 1) {
          refsSb = new StringBuilder();
          String comma = "";
          for (byte[] ref : refs) {
            refsSb.append(comma);
            comma = ",";
            refsSb.append(Bytes.toStringBinary(ref));
          }
          keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());

          LOG.error("Linked List error: Key = " + keyString + " References = "
              + refsSb.toString());
        }

        if (defCount == 0 && refs.size() > 0) {
          // This is bad: found a node that is referenced but not defined. It must have been
          // lost; emit some info about it for debugging purposes.
          context.write(new Text(keyString), new Text(refsSb.toString()));
          context.getCounter(Counts.UNDEFINED).increment(1);
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter("undef", keyString).increment(1);
          }
        } else if (defCount > 0 && refs.size() == 0) {
          // Node is defined but nothing references it
          context.write(new Text(keyString), new Text("none"));
          context.getCounter(Counts.UNREFERENCED).increment(1);
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter("unref", keyString).increment(1);
          }
        } else {
          if (refs.size() > 1) {
            if (refsSb != null) {
              context.write(new Text(keyString), new Text(refsSb.toString()));
            }
            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
          }
          // Node is defined and referenced
          context.getCounter(Counts.REFERENCED).increment(1);
        }
      }
    }

    @Override
    public int run(String[] args) throws Exception {

      if (args.length != 2) {
        System.out.println("Usage : " + Verify.class.getSimpleName() +
            " <output dir> <num reducers>");
        return 0;
      }

      String outputDir = args[0];
      int numReducers = Integer.parseInt(args[1]);

      return run(outputDir, numReducers);
    }
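
    /*
     * Example Verify invocation (hypothetical path and reducer count): running the enclosing
     * test with
     *
     *   ... IntegrationTestBigLinkedList Verify /tmp/biglinkedlist-verify 16
     *
     * scans the whole table with 16 reducers and writes any problem keys to the output dir.
     */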

    public int run(String outputDir, int numReducers) throws Exception {
      return run(new Path(outputDir), numReducers);
    }

    public int run(Path outputDir, int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);

      job = new Job(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);

      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      TextOutputFormat.setOutputPath(job, outputDir);

      boolean success = job.waitForCompletion(true);

      return success ? 0 : 1;
    }

    @SuppressWarnings("deprecation")
    public boolean verify(long expectedReferenced) throws Exception {
      if (job == null) {
        throw new IllegalStateException("You should call run() first");
      }

      Counters counters = job.getCounters();

      Counter referenced = counters.findCounter(Counts.REFERENCED);
      Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
      Counter undefined = counters.findCounter(Counts.UNDEFINED);
      Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);

      boolean success = true;

      if (expectedReferenced != referenced.getValue()) {
        LOG.error("Expected referenced count does not match actual referenced count. " +
            "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
        success = false;
      }

      if (unreferenced.getValue() > 0) {
        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
        LOG.error("Unreferenced nodes were not expected. Unreferenced count="
            + unreferenced.getValue()
            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
        success = false;
      }

      if (undefined.getValue() > 0) {
        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
        success = false;
      }

      if (!success) {
        handleFailure(counters);
      }
      return success;
    }

    protected void handleFailure(Counters counters) throws IOException {
      Configuration conf = job.getConfiguration();
      HConnection conn = HConnectionManager.getConnection(conf);
      TableName tableName = getTableName(conf);
      CounterGroup g = counters.getGroup("undef");
      Iterator<Counter> it = g.iterator();
      while (it.hasNext()) {
        String keyString = it.next().getName();
        byte[] key = Bytes.toBytes(keyString);
        HRegionLocation loc = conn.relocateRegion(tableName, key);
        LOG.error("undefined row " + keyString + ", " + loc);
      }
      g = counters.getGroup("unref");
      it = g.iterator();
      while (it.hasNext()) {
        String keyString = it.next().getName();
        byte[] key = Bytes.toBytes(keyString);
        HRegionLocation loc = conn.relocateRegion(tableName, key);
        LOG.error("unreferenced row " + keyString + ", " + loc);
      }
    }
  }

  /**
   * Executes the Generator and Verify jobs in a loop. Data is not cleaned between runs, so
   * each iteration adds more data.
   */
  static class Loop extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Loop.class);

    IntegrationTestBigLinkedList it;

    protected void runGenerator(int numMappers, long numNodes,
        String outputDir, Integer width, Integer wrapMultiplier) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // unique subdir so iterations do not clobber each other
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new Generator();
      generator.setConf(getConf());
      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }

    protected void runVerify(String outputDir,
        int numReducers, long expectedNumNodes) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID();
      Path iterationOutput = new Path(outputPath, uuid.toString());

      Verify verify = new Verify();
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> " +
            "<output dir> <num reducers> [<width> <wrap multiplier>]");
        return 1;
      }
      LOG.info("Running Loop with args:" + Arrays.deepToString(args));

      int numIterations = Integer.parseInt(args[0]);
      int numMappers = Integer.parseInt(args[1]);
      long numNodes = Long.parseLong(args[2]);
      String outputDir = args[3];
      int numReducers = Integer.parseInt(args[4]);
      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);

      long expectedNumNodes = 0;

      if (numIterations < 0) {
        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
      }

      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);
        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier);
        expectedNumNodes += numMappers * numNodes;

        runVerify(outputDir, numReducers, expectedNumNodes);
      }

      return 0;
    }
  }
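
  /*
   * Example Loop invocation (all values hypothetical): running the enclosing test with
   *
   *   ... IntegrationTestBigLinkedList Loop 3 4 1000000 /tmp/biglinkedlist 8
   *
   * performs three generate-then-verify rounds with 4 mappers of 1M nodes each and 8 verify
   * reducers, so the expected REFERENCED count grows by 4M per iteration.
   */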

  /**
   * A stand-alone program that prints out portions of a list created by the Generator.
   */
  private static class Print extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      Options options = new Options();
      options.addOption("s", "start", true, "start key");
      options.addOption("e", "end", true, "end key");
      options.addOption("l", "limit", true, "number to print");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line: " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      HTable table = new HTable(getConf(), getTableName(getConf()));

      Scan scan = new Scan();
      scan.setBatch(10000);

      if (cmd.hasOption("s"))
        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));

      if (cmd.hasOption("e"))
        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));

      int limit = 0;
      if (cmd.hasOption("l"))
        limit = Integer.parseInt(cmd.getOptionValue("l"));
      else
        limit = 100;

      ResultScanner scanner = table.getScanner(scan);

      CINode node = new CINode();
      Result result = scanner.next();
      int count = 0;
      while (result != null && count++ < limit) {
        node = getCINode(result, node);
        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
            Bytes.toStringBinary(node.prev), node.count, node.client);
        result = scanner.next();
      }
      scanner.close();
      table.close();

      return 0;
    }
  }
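
  /*
   * Example Print invocation (the key value is hypothetical and parsed with
   * Bytes.toBytesBinary): "... IntegrationTestBigLinkedList Print -s \x00\x11\x22 -l 10"
   * prints up to ten nodes starting at the given binary key, one per line in the format
   * key:prev:count:client.
   */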

  /**
   * A stand-alone program that deletes a single node.
   */
  private static class Delete extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 1) {
        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
        return 0;
      }
      byte[] val = Bytes.toBytesBinary(args[0]);

      org.apache.hadoop.hbase.client.Delete delete
          = new org.apache.hadoop.hbase.client.Delete(val);

      HTable table = new HTable(getConf(), getTableName(getConf()));

      table.delete(delete);
      table.flushCommits();
      table.close();

      System.out.println("Delete successful");
      return 0;
    }
  }

  /**
   * A stand-alone program that follows a linked list created by the Generator and prints
   * timing info.
   */
  private static class Walker extends Configured implements Tool {
    @Override
    public int run(String[] args) throws IOException {
      Options options = new Options();
      options.addOption("n", "num", true, "number of queries");
      options.addOption("s", "start", true, "key to start at, binary string");
      options.addOption("l", "logevery", true, "log every N queries");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line: " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      long maxQueries = Long.MAX_VALUE;
      if (cmd.hasOption('n')) {
        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
      }
      Random rand = new Random();
      boolean isSpecificStart = cmd.hasOption('s');
      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;

      HTable table = new HTable(getConf(), getTableName(getConf()));
      long numQueries = 0;

      // If a specific start key is given, walk only the single list that starts there. Note
      // that for circular (or P-shaped) lists the walk never terminates on its own, just as
      // in a normal run without a start key.
      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
        if (!isSpecificStart) {
          startKey = new byte[ROWKEY_LENGTH];
          rand.nextBytes(startKey);
        }
        CINode node = findStartNode(table, startKey);
        if (node == null && isSpecificStart) {
          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
        }
        numQueries++;
        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
          byte[] prev = node.prev;
          long t1 = System.currentTimeMillis();
          node = getNode(prev, table, node);
          long t2 = System.currentTimeMillis();
          if (numQueries % logEvery == 0) {
            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
          }
          numQueries++;
          if (node == null) {
            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
          } else if (node.prev.length == NO_KEY.length) {
            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
          }
        }
      }

      table.close();
      return 0;
    }

    private static CINode findStartNode(HTable table, byte[] startKey) throws IOException {
      Scan scan = new Scan();
      scan.setStartRow(startKey);
      scan.setBatch(1);
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);

      long t1 = System.currentTimeMillis();
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      long t2 = System.currentTimeMillis();
      scanner.close();

      if (result != null) {
        CINode node = getCINode(result, new CINode());
        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
        return node;
      }

      System.out.println("FSR " + (t2 - t1));

      return null;
    }

    private CINode getNode(byte[] row, HTable table, CINode node) throws IOException {
      Get get = new Get(row);
      get.addColumn(FAMILY_NAME, COLUMN_PREV);
      Result result = table.get(get);
      return getCINode(result, node);
    }
  }
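
  /*
   * Example Walker invocation (values hypothetical): "... IntegrationTestBigLinkedList Walker
   * -n 10000 -l 100" follows prev pointers from random start keys for 10,000 Gets, printing a
   * timing line every 100 queries in the form "CQ <query#>: <millis> <key>".
   */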

  static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  private static CINode getCINode(Result result, CINode node) {
    node.key = Bytes.copy(result.getRow());
    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
    } else {
      node.prev = NO_KEY;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
    } else {
      node.count = -1;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
    } else {
      node.client = "";
    }
    return node;
  }

  protected IntegrationTestingUtility util;

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    // The mini MapReduce cluster is only started for non-distributed runs (see setUpCluster),
    // so it only needs shutting down in that case
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  @Test
  public void testContinuousIngest() throws Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new Loop(),
        new String[] {"1", "1", "2000000",
            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1"});
    org.junit.Assert.assertEquals(0, ret);
  }

  private void usage() {
    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
    System.err.println("  where COMMAND is one of:");
    System.err.println("");
    System.err.println("  Generator  A map only job that generates data.");
    System.err.println("  Verify     A map reduce job that looks for holes.");
    System.err.println("             Look at the counts after running.");
    System.err.println("             REFERENCED and UNREFERENCED are ok; any");
    System.err.println("             UNDEFINED counts are bad. Do not");
    System.err.println("             run at the same time as the Generator.");
    System.err.println("  Walker     A standalone program that starts");
    System.err.println("             following a linked list and emits");
    System.err.println("             timing info.");
    System.err.println("  Print      A standalone program that prints nodes");
    System.err.println("             in the linked list.");
    System.err.println("  Delete     A standalone program that deletes a");
    System.err.println("             single node.");
    System.err.println("  Loop       A program to loop through the Generator");
    System.err.println("             and Verify steps.");
    System.err.println("");
    System.err.flush();
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    String[] args = cmd.getArgs();

    if (args.length < 1) {
      printUsage();
      throw new RuntimeException("Incorrect number of args.");
    }
    toRun = args[0];
    otherArgs = Arrays.copyOfRange(args, 1, args.length);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {

    Tool tool = null;
    if (toRun.equals("Generator")) {
      tool = new Generator();
    } else if (toRun.equals("Verify")) {
      tool = new Verify();
    } else if (toRun.equals("Loop")) {
      Loop loop = new Loop();
      loop.it = this;
      tool = loop;
    } else if (toRun.equals("Walker")) {
      tool = new Walker();
    } else if (toRun.equals("Print")) {
      tool = new Print();
    } else if (toRun.equals("Delete")) {
      tool = new Delete();
    } else {
      usage();
      throw new RuntimeException("Unknown arg");
    }

    return ToolRunner.run(getConf(), tool, otherArgs);
  }

  @Override
  public String getTablename() {
    Configuration c = getConf();
    return c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME);
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
  }

  private static void setJobConf(Job job, int numMappers, long numNodes,
      Integer width, Integer wrapMultiplier) {
    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
    if (width != null) {
      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
    }
    if (wrapMultiplier != null) {
      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
    }
  }

  public static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
    System.exit(ret);
  }
}