1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.fail;
24
25 import java.io.IOException;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.LargeTests;
30 import org.apache.hadoop.hbase.MiniHBaseCluster;
31 import org.apache.hadoop.hbase.client.Get;
32 import org.apache.hadoop.hbase.client.HTable;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.client.Result;
35 import org.apache.hadoop.hbase.client.ResultScanner;
36 import org.apache.hadoop.hbase.client.Scan;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.util.JVMClusterUtil;
39 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.junit.experimental.categories.Category;
43
44
45
46
47 @Category(LargeTests.class)
48 public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
49
50 private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);
51
52
53
54
55 @Before
56 public void setUp() throws Exception {
57 htable1.setAutoFlush(false, true);
58
59
60 for (JVMClusterUtil.RegionServerThread r :
61 utility1.getHBaseCluster().getRegionServerThreads()) {
62 r.getRegionServer().getWAL().rollWriter();
63 }
64 utility1.truncateTable(tableName);
65
66
67
68
69
70 Scan scan = new Scan();
71 int lastCount = 0;
72 for (int i = 0; i < NB_RETRIES; i++) {
73 if (i == NB_RETRIES - 1) {
74 fail("Waited too much time for truncate");
75 }
76 ResultScanner scanner = htable2.getScanner(scan);
77 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
78 scanner.close();
79 if (res.length != 0) {
80 if (res.length < lastCount) {
81 i--;
82 }
83 lastCount = res.length;
84 LOG.info("Still got " + res.length + " rows");
85 Thread.sleep(SLEEP_TIME);
86 } else {
87 break;
88 }
89 }
90 }
91
92 @Test(timeout = 300000)
93 public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
94
95 LOG.info("testSimplePutDelete");
96 MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
97
98 doPutTest(Bytes.toBytes(1));
99
100 int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
101 peerCluster.stopRegionServer(rsToStop);
102 peerCluster.waitOnRegionServer(rsToStop);
103
104
105 assertEquals(1, peerCluster.getRegionServerThreads().size());
106
107 doPutTest(Bytes.toBytes(2));
108
109 peerCluster.startRegionServer();
110
111
112 assertEquals(2, peerCluster.getRegionServerThreads().size());
113
114 doPutTest(Bytes.toBytes(3));
115
116 }
117
118 private void doPutTest(byte[] row) throws IOException, InterruptedException {
119 Put put = new Put(row);
120 put.add(famName, row, row);
121
122 htable1 = new HTable(conf1, tableName);
123 htable1.put(put);
124
125 Get get = new Get(row);
126 for (int i = 0; i < NB_RETRIES; i++) {
127 if (i == NB_RETRIES - 1) {
128 fail("Waited too much time for put replication");
129 }
130 Result res = htable2.get(get);
131 if (res.size() == 0) {
132 LOG.info("Row not available");
133 Thread.sleep(SLEEP_TIME);
134 } else {
135 assertArrayEquals(res.value(), row);
136 break;
137 }
138 }
139
140 }
141
142 }