/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.util.StringUtils;

/**
 * Tests HFile read/write workloads, such as merging HFiles and random reads.
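 * <p>
 * Example invocations (paths and numeric values below are placeholders; input
 * HFiles are passed as the remaining arguments):
 * <pre>
 *   HFileReadWriteTest -merge -output_dir /path/to/output /path/to/hfile1 /path/to/hfile2
 *   HFileReadWriteTest -read -duration 60 -num_threads 10 /path/to/hfile
 * </pre>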
 */
public class HFileReadWriteTest {

  private static final String TABLE_NAME = "MyTable";

  private static enum Workload {
    MERGE("merge", "Merge the specified HFiles", 1, Integer.MAX_VALUE),
    RANDOM_READS("read", "Perform a random read benchmark on the given HFile",
        1, 1);

    private String option;
    private String description;

    public final int minNumInputFiles;
    public final int maxNumInputFiles;

    Workload(String option, String description, int minNumInputFiles,
        int maxNumInputFiles) {
      this.option = option;
      this.description = description;
      this.minNumInputFiles = minNumInputFiles;
      this.maxNumInputFiles = maxNumInputFiles;
    }

    static OptionGroup getOptionGroup() {
      OptionGroup optionGroup = new OptionGroup();
      for (Workload w : values())
        optionGroup.addOption(new Option(w.option, w.description));
      return optionGroup;
    }

    private static String getOptionListStr() {
      StringBuilder sb = new StringBuilder();
      for (Workload w : values()) {
        if (sb.length() > 0)
          sb.append(", ");
        sb.append("-" + w.option);
      }
      return sb.toString();
    }

    static Workload fromCmdLine(CommandLine cmdLine) {
      for (Workload w : values()) {
        if (cmdLine.hasOption(w.option))
          return w;
      }
      LOG.error("No workload specified. Specify one of the options: " +
          getOptionListStr());
      return null;
    }

    public String onlyUsedFor() {
      return ". Only used for the " + this + " workload.";
    }
  }

  private static final String OUTPUT_DIR_OPTION = "output_dir";
  private static final String COMPRESSION_OPTION = "compression";
  private static final String BLOOM_FILTER_OPTION = "bloom";
  private static final String BLOCK_SIZE_OPTION = "block_size";
  private static final String DURATION_OPTION = "duration";
  private static final String NUM_THREADS_OPTION = "num_threads";

  private static final Log LOG = LogFactory.getLog(HFileReadWriteTest.class);

  private Workload workload;
  private FileSystem fs;
  private Configuration conf;
  private CacheConfig cacheConf;
  private List<String> inputFileNames;
  private Path outputDir;
  private int numReadThreads;
  private int durationSec;
  private DataBlockEncoding dataBlockEncoding;

  private BloomType bloomType = BloomType.NONE;
  private int blockSize;
  private Compression.Algorithm compression = Compression.Algorithm.NONE;

  private byte[] firstRow, lastRow;

  private AtomicLong numSeeks = new AtomicLong();
  private AtomicLong numKV = new AtomicLong();
  private AtomicLong totalBytes = new AtomicLong();

  private byte[] family;

  private long endTime = Long.MAX_VALUE;

  private SortedSet<String> keysRead = new ConcurrentSkipListSet<String>();
  private List<StoreFile> inputStoreFiles;

  public HFileReadWriteTest() {
    conf = HBaseConfiguration.create();
    cacheConf = new CacheConfig(conf);
  }

  @SuppressWarnings("unchecked")
  public boolean parseOptions(String args[]) {

    Options options = new Options();
    options.addOption(OUTPUT_DIR_OPTION, true, "Output directory" +
        Workload.MERGE.onlyUsedFor());
    options.addOption(COMPRESSION_OPTION, true, "Compression type, one of "
        + Arrays.toString(Compression.Algorithm.values()) +
        Workload.MERGE.onlyUsedFor());
    options.addOption(BLOOM_FILTER_OPTION, true, "Bloom filter type, one of "
        + Arrays.toString(BloomType.values()) +
        Workload.MERGE.onlyUsedFor());
    options.addOption(BLOCK_SIZE_OPTION, true, "HFile block size" +
        Workload.MERGE.onlyUsedFor());
    options.addOption(DURATION_OPTION, true, "The amount of time to run the " +
        "random read workload for" + Workload.RANDOM_READS.onlyUsedFor());
    options.addOption(NUM_THREADS_OPTION, true, "The number of random " +
        "reader threads" + Workload.RANDOM_READS.onlyUsedFor());
    options.addOption(LoadTestTool.OPT_DATA_BLOCK_ENCODING, true,
        LoadTestTool.OPT_DATA_BLOCK_ENCODING_USAGE);
    options.addOptionGroup(Workload.getOptionGroup());

    if (args.length == 0) {
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp(HFileReadWriteTest.class.getSimpleName(),
          options, true);
      return false;
    }

    CommandLineParser parser = new PosixParser();
    CommandLine cmdLine;
    try {
      cmdLine = parser.parse(options, args);
    } catch (ParseException ex) {
      LOG.error(ex);
      return false;
    }

    workload = Workload.fromCmdLine(cmdLine);
    if (workload == null)
      return false;

    inputFileNames = (List<String>) cmdLine.getArgList();

    if (inputFileNames.size() == 0) {
      LOG.error("No input file names specified");
      return false;
    }

    if (inputFileNames.size() < workload.minNumInputFiles) {
      LOG.error("Too few input files: at least " + workload.minNumInputFiles +
          " required");
      return false;
    }
    if (inputFileNames.size() > workload.maxNumInputFiles) {
      LOG.error("Too many input files: at most " + workload.maxNumInputFiles +
          " allowed");
      return false;
    }

    if (cmdLine.hasOption(COMPRESSION_OPTION)) {
      compression = Compression.Algorithm.valueOf(
          cmdLine.getOptionValue(COMPRESSION_OPTION));
    }

    if (cmdLine.hasOption(BLOOM_FILTER_OPTION)) {
      bloomType = BloomType.valueOf(cmdLine.getOptionValue(
          BLOOM_FILTER_OPTION));
    }

    if (cmdLine.hasOption(LoadTestTool.OPT_DATA_BLOCK_ENCODING)) {
      dataBlockEncoding = DataBlockEncoding.valueOf(
          cmdLine.getOptionValue(LoadTestTool.OPT_DATA_BLOCK_ENCODING));
    }

    blockSize = conf.getInt("hfile.min.blocksize.size", 65536);
    if (cmdLine.hasOption(BLOCK_SIZE_OPTION))
      blockSize = Integer.parseInt(cmdLine.getOptionValue(BLOCK_SIZE_OPTION));

    if (workload == Workload.MERGE) {
      String outputDirStr = cmdLine.getOptionValue(OUTPUT_DIR_OPTION);
      if (outputDirStr == null) {
        LOG.error("Output directory is not specified");
        return false;
      }
      outputDir = new Path(outputDirStr);
      // Will be checked for existence in validateConfiguration.
    }

    if (workload == Workload.RANDOM_READS) {
      if (!requireOptions(cmdLine, new String[] { DURATION_OPTION,
          NUM_THREADS_OPTION })) {
        return false;
      }

      durationSec = Integer.parseInt(cmdLine.getOptionValue(DURATION_OPTION));
      numReadThreads = Integer.parseInt(
          cmdLine.getOptionValue(NUM_THREADS_OPTION));
    }

    Collections.sort(inputFileNames);

    return true;
  }

  /** @return true if all the given options are specified */
  private boolean requireOptions(CommandLine cmdLine,
      String[] requiredOptions) {
    for (String option : requiredOptions)
      if (!cmdLine.hasOption(option)) {
        LOG.error("Required option -" + option + " not specified");
        return false;
      }
    return true;
  }

  public boolean validateConfiguration() throws IOException {
    fs = FileSystem.get(conf);

    for (String inputFileName : inputFileNames) {
      Path path = new Path(inputFileName);
      if (!fs.exists(path)) {
        LOG.error("File " + inputFileName + " does not exist");
        return false;
      }

      if (fs.getFileStatus(path).isDir()) {
        LOG.error(inputFileName + " is a directory");
        return false;
      }
    }

    if (outputDir != null &&
        (!fs.exists(outputDir) || !fs.getFileStatus(outputDir).isDir())) {
      LOG.error(outputDir.toString() + " does not exist or is not a " +
          "directory");
      return false;
    }

    return true;
  }

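  /**
   * Merges the input HFiles into a single HFile in the output directory,
   * similarly to a compaction, and prints metadata about the resulting file.
   */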
  public void runMergeWorkload() throws IOException {
    long maxKeyCount = prepareForMerge();

    HColumnDescriptor columnDescriptor = new HColumnDescriptor(
        HFileReadWriteTest.class.getSimpleName());
    columnDescriptor.setBlocksize(blockSize);
    columnDescriptor.setBloomFilterType(bloomType);
    columnDescriptor.setCompressionType(compression);
    columnDescriptor.setDataBlockEncoding(dataBlockEncoding);
    HRegionInfo regionInfo = new HRegionInfo();
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
    HRegion region = new HRegion(outputDir, null, fs, conf, regionInfo, htd,
        null);
    HStore store = new HStore(region, columnDescriptor, conf);

    List<StoreFileScanner> scanners =
        StoreFileScanner.getScannersForStoreFiles(inputStoreFiles, false,
            false, region.getReadpoint(IsolationLevel.READ_COMMITTED));

    StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount,
        compression, false, true, false);

    StatisticsPrinter statsPrinter = new StatisticsPrinter();
    statsPrinter.startThread();

    try {
      performMerge(scanners, store, writer);
      writer.close();
    } finally {
      statsPrinter.requestStop();
    }

    Path resultPath = writer.getPath();

    resultPath = tryUsingSimpleOutputPath(resultPath);

    long fileSize = fs.getFileStatus(resultPath).getLen();
    LOG.info("Created " + resultPath + ", size " + fileSize);

    System.out.println();
    System.out.println("HFile information for " + resultPath);
    System.out.println();

    HFilePrettyPrinter hfpp = new HFilePrettyPrinter();
    hfpp.run(new String[] { "-m", "-f", resultPath.toString() });
  }

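  /**
   * If there is exactly one input file, renames the merged output to match
   * that input file's name, provided the name is not already taken in the
   * output directory.
   */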
  private Path tryUsingSimpleOutputPath(Path resultPath) throws IOException {
    if (inputFileNames.size() == 1) {
      // In case of only one input file, make the output file name consistent
      // with the input name.

      Path inputPath = new Path(inputFileNames.get(0));
      Path betterOutputPath = new Path(outputDir,
          inputPath.getName());
      if (!fs.exists(betterOutputPath)) {
        fs.rename(resultPath, betterOutputPath);
        resultPath = betterOutputPath;
      }
    }
    return resultPath;
  }

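  /**
   * Reads all key-values from the given scanners through a compaction-style
   * {@link StoreScanner} and appends them to the given writer, updating the
   * key-value and byte counters as it goes.
   */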
  private void performMerge(List<StoreFileScanner> scanners, HStore store,
      StoreFile.Writer writer) throws IOException {
    InternalScanner scanner = null;
    try {
      Scan scan = new Scan();

      // Scan as in a major compaction, dropping deletes.
      scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
          ScanType.COMPACT_DROP_DELETES, Long.MIN_VALUE, Long.MIN_VALUE);

      ArrayList<Cell> kvs = new ArrayList<Cell>();

      while (scanner.next(kvs) || kvs.size() != 0) {
        numKV.addAndGet(kvs.size());
        for (Cell c : kvs) {
          KeyValue kv = KeyValueUtil.ensureKeyValue(c);
          totalBytes.addAndGet(kv.getLength());
          writer.append(kv);
        }
        kvs.clear();
      }
    } finally {
      if (scanner != null)
        scanner.close();
    }
  }

  /**
   * @return the total key count in the files being merged
   * @throws IOException
   */
  private long prepareForMerge() throws IOException {
    LOG.info("Merging " + inputFileNames);
    LOG.info("Using block size: " + blockSize);
    inputStoreFiles = new ArrayList<StoreFile>();

    long maxKeyCount = 0;
    for (String fileName : inputFileNames) {
      Path filePath = new Path(fileName);

      // Open without caching.
      StoreFile sf = openStoreFile(filePath, false);
      sf.createReader();
      inputStoreFiles.add(sf);

      StoreFile.Reader r = sf.getReader();
      if (r != null) {
        long keyCount = r.getFilterEntries();
        maxKeyCount += keyCount;
        LOG.info("Compacting: " + sf + "; keyCount = " + keyCount
            + "; Bloom Type = " + r.getBloomFilterType().toString()
            + "; Size = " + StringUtils.humanReadableInt(r.length()));
      }
    }
    return maxKeyCount;
  }

  public HFile.Reader[] getHFileReaders() {
    HFile.Reader readers[] = new HFile.Reader[inputStoreFiles.size()];
    for (int i = 0; i < inputStoreFiles.size(); ++i)
      readers[i] = inputStoreFiles.get(i).getReader().getHFileReader();
    return readers;
  }

  private StoreFile openStoreFile(Path filePath, boolean blockCache)
      throws IOException {
    // We are passing the ROWCOL Bloom filter type, but StoreFile will still
    // use the Bloom filter type specified in the HFile.
    return new StoreFile(fs, filePath, conf, cacheConf,
        BloomType.ROWCOL);
  }

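  /** @return the numeric value of a lowercase hex digit character, or -1 if not hex */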
  public static int charToHex(int c) {
    if ('0' <= c && c <= '9')
      return c - '0';
    if ('a' <= c && c <= 'f')
      return 10 + c - 'a';
    return -1;
  }

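  /** @return the lowercase hex digit character for a value in [0, 15], or -1 otherwise */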
  public static int hexToChar(int h) {
    h &= 0xff;
    if (0 <= h && h <= 9)
      return '0' + h;
    if (10 <= h && h <= 15)
      return 'a' + h - 10;
    return -1;
  }

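  /**
   * Generates a random row key that sorts between the given first and last
   * rows, choosing bytes position by position: once the prefix is strictly
   * greater than {@code first} (or strictly less than {@code last}), the
   * remaining byte range opens up to 0x00 (or 0xff). Positions where both
   * bounds are hex digits stay hex, so hex-looking keys remain hex-looking.
   */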
  public static byte[] createRandomRow(Random rand, byte[] first, byte[] last) {
    int resultLen = Math.max(first.length, last.length);
    int minLen = Math.min(first.length, last.length);
    byte[] result = new byte[resultLen];
    boolean greaterThanFirst = false;
    boolean lessThanLast = false;

    for (int i = 0; i < resultLen; ++i) {
      // Generate random hex characters if both first and last row are hex
      // at this position.
      boolean isHex = i < minLen && charToHex(first[i]) != -1
          && charToHex(last[i]) != -1;

      // If our key is already greater than the first key, we can use
      // arbitrarily low values.
      int low = greaterThanFirst || i >= first.length ? 0 : first[i] & 0xff;

      // If our key is already less than the last key, we can use arbitrarily
      // high values.
      int high = lessThanLast || i >= last.length ? 0xff : last[i] & 0xff;

      // Randomly select the next byte between the lowest and the highest
      // value allowed for this position. Restrict to hex characters if
      // necessary. We are generally biased towards border cases, which is OK
      // for a test.

      int r;
      if (isHex) {
        // Use hex chars.
        if (low < '0')
          low = '0';

        if (high > 'f')
          high = 'f';

        int lowHex = charToHex(low);
        int highHex = charToHex(high);
        r = hexToChar(lowHex + rand.nextInt(highHex - lowHex + 1));
      } else {
        r = low + rand.nextInt(high - low + 1);
      }

      if (r > low)
        greaterThanFirst = true;

      if (r < high)
        lessThanLast = true;

      result[i] = (byte) r;
    }

    if (Bytes.compareTo(result, first) < 0) {
      throw new IllegalStateException("Generated key " +
          Bytes.toStringBinary(result) + " is less than the first key " +
          Bytes.toStringBinary(first));
    }

    if (Bytes.compareTo(result, last) > 0) {
      throw new IllegalStateException("Generated key " +
          Bytes.toStringBinary(result) + " is greater than the last key " +
          Bytes.toStringBinary(last));
    }

    return result;
  }

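  /** @return a qualifier of 10 to 39 random bytes */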
  private static byte[] createRandomQualifier(Random rand) {
    byte[] q = new byte[10 + rand.nextInt(30)];
    rand.nextBytes(q);
    return q;
  }

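  /**
   * A worker that repeatedly seeks to a random key in the HFile and reads a
   * short run of key-values, until {@link #endTime} is reached. Returns false
   * if any seek or read behaves unexpectedly.
   */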
  private class RandomReader implements Callable<Boolean> {

    private int readerId;
    private StoreFile.Reader reader;
    private boolean pread;

    public RandomReader(int readerId, StoreFile.Reader reader,
        boolean pread) {
      this.readerId = readerId;
      this.reader = reader;
      this.pread = pread;
    }

    @Override
    public Boolean call() throws Exception {
      Thread.currentThread().setName("reader " + readerId);
      Random rand = new Random();
      StoreFileScanner scanner = reader.getStoreFileScanner(true, pread);

      while (System.currentTimeMillis() < endTime) {
        byte[] row = createRandomRow(rand, firstRow, lastRow);
        KeyValue kvToSeek = new KeyValue(row, family,
            createRandomQualifier(rand));
        if (rand.nextDouble() < 0.0001) {
          LOG.info("kvToSeek=" + kvToSeek);
        }
        boolean seekResult;
        try {
          seekResult = scanner.seek(kvToSeek);
        } catch (IOException ex) {
          throw new IOException("Seek failed for key " + kvToSeek + ", pread="
              + pread, ex);
        }
        numSeeks.incrementAndGet();
        if (!seekResult) {
          error("Seek returned false for row " + Bytes.toStringBinary(row));
          return false;
        }
        for (int i = 0; i < rand.nextInt(10) + 1; ++i) {
          KeyValue kv = scanner.next();
          numKV.incrementAndGet();
          if (i == 0 && kv == null) {
            error("scanner.next() returned null at the first iteration for " +
                "row " + Bytes.toStringBinary(row));
            return false;
          }
          if (kv == null)
            break;

          String keyHashStr = MD5Hash.getMD5AsHex(kv.getKey());
          keysRead.add(keyHashStr);
          totalBytes.addAndGet(kv.getLength());
        }
      }

      return true;
    }

    private void error(String msg) {
      LOG.error("error in reader " + readerId + " (pread=" + pread + "): "
          + msg);
    }

  }

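  /**
   * Periodically (about once a second) logs throughput statistics: seeks,
   * key-values, bytes, and HFile blocks read per second, until stopped or
   * {@link #endTime} is reached.
   */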
  private class StatisticsPrinter implements Callable<Boolean> {

    private volatile boolean stopRequested;
    private volatile Thread thread;
    private long totalSeekAndReads, totalPositionalReads;

    /**
     * Run the statistics collector in a separate thread without an executor.
     */
    public void startThread() {
      new Thread() {
        @Override
        public void run() {
          try {
            call();
          } catch (Exception e) {
            LOG.error(e);
          }
        }
      }.start();
    }

    @Override
    public Boolean call() throws Exception {
      LOG.info("Starting statistics printer");
      thread = Thread.currentThread();
      thread.setName(StatisticsPrinter.class.getSimpleName());
      long startTime = System.currentTimeMillis();
      long curTime;
      while ((curTime = System.currentTimeMillis()) < endTime &&
          !stopRequested) {
        long elapsedTime = curTime - startTime;
        printStats(elapsedTime);
        try {
          Thread.sleep(1000 - elapsedTime % 1000);
        } catch (InterruptedException iex) {
          Thread.currentThread().interrupt();
          if (stopRequested)
            break;
        }
      }
      printStats(curTime - startTime);
      LOG.info("Stopping statistics printer");
      return true;
    }

    private void printStats(long elapsedTime) {
      long numSeeksL = numSeeks.get();
      double timeSec = elapsedTime / 1000.0;
      double seekPerSec = numSeeksL / timeSec;
      long kvCount = numKV.get();
      double kvPerSec = kvCount / timeSec;
      long bytes = totalBytes.get();
      double bytesPerSec = bytes / timeSec;

      // readOps and preadOps counters get reset on access, so we have to
      // accumulate them here. HRegion metrics publishing thread should not
      // be running in this tool, so no one else should be resetting these
      // metrics.
      totalSeekAndReads += HFile.getReadOps();
      totalPositionalReads += HFile.getPreadOps();
      long totalBlocksRead = totalSeekAndReads + totalPositionalReads;

      double blkReadPerSec = totalBlocksRead / timeSec;

      double seekReadPerSec = totalSeekAndReads / timeSec;
      double preadPerSec = totalPositionalReads / timeSec;

      boolean isRead = workload == Workload.RANDOM_READS;

      StringBuilder sb = new StringBuilder();
      sb.append("Time: " + (long) timeSec + " sec");
      if (isRead)
        sb.append(", seek/sec: " + (long) seekPerSec);
      sb.append(", kv/sec: " + (long) kvPerSec);
      sb.append(", bytes/sec: " + (long) bytesPerSec);
      sb.append(", blk/sec: " + (long) blkReadPerSec);
      sb.append(", total KV: " + numKV);
      sb.append(", total bytes: " + totalBytes);
      sb.append(", total blk: " + totalBlocksRead);

      sb.append(", seekRead/sec: " + (long) seekReadPerSec);
      sb.append(", pread/sec: " + (long) preadPerSec);

      if (isRead)
        sb.append(", unique keys: " + (long) keysRead.size());

      LOG.info(sb.toString());
    }

    public void requestStop() {
      stopRequested = true;
      if (thread != null)
        thread.interrupt();
    }

  }

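  /**
   * Runs the random read benchmark: starts {@link #numReadThreads} random
   * readers and a statistics printer against the single input HFile, then
   * waits for all of them to finish.
   */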
  public boolean runRandomReadWorkload() throws IOException {
    if (inputFileNames.size() != 1) {
      throw new IOException("Need exactly one input file for random reads: " +
          inputFileNames);
    }

    Path inputPath = new Path(inputFileNames.get(0));

    // Make sure we are using caching.
    StoreFile storeFile = openStoreFile(inputPath, true);

    StoreFile.Reader reader = storeFile.createReader();

    LOG.info("First key: " + Bytes.toStringBinary(reader.getFirstKey()));
    LOG.info("Last key: " + Bytes.toStringBinary(reader.getLastKey()));

    KeyValue firstKV = KeyValue.createKeyValueFromKey(reader.getFirstKey());
    firstRow = firstKV.getRow();

    KeyValue lastKV = KeyValue.createKeyValueFromKey(reader.getLastKey());
    lastRow = lastKV.getRow();

    // Assign the field, not a local, so that the reader threads see it.
    family = firstKV.getFamily();
    if (!Bytes.equals(family, lastKV.getFamily())) {
      LOG.error("First and last key have different families: "
          + Bytes.toStringBinary(family) + " and "
          + Bytes.toStringBinary(lastKV.getFamily()));
      return false;
    }

    if (Bytes.equals(firstRow, lastRow)) {
      LOG.error("First and last row are the same, cannot run read workload: " +
          "firstRow=" + Bytes.toStringBinary(firstRow) + ", " +
          "lastRow=" + Bytes.toStringBinary(lastRow));
      return false;
    }

    ExecutorService exec = Executors.newFixedThreadPool(numReadThreads + 1);
    int numCompleted = 0;
    int numFailed = 0;
    try {
      ExecutorCompletionService<Boolean> ecs =
          new ExecutorCompletionService<Boolean>(exec);
      endTime = System.currentTimeMillis() + 1000 * durationSec;
      boolean pread = true;
      for (int i = 0; i < numReadThreads; ++i)
        ecs.submit(new RandomReader(i, reader, pread));
      ecs.submit(new StatisticsPrinter());
      Future<Boolean> result;
      while (true) {
        try {
          result = ecs.poll(endTime + 1000 - System.currentTimeMillis(),
              TimeUnit.MILLISECONDS);
          if (result == null)
            break;
          try {
            if (result.get()) {
              ++numCompleted;
            } else {
              ++numFailed;
            }
          } catch (ExecutionException e) {
            LOG.error("Worker thread failure", e.getCause());
            ++numFailed;
          }
        } catch (InterruptedException ex) {
          LOG.error("Interrupted after " + numCompleted +
              " workers completed");
          Thread.currentThread().interrupt();
          continue;
        }
      }
    } finally {
      storeFile.closeReader(true);
      exec.shutdown();

      BlockCache c = cacheConf.getBlockCache();
      if (c != null) {
        c.shutdown();
      }
    }
    LOG.info("Worker threads completed: " + numCompleted);
    LOG.info("Worker threads failed: " + numFailed);
    return true;
  }

  public boolean run() throws IOException {
    LOG.info("Workload: " + workload);
    switch (workload) {
    case MERGE:
      runMergeWorkload();
      break;
    case RANDOM_READS:
      return runRandomReadWorkload();
    default:
      LOG.error("Unknown workload: " + workload);
      return false;
    }

    return true;
  }

  private static void failure() {
    System.exit(1);
  }

  public static void main(String[] args) {
    HFileReadWriteTest app = new HFileReadWriteTest();
    if (!app.parseOptions(args))
      failure();

    try {
      if (!app.validateConfiguration() ||
          !app.run())
        failure();
    } catch (IOException ex) {
      LOG.error(ex);
      failure();
    }
  }

}