1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.UUID;
28 import java.util.concurrent.TimeUnit;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.conf.Configured;
35 import org.apache.hadoop.fs.FileStatus;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.Cell;
39 import org.apache.hadoop.hbase.HBaseConfiguration;
40 import org.apache.hadoop.hbase.HBaseTestingUtility;
41 import org.apache.hadoop.hbase.HColumnDescriptor;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.HRegionInfo;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.KeyValue;
46 import org.apache.hadoop.hbase.KeyValueUtil;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.client.Put;
49 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
50 import org.apache.hadoop.hbase.regionserver.HRegion;
51 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.util.FSUtils;
54 import org.apache.hadoop.util.Tool;
55 import org.apache.hadoop.util.ToolRunner;
56
57 import com.yammer.metrics.core.Meter;
58 import com.yammer.metrics.core.MetricsRegistry;
59 import com.yammer.metrics.reporting.ConsoleReporter;
60
61
62
63
64
65
66 @InterfaceAudience.Private
67 public final class HLogPerformanceEvaluation extends Configured implements Tool {
68 static final Log LOG = LogFactory.getLog(HLogPerformanceEvaluation.class.getName());
69 private final MetricsRegistry metrics = new MetricsRegistry();
70 private final Meter syncMeter =
71 metrics.newMeter(HLogPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS);
72 private final Meter appendMeter =
73 metrics.newMeter(HLogPerformanceEvaluation.class, "append", "bytes", TimeUnit.MILLISECONDS);
74
75 private HBaseTestingUtility TEST_UTIL;
76
77 static final String TABLE_NAME = "HLogPerformanceEvaluation";
78 static final String QUALIFIER_PREFIX = "q";
79 static final String FAMILY_PREFIX = "cf";
80
81 private int numQualifiers = 1;
82 private int valueSize = 512;
83 private int keySize = 16;
84
85 @Override
86 public void setConf(Configuration conf) {
87 super.setConf(conf);
88 TEST_UTIL = new HBaseTestingUtility(conf);
89 }
90
91
92
93
94
95
96 class HLogPutBenchmark implements Runnable {
97 private final long numIterations;
98 private final int numFamilies;
99 private final boolean noSync;
100 private final HRegion region;
101 private final int syncInterval;
102 private final HTableDescriptor htd;
103
104 HLogPutBenchmark(final HRegion region, final HTableDescriptor htd,
105 final long numIterations, final boolean noSync, final int syncInterval) {
106 this.numIterations = numIterations;
107 this.noSync = noSync;
108 this.syncInterval = syncInterval;
109 this.numFamilies = htd.getColumnFamilies().length;
110 this.region = region;
111 this.htd = htd;
112 }
113
114 @Override
115 public void run() {
116 byte[] key = new byte[keySize];
117 byte[] value = new byte[valueSize];
118 Random rand = new Random(Thread.currentThread().getId());
119 HLog hlog = region.getLog();
120 ArrayList<UUID> clusters = new ArrayList<UUID>();
121 long nonce = HConstants.NO_NONCE;
122
123 try {
124 long startTime = System.currentTimeMillis();
125 int lastSync = 0;
126 for (int i = 0; i < numIterations; ++i) {
127 Put put = setupPut(rand, key, value, numFamilies);
128 long now = System.currentTimeMillis();
129 WALEdit walEdit = new WALEdit();
130 addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
131 HRegionInfo hri = region.getRegionInfo();
132 hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd,
133 region.getSequenceId(), true, nonce, nonce);
134 if (!this.noSync) {
135 if (++lastSync >= this.syncInterval) {
136 hlog.sync();
137 lastSync = 0;
138 }
139 }
140 }
141 long totalTime = (System.currentTimeMillis() - startTime);
142 logBenchmarkResult(Thread.currentThread().getName(), numIterations, totalTime);
143 } catch (Exception e) {
144 LOG.error(getClass().getSimpleName() + " Thread failed", e);
145 }
146 }
147 }
148
149 @Override
150 public int run(String[] args) throws Exception {
151 Path rootRegionDir = null;
152 int numThreads = 1;
153 long numIterations = 1000000;
154 int numFamilies = 1;
155 int syncInterval = 0;
156 boolean noSync = false;
157 boolean verify = false;
158 boolean verbose = false;
159 boolean cleanup = true;
160 boolean noclosefs = false;
161 long roll = Long.MAX_VALUE;
162 boolean compress = false;
163 String cipher = null;
164
165 for (int i = 0; i < args.length; i++) {
166 String cmd = args[i];
167 try {
168 if (cmd.equals("-threads")) {
169 numThreads = Integer.parseInt(args[++i]);
170 } else if (cmd.equals("-iterations")) {
171 numIterations = Long.parseLong(args[++i]);
172 } else if (cmd.equals("-path")) {
173 rootRegionDir = new Path(args[++i]);
174 } else if (cmd.equals("-families")) {
175 numFamilies = Integer.parseInt(args[++i]);
176 } else if (cmd.equals("-qualifiers")) {
177 numQualifiers = Integer.parseInt(args[++i]);
178 } else if (cmd.equals("-keySize")) {
179 keySize = Integer.parseInt(args[++i]);
180 } else if (cmd.equals("-valueSize")) {
181 valueSize = Integer.parseInt(args[++i]);
182 } else if (cmd.equals("-syncInterval")) {
183 syncInterval = Integer.parseInt(args[++i]);
184 } else if (cmd.equals("-nosync")) {
185 noSync = true;
186 } else if (cmd.equals("-verify")) {
187 verify = true;
188 } else if (cmd.equals("-verbose")) {
189 verbose = true;
190 } else if (cmd.equals("-nocleanup")) {
191 cleanup = false;
192 } else if (cmd.equals("-noclosefs")) {
193 noclosefs = true;
194 } else if (cmd.equals("-roll")) {
195 roll = Long.parseLong(args[++i]);
196 } else if (cmd.equals("-compress")) {
197 compress = true;
198 } else if (cmd.equals("-encryption")) {
199 cipher = args[++i];
200 } else if (cmd.equals("-h")) {
201 printUsageAndExit();
202 } else if (cmd.equals("--help")) {
203 printUsageAndExit();
204 } else {
205 System.err.println("UNEXPECTED: " + cmd);
206 printUsageAndExit();
207 }
208 } catch (Exception e) {
209 printUsageAndExit();
210 }
211 }
212
213 if (compress) {
214 Configuration conf = getConf();
215 conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
216 }
217
218 if (cipher != null) {
219
220 Configuration conf = getConf();
221 conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
222 conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
223 conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
224 HLog.Reader.class);
225 conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
226 HLog.Writer.class);
227 conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
228 conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
229 }
230
231
232
233 FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
234 FileSystem fs = FileSystem.get(getConf());
235 LOG.info("FileSystem: " + fs);
236 try {
237 if (rootRegionDir == null) {
238 rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("HLogPerformanceEvaluation");
239 }
240 rootRegionDir = rootRegionDir.makeQualified(fs);
241 cleanRegionRootDir(fs, rootRegionDir);
242
243 HTableDescriptor htd = createHTableDescriptor(numFamilies);
244 final long whenToRoll = roll;
245 HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) {
246 int appends = 0;
247 @Override
248 protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
249 HTableDescriptor htd)
250 throws IOException {
251 this.appends++;
252 if (this.appends % whenToRoll == 0) {
253 LOG.info("Rolling after " + appends + " edits");
254 rollWriter();
255 }
256 super.doWrite(info, logKey, logEdit, htd);
257 };
258
259 @Override
260 public void postSync() {
261 super.postSync();
262 syncMeter.mark();
263 }
264
265 @Override
266 public void postAppend(List<Entry> entries) {
267 super.postAppend(entries);
268 int size = 0;
269 for (Entry e: entries) size += e.getEdit().heapSize();
270 appendMeter.mark(size);
271 }
272 };
273 hlog.rollWriter();
274 HRegion region = null;
275 try {
276 region = openRegion(fs, rootRegionDir, htd, hlog);
277 ConsoleReporter.enable(this.metrics, 1, TimeUnit.SECONDS);
278 long putTime =
279 runBenchmark(new HLogPutBenchmark(region, htd, numIterations, noSync, syncInterval),
280 numThreads);
281 logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations +
282 ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
283
284 if (region != null) {
285 closeRegion(region);
286 region = null;
287 }
288 if (verify) {
289 Path dir = ((FSHLog) hlog).getDir();
290 long editCount = 0;
291 FileStatus [] fsss = fs.listStatus(dir);
292 if (fsss.length == 0) throw new IllegalStateException("No WAL found");
293 for (FileStatus fss: fsss) {
294 Path p = fss.getPath();
295 if (!fs.exists(p)) throw new IllegalStateException(p.toString());
296 editCount += verify(p, verbose);
297 }
298 long expected = numIterations * numThreads;
299 if (editCount != expected) {
300 throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
301 }
302 }
303 } finally {
304 if (region != null) closeRegion(region);
305
306 if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
307 }
308 } finally {
309
310 if (!noclosefs) fs.close();
311 }
312
313 return(0);
314 }
315
316 private static HTableDescriptor createHTableDescriptor(final int numFamilies) {
317 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
318 for (int i = 0; i < numFamilies; ++i) {
319 HColumnDescriptor colDef = new HColumnDescriptor(FAMILY_PREFIX + i);
320 htd.addFamily(colDef);
321 }
322 return htd;
323 }
324
325
326
327
328
329
330
331
332 private long verify(final Path wal, final boolean verbose) throws IOException {
333 HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, getConf());
334 long count = 0;
335 Map<String, Long> sequenceIds = new HashMap<String, Long>();
336 try {
337 while (true) {
338 Entry e = reader.next();
339 if (e == null) {
340 LOG.debug("Read count=" + count + " from " + wal);
341 break;
342 }
343 count++;
344 long seqid = e.getKey().getLogSeqNum();
345 if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
346
347 if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
348 throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
349 + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
350 + ", current seqid = " + seqid);
351 }
352 }
353
354 sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
355 if (verbose) LOG.info("seqid=" + seqid);
356 }
357 } finally {
358 reader.close();
359 }
360 return count;
361 }
362
363 private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
364 float tsec = totalTime / 1000.0f;
365 LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
366
367 }
368
369 private void printUsageAndExit() {
370 System.err.printf("Usage: bin/hbase %s [options]\n", getClass().getName());
371 System.err.println(" where [options] are:");
372 System.err.println(" -h|-help Show this help and exit.");
373 System.err.println(" -threads <N> Number of threads writing on the WAL.");
374 System.err.println(" -iterations <N> Number of iterations per thread.");
375 System.err.println(" -path <PATH> Path where region's root directory is created.");
376 System.err.println(" -families <N> Number of column families to write.");
377 System.err.println(" -qualifiers <N> Number of qualifiers to write.");
378 System.err.println(" -keySize <N> Row key size in byte.");
379 System.err.println(" -valueSize <N> Row/Col value size in byte.");
380 System.err.println(" -nocleanup Do NOT remove test data when done.");
381 System.err.println(" -noclosefs Do NOT close the filesystem when done.");
382 System.err.println(" -nosync Append without syncing");
383 System.err.println(" -syncInterval <N> Append N edits and then sync. Default=0, i.e. sync every edit.");
384 System.err.println(" -verify Verify edits written in sequence");
385 System.err.println(" -verbose Output extra info; e.g. all edit seq ids when verifying");
386 System.err.println(" -roll <N> Roll the way every N appends");
387 System.err.println(" -encryption <A> Encrypt the WAL with algorithm A, e.g. AES");
388 System.err.println("");
389 System.err.println("Examples:");
390 System.err.println("");
391 System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and verification afterward do:");
392 System.err.println(" $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal.HLogPerformanceEvaluation \\");
393 System.err.println(" -conf ./core-site.xml -path hdfs://example.org:7000/tmp -threads 100 -roll 10000 -verify");
394 System.exit(1);
395 }
396
397 private HRegion openRegion(final FileSystem fs, final Path dir, final HTableDescriptor htd, final HLog hlog)
398 throws IOException {
399
400 HRegionInfo regionInfo = new HRegionInfo(htd.getTableName());
401 return HRegion.createHRegion(regionInfo, dir, getConf(), htd, hlog);
402 }
403
404 private void closeRegion(final HRegion region) throws IOException {
405 if (region != null) {
406 region.close();
407 HLog wal = region.getLog();
408 if (wal != null) wal.close();
409 }
410 }
411
412 private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
413 if (fs.exists(dir)) {
414 fs.delete(dir, true);
415 }
416 }
417
418 private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
419 rand.nextBytes(key);
420 Put put = new Put(key);
421 for (int cf = 0; cf < numFamilies; ++cf) {
422 for (int q = 0; q < numQualifiers; ++q) {
423 rand.nextBytes(value);
424 put.add(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q), value);
425 }
426 }
427 return put;
428 }
429
430 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
431 WALEdit walEdit) {
432 for (List<Cell> edits : familyMap.values()) {
433 for (Cell cell : edits) {
434 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
435 walEdit.add(kv);
436 }
437 }
438 }
439
440 private long runBenchmark(Runnable runnable, final int numThreads) throws InterruptedException {
441 Thread[] threads = new Thread[numThreads];
442 long startTime = System.currentTimeMillis();
443 for (int i = 0; i < numThreads; ++i) {
444 threads[i] = new Thread(runnable, "t" + i);
445 threads[i].start();
446 }
447 for (Thread t : threads) t.join();
448 long endTime = System.currentTimeMillis();
449 return(endTime - startTime);
450 }
451
452
453
454
455
456
457
458
459 static int innerMain(final Configuration c, final String [] args) throws Exception {
460 return ToolRunner.run(c, new HLogPerformanceEvaluation(), args);
461 }
462
463 public static void main(String[] args) throws Exception {
464 System.exit(innerMain(HBaseConfiguration.create(), args));
465 }
466 }