View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import static org.junit.Assert.*;
22  
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.*;
29  import org.apache.hadoop.hbase.util.Bytes;
30  import org.junit.AfterClass;
31  import org.junit.BeforeClass;
32  import org.junit.Test;
33  import org.junit.experimental.categories.Category;
34  
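/**
 * Exercises {@link ZKLeaderManager} leader election: several candidate
 * threads compete for a single leader znode on a mini ZooKeeper cluster, and
 * the test checks that a leader exists after each abdication or stop.
 */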
37  @Category(MediumTests.class)
38  public class TestZKLeaderManager {
39    private static Log LOG = LogFactory.getLog(TestZKLeaderManager.class);
40  
41    private static final String LEADER_ZNODE =
42        "/test/" + TestZKLeaderManager.class.getSimpleName();
43  
44    private static class MockAbortable implements Abortable {
45      private boolean aborted;
46  
47      @Override
48      public void abort(String why, Throwable e) {
49        aborted = true;
50        LOG.fatal("Aborting during test: "+why, e);
51        fail("Aborted during test: " + why);
52      }
53  
54      @Override
55      public boolean isAborted() {
56        return aborted;
57      }
58    }
59  
60    private static class MockLeader extends Thread implements Stoppable {
61      private boolean stopped;
62      private ZooKeeperWatcher watcher;
63      private ZKLeaderManager zkLeader;
64      private AtomicBoolean master = new AtomicBoolean(false);
65      private int index;
66  
67      public MockLeader(ZooKeeperWatcher watcher, int index) {
68        setDaemon(true);
69        setName("TestZKLeaderManager-leader-" + index);
70        this.index = index;
71        this.watcher = watcher;
72        this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE,
73            Bytes.toBytes(index), this);
74      }
75  
76      public boolean isMaster() {
77        return master.get();
78      }
79  
80      public int getIndex() {
81        return index;
82      }
83  
84      public ZooKeeperWatcher getWatcher() {
85        return watcher;
86      }
87  
88      public void run() {
89        while (!stopped) {
90          zkLeader.start();
91          zkLeader.waitToBecomeLeader();
92          master.set(true);
93  
94          while (master.get() && !stopped) {
95            try {
96              Thread.sleep(10);
97            } catch (InterruptedException ignored) {}
98          }
99        }
100     }
101 
102     public void abdicate() {
103       zkLeader.stepDownAsLeader();
104       master.set(false);
105     }
106 
107     @Override
108     public void stop(String why) {
109       stopped = true;
110       abdicate();
111       watcher.close();
112     }
113 
114     @Override
115     public boolean isStopped() {
116       return stopped;
117     }
118   }
119 
  /** Testing utility whose mini ZooKeeper cluster is started in setupBeforeClass(). */
  private static HBaseTestingUtility TEST_UTIL;
  /** Leader candidates; created and started in setupBeforeClass(). */
  private static MockLeader[] CANDIDATES;
122 
123   @BeforeClass
124   public static void setupBeforeClass() throws Exception {
125     TEST_UTIL = new HBaseTestingUtility();
126     TEST_UTIL.startMiniZKCluster();
127     Configuration conf = TEST_UTIL.getConfiguration();
128 
129     // use an abortable to fail the test in the case of any KeeperExceptions
130     MockAbortable abortable = new MockAbortable();
131     CANDIDATES = new MockLeader[3];
132     for (int i = 0; i < 3; i++) {
133       ZooKeeperWatcher watcher = newZK(conf, "server"+i, abortable);
134       CANDIDATES[i] = new MockLeader(watcher, i);
135       CANDIDATES[i].start();
136     }
137   }
138 
  /**
   * Shuts down the mini ZooKeeper cluster used by this test class.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    // NOTE(review): candidates that are still running are not stopped here and
    // their watchers are not closed before the cluster goes away -- confirm
    // that relying on daemon threads for cleanup is intended.
    TEST_UTIL.shutdownMiniZKCluster();
  }
143 
144   @Test
145   public void testLeaderSelection() throws Exception {
146     MockLeader currentLeader = getCurrentLeader();
147     // one leader should have been found
148     assertNotNull("Leader should exist", currentLeader);
149     LOG.debug("Current leader index is "+currentLeader.getIndex());
150 
151     byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
152     assertNotNull("Leader znode should contain leader index", znodeData);
153     assertTrue("Leader znode should not be empty", znodeData.length > 0);
154     int storedIndex = Bytes.toInt(znodeData);
155     LOG.debug("Stored leader index in ZK is "+storedIndex);
156     assertEquals("Leader znode should match leader index",
157         currentLeader.getIndex(), storedIndex);
158 
159     // force a leader transition
160     currentLeader.abdicate();
161     assertFalse(currentLeader.isMaster());
162 
163     // check for new leader
164     currentLeader = getCurrentLeader();
165     // one leader should have been found
166     assertNotNull("New leader should exist after abdication", currentLeader);
167     LOG.debug("New leader index is "+currentLeader.getIndex());
168 
169     znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
170     assertNotNull("Leader znode should contain leader index", znodeData);
171     assertTrue("Leader znode should not be empty", znodeData.length > 0);
172     storedIndex = Bytes.toInt(znodeData);
173     LOG.debug("Stored leader index in ZK is "+storedIndex);
174     assertEquals("Leader znode should match leader index",
175         currentLeader.getIndex(), storedIndex);
176 
177     // force another transition by stopping the current
178     currentLeader.stop("Stopping for test");
179     assertFalse(currentLeader.isMaster());
180 
181     // check for new leader
182     currentLeader = getCurrentLeader();
183     // one leader should have been found
184     assertNotNull("New leader should exist after stop", currentLeader);
185     LOG.debug("New leader index is "+currentLeader.getIndex());
186 
187     znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
188     assertNotNull("Leader znode should contain leader index", znodeData);
189     assertTrue("Leader znode should not be empty", znodeData.length > 0);
190     storedIndex = Bytes.toInt(znodeData);
191     LOG.debug("Stored leader index in ZK is "+storedIndex);
192     assertEquals("Leader znode should match leader index",
193         currentLeader.getIndex(), storedIndex);
194 
195     // with a second stop we can guarantee that a previous leader has resumed leading
196     currentLeader.stop("Stopping for test");
197     assertFalse(currentLeader.isMaster());
198 
199     // check for new
200     currentLeader = getCurrentLeader();
201     assertNotNull("New leader should exist", currentLeader);
202   }
203 
204   private MockLeader getCurrentLeader() throws Exception {
205     MockLeader currentLeader = null;
206     outer:
207     // Wait up to 10 secs for initial leader
208     for (int i = 0; i < 1000; i++) {
209       for (int j = 0; j < CANDIDATES.length; j++) {
210         if (CANDIDATES[j].isMaster()) {
211           // should only be one leader
212           if (currentLeader != null) {
213             fail("Both candidate "+currentLeader.getIndex()+" and "+j+" claim to be leader!");
214           }
215           currentLeader = CANDIDATES[j];
216         }
217       }
218       if (currentLeader != null) {
219         break outer;
220       }
221       Thread.sleep(10);
222     }
223     return currentLeader;
224   }
225 
226   private static ZooKeeperWatcher newZK(Configuration conf, String name,
227       Abortable abort) throws Exception {
228     Configuration copy = HBaseConfiguration.create(conf);
229     ZooKeeperWatcher zk = new ZooKeeperWatcher(copy, name, abort);
230     return zk;
231   }
232 
233 }
234