View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import static org.junit.Assert.*;
22  
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.*;
29  import org.apache.hadoop.hbase.testclassification.MediumTests;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.junit.AfterClass;
32  import org.junit.BeforeClass;
33  import org.junit.Test;
34  import org.junit.experimental.categories.Category;
35  
36  /**
37   */
38  @Category(MediumTests.class)
39  public class TestZKLeaderManager {
40    private static final Log LOG = LogFactory.getLog(TestZKLeaderManager.class);
41  
42    private static final String LEADER_ZNODE =
43        "/test/" + TestZKLeaderManager.class.getSimpleName();
44  
45    private static class MockAbortable implements Abortable {
46      private boolean aborted;
47  
48      @Override
49      public void abort(String why, Throwable e) {
50        aborted = true;
51        LOG.fatal("Aborting during test: "+why, e);
52        fail("Aborted during test: " + why);
53      }
54  
55      @Override
56      public boolean isAborted() {
57        return aborted;
58      }
59    }
60  
61    private static class MockLeader extends Thread implements Stoppable {
62      private boolean stopped;
63      private ZooKeeperWatcher watcher;
64      private ZKLeaderManager zkLeader;
65      private AtomicBoolean master = new AtomicBoolean(false);
66      private int index;
67  
68      public MockLeader(ZooKeeperWatcher watcher, int index) {
69        setDaemon(true);
70        setName("TestZKLeaderManager-leader-" + index);
71        this.index = index;
72        this.watcher = watcher;
73        this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE,
74            Bytes.toBytes(index), this);
75      }
76  
77      public boolean isMaster() {
78        return master.get();
79      }
80  
81      public int getIndex() {
82        return index;
83      }
84  
85      public ZooKeeperWatcher getWatcher() {
86        return watcher;
87      }
88  
89      public void run() {
90        while (!stopped) {
91          zkLeader.start();
92          zkLeader.waitToBecomeLeader();
93          master.set(true);
94  
95          while (master.get() && !stopped) {
96            try {
97              Thread.sleep(10);
98            } catch (InterruptedException ignored) {}
99          }
100       }
101     }
102 
103     public void abdicate() {
104       zkLeader.stepDownAsLeader();
105       master.set(false);
106     }
107 
108     @Override
109     public void stop(String why) {
110       stopped = true;
111       abdicate();
112       watcher.close();
113     }
114 
115     @Override
116     public boolean isStopped() {
117       return stopped;
118     }
119   }
120 
121   private static HBaseTestingUtility TEST_UTIL;
122   private static MockLeader[] CANDIDATES;
123 
124   @BeforeClass
125   public static void setupBeforeClass() throws Exception {
126     TEST_UTIL = new HBaseTestingUtility();
127     TEST_UTIL.startMiniZKCluster();
128     Configuration conf = TEST_UTIL.getConfiguration();
129 
130     // use an abortable to fail the test in the case of any KeeperExceptions
131     MockAbortable abortable = new MockAbortable();
132     CANDIDATES = new MockLeader[3];
133     for (int i = 0; i < 3; i++) {
134       ZooKeeperWatcher watcher = newZK(conf, "server"+i, abortable);
135       CANDIDATES[i] = new MockLeader(watcher, i);
136       CANDIDATES[i].start();
137     }
138   }
139 
140   @AfterClass
141   public static void tearDownAfterClass() throws Exception {
142     TEST_UTIL.shutdownMiniZKCluster();
143   }
144 
145   @Test
146   public void testLeaderSelection() throws Exception {
147     MockLeader currentLeader = getCurrentLeader();
148     // one leader should have been found
149     assertNotNull("Leader should exist", currentLeader);
150     LOG.debug("Current leader index is "+currentLeader.getIndex());
151 
152     byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
153     assertNotNull("Leader znode should contain leader index", znodeData);
154     assertTrue("Leader znode should not be empty", znodeData.length > 0);
155     int storedIndex = Bytes.toInt(znodeData);
156     LOG.debug("Stored leader index in ZK is "+storedIndex);
157     assertEquals("Leader znode should match leader index",
158         currentLeader.getIndex(), storedIndex);
159 
160     // force a leader transition
161     currentLeader.abdicate();
162     assertFalse(currentLeader.isMaster());
163 
164     // check for new leader
165     currentLeader = getCurrentLeader();
166     // one leader should have been found
167     assertNotNull("New leader should exist after abdication", currentLeader);
168     LOG.debug("New leader index is "+currentLeader.getIndex());
169 
170     znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
171     assertNotNull("Leader znode should contain leader index", znodeData);
172     assertTrue("Leader znode should not be empty", znodeData.length > 0);
173     storedIndex = Bytes.toInt(znodeData);
174     LOG.debug("Stored leader index in ZK is "+storedIndex);
175     assertEquals("Leader znode should match leader index",
176         currentLeader.getIndex(), storedIndex);
177 
178     // force another transition by stopping the current
179     currentLeader.stop("Stopping for test");
180     assertFalse(currentLeader.isMaster());
181 
182     // check for new leader
183     currentLeader = getCurrentLeader();
184     // one leader should have been found
185     assertNotNull("New leader should exist after stop", currentLeader);
186     LOG.debug("New leader index is "+currentLeader.getIndex());
187 
188     znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
189     assertNotNull("Leader znode should contain leader index", znodeData);
190     assertTrue("Leader znode should not be empty", znodeData.length > 0);
191     storedIndex = Bytes.toInt(znodeData);
192     LOG.debug("Stored leader index in ZK is "+storedIndex);
193     assertEquals("Leader znode should match leader index",
194         currentLeader.getIndex(), storedIndex);
195 
196     // with a second stop we can guarantee that a previous leader has resumed leading
197     currentLeader.stop("Stopping for test");
198     assertFalse(currentLeader.isMaster());
199 
200     // check for new
201     currentLeader = getCurrentLeader();
202     assertNotNull("New leader should exist", currentLeader);
203   }
204 
205   private MockLeader getCurrentLeader() throws Exception {
206     MockLeader currentLeader = null;
207     outer:
208     // Wait up to 10 secs for initial leader
209     for (int i = 0; i < 1000; i++) {
210       for (int j = 0; j < CANDIDATES.length; j++) {
211         if (CANDIDATES[j].isMaster()) {
212           // should only be one leader
213           if (currentLeader != null) {
214             fail("Both candidate "+currentLeader.getIndex()+" and "+j+" claim to be leader!");
215           }
216           currentLeader = CANDIDATES[j];
217         }
218       }
219       if (currentLeader != null) {
220         break outer;
221       }
222       Thread.sleep(10);
223     }
224     return currentLeader;
225   }
226 
227   private static ZooKeeperWatcher newZK(Configuration conf, String name,
228       Abortable abort) throws Exception {
229     Configuration copy = HBaseConfiguration.create(conf);
230     ZooKeeperWatcher zk = new ZooKeeperWatcher(copy, name, abort);
231     return zk;
232   }
233 
234 }
235