View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import static org.junit.Assert.assertNull;
21  import static org.junit.Assert.assertTrue;
22  import static org.mockito.Matchers.any;
23  import static org.mockito.Matchers.anyListOf;
24  import static org.mockito.Matchers.anyString;
25  import static org.mockito.Matchers.eq;
26  import static org.mockito.Mockito.atLeastOnce;
27  import static org.mockito.Mockito.doAnswer;
28  import static org.mockito.Mockito.doThrow;
29  import static org.mockito.Mockito.inOrder;
30  import static org.mockito.Mockito.mock;
31  import static org.mockito.Mockito.never;
32  import static org.mockito.Mockito.reset;
33  import static org.mockito.Mockito.spy;
34  import static org.mockito.Mockito.times;
35  import static org.mockito.Mockito.verify;
36  import static org.mockito.Mockito.when;
37  
38  import java.io.IOException;
39  import java.util.Arrays;
40  import java.util.List;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  
44  import org.apache.hadoop.hbase.SmallTests;
45  import org.apache.hadoop.hbase.errorhandling.ForeignException;
46  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
47  import org.junit.After;
48  import org.junit.Test;
49  import org.junit.experimental.categories.Category;
50  import org.mockito.InOrder;
51  import org.mockito.invocation.InvocationOnMock;
52  import org.mockito.stubbing.Answer;
53  
54  import com.google.common.collect.Lists;
55  
56  /**
57   * Test Procedure coordinator operation.
58   * <p>
59   * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
60   * level serialization this class will likely throw all kinds of errors.
61   */
62  @Category(SmallTests.class)
63  public class TestProcedureCoordinator {
64    // general test constants
65    private static final long WAKE_FREQUENCY = 1000;
66    private static final long TIMEOUT = 100000;
67    private static final long POOL_KEEP_ALIVE = 1;
68    private static final String nodeName = "node";
69    private static final String procName = "some op";
70    private static final byte[] procData = new byte[0];
71    private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
72  
73    // setup the mocks
74    private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
75    private final Procedure task = mock(Procedure.class);
76    private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
77  
78    // handle to the coordinator for each test
79    private ProcedureCoordinator coordinator;
80  
81    @After
82    public void resetTest() throws IOException {
83      // reset all the mocks used for the tests
84      reset(controller, task, monitor);
85      // close the open coordinator, if it was used
86      if (coordinator != null) coordinator.close();
87    }
88  
89    private ProcedureCoordinator buildNewCoordinator() {
90      ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
91      return spy(new ProcedureCoordinator(controller, pool));
92    }
93  
94    /**
95     * Currently we can only handle one procedure at a time.  This makes sure we handle that and
96     * reject submitting more.
97     */
98    @Test
99    public void testThreadPoolSize() throws Exception {
100     ProcedureCoordinator coordinator = buildNewCoordinator();
101     Procedure proc = new Procedure(coordinator,  monitor,
102         WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
103     Procedure procSpy = spy(proc);
104 
105     Procedure proc2 = new Procedure(coordinator,  monitor,
106         WAKE_FREQUENCY, TIMEOUT, procName +"2", procData, expected);
107     Procedure procSpy2 = spy(proc2);
108     when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
109     .thenReturn(procSpy, procSpy2);
110 
111     coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
112     // null here means second procedure failed to start.
113     assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
114       coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
115   }
116 
117   /**
118    * Check handling a connection failure correctly if we get it during the acquiring phase
119    */
120   @Test(timeout = 60000)
121   public void testUnreachableControllerDuringPrepare() throws Exception {
122     coordinator = buildNewCoordinator();
123     // setup the proc
124     List<String> expected = Arrays.asList("cohort");
125     Procedure proc = new Procedure(coordinator, WAKE_FREQUENCY,
126         TIMEOUT, procName, procData, expected);
127     final Procedure procSpy = spy(proc);
128 
129     when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
130         .thenReturn(procSpy);
131 
132     // use the passed controller responses
133     IOException cause = new IOException("Failed to reach comms during acquire");
134     doThrow(cause).when(controller)
135         .sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyListOf(String.class));
136 
137     // run the operation
138     proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
139     // and wait for it to finish
140     while(!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS));
141     verify(procSpy, atLeastOnce()).receive(any(ForeignException.class));
142     verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
143     verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
144     verify(controller, never()).sendGlobalBarrierReached(any(Procedure.class),
145         anyListOf(String.class));
146   }
147 
148   /**
149    * Check handling a connection failure correctly if we get it during the barrier phase
150    */
151   @Test(timeout = 60000)
152   public void testUnreachableControllerDuringCommit() throws Exception {
153     coordinator = buildNewCoordinator();
154 
155     // setup the task and spy on it
156     List<String> expected = Arrays.asList("cohort");
157     final Procedure spy = spy(new Procedure(coordinator,
158         WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
159 
160     when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
161     .thenReturn(spy);
162 
163     // use the passed controller responses
164     IOException cause = new IOException("Failed to reach controller during prepare");
165     doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" }))
166         .when(controller).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class));
167     doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
168 
169     // run the operation
170     Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
171     // and wait for it to finish
172     while(!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS));
173     verify(spy, atLeastOnce()).receive(any(ForeignException.class));
174     verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
175     verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy),
176         eq(procData), anyListOf(String.class));
177     verify(controller, times(1)).sendGlobalBarrierReached(any(Procedure.class),
178         anyListOf(String.class));
179   }
180 
181   @Test(timeout = 60000)
182   public void testNoCohort() throws Exception {
183     runSimpleProcedure();
184   }
185 
186   @Test(timeout = 60000)
187   public void testSingleCohortOrchestration() throws Exception {
188     runSimpleProcedure("one");
189   }
190 
191   @Test(timeout = 60000)
192   public void testMultipleCohortOrchestration() throws Exception {
193     runSimpleProcedure("one", "two", "three", "four");
194   }
195 
196   public void runSimpleProcedure(String... members) throws Exception {
197     coordinator = buildNewCoordinator();
198     Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
199         TIMEOUT, procName, procData, Arrays.asList(members));
200     final Procedure spy = spy(task);
201     runCoordinatedProcedure(spy, members);
202   }
203 
204   /**
205    * Test that if nodes join the barrier early we still correctly handle the progress
206    */
207   @Test(timeout = 60000)
208   public void testEarlyJoiningBarrier() throws Exception {
209     final String[] cohort = new String[] { "one", "two", "three", "four" };
210     coordinator = buildNewCoordinator();
211     final ProcedureCoordinator ref = coordinator;
212     Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
213         TIMEOUT, procName, procData, Arrays.asList(cohort));
214     final Procedure spy = spy(task);
215 
216     AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
217       public void doWork() {
218         // then do some fun where we commit before all nodes have prepared
219         // "one" commits before anyone else is done
220         ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
221         ref.memberFinishedBarrier(this.opName, this.cohort[0]);
222         // but "two" takes a while
223         ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
224         // "three"jumps ahead
225         ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
226         ref.memberFinishedBarrier(this.opName, this.cohort[2]);
227         // and "four" takes a while
228         ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
229       }
230     };
231 
232     BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
233       @Override
234       public void doWork() {
235         ref.memberFinishedBarrier(opName, this.cohort[1]);
236         ref.memberFinishedBarrier(opName, this.cohort[3]);
237       }
238     };
239     runCoordinatedOperation(spy, prepare, commit, cohort);
240   }
241 
242   /**
243    * Just run a procedure with the standard name and data, with not special task for the mock
244    * coordinator (it works just like a regular coordinator). For custom behavior see
245    * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])}
246    * .
247    * @param spy Spy on a real {@link Procedure}
248    * @param cohort expected cohort members
249    * @throws Exception on failure
250    */
251   public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
252     runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
253       new BarrierAnswer(procName, cohort), cohort);
254   }
255 
256   public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare,
257       String... cohort) throws Exception {
258     runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
259   }
260 
261   public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit,
262       String... cohort) throws Exception {
263     runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
264   }
265 
266   public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
267       BarrierAnswer commitOperation, String... cohort) throws Exception {
268     List<String> expected = Arrays.asList(cohort);
269     when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
270       .thenReturn(spy);
271 
272     // use the passed controller responses
273     doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
274     doAnswer(commitOperation).when(controller)
275         .sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
276 
277     // run the operation
278     Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
279     // and wait for it to finish
280     task.waitForCompleted();
281 
282     // make sure we mocked correctly
283     prepareOperation.ensureRan();
284     // we never got an exception
285     InOrder inorder = inOrder(spy, controller);
286     inorder.verify(spy).sendGlobalBarrierStart();
287     inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
288     inorder.verify(spy).sendGlobalBarrierReached();
289     inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class));
290   }
291 
292   private abstract class OperationAnswer implements Answer<Void> {
293     private boolean ran = false;
294 
295     public void ensureRan() {
296       assertTrue("Prepare mocking didn't actually run!", ran);
297     }
298 
299     @Override
300     public final Void answer(InvocationOnMock invocation) throws Throwable {
301       this.ran = true;
302       doWork();
303       return null;
304     }
305 
306     protected abstract void doWork() throws Throwable;
307   }
308 
309   /**
310    * Just tell the current coordinator that each of the nodes has prepared
311    */
312   private class AcquireBarrierAnswer extends OperationAnswer {
313     protected final String[] cohort;
314     protected final String opName;
315 
316     public AcquireBarrierAnswer(String opName, String... cohort) {
317       this.cohort = cohort;
318       this.opName = opName;
319     }
320 
321     @Override
322     public void doWork() {
323       if (cohort == null) return;
324       for (String member : cohort) {
325         TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
326       }
327     }
328   }
329 
330   /**
331    * Just tell the current coordinator that each of the nodes has committed
332    */
333   private class BarrierAnswer extends OperationAnswer {
334     protected final String[] cohort;
335     protected final String opName;
336 
337     public BarrierAnswer(String opName, String... cohort) {
338       this.cohort = cohort;
339       this.opName = opName;
340     }
341 
342     @Override
343     public void doWork() {
344       if (cohort == null) return;
345       for (String member : cohort) {
346         TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member);
347       }
348     }
349   }
350 }