View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.fail;
24  
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.ChoreService;
33  import org.apache.hadoop.hbase.ClusterId;
34  import org.apache.hadoop.hbase.CoordinatedStateManager;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.testclassification.MediumTests;
38  import org.apache.hadoop.hbase.Server;
39  import org.apache.hadoop.hbase.ServerName;
40  import org.apache.hadoop.hbase.client.ClusterConnection;
41  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
42  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
43  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
44  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
45  import org.junit.AfterClass;
46  import org.junit.Before;
47  import org.junit.BeforeClass;
48  import org.junit.Test;
49  import org.junit.experimental.categories.Category;
50  
51  /**
52   * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One
53   * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation
54   * of the rsZNode. All other znode creation/initialization is handled by the replication state
55   * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the
56   * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode).
57   */
58  @Category(MediumTests.class)
59  public class TestReplicationTrackerZKImpl {
60  
61    private static final Log LOG = LogFactory.getLog(TestReplicationTrackerZKImpl.class);
62  
63    private static Configuration conf;
64    private static HBaseTestingUtility utility;
65  
66    // Each one of the below variables are reinitialized before every test case
67    private ZooKeeperWatcher zkw;
68    private ReplicationPeers rp;
69    private ReplicationTracker rt;
70    private AtomicInteger rsRemovedCount;
71    private String rsRemovedData;
72    private AtomicInteger plChangedCount;
73    private List<String> plChangedData;
74    private AtomicInteger peerRemovedCount;
75    private String peerRemovedData;
76  
77    @BeforeClass
78    public static void setUpBeforeClass() throws Exception {
79      utility = new HBaseTestingUtility();
80      utility.startMiniZKCluster();
81      conf = utility.getConfiguration();
82      ZooKeeperWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
83      ZKUtil.createWithParents(zk, zk.rsZNode);
84    }
85  
86    @Before
87    public void setUp() throws Exception {
88      zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
89      String fakeRs1 = ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234");
90      try {
91        ZKClusterId.setClusterId(zkw, new ClusterId());
92        rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
93        rp.init();
94        rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
95      } catch (Exception e) {
96        fail("Exception during test setup: " + e);
97      }
98      rsRemovedCount = new AtomicInteger(0);
99      rsRemovedData = "";
100     plChangedCount = new AtomicInteger(0);
101     plChangedData = new ArrayList<String>();
102     peerRemovedCount = new AtomicInteger(0);
103     peerRemovedData = "";
104   }
105 
106   @AfterClass
107   public static void tearDownAfterClass() throws Exception {
108     utility.shutdownMiniZKCluster();
109   }
110 
111   @Test
112   public void testGetListOfRegionServers() throws Exception {
113     // 0 region servers
114     assertEquals(0, rt.getListOfRegionServers().size());
115 
116     // 1 region server
117     ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
118     assertEquals(1, rt.getListOfRegionServers().size());
119 
120     // 2 region servers
121     ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
122     assertEquals(2, rt.getListOfRegionServers().size());
123 
124     // 1 region server
125     ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
126     assertEquals(1, rt.getListOfRegionServers().size());
127 
128     // 0 region server
129     ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
130     assertEquals(0, rt.getListOfRegionServers().size());
131   }
132 
133   @Test(timeout = 30000)
134   public void testRegionServerRemovedEvent() throws Exception {
135     ZKUtil.createAndWatch(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"),
136       HConstants.EMPTY_BYTE_ARRAY);
137     rt.registerListener(new DummyReplicationListener());
138     // delete one
139     ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
140     // wait for event
141     while (rsRemovedCount.get() < 1) {
142       Thread.sleep(5);
143     }
144     assertEquals("hostname2.example.org:1234", rsRemovedData);
145   }
146 
147   @Test(timeout = 30000)
148   public void testPeerRemovedEvent() throws Exception {
149     rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
150     rt.registerListener(new DummyReplicationListener());
151     rp.removePeer("5");
152     // wait for event
153     while (peerRemovedCount.get() < 1) {
154       Thread.sleep(5);
155     }
156     assertEquals("5", peerRemovedData);
157   }
158 
159   @Test(timeout = 30000)
160   public void testPeerListChangedEvent() throws Exception {
161     // add a peer
162     rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
163     zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
164     rt.registerListener(new DummyReplicationListener());
165     rp.disablePeer("5");
166     int tmp = plChangedCount.get();
167     LOG.info("Peer count=" + tmp);
168     ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
169     // wait for event
170     while (plChangedCount.get() <= tmp) {
171       Thread.sleep(100);
172       LOG.info("Peer count=" + tmp);
173     }
174     assertEquals(1, plChangedData.size());
175     assertTrue(plChangedData.contains("5"));
176 
177     // clean up
178     //ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
179     rp.removePeer("5");
180   }
181 
182   @Test(timeout = 30000)
183   public void testPeerNameControl() throws Exception {
184     int exists = 0;
185     int hyphen = 0;
186     rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
187 
188     try{
189       rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
190     }catch(IllegalArgumentException e){
191       exists++;
192     }
193 
194     try{
195       rp.addPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
196     }catch(IllegalArgumentException e){
197       hyphen++;
198     }
199     assertEquals(1, exists);
200     assertEquals(1, hyphen);
201 
202     // clean up
203     rp.removePeer("6");
204   }
205 
206   private class DummyReplicationListener implements ReplicationListener {
207 
208     @Override
209     public void regionServerRemoved(String regionServer) {
210       rsRemovedData = regionServer;
211       rsRemovedCount.getAndIncrement();
212       LOG.debug("Received regionServerRemoved event: " + regionServer);
213     }
214 
215     @Override
216     public void peerRemoved(String peerId) {
217       peerRemovedData = peerId;
218       peerRemovedCount.getAndIncrement();
219       LOG.debug("Received peerRemoved event: " + peerId);
220     }
221 
222     @Override
223     public void peerListChanged(List<String> peerIds) {
224       plChangedData.clear();
225       plChangedData.addAll(peerIds);
226       int count = plChangedCount.getAndIncrement();
227       LOG.debug("Received peerListChanged event " + count);
228     }
229   }
230 
231   private class DummyServer implements Server {
232     private String serverName;
233     private boolean isAborted = false;
234     private boolean isStopped = false;
235 
236     public DummyServer(String serverName) {
237       this.serverName = serverName;
238     }
239 
240     @Override
241     public Configuration getConfiguration() {
242       return conf;
243     }
244 
245     @Override
246     public ZooKeeperWatcher getZooKeeper() {
247       return zkw;
248     }
249 
250     @Override
251     public CoordinatedStateManager getCoordinatedStateManager() {
252       return null;
253     }
254 
255     @Override
256     public ClusterConnection getConnection() {
257       return null;
258     }
259 
260     @Override
261     public MetaTableLocator getMetaTableLocator() {
262       return null;
263     }
264 
265     @Override
266     public ServerName getServerName() {
267       return ServerName.valueOf(this.serverName);
268     }
269 
270     @Override
271     public void abort(String why, Throwable e) {
272       LOG.info("Aborting " + serverName);
273       this.isAborted = true;
274     }
275 
276     @Override
277     public boolean isAborted() {
278       return this.isAborted;
279     }
280 
281     @Override
282     public void stop(String why) {
283       this.isStopped = true;
284     }
285 
286     @Override
287     public boolean isStopped() {
288       return this.isStopped;
289     }
290 
291     @Override
292     public ChoreService getChoreService() {
293       return null;
294     }
295   }
296 }