1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.util;
18
19 import java.io.IOException;
20 import java.io.InterruptedIOException;
21 import java.lang.reflect.Constructor;
22 import java.net.InetAddress;
23 import java.security.SecureRandom;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.List;
27 import java.util.Properties;
28 import java.util.Random;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import javax.crypto.spec.SecretKeySpec;
32
33 import org.apache.commons.cli.CommandLine;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HColumnDescriptor;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.hadoop.hbase.HTableDescriptor;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.client.Durability;
44 import org.apache.hadoop.hbase.client.HBaseAdmin;
45 import org.apache.hadoop.hbase.io.compress.Compression;
46 import org.apache.hadoop.hbase.io.crypto.Cipher;
47 import org.apache.hadoop.hbase.io.crypto.Encryption;
48 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
49 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
50 import org.apache.hadoop.hbase.regionserver.BloomType;
51 import org.apache.hadoop.hbase.security.EncryptionUtil;
52 import org.apache.hadoop.hbase.security.User;
53 import org.apache.hadoop.hbase.security.access.AccessControlClient;
54 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
55 import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
56 import org.apache.hadoop.security.SecurityUtil;
57 import org.apache.hadoop.security.UserGroupInformation;
58 import org.apache.hadoop.util.ToolRunner;
59
60
61
62
63
64
65 public class LoadTestTool extends AbstractHBaseTool {
66
/** Logger used by the tool and by its worker threads. */
private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
/** Delimiter used when splitting colon-separated option values. */
private static final String COLON = ":";

/** Table the tool operates on; assigned in processOptions from the -tn option. */
private TableName tableName;

/** Table name used when the -tn option is absent. */
protected static final String DEFAULT_TABLE_NAME = "cluster_test";

/** Column family created by initTestTable and passed to the default data generator. */
public static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf");

/** Column families passed to applyColumnFamilyOptions and to reflectively created generators. */
protected static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY };

/** Initial value of both the minimum and the maximum column data size. */
protected static final int DEFAULT_DATA_SIZE = 64;

/** Initial thread count for writers, readers and updaters. */
protected static final int DEFAULT_NUM_THREADS = 20;

/** Usage text for the -write option. */
protected static final String OPT_USAGE_LOAD =
    "<avg_cols_per_key>:<avg_data_size>" +
    "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

/** Usage text for the -read option. */
protected static final String OPT_USAGE_READ =
    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

/** Usage text for the -update option. */
protected static final String OPT_USAGE_UPDATE =
    "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
    + ">][:<#whether to ignore nonce collisions=0>]";

/** Usage text for the -bloom option; lists every BloomType value. */
protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
    Arrays.toString(BloomType.values());

/** Usage text for the -compression option; lists every Compression.Algorithm value. */
protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
    "one of " + Arrays.toString(Compression.Algorithm.values());

/** Usage text for the data block encoding option; lists every DataBlockEncoding value. */
public static final String OPT_DATA_BLOCK_ENCODING_USAGE =
    "Encoding algorithm (e.g. prefix "
    + "compression) to use for data blocks in the test column family, "
    + "one of " + Arrays.toString(DataBlockEncoding.values()) + ".";

// Names of the column-family related command-line options.
private static final String OPT_BLOOM = "bloom";
private static final String OPT_COMPRESSION = "compression";
private static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
/** Option name derived from the lower-cased HColumnDescriptor.DATA_BLOCK_ENCODING constant. */
public static final String OPT_DATA_BLOCK_ENCODING =
    HColumnDescriptor.DATA_BLOCK_ENCODING.toLowerCase();

/** Flag option that marks the column family as in-memory. */
public static final String OPT_INMEMORY = "in_memory";
public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
    "inmemory as far as possible.  Not guaranteed that reads are always served from inmemory";

/** Option naming a LoadTestDataGenerator class, optionally followed by colon-separated args. */
public static final String OPT_GENERATOR = "generator";
public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
    + " Any args for this class can be passed as colon separated after class name";

