1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io.hfile;
19
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.security.SecureRandom;
23 import java.text.DateFormat;
24 import java.text.SimpleDateFormat;
25 import java.util.Random;
26
27 import org.apache.commons.cli.CommandLine;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.FSDataInputStream;
30 import org.apache.hadoop.fs.FSDataOutputStream;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.HBaseConfiguration;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.MediumTests;
38 import org.apache.hadoop.hbase.io.crypto.Encryption;
39 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
40 import org.apache.hadoop.hbase.io.crypto.aes.AES;
41 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
42 import org.apache.hadoop.io.BytesWritable;
43 import org.apache.hadoop.io.SequenceFile;
44 import org.apache.hadoop.io.compress.CompressionCodec;
45 import org.apache.hadoop.io.compress.GzipCodec;
46 import org.apache.hadoop.util.ToolRunner;
47
48
49
50
51
52
53
54
55
56 public class TestHFilePerformance extends AbstractHBaseTool {
57 private HBaseTestingUtility TEST_UTIL;
58 private static String ROOT_DIR;
59 private FileSystem fs;
60 private long startTimeEpoch;
61 private long finishTimeEpoch;
62 private DateFormat formatter;
63
64 @Override
65 public void setConf(Configuration conf) {
66 super.setConf(conf);
67 try {
68 fs = FileSystem.get(conf);
69 } catch (IOException e) {
70 throw new RuntimeException(e);
71 }
72 conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
73 conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
74 formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
75 TEST_UTIL = new HBaseTestingUtility(conf);
76 ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFilePerformance").toString();
77 }
78
79 public void startTime() {
80 startTimeEpoch = System.currentTimeMillis();
81 System.out.println(formatTime() + " Started timing.");
82 }
83
84 public void stopTime() {
85 finishTimeEpoch = System.currentTimeMillis();
86 System.out.println(formatTime() + " Stopped timing.");
87 }
88
89 public long getIntervalMillis() {
90 return finishTimeEpoch - startTimeEpoch;
91 }
92
93 public void printlnWithTimestamp(String message) {
94 System.out.println(formatTime() + " " + message);
95 }
96
97
98
99
100 public String formatTime(long milis){
101 return formatter.format(milis);
102 }
103
104 public String formatTime(){
105 return formatTime(System.currentTimeMillis());
106 }
107
108 private FSDataOutputStream createFSOutput(Path name) throws IOException {
109 if (fs.exists(name))
110 fs.delete(name, true);
111 FSDataOutputStream fout = fs.create(name);
112 return fout;
113 }
114
115
116
117
118
119 private static class KeyValueGenerator {
120 Random keyRandomizer;
121 Random valueRandomizer;
122 long randomValueRatio = 3;
123 long valueSequence = 0 ;
124
125
126 KeyValueGenerator() {
127 keyRandomizer = new Random(0L);
128 valueRandomizer = new Random(1L);
129 }
130
131
132 void getKey(byte[] key) {
133 keyRandomizer.nextBytes(key);
134 }
135
136 void getValue(byte[] value) {
137 if (valueSequence % randomValueRatio == 0)
138 valueRandomizer.nextBytes(value);
139 valueSequence++;
140 }
141 }
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 public void timeWrite(String fileType, int keyLength, int valueLength,
157 String codecName, String cipherName, long rows, String writeMethod, int minBlockSize)
158 throws IOException {
159 System.out.println("File Type: " + fileType);
160 System.out.println("Writing " + fileType + " with codecName: " + codecName +
161 " cipherName: " + cipherName);
162 long totalBytesWritten = 0;
163
164
165
166 byte[] key = new byte[keyLength];
167 byte[] value = new byte[valueLength];
168 KeyValueGenerator generator = new KeyValueGenerator();
169
170 startTime();
171
172 Path path = new Path(ROOT_DIR, fileType + ".Performance");
173 System.out.println(ROOT_DIR + path.getName());
174 FSDataOutputStream fout = createFSOutput(path);
175
176 if ("HFile".equals(fileType)){
177 HFileContextBuilder builder = new HFileContextBuilder()
178 .withCompression(AbstractHFileWriter.compressionByName(codecName))
179 .withBlockSize(minBlockSize);
180 if (cipherName != "none") {
181 byte[] cipherKey = new byte[AES.KEY_LENGTH];
182 new SecureRandom().nextBytes(cipherKey);
183 builder.withEncryptionContext(
184 Encryption.newContext(conf)
185 .setCipher(Encryption.getCipher(conf, cipherName))
186 .setKey(cipherKey));
187 }
188 HFileContext context = builder.build();
189 System.out.println("HFile write method: ");
190 HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
191 .withOutputStream(fout)
192 .withFileContext(context)
193 .withComparator(new KeyValue.RawBytesComparator())
194 .create();
195
196
197 for (long l=0; l<rows; l++ ) {
198 generator.getKey(key);
199 generator.getValue(value);
200 writer.append(key, value);
201 totalBytesWritten += key.length;
202 totalBytesWritten += value.length;
203 }
204 writer.close();
205 } else if ("SequenceFile".equals(fileType)){
206 CompressionCodec codec = null;
207 if ("gz".equals(codecName))
208 codec = new GzipCodec();
209 else if (!"none".equals(codecName))
210 throw new IOException("Codec not supported.");
211
212 SequenceFile.Writer writer;
213
214
215
216
217 if (!"none".equals(codecName))
218 writer = SequenceFile.createWriter(conf, fout, BytesWritable.class,
219 BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec);
220 else
221 writer = SequenceFile.createWriter(conf, fout, BytesWritable.class,
222 BytesWritable.class, SequenceFile.CompressionType.NONE, null);
223
224 BytesWritable keyBsw;
225 BytesWritable valBsw;
226 for (long l=0; l<rows; l++ ) {
227
228 generator.getKey(key);
229 keyBsw = new BytesWritable(key);
230 totalBytesWritten += keyBsw.getSize();
231
232 generator.getValue(value);
233 valBsw = new BytesWritable(value);
234 writer.append(keyBsw, valBsw);
235 totalBytesWritten += valBsw.getSize();
236 }
237
238 writer.close();
239 } else
240 throw new IOException("File Type is not supported");
241
242 fout.close();
243 stopTime();
244
245 printlnWithTimestamp("Data written: ");
246 printlnWithTimestamp(" rate = " +
247 totalBytesWritten / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
248 printlnWithTimestamp(" total = " + totalBytesWritten + "B");
249
250 printlnWithTimestamp("File written: ");
251 printlnWithTimestamp(" rate = " +
252 fs.getFileStatus(path).getLen() / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
253 printlnWithTimestamp(" total = " + fs.getFileStatus(path).getLen() + "B");
254 }
255
256 public void timeReading(String fileType, int keyLength, int valueLength,
257 long rows, int method) throws IOException {
258 System.out.println("Reading file of type: " + fileType);
259 Path path = new Path(ROOT_DIR, fileType + ".Performance");
260 System.out.println("Input file size: " + fs.getFileStatus(path).getLen());
261 long totalBytesRead = 0;
262
263
264 ByteBuffer val;
265
266 ByteBuffer key;
267
268 startTime();
269 FSDataInputStream fin = fs.open(path);
270
271 if ("HFile".equals(fileType)){
272 HFile.Reader reader = HFile.createReaderFromStream(path, fs.open(path),
273 fs.getFileStatus(path).getLen(), new CacheConfig(conf), conf);
274 reader.loadFileInfo();
275 switch (method) {
276
277 case 0:
278 case 1:
279 default:
280 {
281 HFileScanner scanner = reader.getScanner(false, false);
282 scanner.seekTo();
283 for (long l=0; l<rows; l++ ) {
284 key = scanner.getKey();
285 val = scanner.getValue();
286 totalBytesRead += key.limit() + val.limit();
287 scanner.next();
288 }
289 }
290 break;
291 }
292 reader.close();
293 } else if("SequenceFile".equals(fileType)){
294
295 SequenceFile.Reader reader;
296 reader = new SequenceFile.Reader(fs, path, new Configuration());
297
298 if (reader.getCompressionCodec() != null) {
299 printlnWithTimestamp("Compression codec class: " + reader.getCompressionCodec().getClass());
300 } else
301 printlnWithTimestamp("Compression codec class: " + "none");
302
303 BytesWritable keyBsw = new BytesWritable();
304 BytesWritable valBsw = new BytesWritable();
305
306 for (long l=0; l<rows; l++ ) {
307 reader.next(keyBsw, valBsw);
308 totalBytesRead += keyBsw.getSize() + valBsw.getSize();
309 }
310 reader.close();
311
312
313
314 } else {
315 throw new IOException("File Type not supported.");
316 }
317
318
319
320 fin.close();
321 stopTime();
322
323
324 printlnWithTimestamp("Finished in " + getIntervalMillis() + "ms");
325 printlnWithTimestamp("Data read: ");
326 printlnWithTimestamp(" rate = " +
327 totalBytesRead / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
328 printlnWithTimestamp(" total = " + totalBytesRead + "B");
329
330 printlnWithTimestamp("File read: ");
331 printlnWithTimestamp(" rate = " +
332 fs.getFileStatus(path).getLen() / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
333 printlnWithTimestamp(" total = " + fs.getFileStatus(path).getLen() + "B");
334
335
336
337 }
338
339 public void testRunComparisons() throws IOException {
340
341 int keyLength = 100;
342 int valueLength = 5*1024;
343 int minBlockSize = 10*1024*1024;
344 int rows = 10000;
345
346 System.out.println("****************************** Sequence File *****************************");
347
348 timeWrite("SequenceFile", keyLength, valueLength, "none", "none", rows, null, minBlockSize);
349 System.out.println("\n+++++++\n");
350 timeReading("SequenceFile", keyLength, valueLength, rows, -1);
351
352 System.out.println("");
353 System.out.println("----------------------");
354 System.out.println("");
355
356
357
358
359
360
361
362
363
364
365
366
367 try {
368 timeWrite("SequenceFile", keyLength, valueLength, "gz", "none", rows, null,
369 minBlockSize);
370 System.out.println("\n+++++++\n");
371 timeReading("SequenceFile", keyLength, valueLength, rows, -1);
372 } catch (IllegalArgumentException e) {
373 System.out.println("Skipping sequencefile gz: " + e.getMessage());
374 }
375
376
377 System.out.println("\n\n\n");
378 System.out.println("****************************** HFile *****************************");
379
380 timeWrite("HFile", keyLength, valueLength, "none", "none", rows, null, minBlockSize);
381 System.out.println("\n+++++++\n");
382 timeReading("HFile", keyLength, valueLength, rows, 0 );
383
384 System.out.println("");
385 System.out.println("----------------------");
386 System.out.println("");
387
388 timeWrite("HFile", keyLength, valueLength, "none", "aes", rows, null, minBlockSize);
389 System.out.println("\n+++++++\n");
390 timeReading("HFile", keyLength, valueLength, rows, 0 );
391
392 System.out.println("");
393 System.out.println("----------------------");
394 System.out.println("");
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410 timeWrite("HFile", keyLength, valueLength, "gz", "none", rows, null, minBlockSize);
411 System.out.println("\n+++++++\n");
412 timeReading("HFile", keyLength, valueLength, rows, 0 );
413
414 System.out.println("");
415 System.out.println("----------------------");
416 System.out.println("");
417
418 timeWrite("HFile", keyLength, valueLength, "gz", "aes", rows, null, minBlockSize);
419 System.out.println("\n+++++++\n");
420 timeReading("HFile", keyLength, valueLength, rows, 0 );
421
422 System.out.println("\n\n\n\nNotes: ");
423 System.out.println(" * Timing includes open/closing of files.");
424 System.out.println(" * Timing includes reading both Key and Value");
425 System.out.println(" * Data is generated as random bytes. Other methods e.g. using " +
426 "dictionary with care for distributation of words is under development.");
427 System.out.println(" * Timing of write currently, includes random value/key generations. " +
428 "Which is the same for Sequence File and HFile. Another possibility is to generate " +
429 "test data beforehand");
430 System.out.println(" * We need to mitigate cache effect on benchmark. We can apply several " +
431 "ideas, for next step we do a large dummy read between benchmark read to dismantle " +
432 "caching of data. Renaming of file may be helpful. We can have a loop that reads with" +
433 " the same method several times and flood cache every time and average it to get a" +
434 " better number.");
435 }
436
437 @Override
438 protected void addOptions() {
439 }
440
441 @Override
442 protected void processOptions(CommandLine cmd) {
443 }
444
445 @Override
446 protected int doWork() throws Exception {
447 testRunComparisons();
448 return 0;
449 }
450
451 public static void main(String[] args) throws Exception {
452 int ret = ToolRunner.run(HBaseConfiguration.create(), new TestHFilePerformance(), args);
453 System.exit(ret);
454 }
455 }