View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Set;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.util.Bytes;
30  import org.apache.hadoop.hbase.util.LoadTestTool;
31  import org.apache.hadoop.util.ToolRunner;
32  import org.junit.Assert;
33  import org.junit.Test;
34  import org.junit.experimental.categories.Category;
35  
36  import com.google.common.collect.Sets;
37  
38  /**
39   * A base class for tests that do something with the cluster while running
40   * {@link LoadTestTool} to write and verify some data.
41   */
42  @Category(IntegrationTests.class)
43  public class IntegrationTestIngest extends IntegrationTestBase {
44    public static final char HIPHEN = '-';
45    private static final int SERVER_COUNT = 4; // number of slaves for the smallest cluster
46    private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
47    private static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
48  
49    /** A soft limit on how long we should run */
50    private static final String RUN_TIME_KEY = "hbase.%s.runtime";
51  
52    protected static final Log LOG = LogFactory.getLog(IntegrationTestIngest.class);
53    protected IntegrationTestingUtility util;
54    protected HBaseCluster cluster;
55    protected LoadTestTool loadTool;
56  
57    @Override
58    public void setUpCluster() throws Exception {
59      util = getTestingUtil(null);
60      LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
61      util.initializeCluster(SERVER_COUNT);
62      LOG.debug("Done initializing/checking cluster");
63      cluster = util.getHBaseClusterInterface();
64      deleteTableIfNecessary();
65      loadTool = new LoadTestTool();
66      loadTool.setConf(util.getConfiguration());
67      // Initialize load test tool before we start breaking things;
68      // LoadTestTool init, even when it is a no-op, is very fragile.
69      initTable();
70    }
71  
72    protected void initTable() throws IOException {
73      int ret = loadTool.run(new String[] { "-tn", getTablename(), "-init_only" });
74      Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
75    }
76  
77    @Override
78    public int runTestFromCommandLine() throws Exception {
79      internalRunIngestTest(DEFAULT_RUN_TIME);
80      return 0;
81    }
82  
83    @Test
84    public void testIngest() throws Exception {
85      runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10);
86    }
87  
88    private void internalRunIngestTest(long runTime) throws Exception {
89      runIngestTest(runTime, 2500, 10, 1024, 10);
90    }
91  
92    @Override
93    public String getTablename() {
94      return this.getClass().getSimpleName();
95    }
96  
97    @Override
98    protected Set<String> getColumnFamilies() {
99      return Sets.newHashSet(Bytes.toString(LoadTestTool.COLUMN_FAMILY));
100   }
101 
102   private void deleteTableIfNecessary() throws IOException {
103     if (util.getHBaseAdmin().tableExists(getTablename())) {
104       util.deleteTable(Bytes.toBytes(getTablename()));
105     }
106   }
107   protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter, int colsPerKey,
108       int recordSize, int writeThreads) throws Exception {
109     LOG.info("Running ingest");
110     LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
111 
112     long start = System.currentTimeMillis();
113     String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
114     long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
115     long startKey = 0;
116 
117     long numKeys = getNumKeys(keysPerServerPerIter);
118     while (System.currentTimeMillis() - start < 0.9 * runtime) {
119       LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
120           ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
121 
122       int ret = -1;
123       ret = loadTool.run(getArgsForLoadTestTool("-write",
124           String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
125       if (0 != ret) {
126         String errorMsg = "Load failed with error code " + ret;
127         LOG.error(errorMsg);
128         Assert.fail(errorMsg);
129       }
130 
131       ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
132           startKey, numKeys));
133       if (0 != ret) {
134         String errorMsg = "Update failed with error code " + ret;
135         LOG.error(errorMsg);
136         Assert.fail(errorMsg);
137       }
138 
139       ret = loadTool.run(getArgsForLoadTestTool("-read", "100:20", startKey, numKeys));
140       if (0 != ret) {
141         String errorMsg = "Verification failed with error code " + ret;
142         LOG.error(errorMsg);
143         Assert.fail(errorMsg);
144       }
145       startKey += numKeys;
146     }
147   }
148 
149   protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
150       long numKeys) {
151     List<String> args = new ArrayList<String>();
152     args.add("-tn");
153     args.add(getTablename());
154     args.add(mode);
155     args.add(modeSpecificArg);
156     args.add("-start_key");
157     args.add(String.valueOf(startKey));
158     args.add("-num_keys");
159     args.add(String.valueOf(numKeys));
160     args.add("-skip_init");
161     return args.toArray(new String[args.size()]);
162   }
163 
164   /** Estimates a data size based on the cluster size */
165   private long getNumKeys(int keysPerServer)
166       throws IOException {
167     int numRegionServers = cluster.getClusterStatus().getServersSize();
168     return keysPerServer * numRegionServers;
169   }
170 
171   public static void main(String[] args) throws Exception {
172     Configuration conf = HBaseConfiguration.create();
173     IntegrationTestingUtility.setUseDistributedCluster(conf);
174     int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
175     System.exit(ret);
176   }
177 }