1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.replication;
19
20 import static org.junit.Assert.assertEquals;
21
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.*;
28 import org.apache.hadoop.hbase.client.Admin;
29 import org.apache.hadoop.hbase.client.Delete;
30 import org.apache.hadoop.hbase.client.HBaseAdmin;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Put;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
35 import org.apache.hadoop.hbase.testclassification.LargeTests;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.junit.experimental.categories.Category;
41
42 @Category(LargeTests.class)
43 public class TestReplicationSyncUpTool extends TestReplicationBase {
44
45 private static final Log LOG = LogFactory.getLog(TestReplicationSyncUpTool.class);
46
47 private static final TableName t1_su = TableName.valueOf("t1_syncup");
48 private static final TableName t2_su = TableName.valueOf("t2_syncup");
49
50 private static final byte[] famName = Bytes.toBytes("cf1");
51 private static final byte[] qualName = Bytes.toBytes("q1");
52
53 private static final byte[] noRepfamName = Bytes.toBytes("norep");
54
55 private HTableDescriptor t1_syncupSource, t1_syncupTarget;
56 private HTableDescriptor t2_syncupSource, t2_syncupTarget;
57
58 private Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
59
60 @Before
61 public void setUp() throws Exception {
62
63 HColumnDescriptor fam;
64
65 t1_syncupSource = new HTableDescriptor(t1_su);
66 fam = new HColumnDescriptor(famName);
67 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
68 t1_syncupSource.addFamily(fam);
69 fam = new HColumnDescriptor(noRepfamName);
70 t1_syncupSource.addFamily(fam);
71
72 t1_syncupTarget = new HTableDescriptor(t1_su);
73 fam = new HColumnDescriptor(famName);
74 t1_syncupTarget.addFamily(fam);
75 fam = new HColumnDescriptor(noRepfamName);
76 t1_syncupTarget.addFamily(fam);
77
78 t2_syncupSource = new HTableDescriptor(t2_su);
79 fam = new HColumnDescriptor(famName);
80 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
81 t2_syncupSource.addFamily(fam);
82 fam = new HColumnDescriptor(noRepfamName);
83 t2_syncupSource.addFamily(fam);
84
85 t2_syncupTarget = new HTableDescriptor(t2_su);
86 fam = new HColumnDescriptor(famName);
87 t2_syncupTarget.addFamily(fam);
88 fam = new HColumnDescriptor(noRepfamName);
89 t2_syncupTarget.addFamily(fam);
90
91 }
92
93
94
95
96
97
98 @Test(timeout = 300000)
99 public void testSyncUpTool() throws Exception {
100
101
102
103
104
105
106
107
108 setupReplication();
109
110
111
112
113
114
115
116
117 putAndReplicateRows();
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143 mimicSyncUpAfterDelete();
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173 mimicSyncUpAfterPut();
174
175 }
176
177 private void setupReplication() throws Exception {
178 ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
179 ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
180
181 Admin ha = new HBaseAdmin(conf1);
182 ha.createTable(t1_syncupSource);
183 ha.createTable(t2_syncupSource);
184 ha.close();
185
186 ha = new HBaseAdmin(conf2);
187 ha.createTable(t1_syncupTarget);
188 ha.createTable(t2_syncupTarget);
189 ha.close();
190
191
192 ht1Source = new HTable(conf1, t1_su);
193 ht1Source.setWriteBufferSize(1024);
194 ht2Source = new HTable(conf1, t2_su);
195 ht1Source.setWriteBufferSize(1024);
196
197
198 ht1TargetAtPeer1 = new HTable(conf2, t1_su);
199 ht1TargetAtPeer1.setWriteBufferSize(1024);
200 ht2TargetAtPeer1 = new HTable(conf2, t2_su);
201 ht2TargetAtPeer1.setWriteBufferSize(1024);
202
203
204
205
206 admin1.addPeer("1", utility2.getClusterKey());
207
208 admin1.close();
209 admin2.close();
210 }
211
212 private void putAndReplicateRows() throws Exception {
213 LOG.debug("putAndReplicateRows");
214
215 Put p;
216
217
218 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
219 p = new Put(Bytes.toBytes("row" + i));
220 p.add(famName, qualName, Bytes.toBytes("val" + i));
221 ht1Source.put(p);
222 }
223 p = new Put(Bytes.toBytes("row" + 9999));
224 p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
225 ht1Source.put(p);
226
227
228 for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
229 p = new Put(Bytes.toBytes("row" + i));
230 p.add(famName, qualName, Bytes.toBytes("val" + i));
231 ht2Source.put(p);
232 }
233 p = new Put(Bytes.toBytes("row" + 9999));
234 p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
235 ht2Source.put(p);
236
237
238 Thread.sleep(SLEEP_TIME);
239 int rowCount_ht1Source = utility1.countRows(ht1Source);
240 for (int i = 0; i < NB_RETRIES; i++) {
241 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
242 if (i==NB_RETRIES-1) {
243 assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
244 rowCount_ht1TargetAtPeer1);
245 }
246 if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) {
247 break;
248 }
249 Thread.sleep(SLEEP_TIME);
250 }
251
252 int rowCount_ht2Source = utility1.countRows(ht2Source);
253 for (int i = 0; i < NB_RETRIES; i++) {
254 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
255 if (i==NB_RETRIES-1) {
256 assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
257 rowCount_ht2TargetAtPeer1);
258 }
259 if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) {
260 break;
261 }
262 Thread.sleep(SLEEP_TIME);
263 }
264 }
265
266 private void mimicSyncUpAfterDelete() throws Exception {
267 LOG.debug("mimicSyncUpAfterDelete");
268 utility2.shutdownMiniHBaseCluster();
269
270 List<Delete> list = new ArrayList<Delete>();
271
272 for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
273 String rowKey = "row" + i;
274 Delete del = new Delete(rowKey.getBytes());
275 list.add(del);
276 }
277 ht1Source.delete(list);
278
279 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
280 String rowKey = "row" + i;
281 Delete del = new Delete(rowKey.getBytes());
282 list.add(del);
283 }
284 ht2Source.delete(list);
285
286 int rowCount_ht1Source = utility1.countRows(ht1Source);
287 assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
288 rowCount_ht1Source);
289
290 int rowCount_ht2Source = utility1.countRows(ht2Source);
291 assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
292 101, rowCount_ht2Source);
293
294 utility1.shutdownMiniHBaseCluster();
295 utility2.restartHBaseCluster(1);
296
297 Thread.sleep(SLEEP_TIME);
298
299
300 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
301 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
302 assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
303 assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
304
305
306 for (int i = 0; i < NB_RETRIES; i++) {
307 syncUp(utility1);
308 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
309 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
310 if (i == NB_RETRIES - 1) {
311 if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
312
313 utility1.restartHBaseCluster(1);
314 rowCount_ht1Source = utility1.countRows(ht1Source);
315 LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
316 rowCount_ht2Source = utility1.countRows(ht2Source);
317 LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
318 }
319 assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
320 rowCount_ht1TargetAtPeer1);
321 assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
322 rowCount_ht2TargetAtPeer1);
323 }
324 if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) {
325 LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
326 break;
327 } else {
328 LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
329 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
330 + rowCount_ht2TargetAtPeer1);
331 }
332 Thread.sleep(SLEEP_TIME);
333 }
334 }
335
336 private void mimicSyncUpAfterPut() throws Exception {
337 LOG.debug("mimicSyncUpAfterPut");
338 utility1.restartHBaseCluster(1);
339 utility2.shutdownMiniHBaseCluster();
340
341 Put p;
342
343
344 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
345 p = new Put(Bytes.toBytes("row" + i));
346 p.add(famName, qualName, Bytes.toBytes("val" + i));
347 ht1Source.put(p);
348 }
349 p = new Put(Bytes.toBytes("row" + 9998));
350 p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
351 ht1Source.put(p);
352
353
354
355 for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
356 p = new Put(Bytes.toBytes("row" + i));
357 p.add(famName, qualName, Bytes.toBytes("val" + i));
358 ht2Source.put(p);
359 }
360 p = new Put(Bytes.toBytes("row" + 9998));
361 p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
362 ht2Source.put(p);
363
364 int rowCount_ht1Source = utility1.countRows(ht1Source);
365 assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
366 int rowCount_ht2Source = utility1.countRows(ht2Source);
367 assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
368
369 utility1.shutdownMiniHBaseCluster();
370 utility2.restartHBaseCluster(1);
371
372 Thread.sleep(SLEEP_TIME);
373
374
375 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
376 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
377 assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
378 rowCount_ht1TargetAtPeer1);
379 assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
380 rowCount_ht2TargetAtPeer1);
381
382
383 for (int i = 0; i < NB_RETRIES; i++) {
384 syncUp(utility1);
385 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
386 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
387 if (i == NB_RETRIES - 1) {
388 if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
389
390 utility1.restartHBaseCluster(1);
391 rowCount_ht1Source = utility1.countRows(ht1Source);
392 LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
393 rowCount_ht2Source = utility1.countRows(ht2Source);
394 LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
395 }
396 assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
397 rowCount_ht1TargetAtPeer1);
398 assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
399 rowCount_ht2TargetAtPeer1);
400 }
401 if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) {
402 LOG.info("SyncUpAfterPut succeeded at retry = " + i);
403 break;
404 } else {
405 LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
406 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
407 + rowCount_ht2TargetAtPeer1);
408 }
409 Thread.sleep(SLEEP_TIME);
410 }
411 }
412
413 private void syncUp(HBaseTestingUtility ut) throws Exception {
414 ReplicationSyncUp.setConfigure(ut.getConfiguration());
415 String[] arguments = new String[] { null };
416 new ReplicationSyncUp().run(arguments);
417 }
418
419 }