/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Sets;

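/**
 * Tests for {@link ReplicationSourceManager}: tracking of WALs across log rolls and
 * claiming of replication queues from dead region servers. Only a mini ZooKeeper
 * cluster is started; region servers are simulated with the {@code DummyServer} stub
 * defined at the bottom of this class.
 */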
@Category(MediumTests.class)
public class TestReplicationSourceManager {

  private static final Log LOG =
      LogFactory.getLog(TestReplicationSourceManager.class);

  private static Configuration conf;

  private static HBaseTestingUtility utility;

  private static Replication replication;

  private static ReplicationSourceManager manager;

  private static ZooKeeperWatcher zkw;

  private static HTableDescriptor htd;

  private static HRegionInfo hri;

  private static final byte[] r1 = Bytes.toBytes("r1");

  private static final byte[] r2 = Bytes.toBytes("r2");

  private static final byte[] f1 = Bytes.toBytes("f1");

  private static final TableName test =
      TableName.valueOf("test");

  private static final String slaveId = "1";

  private static FileSystem fs;

  private static Path oldLogDir;

  private static Path logDir;

  private static CountDownLatch latch;

  private static List<String> files = new ArrayList<String>();

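  /**
   * Starts a mini ZooKeeper cluster, creates the replication znodes for peer "1",
   * and builds a ReplicationSourceManager backed by the ReplicationSourceDummy
   * implementation so that no real slave cluster is required.
   */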
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {

    conf = HBaseConfiguration.create();
    conf.set("replication.replicationsource.implementation",
        ReplicationSourceDummy.class.getCanonicalName());
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
        HConstants.REPLICATION_ENABLE_DEFAULT);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    utility = new HBaseTestingUtility(conf);
    utility.startMiniZKCluster();

    zkw = new ZooKeeperWatcher(conf, "test", null);
    ZKUtil.createWithParents(zkw, "/hbase/replication");
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
            + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);

    ZKClusterId.setClusterId(zkw, new ClusterId());
    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
    fs = FileSystem.get(conf);
    oldLogDir = new Path(utility.getDataTestDir(),
        HConstants.HREGION_OLDLOGDIR_NAME);
    logDir = new Path(utility.getDataTestDir(),
        HConstants.HREGION_LOGDIR_NAME);
    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
    manager = replication.getReplicationManager();

    manager.addSource(slaveId);

    htd = new HTableDescriptor(test);
    HColumnDescriptor col = new HColumnDescriptor("f1");
    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    htd.addFamily(col);
    col = new HColumnDescriptor("f2");
    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
    htd.addFamily(col);

    hri = new HRegionInfo(htd.getTableName(), r1, r2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    manager.join();
    utility.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    fs.delete(logDir, true);
    fs.delete(oldLogDir, true);
  }

  @After
  public void tearDown() throws Exception {
    setUp();
  }

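  /**
   * Appends edits while rolling the WAL every 20 entries and verifies that the
   * ReplicationSourceManager keeps track of every log, then reports a replicated
   * position on the current log so that the older logs can be cleaned up.
   */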
  @Test
  public void testLogRoll() throws Exception {
    long baseline = 1000;
    long time = baseline;
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    KeyValue kv = new KeyValue(r1, f1, r1);
    WALEdit edit = new WALEdit();
    edit.add(kv);

    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
    listeners.add(replication);
    final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
        URLEncoder.encode("regionserver:60020", "UTF8"));
    final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes());
    manager.init();
    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor(f1));

    // Append 100 edits, rolling the writer every 20 appends.
    for (long i = 1; i < 101; i++) {
      if (i > 1 && i % 20 == 0) {
        wal.rollWriter();
      }
      LOG.info(i);
      final long txid = wal.append(htd,
          hri,
          new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
          edit,
          true);
      wal.sync(txid);
    }

    LOG.info(baseline + " and " + time);
    baseline += 101;
    time = baseline;
    LOG.info(baseline + " and " + time);

    // Append a few more edits to the current log and sync them in one batch.
    for (int i = 0; i < 3; i++) {
      wal.append(htd, hri,
          new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
          edit,
          true);
    }
    wal.sync();

    // Five rolls plus the active log: the manager should be tracking six WALs.
    int logNumber = 0;
    for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) {
      logNumber += entry.getValue().size();
    }
    assertEquals(6, logNumber);

    wal.rollWriter();

    // Report a position on the current log so that the older, already-read logs are cleaned up.
    manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
        "1", 0, false, false);

    wal.append(htd, hri,
        new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
        edit,
        true);
    wal.sync();

    assertEquals(1, manager.getWALs().size());
  }

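  /**
   * Simulates three region servers racing to claim the replication queue of a dead
   * server (using ZooKeeper multi); exactly one failover worker should end up with
   * the queue and its two log entries.
   */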
  @Test
  public void testClaimQueues() throws Exception {
    LOG.debug("testClaimQueues");
    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
    final Server server = new DummyServer("hostname0.example.org");
    ReplicationQueues rq =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
          server);
    rq.init(server.getServerName().toString());

    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addLog("1", file);
    }

    Server s1 = new DummyServer("dummyserver1.example.org");
    Server s2 = new DummyServer("dummyserver2.example.org");
    Server s3 = new DummyServer("dummyserver3.example.org");

    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
        server.getServerName().getServerName(), s1);
    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
        server.getServerName().getServerName(), s2);
    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
        server.getServerName().getServerName(), s3);

    latch = new CountDownLatch(3);

    w1.start();
    w2.start();
    w3.start();

    int populatedMap = 0;

    latch.await();
    populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
        + w3.isLogZnodesMapPopulated();
    assertEquals(1, populatedMap);
    server.abort("", null);
  }

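  /**
   * Claims a dead server's queue through a real NodeFailoverWorker and then checks
   * that cleanOldLogs drops everything in the recovered queue that sorts before the
   * given log file.
   */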
  @Test
  public void testCleanupFailoverQueues() throws Exception {
    final Server server = new DummyServer("hostname1.example.org");
    ReplicationQueues rq =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
          server);
    rq.init(server.getServerName().toString());

    SortedSet<String> files = new TreeSet<String>();
    String group = "testgroup";
    String file1 = group + ".log1";
    String file2 = group + ".log2";
    files.add(file1);
    files.add(file2);
    for (String file : files) {
      rq.addLog("1", file);
    }
    Server s1 = new DummyServer("dummyserver1.example.org");
    ReplicationQueues rq1 =
        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rq1.init(s1.getServerName().toString());
    ReplicationPeers rp1 =
        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rp1.init();
    NodeFailoverWorker w1 =
        manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1,
          new UUID(1L, 2L));
    w1.start();
    w1.join(5000);
    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
    String id = "1-" + server.getServerName().getServerName();
    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
    manager.cleanOldLogs(file2, id, true);

    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
  }

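  /**
   * Chains queue claims across three servers and verifies that the resulting queue id
   * records every dead region server it passed through.
   */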
  @Test
  public void testNodeFailoverDeadServerParsing() throws Exception {
    LOG.debug("testNodeFailoverDeadServerParsing");
    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
    ReplicationQueues repQueues =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
    repQueues.init(server.getServerName().toString());

    files.add("log1");
    files.add("log2");
    for (String file : files) {
      repQueues.addLog("1", file);
    }

    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");

    ReplicationQueues rq1 =
        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rq1.init(s1.getServerName().toString());
    SortedMap<String, SortedSet<String>> testMap =
        rq1.claimQueues(server.getServerName().getServerName());
    ReplicationQueues rq2 =
        ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
    rq2.init(s2.getServerName().toString());
    testMap = rq2.claimQueues(s1.getServerName().getServerName());
    ReplicationQueues rq3 =
        ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
    rq3.init(s3.getServerName().toString());
    testMap = rq3.claimQueues(s2.getServerName().getServerName());

    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
    List<String> result = replicationQueueInfo.getDeadRegionServers();

    // The claimed queue id should name every dead region server it was claimed from.
    assertTrue(result.contains(server.getServerName().getServerName()));
    assertTrue(result.contains(s1.getServerName().getServerName()));
    assertTrue(result.contains(s2.getServerName().getServerName()));

    server.abort("", null);
  }

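  /**
   * Verifies that claiming a dead server's queues bumps the cversion of the
   * replication queues znode.
   */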
  @Test
  public void testFailoverDeadServerCversionChange() throws Exception {
    LOG.debug("testFailoverDeadServerCversionChange");

    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
    final Server s0 = new DummyServer("cversion-change0.example.org");
    ReplicationQueues repQueues =
        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
    repQueues.init(s0.getServerName().toString());

    files.add("log1");
    files.add("log2");
    for (String file : files) {
      repQueues.addLog("1", file);
    }

    Server s1 = new DummyServer("cversion-change1.example.org");
    ReplicationQueues rq1 =
        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rq1.init(s1.getServerName().toString());

    ReplicationQueuesClient client =
        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);

    int v0 = client.getQueuesZNodeCversion();
    rq1.claimQueues(s0.getServerName().getServerName());
    int v1 = client.getQueuesZNodeCversion();

    assertEquals(v0 + 1, v1);

    s0.abort("", null);
  }

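  /**
   * Thread that claims a dead region server's replication queue, records what it got,
   * and counts down the shared latch when it is done.
   */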
  static class DummyNodeFailoverWorker extends Thread {
    private SortedMap<String, SortedSet<String>> logZnodesMap;
    Server server;
    private String deadRsZnode;
    ReplicationQueues rq;

    public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
      this.deadRsZnode = znode;
      this.server = s;
      this.rq =
          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
            server);
      this.rq.init(this.server.getServerName().toString());
    }

    @Override
    public void run() {
      try {
        logZnodesMap = rq.claimQueues(deadRsZnode);
        server.abort("Done with testing", null);
      } catch (Exception e) {
        LOG.error("Got exception while running NodeFailoverWorker", e);
      } finally {
        latch.countDown();
      }
    }

    /**
     * @return 1 if this worker claimed the queue and it contains all the expected
     *         files, 0 otherwise, so callers can simply sum the results.
     */
    private int isLogZnodesMapPopulated() {
      Collection<SortedSet<String>> sets = logZnodesMap.values();
      if (sets.size() > 1) {
        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
      }
      if (sets.size() == 1) {
        SortedSet<String> s = sets.iterator().next();
        for (String file : files) {
          // This worker counts as populated only if it claimed every expected file.
          if (!s.contains(file)) {
            return 0;
          }
        }
        return 1;
      }
      return 0;
    }
  }

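  /**
   * Minimal Server stub that hands back the shared test configuration and ZooKeeper
   * watcher; everything else is a no-op.
   */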
  static class DummyServer implements Server {
    String hostname;

    DummyServer() {
      hostname = "hostname.example.org";
    }

    DummyServer(String hostname) {
      this.hostname = hostname;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZooKeeperWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return null;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public MetaTableLocator getMetaTableLocator() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf(hostname, 1234, 1L);
    }

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }
  }
}