1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.commons.cli.CommandLine;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FSDataInputStream;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.client.HTable;
30 import org.apache.hadoop.hbase.client.Result;
31 import org.apache.hadoop.hbase.client.ResultScanner;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.client.TableSnapshotScanner;
34 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
37 import org.apache.hadoop.hbase.mapreduce.TableMapper;
38 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
39 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.util.FSUtils;
42 import org.apache.hadoop.io.NullWritable;
43 import org.apache.hadoop.mapreduce.Counters;
44 import org.apache.hadoop.mapreduce.Job;
45 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
46 import org.apache.hadoop.util.StringUtils;
47 import org.apache.hadoop.util.ToolRunner;
48
49 import com.google.common.base.Stopwatch;
50
51
52
53
54
55 public class ScanPerformanceEvaluation extends AbstractHBaseTool {
56
57 private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
58
59 private String type;
60 private String file;
61 private String tablename;
62 private String snapshotName;
63 private String restoreDir;
64 private String caching;
65
66 @Override
67 public void setConf(Configuration conf) {
68 super.setConf(conf);
69 Path rootDir;
70 try {
71 rootDir = FSUtils.getRootDir(conf);
72 rootDir.getFileSystem(conf);
73 } catch (IOException ex) {
74 throw new RuntimeException(ex);
75 }
76 }
77
78 @Override
79 protected void addOptions() {
80 this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
81 this.addOptWithArg("f", "file", "the filename to read from");
82 this.addOptWithArg("tn", "table", "the tablename to read from");
83 this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
84 this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
85 this.addOptWithArg("ch", "caching", "scanner caching value");
86 }
87
88 @Override
89 protected void processOptions(CommandLine cmd) {
90 type = cmd.getOptionValue("type");
91 file = cmd.getOptionValue("file");
92 tablename = cmd.getOptionValue("table");
93 snapshotName = cmd.getOptionValue("snapshot");
94 restoreDir = cmd.getOptionValue("restoredir");
95 caching = cmd.getOptionValue("caching");
96 }
97
98 protected void testHdfsStreaming(Path filename) throws IOException {
99 byte[] buf = new byte[1024];
100 FileSystem fs = filename.getFileSystem(getConf());
101
102
103 Stopwatch fileOpenTimer = new Stopwatch();
104 Stopwatch streamTimer = new Stopwatch();
105
106 fileOpenTimer.start();
107 FSDataInputStream in = fs.open(filename);
108 fileOpenTimer.stop();
109
110 long totalBytes = 0;
111 streamTimer.start();
112 while (true) {
113 int read = in.read(buf);
114 if (read < 0) {
115 break;
116 }
117 totalBytes += read;
118 }
119 streamTimer.stop();
120
121 double throughput = (double)totalBytes / streamTimer.elapsedTime(TimeUnit.SECONDS);
122
123 System.out.println("HDFS streaming: ");
124 System.out.println("total time to open: " + fileOpenTimer.elapsedMillis() + " ms");
125 System.out.println("total time to read: " + streamTimer.elapsedMillis() + " ms");
126 System.out.println("total bytes: " + totalBytes + " bytes ("
127 + StringUtils.humanReadableInt(totalBytes) + ")");
128 System.out.println("throghput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
129 }
130
131 private Scan getScan() {
132 Scan scan = new Scan();
133 scan.setCacheBlocks(false);
134 scan.setMaxVersions(1);
135 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
136 if (caching != null) {
137 scan.setCaching(Integer.parseInt(caching));
138 }
139
140 return scan;
141 }
142
143 public void testScan() throws IOException {
144 Stopwatch tableOpenTimer = new Stopwatch();
145 Stopwatch scanOpenTimer = new Stopwatch();
146 Stopwatch scanTimer = new Stopwatch();
147
148 tableOpenTimer.start();
149 HTable table = new HTable(getConf(), TableName.valueOf(tablename));
150 tableOpenTimer.stop();
151
152 Scan scan = getScan();
153 scanOpenTimer.start();
154 ResultScanner scanner = table.getScanner(scan);
155 scanOpenTimer.stop();
156
157 long numRows = 0;
158 long numCells = 0;
159 scanTimer.start();
160 while (true) {
161 Result result = scanner.next();
162 if (result == null) {
163 break;
164 }
165 numRows++;
166
167 numCells += result.rawCells().length;
168 }
169 scanTimer.stop();
170 scanner.close();
171 table.close();
172
173 ScanMetrics metrics = ProtobufUtil.toScanMetrics(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
174 long totalBytes = metrics.countOfBytesInResults.get();
175 double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
176 double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
177 double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
178
179 System.out.println("HBase scan: ");
180 System.out.println("total time to open table: " + tableOpenTimer.elapsedMillis() + " ms");
181 System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
182 System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
183
184 System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
185
186 System.out.println("total bytes: " + totalBytes + " bytes ("
187 + StringUtils.humanReadableInt(totalBytes) + ")");
188 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
189 System.out.println("total rows : " + numRows);
190 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
191 System.out.println("total cells : " + numCells);
192 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
193 }
194
195
196 public void testSnapshotScan() throws IOException {
197 Stopwatch snapshotRestoreTimer = new Stopwatch();
198 Stopwatch scanOpenTimer = new Stopwatch();
199 Stopwatch scanTimer = new Stopwatch();
200
201 Path restoreDir = new Path(this.restoreDir);
202
203 snapshotRestoreTimer.start();
204 restoreDir.getFileSystem(conf).delete(restoreDir, true);
205 snapshotRestoreTimer.stop();
206
207 Scan scan = getScan();
208 scanOpenTimer.start();
209 TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
210 scanOpenTimer.stop();
211
212 long numRows = 0;
213 long numCells = 0;
214 scanTimer.start();
215 while (true) {
216 Result result = scanner.next();
217 if (result == null) {
218 break;
219 }
220 numRows++;
221
222 numCells += result.rawCells().length;
223 }
224 scanTimer.stop();
225 scanner.close();
226
227 ScanMetrics metrics = scanner.getScanMetrics();
228 long totalBytes = metrics.countOfBytesInResults.get();
229 double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
230 double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
231 double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
232
233 System.out.println("HBase scan snapshot: ");
234 System.out.println("total time to restore snapshot: " + snapshotRestoreTimer.elapsedMillis() + " ms");
235 System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
236 System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
237
238 System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
239
240 System.out.println("total bytes: " + totalBytes + " bytes ("
241 + StringUtils.humanReadableInt(totalBytes) + ")");
242 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
243 System.out.println("total rows : " + numRows);
244 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
245 System.out.println("total cells : " + numCells);
246 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
247
248 }
249
250 public static enum ScanCounter {
251 NUM_ROWS,
252 NUM_CELLS,
253 }
254
255 public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
256 @Override
257 protected void map(ImmutableBytesWritable key, Result value,
258 Context context) throws IOException,
259 InterruptedException {
260 context.getCounter(ScanCounter.NUM_ROWS).increment(1);
261 context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
262 }
263 }
264
265 public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
266 Stopwatch scanOpenTimer = new Stopwatch();
267 Stopwatch scanTimer = new Stopwatch();
268
269 Scan scan = getScan();
270
271 String jobName = "testScanMapReduce";
272
273 Job job = new Job(conf);
274 job.setJobName(jobName);
275
276 job.setJarByClass(getClass());
277
278 TableMapReduceUtil.initTableMapperJob(
279 this.tablename,
280 scan,
281 MyMapper.class,
282 NullWritable.class,
283 NullWritable.class,
284 job
285 );
286
287 job.setNumReduceTasks(0);
288 job.setOutputKeyClass(NullWritable.class);
289 job.setOutputValueClass(NullWritable.class);
290 job.setOutputFormatClass(NullOutputFormat.class);
291
292 scanTimer.start();
293 job.waitForCompletion(true);
294 scanTimer.stop();
295
296 Counters counters = job.getCounters();
297 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
298 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
299
300 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
301 double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
302 double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
303 double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
304
305 System.out.println("HBase scan mapreduce: ");
306 System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
307 System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
308
309 System.out.println("total bytes: " + totalBytes + " bytes ("
310 + StringUtils.humanReadableInt(totalBytes) + ")");
311 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
312 System.out.println("total rows : " + numRows);
313 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
314 System.out.println("total cells : " + numCells);
315 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
316 }
317
318 public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
319 Stopwatch scanOpenTimer = new Stopwatch();
320 Stopwatch scanTimer = new Stopwatch();
321
322 Scan scan = getScan();
323
324 String jobName = "testSnapshotScanMapReduce";
325
326 Job job = new Job(conf);
327 job.setJobName(jobName);
328
329 job.setJarByClass(getClass());
330
331 TableMapReduceUtil.initTableSnapshotMapperJob(
332 this.snapshotName,
333 scan,
334 MyMapper.class,
335 NullWritable.class,
336 NullWritable.class,
337 job,
338 true,
339 new Path(restoreDir)
340 );
341
342 job.setNumReduceTasks(0);
343 job.setOutputKeyClass(NullWritable.class);
344 job.setOutputValueClass(NullWritable.class);
345 job.setOutputFormatClass(NullOutputFormat.class);
346
347 scanTimer.start();
348 job.waitForCompletion(true);
349 scanTimer.stop();
350
351 Counters counters = job.getCounters();
352 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
353 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
354
355 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
356 double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
357 double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
358 double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
359
360 System.out.println("HBase scan mapreduce: ");
361 System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
362 System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
363
364 System.out.println("total bytes: " + totalBytes + " bytes ("
365 + StringUtils.humanReadableInt(totalBytes) + ")");
366 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
367 System.out.println("total rows : " + numRows);
368 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
369 System.out.println("total cells : " + numCells);
370 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
371 }
372
373 @Override
374 protected int doWork() throws Exception {
375 if (type.equals("streaming")) {
376 testHdfsStreaming(new Path(file));
377 } else if (type.equals("scan")){
378 testScan();
379 } else if (type.equals("snapshotscan")) {
380 testSnapshotScan();
381 } else if (type.equals("scanmapreduce")) {
382 testScanMapReduce();
383 } else if (type.equals("snapshotscanmapreduce")) {
384 testSnapshotScanMapReduce();
385 }
386 return 0;
387 }
388
389 public static void main (String[] args) throws Exception {
390 int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
391 System.exit(ret);
392 }
393 }