1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.mockito.Matchers.any;
22 import static org.mockito.Matchers.anyListOf;
23 import static org.mockito.Matchers.eq;
24 import static org.mockito.Mockito.atMost;
25 import static org.mockito.Mockito.never;
26 import static org.mockito.Mockito.spy;
27 import static org.mockito.Mockito.when;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.List;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.hbase.Abortable;
40 import org.apache.hadoop.hbase.HBaseTestingUtility;
41 import org.apache.hadoop.hbase.MediumTests;
42 import org.apache.hadoop.hbase.errorhandling.ForeignException;
43 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
44 import org.apache.hadoop.hbase.errorhandling.TimeoutException;
45 import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
46 import org.apache.hadoop.hbase.util.Pair;
47 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48 import org.junit.AfterClass;
49 import org.junit.BeforeClass;
50 import org.junit.Test;
51 import org.junit.experimental.categories.Category;
52 import org.mockito.Mockito;
53 import org.mockito.internal.matchers.ArrayEquals;
54 import org.mockito.invocation.InvocationOnMock;
55 import org.mockito.stubbing.Answer;
56 import org.mockito.verification.VerificationMode;
57
58 import com.google.common.collect.Lists;
59
60
61
62
63 @Category(MediumTests.class)
64 public class TestZKProcedure {
65
66 private static final Log LOG = LogFactory.getLog(TestZKProcedure.class);
67 private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
68 private static final String COORDINATOR_NODE_NAME = "coordinator";
69 private static final long KEEP_ALIVE = 100;
70 private static final int POOL_SIZE = 1;
71 private static final long TIMEOUT = 10000;
72 private static final long WAKE_FREQUENCY = 500;
73 private static final String opName = "op";
74 private static final byte[] data = new byte[] { 1, 2 };
75 private static final VerificationMode once = Mockito.times(1);
76
77 @BeforeClass
78 public static void setupTest() throws Exception {
79 UTIL.startMiniZKCluster();
80 }
81
82 @AfterClass
83 public static void cleanupTest() throws Exception {
84 UTIL.shutdownMiniZKCluster();
85 }
86
87 private static ZooKeeperWatcher newZooKeeperWatcher() throws IOException {
88 return new ZooKeeperWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
89 @Override
90 public void abort(String why, Throwable e) {
91 throw new RuntimeException(
92 "Unexpected abort in distributed three phase commit test:" + why, e);
93 }
94
95 @Override
96 public boolean isAborted() {
97 return false;
98 }
99 });
100 }
101
102 @Test
103 public void testEmptyMemberSet() throws Exception {
104 runCommit();
105 }
106
107 @Test
108 public void testSingleMember() throws Exception {
109 runCommit("one");
110 }
111
112 @Test
113 public void testMultipleMembers() throws Exception {
114 runCommit("one", "two", "three", "four" );
115 }
116
117 private void runCommit(String... members) throws Exception {
118
119 if (members == null) {
120 members = new String[0];
121 }
122 List<String> expected = Arrays.asList(members);
123
124
125 ZooKeeperWatcher coordZkw = newZooKeeperWatcher();
126 String opDescription = "coordination test - " + members.length + " cohort members";
127
128
129 ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
130 coordZkw, opDescription, COORDINATOR_NODE_NAME);
131 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
132 ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
133 @Override
134 public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
135 List<String> expectedMembers) {
136 return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
137 }
138 };
139
140
141
142 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
143 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
144 members.length);
145
146 for (String member : members) {
147 ZooKeeperWatcher watcher = newZooKeeperWatcher();
148 ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
149 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
150 ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
151 procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
152 comms.start(member, procMember);
153 }
154
155
156 final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
157 for (int i = 0; i < procMembers.size(); i++) {
158 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
159 Subprocedure commit = Mockito
160 .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
161 WAKE_FREQUENCY, TIMEOUT));
162 subprocs.add(commit);
163 }
164
165
166 final AtomicInteger i = new AtomicInteger(0);
167 Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
168 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
169 new Answer<Subprocedure>() {
170 @Override
171 public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
172 int index = i.getAndIncrement();
173 LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
174 Subprocedure commit = subprocs.get(index);
175 return commit;
176 }
177 });
178
179
180
181
182
183
184 Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
185
186
187
188
189 waitAndVerifyProc(task, once, once, never(), once, false);
190 verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
191
192
193 closeAll(coordinator, coordinatorComms, procMembers);
194 }
195
196
197
198
199
200 @Test
201 public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
202 String opDescription = "error injection coordination";
203 String[] cohortMembers = new String[] { "one", "two", "three" };
204 List<String> expected = Lists.newArrayList(cohortMembers);
205
206 final int memberErrorIndex = 2;
207 final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
208
209
210 ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
211 ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
212 coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
213 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
214 ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
215
216
217 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
218 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
219 expected.size());
220 for (String member : expected) {
221 ZooKeeperWatcher watcher = newZooKeeperWatcher();
222 ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
223 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
224 ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
225 members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
226 controller.start(member, mem);
227 }
228
229
230 final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
231 final int[] elem = new int[1];
232 for (int i = 0; i < members.size(); i++) {
233 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
234 final ProcedureMember comms = members.get(i).getFirst();
235 Subprocedure commit = Mockito
236 .spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
237
238 Mockito.doAnswer(new Answer<Void>() {
239 @Override
240 public Void answer(InvocationOnMock invocation) throws Throwable {
241 int index = elem[0];
242 if (index == memberErrorIndex) {
243 LOG.debug("Sending error to coordinator");
244 ForeignException remoteCause = new ForeignException("TIMER",
245 new TimeoutException("subprocTimeout" , 1, 2, 0));
246 Subprocedure r = ((Subprocedure) invocation.getMock());
247 LOG.error("Remote commit failure, not propagating error:" + remoteCause);
248 comms.receiveAbortProcedure(r.getName(), remoteCause);
249 assertEquals(r.isComplete(), true);
250
251
252 try {
253 Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
254 WAKE_FREQUENCY, "coordinator received error");
255 } catch (InterruptedException e) {
256 LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
257
258 Thread.currentThread().interrupt();
259 }
260 }
261 elem[0] = ++index;
262 return null;
263 }
264 }).when(commit).acquireBarrier();
265 cohortTasks.add(commit);
266 }
267
268
269 final int[] i = new int[] { 0 };
270 Mockito.when(
271 subprocFactory.buildSubprocedure(Mockito.eq(opName),
272 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
273 new Answer<Subprocedure>() {
274 @Override
275 public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
276 int index = i[0];
277 Subprocedure commit = cohortTasks.get(index);
278 index++;
279 i[0] = index;
280 return commit;
281 }
282 });
283
284
285 ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
286 .spy(new ForeignExceptionDispatcher());
287 Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
288 coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
289 opName, data, expected));
290 when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(opName), eq(data), anyListOf(String.class)))
291 .thenReturn(coordinatorTask);
292
293 Mockito.doAnswer(new Answer<Void>() {
294 @Override
295 public Void answer(InvocationOnMock invocation) throws Throwable {
296
297 invocation.callRealMethod();
298
299 coordinatorReceivedErrorLatch.countDown();
300 return null;
301 }
302 }).when(coordinatorTask).receive(Mockito.any(ForeignException.class));
303
304
305
306
307
308 Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
309 assertEquals("Didn't mock coordinator task", coordinatorTask, task);
310
311
312 try {
313 task.waitForCompleted();
314 } catch (ForeignException fe) {
315
316 }
317
318
319
320
321
322
323
324 waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true);
325 verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
326 once, true);
327
328
329 closeAll(coordinator, coordinatorController, members);
330 }
331
332
333
334
335
336
337 private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
338 VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
339 throws Exception {
340 boolean caughtError = false;
341 try {
342 proc.waitForCompleted();
343 } catch (ForeignException fe) {
344 caughtError = true;
345 }
346
347 Mockito.verify(proc, prepare).sendGlobalBarrierStart();
348 Mockito.verify(proc, commit).sendGlobalBarrierReached();
349 Mockito.verify(proc, finish).sendGlobalBarrierComplete();
350 assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
351 .hasException());
352 assertEquals("Operation error state was unexpected", opHasError, caughtError);
353
354 }
355
356
357
358
359
360
361 private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
362 VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
363 throws Exception {
364 boolean caughtError = false;
365 try {
366 op.waitForLocallyCompleted();
367 } catch (ForeignException fe) {
368 caughtError = true;
369 }
370
371 Mockito.verify(op, prepare).acquireBarrier();
372 Mockito.verify(op, commit).insideBarrier();
373
374
375 assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
376 .hasException());
377 assertEquals("Operation error state was unexpected", opHasError, caughtError);
378
379 }
380
381 private void verifyCohortSuccessful(List<String> cohortNames,
382 SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
383 VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
384 VerificationMode finish, boolean opHasError) throws Exception {
385
386
387 Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
388 Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
389
390 int j = 0;
391 for (Subprocedure op : cohortTasks) {
392 LOG.debug("Checking mock:" + (j++));
393 waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
394 }
395 }
396
397 private void closeAll(
398 ProcedureCoordinator coordinator,
399 ZKProcedureCoordinatorRpcs coordinatorController,
400 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
401 throws IOException {
402
403 for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
404 member.getFirst().close();
405 member.getSecond().close();
406 }
407 coordinator.close();
408 coordinatorController.close();
409 }
410 }