// Names of the workload and connection command-line options registered in addOptions.
protected static final String OPT_KEY_WINDOW = "key_window";
protected static final String OPT_WRITE = "write";
protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
protected static final String OPT_MULTIPUT = "multiput";
protected static final String OPT_NUM_KEYS = "num_keys";
protected static final String OPT_READ = "read";
protected static final String OPT_START_KEY = "start_key";
protected static final String OPT_TABLE_NAME = "tn";
protected static final String OPT_ZK_QUORUM = "zk";
protected static final String OPT_ZK_PARENT_NODE = "zk_root";
protected static final String OPT_SKIP_INIT = "skip_init";
protected static final String OPT_INIT_ONLY = "init_only";
protected static final String NUM_TABLES = "num_tables";
protected static final String OPT_REGIONS_PER_SERVER = "regions_per_server";
protected static final String OPT_BATCHUPDATE = "batchupdate";
protected static final String OPT_UPDATE = "update";

/** Option naming the cipher used to encrypt the test column family. */
protected static final String OPT_ENCRYPTION = "encryption";
protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of " +
    Arrays.toString(Encryption.getSupportedCiphers());

/** Start key used when the -start_key option is absent. */
protected static final long DEFAULT_START_KEY = 0;
151
152
/** Parsed command line; stored by processOptions and consulted again in loadTable. */
protected CommandLine cmd;

// Worker pools; each is created in loadTable only when its mode is enabled.
protected MultiThreadedWriter writerThreads = null;
protected MultiThreadedReader readerThreads = null;
protected MultiThreadedUpdater updaterThreads = null;

/** Key range to process; endKey is computed as startKey + numKeys (exclusive upper bound). */
protected long startKey, endKey;

/** Modes selected by the presence of the -write, -read and -update options. */
protected boolean isWrite, isRead, isUpdate;
/** When set, initTestTable creates the table with Durability.ASYNC_WAL. */
protected boolean deferredLogFlush;

// Column family configuration, populated by parseColumnFamilyOptions.
protected DataBlockEncoding dataBlockEncodingAlgo;
protected Compression.Algorithm compressAlgo;
protected BloomType bloomType;
private boolean inMemoryCF;

/** Owner used for the ACL-aware workers; stays null unless an ACL data generator is used. */
private User userOwner;
// Writer options, parsed from the -write value.
protected int numWriterThreads = DEFAULT_NUM_THREADS;
protected int minColsPerKey, maxColsPerKey;
protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
protected boolean isMultiPut;

// Updater options, parsed from the -update value.
protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
protected int updatePercent;
protected boolean ignoreConflicts = false;
protected boolean isBatchUpdate;

// Reader options, parsed from the -read, -key_window and -max_read_errors values.
private int numReaderThreads = DEFAULT_NUM_THREADS;
private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
private int verifyPercent;

/** Number of tables to load; values above 1 make doWork call parallelLoadTables. */
private int numTables = 1;
private int regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER;

/** Super user name taken from the generator arguments in loadTable. */
private String superUser;

/** Comma-separated user names taken from the generator arguments in loadTable. */
private String userNames;

/** Classpath resource holding the authentication properties; read only when security is on. */
private String authnFileName;

// This will be useful when we want to just init the table or skip init
// (review note: semantics inferred from the flag names and their use in loadTable).
protected boolean isSkipInit = false;
protected boolean isInitOnly = false;

