/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

import javax.crypto.spec.SecretKeySpec;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner;

/**
 * A command-line utility that reads, writes, and verifies data. Unlike
 * {@link PerformanceEvaluation}, this tool validates the data written,
 * and supports simultaneously writing and reading the same set of keys.
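 * <p>
 * A purely illustrative invocation (the option values here are hypothetical
 * examples, not defaults; see the option usage strings below for the exact
 * formats):
 * <pre>
 * hbase org.apache.hadoop.hbase.util.LoadTestTool -tn test \
 *     -write 5:1024:20 -read 100:20 -num_keys 1000000
 * </pre>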
 */
public class LoadTestTool extends AbstractHBaseTool {

  private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /** Column family used by the test */
  public static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf");

  /** Column families used by the test */
  protected static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY };

  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
      "<avg_cols_per_key>:<avg_data_size>" +
      "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
      "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE =
      "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
      + ">][:<#whether to ignore nonce collisions=0>]";

  protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
      Arrays.toString(BloomType.values());

  protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
      "one of " + Arrays.toString(Compression.Algorithm.values());

  public static final String OPT_DATA_BLOCK_ENCODING_USAGE =
    "Encoding algorithm (e.g. prefix "
        + "compression) to use for data blocks in the test column family, "
        + "one of " + Arrays.toString(DataBlockEncoding.values()) + ".";

  private static final String OPT_BLOOM = "bloom";
  private static final String OPT_COMPRESSION = "compression";
  private static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
  public static final String OPT_DATA_BLOCK_ENCODING =
      HColumnDescriptor.DATA_BLOCK_ENCODING.toLowerCase();

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
      "in-memory as far as possible. Not guaranteed that reads are always served from memory.";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
      + " Any arguments for this class can be passed, colon-separated, after the class name";

  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  protected static final String OPT_MULTIPUT = "multiput";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  protected static final String OPT_TABLE_NAME = "tn";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_REGIONS_PER_SERVER = "regions_per_server";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  protected static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of " +
    Arrays.toString(Encryption.getSupportedCiphers());

  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on command line */
  protected CommandLine cmd;

  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  protected long startKey, endKey;

  protected boolean isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  private User userOwner;
  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  private int numTables = 1;
  private int regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER;

  private String superUser;

  private String userNames;
  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  //       console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  protected Cipher cipher = null;

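  /**
   * Splits a colon-separated option value, e.g. a -write value such as
   * "5:1024:20", and validates that the number of parts falls within
   * [minNumCols, maxNumCols].
   */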
  protected String[] splitColonSeparated(String option,
      int minNumCols, int maxNumCols) {
    String optVal = cmd.getOptionValue(option);
    String[] cols = optVal.split(COLON);
    if (cols.length < minNumCols || cols.length > maxNumCols) {
      throw new IllegalArgumentException("Expected at least "
          + minNumCols + " columns but no more than " + maxNumCols +
          " in the colon-separated value '" + optVal + "' of the " +
          "-" + option + " option");
    }
    return cols;
  }

  protected int getNumThreads(String numThreadsStr) {
    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
  }

  /**
   * Apply column family options such as Bloom filters, compression, and data
   * block encoding.
   */
  protected void applyColumnFamilyOptions(TableName tableName,
      byte[][] columnFamilies) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
    LOG.info("Disabling table " + tableName);
    admin.disableTable(tableName);
    for (byte[] cf : columnFamilies) {
      HColumnDescriptor columnDesc = tableDesc.getFamily(cf);
      boolean isNewCf = columnDesc == null;
      if (isNewCf) {
        columnDesc = new HColumnDescriptor(cf);
      }
      if (bloomType != null) {
        columnDesc.setBloomFilterType(bloomType);
      }
      if (compressAlgo != null) {
        columnDesc.setCompressionType(compressAlgo);
      }
      if (dataBlockEncodingAlgo != null) {
        columnDesc.setDataBlockEncoding(dataBlockEncodingAlgo);
      }
      if (inMemoryCF) {
        columnDesc.setInMemory(inMemoryCF);
      }
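      // Transparent encryption: generate a random data key and store it in the
      // column descriptor, wrapped for the current user.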
      if (cipher != null) {
        byte[] keyBytes = new byte[cipher.getKeyLength()];
        new SecureRandom().nextBytes(keyBytes);
        columnDesc.setEncryptionType(cipher.getName());
        columnDesc.setEncryptionKey(EncryptionUtil.wrapKey(conf,
          User.getCurrent().getShortName(),
          new SecretKeySpec(keyBytes, cipher.getName())));
      }
      if (isNewCf) {
        admin.addColumn(tableName, columnDesc);
      } else {
        admin.modifyColumn(tableName, columnDesc);
      }
    }
    LOG.info("Enabling table " + tableName);
    admin.enableTable(tableName);
    admin.close();
  }

  @Override
  protected void addOptions() {
    addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
        "without port numbers");
    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
    addOptWithArg(OPT_READ, OPT_USAGE_READ);
    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
    addOptWithArg(OPT_DATA_BLOCK_ENCODING, OPT_DATA_BLOCK_ENCODING_USAGE);
    addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
        "to tolerate before terminating all reader threads. The default is " +
        MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
    addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
        "reads and writes for concurrent write/read workload. The default " +
        "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");

    addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
        "separate puts for every column in a row");
    addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
        "separate updates for every column in a row");
    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);

    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
    addOptWithArg(OPT_START_KEY, "The first key to read/write " +
        "(a 0-based index). The default value is " +
        DEFAULT_START_KEY + ".");
    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
        + "already exists");

    addOptWithArg(NUM_TABLES,
      "A positive integer number. When a number n is specified, the load test "
          + "tool will load n tables in parallel. The -tn parameter value becomes "
          + "the table name prefix. Each table name is in the format <tn>_1...<tn>_n");

    addOptWithArg(OPT_REGIONS_PER_SERVER,
      "A positive integer number. When a number n is specified, load test "
          + "tool will create the test table with n regions per server");

    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    this.cmd = cmd;

    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
        DEFAULT_TABLE_NAME));

    isWrite = cmd.hasOption(OPT_WRITE);
    isRead = cmd.hasOption(OPT_READ);
    isUpdate = cmd.hasOption(OPT_UPDATE);
    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);

    if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
      throw new IllegalArgumentException("Either -" + OPT_WRITE + ", -" +
        OPT_UPDATE + " or -" + OPT_READ + " has to be specified");
    }

    if (isInitOnly && (isRead || isWrite || isUpdate)) {
      throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
          + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
    }

    if (!isInitOnly) {
      if (!cmd.hasOption(OPT_NUM_KEYS)) {
        throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in "
            + "read or write mode");
      }
      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
          String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
          Long.MAX_VALUE - startKey);
      endKey = startKey + numKeys;
      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
    }

    parseColumnFamilyOptions(cmd);

    if (isWrite) {
      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);

      int colIndex = 0;
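      // Derive ranges from the requested averages: columns per key in
      // [1, 2 * avg], per-column data size in [avg / 2, 3 * avg / 2].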
      minColsPerKey = 1;
      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
      int avgColDataSize =
          parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
      minColDataSize = avgColDataSize / 2;
      maxColDataSize = avgColDataSize * 3 / 2;

      if (colIndex < writeOpts.length) {
        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
      }

      isMultiPut = cmd.hasOption(OPT_MULTIPUT);

      System.out.println("Multi-puts: " + isMultiPut);
      System.out.println("Columns per key: " + minColsPerKey + ".."
          + maxColsPerKey);
      System.out.println("Data size per column: " + minColDataSize + ".."
          + maxColDataSize);
    }

    if (isUpdate) {
      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
      int colIndex = 0;
      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
      if (colIndex < mutateOpts.length) {
        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
      }
      if (colIndex < mutateOpts.length) {
        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
      }

      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);

      System.out.println("Batch updates: " + isBatchUpdate);
      System.out.println("Percent of keys to update: " + updatePercent);
      System.out.println("Updater threads: " + numUpdaterThreads);
      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
    }

    if (isRead) {
      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
      int colIndex = 0;
      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
      if (colIndex < readOpts.length) {
        numReaderThreads = getNumThreads(readOpts[colIndex++]);
      }

      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
            0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_KEY_WINDOW)) {
        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
            0, Integer.MAX_VALUE);
      }

      System.out.println("Percent of keys to verify: " + verifyPercent);
      System.out.println("Reader threads: " + numReaderThreads);
    }

    numTables = 1;
    if (cmd.hasOption(NUM_TABLES)) {
      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
    }
    regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER;
    if (cmd.hasOption(OPT_REGIONS_PER_SERVER)) {
      regionsPerServer = parseInt(cmd.getOptionValue(OPT_REGIONS_PER_SERVER), 1,
        Integer.MAX_VALUE);
      conf.setInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY, regionsPerServer);
    }
    System.out.println("Regions per server: " + regionsPerServer);
  }

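  // Parses column family options; when unspecified, compression defaults to
  // NONE and the Bloom filter type to ROW, while data block encoding is left unset.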
  private void parseColumnFamilyOptions(CommandLine cmd) {
    String dataBlockEncodingStr = cmd.getOptionValue(OPT_DATA_BLOCK_ENCODING);
    dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
        DataBlockEncoding.valueOf(dataBlockEncodingStr);

    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
    compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
        Compression.Algorithm.valueOf(compressStr);

    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
    bloomType = bloomStr == null ? BloomType.ROW :
        BloomType.valueOf(bloomStr);

    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
    if (cmd.hasOption(OPT_ENCRYPTION)) {
      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
    }
  }

  public void initTestTable() throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    if (deferredLogFlush) {
      desc.setDurability(Durability.ASYNC_WAL);
    }
    HColumnDescriptor hcd = new HColumnDescriptor(COLUMN_FAMILY);
    hcd.setDataBlockEncoding(dataBlockEncodingAlgo);
    hcd.setCompressionType(compressAlgo);
    HBaseTestingUtility.createPreSplitLoadTestTable(conf, desc, hcd);
    applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
  }

  @Override
  protected int doWork() throws IOException {
    if (numTables > 1) {
      return parallelLoadTables();
    } else {
      return loadTable();
    }
  }

  protected int loadTable() throws IOException {
    if (cmd.hasOption(OPT_ZK_QUORUM)) {
      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
    }
    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
    }

    if (isInitOnly) {
      LOG.info("Initializing only; no reads or writes");
      initTestTable();
      return 0;
    }

    if (!isSkipInit) {
      initTestTable();
    }
    LoadTestDataGenerator dataGen = null;
    if (cmd.hasOption(OPT_GENERATOR)) {
      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
      String[] args;
      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
        LOG.info("Using LoadTestDataGeneratorWithACL");
        if (User.isHBaseSecurityEnabled(conf)) {
          LOG.info("Security is enabled");
          authnFileName = clazzAndArgs[1];
          superUser = clazzAndArgs[2];
          userNames = clazzAndArgs[3];
          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
          Properties authConfig = new Properties();
          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
          try {
            addAuthInfoToConf(authConfig, conf, superUser, userNames);
          } catch (IOException exp) {
            LOG.error(exp);
            return EXIT_FAILURE;
          }
          userOwner = User.create(loginAndReturnUGI(conf, superUser));
        } else {
          superUser = clazzAndArgs[1];
          userNames = clazzAndArgs[2];
          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
        }
      } else {
        args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
            clazzAndArgs.length);
      }
      dataGen.initialize(args);
    } else {
      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
          minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
    }

    if (userOwner != null) {
      LOG.info("Granting permissions for user " + userOwner.getShortName());
      AccessControlProtos.Permission.Action[] actions = {
        AccessControlProtos.Permission.Action.ADMIN, AccessControlProtos.Permission.Action.CREATE,
        AccessControlProtos.Permission.Action.READ, AccessControlProtos.Permission.Action.WRITE };
      try {
        AccessControlClient.grant(conf, tableName, userOwner.getShortName(), null, null, actions);
      } catch (Throwable e) {
        LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e);
        return EXIT_FAILURE;
      }
    }
    if (userNames != null) {
      // userNames is a comma-separated list of user names.
      String users[] = userNames.split(",");
      User user = null;
      for (String userStr : users) {
        if (User.isHBaseSecurityEnabled(conf)) {
          user = User.create(loginAndReturnUGI(conf, userStr));
        } else {
          user = User.createUserForTesting(conf, userStr, new String[0]);
        }
      }
    }

    if (isWrite) {
      if (userOwner != null) {
        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
      } else {
        writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
      }
      writerThreads.setMultiPut(isMultiPut);
    }

    if (isUpdate) {
      if (userOwner != null) {
        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
            userOwner, userNames);
      } else {
        updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
      }
      updaterThreads.setBatchUpdate(isBatchUpdate);
      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
    }

    if (isRead) {
      if (userOwner != null) {
        readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
            userNames);
      } else {
        readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
      }
      readerThreads.setMaxErrors(maxReadErrors);
      readerThreads.setKeyWindow(keyWindow);
    }

    if (isUpdate && isWrite) {
      LOG.info("Concurrent write/update workload: making updaters aware of the " +
        "write point");
      updaterThreads.linkToWriter(writerThreads);
    }

    if (isRead && (isUpdate || isWrite)) {
      LOG.info("Concurrent write/read workload: making readers aware of the " +
        "write point");
      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
    }

    if (isWrite) {
      System.out.println("Starting to write data...");
      writerThreads.start(startKey, endKey, numWriterThreads);
    }

    if (isUpdate) {
      LOG.info("Starting to mutate data...");
      System.out.println("Starting to mutate data...");
      // TODO: append and increment operations are not yet tested with tags;
      // will update this after that is done.
      updaterThreads.start(startKey, endKey, numUpdaterThreads);
    }

    if (isRead) {
      System.out.println("Starting to read data...");
      readerThreads.start(startKey, endKey, numReaderThreads);
    }

    if (isWrite) {
      writerThreads.waitForFinish();
    }

    if (isUpdate) {
      updaterThreads.waitForFinish();
    }

    if (isRead) {
      readerThreads.waitForFinish();
    }

    boolean success = true;
    if (isWrite) {
      success = success && writerThreads.getNumWriteFailures() == 0;
    }
    if (isUpdate) {
      success = success && updaterThreads.getNumWriteFailures() == 0;
    }
    if (isRead) {
      success = success && readerThreads.getNumReadErrors() == 0
          && readerThreads.getNumReadFailures() == 0;
    }
    return success ? EXIT_SUCCESS : EXIT_FAILURE;
  }

  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
          byte[][].class);
      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
          minColsPerKey, maxColsPerKey, COLUMN_FAMILIES);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

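  /**
   * Generates {@code length} random bytes: each aligned 8-byte block repeats a
   * single random uppercase letter, and any remaining tail bytes repeat one
   * more random letter.
   */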
  public static byte[] generateData(final Random r, int length) {
    byte[] b = new byte[length];
    int i = 0;

    for (i = 0; i < (length - 8); i += 8) {
      b[i] = (byte) (65 + r.nextInt(26));
      b[i + 1] = b[i];
      b[i + 2] = b[i];
      b[i + 3] = b[i];
      b[i + 4] = b[i];
      b[i + 5] = b[i];
      b[i + 6] = b[i];
      b[i + 7] = b[i];
    }

    byte a = (byte) (65 + r.nextInt(26));
    for (; i < length; i++) {
      b[i] = a;
    }
    return b;
  }

  public static void main(String[] args) {
    new LoadTestTool().doStaticMain(args);
  }

  /**
   * When NUM_TABLES is specified, starts multiple worker threads, each of which
   * runs its own LoadTestTool instance to load one table. Each table name is in
   * the format <tn>_<index>. For example, with "-tn test -num_tables 2" the
   * table names will be "test_1" and "test_2".
   *
   * @throws IOException
   */
  private int parallelLoadTables()
      throws IOException {
    // create new command args
    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
    String[] newArgs = null;
    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
      newArgs = new String[cmdLineArgs.length + 2];
      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
      for (int i = 0; i < cmdLineArgs.length; i++) {
        newArgs[i + 2] = cmdLineArgs[i];
      }
    } else {
      newArgs = cmdLineArgs;
    }

    int tableNameValueIndex = -1;
    for (int j = 0; j < newArgs.length; j++) {
      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
        tableNameValueIndex = j + 1;
      } else if (newArgs[j].endsWith(NUM_TABLES)) {
        // change NUM_TABLES to 1 so that each worker loads one table
        newArgs[j + 1] = "1";
      }
    }

    // starting to load multiple tables
    List<WorkerThread> workers = new ArrayList<WorkerThread>();
    for (int i = 0; i < numTables; i++) {
      String[] workerArgs = newArgs.clone();
      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
      WorkerThread worker = new WorkerThread(i, workerArgs);
      workers.add(worker);
      LOG.info(worker + " starting");
      worker.start();
    }

    // wait for all workers to finish
    LOG.info("Waiting for worker threads to finish");
    for (WorkerThread t : workers) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
      checkForErrors();
    }

    return EXIT_SUCCESS;
  }

  // If an exception is thrown by one of the worker threads, it will be
  // stored here.
  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();

  private void workerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }

  /**
   * Check for errors in the worker threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) return;
    if (thrown instanceof IOException) {
      throw (IOException) thrown;
    } else {
      throw new RuntimeException(thrown);
    }
  }

  class WorkerThread extends Thread {
    private String[] workerArgs;

    WorkerThread(int i, String[] args) {
      super("WorkerThread-" + i);
      workerArgs = args;
    }

    @Override
    public void run() {
      try {
        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
        if (ret != 0) {
          throw new RuntimeException("LoadTestTool exited with a non-zero return code.");
        }
      } catch (Exception ex) {
        LOG.error("Error in worker thread", ex);
        workerThreadError(ex);
      }
    }
  }

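  /**
   * Verifies that a keytab file and Kerberos principal are configured for the
   * owner and every listed user, then copies all of the given properties into
   * the configuration.
   */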
  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
      String userList) throws IOException {
    // Arrays.asList returns a fixed-size list, so copy it before adding the owner.
    List<String> users = new ArrayList<String>(Arrays.asList(userList.split(",")));
    users.add(owner);
    for (String user : users) {
      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
      String principalConfKey = "hbase." + user + ".kerberos.principal";
      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
        throw new IOException("Authentication configs missing for user: " + user);
      }
    }
    for (String key : authConfig.stringPropertyNames()) {
      conf.set(key, authConfig.getProperty(key));
    }
    LOG.debug("Added authentication properties to config successfully.");
  }

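  /**
   * Logs in the given user via UserGroupInformation, using the keytab file and
   * principal configured under "hbase.<username>.keytab.file" and
   * "hbase.<username>.kerberos.principal".
   */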
  public static UserGroupInformation loginAndReturnUGI(Configuration conf, String username)
      throws IOException {
    String hostname = InetAddress.getLocalHost().getHostName();
    String keyTabFileConfKey = "hbase." + username + ".keytab.file";
    String keyTabFileLocation = conf.get(keyTabFileConfKey);
    String principalConfKey = "hbase." + username + ".kerberos.principal";
    String principal = SecurityUtil.getServerPrincipal(conf.get(principalConfKey), hostname);
    if (keyTabFileLocation == null || principal == null) {
      LOG.warn("Principal or keytab file is null for: " + principalConfKey + ", "
          + keyTabFileConfKey);
    }
    UserGroupInformation ugi =
        UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
    return ugi;
  }
}