1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Set;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.util.Bytes;
30 import org.apache.hadoop.hbase.util.LoadTestTool;
31 import org.apache.hadoop.util.ToolRunner;
32 import org.junit.Assert;
33 import org.junit.Test;
34 import org.junit.experimental.categories.Category;
35
36 import com.google.common.collect.Sets;
37
38
39
40
41
42 @Category(IntegrationTests.class)
43 public class IntegrationTestIngest extends IntegrationTestBase {
44 public static final char HIPHEN = '-';
45 private static final int SERVER_COUNT = 4;
46 private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
47 private static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
48
49
50 private static final String RUN_TIME_KEY = "hbase.%s.runtime";
51
52 protected static final Log LOG = LogFactory.getLog(IntegrationTestIngest.class);
53 protected IntegrationTestingUtility util;
54 protected HBaseCluster cluster;
55 protected LoadTestTool loadTool;
56
57 @Override
58 public void setUpCluster() throws Exception {
59 util = getTestingUtil(null);
60 LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
61 util.initializeCluster(SERVER_COUNT);
62 LOG.debug("Done initializing/checking cluster");
63 cluster = util.getHBaseClusterInterface();
64 deleteTableIfNecessary();
65 loadTool = new LoadTestTool();
66 loadTool.setConf(util.getConfiguration());
67
68
69 initTable();
70 }
71
72 protected void initTable() throws IOException {
73 int ret = loadTool.run(new String[] { "-tn", getTablename(), "-init_only" });
74 Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
75 }
76
77 @Override
78 public int runTestFromCommandLine() throws Exception {
79 internalRunIngestTest(DEFAULT_RUN_TIME);
80 return 0;
81 }
82
83 @Test
84 public void testIngest() throws Exception {
85 runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10);
86 }
87
88 private void internalRunIngestTest(long runTime) throws Exception {
89 runIngestTest(runTime, 2500, 10, 1024, 10);
90 }
91
92 @Override
93 public String getTablename() {
94 return this.getClass().getSimpleName();
95 }
96
97 @Override
98 protected Set<String> getColumnFamilies() {
99 return Sets.newHashSet(Bytes.toString(LoadTestTool.COLUMN_FAMILY));
100 }
101
102 private void deleteTableIfNecessary() throws IOException {
103 if (util.getHBaseAdmin().tableExists(getTablename())) {
104 util.deleteTable(Bytes.toBytes(getTablename()));
105 }
106 }
107 protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter, int colsPerKey,
108 int recordSize, int writeThreads) throws Exception {
109 LOG.info("Running ingest");
110 LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
111
112 long start = System.currentTimeMillis();
113 String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
114 long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
115 long startKey = 0;
116
117 long numKeys = getNumKeys(keysPerServerPerIter);
118 while (System.currentTimeMillis() - start < 0.9 * runtime) {
119 LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
120 ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
121
122 int ret = -1;
123 ret = loadTool.run(getArgsForLoadTestTool("-write",
124 String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
125 if (0 != ret) {
126 String errorMsg = "Load failed with error code " + ret;
127 LOG.error(errorMsg);
128 Assert.fail(errorMsg);
129 }
130
131 ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
132 startKey, numKeys));
133 if (0 != ret) {
134 String errorMsg = "Update failed with error code " + ret;
135 LOG.error(errorMsg);
136 Assert.fail(errorMsg);
137 }
138
139 ret = loadTool.run(getArgsForLoadTestTool("-read", "100:20", startKey, numKeys));
140 if (0 != ret) {
141 String errorMsg = "Verification failed with error code " + ret;
142 LOG.error(errorMsg);
143 Assert.fail(errorMsg);
144 }
145 startKey += numKeys;
146 }
147 }
148
149 protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
150 long numKeys) {
151 List<String> args = new ArrayList<String>();
152 args.add("-tn");
153 args.add(getTablename());
154 args.add(mode);
155 args.add(modeSpecificArg);
156 args.add("-start_key");
157 args.add(String.valueOf(startKey));
158 args.add("-num_keys");
159 args.add(String.valueOf(numKeys));
160 args.add("-skip_init");
161 return args.toArray(new String[args.size()]);
162 }
163
164
165 private long getNumKeys(int keysPerServer)
166 throws IOException {
167 int numRegionServers = cluster.getClusterStatus().getServersSize();
168 return keysPerServer * numRegionServers;
169 }
170
171 public static void main(String[] args) throws Exception {
172 Configuration conf = HBaseConfiguration.create();
173 IntegrationTestingUtility.setUseDistributedCluster(conf);
174 int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
175 System.exit(ret);
176 }
177 }