1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28 import java.util.Random;
29 import java.util.concurrent.Semaphore;
30
31 import junit.framework.Assert;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.*;
35 import org.apache.hadoop.hbase.master.TestActiveMasterManager.NodeDeletionListener;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.Threads;
38 import org.apache.zookeeper.CreateMode;
39 import org.apache.zookeeper.WatchedEvent;
40 import org.apache.zookeeper.Watcher;
41 import org.apache.zookeeper.ZooDefs.Ids;
42 import org.apache.zookeeper.ZooKeeper;
43 import org.junit.AfterClass;
44 import org.junit.BeforeClass;
45 import org.junit.Test;
46 import org.junit.experimental.categories.Category;
47
48 @Category(MediumTests.class)
49 public class TestZooKeeperNodeTracker {
50 private static final Log LOG = LogFactory.getLog(TestZooKeeperNodeTracker.class);
51 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
52
53 private final static Random rand = new Random();
54
55 @BeforeClass
56 public static void setUpBeforeClass() throws Exception {
57 TEST_UTIL.startMiniZKCluster();
58 }
59
60 @AfterClass
61 public static void tearDownAfterClass() throws Exception {
62 TEST_UTIL.shutdownMiniZKCluster();
63 }
64
65
66
67
68
69
70 @Test public void testInterruptible() throws IOException, InterruptedException {
71 Abortable abortable = new StubAbortable();
72 ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
73 "testInterruptible", abortable);
74 final TestTracker tracker = new TestTracker(zk, "/xyz", abortable);
75 tracker.start();
76 Thread t = new Thread() {
77 @Override
78 public void run() {
79 try {
80 tracker.blockUntilAvailable();
81 } catch (InterruptedException e) {
82 throw new RuntimeException("Interrupted", e);
83 }
84 }
85 };
86 t.start();
87 while (!t.isAlive()) Threads.sleep(1);
88 tracker.stop();
89 t.join();
90
91 }
92
93 @Test
94 public void testNodeTracker() throws Exception {
95 Abortable abortable = new StubAbortable();
96 ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
97 "testNodeTracker", abortable);
98 ZKUtil.createAndFailSilent(zk, zk.baseZNode);
99
100 final String node =
101 ZKUtil.joinZNode(zk.baseZNode, new Long(rand.nextLong()).toString());
102
103 final byte [] dataOne = Bytes.toBytes("dataOne");
104 final byte [] dataTwo = Bytes.toBytes("dataTwo");
105
106
107 TestTracker localTracker = new TestTracker(zk, node, abortable);
108 localTracker.start();
109 zk.registerListener(localTracker);
110
111
112 assertNull(localTracker.getData(false));
113
114
115 WaitToGetDataThread thread = new WaitToGetDataThread(zk, node);
116 thread.start();
117
118
119 assertFalse(thread.hasData);
120
121
122 TestTracker secondTracker = new TestTracker(zk, node, null);
123 secondTracker.start();
124 zk.registerListener(secondTracker);
125
126
127 TestingZKListener zkListener = new TestingZKListener(zk, node);
128 zk.registerListener(zkListener);
129 assertEquals(0, zkListener.createdLock.availablePermits());
130
131
132
133 final ZooKeeper zkconn = new ZooKeeper(
134 ZKConfig.getZKQuorumServersString(TEST_UTIL.getConfiguration()), 60000,
135 new StubWatcher());
136
137
138 zkconn.create(node, dataOne, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
139
140
141 zkListener.waitForCreation();
142 thread.join();
143
144
145 assertNotNull(localTracker.getData(false));
146 assertNotNull(localTracker.blockUntilAvailable());
147 assertTrue(Bytes.equals(localTracker.getData(false), dataOne));
148 assertTrue(thread.hasData);
149 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne));
150 LOG.info("Successfully got data one");
151
152
153 assertNotNull(secondTracker.getData(false));
154 assertNotNull(secondTracker.blockUntilAvailable());
155 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne));
156 LOG.info("Successfully got data one with the second tracker");
157
158
159 zkconn.delete(node, -1);
160 zkListener.waitForDeletion();
161
162
163 TestTracker threadTracker = thread.tracker;
164 thread = new WaitToGetDataThread(zk, node, threadTracker);
165 thread.start();
166
167
168 assertFalse(thread.hasData);
169 assertNull(secondTracker.getData(false));
170 assertNull(localTracker.getData(false));
171 LOG.info("Successfully made unavailable");
172
173
174 zkconn.create(node, dataTwo, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
175
176
177 zkListener.waitForCreation();
178 thread.join();
179
180
181 assertNotNull(localTracker.getData(false));
182 assertNotNull(localTracker.blockUntilAvailable());
183 assertTrue(Bytes.equals(localTracker.getData(false), dataTwo));
184 assertNotNull(secondTracker.getData(false));
185 assertNotNull(secondTracker.blockUntilAvailable());
186 assertTrue(Bytes.equals(secondTracker.getData(false), dataTwo));
187 assertTrue(thread.hasData);
188 assertTrue(Bytes.equals(thread.tracker.getData(false), dataTwo));
189 LOG.info("Successfully got data two on all trackers and threads");
190
191
192 zkconn.setData(node, dataOne, -1);
193
194
195 zkListener.waitForDataChange();
196
197
198 assertNotNull(localTracker.getData(false));
199 assertNotNull(localTracker.blockUntilAvailable());
200 assertTrue(Bytes.equals(localTracker.getData(false), dataOne));
201 assertNotNull(secondTracker.getData(false));
202 assertNotNull(secondTracker.blockUntilAvailable());
203 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne));
204 assertTrue(thread.hasData);
205 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne));
206 LOG.info("Successfully got data one following a data change on all trackers and threads");
207 }
208
209 public static class WaitToGetDataThread extends Thread {
210
211 TestTracker tracker;
212 boolean hasData;
213
214 public WaitToGetDataThread(ZooKeeperWatcher zk, String node) {
215 tracker = new TestTracker(zk, node, null);
216 tracker.start();
217 zk.registerListener(tracker);
218 hasData = false;
219 }
220
221 public WaitToGetDataThread(ZooKeeperWatcher zk, String node,
222 TestTracker tracker) {
223 this.tracker = tracker;
224 hasData = false;
225 }
226
227 @Override
228 public void run() {
229 LOG.info("Waiting for data to be available in WaitToGetDataThread");
230 try {
231 tracker.blockUntilAvailable();
232 } catch (InterruptedException e) {
233 e.printStackTrace();
234 }
235 LOG.info("Data now available in tracker from WaitToGetDataThread");
236 hasData = true;
237 }
238 }
239
240 public static class TestTracker extends ZooKeeperNodeTracker {
241 public TestTracker(ZooKeeperWatcher watcher, String node,
242 Abortable abortable) {
243 super(watcher, node, abortable);
244 }
245 }
246
247 public static class TestingZKListener extends ZooKeeperListener {
248 private static final Log LOG = LogFactory.getLog(NodeDeletionListener.class);
249
250 private Semaphore deletedLock;
251 private Semaphore createdLock;
252 private Semaphore changedLock;
253 private String node;
254
255 public TestingZKListener(ZooKeeperWatcher watcher, String node) {
256 super(watcher);
257 deletedLock = new Semaphore(0);
258 createdLock = new Semaphore(0);
259 changedLock = new Semaphore(0);
260 this.node = node;
261 }
262
263 @Override
264 public void nodeDeleted(String path) {
265 if(path.equals(node)) {
266 LOG.debug("nodeDeleted(" + path + ")");
267 deletedLock.release();
268 }
269 }
270
271 @Override
272 public void nodeCreated(String path) {
273 if(path.equals(node)) {
274 LOG.debug("nodeCreated(" + path + ")");
275 createdLock.release();
276 }
277 }
278
279 @Override
280 public void nodeDataChanged(String path) {
281 if(path.equals(node)) {
282 LOG.debug("nodeDataChanged(" + path + ")");
283 changedLock.release();
284 }
285 }
286
287 public void waitForDeletion() throws InterruptedException {
288 deletedLock.acquire();
289 }
290
291 public void waitForCreation() throws InterruptedException {
292 createdLock.acquire();
293 }
294
295 public void waitForDataChange() throws InterruptedException {
296 changedLock.acquire();
297 }
298 }
299
300 public static class StubAbortable implements Abortable {
301 @Override
302 public void abort(final String msg, final Throwable t) {}
303
304 @Override
305 public boolean isAborted() {
306 return false;
307 }
308
309 }
310
311 public static class StubWatcher implements Watcher {
312 @Override
313 public void process(WatchedEvent event) {}
314 }
315
316 @Test
317 public void testCleanZNode() throws Exception {
318 ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
319 "testNodeTracker", new TestZooKeeperNodeTracker.StubAbortable());
320
321 final ServerName sn = ServerName.valueOf("127.0.0.1:52", 45L);
322
323 ZKUtil.createAndFailSilent(zkw,
324 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT,
325 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
326
327 final String nodeName = zkw.getMasterAddressZNode();
328
329
330 ZKUtil.createAndFailSilent(zkw, nodeName);
331 MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
332 Assert.assertFalse(ZKUtil.getData(zkw, nodeName) == null);
333
334
335 ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn));
336 MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString());
337 Assert.assertFalse(ZKUtil.getData(zkw, nodeName) == null);
338
339
340 ZKUtil.setData(zkw, nodeName,MasterAddressTracker.toByteArray(sn));
341 MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
342 Assert.assertTrue( ZKUtil.getData(zkw, nodeName)== null );
343
344
345 MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
346 }
347
348 }
349