View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Random;
27  import java.util.UUID;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.conf.Configured;
35  import org.apache.hadoop.fs.FileStatus;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.HBaseConfiguration;
40  import org.apache.hadoop.hbase.HBaseTestingUtility;
41  import org.apache.hadoop.hbase.HColumnDescriptor;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.KeyValueUtil;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.Put;
49  import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
50  import org.apache.hadoop.hbase.regionserver.HRegion;
51  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.FSUtils;
54  import org.apache.hadoop.util.Tool;
55  import org.apache.hadoop.util.ToolRunner;
56  
57  import com.yammer.metrics.core.Meter;
58  import com.yammer.metrics.core.MetricsRegistry;
59  import com.yammer.metrics.reporting.ConsoleReporter;
60  
61  /**
62   * This class runs performance benchmarks for {@link HLog}.
63   * See usage for this tool by running:
64   * <code>$ hbase org.apache.hadoop.hbase.regionserver.wal.HLogPerformanceEvaluation -h</code>
65   */
66  @InterfaceAudience.Private
67  public final class HLogPerformanceEvaluation extends Configured implements Tool {
68    static final Log LOG = LogFactory.getLog(HLogPerformanceEvaluation.class.getName());
69    private final MetricsRegistry metrics = new MetricsRegistry();
70    private final Meter syncMeter =
71      metrics.newMeter(HLogPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS);
72    private final Meter appendMeter =
73      metrics.newMeter(HLogPerformanceEvaluation.class, "append", "bytes", TimeUnit.MILLISECONDS);
74  
75    private HBaseTestingUtility TEST_UTIL;
76  
77    static final String TABLE_NAME = "HLogPerformanceEvaluation";
78    static final String QUALIFIER_PREFIX = "q";
79    static final String FAMILY_PREFIX = "cf";
80  
81    private int numQualifiers = 1;
82    private int valueSize = 512;
83    private int keySize = 16;
84  
85    @Override
86    public void setConf(Configuration conf) {
87      super.setConf(conf);
88      TEST_UTIL = new HBaseTestingUtility(conf);
89    }
90  
91    /**
92     * Perform HLog.append() of Put object, for the number of iterations requested.
93     * Keys and Vaues are generated randomly, the number of column families,
94     * qualifiers and key/value size is tunable by the user.
95     */
96    class HLogPutBenchmark implements Runnable {
97      private final long numIterations;
98      private final int numFamilies;
99      private final boolean noSync;
100     private final HRegion region;
101     private final int syncInterval;
102     private final HTableDescriptor htd;
103 
104     HLogPutBenchmark(final HRegion region, final HTableDescriptor htd,
105         final long numIterations, final boolean noSync, final int syncInterval) {
106       this.numIterations = numIterations;
107       this.noSync = noSync;
108       this.syncInterval = syncInterval;
109       this.numFamilies = htd.getColumnFamilies().length;
110       this.region = region;
111       this.htd = htd;
112     }
113 
114     @Override
115     public void run() {
116       byte[] key = new byte[keySize];
117       byte[] value = new byte[valueSize];
118       Random rand = new Random(Thread.currentThread().getId());
119       HLog hlog = region.getLog();
120       ArrayList<UUID> clusters = new ArrayList<UUID>();
121       long nonce = HConstants.NO_NONCE;
122 
123       try {
124         long startTime = System.currentTimeMillis();
125         int lastSync = 0;
126         for (int i = 0; i < numIterations; ++i) {
127           Put put = setupPut(rand, key, value, numFamilies);
128           long now = System.currentTimeMillis();
129           WALEdit walEdit = new WALEdit();
130           addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
131           HRegionInfo hri = region.getRegionInfo();
132           hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd,
133             region.getSequenceId(), true, nonce, nonce);
134           if (!this.noSync) {
135             if (++lastSync >= this.syncInterval) {
136               hlog.sync();
137               lastSync = 0;
138             }
139           }
140         }
141         long totalTime = (System.currentTimeMillis() - startTime);
142         logBenchmarkResult(Thread.currentThread().getName(), numIterations, totalTime);
143       } catch (Exception e) {
144         LOG.error(getClass().getSimpleName() + " Thread failed", e);
145       }
146     }
147   }
148 
149   @Override
150   public int run(String[] args) throws Exception {
151     Path rootRegionDir = null;
152     int numThreads = 1;
153     long numIterations = 1000000;
154     int numFamilies = 1;
155     int syncInterval = 0;
156     boolean noSync = false;
157     boolean verify = false;
158     boolean verbose = false;
159     boolean cleanup = true;
160     boolean noclosefs = false;
161     long roll = Long.MAX_VALUE;
162     boolean compress = false;
163     String cipher = null;
164     // Process command line args
165     for (int i = 0; i < args.length; i++) {
166       String cmd = args[i];
167       try {
168         if (cmd.equals("-threads")) {
169           numThreads = Integer.parseInt(args[++i]);
170         } else if (cmd.equals("-iterations")) {
171           numIterations = Long.parseLong(args[++i]);
172         } else if (cmd.equals("-path")) {
173           rootRegionDir = new Path(args[++i]);
174         } else if (cmd.equals("-families")) {
175           numFamilies = Integer.parseInt(args[++i]);
176         } else if (cmd.equals("-qualifiers")) {
177           numQualifiers = Integer.parseInt(args[++i]);
178         } else if (cmd.equals("-keySize")) {
179           keySize = Integer.parseInt(args[++i]);
180         } else if (cmd.equals("-valueSize")) {
181           valueSize = Integer.parseInt(args[++i]);
182         } else if (cmd.equals("-syncInterval")) {
183           syncInterval = Integer.parseInt(args[++i]);
184         } else if (cmd.equals("-nosync")) {
185           noSync = true;
186         } else if (cmd.equals("-verify")) {
187           verify = true;
188         } else if (cmd.equals("-verbose")) {
189           verbose = true;
190         } else if (cmd.equals("-nocleanup")) {
191           cleanup = false;
192         } else if (cmd.equals("-noclosefs")) {
193           noclosefs = true;
194         } else if (cmd.equals("-roll")) {
195           roll = Long.parseLong(args[++i]);
196         } else if (cmd.equals("-compress")) {
197           compress = true;
198         } else if (cmd.equals("-encryption")) {
199           cipher = args[++i];
200         } else if (cmd.equals("-h")) {
201           printUsageAndExit();
202         } else if (cmd.equals("--help")) {
203           printUsageAndExit();
204         } else {
205           System.err.println("UNEXPECTED: " + cmd);
206           printUsageAndExit();
207         }
208       } catch (Exception e) {
209         printUsageAndExit();
210       }
211     }
212 
213     if (compress) {
214       Configuration conf = getConf();
215       conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
216     }
217 
218     if (cipher != null) {
219       // Set up HLog for encryption
220       Configuration conf = getConf();
221       conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
222       conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
223       conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
224         HLog.Reader.class);
225       conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
226         HLog.Writer.class);
227       conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
228       conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
229     }
230 
231     // Run HLog Performance Evaluation
232     // First set the fs from configs.  In case we are on hadoop1
233     FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
234     FileSystem fs = FileSystem.get(getConf());
235     LOG.info("FileSystem: " + fs);
236     try {
237       if (rootRegionDir == null) {
238         rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("HLogPerformanceEvaluation");
239       }
240       rootRegionDir = rootRegionDir.makeQualified(fs);
241       cleanRegionRootDir(fs, rootRegionDir);
242       // Initialize Table Descriptor
243       HTableDescriptor htd = createHTableDescriptor(numFamilies);
244       final long whenToRoll = roll;
245       HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) {
246         int appends = 0;
247         @Override
248         protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
249             HTableDescriptor htd)
250         throws IOException {
251           this.appends++;
252           if (this.appends % whenToRoll == 0) {
253             LOG.info("Rolling after " + appends + " edits");
254             rollWriter();
255           }
256           super.doWrite(info, logKey, logEdit, htd);
257         };
258 
259         @Override
260         public void postSync() {
261           super.postSync();
262           syncMeter.mark();
263         }
264 
265         @Override
266         public void postAppend(List<Entry> entries) {
267           super.postAppend(entries);
268           int size = 0;
269           for (Entry e: entries) size += e.getEdit().heapSize();
270           appendMeter.mark(size);
271         }
272       };
273       hlog.rollWriter();
274       HRegion region = null;
275       try {
276         region = openRegion(fs, rootRegionDir, htd, hlog);
277         ConsoleReporter.enable(this.metrics, 1, TimeUnit.SECONDS);
278         long putTime =
279           runBenchmark(new HLogPutBenchmark(region, htd, numIterations, noSync, syncInterval),
280             numThreads);
281         logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations +
282           ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
283         
284         if (region != null) {
285           closeRegion(region);
286           region = null;
287         }
288         if (verify) {
289           Path dir = ((FSHLog) hlog).getDir();
290           long editCount = 0;
291           FileStatus [] fsss = fs.listStatus(dir);
292           if (fsss.length == 0) throw new IllegalStateException("No WAL found");
293           for (FileStatus fss: fsss) {
294             Path p = fss.getPath();
295             if (!fs.exists(p)) throw new IllegalStateException(p.toString());
296             editCount += verify(p, verbose);
297           }
298           long expected = numIterations * numThreads;
299           if (editCount != expected) {
300             throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
301           }
302         }
303       } finally {
304         if (region != null) closeRegion(region);
305         // Remove the root dir for this test region
306         if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
307       }
308     } finally {
309       // We may be called inside a test that wants to keep on using the fs.
310       if (!noclosefs) fs.close();
311     }
312 
313     return(0);
314   }
315 
316   private static HTableDescriptor createHTableDescriptor(final int numFamilies) {
317     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
318     for (int i = 0; i < numFamilies; ++i) {
319       HColumnDescriptor colDef = new HColumnDescriptor(FAMILY_PREFIX + i);
320       htd.addFamily(colDef);
321     }
322     return htd;
323   }
324 
325   /**
326    * Verify the content of the WAL file.
327    * Verify that the file has expected number of edits.
328    * @param wal
329    * @return Count of edits.
330    * @throws IOException
331    */
332   private long verify(final Path wal, final boolean verbose) throws IOException {
333     HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, getConf());
334     long count = 0;
335     Map<String, Long> sequenceIds = new HashMap<String, Long>();
336     try {
337       while (true) {
338         Entry e = reader.next();
339         if (e == null) {
340           LOG.debug("Read count=" + count + " from " + wal);
341           break;
342         }
343         count++;
344         long seqid = e.getKey().getLogSeqNum();
345         if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
346           // sequenceIds should be increasing for every regions
347           if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
348             throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
349                 + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
350                 + ", current seqid = " + seqid);
351           }
352         }
353         // update the sequence Id.
354         sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
355         if (verbose) LOG.info("seqid=" + seqid);
356       }
357     } finally {
358       reader.close();
359     }
360     return count;
361   }
362 
363   private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
364     float tsec = totalTime / 1000.0f;
365     LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
366     
367   }
368 
369   private void printUsageAndExit() {
370     System.err.printf("Usage: bin/hbase %s [options]\n", getClass().getName());
371     System.err.println(" where [options] are:");
372     System.err.println("  -h|-help         Show this help and exit.");
373     System.err.println("  -threads <N>     Number of threads writing on the WAL.");
374     System.err.println("  -iterations <N>  Number of iterations per thread.");
375     System.err.println("  -path <PATH>     Path where region's root directory is created.");
376     System.err.println("  -families <N>    Number of column families to write.");
377     System.err.println("  -qualifiers <N>  Number of qualifiers to write.");
378     System.err.println("  -keySize <N>     Row key size in byte.");
379     System.err.println("  -valueSize <N>   Row/Col value size in byte.");
380     System.err.println("  -nocleanup       Do NOT remove test data when done.");
381     System.err.println("  -noclosefs       Do NOT close the filesystem when done.");
382     System.err.println("  -nosync          Append without syncing");
383     System.err.println("  -syncInterval <N> Append N edits and then sync. Default=0, i.e. sync every edit.");
384     System.err.println("  -verify          Verify edits written in sequence");
385     System.err.println("  -verbose         Output extra info; e.g. all edit seq ids when verifying");
386     System.err.println("  -roll <N>        Roll the way every N appends");
387     System.err.println("  -encryption <A>  Encrypt the WAL with algorithm A, e.g. AES");
388     System.err.println("");
389     System.err.println("Examples:");
390     System.err.println("");
391     System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and verification afterward do:");
392     System.err.println(" $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal.HLogPerformanceEvaluation \\");
393     System.err.println("    -conf ./core-site.xml -path hdfs://example.org:7000/tmp -threads 100 -roll 10000 -verify");
394     System.exit(1);
395   }
396 
397   private HRegion openRegion(final FileSystem fs, final Path dir, final HTableDescriptor htd, final HLog hlog)
398   throws IOException {
399     // Initialize HRegion
400     HRegionInfo regionInfo = new HRegionInfo(htd.getTableName());
401     return HRegion.createHRegion(regionInfo, dir, getConf(), htd, hlog);
402   }
403 
404   private void closeRegion(final HRegion region) throws IOException {
405     if (region != null) {
406       region.close();
407       HLog wal = region.getLog();
408       if (wal != null) wal.close();
409     }
410   }
411 
412   private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
413     if (fs.exists(dir)) {
414       fs.delete(dir, true);
415     }
416   }
417 
418   private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
419     rand.nextBytes(key);
420     Put put = new Put(key);
421     for (int cf = 0; cf < numFamilies; ++cf) {
422       for (int q = 0; q < numQualifiers; ++q) {
423         rand.nextBytes(value);
424         put.add(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q), value);
425       }
426     }
427     return put;
428   }
429 
430   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
431       WALEdit walEdit) {
432     for (List<Cell> edits : familyMap.values()) {
433       for (Cell cell : edits) {
434         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
435         walEdit.add(kv);
436       }
437     }
438   }
439 
440   private long runBenchmark(Runnable runnable, final int numThreads) throws InterruptedException {
441     Thread[] threads = new Thread[numThreads];
442     long startTime = System.currentTimeMillis();
443     for (int i = 0; i < numThreads; ++i) {
444       threads[i] = new Thread(runnable, "t" + i);
445       threads[i].start();
446     }
447     for (Thread t : threads) t.join();
448     long endTime = System.currentTimeMillis();
449     return(endTime - startTime);
450   }
451 
452   /**
453    * The guts of the {@link #main} method.
454    * Call this method to avoid the {@link #main(String[])} System.exit.
455    * @param args
456    * @return errCode
457    * @throws Exception
458    */
459   static int innerMain(final Configuration c, final String [] args) throws Exception {
460     return ToolRunner.run(c, new HLogPerformanceEvaluation(), args);
461   }
462 
463   public static void main(String[] args) throws Exception {
464      System.exit(innerMain(HBaseConfiguration.create(), args));
465   }
466 }