1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.chaos.policies;
20
21 import org.apache.hadoop.hbase.DaemonThreadFactory;
22 import org.apache.hadoop.hbase.chaos.actions.Action;
23 import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
24 import org.apache.hadoop.util.StringUtils;
25
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30
31
32
33
34
35 public class TwoConcurrentActionPolicy extends PeriodicPolicy {
36 private final Action[] actionsOne;
37 private final Action[] actionsTwo;
38 private final ExecutorService executor;
39
40 public TwoConcurrentActionPolicy(long sleepTime, Action[] actionsOne, Action[] actionsTwo) {
41 super(sleepTime);
42 this.actionsOne = actionsOne;
43 this.actionsTwo = actionsTwo;
44 executor = Executors.newFixedThreadPool(2,
45 new DaemonThreadFactory("TwoConcurrentAction-"));
46 }
47
48 @Override
49 protected void runOneIteration() {
50 Action actionOne = PolicyBasedChaosMonkey.selectRandomItem(actionsOne);
51 Action actionTwo = PolicyBasedChaosMonkey.selectRandomItem(actionsTwo);
52
53 Future fOne = executor.submit(new ActionRunner(actionOne));
54 Future fTwo = executor.submit(new ActionRunner(actionTwo));
55
56 try {
57 fOne.get();
58 fTwo.get();
59 } catch (InterruptedException e) {
60 LOG.warn("Exception occurred during performing action: "
61 + StringUtils.stringifyException(e));
62 } catch (ExecutionException ex) {
63 LOG.warn("Exception occurred during performing action: "
64 + StringUtils.stringifyException(ex));
65 }
66 }
67
68 @Override
69 public void init(PolicyContext context) throws Exception {
70 super.init(context);
71 for (Action a : actionsOne) {
72 a.init(context);
73 }
74 for (Action a : actionsTwo) {
75 a.init(context);
76 }
77 }
78
79 private static class ActionRunner implements Runnable {
80
81 private final Action action;
82
83 public ActionRunner(Action action) {
84
85 this.action = action;
86 }
87
88 @Override public void run() {
89 try {
90 action.perform();
91 } catch (Exception ex) {
92 LOG.warn("Exception occurred during performing action: "
93 + StringUtils.stringifyException(ex));
94 }
95 }
96 }
97 }