View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import static org.mockito.Matchers.any;
21  import static org.mockito.Matchers.anyString;
22  import static org.mockito.Matchers.eq;
23  import static org.mockito.Mockito.doAnswer;
24  import static org.mockito.Mockito.doThrow;
25  import static org.mockito.Mockito.inOrder;
26  import static org.mockito.Mockito.mock;
27  import static org.mockito.Mockito.never;
28  import static org.mockito.Mockito.reset;
29  import static org.mockito.Mockito.spy;
30  import static org.mockito.Mockito.times;
31  import static org.mockito.Mockito.verify;
32  import static org.mockito.Mockito.verifyZeroInteractions;
33  import static org.mockito.Mockito.when;
34  
35  import java.io.IOException;
36  import java.util.concurrent.ThreadPoolExecutor;
37  
38  import org.apache.hadoop.hbase.SmallTests;
39  import org.apache.hadoop.hbase.errorhandling.ForeignException;
40  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
41  import org.apache.hadoop.hbase.errorhandling.TimeoutException;
42  import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
43  import org.junit.After;
44  import org.junit.Test;
45  import org.junit.experimental.categories.Category;
46  import org.mockito.InOrder;
47  import org.mockito.Mockito;
48  import org.mockito.invocation.InvocationOnMock;
49  import org.mockito.stubbing.Answer;
50  
51  /**
52   * Test the procedure member, and it's error handling mechanisms.
53   */
54  @Category(SmallTests.class)
55  public class TestProcedureMember {
56    private static final long WAKE_FREQUENCY = 100;
57    private static final long TIMEOUT = 100000;
58    private static final long POOL_KEEP_ALIVE = 1;
59  
60    private final String op = "some op";
61    private final byte[] data = new byte[0];
62    private final ForeignExceptionDispatcher mockListener = Mockito
63        .spy(new ForeignExceptionDispatcher());
64    private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
65    private final ProcedureMemberRpcs mockMemberComms = Mockito
66        .mock(ProcedureMemberRpcs.class);
67    private ProcedureMember member;
68    private ForeignExceptionDispatcher dispatcher;
69    Subprocedure spySub;
70  
71    /**
72     * Reset all the mock objects
73     */
74    @After
75    public void resetTest() {
76      reset(mockListener, mockBuilder, mockMemberComms);
77      if (member != null)
78        try {
79          member.close();
80        } catch (IOException e) {
81          e.printStackTrace();
82        }
83    }
84  
85    /**
86     * Build a member using the class level mocks
87     * @return member to use for tests
88     */
89    private ProcedureMember buildCohortMember() {
90      String name = "node";
91      ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
92      return new ProcedureMember(mockMemberComms, pool, mockBuilder);
93    }
94  
95    /**
96     * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
97     */
98    private void buildCohortMemberPair() throws IOException {
99      dispatcher = new ForeignExceptionDispatcher();
100     String name = "node";
101     ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
102     member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
103     when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
104     Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
105     spySub = spy(subproc);
106     when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
107     addCommitAnswer();
108   }
109 
110 
111   /**
112    * Add a 'in barrier phase' response to the mock controller when it gets a acquired notification
113    */
114   private void addCommitAnswer() throws IOException {
115     doAnswer(new Answer<Void>() {
116       @Override
117       public Void answer(InvocationOnMock invocation) throws Throwable {
118         member.receivedReachedGlobalBarrier(op);
119         return null;
120       }
121     }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
122   }
123 
124   /**
125    * Test the normal sub procedure execution case.
126    */
127   @Test(timeout = 500)
128   public void testSimpleRun() throws Exception {
129     member = buildCohortMember();
130     EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
131     EmptySubprocedure spy = spy(subproc);
132     when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
133 
134     // when we get a prepare, then start the commit phase
135     addCommitAnswer();
136 
137     // run the operation
138     // build a new operation
139     Subprocedure subproc1 = member.createSubprocedure(op, data);
140     member.submitSubprocedure(subproc1);
141     // and wait for it to finish
142     subproc.waitForLocallyCompleted();
143 
144     // make sure everything ran in order
145     InOrder order = inOrder(mockMemberComms, spy);
146     order.verify(spy).acquireBarrier();
147     order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
148     order.verify(spy).insideBarrier();
149     order.verify(mockMemberComms).sendMemberCompleted(eq(spy));
150     order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
151         any(ForeignException.class));
152   }
153 
154   /**
155    * Make sure we call cleanup etc, when we have an exception during
156    * {@link Subprocedure#acquireBarrier()}.
157    */
158   @Test(timeout = 60000)
159   public void testMemberPrepareException() throws Exception {
160     buildCohortMemberPair();
161 
162     // mock an exception on Subprocedure's prepare
163     doAnswer(
164         new Answer<Void>() {
165           @Override
166           public Void answer(InvocationOnMock invocation) throws Throwable {
167             throw new IOException("Forced IOException in member acquireBarrier");
168           }
169         }).when(spySub).acquireBarrier();
170 
171     // run the operation
172     // build a new operation
173     Subprocedure subproc = member.createSubprocedure(op, data);
174     member.submitSubprocedure(subproc);
175     // if the operation doesn't die properly, then this will timeout
176     member.closeAndWait(TIMEOUT);
177 
178     // make sure everything ran in order
179     InOrder order = inOrder(mockMemberComms, spySub);
180     order.verify(spySub).acquireBarrier();
181     // Later phases not run
182     order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
183     order.verify(spySub, never()).insideBarrier();
184     order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
185     // error recovery path exercised
186     order.verify(spySub).cancel(anyString(), any(Exception.class));
187     order.verify(spySub).cleanup(any(Exception.class));
188   }
189 
190   /**
191    * Make sure we call cleanup etc, when we have an exception during prepare.
192    */
193   @Test(timeout = 60000)
194   public void testSendMemberAcquiredCommsFailure() throws Exception {
195     buildCohortMemberPair();
196 
197     // mock an exception on Subprocedure's prepare
198     doAnswer(
199         new Answer<Void>() {
200           @Override
201           public Void answer(InvocationOnMock invocation) throws Throwable {
202             throw new IOException("Forced IOException in memeber prepare");
203           }
204         }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
205 
206     // run the operation
207     // build a new operation
208     Subprocedure subproc = member.createSubprocedure(op, data);
209     member.submitSubprocedure(subproc);
210     // if the operation doesn't die properly, then this will timeout
211     member.closeAndWait(TIMEOUT);
212 
213     // make sure everything ran in order
214     InOrder order = inOrder(mockMemberComms, spySub);
215     order.verify(spySub).acquireBarrier();
216     order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
217 
218     // Later phases not run
219     order.verify(spySub, never()).insideBarrier();
220     order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
221     // error recovery path exercised
222     order.verify(spySub).cancel(anyString(), any(Exception.class));
223     order.verify(spySub).cleanup(any(Exception.class));
224   }
225 
226   /**
227    * Fail correctly if coordinator aborts the procedure.  The subprocedure will not interrupt a
228    * running {@link Subprocedure#prepare} -- prepare needs to finish first, and the the abort
229    * is checked.  Thus, the {@link Subprocedure#prepare} should succeed but later get rolled back
230    * via {@link Subprocedure#cleanup}.
231    */
232   @Test(timeout = 60000)
233   public void testCoordinatorAbort() throws Exception {
234     buildCohortMemberPair();
235 
236     // mock that another node timed out or failed to prepare
237     final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
238     doAnswer(
239         new Answer<Void>() {
240           @Override
241           public Void answer(InvocationOnMock invocation) throws Throwable {
242             // inject a remote error (this would have come from an external thread)
243             spySub.cancel("bogus message", oate);
244             // sleep the wake frequency since that is what we promised
245             Thread.sleep(WAKE_FREQUENCY);
246             return null;
247           }
248         }).when(spySub).waitForReachedGlobalBarrier();
249 
250     // run the operation
251     // build a new operation
252     Subprocedure subproc = member.createSubprocedure(op, data);
253     member.submitSubprocedure(subproc);
254     // if the operation doesn't die properly, then this will timeout
255     member.closeAndWait(TIMEOUT);
256 
257     // make sure everything ran in order
258     InOrder order = inOrder(mockMemberComms, spySub);
259     order.verify(spySub).acquireBarrier();
260     order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
261     // Later phases not run
262     order.verify(spySub, never()).insideBarrier();
263     order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
264     // error recovery path exercised
265     order.verify(spySub).cancel(anyString(), any(Exception.class));
266     order.verify(spySub).cleanup(any(Exception.class));
267   }
268 
269   /**
270    * Handle failures if a member's commit phase fails.
271    *
272    * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
273    * 2PC the transaction is committed just before the coordinator sends commit messages to the
274    * member.  Members are then responsible for reading its TX log.  This implementation actually
275    * rolls back, and thus breaks the normal TX guarantees.
276   */
277   @Test(timeout = 60000)
278   public void testMemberCommitException() throws Exception {
279     buildCohortMemberPair();
280 
281     // mock an exception on Subprocedure's prepare
282     doAnswer(
283         new Answer<Void>() {
284           @Override
285           public Void answer(InvocationOnMock invocation) throws Throwable {
286             throw new IOException("Forced IOException in memeber prepare");
287           }
288         }).when(spySub).insideBarrier();
289 
290     // run the operation
291     // build a new operation
292     Subprocedure subproc = member.createSubprocedure(op, data);
293     member.submitSubprocedure(subproc);
294     // if the operation doesn't die properly, then this will timeout
295     member.closeAndWait(TIMEOUT);
296 
297     // make sure everything ran in order
298     InOrder order = inOrder(mockMemberComms, spySub);
299     order.verify(spySub).acquireBarrier();
300     order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
301     order.verify(spySub).insideBarrier();
302 
303     // Later phases not run
304     order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
305     // error recovery path exercised
306     order.verify(spySub).cancel(anyString(), any(Exception.class));
307     order.verify(spySub).cleanup(any(Exception.class));
308   }
309 
310   /**
311    * Handle Failures if a member's commit phase succeeds but notification to coordinator fails
312    *
313    * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
314    * 2PC the transaction is committed just before the coordinator sends commit messages to the
315    * member.  Members are then responsible for reading its TX log.  This implementation actually
316    * rolls back, and thus breaks the normal TX guarantees.
317   */
318   @Test(timeout = 60000)
319   public void testMemberCommitCommsFailure() throws Exception {
320     buildCohortMemberPair();
321     final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
322     doAnswer(
323         new Answer<Void>() {
324           @Override
325           public Void answer(InvocationOnMock invocation) throws Throwable {
326             // inject a remote error (this would have come from an external thread)
327             spySub.cancel("commit comms fail", oate);
328             // sleep the wake frequency since that is what we promised
329             Thread.sleep(WAKE_FREQUENCY);
330             return null;
331           }
332         }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class));
333 
334     // run the operation
335     // build a new operation
336     Subprocedure subproc = member.createSubprocedure(op, data);
337     member.submitSubprocedure(subproc);
338     // if the operation doesn't die properly, then this will timeout
339     member.closeAndWait(TIMEOUT);
340 
341     // make sure everything ran in order
342     InOrder order = inOrder(mockMemberComms, spySub);
343     order.verify(spySub).acquireBarrier();
344     order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
345     order.verify(spySub).insideBarrier();
346     order.verify(mockMemberComms).sendMemberCompleted(eq(spySub));
347     // error recovery path exercised
348     order.verify(spySub).cancel(anyString(), any(Exception.class));
349     order.verify(spySub).cleanup(any(Exception.class));
350   }
351 
352   /**
353    * Fail correctly on getting an external error while waiting for the prepared latch
354    * @throws Exception on failure
355    */
356   @Test(timeout = 60000)
357   public void testPropagateConnectionErrorBackToManager() throws Exception {
358     // setup the operation
359     member = buildCohortMember();
360     ProcedureMember memberSpy = spy(member);
361 
362     // setup the commit and the spy
363     final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
364     ForeignExceptionDispatcher dispSpy = spy(dispatcher);
365     Subprocedure commit = new EmptySubprocedure(member, dispatcher);
366     Subprocedure spy = spy(commit);
367     when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
368 
369     // fail during the prepare phase
370     doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
371     // and throw a connection error when we try to tell the controller about it
372     doThrow(new IOException("Controller is down!")).when(mockMemberComms)
373         .sendMemberAborted(eq(spy), any(ForeignException.class));
374 
375 
376     // run the operation
377     // build a new operation
378     Subprocedure subproc = memberSpy.createSubprocedure(op, data);
379     memberSpy.submitSubprocedure(subproc);
380     // if the operation doesn't die properly, then this will timeout
381     memberSpy.closeAndWait(TIMEOUT);
382 
383     // make sure everything ran in order
384     InOrder order = inOrder(mockMemberComms, spy, dispSpy);
385     // make sure we acquire.
386     order.verify(spy).acquireBarrier();
387     order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
388 
389     // TODO Need to do another refactor to get this to propagate to the coordinator.
390     // make sure we pass a remote exception back the controller
391 //    order.verify(mockMemberComms).sendMemberAborted(eq(spy),
392 //      any(ExternalException.class));
393 //    order.verify(dispSpy).receiveError(anyString(),
394 //        any(ExternalException.class), any());
395   }
396 
397   /**
398    * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
399    * correctly build a new task for the requested operation
400    * @throws Exception on failure
401    */
402   @Test
403   public void testNoTaskToBeRunFromRequest() throws Exception {
404     ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
405     when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
406       .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
407     member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
408     // builder returns null
409     // build a new operation
410     Subprocedure subproc = member.createSubprocedure(op, data);
411     member.submitSubprocedure(subproc);
412     // throws an illegal state exception
413     try {
414       // build a new operation
415       Subprocedure subproc2 = member.createSubprocedure(op, data);
416       member.submitSubprocedure(subproc2);
417     } catch (IllegalStateException ise) {
418     }
419     // throws an illegal argument exception
420     try {
421       // build a new operation
422       Subprocedure subproc3 = member.createSubprocedure(op, data);
423       member.submitSubprocedure(subproc3);
424     } catch (IllegalArgumentException iae) {
425     }
426 
427     // no request should reach the pool
428     verifyZeroInteractions(pool);
429     // get two abort requests
430     // TODO Need to do another refactor to get this to propagate to the coordinator.
431     // verify(mockMemberComms, times(2)).sendMemberAborted(any(Subprocedure.class), any(ExternalException.class));
432   }
433 
434   /**
435    * Helper {@link Procedure} who's phase for each step is just empty
436    */
437   public class EmptySubprocedure extends SubprocedureImpl {
438     public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
439       super( member, op, dispatcher,
440       // TODO 1000000 is an arbitrary number that I picked.
441           WAKE_FREQUENCY, TIMEOUT);
442     }
443   }
444 }