1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import static org.junit.Assert.*;
22
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.*;
29 import org.apache.hadoop.hbase.util.Bytes;
30 import org.junit.AfterClass;
31 import org.junit.BeforeClass;
32 import org.junit.Test;
33 import org.junit.experimental.categories.Category;
34
35
36
37 @Category(MediumTests.class)
38 public class TestZKLeaderManager {
39 private static Log LOG = LogFactory.getLog(TestZKLeaderManager.class);
40
41 private static final String LEADER_ZNODE =
42 "/test/" + TestZKLeaderManager.class.getSimpleName();
43
44 private static class MockAbortable implements Abortable {
45 private boolean aborted;
46
47 @Override
48 public void abort(String why, Throwable e) {
49 aborted = true;
50 LOG.fatal("Aborting during test: "+why, e);
51 fail("Aborted during test: " + why);
52 }
53
54 @Override
55 public boolean isAborted() {
56 return aborted;
57 }
58 }
59
60 private static class MockLeader extends Thread implements Stoppable {
61 private boolean stopped;
62 private ZooKeeperWatcher watcher;
63 private ZKLeaderManager zkLeader;
64 private AtomicBoolean master = new AtomicBoolean(false);
65 private int index;
66
67 public MockLeader(ZooKeeperWatcher watcher, int index) {
68 setDaemon(true);
69 setName("TestZKLeaderManager-leader-" + index);
70 this.index = index;
71 this.watcher = watcher;
72 this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE,
73 Bytes.toBytes(index), this);
74 }
75
76 public boolean isMaster() {
77 return master.get();
78 }
79
80 public int getIndex() {
81 return index;
82 }
83
84 public ZooKeeperWatcher getWatcher() {
85 return watcher;
86 }
87
88 public void run() {
89 while (!stopped) {
90 zkLeader.start();
91 zkLeader.waitToBecomeLeader();
92 master.set(true);
93
94 while (master.get() && !stopped) {
95 try {
96 Thread.sleep(10);
97 } catch (InterruptedException ignored) {}
98 }
99 }
100 }
101
102 public void abdicate() {
103 zkLeader.stepDownAsLeader();
104 master.set(false);
105 }
106
107 @Override
108 public void stop(String why) {
109 stopped = true;
110 abdicate();
111 watcher.close();
112 }
113
114 @Override
115 public boolean isStopped() {
116 return stopped;
117 }
118 }
119
120 private static HBaseTestingUtility TEST_UTIL;
121 private static MockLeader[] CANDIDATES;
122
123 @BeforeClass
124 public static void setupBeforeClass() throws Exception {
125 TEST_UTIL = new HBaseTestingUtility();
126 TEST_UTIL.startMiniZKCluster();
127 Configuration conf = TEST_UTIL.getConfiguration();
128
129
130 MockAbortable abortable = new MockAbortable();
131 CANDIDATES = new MockLeader[3];
132 for (int i = 0; i < 3; i++) {
133 ZooKeeperWatcher watcher = newZK(conf, "server"+i, abortable);
134 CANDIDATES[i] = new MockLeader(watcher, i);
135 CANDIDATES[i].start();
136 }
137 }
138
139 @AfterClass
140 public static void tearDownAfterClass() throws Exception {
141 TEST_UTIL.shutdownMiniZKCluster();
142 }
143
144 @Test
145 public void testLeaderSelection() throws Exception {
146 MockLeader currentLeader = getCurrentLeader();
147
148 assertNotNull("Leader should exist", currentLeader);
149 LOG.debug("Current leader index is "+currentLeader.getIndex());
150
151 byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
152 assertNotNull("Leader znode should contain leader index", znodeData);
153 assertTrue("Leader znode should not be empty", znodeData.length > 0);
154 int storedIndex = Bytes.toInt(znodeData);
155 LOG.debug("Stored leader index in ZK is "+storedIndex);
156 assertEquals("Leader znode should match leader index",
157 currentLeader.getIndex(), storedIndex);
158
159
160 currentLeader.abdicate();
161 assertFalse(currentLeader.isMaster());
162
163
164 currentLeader = getCurrentLeader();
165
166 assertNotNull("New leader should exist after abdication", currentLeader);
167 LOG.debug("New leader index is "+currentLeader.getIndex());
168
169 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
170 assertNotNull("Leader znode should contain leader index", znodeData);
171 assertTrue("Leader znode should not be empty", znodeData.length > 0);
172 storedIndex = Bytes.toInt(znodeData);
173 LOG.debug("Stored leader index in ZK is "+storedIndex);
174 assertEquals("Leader znode should match leader index",
175 currentLeader.getIndex(), storedIndex);
176
177
178 currentLeader.stop("Stopping for test");
179 assertFalse(currentLeader.isMaster());
180
181
182 currentLeader = getCurrentLeader();
183
184 assertNotNull("New leader should exist after stop", currentLeader);
185 LOG.debug("New leader index is "+currentLeader.getIndex());
186
187 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
188 assertNotNull("Leader znode should contain leader index", znodeData);
189 assertTrue("Leader znode should not be empty", znodeData.length > 0);
190 storedIndex = Bytes.toInt(znodeData);
191 LOG.debug("Stored leader index in ZK is "+storedIndex);
192 assertEquals("Leader znode should match leader index",
193 currentLeader.getIndex(), storedIndex);
194
195
196 currentLeader.stop("Stopping for test");
197 assertFalse(currentLeader.isMaster());
198
199
200 currentLeader = getCurrentLeader();
201 assertNotNull("New leader should exist", currentLeader);
202 }
203
204 private MockLeader getCurrentLeader() throws Exception {
205 MockLeader currentLeader = null;
206 outer:
207
208 for (int i = 0; i < 1000; i++) {
209 for (int j = 0; j < CANDIDATES.length; j++) {
210 if (CANDIDATES[j].isMaster()) {
211
212 if (currentLeader != null) {
213 fail("Both candidate "+currentLeader.getIndex()+" and "+j+" claim to be leader!");
214 }
215 currentLeader = CANDIDATES[j];
216 }
217 }
218 if (currentLeader != null) {
219 break outer;
220 }
221 Thread.sleep(10);
222 }
223 return currentLeader;
224 }
225
226 private static ZooKeeperWatcher newZK(Configuration conf, String name,
227 Abortable abort) throws Exception {
228 Configuration copy = HBaseConfiguration.create(conf);
229 ZooKeeperWatcher zk = new ZooKeeperWatcher(copy, name, abort);
230 return zk;
231 }
232
233 }
234