/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.util.StringUtils;

/**
 * A command-line tool that benchmarks HFile read/write workloads: merging
 * several input HFiles into one, or running random reads against one HFile.
 */
public class HFileReadWriteTest {

  private static final String TABLE_NAME = "MyTable";

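  /** The benchmark workloads supported by this tool. */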
  private static enum Workload {
    MERGE("merge", "Merge the specified HFiles", 1, Integer.MAX_VALUE),
    RANDOM_READS("read", "Perform a random read benchmark on the given HFile",
        1, 1);

    private String option;
    private String description;

    public final int minNumInputFiles;
    public final int maxNumInputFiles;

    Workload(String option, String description, int minNumInputFiles,
        int maxNumInputFiles) {
      this.option = option;
      this.description = description;
      this.minNumInputFiles = minNumInputFiles;
      this.maxNumInputFiles = maxNumInputFiles;
    }

    static OptionGroup getOptionGroup() {
      OptionGroup optionGroup = new OptionGroup();
      for (Workload w : values())
        optionGroup.addOption(new Option(w.option, w.description));
      return optionGroup;
    }

    private static String getOptionListStr() {
      StringBuilder sb = new StringBuilder();
      for (Workload w : values()) {
        if (sb.length() > 0)
          sb.append(", ");
        sb.append("-" + w.option);
      }
      return sb.toString();
    }

    static Workload fromCmdLine(CommandLine cmdLine) {
      for (Workload w : values()) {
        if (cmdLine.hasOption(w.option))
          return w;
      }
      LOG.error("No workload specified. Specify one of the options: " +
          getOptionListStr());
      return null;
    }

    public String onlyUsedFor() {
      return ". Only used for the " + this + " workload.";
    }
  }

  private static final String OUTPUT_DIR_OPTION = "output_dir";
  private static final String COMPRESSION_OPTION = "compression";
  private static final String BLOOM_FILTER_OPTION = "bloom";
  private static final String BLOCK_SIZE_OPTION = "block_size";
  private static final String DURATION_OPTION = "duration";
  private static final String NUM_THREADS_OPTION = "num_threads";

  private static final Log LOG = LogFactory.getLog(HFileReadWriteTest.class);

  private Workload workload;
  private FileSystem fs;
  private Configuration conf;
  private CacheConfig cacheConf;
  private List<String> inputFileNames;
  private Path outputDir;
  private int numReadThreads;
  private int durationSec;
  private DataBlockEncoding dataBlockEncoding;

  private BloomType bloomType = BloomType.NONE;
  private int blockSize;
  private Compression.Algorithm compression = Compression.Algorithm.NONE;

  private byte[] firstRow, lastRow;

  private AtomicLong numSeeks = new AtomicLong();
  private AtomicLong numKV = new AtomicLong();
  private AtomicLong totalBytes = new AtomicLong();

  private byte[] family;

  private long endTime = Long.MAX_VALUE;

  private SortedSet<String> keysRead = new ConcurrentSkipListSet<String>();
  private List<StoreFile> inputStoreFiles;

  public HFileReadWriteTest() {
    conf = HBaseConfiguration.create();
    cacheConf = new CacheConfig(conf);
  }

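  /**
   * Parses command-line options into the tool's fields.
   *
   * @return true if a workload was selected and all options are valid
   */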
  @SuppressWarnings("unchecked")
  public boolean parseOptions(String args[]) {

    Options options = new Options();
    options.addOption(OUTPUT_DIR_OPTION, true, "Output directory" +
        Workload.MERGE.onlyUsedFor());
    options.addOption(COMPRESSION_OPTION, true, "Compression type, one of "
        + Arrays.toString(Compression.Algorithm.values()) +
        Workload.MERGE.onlyUsedFor());
    options.addOption(BLOOM_FILTER_OPTION, true, "Bloom filter type, one of "
        + Arrays.toString(BloomType.values()) +
        Workload.MERGE.onlyUsedFor());
    options.addOption(BLOCK_SIZE_OPTION, true, "HFile block size" +
        Workload.MERGE.onlyUsedFor());
    options.addOption(DURATION_OPTION, true, "The amount of time to run the " +
        "random read workload for" + Workload.RANDOM_READS.onlyUsedFor());
    options.addOption(NUM_THREADS_OPTION, true, "The number of random " +
        "reader threads" + Workload.RANDOM_READS.onlyUsedFor());
    options.addOption(LoadTestTool.OPT_DATA_BLOCK_ENCODING, true,
        LoadTestTool.OPT_DATA_BLOCK_ENCODING_USAGE);
    options.addOptionGroup(Workload.getOptionGroup());

    if (args.length == 0) {
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp(HFileReadWriteTest.class.getSimpleName(),
          options, true);
      return false;
    }

    CommandLineParser parser = new PosixParser();
    CommandLine cmdLine;
    try {
      cmdLine = parser.parse(options, args);
    } catch (ParseException ex) {
      LOG.error(ex);
      return false;
    }

    workload = Workload.fromCmdLine(cmdLine);
    if (workload == null)
      return false;

    inputFileNames = (List<String>) cmdLine.getArgList();

    if (inputFileNames.size() == 0) {
      LOG.error("No input file names specified");
      return false;
    }

    if (inputFileNames.size() < workload.minNumInputFiles) {
      LOG.error("Too few input files: at least " + workload.minNumInputFiles +
          " required");
      return false;
    }

    if (inputFileNames.size() > workload.maxNumInputFiles) {
      LOG.error("Too many input files: at most " + workload.maxNumInputFiles +
          " allowed");
      return false;
    }

    if (cmdLine.hasOption(COMPRESSION_OPTION)) {
      compression = Compression.Algorithm.valueOf(
          cmdLine.getOptionValue(COMPRESSION_OPTION));
    }

    if (cmdLine.hasOption(BLOOM_FILTER_OPTION)) {
      bloomType = BloomType.valueOf(cmdLine.getOptionValue(
          BLOOM_FILTER_OPTION));
    }

    if (cmdLine.hasOption(LoadTestTool.OPT_DATA_BLOCK_ENCODING)) {
      dataBlockEncoding = DataBlockEncoding.valueOf(
          cmdLine.getOptionValue(LoadTestTool.OPT_DATA_BLOCK_ENCODING));
    }

    blockSize = conf.getInt("hfile.min.blocksize.size", 65536);
    if (cmdLine.hasOption(BLOCK_SIZE_OPTION))
      blockSize = Integer.parseInt(cmdLine.getOptionValue(BLOCK_SIZE_OPTION));

    if (workload == Workload.MERGE) {
      String outputDirStr = cmdLine.getOptionValue(OUTPUT_DIR_OPTION);
      if (outputDirStr == null) {
        LOG.error("Output directory is not specified");
        return false;
      }
      outputDir = new Path(outputDirStr);
    }

    if (workload == Workload.RANDOM_READS) {
      if (!requireOptions(cmdLine, new String[] { DURATION_OPTION,
          NUM_THREADS_OPTION })) {
        return false;
      }

      durationSec = Integer.parseInt(cmdLine.getOptionValue(DURATION_OPTION));
      numReadThreads = Integer.parseInt(
          cmdLine.getOptionValue(NUM_THREADS_OPTION));
    }

    Collections.sort(inputFileNames);

    return true;
  }

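  /**
   * Checks that all of the given options were provided on the command line.
   *
   * @return true if all required options are present
   */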
  private boolean requireOptions(CommandLine cmdLine,
      String[] requiredOptions) {
    for (String option : requiredOptions)
      if (!cmdLine.hasOption(option)) {
        LOG.error("Required option -" + option + " not specified");
        return false;
      }
    return true;
  }

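  /**
   * Verifies that every input file exists and is a regular file, and that
   * the output directory, if specified, exists and is a directory.
   */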
  public boolean validateConfiguration() throws IOException {
    fs = FileSystem.get(conf);

    for (String inputFileName : inputFileNames) {
      Path path = new Path(inputFileName);
      if (!fs.exists(path)) {
        LOG.error("File " + inputFileName + " does not exist");
        return false;
      }

      if (fs.getFileStatus(path).isDir()) {
        LOG.error(inputFileName + " is a directory");
        return false;
      }
    }

    if (outputDir != null &&
        (!fs.exists(outputDir) || !fs.getFileStatus(outputDir).isDir())) {
      LOG.error(outputDir.toString() + " does not exist or is not a " +
          "directory");
      return false;
    }

    return true;
  }

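  /**
   * Runs the merge workload: reads all input HFiles through a
   * compaction-style scanner, writes the merged key-values into a new HFile
   * in the output directory, and prints the result's metadata.
   */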
  public void runMergeWorkload() throws IOException {
    long maxKeyCount = prepareForMerge();

    HColumnDescriptor columnDescriptor = new HColumnDescriptor(
        HFileReadWriteTest.class.getSimpleName());
    columnDescriptor.setBlocksize(blockSize);
    columnDescriptor.setBloomFilterType(bloomType);
    columnDescriptor.setCompressionType(compression);
    columnDescriptor.setDataBlockEncoding(dataBlockEncoding);
    HRegionInfo regionInfo = new HRegionInfo();
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
    HRegion region = new HRegion(outputDir, null, fs, conf, regionInfo, htd,
        null);
    HStore store = new HStore(region, columnDescriptor, conf);

    List<StoreFileScanner> scanners =
        StoreFileScanner.getScannersForStoreFiles(inputStoreFiles, false,
            false, region.getReadpoint(IsolationLevel.READ_COMMITTED));

    StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount,
        compression, false, true, false);

    StatisticsPrinter statsPrinter = new StatisticsPrinter();
    statsPrinter.startThread();

    try {
      performMerge(scanners, store, writer);
      writer.close();
    } finally {
      statsPrinter.requestStop();
    }

    Path resultPath = writer.getPath();

    resultPath = tryUsingSimpleOutputPath(resultPath);

    long fileSize = fs.getFileStatus(resultPath).getLen();
    LOG.info("Created " + resultPath + ", size " + fileSize);

    System.out.println();
    System.out.println("HFile information for " + resultPath);
    System.out.println();

    HFilePrettyPrinter hfpp = new HFilePrettyPrinter();
    hfpp.run(new String[] { "-m", "-f", resultPath.toString() });
  }

  private Path tryUsingSimpleOutputPath(Path resultPath) throws IOException {
    if (inputFileNames.size() == 1) {
      // In the case of a single input file, give the output file the same
      // name as the input so that the result is easier to identify.
      Path inputPath = new Path(inputFileNames.get(0));
      Path betterOutputPath = new Path(outputDir,
          inputPath.getName());
      if (!fs.exists(betterOutputPath)) {
        fs.rename(resultPath, betterOutputPath);
        resultPath = betterOutputPath;
      }
    }
    return resultPath;
  }

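  /**
   * Streams all key-values from the given scanners through a compaction
   * scanner and appends them to the output writer.
   */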
  private void performMerge(List<StoreFileScanner> scanners, HStore store,
      StoreFile.Writer writer) throws IOException {
    InternalScanner scanner = null;
    try {
      Scan scan = new Scan();

      // Use a compaction-style scanner that drops deleted cells while
      // merging the input files.
      scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
          ScanType.COMPACT_DROP_DELETES, Long.MIN_VALUE, Long.MIN_VALUE);

      ArrayList<Cell> kvs = new ArrayList<Cell>();

      while (scanner.next(kvs) || kvs.size() != 0) {
        numKV.addAndGet(kvs.size());
        for (Cell c : kvs) {
          KeyValue kv = KeyValueUtil.ensureKeyValue(c);
          totalBytes.addAndGet(kv.getLength());
          writer.append(kv);
        }
        kvs.clear();
      }
    } finally {
      if (scanner != null)
        scanner.close();
    }
  }


  /**
   * Opens all input store files and sums up their estimated key counts.
   *
   * @return the estimated total number of keys in the files being merged
   */
  private long prepareForMerge() throws IOException {
    LOG.info("Merging " + inputFileNames);
    LOG.info("Using block size: " + blockSize);
    inputStoreFiles = new ArrayList<StoreFile>();

    long maxKeyCount = 0;
    for (String fileName : inputFileNames) {
      Path filePath = new Path(fileName);

      // Open the store file and create a reader so that key count, Bloom
      // filter type, and size become available.
      StoreFile sf = openStoreFile(filePath, false);
      sf.createReader();
      inputStoreFiles.add(sf);

      StoreFile.Reader r = sf.getReader();
      if (r != null) {
        long keyCount = r.getFilterEntries();
        maxKeyCount += keyCount;
        LOG.info("Compacting: " + sf + "; keyCount = " + keyCount
            + "; Bloom Type = " + r.getBloomFilterType().toString()
            + "; Size = " + StringUtils.humanReadableInt(r.length()));
      }
    }
    return maxKeyCount;
  }

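  /** Returns the low-level HFile readers of the opened input store files. */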
  public HFile.Reader[] getHFileReaders() {
    HFile.Reader readers[] = new HFile.Reader[inputStoreFiles.size()];
    for (int i = 0; i < inputStoreFiles.size(); ++i)
      readers[i] = inputStoreFiles.get(i).getReader().getHFileReader();
    return readers;
  }

  private StoreFile openStoreFile(Path filePath, boolean blockCache)
      throws IOException {
    // We pass the ROWCOL Bloom filter type here, but the StoreFile will
    // still use the Bloom filter type recorded in the HFile itself.
    return new StoreFile(fs, filePath, conf, cacheConf,
        BloomType.ROWCOL);
  }

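  /**
   * Converts a lowercase hex digit character ('0'-'9', 'a'-'f') to its
   * numeric value, or returns -1 if the character is not a hex digit.
   */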
  public static int charToHex(int c) {
    if ('0' <= c && c <= '9')
      return c - '0';
    if ('a' <= c && c <= 'f')
      return 10 + c - 'a';
    return -1;
  }

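  /**
   * Converts a numeric value to the corresponding lowercase hex digit
   * character, or returns -1 if the value's low byte is not in 0-15.
   */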
  public static int hexToChar(int h) {
    h &= 0xff;
    if (0 <= h && h <= 9)
      return '0' + h;
    if (10 <= h && h <= 15)
      return 'a' + h - 10;
    return -1;
  }

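  /**
   * Generates a random row key that sorts between the given first and last
   * row keys, inclusive. At positions where both boundary keys contain hex
   * digits, a random hex digit is generated to keep the key readable.
   */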
  public static byte[] createRandomRow(Random rand, byte[] first, byte[] last)
  {
    int resultLen = Math.max(first.length, last.length);
    int minLen = Math.min(first.length, last.length);
    byte[] result = new byte[resultLen];
    boolean greaterThanFirst = false;
    boolean lessThanLast = false;

    for (int i = 0; i < resultLen; ++i) {
      // Generate a random hex character if both the first and the last row
      // have a hex digit at this position.
      boolean isHex = i < minLen && charToHex(first[i]) != -1
          && charToHex(last[i]) != -1;

      // If the prefix generated so far is already greater than the first
      // key's prefix, arbitrarily low values are allowed at this position.
      int low = greaterThanFirst || i >= first.length ? 0 : first[i] & 0xff;

      // Likewise, if the prefix is already less than the last key's prefix,
      // arbitrarily high values are allowed.
      int high = lessThanLast || i >= last.length ? 0xff : last[i] & 0xff;

      // Pick a random byte in [low, high]. In the hex case, clamp the range
      // to valid hex digit characters and pick a random hex digit.
      int r;
      if (isHex) {
        if (low < '0')
          low = '0';

        if (high > 'f')
          high = 'f';

        int lowHex = charToHex(low);
        int highHex = charToHex(high);
        r = hexToChar(lowHex + rand.nextInt(highHex - lowHex + 1));
      } else {
        r = low + rand.nextInt(high - low + 1);
      }

      if (r > low)
        greaterThanFirst = true;

      if (r < high)
        lessThanLast = true;

      result[i] = (byte) r;
    }

    if (Bytes.compareTo(result, first) < 0) {
      throw new IllegalStateException("Generated key " +
          Bytes.toStringBinary(result) + " is less than the first key " +
          Bytes.toStringBinary(first));
    }

    if (Bytes.compareTo(result, last) > 0) {
      throw new IllegalStateException("Generated key " +
          Bytes.toStringBinary(result) + " is greater than the last key " +
          Bytes.toStringBinary(last));
    }

    return result;
  }

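  /** Generates a random qualifier of 10 to 39 random bytes. */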
  private static byte[] createRandomQualifier(Random rand) {
    byte[] q = new byte[10 + rand.nextInt(30)];
    rand.nextBytes(q);
    return q;
  }

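  /**
   * A reader task that performs random seeks followed by short sequential
   * reads on the store file until the configured end time is reached.
   */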
  private class RandomReader implements Callable<Boolean> {

    private int readerId;
    private StoreFile.Reader reader;
    private boolean pread;

    public RandomReader(int readerId, StoreFile.Reader reader,
        boolean pread)
    {
      this.readerId = readerId;
      this.reader = reader;
      this.pread = pread;
    }

    @Override
    public Boolean call() throws Exception {
      Thread.currentThread().setName("reader " + readerId);
      Random rand = new Random();
      StoreFileScanner scanner = reader.getStoreFileScanner(true, pread);

      while (System.currentTimeMillis() < endTime) {
        byte[] row = createRandomRow(rand, firstRow, lastRow);
        KeyValue kvToSeek = new KeyValue(row, family,
            createRandomQualifier(rand));
        if (rand.nextDouble() < 0.0001) {
          LOG.info("kvToSeek=" + kvToSeek);
        }
        boolean seekResult;
        try {
          seekResult = scanner.seek(kvToSeek);
        } catch (IOException ex) {
          throw new IOException("Seek failed for key " + kvToSeek + ", pread="
              + pread, ex);
        }
        numSeeks.incrementAndGet();
        if (!seekResult) {
          error("Seek returned false for row " + Bytes.toStringBinary(row));
          return false;
        }
        for (int i = 0; i < rand.nextInt(10) + 1; ++i) {
          KeyValue kv = scanner.next();
          numKV.incrementAndGet();
          if (i == 0 && kv == null) {
            error("scanner.next() returned null at the first iteration for " +
                "row " + Bytes.toStringBinary(row));
            return false;
          }
          if (kv == null)
            break;

          String keyHashStr = MD5Hash.getMD5AsHex(kv.getKey());
          keysRead.add(keyHashStr);
          totalBytes.addAndGet(kv.getLength());
        }
      }

      return true;
    }

    private void error(String msg) {
      LOG.error("error in reader " + readerId + " (pread=" + pread + "): "
          + msg);
    }

  }

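  /**
   * Periodically logs throughput statistics (seeks/sec, KVs/sec, bytes/sec,
   * block reads/sec) once a second until stopped or the end time is reached.
   */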
  private class StatisticsPrinter implements Callable<Boolean> {

    private volatile boolean stopRequested;
    private volatile Thread thread;
    private long totalSeekAndReads, totalPositionalReads;

    /**
     * Starts the statistics printer in a background thread.
     */
    public void startThread() {
      new Thread() {
        @Override
        public void run() {
          try {
            call();
          } catch (Exception e) {
            LOG.error(e);
          }
        }
      }.start();
    }

    @Override
    public Boolean call() throws Exception {
      LOG.info("Starting statistics printer");
      thread = Thread.currentThread();
      thread.setName(StatisticsPrinter.class.getSimpleName());
      long startTime = System.currentTimeMillis();
      long curTime;
      while ((curTime = System.currentTimeMillis()) < endTime &&
          !stopRequested) {
        long elapsedTime = curTime - startTime;
        printStats(elapsedTime);
        try {
          Thread.sleep(1000 - elapsedTime % 1000);
        } catch (InterruptedException iex) {
          Thread.currentThread().interrupt();
          if (stopRequested)
            break;
        }
      }
      printStats(curTime - startTime);
      LOG.info("Stopping statistics printer");
      return true;
    }

    private void printStats(long elapsedTime) {
      long numSeeksL = numSeeks.get();
      double timeSec = elapsedTime / 1000.0;
      double seekPerSec = numSeeksL / timeSec;
      long kvCount = numKV.get();
      double kvPerSec = kvCount / timeSec;
      long bytes = totalBytes.get();
      double bytesPerSec = bytes / timeSec;

      // HFile's read-operation counters are reset every time they are read,
      // hence the accumulation here. No region server metrics thread runs
      // in this tool, so nothing else resets these counters concurrently.
      totalSeekAndReads += HFile.getReadOps();
      totalPositionalReads += HFile.getPreadOps();
      long totalBlocksRead = totalSeekAndReads + totalPositionalReads;

      double blkReadPerSec = totalBlocksRead / timeSec;
      double seekReadPerSec = totalSeekAndReads / timeSec;
      double preadPerSec = totalPositionalReads / timeSec;

      boolean isRead = workload == Workload.RANDOM_READS;

      StringBuilder sb = new StringBuilder();
      sb.append("Time: " + (long) timeSec + " sec");
      if (isRead)
        sb.append(", seek/sec: " + (long) seekPerSec);
      sb.append(", kv/sec: " + (long) kvPerSec);
      sb.append(", bytes/sec: " + (long) bytesPerSec);
      sb.append(", blk/sec: " + (long) blkReadPerSec);
      sb.append(", total KV: " + numKV);
      sb.append(", total bytes: " + totalBytes);
      sb.append(", total blk: " + totalBlocksRead);

      sb.append(", seekRead/sec: " + (long) seekReadPerSec);
      sb.append(", pread/sec: " + (long) preadPerSec);

      if (isRead)
        sb.append(", unique keys: " + keysRead.size());

      LOG.info(sb.toString());
    }


    public void requestStop() {
      stopRequested = true;
      if (thread != null)
        thread.interrupt();
    }

  }

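  /**
   * Runs the random read workload: starts the configured number of reader
   * threads plus a statistics printer, and lets them run for the configured
   * duration against the single input HFile.
   */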
  public boolean runRandomReadWorkload() throws IOException {
    if (inputFileNames.size() != 1) {
      throw new IOException("Need exactly one input file for random reads: " +
          inputFileNames);
    }

    Path inputPath = new Path(inputFileNames.get(0));

    // Open the single input store file with block caching enabled.
    StoreFile storeFile = openStoreFile(inputPath, true);

    StoreFile.Reader reader = storeFile.createReader();

    LOG.info("First key: " + Bytes.toStringBinary(reader.getFirstKey()));
    LOG.info("Last key: " + Bytes.toStringBinary(reader.getLastKey()));

    KeyValue firstKV = KeyValue.createKeyValueFromKey(reader.getFirstKey());
    firstRow = firstKV.getRow();

    KeyValue lastKV = KeyValue.createKeyValueFromKey(reader.getLastKey());
    lastRow = lastKV.getRow();

    family = firstKV.getFamily();
    if (!Bytes.equals(family, lastKV.getFamily())) {
      LOG.error("First and last key have different families: "
          + Bytes.toStringBinary(family) + " and "
          + Bytes.toStringBinary(lastKV.getFamily()));
      return false;
    }

    if (Bytes.equals(firstRow, lastRow)) {
      LOG.error("First and last row are the same, cannot run read workload: " +
          "firstRow=" + Bytes.toStringBinary(firstRow) + ", " +
          "lastRow=" + Bytes.toStringBinary(lastRow));
      return false;
    }

737
738 ExecutorService exec = Executors.newFixedThreadPool(numReadThreads + 1);
739 int numCompleted = 0;
740 int numFailed = 0;
741 try {
742 ExecutorCompletionService<Boolean> ecs =
743 new ExecutorCompletionService<Boolean>(exec);
744 endTime = System.currentTimeMillis() + 1000 * durationSec;
745 boolean pread = true;
746 for (int i = 0; i < numReadThreads; ++i)
747 ecs.submit(new RandomReader(i, reader, pread));
748 ecs.submit(new StatisticsPrinter());
749 Future<Boolean> result;
750 while (true) {
751 try {
752 result = ecs.poll(endTime + 1000 - System.currentTimeMillis(),
753 TimeUnit.MILLISECONDS);
754 if (result == null)
755 break;
756 try {
757 if (result.get()) {
758 ++numCompleted;
759 } else {
760 ++numFailed;
761 }
762 } catch (ExecutionException e) {
763 LOG.error("Worker thread failure", e.getCause());
764 ++numFailed;
765 }
766 } catch (InterruptedException ex) {
767 LOG.error("Interrupted after " + numCompleted +
768 " workers completed");
769 Thread.currentThread().interrupt();
770 continue;
771 }
772
773 }
774 } finally {
775 storeFile.closeReader(true);
776 exec.shutdown();
777
778 BlockCache c = cacheConf.getBlockCache();
779 if (c != null) {
780 c.shutdown();
781 }
782 }
783 LOG.info("Worker threads completed: " + numCompleted);
784 LOG.info("Worker threads failed: " + numFailed);
785 return true;
786 }
787
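  /** Dispatches to the workload selected on the command line. */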
  public boolean run() throws IOException {
    LOG.info("Workload: " + workload);
    switch (workload) {
    case MERGE:
      runMergeWorkload();
      break;
    case RANDOM_READS:
      return runRandomReadWorkload();
    default:
      LOG.error("Unknown workload: " + workload);
      return false;
    }

    return true;
  }

  private static void failure() {
    System.exit(1);
  }

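  /** Command-line entry point. */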
  public static void main(String[] args) {
    HFileReadWriteTest app = new HFileReadWriteTest();
    if (!app.parseOptions(args))
      failure();

    try {
      if (!app.validateConfiguration() ||
          !app.run())
        failure();
    } catch (IOException ex) {
      LOG.error(ex);
      failure();
    }
  }

}