View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.concurrent.atomic.AtomicInteger;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.ClusterId;
29  import org.apache.hadoop.hbase.HBaseTestingUtility;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.MediumTests;
32  import org.apache.hadoop.hbase.Server;
33  import org.apache.hadoop.hbase.ServerName;
34  import org.apache.hadoop.hbase.catalog.CatalogTracker;
35  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
36  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
37  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
38  import org.junit.AfterClass;
39  import org.junit.Test;
40  import org.junit.Ignore;
41  
42  import static org.junit.Assert.*;
43  
44  import org.junit.Before;
45  import org.junit.BeforeClass;
46  import org.junit.experimental.categories.Category;
47  
48  /**
49   * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One
50   * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation
51   * of the rsZNode. All other znode creation/initialization is handled by the replication state
52   * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the
53   * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode).
54   */
55  @Category(MediumTests.class)
56  public class TestReplicationTrackerZKImpl {
57  
58    private static final Log LOG = LogFactory.getLog(TestReplicationTrackerZKImpl.class);
59  
60    private static Configuration conf;
61    private static HBaseTestingUtility utility;
62  
63    // Each one of the below variables are reinitialized before every test case
64    private ZooKeeperWatcher zkw;
65    private ReplicationPeers rp;
66    private ReplicationTracker rt;
67    private AtomicInteger rsRemovedCount;
68    private String rsRemovedData;
69    private AtomicInteger plChangedCount;
70    private List<String> plChangedData;
71    private AtomicInteger peerRemovedCount;
72    private String peerRemovedData;
73  
74    @BeforeClass
75    public static void setUpBeforeClass() throws Exception {
76      utility = new HBaseTestingUtility();
77      utility.startMiniZKCluster();
78      conf = utility.getConfiguration();
79      ZooKeeperWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
80      ZKUtil.createWithParents(zk, zk.rsZNode);
81    }
82  
83    @Before
84    public void setUp() throws Exception {
85      zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
86      String fakeRs1 = ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234");
87      try {
88        ZKClusterId.setClusterId(zkw, new ClusterId());
89        rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
90        rp.init();
91        rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
92      } catch (Exception e) {
93        fail("Exception during test setup: " + e);
94      }
95      rsRemovedCount = new AtomicInteger(0);
96      rsRemovedData = "";
97      plChangedCount = new AtomicInteger(0);
98      plChangedData = new ArrayList<String>();
99      peerRemovedCount = new AtomicInteger(0);
100     peerRemovedData = "";
101   }
102 
103   @AfterClass
104   public static void tearDownAfterClass() throws Exception {
105     utility.shutdownMiniZKCluster();
106   }
107 
108   @Test
109   public void testGetListOfRegionServers() throws Exception {
110     // 0 region servers
111     assertEquals(0, rt.getListOfRegionServers().size());
112 
113     // 1 region server
114     ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
115     assertEquals(1, rt.getListOfRegionServers().size());
116 
117     // 2 region servers
118     ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
119     assertEquals(2, rt.getListOfRegionServers().size());
120 
121     // 1 region server
122     ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
123     assertEquals(1, rt.getListOfRegionServers().size());
124 
125     // 0 region server
126     ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
127     assertEquals(0, rt.getListOfRegionServers().size());
128   }
129 
130   @Test(timeout = 30000)
131   public void testRegionServerRemovedEvent() throws Exception {
132     ZKUtil.createAndWatch(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"),
133       HConstants.EMPTY_BYTE_ARRAY);
134     rt.registerListener(new DummyReplicationListener());
135     // delete one
136     ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
137     // wait for event
138     while (rsRemovedCount.get() < 1) {
139       Thread.sleep(5);
140     }
141     assertEquals("hostname2.example.org:1234", rsRemovedData);
142   }
143 
144   @Ignore ("Flakey") @Test(timeout = 30000)
145   public void testPeerRemovedEvent() throws Exception {
146     rp.addPeer("5", utility.getClusterKey());
147     rt.registerListener(new DummyReplicationListener());
148     rp.removePeer("5");
149     // wait for event
150     while (peerRemovedCount.get() < 1) {
151       Thread.sleep(5);
152     }
153     assertEquals("5", peerRemovedData);
154   }
155 
156   @Ignore ("Flakey") @Test(timeout = 30000)
157   public void testPeerListChangedEvent() throws Exception {
158     // add a peer
159     rp.addPeer("5", utility.getClusterKey());
160     zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
161     rt.registerListener(new DummyReplicationListener());
162     rp.disablePeer("5");
163     ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
164     // wait for event
165     int tmp = plChangedCount.get();
166     while (plChangedCount.get() <= tmp) {
167       Thread.sleep(5);
168     }
169     assertEquals(1, plChangedData.size());
170     assertTrue(plChangedData.contains("5"));
171 
172     // clean up
173     ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
174   }
175 
176   private class DummyReplicationListener implements ReplicationListener {
177 
178     @Override
179     public void regionServerRemoved(String regionServer) {
180       rsRemovedData = regionServer;
181       rsRemovedCount.getAndIncrement();
182       LOG.debug("Received regionServerRemoved event: " + regionServer);
183     }
184 
185     @Override
186     public void peerRemoved(String peerId) {
187       peerRemovedData = peerId;
188       peerRemovedCount.getAndIncrement();
189       LOG.debug("Received peerRemoved event: " + peerId);
190     }
191 
192     @Override
193     public void peerListChanged(List<String> peerIds) {
194       plChangedData.clear();
195       plChangedData.addAll(peerIds);
196       plChangedCount.getAndIncrement();
197       LOG.debug("Received peerListChanged event");
198     }
199   }
200 
201   private class DummyServer implements Server {
202     private String serverName;
203     private boolean isAborted = false;
204     private boolean isStopped = false;
205 
206     public DummyServer(String serverName) {
207       this.serverName = serverName;
208     }
209 
210     @Override
211     public Configuration getConfiguration() {
212       return conf;
213     }
214 
215     @Override
216     public ZooKeeperWatcher getZooKeeper() {
217       return zkw;
218     }
219 
220     @Override
221     public CatalogTracker getCatalogTracker() {
222       return null;
223     }
224 
225     @Override
226     public ServerName getServerName() {
227       return ServerName.valueOf(this.serverName);
228     }
229 
230     @Override
231     public void abort(String why, Throwable e) {
232       LOG.info("Aborting " + serverName);
233       this.isAborted = true;
234     }
235 
236     @Override
237     public boolean isAborted() {
238       return this.isAborted;
239     }
240 
241     @Override
242     public void stop(String why) {
243       this.isStopped = true;
244     }
245 
246     @Override
247     public boolean isStopped() {
248       return this.isStopped;
249     }
250   }
251 }
252