View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Set;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.apache.hadoop.hbase.util.LoadTestTool;
32  import org.apache.hadoop.hbase.util.Threads;
33  import org.apache.hadoop.util.StringUtils;
34  import org.apache.hadoop.util.ToolRunner;
35  import org.junit.Assert;
36  import org.junit.Test;
37  import org.junit.experimental.categories.Category;
38  
39  import com.google.common.collect.Sets;
40  
41  /**
42   * A base class for tests that do something with the cluster while running
43   * {@link LoadTestTool} to write and verify some data.
44   */
45  @Category(IntegrationTests.class)
46  public class IntegrationTestIngest extends IntegrationTestBase {
47    public static final char HIPHEN = '-';
48    private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
49    protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
50    protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
51  
52    /** A soft limit on how long we should run */
53    protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
54  
55    protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
56    protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;
57  
58    protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
59    protected static final int DEFAULT_NUM_WRITE_THREADS = 20;
60  
61    protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
62    protected static final int DEFAULT_NUM_READ_THREADS = 20;
63  
64    // Log is being used in IntegrationTestIngestWithEncryption, hence it is protected
65    protected static final Log LOG = LogFactory.getLog(IntegrationTestIngest.class);
66    protected IntegrationTestingUtility util;
67    protected HBaseCluster cluster;
68    protected LoadTestTool loadTool;
69  
70    protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
71        LoadTestTool.OPT_COLUMN_FAMILIES,
72        LoadTestTool.OPT_COMPRESSION,
73        LoadTestTool.OPT_DATA_BLOCK_ENCODING,
74        LoadTestTool.OPT_INMEMORY,
75        LoadTestTool.OPT_ENCRYPTION,
76        LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
77        LoadTestTool.OPT_REGION_REPLICATION,
78    };
79  
80    @Override
81    public void setUpCluster() throws Exception {
82      util = getTestingUtil(getConf());
83      LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
84      util.initializeCluster(getMinServerCount());
85      LOG.debug("Done initializing/checking cluster");
86      cluster = util.getHBaseClusterInterface();
87      deleteTableIfNecessary();
88      loadTool = new LoadTestTool();
89      loadTool.setConf(util.getConfiguration());
90      // Initialize load test tool before we start breaking things;
91      // LoadTestTool init, even when it is a no-op, is very fragile.
92      initTable();
93    }
94  
95    protected int getMinServerCount() {
96      return SERVER_COUNT;
97    }
98  
99    protected void initTable() throws IOException {
100     int ret = loadTool.run(getArgsForLoadTestToolInitTable());
101     Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
102   }
103 
104   @Override
105   public int runTestFromCommandLine() throws Exception {
106     internalRunIngestTest(DEFAULT_RUN_TIME);
107     return 0;
108   }
109 
110   @Test
111   public void testIngest() throws Exception {
112     runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
113   }
114 
115   protected void internalRunIngestTest(long runTime) throws Exception {
116     String clazz = this.getClass().getSimpleName();
117     long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
118       DEFAULT_NUM_KEYS_PER_SERVER);
119     int numWriteThreads = conf.getInt(
120       String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
121     int numReadThreads = conf.getInt(
122       String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
123     runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
124   }
125 
126   @Override
127   public TableName getTablename() {
128     String clazz = this.getClass().getSimpleName();
129     return TableName.valueOf(
130       conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz));
131   }
132 
133   @Override
134   protected Set<String> getColumnFamilies() {
135     Set<String> families = Sets.newHashSet();
136     String clazz = this.getClass().getSimpleName();
137     // parse conf for getting the column famly names because LTT is not initialized yet.
138     String familiesString = getConf().get(
139       String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
140     if (familiesString == null) {
141       for (byte[] family : LoadTestTool.DEFAULT_COLUMN_FAMILIES) {
142         families.add(Bytes.toString(family));
143       }
144     } else {
145        for (String family : familiesString.split(",")) {
146          families.add(family);
147        }
148     }
149 
150     return families;
151   }
152 
153   private void deleteTableIfNecessary() throws IOException {
154     if (util.getHBaseAdmin().tableExists(getTablename())) {
155       util.deleteTable(getTablename());
156     }
157   }
158 
159   protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
160       int recordSize, int writeThreads, int readThreads) throws Exception {
161 
162     LOG.info("Running ingest");
163     LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
164 
165     long start = System.currentTimeMillis();
166     String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
167     long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
168     long startKey = 0;
169 
170     long numKeys = getNumKeys(keysPerServerPerIter);
171     while (System.currentTimeMillis() - start < 0.9 * runtime) {
172       LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
173           ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
174 
175       int ret = -1;
176       ret = loadTool.run(getArgsForLoadTestTool("-write",
177           String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
178       if (0 != ret) {
179         String errorMsg = "Load failed with error code " + ret;
180         LOG.error(errorMsg);
181         Assert.fail(errorMsg);
182       }
183 
184       ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
185           startKey, numKeys));
186       if (0 != ret) {
187         String errorMsg = "Update failed with error code " + ret;
188         LOG.error(errorMsg);
189         Assert.fail(errorMsg);
190       }
191 
192       ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
193         , startKey, numKeys));
194       if (0 != ret) {
195         String errorMsg = "Verification failed with error code " + ret;
196         LOG.error(errorMsg + " Rerunning verification after 1 minute for debugging");
197         Threads.sleep(1000 * 60);
198         ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
199             , startKey, numKeys));
200         if (0 != ret) {
201           LOG.error("Rerun of Verification failed with error code " + ret);
202         }
203         Assert.fail(errorMsg);
204       }
205       startKey += numKeys;
206     }
207   }
208 
209   protected String[] getArgsForLoadTestToolInitTable() {
210     List<String> args = new ArrayList<String>();
211     args.add("-tn");
212     args.add(getTablename().getNameAsString());
213     // pass all remaining args from conf with keys <test class name>.<load test tool arg>
214     String clazz = this.getClass().getSimpleName();
215     for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
216       String val = conf.get(String.format("%s.%s", clazz, arg));
217       if (val != null) {
218         args.add("-" + arg);
219         args.add(val);
220       }
221     }
222     args.add("-init_only");
223     return args.toArray(new String[args.size()]);
224   }
225 
226   protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
227       long numKeys) {
228     List<String> args = new ArrayList<String>();
229     args.add("-tn");
230     args.add(getTablename().getNameAsString());
231     args.add("-families");
232     args.add(getColumnFamiliesAsString());
233     args.add(mode);
234     args.add(modeSpecificArg);
235     args.add("-start_key");
236     args.add(String.valueOf(startKey));
237     args.add("-num_keys");
238     args.add(String.valueOf(numKeys));
239     args.add("-skip_init");
240 
241     return args.toArray(new String[args.size()]);
242   }
243 
244   private String getColumnFamiliesAsString() {
245     return StringUtils.join(",", getColumnFamilies());
246   }
247 
248   /** Estimates a data size based on the cluster size */
249   protected long getNumKeys(long keysPerServer)
250       throws IOException {
251     int numRegionServers = cluster.getClusterStatus().getServersSize();
252     return keysPerServer * numRegionServers;
253   }
254 
255   public static void main(String[] args) throws Exception {
256     Configuration conf = HBaseConfiguration.create();
257     IntegrationTestingUtility.setUseDistributedCluster(conf);
258     int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
259     System.exit(ret);
260   }
261 }