1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import static org.mockito.Matchers.any;
21 import static org.mockito.Matchers.anyString;
22 import static org.mockito.Matchers.eq;
23 import static org.mockito.Mockito.doAnswer;
24 import static org.mockito.Mockito.doThrow;
25 import static org.mockito.Mockito.inOrder;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.never;
28 import static org.mockito.Mockito.reset;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyZeroInteractions;
33 import static org.mockito.Mockito.when;
34
35 import java.io.IOException;
36 import java.util.concurrent.ThreadPoolExecutor;
37
38 import org.apache.hadoop.hbase.SmallTests;
39 import org.apache.hadoop.hbase.errorhandling.ForeignException;
40 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
41 import org.apache.hadoop.hbase.errorhandling.TimeoutException;
42 import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
43 import org.junit.After;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46 import org.mockito.InOrder;
47 import org.mockito.Mockito;
48 import org.mockito.invocation.InvocationOnMock;
49 import org.mockito.stubbing.Answer;
50
51
52
53
54 @Category(SmallTests.class)
55 public class TestProcedureMember {
56 private static final long WAKE_FREQUENCY = 100;
57 private static final long TIMEOUT = 100000;
58 private static final long POOL_KEEP_ALIVE = 1;
59
60 private final String op = "some op";
61 private final byte[] data = new byte[0];
62 private final ForeignExceptionDispatcher mockListener = Mockito
63 .spy(new ForeignExceptionDispatcher());
64 private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
65 private final ProcedureMemberRpcs mockMemberComms = Mockito
66 .mock(ProcedureMemberRpcs.class);
67 private ProcedureMember member;
68 private ForeignExceptionDispatcher dispatcher;
69 Subprocedure spySub;
70
71
72
73
74 @After
75 public void resetTest() {
76 reset(mockListener, mockBuilder, mockMemberComms);
77 if (member != null)
78 try {
79 member.close();
80 } catch (IOException e) {
81 e.printStackTrace();
82 }
83 }
84
85
86
87
88
89 private ProcedureMember buildCohortMember() {
90 String name = "node";
91 ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
92 return new ProcedureMember(mockMemberComms, pool, mockBuilder);
93 }
94
95
96
97
98 private void buildCohortMemberPair() throws IOException {
99 dispatcher = new ForeignExceptionDispatcher();
100 String name = "node";
101 ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
102 member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
103 when(mockMemberComms.getMemberName()).thenReturn("membername");
104 Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
105 spySub = spy(subproc);
106 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
107 addCommitAnswer();
108 }
109
110
111
112
113
114 private void addCommitAnswer() throws IOException {
115 doAnswer(new Answer<Void>() {
116 @Override
117 public Void answer(InvocationOnMock invocation) throws Throwable {
118 member.receivedReachedGlobalBarrier(op);
119 return null;
120 }
121 }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
122 }
123
124
125
126
127 @Test(timeout = 500)
128 public void testSimpleRun() throws Exception {
129 member = buildCohortMember();
130 EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
131 EmptySubprocedure spy = spy(subproc);
132 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
133
134
135 addCommitAnswer();
136
137
138
139 Subprocedure subproc1 = member.createSubprocedure(op, data);
140 member.submitSubprocedure(subproc1);
141
142 subproc.waitForLocallyCompleted();
143
144
145 InOrder order = inOrder(mockMemberComms, spy);
146 order.verify(spy).acquireBarrier();
147 order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
148 order.verify(spy).insideBarrier();
149 order.verify(mockMemberComms).sendMemberCompleted(eq(spy));
150 order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
151 any(ForeignException.class));
152 }
153
154
155
156
157
158 @Test(timeout = 60000)
159 public void testMemberPrepareException() throws Exception {
160 buildCohortMemberPair();
161
162
163 doAnswer(
164 new Answer<Void>() {
165 @Override
166 public Void answer(InvocationOnMock invocation) throws Throwable {
167 throw new IOException("Forced IOException in member acquireBarrier");
168 }
169 }).when(spySub).acquireBarrier();
170
171
172
173 Subprocedure subproc = member.createSubprocedure(op, data);
174 member.submitSubprocedure(subproc);
175
176 member.closeAndWait(TIMEOUT);
177
178
179 InOrder order = inOrder(mockMemberComms, spySub);
180 order.verify(spySub).acquireBarrier();
181
182 order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
183 order.verify(spySub, never()).insideBarrier();
184 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
185
186 order.verify(spySub).cancel(anyString(), any(Exception.class));
187 order.verify(spySub).cleanup(any(Exception.class));
188 }
189
190
191
192
193 @Test(timeout = 60000)
194 public void testSendMemberAcquiredCommsFailure() throws Exception {
195 buildCohortMemberPair();
196
197
198 doAnswer(
199 new Answer<Void>() {
200 @Override
201 public Void answer(InvocationOnMock invocation) throws Throwable {
202 throw new IOException("Forced IOException in memeber prepare");
203 }
204 }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
205
206
207
208 Subprocedure subproc = member.createSubprocedure(op, data);
209 member.submitSubprocedure(subproc);
210
211 member.closeAndWait(TIMEOUT);
212
213
214 InOrder order = inOrder(mockMemberComms, spySub);
215 order.verify(spySub).acquireBarrier();
216 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
217
218
219 order.verify(spySub, never()).insideBarrier();
220 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
221
222 order.verify(spySub).cancel(anyString(), any(Exception.class));
223 order.verify(spySub).cleanup(any(Exception.class));
224 }
225
226
227
228
229
230
231
232 @Test(timeout = 60000)
233 public void testCoordinatorAbort() throws Exception {
234 buildCohortMemberPair();
235
236
237 final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
238 doAnswer(
239 new Answer<Void>() {
240 @Override
241 public Void answer(InvocationOnMock invocation) throws Throwable {
242
243 spySub.cancel("bogus message", oate);
244
245 Thread.sleep(WAKE_FREQUENCY);
246 return null;
247 }
248 }).when(spySub).waitForReachedGlobalBarrier();
249
250
251
252 Subprocedure subproc = member.createSubprocedure(op, data);
253 member.submitSubprocedure(subproc);
254
255 member.closeAndWait(TIMEOUT);
256
257
258 InOrder order = inOrder(mockMemberComms, spySub);
259 order.verify(spySub).acquireBarrier();
260 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
261
262 order.verify(spySub, never()).insideBarrier();
263 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
264
265 order.verify(spySub).cancel(anyString(), any(Exception.class));
266 order.verify(spySub).cleanup(any(Exception.class));
267 }
268
269
270
271
272
273
274
275
276
277 @Test(timeout = 60000)
278 public void testMemberCommitException() throws Exception {
279 buildCohortMemberPair();
280
281
282 doAnswer(
283 new Answer<Void>() {
284 @Override
285 public Void answer(InvocationOnMock invocation) throws Throwable {
286 throw new IOException("Forced IOException in memeber prepare");
287 }
288 }).when(spySub).insideBarrier();
289
290
291
292 Subprocedure subproc = member.createSubprocedure(op, data);
293 member.submitSubprocedure(subproc);
294
295 member.closeAndWait(TIMEOUT);
296
297
298 InOrder order = inOrder(mockMemberComms, spySub);
299 order.verify(spySub).acquireBarrier();
300 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
301 order.verify(spySub).insideBarrier();
302
303
304 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
305
306 order.verify(spySub).cancel(anyString(), any(Exception.class));
307 order.verify(spySub).cleanup(any(Exception.class));
308 }
309
310
311
312
313
314
315
316
317
318 @Test(timeout = 60000)
319 public void testMemberCommitCommsFailure() throws Exception {
320 buildCohortMemberPair();
321 final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
322 doAnswer(
323 new Answer<Void>() {
324 @Override
325 public Void answer(InvocationOnMock invocation) throws Throwable {
326
327 spySub.cancel("commit comms fail", oate);
328
329 Thread.sleep(WAKE_FREQUENCY);
330 return null;
331 }
332 }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class));
333
334
335
336 Subprocedure subproc = member.createSubprocedure(op, data);
337 member.submitSubprocedure(subproc);
338
339 member.closeAndWait(TIMEOUT);
340
341
342 InOrder order = inOrder(mockMemberComms, spySub);
343 order.verify(spySub).acquireBarrier();
344 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
345 order.verify(spySub).insideBarrier();
346 order.verify(mockMemberComms).sendMemberCompleted(eq(spySub));
347
348 order.verify(spySub).cancel(anyString(), any(Exception.class));
349 order.verify(spySub).cleanup(any(Exception.class));
350 }
351
352
353
354
355
356 @Test(timeout = 60000)
357 public void testPropagateConnectionErrorBackToManager() throws Exception {
358
359 member = buildCohortMember();
360 ProcedureMember memberSpy = spy(member);
361
362
363 final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
364 ForeignExceptionDispatcher dispSpy = spy(dispatcher);
365 Subprocedure commit = new EmptySubprocedure(member, dispatcher);
366 Subprocedure spy = spy(commit);
367 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
368
369
370 doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
371
372 doThrow(new IOException("Controller is down!")).when(mockMemberComms)
373 .sendMemberAborted(eq(spy), any(ForeignException.class));
374
375
376
377
378 Subprocedure subproc = memberSpy.createSubprocedure(op, data);
379 memberSpy.submitSubprocedure(subproc);
380
381 memberSpy.closeAndWait(TIMEOUT);
382
383
384 InOrder order = inOrder(mockMemberComms, spy, dispSpy);
385
386 order.verify(spy).acquireBarrier();
387 order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
388
389
390
391
392
393
394
395 }
396
397
398
399
400
401
402 @Test
403 public void testNoTaskToBeRunFromRequest() throws Exception {
404 ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
405 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
406 .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
407 member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
408
409
410 Subprocedure subproc = member.createSubprocedure(op, data);
411 member.submitSubprocedure(subproc);
412
413 try {
414
415 Subprocedure subproc2 = member.createSubprocedure(op, data);
416 member.submitSubprocedure(subproc2);
417 } catch (IllegalStateException ise) {
418 }
419
420 try {
421
422 Subprocedure subproc3 = member.createSubprocedure(op, data);
423 member.submitSubprocedure(subproc3);
424 } catch (IllegalArgumentException iae) {
425 }
426
427
428 verifyZeroInteractions(pool);
429
430
431
432 }
433
434
435
436
437 public class EmptySubprocedure extends SubprocedureImpl {
438 public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
439 super( member, op, dispatcher,
440
441 WAKE_FREQUENCY, TIMEOUT);
442 }
443 }
444 }