1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication;
21
22 import static org.junit.Assert.assertArrayEquals;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
27
28 import java.io.IOException;
29 import java.util.List;
30 import java.util.Map;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.HBaseConfiguration;
36 import org.apache.hadoop.hbase.HBaseTestingUtility;
37 import org.apache.hadoop.hbase.HColumnDescriptor;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HTableDescriptor;
40 import org.apache.hadoop.hbase.testclassification.LargeTests;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.client.Admin;
43 import org.apache.hadoop.hbase.client.Connection;
44 import org.apache.hadoop.hbase.client.ConnectionFactory;
45 import org.apache.hadoop.hbase.client.Delete;
46 import org.apache.hadoop.hbase.client.Get;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.client.Result;
49 import org.apache.hadoop.hbase.client.Table;
50 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
51 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
54 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
55 import org.junit.AfterClass;
56 import org.junit.BeforeClass;
57 import org.junit.Test;
58 import org.junit.experimental.categories.Category;
59
60 @Category(LargeTests.class)
61 public class TestPerTableCFReplication {
62
63 private static final Log LOG = LogFactory.getLog(TestPerTableCFReplication.class);
64
65 private static Configuration conf1;
66 private static Configuration conf2;
67 private static Configuration conf3;
68
69 private static HBaseTestingUtility utility1;
70 private static HBaseTestingUtility utility2;
71 private static HBaseTestingUtility utility3;
72 private static final long SLEEP_TIME = 500;
73 private static final int NB_RETRIES = 100;
74
75 private static final TableName tableName = TableName.valueOf("test");
76 private static final TableName tabAName = TableName.valueOf("TA");
77 private static final TableName tabBName = TableName.valueOf("TB");
78 private static final TableName tabCName = TableName.valueOf("TC");
79 private static final byte[] famName = Bytes.toBytes("f");
80 private static final byte[] f1Name = Bytes.toBytes("f1");
81 private static final byte[] f2Name = Bytes.toBytes("f2");
82 private static final byte[] f3Name = Bytes.toBytes("f3");
83 private static final byte[] row1 = Bytes.toBytes("row1");
84 private static final byte[] row2 = Bytes.toBytes("row2");
85 private static final byte[] noRepfamName = Bytes.toBytes("norep");
86 private static final byte[] val = Bytes.toBytes("myval");
87
88 private static HTableDescriptor table;
89 private static HTableDescriptor tabA;
90 private static HTableDescriptor tabB;
91 private static HTableDescriptor tabC;
92
93 @BeforeClass
94 public static void setUpBeforeClass() throws Exception {
95 conf1 = HBaseConfiguration.create();
96 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
97
98
99 conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
100 conf1.setInt("replication.source.size.capacity", 1024);
101 conf1.setLong("replication.source.sleepforretries", 100);
102 conf1.setInt("hbase.regionserver.maxlogs", 10);
103 conf1.setLong("hbase.master.logcleaner.ttl", 10);
104 conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
105 conf1.setBoolean("dfs.support.append", true);
106 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
107 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
108 "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
109
110 utility1 = new HBaseTestingUtility(conf1);
111 utility1.startMiniZKCluster();
112 MiniZooKeeperCluster miniZK = utility1.getZkCluster();
113 new ZooKeeperWatcher(conf1, "cluster1", null, true);
114
115 conf2 = new Configuration(conf1);
116 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
117
118 conf3 = new Configuration(conf1);
119 conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
120
121 utility2 = new HBaseTestingUtility(conf2);
122 utility2.setZkCluster(miniZK);
123 new ZooKeeperWatcher(conf2, "cluster3", null, true);
124
125 utility3 = new HBaseTestingUtility(conf3);
126 utility3.setZkCluster(miniZK);
127 new ZooKeeperWatcher(conf3, "cluster3", null, true);
128
129 table = new HTableDescriptor(tableName);
130 HColumnDescriptor fam = new HColumnDescriptor(famName);
131 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
132 table.addFamily(fam);
133 fam = new HColumnDescriptor(noRepfamName);
134 table.addFamily(fam);
135
136 tabA = new HTableDescriptor(tabAName);
137 fam = new HColumnDescriptor(f1Name);
138 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
139 tabA.addFamily(fam);
140 fam = new HColumnDescriptor(f2Name);
141 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
142 tabA.addFamily(fam);
143 fam = new HColumnDescriptor(f3Name);
144 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
145 tabA.addFamily(fam);
146
147 tabB = new HTableDescriptor(tabBName);
148 fam = new HColumnDescriptor(f1Name);
149 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
150 tabB.addFamily(fam);
151 fam = new HColumnDescriptor(f2Name);
152 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
153 tabB.addFamily(fam);
154 fam = new HColumnDescriptor(f3Name);
155 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
156 tabB.addFamily(fam);
157
158 tabC = new HTableDescriptor(tabCName);
159 fam = new HColumnDescriptor(f1Name);
160 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
161 tabC.addFamily(fam);
162 fam = new HColumnDescriptor(f2Name);
163 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
164 tabC.addFamily(fam);
165 fam = new HColumnDescriptor(f3Name);
166 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
167 tabC.addFamily(fam);
168
169 utility1.startMiniCluster();
170 utility2.startMiniCluster();
171 utility3.startMiniCluster();
172 }
173
174 @AfterClass
175 public static void tearDownAfterClass() throws Exception {
176 utility3.shutdownMiniCluster();
177 utility2.shutdownMiniCluster();
178 utility1.shutdownMiniCluster();
179 }
180
181 @Test
182 public void testParseTableCFsFromConfig() {
183 Map<TableName, List<String>> tabCFsMap = null;
184
185
186 tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(null);
187 assertEquals(null, tabCFsMap);
188
189 tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("");
190 assertEquals(null, tabCFsMap);
191
192 tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(" ");
193 assertEquals(null, tabCFsMap);
194
195 TableName tab1 = TableName.valueOf("tab1");
196 TableName tab2 = TableName.valueOf("tab2");
197 TableName tab3 = TableName.valueOf("tab3");
198
199
200 tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1");
201 assertEquals(1, tabCFsMap.size());
202 assertTrue(tabCFsMap.containsKey(tab1));
203 assertFalse(tabCFsMap.containsKey(tab2));
204 assertEquals(null, tabCFsMap.get(tab1));
205
206 tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab2:cf1");
207 assertEquals(1, tabCFsMap.size());
208 assertTrue(tabCFsMap.containsKey(tab2));
209 assertFalse(tabCFsMap.containsKey(tab1));
210 assertEquals(1, tabCFsMap.get(tab2).size());
211 assertEquals("cf1", tabCFsMap.get(tab2).get(0));
212
213 tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab3 : cf1 , cf3");
214 assertEquals(1, tabCFsMap.size());
215 assertTrue(tabCFsMap.containsKey(tab3));
216 assertFalse(tabCFsMap.containsKey(tab1));
217 assertEquals(2, tabCFsMap.get(tab3).size());
218 assertTrue(tabCFsMap.get(tab3).contains("cf1"));
219 assertTrue(tabCFsMap.get(tab3).contains("cf3"));
220
221
222 tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
223
224 assertEquals(3, tabCFsMap.size());
225 assertTrue(tabCFsMap.containsKey(tab1));
226 assertTrue(tabCFsMap.containsKey(tab2));
227 assertTrue(tabCFsMap.containsKey(tab3));
228
229 assertEquals(null, tabCFsMap.get(tab1));
230
231 assertEquals(1, tabCFsMap.get(tab2).size());
232 assertEquals("cf1", tabCFsMap.get(tab2).get(0));
233
234 assertEquals(2, tabCFsMap.get(tab3).size());
235 assertTrue(tabCFsMap.get(tab3).contains("cf1"));
236 assertTrue(tabCFsMap.get(tab3).contains("cf3"));
237
238
239
240 tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
241 "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
242
243 assertEquals(3, tabCFsMap.size());
244 assertTrue(tabCFsMap.containsKey(tab1));
245 assertTrue(tabCFsMap.containsKey(tab2));
246 assertTrue(tabCFsMap.containsKey(tab3));
247
248 assertEquals(null, tabCFsMap.get(tab1));
249
250 assertEquals(1, tabCFsMap.get(tab2).size());
251 assertEquals("cf1", tabCFsMap.get(tab2).get(0));
252
253 assertEquals(2, tabCFsMap.get(tab3).size());
254 assertTrue(tabCFsMap.get(tab3).contains("cf1"));
255 assertTrue(tabCFsMap.get(tab3).contains("cf3"));
256
257
258
259 tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
260 "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
261
262 assertEquals(1, tabCFsMap.size());
263 assertFalse(tabCFsMap.containsKey(tab1));
264 assertFalse(tabCFsMap.containsKey(tab2));
265 assertTrue(tabCFsMap.containsKey(tab3));
266
267 assertEquals(2, tabCFsMap.get(tab3).size());
268 assertTrue(tabCFsMap.get(tab3).contains("cf1"));
269 assertTrue(tabCFsMap.get(tab3).contains("cf3"));
270 }
271
272 @Test(timeout=300000)
273 public void testPerTableCFReplication() throws Exception {
274 LOG.info("testPerTableCFReplication");
275 ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
276 Connection connection1 = ConnectionFactory.createConnection(conf1);
277 Connection connection2 = ConnectionFactory.createConnection(conf2);
278 Connection connection3 = ConnectionFactory.createConnection(conf3);
279 try {
280 Admin admin1 = connection1.getAdmin();
281 Admin admin2 = connection2.getAdmin();
282 Admin admin3 = connection3.getAdmin();
283
284 admin1.createTable(tabA);
285 admin1.createTable(tabB);
286 admin1.createTable(tabC);
287 admin2.createTable(tabA);
288 admin2.createTable(tabB);
289 admin2.createTable(tabC);
290 admin3.createTable(tabA);
291 admin3.createTable(tabB);
292 admin3.createTable(tabC);
293
294 Table htab1A = connection1.getTable(tabAName);
295 Table htab2A = connection2.getTable(tabAName);
296 Table htab3A = connection3.getTable(tabAName);
297
298 Table htab1B = connection1.getTable(tabBName);
299 Table htab2B = connection2.getTable(tabBName);
300 Table htab3B = connection3.getTable(tabBName);
301
302 Table htab1C = connection1.getTable(tabCName);
303 Table htab2C = connection2.getTable(tabCName);
304 Table htab3C = connection3.getTable(tabCName);
305
306
307 replicationAdmin.addPeer("2", utility2.getClusterKey(), "TC;TB:f1,f3");
308 replicationAdmin.addPeer("3", utility3.getClusterKey(), "TA;TB:f1,f2");
309
310
311 putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
312 ensureRowNotReplicated(row1, f1Name, htab2A);
313 deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
314
315 putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
316 ensureRowNotReplicated(row1, f2Name, htab2A);
317 deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
318
319 putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
320 ensureRowNotReplicated(row1, f3Name, htab2A);
321 deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
322
323
324 putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
325 deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
326
327
328 putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
329 ensureRowNotReplicated(row1, f2Name, htab2B);
330 deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
331
332
333 putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
334 ensureRowNotReplicated(row1, f3Name, htab3B);
335 deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
336
337
338 putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
339 ensureRowNotReplicated(row1, f1Name, htab3C);
340 deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
341
342 putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
343 ensureRowNotReplicated(row1, f2Name, htab3C);
344 deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
345
346 putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
347 ensureRowNotReplicated(row1, f3Name, htab3C);
348 deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
349
350
351 replicationAdmin.setPeerTableCFs("2", "TA:f1,f2; TC:f2,f3");
352 replicationAdmin.setPeerTableCFs("3", "TB; TC:f3");
353
354
355 putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
356 ensureRowNotReplicated(row2, f1Name, htab3A);
357 deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
358
359 putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
360 ensureRowNotReplicated(row2, f2Name, htab3A);
361 deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
362
363 putAndWaitWithFamily(row2, f3Name, htab1A);
364 ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
365 deleteAndWaitWithFamily(row2, f3Name, htab1A);
366
367
368 putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
369 ensureRowNotReplicated(row2, f1Name, htab2B);
370 deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
371
372 putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
373 ensureRowNotReplicated(row2, f2Name, htab2B);
374 deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
375
376 putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
377 ensureRowNotReplicated(row2, f3Name, htab2B);
378 deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
379
380
381 putAndWaitWithFamily(row2, f1Name, htab1C);
382 ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
383 deleteAndWaitWithFamily(row2, f1Name, htab1C);
384
385 putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
386 ensureRowNotReplicated(row2, f2Name, htab3C);
387 deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
388
389 putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
390 deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
391 } finally {
392 connection1.close();
393 connection2.close();
394 connection3.close();
395 }
396 }
397
398 private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
399 Get get = new Get(row);
400 get.addFamily(fam);
401 for (Table table : tables) {
402 Result res = table.get(get);
403 assertEquals(0, res.size());
404 }
405 }
406
407 private void deleteAndWaitWithFamily(byte[] row, byte[] fam,
408 Table source, Table... targets)
409 throws Exception {
410 Delete del = new Delete(row);
411 del.deleteFamily(fam);
412 source.delete(del);
413
414 Get get = new Get(row);
415 get.addFamily(fam);
416 for (int i = 0; i < NB_RETRIES; i++) {
417 if (i==NB_RETRIES-1) {
418 fail("Waited too much time for del replication");
419 }
420 boolean removedFromAll = true;
421 for (Table target : targets) {
422 Result res = target.get(get);
423 if (res.size() >= 1) {
424 LOG.info("Row not deleted");
425 removedFromAll = false;
426 break;
427 }
428 }
429 if (removedFromAll) {
430 break;
431 } else {
432 Thread.sleep(SLEEP_TIME);
433 }
434 }
435 }
436
437 private void putAndWaitWithFamily(byte[] row, byte[] fam,
438 Table source, Table... targets)
439 throws Exception {
440 Put put = new Put(row);
441 put.add(fam, row, val);
442 source.put(put);
443
444 Get get = new Get(row);
445 get.addFamily(fam);
446 for (int i = 0; i < NB_RETRIES; i++) {
447 if (i==NB_RETRIES-1) {
448 fail("Waited too much time for put replication");
449 }
450 boolean replicatedToAll = true;
451 for (Table target : targets) {
452 Result res = target.get(get);
453 if (res.size() == 0) {
454 LOG.info("Row not available");
455 replicatedToAll = false;
456 break;
457 } else {
458 assertEquals(res.size(), 1);
459 assertArrayEquals(res.value(), val);
460 }
461 }
462 if (replicatedToAll) {
463 break;
464 } else {
465 Thread.sleep(SLEEP_TIME);
466 }
467 }
468 }
469 }