1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import static org.junit.Assert.assertNull;
21 import static org.junit.Assert.assertTrue;
22 import static org.mockito.Matchers.any;
23 import static org.mockito.Matchers.anyListOf;
24 import static org.mockito.Matchers.anyString;
25 import static org.mockito.Matchers.eq;
26 import static org.mockito.Mockito.atLeastOnce;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.doThrow;
29 import static org.mockito.Mockito.inOrder;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.never;
32 import static org.mockito.Mockito.reset;
33 import static org.mockito.Mockito.spy;
34 import static org.mockito.Mockito.times;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.when;
37
38 import java.io.IOException;
39 import java.util.Arrays;
40 import java.util.List;
41 import java.util.concurrent.ThreadPoolExecutor;
42 import java.util.concurrent.TimeUnit;
43
44 import org.apache.hadoop.hbase.SmallTests;
45 import org.apache.hadoop.hbase.errorhandling.ForeignException;
46 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
47 import org.junit.After;
48 import org.junit.Test;
49 import org.junit.experimental.categories.Category;
50 import org.mockito.InOrder;
51 import org.mockito.invocation.InvocationOnMock;
52 import org.mockito.stubbing.Answer;
53
54 import com.google.common.collect.Lists;
55
56
57
58
59
60
61
62 @Category(SmallTests.class)
63 public class TestProcedureCoordinator {
64
65 private static final long WAKE_FREQUENCY = 1000;
66 private static final long TIMEOUT = 100000;
67 private static final long POOL_KEEP_ALIVE = 1;
68 private static final String nodeName = "node";
69 private static final String procName = "some op";
70 private static final byte[] procData = new byte[0];
71 private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
72
73
74 private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
75 private final Procedure task = mock(Procedure.class);
76 private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
77
78
79 private ProcedureCoordinator coordinator;
80
81 @After
82 public void resetTest() throws IOException {
83
84 reset(controller, task, monitor);
85
86 if (coordinator != null) coordinator.close();
87 }
88
89 private ProcedureCoordinator buildNewCoordinator() {
90 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
91 return spy(new ProcedureCoordinator(controller, pool));
92 }
93
94
95
96
97
98 @Test
99 public void testThreadPoolSize() throws Exception {
100 ProcedureCoordinator coordinator = buildNewCoordinator();
101 Procedure proc = new Procedure(coordinator, monitor,
102 WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
103 Procedure procSpy = spy(proc);
104
105 Procedure proc2 = new Procedure(coordinator, monitor,
106 WAKE_FREQUENCY, TIMEOUT, procName +"2", procData, expected);
107 Procedure procSpy2 = spy(proc2);
108 when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
109 .thenReturn(procSpy, procSpy2);
110
111 coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
112
113 assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
114 coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
115 }
116
117
118
119
120 @Test(timeout = 60000)
121 public void testUnreachableControllerDuringPrepare() throws Exception {
122 coordinator = buildNewCoordinator();
123
124 List<String> expected = Arrays.asList("cohort");
125 Procedure proc = new Procedure(coordinator, WAKE_FREQUENCY,
126 TIMEOUT, procName, procData, expected);
127 final Procedure procSpy = spy(proc);
128
129 when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
130 .thenReturn(procSpy);
131
132
133 IOException cause = new IOException("Failed to reach comms during acquire");
134 doThrow(cause).when(controller)
135 .sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyListOf(String.class));
136
137
138 proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
139
140 while(!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS));
141 verify(procSpy, atLeastOnce()).receive(any(ForeignException.class));
142 verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
143 verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
144 verify(controller, never()).sendGlobalBarrierReached(any(Procedure.class),
145 anyListOf(String.class));
146 }
147
148
149
150
151 @Test(timeout = 60000)
152 public void testUnreachableControllerDuringCommit() throws Exception {
153 coordinator = buildNewCoordinator();
154
155
156 List<String> expected = Arrays.asList("cohort");
157 final Procedure spy = spy(new Procedure(coordinator,
158 WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
159
160 when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
161 .thenReturn(spy);
162
163
164 IOException cause = new IOException("Failed to reach controller during prepare");
165 doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" }))
166 .when(controller).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class));
167 doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
168
169
170 Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
171
172 while(!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS));
173 verify(spy, atLeastOnce()).receive(any(ForeignException.class));
174 verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
175 verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy),
176 eq(procData), anyListOf(String.class));
177 verify(controller, times(1)).sendGlobalBarrierReached(any(Procedure.class),
178 anyListOf(String.class));
179 }
180
181 @Test(timeout = 60000)
182 public void testNoCohort() throws Exception {
183 runSimpleProcedure();
184 }
185
186 @Test(timeout = 60000)
187 public void testSingleCohortOrchestration() throws Exception {
188 runSimpleProcedure("one");
189 }
190
191 @Test(timeout = 60000)
192 public void testMultipleCohortOrchestration() throws Exception {
193 runSimpleProcedure("one", "two", "three", "four");
194 }
195
196 public void runSimpleProcedure(String... members) throws Exception {
197 coordinator = buildNewCoordinator();
198 Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
199 TIMEOUT, procName, procData, Arrays.asList(members));
200 final Procedure spy = spy(task);
201 runCoordinatedProcedure(spy, members);
202 }
203
204
205
206
207 @Test(timeout = 60000)
208 public void testEarlyJoiningBarrier() throws Exception {
209 final String[] cohort = new String[] { "one", "two", "three", "four" };
210 coordinator = buildNewCoordinator();
211 final ProcedureCoordinator ref = coordinator;
212 Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
213 TIMEOUT, procName, procData, Arrays.asList(cohort));
214 final Procedure spy = spy(task);
215
216 AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
217 public void doWork() {
218
219
220 ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
221 ref.memberFinishedBarrier(this.opName, this.cohort[0]);
222
223 ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
224
225 ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
226 ref.memberFinishedBarrier(this.opName, this.cohort[2]);
227
228 ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
229 }
230 };
231
232 BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
233 @Override
234 public void doWork() {
235 ref.memberFinishedBarrier(opName, this.cohort[1]);
236 ref.memberFinishedBarrier(opName, this.cohort[3]);
237 }
238 };
239 runCoordinatedOperation(spy, prepare, commit, cohort);
240 }
241
242
243
244
245
246
247
248
249
250
251 public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
252 runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
253 new BarrierAnswer(procName, cohort), cohort);
254 }
255
256 public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare,
257 String... cohort) throws Exception {
258 runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
259 }
260
261 public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit,
262 String... cohort) throws Exception {
263 runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
264 }
265
266 public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
267 BarrierAnswer commitOperation, String... cohort) throws Exception {
268 List<String> expected = Arrays.asList(cohort);
269 when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
270 .thenReturn(spy);
271
272
273 doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
274 doAnswer(commitOperation).when(controller)
275 .sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
276
277
278 Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
279
280 task.waitForCompleted();
281
282
283 prepareOperation.ensureRan();
284
285 InOrder inorder = inOrder(spy, controller);
286 inorder.verify(spy).sendGlobalBarrierStart();
287 inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
288 inorder.verify(spy).sendGlobalBarrierReached();
289 inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class));
290 }
291
292 private abstract class OperationAnswer implements Answer<Void> {
293 private boolean ran = false;
294
295 public void ensureRan() {
296 assertTrue("Prepare mocking didn't actually run!", ran);
297 }
298
299 @Override
300 public final Void answer(InvocationOnMock invocation) throws Throwable {
301 this.ran = true;
302 doWork();
303 return null;
304 }
305
306 protected abstract void doWork() throws Throwable;
307 }
308
309
310
311
312 private class AcquireBarrierAnswer extends OperationAnswer {
313 protected final String[] cohort;
314 protected final String opName;
315
316 public AcquireBarrierAnswer(String opName, String... cohort) {
317 this.cohort = cohort;
318 this.opName = opName;
319 }
320
321 @Override
322 public void doWork() {
323 if (cohort == null) return;
324 for (String member : cohort) {
325 TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
326 }
327 }
328 }
329
330
331
332
333 private class BarrierAnswer extends OperationAnswer {
334 protected final String[] cohort;
335 protected final String opName;
336
337 public BarrierAnswer(String opName, String... cohort) {
338 this.cohort = cohort;
339 this.opName = opName;
340 }
341
342 @Override
343 public void doWork() {
344 if (cohort == null) return;
345 for (String member : cohort) {
346 TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member);
347 }
348 }
349 }
350 }