/** Cipher for column family encryption; stays null unless -encryption is given. */
protected Cipher cipher = null;
204
205 protected String[] splitColonSeparated(String option,
206 int minNumCols, int maxNumCols) {
207 String optVal = cmd.getOptionValue(option);
208 String[] cols = optVal.split(COLON);
209 if (cols.length < minNumCols || cols.length > maxNumCols) {
210 throw new IllegalArgumentException("Expected at least "
211 + minNumCols + " columns but no more than " + maxNumCols +
212 " in the colon-separated value '" + optVal + "' of the " +
213 "-" + option + " option");
214 }
215 return cols;
216 }
217
218 protected int getNumThreads(String numThreadsStr) {
219 return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
220 }
221
222
223
224
225
226 protected void applyColumnFamilyOptions(TableName tableName,
227 byte[][] columnFamilies) throws IOException {
228 HBaseAdmin admin = new HBaseAdmin(conf);
229 HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
230 LOG.info("Disabling table " + tableName);
231 admin.disableTable(tableName);
232 for (byte[] cf : columnFamilies) {
233 HColumnDescriptor columnDesc = tableDesc.getFamily(cf);
234 boolean isNewCf = columnDesc == null;
235 if (isNewCf) {
236 columnDesc = new HColumnDescriptor(cf);
237 }
238 if (bloomType != null) {
239 columnDesc.setBloomFilterType(bloomType);
240 }
241 if (compressAlgo != null) {
242 columnDesc.setCompressionType(compressAlgo);
243 }
244 if (dataBlockEncodingAlgo != null) {
245 columnDesc.setDataBlockEncoding(dataBlockEncodingAlgo);
246 }
247 if (inMemoryCF) {
248 columnDesc.setInMemory(inMemoryCF);
249 }
250 if (cipher != null) {
251 byte[] keyBytes = new byte[cipher.getKeyLength()];
252 new SecureRandom().nextBytes(keyBytes);
253 columnDesc.setEncryptionType(cipher.getName());
254 columnDesc.setEncryptionKey(EncryptionUtil.wrapKey(conf,
255 User.getCurrent().getShortName(),
256 new SecretKeySpec(keyBytes, cipher.getName())));
257 }
258 if (isNewCf) {
259 admin.addColumn(tableName, columnDesc);
260 } else {
261 admin.modifyColumn(tableName, columnDesc);
262 }
263 }
264 LOG.info("Enabling table " + tableName);
265 admin.enableTable(tableName);
266 admin.close();
267 }
268
269 @Override
270 protected void addOptions() {
271 addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
272 "without port numbers");
273 addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
274 addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
275 addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
276 addOptWithArg(OPT_READ, OPT_USAGE_READ);
277 addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
278 addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
279 addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
280 addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
281 addOptWithArg(OPT_DATA_BLOCK_ENCODING, OPT_DATA_BLOCK_ENCODING_USAGE);
282 addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
283 "to tolerate before terminating all reader threads. The default is " +
284 MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
285 addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
286 "reads and writes for concurrent write/read workload. The default " +
287 "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
288
289 addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
290 "separate puts for every column in a row");
291 addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
292 "separate updates for every column in a row");
293 addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
294 addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
295
296 addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
297 addOptWithArg(OPT_START_KEY, "The first key to read/write " +
298 "(a 0-based index). The default value is " +
299 DEFAULT_START_KEY + ".");
300 addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
301 + "already exists");
302
303 addOptWithArg(NUM_TABLES,
304 "A positive integer number. When a number n is speicfied, load test "
305 + "tool will load n table parallely. -tn parameter value becomes "
306 + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
307
308 addOptWithArg(OPT_REGIONS_PER_SERVER,
309 "A positive integer number. When a number n is specified, load test "
310 + "tool will create the test table with n regions per server");
311
312 addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
313 addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
314 }
315
316 @Override
317 protected void processOptions(CommandLine cmd) {
318 this.cmd = cmd;
319
320 tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
321 DEFAULT_TABLE_NAME));
322
323 isWrite = cmd.hasOption(OPT_WRITE);
324 isRead = cmd.hasOption(OPT_READ);
325 isUpdate = cmd.hasOption(OPT_UPDATE);
326 isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
327 deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);
328
329 if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
330 throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
331 "-" + OPT_UPDATE + "-" + OPT_READ + " has to be specified");
332 }
333
334 if (isInitOnly && (isRead || isWrite || isUpdate)) {
335 throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
336 + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
337 }
338
339 if (!isInitOnly) {
340 if (!cmd.hasOption(OPT_NUM_KEYS)) {
341 throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in "
342 + "read or write mode");
343 }
344 startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
345 String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
346 long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
347 Long.MAX_VALUE - startKey);
348 endKey = startKey + numKeys;
349 isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
350 System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
351 }
352
353 parseColumnFamilyOptions(cmd);
354
355 if (isWrite) {
356 String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
357
358 int colIndex = 0;
359 minColsPerKey = 1;
360 maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
361 int avgColDataSize =
362 parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
363 minColDataSize = avgColDataSize / 2;
364 maxColDataSize = avgColDataSize * 3 / 2;
365
366 if (colIndex < writeOpts.length) {
367 numWriterThreads = getNumThreads(writeOpts[colIndex++]);
368 }
369
370 isMultiPut = cmd.hasOption(OPT_MULTIPUT);
371
372 System.out.println("Multi-puts: " + isMultiPut);
373 System.out.println("Columns per key: " + minColsPerKey + ".."
374 + maxColsPerKey);
375 System.out.println("Data size per column: " + minColDataSize + ".."
376 + maxColDataSize);
377 }
378
379 if (isUpdate) {
380 String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
381 int colIndex = 0;
382 updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
383 if (colIndex < mutateOpts.length) {
384 numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
385 }
386 if (colIndex < mutateOpts.length) {
387 ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
388 }
389
390 isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
391
392 System.out.println("Batch updates: " + isBatchUpdate);
393 System.out.println("Percent of keys to update: " + updatePercent);
394 System.out.println("Updater threads: " + numUpdaterThreads);
395 System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
396 }
397
398 if (isRead) {
399 String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
400 int colIndex = 0;
401 verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
402 if (colIndex < readOpts.length) {
403 numReaderThreads = getNumThreads(readOpts[colIndex++]);
404 }
405
406 if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
407 maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
408 0, Integer.MAX_VALUE);
409 }
410
411 if (cmd.hasOption(OPT_KEY_WINDOW)) {
412 keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
413 0, Integer.MAX_VALUE);
414 }
415
416 System.out.println("Percent of keys to verify: " + verifyPercent);
417 System.out.println("Reader threads: " + numReaderThreads);
418 }
419
420 numTables = 1;
421 if (cmd.hasOption(NUM_TABLES)) {
422 numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
423 }
424 regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER;
425 if (cmd.hasOption(OPT_REGIONS_PER_SERVER)) {
426 regionsPerServer = parseInt(cmd.getOptionValue(OPT_REGIONS_PER_SERVER), 1,
427 Integer.MAX_VALUE);
428 conf.setInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY, regionsPerServer);
429 }
430 System.out.println("Regions per server: " + regionsPerServer);
431 }
432
433 private void parseColumnFamilyOptions(CommandLine cmd) {
434 String dataBlockEncodingStr = cmd.getOptionValue(OPT_DATA_BLOCK_ENCODING);
435 dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
436 DataBlockEncoding.valueOf(dataBlockEncodingStr);
437
438 String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
439 compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
440 Compression.Algorithm.valueOf(compressStr);
441
442 String bloomStr = cmd.getOptionValue(OPT_BLOOM);
443 bloomType = bloomStr == null ? BloomType.ROW :
444 BloomType.valueOf(bloomStr);
445
446 inMemoryCF = cmd.hasOption(OPT_INMEMORY);
447 if (cmd.hasOption(OPT_ENCRYPTION)) {
448 cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
449 }
450
451 }
452
453 public void initTestTable() throws IOException {
454 HTableDescriptor desc = new HTableDescriptor(tableName);
455 if (deferredLogFlush) {
456 desc.setDurability(Durability.ASYNC_WAL);
457 }
458 HColumnDescriptor hcd = new HColumnDescriptor(COLUMN_FAMILY);
459 hcd.setDataBlockEncoding(dataBlockEncodingAlgo);
460 hcd.setCompressionType(compressAlgo);
461 HBaseTestingUtility.createPreSplitLoadTestTable(conf, desc, hcd);
462 applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
463 }
464
465 @Override
466 protected int doWork() throws IOException {
467 if (numTables > 1) {
468 return parallelLoadTables();
469 } else {
470 return loadTable();
471 }
472 }
473
474 protected int loadTable() throws IOException {
475 if (cmd.hasOption(OPT_ZK_QUORUM)) {
476 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
477 }
478 if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
479 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
480 }
481
482 if (isInitOnly) {
483 LOG.info("Initializing only; no reads or writes");
484 initTestTable();
485 return 0;
486 }
487
488 if (!isSkipInit) {
489 initTestTable();
490 }
491 LoadTestDataGenerator dataGen = null;
492 if (cmd.hasOption(OPT_GENERATOR)) {
493 String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
494 dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
495 String[] args;
496 if (dataGen instanceof LoadTestDataGeneratorWithACL) {
497 LOG.info("Using LoadTestDataGeneratorWithACL");
498 if (User.isHBaseSecurityEnabled(conf)) {
499 LOG.info("Security is enabled");
500 authnFileName = clazzAndArgs[1];
501 superUser = clazzAndArgs[2];
502 userNames = clazzAndArgs[3];
503 args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
504 Properties authConfig = new Properties();
505 authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
506 try {
507 addAuthInfoToConf(authConfig, conf, superUser, userNames);
508 } catch (IOException exp) {
509 LOG.error(exp);
510 return EXIT_FAILURE;
511 }
512 userOwner = User.create(loginAndReturnUGI(conf, superUser));
513 } else {
514 superUser = clazzAndArgs[1];
515 userNames = clazzAndArgs[2];
516 args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
517 userOwner = User.createUserForTesting(conf, superUser, new String[0]);
518 }
519 } else {
520 args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
521 clazzAndArgs.length);
522 }
523 dataGen.initialize(args);
524 } else {
525
526 dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
527 minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
528 }
529
530 if (userOwner != null) {
531 LOG.info("Granting permissions for user " + userOwner.getShortName());
532 AccessControlProtos.Permission.Action[] actions = {
533 AccessControlProtos.Permission.Action.ADMIN, AccessControlProtos.Permission.Action.CREATE,
534 AccessControlProtos.Permission.Action.READ, AccessControlProtos.Permission.Action.WRITE };
535 try {
536 AccessControlClient.grant(conf, tableName, userOwner.getShortName(), null, null, actions);
537 } catch (Throwable e) {
538 LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e);
539 return EXIT_FAILURE;
540 }
541 }
542 if (userNames != null) {
543
544 String users[] = userNames.split(",");
545 User user = null;
546 for (String userStr : users) {
547 if (User.isHBaseSecurityEnabled(conf)) {
548 user = User.create(loginAndReturnUGI(conf, userStr));
549 } else {
550 user = User.createUserForTesting(conf, userStr, new String[0]);
551 }
552 }
553 }
554
555 if (isWrite) {
556 if (userOwner != null) {
557 writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
558 } else {
559 writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
560 }
561 writerThreads.setMultiPut(isMultiPut);
562 }
563
564 if (isUpdate) {
565 if (userOwner != null) {
566 updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
567 userOwner, userNames);
568 } else {
569 updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
570 }
571 updaterThreads.setBatchUpdate(isBatchUpdate);
572 updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
573 }
574
575 if (isRead) {
576 if (userOwner != null) {
577 readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
578 userNames);
579 } else {
580 readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
581 }
582 readerThreads.setMaxErrors(maxReadErrors);
583 readerThreads.setKeyWindow(keyWindow);
584 }
585
586 if (isUpdate && isWrite) {
587 LOG.info("Concurrent write/update workload: making updaters aware of the " +
588 "write point");
589 updaterThreads.linkToWriter(writerThreads);
590 }
591
592 if (isRead && (isUpdate || isWrite)) {
593 LOG.info("Concurrent write/read workload: making readers aware of the " +
594 "write point");
595 readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
596 }
597
598 if (isWrite) {
599 System.out.println("Starting to write data...");
600 writerThreads.start(startKey, endKey, numWriterThreads);
601 }
602
603 if (isUpdate) {
604 LOG.info("Starting to mutate data...");
605 System.out.println("Starting to mutate data...");
606
607
608 updaterThreads.start(startKey, endKey, numUpdaterThreads);
609 }
610
611 if (isRead) {
612 System.out.println("Starting to read data...");
613 readerThreads.start(startKey, endKey, numReaderThreads);
614 }
615
616 if (isWrite) {
617 writerThreads.waitForFinish();
618 }
619
620 if (isUpdate) {
621 updaterThreads.waitForFinish();
622 }
623
624 if (isRead) {
625 readerThreads.waitForFinish();
626 }
627
628 boolean success = true;
629 if (isWrite) {
630 success = success && writerThreads.getNumWriteFailures() == 0;
631 }
632 if (isUpdate) {
633 success = success && updaterThreads.getNumWriteFailures() == 0;
634 }
635 if (isRead) {
636 success = success && readerThreads.getNumReadErrors() == 0
637 && readerThreads.getNumReadFailures() == 0;
638 }
639 return success ? EXIT_SUCCESS : EXIT_FAILURE;
640 }
641
642 private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
643 try {
644 Class<?> clazz = Class.forName(clazzName);
645 Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
646 byte[][].class);
647 return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
648 minColsPerKey, maxColsPerKey, COLUMN_FAMILIES);
649 } catch (Exception e) {
650 throw new IOException(e);
651 }
652 }
653
654 public static byte[] generateData(final Random r, int length) {
655 byte [] b = new byte [length];
656 int i = 0;
657
658 for(i = 0; i < (length-8); i += 8) {
659 b[i] = (byte) (65 + r.nextInt(26));
660 b[i+1] = b[i];
661 b[i+2] = b[i];
662 b[i+3] = b[i];
663 b[i+4] = b[i];
664 b[i+5] = b[i];
665 b[i+6] = b[i];
666 b[i+7] = b[i];
667 }
668
669 byte a = (byte) (65 + r.nextInt(26));
670 for(; i < length; i++) {
671 b[i] = a;
672 }
673 return b;
674 }
675 public static void main(String[] args) {
676 new LoadTestTool().doStaticMain(args);
677 }
678
679
680
681
682
683
684
685
686
687 private int parallelLoadTables()
688 throws IOException {
689
690 String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
691 String[] newArgs = null;
692 if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
693 newArgs = new String[cmdLineArgs.length + 2];
694 newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
695 newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
696 for (int i = 0; i < cmdLineArgs.length; i++) {
697 newArgs[i + 2] = cmdLineArgs[i];
698 }
699 } else {
700 newArgs = cmdLineArgs;
701 }
702
703 int tableNameValueIndex = -1;
704 for (int j = 0; j < newArgs.length; j++) {
705 if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
706 tableNameValueIndex = j + 1;
707 } else if (newArgs[j].endsWith(NUM_TABLES)) {
708
709 newArgs[j + 1] = "1";
710 }
711 }
712
713
714 List<WorkerThread> workers = new ArrayList<WorkerThread>();
715 for (int i = 0; i < numTables; i++) {
716 String[] workerArgs = newArgs.clone();
717 workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
718 WorkerThread worker = new WorkerThread(i, workerArgs);
719 workers.add(worker);
720 LOG.info(worker + " starting");
721 worker.start();
722 }
723
724
725 LOG.info("Waiting for worker threads to finish");
726 for (WorkerThread t : workers) {
727 try {
728 t.join();
729 } catch (InterruptedException ie) {
730 IOException iie = new InterruptedIOException();
731 iie.initCause(ie);
732 throw iie;
733 }
734 checkForErrors();
735 }
736
737 return EXIT_SUCCESS;
738 }
739
740
741
742 protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
743
744 private void workerThreadError(Throwable t) {
745 thrown.compareAndSet(null, t);
746 }
747
748
749
750
751 private void checkForErrors() throws IOException {
752 Throwable thrown = this.thrown.get();
753 if (thrown == null) return;
754 if (thrown instanceof IOException) {
755 throw (IOException) thrown;
756 } else {
757 throw new RuntimeException(thrown);
758 }
759 }
760
761 class WorkerThread extends Thread {
762 private String[] workerArgs;
763
764 WorkerThread(int i, String[] args) {
765 super("WorkerThread-" + i);
766 workerArgs = args;
767 }
768
769 @Override
770 public void run() {
771 try {
772 int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
773 if (ret != 0) {
774 throw new RuntimeException("LoadTestTool exit with non-zero return code.");
775 }
776 } catch (Exception ex) {
777 LOG.error("Error in worker thread", ex);
778 workerThreadError(ex);
779 }
780 }
781 }
782
783 private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
784 String userList) throws IOException {
785 List<String> users = Arrays.asList(userList.split(","));
786 users.add(owner);
787 for (String user : users) {
788 String keyTabFileConfKey = "hbase." + user + ".keytab.file";
789 String principalConfKey = "hbase." + user + ".kerberos.principal";
790 if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
791 throw new IOException("Authentication configs missing for user : " + user);
792 }
793 }
794 for (String key : authConfig.stringPropertyNames()) {
795 conf.set(key, authConfig.getProperty(key));
796 }
797 LOG.debug("Added authentication properties to config successfully.");
798 }
799
800 public static UserGroupInformation loginAndReturnUGI(Configuration conf, String username)
801 throws IOException {
802 String hostname = InetAddress.getLocalHost().getHostName();
803 String keyTabFileConfKey = "hbase." + username + ".keytab.file";
804 String keyTabFileLocation = conf.get(keyTabFileConfKey);
805 String principalConfKey = "hbase." + username + ".kerberos.principal";
806 String principal = SecurityUtil.getServerPrincipal(conf.get(principalConfKey), hostname);
807 if (keyTabFileLocation == null || principal == null) {
808 LOG.warn("Principal or key tab file null for : " + principalConfKey + ", "
809 + keyTabFileConfKey);
810 }
811 UserGroupInformation ugi =
812 UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
813 return ugi;
814 }
815 }