1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
35 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
36 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
37 import org.apache.hadoop.hbase.testclassification.SmallTests;
38 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.junit.Assert.fail;
50
51 @Category(SmallTests.class)
52 public class TestYieldProcedures {
53 private static final Log LOG = LogFactory.getLog(TestYieldProcedures.class);
54
55 private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
56 private static final Procedure NULL_PROC = null;
57
58 private ProcedureExecutor<TestProcEnv> procExecutor;
59 private TestRunQueue procRunnables;
60 private ProcedureStore procStore;
61
62 private HBaseCommonTestingUtility htu;
63 private FileSystem fs;
64 private Path testDir;
65 private Path logDir;
66
67 @Before
68 public void setUp() throws IOException {
69 htu = new HBaseCommonTestingUtility();
70 testDir = htu.getDataTestDir();
71 fs = testDir.getFileSystem(htu.getConfiguration());
72 assertTrue(testDir.depth() > 1);
73
74 logDir = new Path(testDir, "proc-logs");
75 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
76 procRunnables = new TestRunQueue();
77 procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
78 procStore, procRunnables);
79 procStore.start(PROCEDURE_EXECUTOR_SLOTS);
80 procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
81 }
82
83 @After
84 public void tearDown() throws IOException {
85 procExecutor.stop();
86 procStore.stop(false);
87 fs.delete(logDir, true);
88 }
89
90 @Test
91 public void testYieldEachExecutionStep() throws Exception {
92 final int NUM_STATES = 3;
93
94 TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
95 for (int i = 0; i < procs.length; ++i) {
96 procs[i] = new TestStateMachineProcedure(true, false);
97 procExecutor.submitProcedure(procs[i]);
98 }
99 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
100
101 for (int i = 0; i < procs.length; ++i) {
102 assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
103
104
105 int index = 0;
106 for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
107 TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
108 assertEquals(false, info.isRollback());
109 assertEquals(execStep, info.getStep().ordinal());
110 }
111
112
113 for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
114 TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
115 assertEquals(true, info.isRollback());
116 assertEquals(execStep, info.getStep().ordinal());
117 }
118 }
119
120
121 assertEquals(0, procRunnables.size());
122 assertEquals(0, procRunnables.addFrontCalls);
123 assertEquals(18, procRunnables.addBackCalls);
124 assertEquals(15, procRunnables.yieldCalls);
125 assertEquals(19, procRunnables.pollCalls);
126 assertEquals(3, procRunnables.completionCalls);
127 }
128
129 @Test
130 public void testYieldOnInterrupt() throws Exception {
131 final int NUM_STATES = 3;
132 int count = 0;
133
134 TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
135 ProcedureTestingUtility.submitAndWait(procExecutor, proc);
136
137
138 assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
139 for (int i = 0; i < NUM_STATES; ++i) {
140 TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
141 assertEquals(false, info.isRollback());
142 assertEquals(i, info.getStep().ordinal());
143
144 info = proc.getExecutionInfo().get(count++);
145 assertEquals(false, info.isRollback());
146 assertEquals(i, info.getStep().ordinal());
147 }
148
149
150 for (int i = NUM_STATES - 1; i >= 0; --i) {
151 TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
152 assertEquals(true, info.isRollback());
153 assertEquals(i, info.getStep().ordinal());
154
155 info = proc.getExecutionInfo().get(count++);
156 assertEquals(true, info.isRollback());
157 assertEquals(i, info.getStep().ordinal());
158 }
159
160
161 assertEquals(0, procRunnables.size());
162 assertEquals(0, procRunnables.addFrontCalls);
163 assertEquals(12, procRunnables.addBackCalls);
164 assertEquals(11, procRunnables.yieldCalls);
165 assertEquals(13, procRunnables.pollCalls);
166 assertEquals(1, procRunnables.completionCalls);
167 }
168
169 @Test
170 public void testYieldException() {
171 TestYieldProcedure proc = new TestYieldProcedure();
172 ProcedureTestingUtility.submitAndWait(procExecutor, proc);
173 assertEquals(6, proc.step);
174
175
176 assertEquals(0, procRunnables.size());
177 assertEquals(0, procRunnables.addFrontCalls);
178 assertEquals(6, procRunnables.addBackCalls);
179 assertEquals(5, procRunnables.yieldCalls);
180 assertEquals(7, procRunnables.pollCalls);
181 assertEquals(1, procRunnables.completionCalls);
182 }
183
184 private static class TestProcEnv {
185 public final AtomicLong timestamp = new AtomicLong(0);
186
187 public long nextTimestamp() {
188 return timestamp.incrementAndGet();
189 }
190 }
191
192 public static class TestStateMachineProcedure
193 extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
194 enum State { STATE_1, STATE_2, STATE_3 }
195
196 public class ExecutionInfo {
197 private final boolean rollback;
198 private final long timestamp;
199 private final State step;
200
201 public ExecutionInfo(long timestamp, State step, boolean isRollback) {
202 this.timestamp = timestamp;
203 this.step = step;
204 this.rollback = isRollback;
205 }
206
207 public State getStep() { return step; }
208 public long getTimestamp() { return timestamp; }
209 public boolean isRollback() { return rollback; }
210 }
211
212 private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<ExecutionInfo>();
213 private final AtomicBoolean aborted = new AtomicBoolean(false);
214 private final boolean throwInterruptOnceOnEachStep;
215 private final boolean abortOnFinalStep;
216
217 public TestStateMachineProcedure() {
218 this(false, false);
219 }
220
221 public TestStateMachineProcedure(boolean abortOnFinalStep,
222 boolean throwInterruptOnceOnEachStep) {
223 this.abortOnFinalStep = abortOnFinalStep;
224 this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
225 }
226
227 public ArrayList<ExecutionInfo> getExecutionInfo() {
228 return executionInfo;
229 }
230
231 @Override
232 protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
233 throws InterruptedException {
234 final long ts = env.nextTimestamp();
235 LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
236 executionInfo.add(new ExecutionInfo(ts, state, false));
237 Thread.sleep(150);
238
239 if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
240 LOG.debug("THROW INTERRUPT");
241 throw new InterruptedException("test interrupt");
242 }
243
244 switch (state) {
245 case STATE_1:
246 setNextState(State.STATE_2);
247 break;
248 case STATE_2:
249 setNextState(State.STATE_3);
250 break;
251 case STATE_3:
252 if (abortOnFinalStep) {
253 setFailure("test", new IOException("Requested abort on final step"));
254 }
255 return Flow.NO_MORE_STATE;
256 default:
257 throw new UnsupportedOperationException();
258 }
259 return Flow.HAS_MORE_STATE;
260 }
261
262 @Override
263 protected void rollbackState(TestProcEnv env, final State state)
264 throws InterruptedException {
265 final long ts = env.nextTimestamp();
266 LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
267 executionInfo.add(new ExecutionInfo(ts, state, true));
268 Thread.sleep(150);
269
270 if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
271 LOG.debug("THROW INTERRUPT");
272 throw new InterruptedException("test interrupt");
273 }
274
275 switch (state) {
276 case STATE_1:
277 break;
278 case STATE_2:
279 break;
280 case STATE_3:
281 break;
282 default:
283 throw new UnsupportedOperationException();
284 }
285 }
286
287 @Override
288 protected State getState(final int stateId) {
289 return State.values()[stateId];
290 }
291
292 @Override
293 protected int getStateId(final State state) {
294 return state.ordinal();
295 }
296
297 @Override
298 protected State getInitialState() {
299 return State.STATE_1;
300 }
301
302 @Override
303 protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
304 return true;
305 }
306
307 @Override
308 protected boolean abort(TestProcEnv env) {
309 aborted.set(true);
310 return true;
311 }
312 }
313
314 public static class TestYieldProcedure extends Procedure<TestProcEnv> {
315 private int step = 0;
316
317 public TestYieldProcedure() {
318 }
319
320 @Override
321 protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
322 LOG.info("execute step " + step);
323 if (step++ < 5) {
324 throw new ProcedureYieldException();
325 }
326 return null;
327 }
328
329 @Override
330 protected void rollback(TestProcEnv env) {
331 }
332
333 @Override
334 protected boolean abort(TestProcEnv env) {
335 return false;
336 }
337
338 @Override
339 protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
340 return true;
341 }
342
343 @Override
344 protected void serializeStateData(final OutputStream stream) throws IOException {
345 }
346
347 @Override
348 protected void deserializeStateData(final InputStream stream) throws IOException {
349 }
350 }
351
352 private static class TestRunQueue extends ProcedureSimpleRunQueue {
353 private int completionCalls;
354 private int addFrontCalls;
355 private int addBackCalls;
356 private int yieldCalls;
357 private int pollCalls;
358
359 public TestRunQueue() {}
360
361 public void addFront(final Procedure proc) {
362 addFrontCalls++;
363 super.addFront(proc);
364 }
365
366 @Override
367 public void addBack(final Procedure proc) {
368 addBackCalls++;
369 super.addBack(proc);
370 }
371
372 @Override
373 public void yield(final Procedure proc) {
374 yieldCalls++;
375 super.yield(proc);
376 }
377
378 @Override
379 public Procedure poll() {
380 pollCalls++;
381 return super.poll();
382 }
383
384 @Override
385 public void completionCleanup(Procedure proc) {
386 completionCalls++;
387 }
388 }
389 }