View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import static org.junit.Assert.*;
22  
23  import java.util.List;
24  import java.util.SortedMap;
25  import java.util.SortedSet;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.ServerName;
30  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31  import org.junit.Before;
32  import org.junit.Test;
33  
34  /**
35   * White box testing for replication state interfaces. Implementations should extend this class, and
36   * initialize the interfaces properly.
37   */
38  public abstract class TestReplicationStateBasic {
39  
40    protected ReplicationQueues rq1;
41    protected ReplicationQueues rq2;
42    protected ReplicationQueues rq3;
43    protected ReplicationQueuesClient rqc;
44    protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
45    protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString();
46    protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString();
47    protected ReplicationPeers rp;
48    protected static final String ID_ONE = "1";
49    protected static final String ID_TWO = "2";
50    protected static String KEY_ONE;
51    protected static String KEY_TWO;
52  
53    // For testing when we try to replicate to ourself
54    protected String OUR_ID = "3";
55    protected String OUR_KEY;
56  
57    protected static int zkTimeoutCount;
58    protected static final int ZK_MAX_COUNT = 300;
59    protected static final int ZK_SLEEP_INTERVAL = 100; // millis
60  
61    private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
62  
63    @Before
64    public void setUp() {
65      zkTimeoutCount = 0;
66    }
67  
68    @Test
69    public void testReplicationQueuesClient() throws ReplicationException {
70      rqc.init();
71      // Test methods with empty state
72      assertEquals(0, rqc.getListOfReplicators().size());
73      assertNull(rqc.getLogsInQueue(server1, "qId1"));
74      assertNull(rqc.getAllQueues(server1));
75  
76      /*
77       * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
78       * server2: zero queues
79       */
80      rq1.init(server1);
81      rq2.init(server2);
82      rq1.addLog("qId1", "trash");
83      rq1.removeLog("qId1", "trash");
84      rq1.addLog("qId2", "filename1");
85      rq1.addLog("qId3", "filename2");
86      rq1.addLog("qId3", "filename3");
87      rq2.addLog("trash", "trash");
88      rq2.removeQueue("trash");
89  
90      List<String> reps = rqc.getListOfReplicators();
91      assertEquals(2, reps.size());
92      assertTrue(server1, reps.contains(server1));
93      assertTrue(server2, reps.contains(server2));
94  
95      assertNull(rqc.getLogsInQueue("bogus", "bogus"));
96      assertNull(rqc.getLogsInQueue(server1, "bogus"));
97      assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
98      assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
99      assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
100 
101     assertNull(rqc.getAllQueues("bogus"));
102     assertEquals(0, rqc.getAllQueues(server2).size());
103     List<String> list = rqc.getAllQueues(server1);
104     assertEquals(3, list.size());
105     assertTrue(list.contains("qId2"));
106     assertTrue(list.contains("qId3"));
107   }
108 
109   @Test
110   public void testReplicationQueues() throws ReplicationException {
111     rq1.init(server1);
112     rq2.init(server2);
113     rq3.init(server3);
114     //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
115     rp.init();
116 
117     // 3 replicators should exist
118     assertEquals(3, rq1.getListOfReplicators().size());
119     rq1.removeQueue("bogus");
120     rq1.removeLog("bogus", "bogus");
121     rq1.removeAllQueues();
122     assertNull(rq1.getAllQueues());
123     assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
124     assertNull(rq1.getLogsInQueue("bogus"));
125     assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
126 
127     rq1.setLogPosition("bogus", "bogus", 5L);
128 
129     populateQueues();
130 
131     assertEquals(3, rq1.getListOfReplicators().size());
132     assertEquals(0, rq2.getLogsInQueue("qId1").size());
133     assertEquals(5, rq3.getLogsInQueue("qId5").size());
134     assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
135     rq3.setLogPosition("qId5", "filename4", 354L);
136     assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
137 
138     assertEquals(5, rq3.getLogsInQueue("qId5").size());
139     assertEquals(0, rq2.getLogsInQueue("qId1").size());
140     assertEquals(0, rq1.getAllQueues().size());
141     assertEquals(1, rq2.getAllQueues().size());
142     assertEquals(5, rq3.getAllQueues().size());
143 
144     assertEquals(0, rq3.claimQueues(server1).size());
145     assertEquals(2, rq3.getListOfReplicators().size());
146 
147     SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
148     assertEquals(5, queues.size());
149     assertEquals(1, rq2.getListOfReplicators().size());
150 
151     // Try to claim our own queues
152     assertEquals(0, rq2.claimQueues(server2).size());
153 
154     assertEquals(6, rq2.getAllQueues().size());
155 
156     rq2.removeAllQueues();
157 
158     assertEquals(0, rq2.getListOfReplicators().size());
159   }
160 
161   @Test
162   public void testReplicationPeers() throws Exception {
163     rp.init();
164 
165     // Test methods with non-existent peer ids
166     try {
167       rp.removePeer("bogus");
168       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
169     } catch (IllegalArgumentException e) {
170     }
171     try {
172       rp.enablePeer("bogus");
173       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
174     } catch (IllegalArgumentException e) {
175     }
176     try {
177       rp.disablePeer("bogus");
178       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
179     } catch (IllegalArgumentException e) {
180     }
181     try {
182       rp.getStatusOfConnectedPeer("bogus");
183       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
184     } catch (IllegalArgumentException e) {
185     }
186     assertFalse(rp.connectToPeer("bogus"));
187     rp.disconnectFromPeer("bogus");
188     assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
189     assertNull(rp.getPeerUUID("bogus"));
190     assertNull(rp.getPeerConf("bogus"));
191     assertNumberOfPeers(0, 0);
192 
193     // Add some peers
194     rp.addPeer(ID_ONE, KEY_ONE);
195     assertNumberOfPeers(0, 1);
196     rp.addPeer(ID_TWO, KEY_TWO);
197     assertNumberOfPeers(0, 2);
198 
199     // Test methods with a peer that is added but not connected
200     try {
201       rp.getStatusOfConnectedPeer(ID_ONE);
202       fail("There are no connected peers, should have thrown an IllegalArgumentException");
203     } catch (IllegalArgumentException e) {
204     }
205     assertNull(rp.getPeerUUID(ID_ONE));
206     assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
207     rp.disconnectFromPeer(ID_ONE);
208     assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
209 
210     // Connect to one peer
211     rp.connectToPeer(ID_ONE);
212     assertNumberOfPeers(1, 2);
213     assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
214     rp.disablePeer(ID_ONE);
215     assertConnectedPeerStatus(false, ID_ONE);
216     rp.enablePeer(ID_ONE);
217     assertConnectedPeerStatus(true, ID_ONE);
218     assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
219     assertNotNull(rp.getPeerUUID(ID_ONE).toString());
220 
221     // Disconnect peer
222     rp.disconnectFromPeer(ID_ONE);
223     assertNumberOfPeers(0, 2);
224     try {
225       rp.getStatusOfConnectedPeer(ID_ONE);
226       fail("There are no connected peers, should have thrown an IllegalArgumentException");
227     } catch (IllegalArgumentException e) {
228     }
229   }
230 
231   protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
232     // we can first check if the value was changed in the store, if it wasn't then fail right away
233     if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
234       fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
235     }
236     while (true) {
237       if (status == rp.getStatusOfConnectedPeer(peerId)) {
238         return;
239       }
240       if (zkTimeoutCount < ZK_MAX_COUNT) {
241         LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
242             + ", sleeping and trying again.");
243         Thread.sleep(ZK_SLEEP_INTERVAL);
244       } else {
245         fail("Timed out waiting for ConnectedPeerStatus to be " + status);
246       }
247     }
248   }
249 
250   protected void assertNumberOfPeers(int connected, int total) {
251     assertEquals(total, rp.getAllPeerClusterKeys().size());
252     assertEquals(connected, rp.getConnectedPeers().size());
253     assertEquals(total, rp.getAllPeerIds().size());
254   }
255 
256   /*
257    * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
258    * 3, 4, 5 log files respectively
259    */
260   protected void populateQueues() throws ReplicationException {
261     rq1.addLog("trash", "trash");
262     rq1.removeQueue("trash");
263 
264     rq2.addLog("qId1", "trash");
265     rq2.removeLog("qId1", "trash");
266 
267     for (int i = 1; i < 6; i++) {
268       for (int j = 0; j < i; j++) {
269         rq3.addLog("qId" + i, "filename" + j);
270       }
271       //Add peers for the corresponding queues so they are not orphans
272       rp.addPeer("qId" + i, "bogus" + i);
273     }
274   }
275 }
276