1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import static org.junit.Assert.*;
22
23 import java.util.List;
24 import java.util.SortedMap;
25 import java.util.SortedSet;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31 import org.junit.Before;
32 import org.junit.Test;
33
34
35
36
37
38 public abstract class TestReplicationStateBasic {
39
40 protected ReplicationQueues rq1;
41 protected ReplicationQueues rq2;
42 protected ReplicationQueues rq3;
43 protected ReplicationQueuesClient rqc;
44 protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
45 protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString();
46 protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString();
47 protected ReplicationPeers rp;
48 protected static final String ID_ONE = "1";
49 protected static final String ID_TWO = "2";
50 protected static String KEY_ONE;
51 protected static String KEY_TWO;
52
53
54 protected String OUR_ID = "3";
55 protected String OUR_KEY;
56
57 protected static int zkTimeoutCount;
58 protected static final int ZK_MAX_COUNT = 300;
59 protected static final int ZK_SLEEP_INTERVAL = 100;
60
61 private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
62
63 @Before
64 public void setUp() {
65 zkTimeoutCount = 0;
66 }
67
68 @Test
69 public void testReplicationQueuesClient() throws ReplicationException {
70 rqc.init();
71
72 assertEquals(0, rqc.getListOfReplicators().size());
73 assertNull(rqc.getLogsInQueue(server1, "qId1"));
74 assertNull(rqc.getAllQueues(server1));
75
76
77
78
79
80 rq1.init(server1);
81 rq2.init(server2);
82 rq1.addLog("qId1", "trash");
83 rq1.removeLog("qId1", "trash");
84 rq1.addLog("qId2", "filename1");
85 rq1.addLog("qId3", "filename2");
86 rq1.addLog("qId3", "filename3");
87 rq2.addLog("trash", "trash");
88 rq2.removeQueue("trash");
89
90 List<String> reps = rqc.getListOfReplicators();
91 assertEquals(2, reps.size());
92 assertTrue(server1, reps.contains(server1));
93 assertTrue(server2, reps.contains(server2));
94
95 assertNull(rqc.getLogsInQueue("bogus", "bogus"));
96 assertNull(rqc.getLogsInQueue(server1, "bogus"));
97 assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
98 assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
99 assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
100
101 assertNull(rqc.getAllQueues("bogus"));
102 assertEquals(0, rqc.getAllQueues(server2).size());
103 List<String> list = rqc.getAllQueues(server1);
104 assertEquals(3, list.size());
105 assertTrue(list.contains("qId2"));
106 assertTrue(list.contains("qId3"));
107 }
108
109 @Test
110 public void testReplicationQueues() throws ReplicationException {
111 rq1.init(server1);
112 rq2.init(server2);
113 rq3.init(server3);
114
115 rp.init();
116
117
118 assertEquals(3, rq1.getListOfReplicators().size());
119 rq1.removeQueue("bogus");
120 rq1.removeLog("bogus", "bogus");
121 rq1.removeAllQueues();
122 assertNull(rq1.getAllQueues());
123 assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
124 assertNull(rq1.getLogsInQueue("bogus"));
125 assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
126
127 rq1.setLogPosition("bogus", "bogus", 5L);
128
129 populateQueues();
130
131 assertEquals(3, rq1.getListOfReplicators().size());
132 assertEquals(0, rq2.getLogsInQueue("qId1").size());
133 assertEquals(5, rq3.getLogsInQueue("qId5").size());
134 assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
135 rq3.setLogPosition("qId5", "filename4", 354L);
136 assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
137
138 assertEquals(5, rq3.getLogsInQueue("qId5").size());
139 assertEquals(0, rq2.getLogsInQueue("qId1").size());
140 assertEquals(0, rq1.getAllQueues().size());
141 assertEquals(1, rq2.getAllQueues().size());
142 assertEquals(5, rq3.getAllQueues().size());
143
144 assertEquals(0, rq3.claimQueues(server1).size());
145 assertEquals(2, rq3.getListOfReplicators().size());
146
147 SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
148 assertEquals(5, queues.size());
149 assertEquals(1, rq2.getListOfReplicators().size());
150
151
152 assertEquals(0, rq2.claimQueues(server2).size());
153
154 assertEquals(6, rq2.getAllQueues().size());
155
156 rq2.removeAllQueues();
157
158 assertEquals(0, rq2.getListOfReplicators().size());
159 }
160
161 @Test
162 public void testReplicationPeers() throws Exception {
163 rp.init();
164
165
166 try {
167 rp.removePeer("bogus");
168 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
169 } catch (IllegalArgumentException e) {
170 }
171 try {
172 rp.enablePeer("bogus");
173 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
174 } catch (IllegalArgumentException e) {
175 }
176 try {
177 rp.disablePeer("bogus");
178 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
179 } catch (IllegalArgumentException e) {
180 }
181 try {
182 rp.getStatusOfConnectedPeer("bogus");
183 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
184 } catch (IllegalArgumentException e) {
185 }
186 assertFalse(rp.connectToPeer("bogus"));
187 rp.disconnectFromPeer("bogus");
188 assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
189 assertNull(rp.getPeerUUID("bogus"));
190 assertNull(rp.getPeerConf("bogus"));
191 assertNumberOfPeers(0, 0);
192
193
194 rp.addPeer(ID_ONE, KEY_ONE);
195 assertNumberOfPeers(0, 1);
196 rp.addPeer(ID_TWO, KEY_TWO);
197 assertNumberOfPeers(0, 2);
198
199
200 try {
201 rp.getStatusOfConnectedPeer(ID_ONE);
202 fail("There are no connected peers, should have thrown an IllegalArgumentException");
203 } catch (IllegalArgumentException e) {
204 }
205 assertNull(rp.getPeerUUID(ID_ONE));
206 assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
207 rp.disconnectFromPeer(ID_ONE);
208 assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
209
210
211 rp.connectToPeer(ID_ONE);
212 assertNumberOfPeers(1, 2);
213 assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
214 rp.disablePeer(ID_ONE);
215 assertConnectedPeerStatus(false, ID_ONE);
216 rp.enablePeer(ID_ONE);
217 assertConnectedPeerStatus(true, ID_ONE);
218 assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
219 assertNotNull(rp.getPeerUUID(ID_ONE).toString());
220
221
222 rp.disconnectFromPeer(ID_ONE);
223 assertNumberOfPeers(0, 2);
224 try {
225 rp.getStatusOfConnectedPeer(ID_ONE);
226 fail("There are no connected peers, should have thrown an IllegalArgumentException");
227 } catch (IllegalArgumentException e) {
228 }
229 }
230
231 protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
232
233 if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
234 fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
235 }
236 while (true) {
237 if (status == rp.getStatusOfConnectedPeer(peerId)) {
238 return;
239 }
240 if (zkTimeoutCount < ZK_MAX_COUNT) {
241 LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
242 + ", sleeping and trying again.");
243 Thread.sleep(ZK_SLEEP_INTERVAL);
244 } else {
245 fail("Timed out waiting for ConnectedPeerStatus to be " + status);
246 }
247 }
248 }
249
250 protected void assertNumberOfPeers(int connected, int total) {
251 assertEquals(total, rp.getAllPeerClusterKeys().size());
252 assertEquals(connected, rp.getConnectedPeers().size());
253 assertEquals(total, rp.getAllPeerIds().size());
254 }
255
256
257
258
259
260 protected void populateQueues() throws ReplicationException {
261 rq1.addLog("trash", "trash");
262 rq1.removeQueue("trash");
263
264 rq2.addLog("qId1", "trash");
265 rq2.removeLog("qId1", "trash");
266
267 for (int i = 1; i < 6; i++) {
268 for (int j = 0; j < i; j++) {
269 rq3.addLog("qId" + i, "filename" + j);
270 }
271
272 rp.addPeer("qId" + i, "bogus" + i);
273 }
274 }
275 }
276