View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.mockito.Mockito.never;
22  import static org.mockito.Mockito.spy;
23  import static org.mockito.Mockito.times;
24  import static org.mockito.Mockito.verify;
25  
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.CountDownLatch;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.HBaseTestingUtility;
33  import org.apache.hadoop.hbase.MediumTests;
34  import org.apache.hadoop.hbase.errorhandling.ForeignException;
35  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.util.Pair;
38  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
39  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
40  import org.junit.AfterClass;
41  import org.junit.BeforeClass;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  import org.mockito.Mockito;
45  import org.mockito.invocation.InvocationOnMock;
46  import org.mockito.stubbing.Answer;
47  import org.mockito.verification.VerificationMode;
48  
49  import com.google.common.collect.Lists;
50  
51  /**
52   * Test zookeeper-based, procedure controllers
53   */
54  @Category(MediumTests.class)
55  public class TestZKProcedureControllers {
56  
57    static final Log LOG = LogFactory.getLog(TestZKProcedureControllers.class);
58    private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
59    private static final String COHORT_NODE_NAME = "expected";
60    private static final String CONTROLLER_NODE_NAME = "controller";
61    private static final VerificationMode once = Mockito.times(1);
62  
63    @BeforeClass
64    public static void setupTest() throws Exception {
65      UTIL.startMiniZKCluster();
66    }
67  
68    @AfterClass
69    public static void cleanupTest() throws Exception {
70      UTIL.shutdownMiniZKCluster();
71    }
72  
73    /**
74     * Smaller test to just test the actuation on the cohort member
75     * @throws Exception on failure
76     */
77    @Test(timeout = 60000)
78    public void testSimpleZKCohortMemberController() throws Exception {
79      ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
80      final String operationName = "instanceTest";
81  
82      final Subprocedure sub = Mockito.mock(Subprocedure.class);
83      Mockito.when(sub.getName()).thenReturn(operationName);
84  
85      final byte[] data = new byte[] { 1, 2, 3 };
86      final CountDownLatch prepared = new CountDownLatch(1);
87      final CountDownLatch committed = new CountDownLatch(1);
88  
89      final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
90      final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
91          watcher, "testSimple");
92  
93      // mock out cohort member callbacks
94      final ProcedureMember member = Mockito
95          .mock(ProcedureMember.class);
96      Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
97      Mockito.doAnswer(new Answer<Void>() {
98        @Override
99        public Void answer(InvocationOnMock invocation) throws Throwable {
100         controller.sendMemberAcquired(sub);
101         prepared.countDown();
102         return null;
103       }
104     }).when(member).submitSubprocedure(sub);
105     Mockito.doAnswer(new Answer<Void>() {
106       @Override
107       public Void answer(InvocationOnMock invocation) throws Throwable {
108         controller.sendMemberCompleted(sub);
109         committed.countDown();
110         return null;
111       }
112     }).when(member).receivedReachedGlobalBarrier(operationName);
113 
114     // start running the listener
115     controller.start(COHORT_NODE_NAME, member);
116 
117     // set a prepare node from a 'coordinator'
118     String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
119     ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
120     // wait for the operation to be prepared
121     prepared.await();
122 
123     // create the commit node so we update the operation to enter the commit phase
124     String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
125     LOG.debug("Found prepared, posting commit node:" + commit);
126     ZKUtil.createAndFailSilent(watcher, commit);
127     LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
128     committed.await();
129 
130     verify(monitor, never()).receive(Mockito.any(ForeignException.class));
131     // XXX: broken due to composition.
132 //    verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
133 //      Mockito.any(IOException.class));
134     // cleanup after the test
135     ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
136     assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
137     assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
138   }
139 
140   @Test(timeout = 60000)
141   public void testZKCoordinatorControllerWithNoCohort() throws Exception {
142     final String operationName = "no cohort controller test";
143     final byte[] data = new byte[] { 1, 2, 3 };
144 
145     runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
146     runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
147   }
148 
149   @Test(timeout = 60000)
150   public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
151     final String operationName = "single member controller test";
152     final byte[] data = new byte[] { 1, 2, 3 };
153 
154     runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
155     runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
156   }
157 
158   @Test(timeout = 60000)
159   public void testZKCoordinatorControllerMultipleCohort() throws Exception {
160     final String operationName = "multi member controller test";
161     final byte[] data = new byte[] { 1, 2, 3 };
162 
163     runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
164       "cohort2", "cohort3");
165     runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
166       "cohort2", "cohort3");
167   }
168 
169   private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
170       String operationName, byte[] data, String... cohort) throws Exception {
171     ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
172     List<String> expected = Lists.newArrayList(cohort);
173 
174     final Subprocedure sub = Mockito.mock(Subprocedure.class);
175     Mockito.when(sub.getName()).thenReturn(operationName);
176 
177     CountDownLatch prepared = new CountDownLatch(expected.size());
178     CountDownLatch committed = new CountDownLatch(expected.size());
179     // mock out coordinator so we can keep track of zk progress
180     ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
181       prepared, committed);
182 
183     ProcedureMember member = Mockito.mock(ProcedureMember.class);
184 
185     Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
186         .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
187     ZKProcedureCoordinatorRpcs controller = pair.getFirst();
188     List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
189     // start the operation
190     Procedure p = Mockito.mock(Procedure.class);
191     Mockito.when(p.getName()).thenReturn(operationName);
192 
193     controller.sendGlobalBarrierAcquire(p, data, expected);
194 
195     // post the prepare node for each expected node
196     for (ZKProcedureMemberRpcs cc : cohortControllers) {
197       cc.sendMemberAcquired(sub);
198     }
199 
200     // wait for all the notifications to reach the coordinator
201     prepared.await();
202     // make sure we got the all the nodes and no more
203     Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
204       Mockito.anyString());
205 
206     // kick off the commit phase
207     controller.sendGlobalBarrierReached(p, expected);
208 
209     // post the committed node for each expected node
210     for (ZKProcedureMemberRpcs cc : cohortControllers) {
211       cc.sendMemberCompleted(sub);
212     }
213 
214     // wait for all commit notifications to reach the coordinator
215     committed.await();
216     // make sure we got the all the nodes and no more
217     Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
218       Mockito.anyString());
219 
220     controller.resetMembers(p);
221 
222     // verify all behavior
223     verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
224     verifyCohort(member, cohortControllers.size(), operationName, data);
225     verifyCoordinator(operationName, coordinator, expected);
226   }
227 
228   // TODO Broken by composition.
229 //  @Test
230 //  public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
231 //    runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
232 //      "cohort1", "cohort2");
233 //    runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
234 //      "cohort1", "cohort2");
235 //  }
236 
237   public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
238       String... cohort) throws Exception {
239     ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
240     List<String> expected = Lists.newArrayList(cohort);
241 
242     final Subprocedure sub = Mockito.mock(Subprocedure.class);
243     Mockito.when(sub.getName()).thenReturn(operationName);
244 
245     final CountDownLatch prepared = new CountDownLatch(expected.size());
246     final CountDownLatch committed = new CountDownLatch(expected.size());
247     // mock out coordinator so we can keep track of zk progress
248     ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
249       prepared, committed);
250 
251     ProcedureMember member = Mockito.mock(ProcedureMember.class);
252     Procedure p = Mockito.mock(Procedure.class);
253     Mockito.when(p.getName()).thenReturn(operationName);
254 
255     Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
256         .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
257     ZKProcedureCoordinatorRpcs controller = pair.getFirst();
258     List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
259 
260     // post 1/2 the prepare nodes early
261     for (int i = 0; i < cohortControllers.size() / 2; i++) {
262       cohortControllers.get(i).sendMemberAcquired(sub);
263     }
264 
265     // start the operation
266     controller.sendGlobalBarrierAcquire(p, data, expected);
267 
268     // post the prepare node for each expected node
269     for (ZKProcedureMemberRpcs cc : cohortControllers) {
270       cc.sendMemberAcquired(sub);
271     }
272 
273     // wait for all the notifications to reach the coordinator
274     prepared.await();
275     // make sure we got the all the nodes and no more
276     Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
277       Mockito.anyString());
278 
279     // kick off the commit phase
280     controller.sendGlobalBarrierReached(p, expected);
281 
282     // post the committed node for each expected node
283     for (ZKProcedureMemberRpcs cc : cohortControllers) {
284       cc.sendMemberCompleted(sub);
285     }
286 
287     // wait for all commit notifications to reach the coordiantor
288     committed.await();
289     // make sure we got the all the nodes and no more
290     Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
291       Mockito.anyString());
292 
293     controller.resetMembers(p);
294 
295     // verify all behavior
296     verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
297     verifyCohort(member, cohortControllers.size(), operationName, data);
298     verifyCoordinator(operationName, coordinator, expected);
299   }
300 
301   /**
302    * @return a mock {@link ProcedureCoordinator} that just counts down the
303    *         prepared and committed latch for called to the respective method
304    */
305   private ProcedureCoordinator setupMockCoordinator(String operationName,
306       final CountDownLatch prepared, final CountDownLatch committed) {
307     ProcedureCoordinator coordinator = Mockito
308         .mock(ProcedureCoordinator.class);
309     Mockito.mock(ProcedureCoordinator.class);
310     Mockito.doAnswer(new Answer<Void>() {
311       @Override
312       public Void answer(InvocationOnMock invocation) throws Throwable {
313         prepared.countDown();
314         return null;
315       }
316     }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
317     Mockito.doAnswer(new Answer<Void>() {
318       @Override
319       public Void answer(InvocationOnMock invocation) throws Throwable {
320         committed.countDown();
321         return null;
322       }
323     }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString());
324     return coordinator;
325   }
326 
327   /**
328    * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
329    */
330   private void verifyZooKeeperClean(String operationName, ZooKeeperWatcher watcher,
331       ZKProcedureUtil controller) throws Exception {
332     String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
333     String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
334     String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
335     assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
336     assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
337     assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
338   }
339 
340   /**
341    * Verify the cohort controller got called once per expected node to start the operation
342    */
343   private void verifyCohort(ProcedureMember member, int cohortSize,
344       String operationName, byte[] data) {
345 //    verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
346 //      (byte[]) Mockito.argThat(new ArrayEquals(data)));
347     verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.any(Subprocedure.class));
348 
349   }
350 
351   /**
352    * Verify that the coordinator only got called once for each expected node
353    */
354   private void verifyCoordinator(String operationName,
355       ProcedureCoordinator coordinator, List<String> expected) {
356     // verify that we got all the expected nodes
357     for (String node : expected) {
358       verify(coordinator, once).memberAcquiredBarrier(operationName, node);
359       verify(coordinator, once).memberFinishedBarrier(operationName, node);
360     }
361   }
362 
363   /**
364    * Specify how the controllers that should be started (not spy/mockable) for the test.
365    */
366   private abstract class StartControllers {
367     public abstract Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
368         ZooKeeperWatcher watcher, String operationName,
369         ProcedureCoordinator coordinator, String controllerName,
370         ProcedureMember member, List<String> cohortNames) throws Exception;
371   }
372 
373   private final StartControllers startCoordinatorFirst = new StartControllers() {
374 
375     @Override
376     public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
377         ZooKeeperWatcher watcher, String operationName,
378         ProcedureCoordinator coordinator, String controllerName,
379         ProcedureMember member, List<String> expected) throws Exception {
380       // start the controller
381       ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
382           watcher, operationName, CONTROLLER_NODE_NAME);
383       controller.start(coordinator);
384 
385       // make a cohort controller for each expected node
386 
387       List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
388       for (String nodeName : expected) {
389         ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
390         cc.start(nodeName, member);
391         cohortControllers.add(cc);
392       }
393       return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
394           controller, cohortControllers);
395     }
396   };
397 
398   /**
399    * Check for the possible race condition where a cohort member starts after the controller and
400    * therefore could miss a new operation
401    */
402   private final StartControllers startCohortFirst = new StartControllers() {
403 
404     @Override
405     public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
406         ZooKeeperWatcher watcher, String operationName,
407         ProcedureCoordinator coordinator, String controllerName,
408         ProcedureMember member, List<String> expected) throws Exception {
409 
410       // make a cohort controller for each expected node
411       List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
412       for (String nodeName : expected) {
413         ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
414         cc.start(nodeName, member);
415         cohortControllers.add(cc);
416       }
417 
418       // start the controller
419       ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
420           watcher, operationName, CONTROLLER_NODE_NAME);
421       controller.start(coordinator);
422 
423       return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
424           controller, cohortControllers);
425     }
426   };
427 }