/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Sets;

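/**
 * Tests for {@link ReplicationSourceManager}: WAL-roll bookkeeping for a replication
 * source, claiming of queues from dead region servers, and parsing of the dead-server
 * chain encoded in recovered queue znodes.
 */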
@Category(MediumTests.class)
public class TestReplicationSourceManager {

  private static final Log LOG =
      LogFactory.getLog(TestReplicationSourceManager.class);

  private static Configuration conf;

  private static HBaseTestingUtility utility;

  private static Replication replication;

  private static ReplicationSourceManager manager;

  private static ZooKeeperWatcher zkw;

  private static HTableDescriptor htd;

  private static HRegionInfo hri;

  private static final byte[] r1 = Bytes.toBytes("r1");

  private static final byte[] r2 = Bytes.toBytes("r2");

  private static final byte[] f1 = Bytes.toBytes("f1");

  private static final TableName test =
      TableName.valueOf("test");

  private static final String slaveId = "1";

  private static FileSystem fs;

  private static String logName;

  private static Path oldLogDir;

  private static Path logDir;

  private static CountDownLatch latch;

  private static List<String> files = new ArrayList<String>();

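  /**
   * Starts a mini ZooKeeper cluster, creates the replication znodes for a single
   * enabled peer ("1"), and builds a {@link ReplicationSourceManager} over log
   * directories in the local test filesystem.
   */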
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {

    conf = HBaseConfiguration.create();
    conf.set("replication.replicationsource.implementation",
        ReplicationSourceDummy.class.getCanonicalName());
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
        HConstants.REPLICATION_ENABLE_DEFAULT);
    utility = new HBaseTestingUtility(conf);
    utility.startMiniZKCluster();

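    // Lay down the replication znodes by hand: peer "1" points back at this
    // mini cluster and is marked enabled, as is the overall replication state.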
    zkw = new ZooKeeperWatcher(conf, "test", null);
    ZKUtil.createWithParents(zkw, "/hbase/replication");
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
            + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);

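    // Assign a cluster id, point the replication manager at log directories under
    // the test data dir, and register a source for the slave peer.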
    ZKClusterId.setClusterId(zkw, new ClusterId());
    fs = FileSystem.get(conf);
    oldLogDir = new Path(utility.getDataTestDir(),
        HConstants.HREGION_OLDLOGDIR_NAME);
    logDir = new Path(utility.getDataTestDir(),
        HConstants.HREGION_LOGDIR_NAME);
    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
    manager = replication.getReplicationManager();

    logName = HConstants.HREGION_LOGDIR_NAME;

    manager.addSource(slaveId);

    htd = new HTableDescriptor(test);
    HColumnDescriptor col = new HColumnDescriptor("f1");
    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    htd.addFamily(col);
    col = new HColumnDescriptor("f2");
    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
    htd.addFamily(col);

    hri = new HRegionInfo(htd.getTableName(), r1, r2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    manager.join();
    utility.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    fs.delete(logDir, true);
    fs.delete(oldLogDir, true);
  }

  @After
  public void tearDown() throws Exception {
    setUp();
  }

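  /**
   * Appends edits while periodically rolling the WAL and verifies that the manager
   * tracks the rolled logs for the peer, then prunes them once a position on the
   * current log has been recorded.
   */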
  @Test
  public void testLogRoll() throws Exception {
    long seq = 0;
    long baseline = 1000;
    long time = baseline;
    KeyValue kv = new KeyValue(r1, f1, r1);
    WALEdit edit = new WALEdit();
    edit.add(kv);

    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
    listeners.add(replication);
    HLog hlog = HLogFactory.createHLog(fs, utility.getDataTestDir(), logName,
        conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8"));
    final AtomicLong sequenceId = new AtomicLong(1);
    manager.init();
    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor(f1));

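    // Append 100 edits, rolling the writer every 20 entries.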
    for (long i = 1; i < 101; i++) {
      if (i > 1 && i % 20 == 0) {
        hlog.rollWriter();
      }
      LOG.info(i);
      HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
          System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
      hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
    }

    LOG.info(baseline + " and " + time);
    baseline += 101;
    time = baseline;
    LOG.info(baseline + " and " + time);

    for (int i = 0; i < 3; i++) {
      hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
    }

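    // Five rolls plus the log currently being written to leave six logs queued for the peer.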
    assertEquals(6, manager.getHLogs().get(slaveId).size());

    hlog.rollWriter();

    manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
        "1", 0, false, false);

    hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);

    // Only the log that is still being written to should remain queued for the peer.
    assertEquals(1, manager.getHLogs().get(slaveId).size());
  }

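  /**
   * Has three workers race to claim the replication queue of a "dead" region server
   * and verifies that exactly one of them ends up with the logs.
   */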
  @Test
  public void testClaimQueues() throws Exception {
    LOG.debug("testClaimQueues");
    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
    final Server server = new DummyServer("hostname0.example.org");
    ReplicationQueues rq =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
          server);
    rq.init(server.getServerName().toString());

    // Populate the queue of the "dead" server with two logs for peer "1".
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addLog("1", file);
    }

    // Three workers race to claim the dead server's queue.
    Server s1 = new DummyServer("dummyserver1.example.org");
    Server s2 = new DummyServer("dummyserver2.example.org");
    Server s3 = new DummyServer("dummyserver3.example.org");

    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
        server.getServerName().getServerName(), s1);
    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
        server.getServerName().getServerName(), s2);
    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
        server.getServerName().getServerName(), s3);

    latch = new CountDownLatch(3);

    w1.start();
    w2.start();
    w3.start();

    int populatedMap = 0;

    // Wait for every worker to finish; exactly one should have claimed the logs.
    latch.await();
    populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
        + w3.isLogZnodesMapPopulated();
    assertEquals(1, populatedMap);
    server.abort("", null);
  }

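  /**
   * Claims a dead server's queue through a {@link NodeFailoverWorker} and verifies
   * that cleaning old logs trims the recovered queue down to the last log.
   */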
  @Test
  public void testCleanupFailoverQueues() throws Exception {
    final Server server = new DummyServer("hostname1.example.org");
    ReplicationQueues rq =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
          server);
    rq.init(server.getServerName().toString());

    SortedSet<String> files = new TreeSet<String>();
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addLog("1", file);
    }
    Server s1 = new DummyServer("dummyserver1.example.org");
    ReplicationQueues rq1 =
        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rq1.init(s1.getServerName().toString());
    ReplicationPeers rp1 =
        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rp1.init();
    NodeFailoverWorker w1 =
        manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1,
            new UUID(1L, 2L));
    w1.start();
    w1.join(5000);
    assertEquals(1, manager.getHlogsByIdRecoveredQueues().size());
    String id = "1-" + server.getServerName().getServerName();
    assertEquals(files, manager.getHlogsByIdRecoveredQueues().get(id));
    // Cleaning up to "log2" drops everything older and keeps "log2" itself.
    manager.cleanOldLogs("log2", id, true);

    assertEquals(Sets.newHashSet("log2"), manager.getHlogsByIdRecoveredQueues().get(id));
  }

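  /**
   * Chains queue claims across three servers and verifies that the resulting queue
   * znode records every dead region server in the failover chain.
   */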
  @Test
  public void testNodeFailoverDeadServerParsing() throws Exception {
    LOG.debug("testNodeFailoverDeadServerParsing");
    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
    ReplicationQueues repQueues =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
    repQueues.init(server.getServerName().toString());

    // Populate the queue of the first "dead" server.
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      repQueues.addLog("1", file);
    }

    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");

    // Chain the failovers: s1 claims the dead server's queues, s2 claims s1's,
    // and s3 claims s2's, so the queue znode accumulates every dead server name.
    ReplicationQueues rq1 =
        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rq1.init(s1.getServerName().toString());
    SortedMap<String, SortedSet<String>> testMap =
        rq1.claimQueues(server.getServerName().getServerName());
    ReplicationQueues rq2 =
        ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
    rq2.init(s2.getServerName().toString());
    testMap = rq2.claimQueues(s1.getServerName().getServerName());
    ReplicationQueues rq3 =
        ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
    rq3.init(s3.getServerName().toString());
    testMap = rq3.claimQueues(s2.getServerName().getServerName());

    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
    List<String> result = replicationQueueInfo.getDeadRegionServers();

    // The dead server list parsed from the znode should name every server in the chain.
    assertTrue(result.contains(server.getServerName().getServerName()));
    assertTrue(result.contains(s1.getServerName().getServerName()));
    assertTrue(result.contains(s2.getServerName().getServerName()));

    server.abort("", null);
  }

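  /**
   * Thread that claims the queues of a dead region server and reports, via
   * {@link #isLogZnodesMapPopulated()}, whether it obtained the expected logs.
   */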
  static class DummyNodeFailoverWorker extends Thread {
    private SortedMap<String, SortedSet<String>> logZnodesMap;
    Server server;
    private String deadRsZnode;
    ReplicationQueues rq;

    public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
      this.deadRsZnode = znode;
      this.server = s;
      this.rq =
          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
            server);
      this.rq.init(this.server.getServerName().toString());
    }

    @Override
    public void run() {
      try {
        logZnodesMap = rq.claimQueues(deadRsZnode);
        server.abort("Done with testing", null);
      } catch (Exception e) {
        LOG.error("Got exception while running NodeFailoverWorker", e);
      } finally {
        latch.countDown();
      }
    }

    /**
     * @return 1 when the claimed map holds all of the expected log files, 0 otherwise
     */
    private int isLogZnodesMapPopulated() {
      Collection<SortedSet<String>> sets = logZnodesMap.values();
      if (sets.size() > 1) {
        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
      }
      if (sets.size() == 1) {
        SortedSet<String> s = sets.iterator().next();
        for (String file : files) {
          // Every expected log file must have been claimed.
          if (!s.contains(file)) {
            return 0;
          }
        }
        return 1;
      }
      return 0;
    }
  }

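  /**
   * Minimal {@link Server} implementation backed by the test's shared configuration
   * and ZooKeeper watcher.
   */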
  static class DummyServer implements Server {
    String hostname;

    DummyServer() {
      hostname = "hostname.example.org";
    }

    DummyServer(String hostname) {
      this.hostname = hostname;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZooKeeperWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CatalogTracker getCatalogTracker() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf(hostname, 1234, 1L);
    }

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
    }

    @Override
    public boolean isStopped() {
      return false;
    }
  }

}