1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.ClusterId;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.MediumTests;
32 import org.apache.hadoop.hbase.Server;
33 import org.apache.hadoop.hbase.ServerName;
34 import org.apache.hadoop.hbase.catalog.CatalogTracker;
35 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
36 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
37 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
38 import org.junit.AfterClass;
39 import org.junit.Test;
40 import org.junit.Ignore;
41
42 import static org.junit.Assert.*;
43
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.experimental.categories.Category;
47
48
49
50
51
52
53
54
55 @Category(MediumTests.class)
56 public class TestReplicationTrackerZKImpl {
57
58 private static final Log LOG = LogFactory.getLog(TestReplicationTrackerZKImpl.class);
59
60 private static Configuration conf;
61 private static HBaseTestingUtility utility;
62
63
64 private ZooKeeperWatcher zkw;
65 private ReplicationPeers rp;
66 private ReplicationTracker rt;
67 private AtomicInteger rsRemovedCount;
68 private String rsRemovedData;
69 private AtomicInteger plChangedCount;
70 private List<String> plChangedData;
71 private AtomicInteger peerRemovedCount;
72 private String peerRemovedData;
73
74 @BeforeClass
75 public static void setUpBeforeClass() throws Exception {
76 utility = new HBaseTestingUtility();
77 utility.startMiniZKCluster();
78 conf = utility.getConfiguration();
79 ZooKeeperWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
80 ZKUtil.createWithParents(zk, zk.rsZNode);
81 }
82
83 @Before
84 public void setUp() throws Exception {
85 zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
86 String fakeRs1 = ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234");
87 try {
88 ZKClusterId.setClusterId(zkw, new ClusterId());
89 rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
90 rp.init();
91 rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
92 } catch (Exception e) {
93 fail("Exception during test setup: " + e);
94 }
95 rsRemovedCount = new AtomicInteger(0);
96 rsRemovedData = "";
97 plChangedCount = new AtomicInteger(0);
98 plChangedData = new ArrayList<String>();
99 peerRemovedCount = new AtomicInteger(0);
100 peerRemovedData = "";
101 }
102
103 @AfterClass
104 public static void tearDownAfterClass() throws Exception {
105 utility.shutdownMiniZKCluster();
106 }
107
108 @Test
109 public void testGetListOfRegionServers() throws Exception {
110
111 assertEquals(0, rt.getListOfRegionServers().size());
112
113
114 ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
115 assertEquals(1, rt.getListOfRegionServers().size());
116
117
118 ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
119 assertEquals(2, rt.getListOfRegionServers().size());
120
121
122 ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
123 assertEquals(1, rt.getListOfRegionServers().size());
124
125
126 ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
127 assertEquals(0, rt.getListOfRegionServers().size());
128 }
129
130 @Test(timeout = 30000)
131 public void testRegionServerRemovedEvent() throws Exception {
132 ZKUtil.createAndWatch(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"),
133 HConstants.EMPTY_BYTE_ARRAY);
134 rt.registerListener(new DummyReplicationListener());
135
136 ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
137
138 while (rsRemovedCount.get() < 1) {
139 Thread.sleep(5);
140 }
141 assertEquals("hostname2.example.org:1234", rsRemovedData);
142 }
143
144 @Ignore ("Flakey") @Test(timeout = 30000)
145 public void testPeerRemovedEvent() throws Exception {
146 rp.addPeer("5", utility.getClusterKey());
147 rt.registerListener(new DummyReplicationListener());
148 rp.removePeer("5");
149
150 while (peerRemovedCount.get() < 1) {
151 Thread.sleep(5);
152 }
153 assertEquals("5", peerRemovedData);
154 }
155
156 @Ignore ("Flakey") @Test(timeout = 30000)
157 public void testPeerListChangedEvent() throws Exception {
158
159 rp.addPeer("5", utility.getClusterKey());
160 zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
161 rt.registerListener(new DummyReplicationListener());
162 rp.disablePeer("5");
163 ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
164
165 int tmp = plChangedCount.get();
166 while (plChangedCount.get() <= tmp) {
167 Thread.sleep(5);
168 }
169 assertEquals(1, plChangedData.size());
170 assertTrue(plChangedData.contains("5"));
171
172
173 ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
174 }
175
176 private class DummyReplicationListener implements ReplicationListener {
177
178 @Override
179 public void regionServerRemoved(String regionServer) {
180 rsRemovedData = regionServer;
181 rsRemovedCount.getAndIncrement();
182 LOG.debug("Received regionServerRemoved event: " + regionServer);
183 }
184
185 @Override
186 public void peerRemoved(String peerId) {
187 peerRemovedData = peerId;
188 peerRemovedCount.getAndIncrement();
189 LOG.debug("Received peerRemoved event: " + peerId);
190 }
191
192 @Override
193 public void peerListChanged(List<String> peerIds) {
194 plChangedData.clear();
195 plChangedData.addAll(peerIds);
196 plChangedCount.getAndIncrement();
197 LOG.debug("Received peerListChanged event");
198 }
199 }
200
201 private class DummyServer implements Server {
202 private String serverName;
203 private boolean isAborted = false;
204 private boolean isStopped = false;
205
206 public DummyServer(String serverName) {
207 this.serverName = serverName;
208 }
209
210 @Override
211 public Configuration getConfiguration() {
212 return conf;
213 }
214
215 @Override
216 public ZooKeeperWatcher getZooKeeper() {
217 return zkw;
218 }
219
220 @Override
221 public CatalogTracker getCatalogTracker() {
222 return null;
223 }
224
225 @Override
226 public ServerName getServerName() {
227 return ServerName.valueOf(this.serverName);
228 }
229
230 @Override
231 public void abort(String why, Throwable e) {
232 LOG.info("Aborting " + serverName);
233 this.isAborted = true;
234 }
235
236 @Override
237 public boolean isAborted() {
238 return this.isAborted;
239 }
240
241 @Override
242 public void stop(String why) {
243 this.isStopped = true;
244 }
245
246 @Override
247 public boolean isStopped() {
248 return this.isStopped;
249 }
250 }
251 }
252