1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.test;
20
21 import java.io.IOException;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.apache.commons.lang.math.RandomUtils;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.HBaseConfiguration;
36 import org.apache.hadoop.hbase.HRegionLocation;
37 import org.apache.hadoop.hbase.IntegrationTestIngest;
38 import org.apache.hadoop.hbase.IntegrationTestingUtility;
39 import org.apache.hadoop.hbase.RegionLocations;
40 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
43 import org.apache.hadoop.hbase.client.Admin;
44 import org.apache.hadoop.hbase.client.ClusterConnection;
45 import org.apache.hadoop.hbase.client.Consistency;
46 import org.apache.hadoop.hbase.client.Get;
47 import org.apache.hadoop.hbase.client.Result;
48 import org.apache.hadoop.hbase.client.Table;
49 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
50 import org.apache.hadoop.hbase.util.LoadTestTool;
51 import org.apache.hadoop.hbase.util.MultiThreadedReader;
52 import org.apache.hadoop.hbase.util.Threads;
53 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
54 import org.apache.hadoop.util.StringUtils;
55 import org.apache.hadoop.util.ToolRunner;
56 import org.junit.Assert;
57 import org.junit.experimental.categories.Category;
58
59 import com.google.common.collect.Lists;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 @Category(IntegrationTests.class)
98 public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {
99
100 private static final Log LOG = LogFactory.getLog(
101 IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);
102
103 private static final String TEST_NAME
104 = IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();
105
106 protected static final long DEFAULT_GET_TIMEOUT = 5000;
107 protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";
108
109 protected static final long DEFAUL_CHAOS_MONKEY_DELAY = 20 * 1000;
110 protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";
111
112 protected static final int DEFAULT_REGION_REPLICATION = 3;
113
114 @Override
115 protected void startMonkey() throws Exception {
116
117 }
118
119 @Override
120 protected MonkeyFactory getDefaultMonkeyFactory() {
121 return MonkeyFactory.getFactory(MonkeyFactory.CALM);
122 }
123
124 @Override
125 public void setConf(Configuration conf) {
126 super.setConf(conf);
127
128 String clazz = this.getClass().getSimpleName();
129 conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
130 Integer.toString(DEFAULT_REGION_REPLICATION));
131 }
132
133 protected void writeData(int colsPerKey, int recordSize, int writeThreads,
134 long startKey, long numKeys) throws IOException {
135 int ret = loadTool.run(getArgsForLoadTestTool("-write",
136 String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
137 if (0 != ret) {
138 String errorMsg = "Load failed with error code " + ret;
139 LOG.error(errorMsg);
140 Assert.fail(errorMsg);
141 }
142 }
143
144 @Override
145 protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
146 int recordSize, int writeThreads, int readThreads) throws Exception {
147 LOG.info("Cluster size:"+
148 util.getHBaseClusterInterface().getClusterStatus().getServersSize());
149
150 long start = System.currentTimeMillis();
151 String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
152 long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
153 long startKey = 0;
154
155 long numKeys = getNumKeys(keysPerServerPerIter);
156
157
158
159 LOG.info("Writing some data to the table");
160 writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);
161
162
163 LOG.info("Flushing the table");
164 Admin admin = util.getHBaseAdmin();
165 admin.flush(getTablename());
166
167
168 long refreshTime = conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
169 if (refreshTime > 0 && refreshTime <= 10000) {
170 LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated");
171 Threads.sleep(refreshTime*3);
172 } else {
173 LOG.info("Reopening the table");
174 admin.disableTable(getTablename());
175 admin.enableTable(getTablename());
176 }
177
178
179
180
181
182
183 long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY)
184 , DEFAUL_CHAOS_MONKEY_DELAY);
185 ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
186 LOG.info(String.format("ChaosMonkey delay is : %d seconds. Will start %s " +
187 "ChaosMonkey after delay", chaosMonkeyDelay / 1000, monkeyToUse));
188 ScheduledFuture<?> result = executorService.schedule(new Runnable() {
189 @Override
190 public void run() {
191 try {
192 LOG.info("Starting ChaosMonkey");
193 monkey.start();
194 monkey.waitForStop();
195 } catch (Exception e) {
196 LOG.warn(StringUtils.stringifyException(e));
197 }
198
199 }
200 }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);
201
202
203
204 long remainingTime = runtime - (System.currentTimeMillis() - start);
205 LOG.info("Reading random keys from the table for " + remainingTime/60000 + " min");
206 this.conf.setLong(
207 String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName())
208 , remainingTime);
209
210
211 try {
212 int ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
213 , startKey, numKeys));
214 if (0 != ret) {
215 String errorMsg = "Verification failed with error code " + ret;
216 LOG.error(errorMsg);
217 Assert.fail(errorMsg);
218 }
219 } finally {
220 if (result != null) result.cancel(false);
221 monkey.stop("Stopping the test");
222 monkey.waitForStop();
223 executorService.shutdown();
224 }
225 }
226
227 @Override
228 protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
229 long numKeys) {
230 List<String> args = Lists.newArrayList(super.getArgsForLoadTestTool(
231 mode, modeSpecificArg, startKey, numKeys));
232 args.add("-reader");
233 args.add(TimeBoundedMultiThreadedReader.class.getName());
234 return args.toArray(new String[args.size()]);
235 }
236
237 public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
238 protected long timeoutNano;
239 protected AtomicLong timedOutReads = new AtomicLong();
240 protected long runTime;
241 protected Thread timeoutThread;
242 protected AtomicLong staleReads = new AtomicLong();
243
244 public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
245 TableName tableName, double verifyPercent) throws IOException {
246 super(dataGen, conf, tableName, verifyPercent);
247 long timeoutMs = conf.getLong(
248 String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
249 timeoutNano = timeoutMs * 1000000;
250 LOG.info("Timeout for gets: " + timeoutMs);
251 String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
252 this.runTime = conf.getLong(runTimeKey, -1);
253 if (this.runTime <= 0) {
254 throw new IllegalArgumentException("Please configure " + runTimeKey);
255 }
256 }
257
258 @Override
259 public void waitForFinish() {
260 try {
261 this.timeoutThread.join();
262 } catch (InterruptedException e) {
263 e.printStackTrace();
264 }
265 this.aborted = true;
266 super.waitForFinish();
267 }
268
269 @Override
270 protected String progressInfo() {
271 StringBuilder builder = new StringBuilder(super.progressInfo());
272 appendToStatus(builder, "stale_reads", staleReads.get());
273 appendToStatus(builder, "get_timeouts", timedOutReads.get());
274 return builder.toString();
275 }
276
277 @Override
278 public void start(long startKey, long endKey, int numThreads) throws IOException {
279 super.start(startKey, endKey, numThreads);
280 this.timeoutThread = new TimeoutThread(this.runTime);
281 this.timeoutThread.start();
282 }
283
284 @Override
285 protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
286 return new TimeBoundedMultiThreadedReaderThread(readerId);
287 }
288
289 private class TimeoutThread extends Thread {
290 long timeout;
291 long reportInterval = 60000;
292 public TimeoutThread(long timeout) {
293 this.timeout = timeout;
294 }
295
296 @Override
297 public void run() {
298 while (true) {
299 long rem = Math.min(timeout, reportInterval);
300 if (rem <= 0) {
301 break;
302 }
303 LOG.info("Remaining execution time:" + timeout / 60000 + " min");
304 Threads.sleep(rem);
305 timeout -= rem;
306 }
307 }
308 }
309
310 public class TimeBoundedMultiThreadedReaderThread
311 extends MultiThreadedReader.HBaseReaderThread {
312
313 public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
314 super(readerId);
315 }
316
317 @Override
318 protected Get createGet(long keyToRead) throws IOException {
319 Get get = super.createGet(keyToRead);
320 get.setConsistency(Consistency.TIMELINE);
321 return get;
322 }
323
324 @Override
325 protected long getNextKeyToRead() {
326
327 long key = startKey + Math.abs(RandomUtils.nextLong())
328 % (endKey - startKey);
329 return key;
330 }
331
332 @Override
333 protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
334 Result[] results, Table table, boolean isNullExpected)
335 throws IOException {
336 super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected);
337 for (Result r : results) {
338 if (r.isStale()) staleReads.incrementAndGet();
339 }
340
341
342 if (elapsedNano > timeoutNano) {
343 timedOutReads.incrementAndGet();
344 numReadFailures.addAndGet(1);
345 for (Result r : results) {
346 LOG.error("FAILED FOR " + r);
347 RegionLocations rl = ((ClusterConnection)connection).
348 locateRegion(tableName, r.getRow(), true, true);
349 HRegionLocation locations[] = rl.getRegionLocations();
350 for (HRegionLocation h : locations) {
351 LOG.error("LOCATION " + h);
352 }
353 }
354 }
355 }
356 }
357 }
358
359 public static void main(String[] args) throws Exception {
360 Configuration conf = HBaseConfiguration.create();
361 IntegrationTestingUtility.setUseDistributedCluster(conf);
362 int ret = ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(), args);
363 System.exit(ret);
364 }
365 }