1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mttr;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30
31 import org.apache.commons.lang.RandomStringUtils;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
35 import org.apache.hadoop.hbase.ClusterStatus;
36 import org.apache.hadoop.hbase.HColumnDescriptor;
37 import org.apache.hadoop.hbase.HTableDescriptor;
38 import org.apache.hadoop.hbase.IntegrationTestingUtility;
39 import org.apache.hadoop.hbase.IntegrationTests;
40 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
41 import org.apache.hadoop.hbase.NamespaceExistException;
42 import org.apache.hadoop.hbase.NamespaceNotFoundException;
43 import org.apache.hadoop.hbase.TableExistsException;
44 import org.apache.hadoop.hbase.TableName;
45 import org.apache.hadoop.hbase.TableNotFoundException;
46 import org.apache.hadoop.hbase.chaos.actions.Action;
47 import org.apache.hadoop.hbase.chaos.actions.MoveRegionsOfTableAction;
48 import org.apache.hadoop.hbase.chaos.actions.RestartActiveMasterAction;
49 import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingMetaAction;
50 import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingTableAction;
51 import org.apache.hadoop.hbase.client.HBaseAdmin;
52 import org.apache.hadoop.hbase.client.HTable;
53 import org.apache.hadoop.hbase.client.Put;
54 import org.apache.hadoop.hbase.client.Result;
55 import org.apache.hadoop.hbase.client.ResultScanner;
56 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
57 import org.apache.hadoop.hbase.client.Scan;
58 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
59 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
60 import org.apache.hadoop.hbase.ipc.FatalConnectionException;
61 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
62 import org.apache.hadoop.hbase.security.AccessDeniedException;
63 import org.apache.hadoop.hbase.util.Bytes;
64 import org.apache.hadoop.hbase.util.LoadTestTool;
65 import org.cloudera.htrace.Span;
66 import org.cloudera.htrace.Trace;
67 import org.cloudera.htrace.TraceScope;
68 import org.cloudera.htrace.impl.AlwaysSampler;
69 import org.junit.AfterClass;
70 import org.junit.BeforeClass;
71 import org.junit.Test;
72 import org.junit.experimental.categories.Category;
73
74 import com.google.common.base.Objects;
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 @Category(IntegrationTests.class)
116 public class IntegrationTestMTTR {
117
118
119
120 private static final byte[] FAMILY = Bytes.toBytes("d");
121 private static final Log LOG = LogFactory.getLog(IntegrationTestMTTR.class);
122 private static long sleepTime;
123 private static final String SLEEP_TIME_KEY = "hbase.IntegrationTestMTTR.sleeptime";
124 private static final long SLEEP_TIME_DEFAULT = 60 * 1000l;
125
126
127
128
129 private static TableName tableName;
130 private static TableName loadTableName;
131
132
133
134
135 private static IntegrationTestingUtility util;
136
137
138
139
140 private static ExecutorService executorService;
141
142
143
144
145 private static Action restartRSAction;
146 private static Action restartMetaAction;
147 private static Action moveRegionAction;
148 private static Action restartMasterAction;
149
150
151
152
153 private static LoadTestTool loadTool;
154
155
156 @BeforeClass
157 public static void setUp() throws Exception {
158
159 if (util == null) {
160 util = new IntegrationTestingUtility();
161 }
162
163
164 util.initializeCluster(3);
165
166
167 loadTool = new LoadTestTool();
168 loadTool.setConf(util.getConfiguration());
169
170
171
172 executorService = Executors.newFixedThreadPool(8);
173
174
175 setupTables();
176
177
178 sleepTime = util.getConfiguration().getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
179 setupActions();
180 }
181
182 private static void setupActions() throws IOException {
183
184
185 restartRSAction = new RestartRsHoldingTableAction(sleepTime, tableName.getNameAsString());
186
187
188 restartMetaAction = new RestartRsHoldingMetaAction(sleepTime);
189
190
191 moveRegionAction = new MoveRegionsOfTableAction(sleepTime, tableName.getNameAsString());
192
193
194 restartMasterAction = new RestartActiveMasterAction(1000);
195
196
197 Action.ActionContext actionContext = new Action.ActionContext(util);
198 restartRSAction.init(actionContext);
199 restartMetaAction.init(actionContext);
200 moveRegionAction.init(actionContext);
201 restartMasterAction.init(actionContext);
202 }
203
204 private static void setupTables() throws IOException {
205
206 tableName = TableName.valueOf(util.getConfiguration()
207 .get("hbase.IntegrationTestMTTR.tableName", "IntegrationTestMTTR"));
208
209 loadTableName = TableName.valueOf(util.getConfiguration()
210 .get("hbase.IntegrationTestMTTR.loadTableName", "IntegrationTestMTTRLoadTestTool"));
211
212 if (util.getHBaseAdmin().tableExists(tableName)) {
213 util.deleteTable(tableName);
214 }
215
216 if (util.getHBaseAdmin().tableExists(loadTableName)) {
217 util.deleteTable(loadTableName);
218 }
219
220
221 HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
222
223
224 tableDescriptor.setMaxFileSize(Long.MAX_VALUE);
225
226 HColumnDescriptor descriptor = new HColumnDescriptor(FAMILY);
227 descriptor.setMaxVersions(1);
228 tableDescriptor.addFamily(descriptor);
229 util.getHBaseAdmin().createTable(tableDescriptor);
230
231
232 int ret = loadTool.run(new String[]{"-tn", loadTableName.getNameAsString(), "-init_only"});
233 assertEquals("Failed to initialize LoadTestTool", 0, ret);
234 }
235
236 @AfterClass
237 public static void after() throws IOException {
238
239 util.restoreCluster();
240 util = null;
241
242
243 executorService.shutdown();
244 executorService = null;
245
246
247 moveRegionAction = null;
248 restartMetaAction = null;
249 restartRSAction = null;
250 restartMasterAction = null;
251
252 loadTool = null;
253 }
254
255 @Test
256 public void testRestartRsHoldingTable() throws Exception {
257 run(new ActionCallable(restartRSAction), "RestartRsHoldingTableAction");
258 }
259
260 @Test
261 public void testKillRsHoldingMeta() throws Exception {
262 run(new ActionCallable(restartMetaAction), "KillRsHoldingMeta");
263 }
264
265 @Test
266 public void testMoveRegion() throws Exception {
267 run(new ActionCallable(moveRegionAction), "MoveRegion");
268 }
269
270 @Test
271 public void testRestartMaster() throws Exception {
272 run(new ActionCallable(restartMasterAction), "RestartMaster");
273 }
274
275 public void run(Callable<Boolean> monkeyCallable, String testName) throws Exception {
276 int maxIters = util.getHBaseClusterInterface().isDistributedCluster() ? 10 : 3;
277
278
279 ArrayList<TimingResult> resultPuts = new ArrayList<TimingResult>(maxIters);
280 ArrayList<TimingResult> resultScan = new ArrayList<TimingResult>(maxIters);
281 ArrayList<TimingResult> resultAdmin = new ArrayList<TimingResult>(maxIters);
282 long start = System.nanoTime();
283
284
285 for (int fullIterations = 0; fullIterations < maxIters; fullIterations++) {
286
287 Future<Boolean> monkeyFuture = executorService.submit(monkeyCallable);
288
289
290 Future<TimingResult> putFuture = executorService.submit(new PutCallable(monkeyFuture));
291 Future<TimingResult> scanFuture = executorService.submit(new ScanCallable(monkeyFuture));
292 Future<TimingResult> adminFuture = executorService.submit(new AdminCallable(monkeyFuture));
293
294 Future<Boolean> loadFuture = executorService.submit(new LoadCallable(monkeyFuture));
295
296 monkeyFuture.get();
297 loadFuture.get();
298
299
300 TimingResult putTime = putFuture.get();
301 TimingResult scanTime = scanFuture.get();
302 TimingResult adminTime = adminFuture.get();
303
304
305 resultPuts.add(putTime);
306 resultScan.add(scanTime);
307 resultAdmin.add(adminTime);
308
309
310 Thread.sleep(5000l);
311 }
312
313 long runtimeMs = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
314
315 Objects.ToStringHelper helper = Objects.toStringHelper("MTTRResults")
316 .add("putResults", resultPuts)
317 .add("scanResults", resultScan)
318 .add("adminResults", resultAdmin)
319 .add("totalRuntimeMs", runtimeMs)
320 .add("name", testName);
321
322
323 LOG.info(helper.toString());
324 }
325
326
327
328
329
330
331 private static class TimingResult {
332 DescriptiveStatistics stats = new DescriptiveStatistics();
333 ArrayList<Long> traces = new ArrayList<Long>(10);
334
335
336
337
338
339
340 public void addResult(long time, Span span) {
341 stats.addValue(TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS));
342 if (TimeUnit.SECONDS.convert(time, TimeUnit.NANOSECONDS) >= 1) {
343 traces.add(span.getTraceId());
344 }
345 }
346
347 public String toString() {
348 Objects.ToStringHelper helper = Objects.toStringHelper(this)
349 .add("numResults", stats.getN())
350 .add("minTime", stats.getMin())
351 .add("meanTime", stats.getMean())
352 .add("maxTime", stats.getMax())
353 .add("25th", stats.getPercentile(25))
354 .add("50th", stats.getPercentile(50))
355 .add("75th", stats.getPercentile(75))
356 .add("90th", stats.getPercentile(90))
357 .add("95th", stats.getPercentile(95))
358 .add("99th", stats.getPercentile(99))
359 .add("99.9th", stats.getPercentile(99.9))
360 .add("99.99th", stats.getPercentile(99.99))
361 .add("traces", traces);
362 return helper.toString();
363 }
364 }
365
366
367
368
369 static abstract class TimingCallable implements Callable<TimingResult> {
370 protected final Future<?> future;
371
372 public TimingCallable(Future<?> f) {
373 future = f;
374 }
375
376 @Override
377 public TimingResult call() throws Exception {
378 TimingResult result = new TimingResult();
379 final int maxIterations = 10;
380 int numAfterDone = 0;
381 int resetCount = 0;
382
383 while (numAfterDone < maxIterations) {
384 long start = System.nanoTime();
385 TraceScope scope = null;
386 try {
387 scope = Trace.startSpan(getSpanName(), AlwaysSampler.INSTANCE);
388 boolean actionResult = doAction();
389 if (actionResult && future.isDone()) {
390 numAfterDone++;
391 }
392
393
394
395
396
397
398 } catch (AccessDeniedException e) {
399 throw e;
400 } catch (CoprocessorException e) {
401 throw e;
402 } catch (FatalConnectionException e) {
403 throw e;
404 } catch (InvalidFamilyOperationException e) {
405 throw e;
406 } catch (NamespaceExistException e) {
407 throw e;
408 } catch (NamespaceNotFoundException e) {
409 throw e;
410 } catch (NoSuchColumnFamilyException e) {
411 throw e;
412 } catch (TableExistsException e) {
413 throw e;
414 } catch (TableNotFoundException e) {
415 throw e;
416 } catch (RetriesExhaustedException e){
417 throw e;
418
419
420
421
422
423 } catch (Exception e) {
424 resetCount++;
425 if (resetCount < maxIterations) {
426 LOG.info("Non-fatal exception while running " + this.toString()
427 + ". Resetting loop counter", e);
428 numAfterDone = 0;
429 } else {
430 LOG.info("Too many unexpected Exceptions. Aborting.", e);
431 throw e;
432 }
433 } finally {
434 if (scope != null) {
435 scope.close();
436 }
437 }
438 result.addResult(System.nanoTime() - start, scope.getSpan());
439 }
440 return result;
441 }
442
443 protected abstract boolean doAction() throws Exception;
444
445 protected String getSpanName() {
446 return this.getClass().getSimpleName();
447 }
448
449 @Override
450 public String toString() {
451 return this.getSpanName();
452 }
453 }
454
455
456
457
458
459 static class PutCallable extends TimingCallable {
460
461 private final HTable table;
462
463 public PutCallable(Future<?> f) throws IOException {
464 super(f);
465 this.table = new HTable(util.getConfiguration(), tableName);
466 }
467
468 @Override
469 protected boolean doAction() throws Exception {
470 Put p = new Put(Bytes.toBytes(RandomStringUtils.randomAlphanumeric(5)));
471 p.add(FAMILY, Bytes.toBytes("\0"), Bytes.toBytes(RandomStringUtils.randomAscii(5)));
472 table.put(p);
473 table.flushCommits();
474 return true;
475 }
476
477 @Override
478 protected String getSpanName() {
479 return "MTTR Put Test";
480 }
481 }
482
483
484
485
486
487 static class ScanCallable extends TimingCallable {
488 private final HTable table;
489
490 public ScanCallable(Future<?> f) throws IOException {
491 super(f);
492 this.table = new HTable(util.getConfiguration(), tableName);
493 }
494
495 @Override
496 protected boolean doAction() throws Exception {
497 ResultScanner rs = null;
498 try {
499 Scan s = new Scan();
500 s.setBatch(2);
501 s.addFamily(FAMILY);
502 s.setFilter(new KeyOnlyFilter());
503 s.setMaxVersions(1);
504
505 rs = table.getScanner(s);
506 Result result = rs.next();
507 return result != null && result.size() > 0;
508 } finally {
509 if (rs != null) {
510 rs.close();
511 }
512 }
513 }
514 @Override
515 protected String getSpanName() {
516 return "MTTR Scan Test";
517 }
518 }
519
520
521
522
523 static class AdminCallable extends TimingCallable {
524
525 public AdminCallable(Future<?> f) throws IOException {
526 super(f);
527 }
528
529 @Override
530 protected boolean doAction() throws Exception {
531 HBaseAdmin admin = null;
532 try {
533 admin = new HBaseAdmin(util.getConfiguration());
534 ClusterStatus status = admin.getClusterStatus();
535 return status != null;
536 } finally {
537 if (admin != null) {
538 admin.close();
539 }
540 }
541 }
542
543 @Override
544 protected String getSpanName() {
545 return "MTTR Admin Test";
546 }
547 }
548
549
550 static class ActionCallable implements Callable<Boolean> {
551 private final Action action;
552
553 public ActionCallable(Action action) {
554 this.action = action;
555 }
556
557 @Override
558 public Boolean call() throws Exception {
559 this.action.perform();
560 return true;
561 }
562 }
563
564
565
566
567
568 public static class LoadCallable implements Callable<Boolean> {
569
570 private final Future<?> future;
571
572 public LoadCallable(Future<?> f) {
573 future = f;
574 }
575
576 @Override
577 public Boolean call() throws Exception {
578 int colsPerKey = 10;
579 int numServers = util.getHBaseClusterInterface().getInitialClusterStatus().getServersSize();
580 int numKeys = numServers * 5000;
581 int writeThreads = 10;
582
583
584
585
586 do {
587 int ret = loadTool.run(new String[]{
588 "-tn", loadTableName.getNameAsString(),
589 "-write", String.format("%d:%d:%d", colsPerKey, 500, writeThreads),
590 "-num_keys", String.valueOf(numKeys),
591 "-skip_init"
592 });
593 assertEquals("Load failed", 0, ret);
594 } while (!future.isDone());
595
596 return true;
597 }
598 }
599 }