1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication;
21
22 import static org.junit.Assert.assertArrayEquals;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
27
28 import java.io.IOException;
29 import java.util.List;
30 import java.util.Map;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.*;
36 import org.apache.hadoop.hbase.client.Delete;
37 import org.apache.hadoop.hbase.client.Get;
38 import org.apache.hadoop.hbase.client.HBaseAdmin;
39 import org.apache.hadoop.hbase.client.HTable;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
43 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
46 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
47 import org.junit.AfterClass;
48 import org.junit.BeforeClass;
49 import org.junit.Test;
50 import org.junit.experimental.categories.Category;
51
52 @Category(LargeTests.class)
53 public class TestPerTableCFReplication {
54
55 private static final Log LOG = LogFactory.getLog(TestPerTableCFReplication.class);
56
57 private static Configuration conf1;
58 private static Configuration conf2;
59 private static Configuration conf3;
60
61 private static HBaseTestingUtility utility1;
62 private static HBaseTestingUtility utility2;
63 private static HBaseTestingUtility utility3;
64 private static final long SLEEP_TIME = 500;
65 private static final int NB_RETRIES = 100;
66
67 private static final byte[] tableName = Bytes.toBytes("test");
68 private static final byte[] tabAName = Bytes.toBytes("TA");
69 private static final byte[] tabBName = Bytes.toBytes("TB");
70 private static final byte[] tabCName = Bytes.toBytes("TC");
71 private static final byte[] famName = Bytes.toBytes("f");
72 private static final byte[] f1Name = Bytes.toBytes("f1");
73 private static final byte[] f2Name = Bytes.toBytes("f2");
74 private static final byte[] f3Name = Bytes.toBytes("f3");
75 private static final byte[] row1 = Bytes.toBytes("row1");
76 private static final byte[] row2 = Bytes.toBytes("row2");
77 private static final byte[] noRepfamName = Bytes.toBytes("norep");
78 private static final byte[] val = Bytes.toBytes("myval");
79
80 private static HTableDescriptor table;
81 private static HTableDescriptor tabA;
82 private static HTableDescriptor tabB;
83 private static HTableDescriptor tabC;
84
85 @BeforeClass
86 public static void setUpBeforeClass() throws Exception {
87 conf1 = HBaseConfiguration.create();
88 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
89
90
91 conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
92 conf1.setInt("replication.source.size.capacity", 1024);
93 conf1.setLong("replication.source.sleepforretries", 100);
94 conf1.setInt("hbase.regionserver.maxlogs", 10);
95 conf1.setLong("hbase.master.logcleaner.ttl", 10);
96 conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
97 conf1.setBoolean("dfs.support.append", true);
98 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
99 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
100 "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
101
102 utility1 = new HBaseTestingUtility(conf1);
103 utility1.startMiniZKCluster();
104 MiniZooKeeperCluster miniZK = utility1.getZkCluster();
105 new ZooKeeperWatcher(conf1, "cluster1", null, true);
106
107 conf2 = new Configuration(conf1);
108 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
109
110 conf3 = new Configuration(conf1);
111 conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
112
113 utility2 = new HBaseTestingUtility(conf2);
114 utility2.setZkCluster(miniZK);
115 new ZooKeeperWatcher(conf2, "cluster3", null, true);
116
117 utility3 = new HBaseTestingUtility(conf3);
118 utility3.setZkCluster(miniZK);
119 new ZooKeeperWatcher(conf3, "cluster3", null, true);
120
121 table = new HTableDescriptor(TableName.valueOf(tableName));
122 HColumnDescriptor fam = new HColumnDescriptor(famName);
123 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
124 table.addFamily(fam);
125 fam = new HColumnDescriptor(noRepfamName);
126 table.addFamily(fam);
127
128 tabA = new HTableDescriptor(tabAName);
129 fam = new HColumnDescriptor(f1Name);
130 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
131 tabA.addFamily(fam);
132 fam = new HColumnDescriptor(f2Name);
133 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
134 tabA.addFamily(fam);
135 fam = new HColumnDescriptor(f3Name);
136 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
137 tabA.addFamily(fam);
138
139 tabB = new HTableDescriptor(tabBName);
140 fam = new HColumnDescriptor(f1Name);
141 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
142 tabB.addFamily(fam);
143 fam = new HColumnDescriptor(f2Name);
144 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
145 tabB.addFamily(fam);
146 fam = new HColumnDescriptor(f3Name);
147 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
148 tabB.addFamily(fam);
149
150 tabC = new HTableDescriptor(tabCName);
151 fam = new HColumnDescriptor(f1Name);
152 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
153 tabC.addFamily(fam);
154 fam = new HColumnDescriptor(f2Name);
155 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
156 tabC.addFamily(fam);
157 fam = new HColumnDescriptor(f3Name);
158 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
159 tabC.addFamily(fam);
160
161 utility1.startMiniCluster();
162 utility2.startMiniCluster();
163 utility3.startMiniCluster();
164 }
165
166 @AfterClass
167 public static void tearDownAfterClass() throws Exception {
168 utility3.shutdownMiniCluster();
169 utility2.shutdownMiniCluster();
170 utility1.shutdownMiniCluster();
171 }
172
173 @Test
174 public void testParseTableCFsFromConfig() {
175 Map<String, List<String>> tabCFsMap = null;
176
177
178 tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(null);
179 assertEquals(null, tabCFsMap);
180
181 tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("");
182 assertEquals(null, tabCFsMap);
183
184 tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(" ");
185 assertEquals(null, tabCFsMap);
186
187
188 tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1");
189 assertEquals(1, tabCFsMap.size());
190 assertTrue(tabCFsMap.containsKey("tab1"));
191 assertFalse(tabCFsMap.containsKey("tab2"));
192 assertEquals(null, tabCFsMap.get("tab1"));
193
194 tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab2:cf1");
195 assertEquals(1, tabCFsMap.size());
196 assertTrue(tabCFsMap.containsKey("tab2"));
197 assertFalse(tabCFsMap.containsKey("tab1"));
198 assertEquals(1, tabCFsMap.get("tab2").size());
199 assertEquals("cf1", tabCFsMap.get("tab2").get(0));
200
201 tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab3 : cf1 , cf3");
202 assertEquals(1, tabCFsMap.size());
203 assertTrue(tabCFsMap.containsKey("tab3"));
204 assertFalse(tabCFsMap.containsKey("tab1"));
205 assertEquals(2, tabCFsMap.get("tab3").size());
206 assertTrue(tabCFsMap.get("tab3").contains("cf1"));
207 assertTrue(tabCFsMap.get("tab3").contains("cf3"));
208
209
210 tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
211
212 assertEquals(3, tabCFsMap.size());
213 assertTrue(tabCFsMap.containsKey("tab1"));
214 assertTrue(tabCFsMap.containsKey("tab2"));
215 assertTrue(tabCFsMap.containsKey("tab3"));
216
217 assertEquals(null, tabCFsMap.get("tab1"));
218
219 assertEquals(1, tabCFsMap.get("tab2").size());
220 assertEquals("cf1", tabCFsMap.get("tab2").get(0));
221
222 assertEquals(2, tabCFsMap.get("tab3").size());
223 assertTrue(tabCFsMap.get("tab3").contains("cf1"));
224 assertTrue(tabCFsMap.get("tab3").contains("cf3"));
225
226
227
228 tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
229
230 assertEquals(3, tabCFsMap.size());
231 assertTrue(tabCFsMap.containsKey("tab1"));
232 assertTrue(tabCFsMap.containsKey("tab2"));
233 assertTrue(tabCFsMap.containsKey("tab3"));
234
235 assertEquals(null, tabCFsMap.get("tab1"));
236
237 assertEquals(1, tabCFsMap.get("tab2").size());
238 assertEquals("cf1", tabCFsMap.get("tab2").get(0));
239
240 assertEquals(2, tabCFsMap.get("tab3").size());
241 assertTrue(tabCFsMap.get("tab3").contains("cf1"));
242 assertTrue(tabCFsMap.get("tab3").contains("cf3"));
243
244
245
246 tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
247
248 assertEquals(1, tabCFsMap.size());
249 assertFalse(tabCFsMap.containsKey("tab1"));
250 assertFalse(tabCFsMap.containsKey("tab2"));
251 assertTrue(tabCFsMap.containsKey("tab3"));
252
253 assertEquals(2, tabCFsMap.get("tab3").size());
254 assertTrue(tabCFsMap.get("tab3").contains("cf1"));
255 assertTrue(tabCFsMap.get("tab3").contains("cf3"));
256 }
257
258 @Test(timeout=300000)
259 public void testPerTableCFReplication() throws Exception {
260 LOG.info("testPerTableCFReplication");
261 ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
262
263 new HBaseAdmin(conf1).createTable(tabA);
264 new HBaseAdmin(conf1).createTable(tabB);
265 new HBaseAdmin(conf1).createTable(tabC);
266 new HBaseAdmin(conf2).createTable(tabA);
267 new HBaseAdmin(conf2).createTable(tabB);
268 new HBaseAdmin(conf2).createTable(tabC);
269 new HBaseAdmin(conf3).createTable(tabA);
270 new HBaseAdmin(conf3).createTable(tabB);
271 new HBaseAdmin(conf3).createTable(tabC);
272
273 HTable htab1A = new HTable(conf1, tabAName);
274 HTable htab2A = new HTable(conf2, tabAName);
275 HTable htab3A = new HTable(conf3, tabAName);
276
277 HTable htab1B = new HTable(conf1, tabBName);
278 HTable htab2B = new HTable(conf2, tabBName);
279 HTable htab3B = new HTable(conf3, tabBName);
280
281 HTable htab1C = new HTable(conf1, tabCName);
282 HTable htab2C = new HTable(conf2, tabCName);
283 HTable htab3C = new HTable(conf3, tabCName);
284
285
286 admin1.addPeer("2", utility2.getClusterKey(), "TC;TB:f1,f3");
287 admin1.addPeer("3", utility3.getClusterKey(), "TA;TB:f1,f2");
288
289
290 putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
291 ensureRowNotReplicated(row1, f1Name, htab2A);
292 deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
293
294 putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
295 ensureRowNotReplicated(row1, f2Name, htab2A);
296 deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
297
298 putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
299 ensureRowNotReplicated(row1, f3Name, htab2A);
300 deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
301
302
303 putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
304 deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
305
306
307 putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
308 ensureRowNotReplicated(row1, f2Name, htab2B);
309 deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
310
311
312 putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
313 ensureRowNotReplicated(row1, f3Name, htab3B);
314 deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
315
316
317 putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
318 ensureRowNotReplicated(row1, f1Name, htab3C);
319 deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
320
321 putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
322 ensureRowNotReplicated(row1, f2Name, htab3C);
323 deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
324
325 putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
326 ensureRowNotReplicated(row1, f3Name, htab3C);
327 deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
328
329
330 admin1.setPeerTableCFs("2", "TA:f1,f2; TC:f2,f3");
331 admin1.setPeerTableCFs("3", "TB; TC:f3");
332
333
334 putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
335 ensureRowNotReplicated(row2, f1Name, htab3A);
336 deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
337
338 putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
339 ensureRowNotReplicated(row2, f2Name, htab3A);
340 deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
341
342 putAndWaitWithFamily(row2, f3Name, htab1A);
343 ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
344 deleteAndWaitWithFamily(row2, f3Name, htab1A);
345
346
347 putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
348 ensureRowNotReplicated(row2, f1Name, htab2B);
349 deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
350
351 putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
352 ensureRowNotReplicated(row2, f2Name, htab2B);
353 deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
354
355 putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
356 ensureRowNotReplicated(row2, f3Name, htab2B);
357 deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
358
359
360 putAndWaitWithFamily(row2, f1Name, htab1C);
361 ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
362 deleteAndWaitWithFamily(row2, f1Name, htab1C);
363
364 putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
365 ensureRowNotReplicated(row2, f2Name, htab3C);
366 deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
367
368 putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
369 deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
370 }
371
372 private void ensureRowNotReplicated(byte[] row, byte[] fam, HTable... tables) throws IOException {
373 Get get = new Get(row);
374 get.addFamily(fam);
375 for (HTable table : tables) {
376 Result res = table.get(get);
377 assertEquals(0, res.size());
378 }
379 }
380
381 private void deleteAndWaitWithFamily(byte[] row, byte[] fam,
382 HTable source, HTable... targets)
383 throws Exception {
384 Delete del = new Delete(row);
385 del.deleteFamily(fam);
386 source.delete(del);
387
388 Get get = new Get(row);
389 get.addFamily(fam);
390 for (int i = 0; i < NB_RETRIES; i++) {
391 if (i==NB_RETRIES-1) {
392 fail("Waited too much time for del replication");
393 }
394 boolean removedFromAll = true;
395 for (HTable target : targets) {
396 Result res = target.get(get);
397 if (res.size() >= 1) {
398 LOG.info("Row not deleted");
399 removedFromAll = false;
400 break;
401 }
402 }
403 if (removedFromAll) {
404 break;
405 } else {
406 Thread.sleep(SLEEP_TIME);
407 }
408 }
409 }
410
411 private void putAndWaitWithFamily(byte[] row, byte[] fam,
412 HTable source, HTable... targets)
413 throws Exception {
414 Put put = new Put(row);
415 put.add(fam, row, val);
416 source.put(put);
417
418 Get get = new Get(row);
419 get.addFamily(fam);
420 for (int i = 0; i < NB_RETRIES; i++) {
421 if (i==NB_RETRIES-1) {
422 fail("Waited too much time for put replication");
423 }
424 boolean replicatedToAll = true;
425 for (HTable target : targets) {
426 Result res = target.get(get);
427 if (res.size() == 0) {
428 LOG.info("Row not available");
429 replicatedToAll = false;
430 break;
431 } else {
432 assertEquals(res.size(), 1);
433 assertArrayEquals(res.value(), val);
434 }
435 }
436 if (replicatedToAll) {
437 break;
438 } else {
439 Thread.sleep(SLEEP_TIME);
440 }
441 }
442 }
443 }