package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(LargeTests.class)
public class TestMultiSlaveReplication {

  private static final Log LOG = LogFactory.getLog(TestMultiSlaveReplication.class);

  private static Configuration conf1;
  private static Configuration conf2;
  private static Configuration conf3;

  private static HBaseTestingUtility utility1;
  private static HBaseTestingUtility utility2;
  private static HBaseTestingUtility utility3;
  private static final long SLEEP_TIME = 500;
  private static final int NB_RETRIES = 100;

  private static final byte[] tableName = Bytes.toBytes("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  private static HTableDescriptor table;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");

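    // tighten WAL block size, replication batch capacity, retry sleeps and log-cleaner TTL
    // so that log rolling and replication are exercised quickly within the test timeout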
    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    conf1.setInt("replication.source.size.capacity", 1024);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
    conf1.setBoolean("dfs.support.append", true);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");

    // the three clusters share a single mini ZooKeeper ensemble, isolated by znode parent
    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    new ZooKeeperWatcher(conf1, "cluster1", null, true);

    conf2 = new Configuration(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

    conf3 = new Configuration(conf1);
    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");

    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);
    new ZooKeeperWatcher(conf2, "cluster2", null, true);

    utility3 = new HBaseTestingUtility(conf3);
    utility3.setZkCluster(miniZK);
    new ZooKeeperWatcher(conf3, "cluster3", null, true);

    // replicate only the "f" family; "norep" keeps the default local scope
    table = new HTableDescriptor(TableName.valueOf(tableName));
    HColumnDescriptor fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    table.addFamily(fam);
  }

  @Test(timeout=300000)
  public void testMultiSlaveReplication() throws Exception {
    LOG.info("testMultiSlaveReplication");
    MiniHBaseCluster master = utility1.startMiniCluster();
    utility2.startMiniCluster();
    utility3.startMiniCluster();
    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);

    new HBaseAdmin(conf1).createTable(table);
    new HBaseAdmin(conf2).createTable(table);
    new HBaseAdmin(conf3).createTable(table);
    HTable htable1 = new HTable(conf1, tableName);
    htable1.setWriteBufferSize(1024);
    HTable htable2 = new HTable(conf2, tableName);
    htable2.setWriteBufferSize(1024);
    HTable htable3 = new HTable(conf3, tableName);
    htable3.setWriteBufferSize(1024);

    // cluster 2 is the only peer for now
    admin1.addPeer("1", utility2.getClusterKey());

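    // a put and a delete on cluster 1 should both be replicated to cluster 2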
    putAndWait(row, famName, htable1, htable2);
    deleteAndWait(row, htable1, htable2);

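    // cluster 3 is not a peer yet, so nothing should have arrived there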
    checkRow(row, 0, htable3);

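    // row2 is written while cluster 2 is still the only peer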
    putAndWait(row2, famName, htable1, htable2);

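    // roll the region server's WAL so the edits above end up in an old log file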
    new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0).getServerName().toString());

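    // row3 goes into the freshly rolled log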
    putAndWait(row3, famName, htable1, htable2);

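    // add cluster 3 as a second slave; replication to it starts from the current log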
    admin1.addPeer("2", utility3.getClusterKey());

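    // with two peers, a new row and its delete should reach both cluster 2 and cluster 3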
    putAndWait(row1, famName, htable1, htable2, htable3);

    deleteAndWait(row1, htable1, htable2, htable3);

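    // row2 only exists in the log that was rolled before peer 2 was added, so it never reaches cluster 3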
    checkRow(row2, 0, htable3);

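    // row3 sits in the log that was current when peer 2 was added, so it is replicated to cluster 3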
    checkRow(row3, 1, htable3);

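    // write "row" again, then roll the log so the edit has to be shipped from a rolled log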
    Put p = new Put(row);
    p.add(famName, row, row);
    htable1.put(p);

    new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0)
        .getServerName().toString());

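    // delete row2 and wait until it is gone from all clusters (it never existed on cluster 3)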
    deleteAndWait(row2, htable1, htable2, htable3);

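    // the put of "row" issued just before the roll must have been replicated to cluster 2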
    checkRow(row, 1, htable2);

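    // ...and must eventually show up on cluster 3 as well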
    checkWithWait(row, 1, htable3);

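    // clean up the remaining rows on all three clusters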
    deleteAndWait(row, htable1, htable2, htable3);
    deleteAndWait(row3, htable1, htable2, htable3);

    utility3.shutdownMiniCluster();
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }

  private void checkWithWait(byte[] row, int count, HTable table) throws Exception {
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time while getting the row.");
      }
      Result res = table.get(get);
      if (res.size() >= 1) {
        LOG.info("Row is replicated");
        assertEquals(count, res.size());
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  private void checkRow(byte[] row, int count, HTable... tables) throws IOException {
    Get get = new Get(row);
    for (HTable table : tables) {
      Result res = table.get(get);
      assertEquals(count, res.size());
    }
  }

  private void deleteAndWait(byte[] row, HTable source, HTable... targets)
      throws Exception {
    Delete del = new Delete(row);
    source.delete(del);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      boolean removedFromAll = true;
      for (HTable target : targets) {
        Result res = target.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          removedFromAll = false;
          break;
        }
      }
      if (removedFromAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

  private void putAndWait(byte[] row, byte[] fam, HTable source, HTable... targets)
      throws Exception {
    Put put = new Put(row);
    put.add(fam, row, row);
    source.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      boolean replicatedToAll = true;
      for (HTable target : targets) {
        Result res = target.get(get);
        if (res.size() == 0) {
          LOG.info("Row not available");
          replicatedToAll = false;
          break;
        } else {
          assertArrayEquals(row, res.value());
        }
      }
      if (replicatedToAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

}