1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.hbase.HBaseTestingUtility;
25 import org.apache.hadoop.hbase.testclassification.LargeTests;
26 import org.apache.hadoop.hbase.UnknownScannerException;
27 import org.apache.hadoop.hbase.client.HTable;
28 import org.apache.hadoop.hbase.client.Result;
29 import org.apache.hadoop.hbase.client.ResultScanner;
30 import org.apache.hadoop.hbase.client.Scan;
31 import org.junit.experimental.categories.Category;
32
33 import static org.junit.Assert.fail;
34
35 @Category(LargeTests.class)
36 public class TestReplicationKillRS extends TestReplicationBase {
37
38 private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class);
39
40
41
42
43
44
45
46
47
48 public void loadTableAndKillRS(HBaseTestingUtility util) throws Exception {
49
50
51 int rsToKill1 =
52 util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
53
54
55 Thread killer = killARegionServer(util, 5000, rsToKill1);
56
57 LOG.info("Start loading table");
58 int initialCount = utility1.loadTable((HTable)htable1, famName);
59 LOG.info("Done loading table");
60 killer.join(5000);
61 LOG.info("Done waiting for threads");
62
63 Result[] res;
64 while (true) {
65 try {
66 Scan scan = new Scan();
67 ResultScanner scanner = htable1.getScanner(scan);
68 res = scanner.next(initialCount);
69 scanner.close();
70 break;
71 } catch (UnknownScannerException ex) {
72 LOG.info("Cluster wasn't ready yet, restarting scanner");
73 }
74 }
75
76
77 if (res.length != initialCount) {
78 LOG.warn("We lost some rows on the master cluster!");
79
80 initialCount = res.length;
81 }
82
83 int lastCount = 0;
84
85 final long start = System.currentTimeMillis();
86 int i = 0;
87 while (true) {
88 if (i==NB_RETRIES-1) {
89 fail("Waited too much time for queueFailover replication. " +
90 "Waited "+(System.currentTimeMillis() - start)+"ms.");
91 }
92 Scan scan2 = new Scan();
93 ResultScanner scanner2 = htable2.getScanner(scan2);
94 Result[] res2 = scanner2.next(initialCount * 2);
95 scanner2.close();
96 if (res2.length < initialCount) {
97 if (lastCount < res2.length) {
98 i--;
99 } else {
100 i++;
101 }
102 lastCount = res2.length;
103 LOG.info("Only got " + lastCount + " rows instead of " +
104 initialCount + " current i=" + i);
105 Thread.sleep(SLEEP_TIME*2);
106 } else {
107 break;
108 }
109 }
110 }
111
112 private static Thread killARegionServer(final HBaseTestingUtility utility,
113 final long timeout, final int rs) {
114 Thread killer = new Thread() {
115 public void run() {
116 try {
117 Thread.sleep(timeout);
118 utility.getHBaseCluster().getRegionServer(rs).stop("Stopping as part of the test");
119 } catch (Exception e) {
120 LOG.error("Couldn't kill a region server", e);
121 }
122 }
123 };
124 killer.setDaemon(true);
125 killer.start();
126 return killer;
127 }
128 }