View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.mockito.Matchers.any;
22  import static org.mockito.Matchers.anyListOf;
23  import static org.mockito.Matchers.eq;
24  import static org.mockito.Mockito.atMost;
25  import static org.mockito.Mockito.never;
26  import static org.mockito.Mockito.spy;
27  import static org.mockito.Mockito.when;
28  
29  import java.io.IOException;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.List;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.atomic.AtomicInteger;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.Abortable;
40  import org.apache.hadoop.hbase.HBaseTestingUtility;
41  import org.apache.hadoop.hbase.MediumTests;
42  import org.apache.hadoop.hbase.errorhandling.ForeignException;
43  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
44  import org.apache.hadoop.hbase.errorhandling.TimeoutException;
45  import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
46  import org.apache.hadoop.hbase.util.Pair;
47  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48  import org.junit.AfterClass;
49  import org.junit.BeforeClass;
50  import org.junit.Test;
51  import org.junit.experimental.categories.Category;
52  import org.mockito.Mockito;
53  import org.mockito.internal.matchers.ArrayEquals;
54  import org.mockito.invocation.InvocationOnMock;
55  import org.mockito.stubbing.Answer;
56  import org.mockito.verification.VerificationMode;
57  
58  import com.google.common.collect.Lists;
59  
60  /**
61   * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
62   */
63  @Category(MediumTests.class)
64  public class TestZKProcedure {
65  
66    private static final Log LOG = LogFactory.getLog(TestZKProcedure.class);
67    private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
68    private static final String COORDINATOR_NODE_NAME = "coordinator";
69    private static final long KEEP_ALIVE = 100; // seconds
70    private static final int POOL_SIZE = 1;
71    private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
72    private static final long WAKE_FREQUENCY = 500;
73    private static final String opName = "op";
74    private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for?
75    private static final VerificationMode once = Mockito.times(1);
76  
77    @BeforeClass
78    public static void setupTest() throws Exception {
79      UTIL.startMiniZKCluster();
80    }
81  
82    @AfterClass
83    public static void cleanupTest() throws Exception {
84      UTIL.shutdownMiniZKCluster();
85    }
86  
87    private static ZooKeeperWatcher newZooKeeperWatcher() throws IOException {
88      return new ZooKeeperWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
89        @Override
90        public void abort(String why, Throwable e) {
91          throw new RuntimeException(
92              "Unexpected abort in distributed three phase commit test:" + why, e);
93        }
94  
95        @Override
96        public boolean isAborted() {
97          return false;
98        }
99      });
100   }
101 
102   @Test
103   public void testEmptyMemberSet() throws Exception {
104     runCommit();
105   }
106 
107   @Test
108   public void testSingleMember() throws Exception {
109     runCommit("one");
110   }
111 
112   @Test
113   public void testMultipleMembers() throws Exception {
114     runCommit("one", "two", "three", "four" );
115   }
116 
117   private void runCommit(String... members) throws Exception {
118     // make sure we just have an empty list
119     if (members == null) {
120       members = new String[0];
121     }
122     List<String> expected = Arrays.asList(members);
123 
124     // setup the constants
125     ZooKeeperWatcher coordZkw = newZooKeeperWatcher();
126     String opDescription = "coordination test - " + members.length + " cohort members";
127 
128     // start running the controller
129     ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
130         coordZkw, opDescription, COORDINATOR_NODE_NAME);
131     ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
132     ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
133       @Override
134       public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
135           List<String> expectedMembers) {
136         return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
137       }
138     };
139 
140     // build and start members
141     // NOTE: There is a single subprocedure builder for all members here.
142     SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
143     List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
144         members.length);
145     // start each member
146     for (String member : members) {
147       ZooKeeperWatcher watcher = newZooKeeperWatcher();
148       ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
149       ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
150       ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
151       procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
152       comms.start(member, procMember);
153     }
154 
155     // setup mock member subprocedures
156     final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
157     for (int i = 0; i < procMembers.size(); i++) {
158       ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
159       Subprocedure commit = Mockito
160       .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
161           WAKE_FREQUENCY, TIMEOUT));
162       subprocs.add(commit);
163     }
164 
165     // link subprocedure to buildNewOperation invocation.
166     final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
167     Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
168         (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
169       new Answer<Subprocedure>() {
170         @Override
171         public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
172           int index = i.getAndIncrement();
173           LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
174           Subprocedure commit = subprocs.get(index);
175           return commit;
176         }
177       });
178 
179     // setup spying on the coordinator
180 //    Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, expected));
181 //    Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
182 
183     // start running the operation
184     Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
185 //    assertEquals("Didn't mock coordinator task", proc, task);
186 
187     // verify all things ran as expected
188 //    waitAndVerifyProc(proc, once, once, never(), once, false);
189     waitAndVerifyProc(task, once, once, never(), once, false);
190     verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
191 
192     // close all the things
193     closeAll(coordinator, coordinatorComms, procMembers);
194   }
195 
196   /**
197    * Test a distributed commit with multiple cohort members, where one of the cohort members has a
198    * timeout exception during the prepare stage.
199    */
200   @Test
201   public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
202     String opDescription = "error injection coordination";
203     String[] cohortMembers = new String[] { "one", "two", "three" };
204     List<String> expected = Lists.newArrayList(cohortMembers);
205     // error constants
206     final int memberErrorIndex = 2;
207     final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
208 
209     // start running the coordinator and its controller
210     ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
211     ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
212         coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
213     ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
214     ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
215 
216     // start a member for each node
217     SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
218     List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
219         expected.size());
220     for (String member : expected) {
221       ZooKeeperWatcher watcher = newZooKeeperWatcher();
222       ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
223       ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
224       ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
225       members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
226       controller.start(member, mem);
227     }
228 
229     // setup mock subprocedures
230     final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
231     final int[] elem = new int[1];
232     for (int i = 0; i < members.size(); i++) {
233       ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
234       final ProcedureMember comms = members.get(i).getFirst();
235       Subprocedure commit = Mockito
236       .spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
237       // This nasty bit has one of the impls throw a TimeoutException
238       Mockito.doAnswer(new Answer<Void>() {
239         @Override
240         public Void answer(InvocationOnMock invocation) throws Throwable {
241           int index = elem[0];
242           if (index == memberErrorIndex) {
243             LOG.debug("Sending error to coordinator");
244             ForeignException remoteCause = new ForeignException("TIMER",
245                 new TimeoutException("subprocTimeout" , 1, 2, 0));
246             Subprocedure r = ((Subprocedure) invocation.getMock());
247             LOG.error("Remote commit failure, not propagating error:" + remoteCause);
248             comms.receiveAbortProcedure(r.getName(), remoteCause);
249             assertEquals(r.isComplete(), true);
250             // don't complete the error phase until the coordinator has gotten the error
251             // notification (which ensures that we never progress past prepare)
252             try {
253               Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
254                   WAKE_FREQUENCY, "coordinator received error");
255             } catch (InterruptedException e) {
256               LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
257               // reset the interrupt status on the thread
258               Thread.currentThread().interrupt();
259             }
260           }
261           elem[0] = ++index;
262           return null;
263         }
264       }).when(commit).acquireBarrier();
265       cohortTasks.add(commit);
266     }
267 
268     // pass out a task per member
269     final int[] i = new int[] { 0 };
270     Mockito.when(
271       subprocFactory.buildSubprocedure(Mockito.eq(opName),
272         (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
273       new Answer<Subprocedure>() {
274         @Override
275         public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
276           int index = i[0];
277           Subprocedure commit = cohortTasks.get(index);
278           index++;
279           i[0] = index;
280           return commit;
281         }
282       });
283 
284     // setup spying on the coordinator
285     ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
286         .spy(new ForeignExceptionDispatcher());
287     Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
288         coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
289         opName, data, expected));
290     when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(opName), eq(data), anyListOf(String.class)))
291       .thenReturn(coordinatorTask);
292     // count down the error latch when we get the remote error
293     Mockito.doAnswer(new Answer<Void>() {
294       @Override
295       public Void answer(InvocationOnMock invocation) throws Throwable {
296         // pass on the error to the master
297         invocation.callRealMethod();
298         // then count down the got error latch
299         coordinatorReceivedErrorLatch.countDown();
300         return null;
301       }
302     }).when(coordinatorTask).receive(Mockito.any(ForeignException.class));
303 
304     // ----------------------------
305     // start running the operation
306     // ----------------------------
307 
308     Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
309     assertEquals("Didn't mock coordinator task", coordinatorTask, task);
310 
311     // wait for the task to complete
312     try {
313       task.waitForCompleted();
314     } catch (ForeignException fe) {
315       // this may get caught or may not
316     }
317 
318     // -------------
319     // verification
320     // -------------
321 
322     // always expect prepared, never committed, and possible to have cleanup and finish (racy since
323     // error case)
324     waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true);
325     verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
326       once, true);
327 
328     // close all the open things
329     closeAll(coordinator, coordinatorController, members);
330   }
331 
332   /**
333    * Wait for the coordinator task to complete, and verify all the mocks
334    * @param task to wait on
335    * @throws Exception on unexpected failure
336    */
337   private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
338       VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
339       throws Exception {
340     boolean caughtError = false;
341     try {
342       proc.waitForCompleted();
343     } catch (ForeignException fe) {
344       caughtError = true;
345     }
346     // make sure that the task called all the expected phases
347     Mockito.verify(proc, prepare).sendGlobalBarrierStart();
348     Mockito.verify(proc, commit).sendGlobalBarrierReached();
349     Mockito.verify(proc, finish).sendGlobalBarrierComplete();
350     assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
351         .hasException());
352     assertEquals("Operation error state was unexpected", opHasError, caughtError);
353 
354   }
355 
356   /**
357    * Wait for the coordinator task to complete, and verify all the mocks
358    * @param task to wait on
359    * @throws Exception on unexpected failure
360    */
361   private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
362       VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
363       throws Exception {
364     boolean caughtError = false;
365     try {
366       op.waitForLocallyCompleted();
367     } catch (ForeignException fe) {
368       caughtError = true;
369     }
370     // make sure that the task called all the expected phases
371     Mockito.verify(op, prepare).acquireBarrier();
372     Mockito.verify(op, commit).insideBarrier();
373     // We cannot guarantee that cleanup has run so we don't check it.
374 
375     assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
376         .hasException());
377     assertEquals("Operation error state was unexpected", opHasError, caughtError);
378 
379   }
380 
381   private void verifyCohortSuccessful(List<String> cohortNames,
382       SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
383       VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
384       VerificationMode finish, boolean opHasError) throws Exception {
385 
386     // make sure we build the correct number of cohort members
387     Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
388       Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
389     // verify that we ran each of the operations cleanly
390     int j = 0;
391     for (Subprocedure op : cohortTasks) {
392       LOG.debug("Checking mock:" + (j++));
393       waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
394     }
395   }
396 
397   private void closeAll(
398       ProcedureCoordinator coordinator,
399       ZKProcedureCoordinatorRpcs coordinatorController,
400       List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
401       throws IOException {
402     // make sure we close all the resources
403     for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
404       member.getFirst().close();
405       member.getSecond().close();
406     }
407     coordinator.close();
408     coordinatorController.close();
409   }
410 }