View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.IOException;
21  import java.nio.ByteBuffer;
22  import java.security.SecureRandom;
23  import java.text.DateFormat;
24  import java.text.SimpleDateFormat;
25  import java.util.Random;
26  
27  import org.apache.commons.cli.CommandLine;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.fs.FSDataInputStream;
30  import org.apache.hadoop.fs.FSDataOutputStream;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.HBaseConfiguration;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.MediumTests;
38  import org.apache.hadoop.hbase.io.crypto.Encryption;
39  import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
40  import org.apache.hadoop.hbase.io.crypto.aes.AES;
41  import org.apache.hadoop.hbase.util.AbstractHBaseTool;
42  import org.apache.hadoop.io.BytesWritable;
43  import org.apache.hadoop.io.SequenceFile;
44  import org.apache.hadoop.io.compress.CompressionCodec;
45  import org.apache.hadoop.io.compress.GzipCodec;
46  import org.apache.hadoop.util.ToolRunner;
47  
48  /**
49   *  Set of long-running tests to measure performance of HFile.
50   * <p>
51   * Copied from
52   * <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
53   * Remove after tfile is committed and use the tfile version of this class
54   * instead.</p>
55   */
56  public class TestHFilePerformance extends AbstractHBaseTool {
57    private HBaseTestingUtility TEST_UTIL;
58    private static String ROOT_DIR;
59    private FileSystem fs;
60    private long startTimeEpoch;
61    private long finishTimeEpoch;
62    private DateFormat formatter;
63  
64    @Override
65    public void setConf(Configuration conf) {
66      super.setConf(conf);
67      try {
68        fs = FileSystem.get(conf);
69      } catch (IOException e) {
70        throw new RuntimeException(e);
71      }
72      conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
73      conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
74      formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
75      TEST_UTIL = new HBaseTestingUtility(conf);
76      ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFilePerformance").toString();
77    }
78  
79    public void startTime() {
80      startTimeEpoch = System.currentTimeMillis();
81      System.out.println(formatTime() + " Started timing.");
82    }
83  
84    public void stopTime() {
85      finishTimeEpoch = System.currentTimeMillis();
86      System.out.println(formatTime() + " Stopped timing.");
87    }
88  
89    public long getIntervalMillis() {
90      return finishTimeEpoch - startTimeEpoch;
91    }
92  
93    public void printlnWithTimestamp(String message) {
94      System.out.println(formatTime() + "  " +  message);
95    }
96  
97    /*
98     * Format millis into minutes and seconds.
99     */
100   public String formatTime(long milis){
101     return formatter.format(milis);
102   }
103 
104   public String formatTime(){
105     return formatTime(System.currentTimeMillis());
106   }
107 
108   private FSDataOutputStream createFSOutput(Path name) throws IOException {
109     if (fs.exists(name))
110       fs.delete(name, true);
111     FSDataOutputStream fout = fs.create(name);
112     return fout;
113   }
114 
115   //TODO have multiple ways of generating key/value e.g. dictionary words
116   //TODO to have a sample compressable data, for now, made 1 out of 3 values random
117   //     keys are all random.
118 
119   private static class KeyValueGenerator {
120     Random keyRandomizer;
121     Random valueRandomizer;
122     long randomValueRatio = 3; // 1 out of randomValueRatio generated values will be random.
123     long valueSequence = 0 ;
124 
125 
126     KeyValueGenerator() {
127       keyRandomizer = new Random(0L); //TODO with seed zero
128       valueRandomizer = new Random(1L); //TODO with seed one
129     }
130 
131     // Key is always random now.
132     void getKey(byte[] key) {
133       keyRandomizer.nextBytes(key);
134     }
135 
136     void getValue(byte[] value) {
137       if (valueSequence % randomValueRatio == 0)
138           valueRandomizer.nextBytes(value);
139       valueSequence++;
140     }
141   }
142 
143   /**
144    *
145    * @param fileType "HFile" or "SequenceFile"
146    * @param keyLength
147    * @param valueLength
148    * @param codecName "none", "lzo", "gz", "snappy"
149    * @param cipherName "none", "aes"
150    * @param rows number of rows to be written.
151    * @param writeMethod used for HFile only.
152    * @param minBlockSize used for HFile only.
153    * @throws IOException
154    */
155    //TODO writeMethod: implement multiple ways of writing e.g. A) known length (no chunk) B) using a buffer and streaming (for many chunks).
156   public void timeWrite(String fileType, int keyLength, int valueLength,
157     String codecName, String cipherName, long rows, String writeMethod, int minBlockSize)
158   throws IOException {
159     System.out.println("File Type: " + fileType);
160     System.out.println("Writing " + fileType + " with codecName: " + codecName +
161       " cipherName: " + cipherName);
162     long totalBytesWritten = 0;
163 
164 
165     //Using separate randomizer for key/value with seeds matching Sequence File.
166     byte[] key = new byte[keyLength];
167     byte[] value = new byte[valueLength];
168     KeyValueGenerator generator = new KeyValueGenerator();
169 
170     startTime();
171 
172     Path path = new Path(ROOT_DIR, fileType + ".Performance");
173     System.out.println(ROOT_DIR + path.getName());
174     FSDataOutputStream fout =  createFSOutput(path);
175 
176     if ("HFile".equals(fileType)){
177         HFileContextBuilder builder = new HFileContextBuilder()
178 	    .withCompression(AbstractHFileWriter.compressionByName(codecName))
179 	    .withBlockSize(minBlockSize);
180         if (cipherName != "none") {
181           byte[] cipherKey = new byte[AES.KEY_LENGTH];
182           new SecureRandom().nextBytes(cipherKey);
183           builder.withEncryptionContext(
184             Encryption.newContext(conf)
185               .setCipher(Encryption.getCipher(conf, cipherName))
186               .setKey(cipherKey));
187         }
188         HFileContext context = builder.build();
189         System.out.println("HFile write method: ");
190         HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
191             .withOutputStream(fout)
192             .withFileContext(context)
193             .withComparator(new KeyValue.RawBytesComparator())
194             .create();
195 
196         // Writing value in one shot.
197         for (long l=0; l<rows; l++ ) {
198           generator.getKey(key);
199           generator.getValue(value);
200           writer.append(key, value);
201           totalBytesWritten += key.length;
202           totalBytesWritten += value.length;
203          }
204         writer.close();
205     } else if ("SequenceFile".equals(fileType)){
206         CompressionCodec codec = null;
207         if ("gz".equals(codecName))
208           codec = new GzipCodec();
209         else if (!"none".equals(codecName))
210           throw new IOException("Codec not supported.");
211 
212         SequenceFile.Writer writer;
213 
214         //TODO
215         //JobConf conf = new JobConf();
216 
217         if (!"none".equals(codecName))
218           writer = SequenceFile.createWriter(conf, fout, BytesWritable.class,
219             BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec);
220         else
221           writer = SequenceFile.createWriter(conf, fout, BytesWritable.class,
222             BytesWritable.class, SequenceFile.CompressionType.NONE, null);
223 
224         BytesWritable keyBsw;
225         BytesWritable valBsw;
226         for (long l=0; l<rows; l++ ) {
227 
228            generator.getKey(key);
229            keyBsw = new BytesWritable(key);
230            totalBytesWritten += keyBsw.getSize();
231 
232            generator.getValue(value);
233            valBsw = new BytesWritable(value);
234            writer.append(keyBsw, valBsw);
235            totalBytesWritten += valBsw.getSize();
236         }
237 
238         writer.close();
239     } else
240        throw new IOException("File Type is not supported");
241 
242     fout.close();
243     stopTime();
244 
245     printlnWithTimestamp("Data written: ");
246     printlnWithTimestamp("  rate  = " +
247       totalBytesWritten / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
248     printlnWithTimestamp("  total = " + totalBytesWritten + "B");
249 
250     printlnWithTimestamp("File written: ");
251     printlnWithTimestamp("  rate  = " +
252       fs.getFileStatus(path).getLen() / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
253     printlnWithTimestamp("  total = " + fs.getFileStatus(path).getLen() + "B");
254   }
255 
256   public void timeReading(String fileType, int keyLength, int valueLength,
257       long rows, int method) throws IOException {
258     System.out.println("Reading file of type: " + fileType);
259     Path path = new Path(ROOT_DIR, fileType + ".Performance");
260     System.out.println("Input file size: " + fs.getFileStatus(path).getLen());
261     long totalBytesRead = 0;
262 
263 
264     ByteBuffer val;
265 
266     ByteBuffer key;
267 
268     startTime();
269     FSDataInputStream fin = fs.open(path);
270 
271     if ("HFile".equals(fileType)){
272         HFile.Reader reader = HFile.createReaderFromStream(path, fs.open(path),
273           fs.getFileStatus(path).getLen(), new CacheConfig(conf), conf);
274         reader.loadFileInfo();
275         switch (method) {
276 
277           case 0:
278           case 1:
279           default:
280             {
281               HFileScanner scanner = reader.getScanner(false, false);
282               scanner.seekTo();
283               for (long l=0; l<rows; l++ ) {
284                 key = scanner.getKey();
285                 val = scanner.getValue();
286                 totalBytesRead += key.limit() + val.limit();
287                 scanner.next();
288               }
289             }
290             break;
291         }
292       reader.close();
293     } else if("SequenceFile".equals(fileType)){
294 
295         SequenceFile.Reader reader;
296         reader = new SequenceFile.Reader(fs, path, new Configuration());
297 
298         if (reader.getCompressionCodec() != null) {
299             printlnWithTimestamp("Compression codec class: " + reader.getCompressionCodec().getClass());
300         } else
301             printlnWithTimestamp("Compression codec class: " + "none");
302 
303         BytesWritable keyBsw = new BytesWritable();
304         BytesWritable valBsw = new BytesWritable();
305 
306         for (long l=0; l<rows; l++ ) {
307           reader.next(keyBsw, valBsw);
308           totalBytesRead += keyBsw.getSize() + valBsw.getSize();
309         }
310         reader.close();
311 
312         //TODO make a tests for other types of SequenceFile reading scenarios
313 
314     } else {
315         throw new IOException("File Type not supported.");
316     }
317 
318 
319     //printlnWithTimestamp("Closing reader");
320     fin.close();
321     stopTime();
322     //printlnWithTimestamp("Finished close");
323 
324     printlnWithTimestamp("Finished in " + getIntervalMillis() + "ms");
325     printlnWithTimestamp("Data read: ");
326     printlnWithTimestamp("  rate  = " +
327       totalBytesRead / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
328     printlnWithTimestamp("  total = " + totalBytesRead + "B");
329 
330     printlnWithTimestamp("File read: ");
331     printlnWithTimestamp("  rate  = " +
332       fs.getFileStatus(path).getLen() / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
333     printlnWithTimestamp("  total = " + fs.getFileStatus(path).getLen() + "B");
334 
335     //TODO uncomment this for final committing so test files is removed.
336     //fs.delete(path, true);
337   }
338 
339   public void testRunComparisons() throws IOException {
340 
341     int keyLength = 100; // 100B
342     int valueLength = 5*1024; // 5KB
343     int minBlockSize = 10*1024*1024; // 10MB
344     int rows = 10000;
345 
346     System.out.println("****************************** Sequence File *****************************");
347 
348     timeWrite("SequenceFile", keyLength, valueLength, "none", "none", rows, null, minBlockSize);
349     System.out.println("\n+++++++\n");
350     timeReading("SequenceFile", keyLength, valueLength, rows, -1);
351 
352     System.out.println("");
353     System.out.println("----------------------");
354     System.out.println("");
355 
356     /* DISABLED LZO
357     timeWrite("SequenceFile", keyLength, valueLength, "lzo", rows, null, minBlockSize);
358     System.out.println("\n+++++++\n");
359     timeReading("SequenceFile", keyLength, valueLength, rows, -1);
360 
361     System.out.println("");
362     System.out.println("----------------------");
363     System.out.println("");
364 
365     /* Sequence file can only use native hadoop libs gzipping so commenting out.
366      */
367     try {
368       timeWrite("SequenceFile", keyLength, valueLength, "gz", "none", rows, null,
369         minBlockSize);
370       System.out.println("\n+++++++\n");
371       timeReading("SequenceFile", keyLength, valueLength, rows, -1);
372     } catch (IllegalArgumentException e) {
373       System.out.println("Skipping sequencefile gz: " + e.getMessage());
374     }
375 
376 
377     System.out.println("\n\n\n");
378     System.out.println("****************************** HFile *****************************");
379 
380     timeWrite("HFile", keyLength, valueLength, "none", "none", rows, null, minBlockSize);
381     System.out.println("\n+++++++\n");
382     timeReading("HFile", keyLength, valueLength, rows, 0 );
383 
384     System.out.println("");
385     System.out.println("----------------------");
386     System.out.println("");
387 
388     timeWrite("HFile", keyLength, valueLength, "none", "aes", rows, null, minBlockSize);
389     System.out.println("\n+++++++\n");
390     timeReading("HFile", keyLength, valueLength, rows, 0 );
391 
392     System.out.println("");
393     System.out.println("----------------------");
394     System.out.println("");
395 
396 /* DISABLED LZO
397     timeWrite("HFile", keyLength, valueLength, "lzo", rows, null, minBlockSize);
398     System.out.println("\n+++++++\n");
399     timeReading("HFile", keyLength, valueLength, rows, 0 );
400     System.out.println("\n+++++++\n");
401     timeReading("HFile", keyLength, valueLength, rows, 1 );
402     System.out.println("\n+++++++\n");
403     timeReading("HFile", keyLength, valueLength, rows, 2 );
404 
405     System.out.println("");
406     System.out.println("----------------------");
407     System.out.println("");
408 */
409 
410     timeWrite("HFile", keyLength, valueLength, "gz", "none", rows, null, minBlockSize);
411     System.out.println("\n+++++++\n");
412     timeReading("HFile", keyLength, valueLength, rows, 0 );
413 
414     System.out.println("");
415     System.out.println("----------------------");
416     System.out.println("");
417 
418     timeWrite("HFile", keyLength, valueLength, "gz", "aes", rows, null, minBlockSize);
419     System.out.println("\n+++++++\n");
420     timeReading("HFile", keyLength, valueLength, rows, 0 );
421 
422     System.out.println("\n\n\n\nNotes: ");
423     System.out.println(" * Timing includes open/closing of files.");
424     System.out.println(" * Timing includes reading both Key and Value");
425     System.out.println(" * Data is generated as random bytes. Other methods e.g. using " +
426             "dictionary with care for distributation of words is under development.");
427     System.out.println(" * Timing of write currently, includes random value/key generations. " +
428             "Which is the same for Sequence File and HFile. Another possibility is to generate " +
429             "test data beforehand");
430     System.out.println(" * We need to mitigate cache effect on benchmark. We can apply several " +
431             "ideas, for next step we do a large dummy read between benchmark read to dismantle " +
432             "caching of data. Renaming of file may be helpful. We can have a loop that reads with" +
433             " the same method several times and flood cache every time and average it to get a" +
434             " better number.");
435   }
436 
437   @Override
438   protected void addOptions() {
439   }
440 
441   @Override
442   protected void processOptions(CommandLine cmd) {
443   }
444 
445   @Override
446   protected int doWork() throws Exception {
447     testRunComparisons();
448     return 0;
449   }
450 
451   public static void main(String[] args) throws Exception {
452     int ret = ToolRunner.run(HBaseConfiguration.create(), new TestHFilePerformance(), args);
453     System.exit(ret);
454   }
455 }