1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.Random;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.commons.math.random.RandomData;
28 import org.apache.commons.math.random.RandomDataImpl;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
34 import org.apache.hadoop.hbase.io.hfile.HFile;
35 import org.apache.hadoop.hbase.io.hfile.HFileContext;
36 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
37 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
38 import org.apache.hadoop.hbase.util.Bytes;
39
40
41
42
43
44
45 public class HFilePerformanceEvaluation {
46
47 private static final int ROW_LENGTH = 10;
48 private static final int ROW_COUNT = 1000000;
49 private static final int RFILE_BLOCKSIZE = 8 * 1024;
50
51 static final Log LOG =
52 LogFactory.getLog(HFilePerformanceEvaluation.class.getName());
53
54 static byte [] format(final int i) {
55 String v = Integer.toString(i);
56 return Bytes.toBytes("0000000000".substring(v.length()) + v);
57 }
58
59 static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
60 w.set(format(i));
61 return w;
62 }
63
64 private void runBenchmarks() throws Exception {
65 final Configuration conf = new Configuration();
66 final FileSystem fs = FileSystem.get(conf);
67 final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
68 if (fs.exists(mf)) {
69 fs.delete(mf, true);
70 }
71
72 runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT),
73 ROW_COUNT);
74 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
75 public void run() {
76 try {
77 runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
78 ROW_COUNT);
79 } catch (Exception e) {
80 e.printStackTrace();
81 }
82 }
83 });
84 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
85 public void run() {
86 try {
87 runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
88 ROW_COUNT);
89 } catch (Exception e) {
90 e.printStackTrace();
91 }
92 }
93 });
94 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
95 public void run() {
96 try {
97 runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
98 ROW_COUNT);
99 } catch (Exception e) {
100 e.printStackTrace();
101 }
102 }
103 });
104 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
105 public void run() {
106 try {
107 runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
108 ROW_COUNT);
109 } catch (Exception e) {
110 e.printStackTrace();
111 }
112 }
113 });
114
115 }
116
117 protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount)
118 throws Exception {
119 LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
120 rowCount + " rows.");
121 long elapsedTime = benchmark.run();
122 LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
123 rowCount + " rows took " + elapsedTime + "ms.");
124 }
125
126 static abstract class RowOrientedBenchmark {
127
128 protected final Configuration conf;
129 protected final FileSystem fs;
130 protected final Path mf;
131 protected final int totalRows;
132
133 public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
134 int totalRows) {
135 this.conf = conf;
136 this.fs = fs;
137 this.mf = mf;
138 this.totalRows = totalRows;
139 }
140
141 void setUp() throws Exception {
142
143 }
144
145 abstract void doRow(int i) throws Exception;
146
147 protected int getReportingPeriod() {
148 return this.totalRows / 10;
149 }
150
151 void tearDown() throws Exception {
152
153 }
154
155
156
157
158
159
160 long run() throws Exception {
161 long elapsedTime;
162 setUp();
163 long startTime = System.currentTimeMillis();
164 try {
165 for (int i = 0; i < totalRows; i++) {
166 if (i > 0 && i % getReportingPeriod() == 0) {
167 LOG.info("Processed " + i + " rows.");
168 }
169 doRow(i);
170 }
171 elapsedTime = System.currentTimeMillis() - startTime;
172 } finally {
173 tearDown();
174 }
175 return elapsedTime;
176 }
177
178 }
179
180 static class SequentialWriteBenchmark extends RowOrientedBenchmark {
181 protected HFile.Writer writer;
182 private Random random = new Random();
183 private byte[] bytes = new byte[ROW_LENGTH];
184
185 public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
186 int totalRows) {
187 super(conf, fs, mf, totalRows);
188 }
189
190 @Override
191 void setUp() throws Exception {
192 HFileContext hFileContext = new HFileContextBuilder().withBlockSize(RFILE_BLOCKSIZE).build();
193 writer =
194 HFile.getWriterFactoryNoCache(conf)
195 .withPath(fs, mf)
196 .withFileContext(hFileContext)
197 .withComparator(new KeyValue.RawBytesComparator())
198 .create();
199 }
200
201 @Override
202 void doRow(int i) throws Exception {
203 writer.append(format(i), generateValue());
204 }
205
206 private byte[] generateValue() {
207 random.nextBytes(bytes);
208 return bytes;
209 }
210
211 @Override
212 protected int getReportingPeriod() {
213 return this.totalRows;
214 }
215
216 @Override
217 void tearDown() throws Exception {
218 writer.close();
219 }
220
221 }
222
223 static abstract class ReadBenchmark extends RowOrientedBenchmark {
224
225 protected HFile.Reader reader;
226
227 public ReadBenchmark(Configuration conf, FileSystem fs, Path mf,
228 int totalRows) {
229 super(conf, fs, mf, totalRows);
230 }
231
232 @Override
233 void setUp() throws Exception {
234 reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), this.conf);
235 this.reader.loadFileInfo();
236 }
237
238 @Override
239 void tearDown() throws Exception {
240 reader.close();
241 }
242
243 }
244
245 static class SequentialReadBenchmark extends ReadBenchmark {
246 private HFileScanner scanner;
247
248 public SequentialReadBenchmark(Configuration conf, FileSystem fs,
249 Path mf, int totalRows) {
250 super(conf, fs, mf, totalRows);
251 }
252
253 @Override
254 void setUp() throws Exception {
255 super.setUp();
256 this.scanner = this.reader.getScanner(false, false);
257 this.scanner.seekTo();
258 }
259
260 @Override
261 void doRow(int i) throws Exception {
262 if (this.scanner.next()) {
263 ByteBuffer k = this.scanner.getKey();
264 PerformanceEvaluationCommons.assertKey(format(i + 1), k);
265 ByteBuffer v = scanner.getValue();
266 PerformanceEvaluationCommons.assertValueSize(v.limit(), ROW_LENGTH);
267 }
268 }
269
270 @Override
271 protected int getReportingPeriod() {
272 return this.totalRows;
273 }
274
275 }
276
277 static class UniformRandomReadBenchmark extends ReadBenchmark {
278
279 private Random random = new Random();
280
281 public UniformRandomReadBenchmark(Configuration conf, FileSystem fs,
282 Path mf, int totalRows) {
283 super(conf, fs, mf, totalRows);
284 }
285
286 @Override
287 void doRow(int i) throws Exception {
288 HFileScanner scanner = this.reader.getScanner(false, true);
289 byte [] b = getRandomRow();
290 if (scanner.seekTo(b) < 0) {
291 LOG.info("Not able to seekTo " + new String(b));
292 return;
293 }
294 ByteBuffer k = scanner.getKey();
295 PerformanceEvaluationCommons.assertKey(b, k);
296 ByteBuffer v = scanner.getValue();
297 PerformanceEvaluationCommons.assertValueSize(v.limit(), ROW_LENGTH);
298 }
299
300 private byte [] getRandomRow() {
301 return format(random.nextInt(totalRows));
302 }
303 }
304
305 static class UniformRandomSmallScan extends ReadBenchmark {
306 private Random random = new Random();
307
308 public UniformRandomSmallScan(Configuration conf, FileSystem fs,
309 Path mf, int totalRows) {
310 super(conf, fs, mf, totalRows/10);
311 }
312
313 @Override
314 void doRow(int i) throws Exception {
315 HFileScanner scanner = this.reader.getScanner(false, false);
316 byte [] b = getRandomRow();
317 if (scanner.seekTo(b) != 0) {
318 LOG.info("Nonexistent row: " + new String(b));
319 return;
320 }
321 ByteBuffer k = scanner.getKey();
322 PerformanceEvaluationCommons.assertKey(b, k);
323
324 for (int ii = 0; ii < 30; ii++) {
325 if (!scanner.next()) {
326 LOG.info("NOTHING FOLLOWS");
327 return;
328 }
329 ByteBuffer v = scanner.getValue();
330 PerformanceEvaluationCommons.assertValueSize(v.limit(), ROW_LENGTH);
331 }
332 }
333
334 private byte [] getRandomRow() {
335 return format(random.nextInt(totalRows));
336 }
337 }
338
339 static class GaussianRandomReadBenchmark extends ReadBenchmark {
340
341 private RandomData randomData = new RandomDataImpl();
342
343 public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs,
344 Path mf, int totalRows) {
345 super(conf, fs, mf, totalRows);
346 }
347
348 @Override
349 void doRow(int i) throws Exception {
350 HFileScanner scanner = this.reader.getScanner(false, true);
351 byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
352 scanner.seekTo(gaussianRandomRowBytes);
353 for (int ii = 0; ii < 30; ii++) {
354 if (!scanner.next()) {
355 LOG.info("NOTHING FOLLOWS");
356 return;
357 }
358 scanner.getKey();
359 scanner.getValue();
360 }
361 }
362
363 private byte [] getGaussianRandomRowBytes() {
364 int r = (int) randomData.nextGaussian((double)totalRows / 2.0,
365 (double)totalRows / 10.0);
366
367 return format(Math.min(totalRows, Math.max(r,0)));
368 }
369 }
370
371
372
373
374
375
376 public static void main(String[] args) throws Exception {
377 new HFilePerformanceEvaluation().runBenchmarks();
378 }
379 }