/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.replication;
19
20 import static org.junit.Assert.assertEquals;
21
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.*;
28 import org.apache.hadoop.hbase.client.Delete;
29 import org.apache.hadoop.hbase.client.HBaseAdmin;
30 import org.apache.hadoop.hbase.client.HTable;
31 import org.apache.hadoop.hbase.client.Put;
32 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.junit.experimental.categories.Category;
38
39 @Category(LargeTests.class)
40 public class TestReplicationSyncUpTool extends TestReplicationBase {
41
42 private static final Log LOG = LogFactory.getLog(TestReplicationSyncUpTool.class);
43
44 private static final byte[] t1_su = Bytes.toBytes("t1_syncup");
45 private static final byte[] t2_su = Bytes.toBytes("t2_syncup");
46
47 private static final byte[] famName = Bytes.toBytes("cf1");
48 private static final byte[] qualName = Bytes.toBytes("q1");
49
50 private static final byte[] noRepfamName = Bytes.toBytes("norep");
51
52 private HTableDescriptor t1_syncupSource, t1_syncupTarget;
53 private HTableDescriptor t2_syncupSource, t2_syncupTarget;
54
55 private HTable ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
56
57 @Before
58 public void setUp() throws Exception {
59
60 HColumnDescriptor fam;
61
62 t1_syncupSource = new HTableDescriptor(TableName.valueOf(t1_su));
63 fam = new HColumnDescriptor(famName);
64 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
65 t1_syncupSource.addFamily(fam);
66 fam = new HColumnDescriptor(noRepfamName);
67 t1_syncupSource.addFamily(fam);
68
69 t1_syncupTarget = new HTableDescriptor(TableName.valueOf(t1_su));
70 fam = new HColumnDescriptor(famName);
71 t1_syncupTarget.addFamily(fam);
72 fam = new HColumnDescriptor(noRepfamName);
73 t1_syncupTarget.addFamily(fam);
74
75 t2_syncupSource = new HTableDescriptor(TableName.valueOf(t2_su));
76 fam = new HColumnDescriptor(famName);
77 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
78 t2_syncupSource.addFamily(fam);
79 fam = new HColumnDescriptor(noRepfamName);
80 t2_syncupSource.addFamily(fam);
81
82 t2_syncupTarget = new HTableDescriptor(TableName.valueOf(t2_su));
83 fam = new HColumnDescriptor(famName);
84 t2_syncupTarget.addFamily(fam);
85 fam = new HColumnDescriptor(noRepfamName);
86 t2_syncupTarget.addFamily(fam);
87
88 }
89
90
91
92
93
94
95 @Test(timeout = 300000)
96 public void testSyncUpTool() throws Exception {
97
98
99
100
101
102
103
104
105 setupReplication();
106
107
108
109
110
111
112
113
114 putAndReplicateRows();
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 mimicSyncUpAfterDelete();
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170 mimicSyncUpAfterPut();
171
172 }
173
174 private void setupReplication() throws Exception {
175 ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
176 ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
177
178 HBaseAdmin ha = new HBaseAdmin(conf1);
179 ha.createTable(t1_syncupSource);
180 ha.createTable(t2_syncupSource);
181 ha.close();
182
183 ha = new HBaseAdmin(conf2);
184 ha.createTable(t1_syncupTarget);
185 ha.createTable(t2_syncupTarget);
186 ha.close();
187
188
189 ht1Source = new HTable(conf1, t1_su);
190 ht1Source.setWriteBufferSize(1024);
191 ht2Source = new HTable(conf1, t2_su);
192 ht1Source.setWriteBufferSize(1024);
193
194
195 ht1TargetAtPeer1 = new HTable(conf2, t1_su);
196 ht1TargetAtPeer1.setWriteBufferSize(1024);
197 ht2TargetAtPeer1 = new HTable(conf2, t2_su);
198 ht2TargetAtPeer1.setWriteBufferSize(1024);
199
200
201
202
203 admin1.addPeer("1", utility2.getClusterKey());
204
205 admin1.close();
206 admin2.close();
207 }
208
209 private void putAndReplicateRows() throws Exception {
210 LOG.debug("putAndReplicateRows");
211
212 Put p;
213
214
215 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
216 p = new Put(Bytes.toBytes("row" + i));
217 p.add(famName, qualName, Bytes.toBytes("val" + i));
218 ht1Source.put(p);
219 }
220 p = new Put(Bytes.toBytes("row" + 9999));
221 p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
222 ht1Source.put(p);
223
224
225 for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
226 p = new Put(Bytes.toBytes("row" + i));
227 p.add(famName, qualName, Bytes.toBytes("val" + i));
228 ht2Source.put(p);
229 }
230 p = new Put(Bytes.toBytes("row" + 9999));
231 p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
232 ht2Source.put(p);
233
234
235 Thread.sleep(SLEEP_TIME);
236 int rowCount_ht1Source = utility1.countRows(ht1Source);
237 for (int i = 0; i < NB_RETRIES; i++) {
238 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
239 if (i==NB_RETRIES-1) {
240 assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
241 rowCount_ht1TargetAtPeer1);
242 }
243 if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) {
244 break;
245 }
246 Thread.sleep(SLEEP_TIME);
247 }
248
249 int rowCount_ht2Source = utility1.countRows(ht2Source);
250 for (int i = 0; i < NB_RETRIES; i++) {
251 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
252 if (i==NB_RETRIES-1) {
253 assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
254 rowCount_ht2TargetAtPeer1);
255 }
256 if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) {
257 break;
258 }
259 Thread.sleep(SLEEP_TIME);
260 }
261 }
262
263 private void mimicSyncUpAfterDelete() throws Exception {
264 LOG.debug("mimicSyncUpAfterDelete");
265 utility2.shutdownMiniHBaseCluster();
266
267 List<Delete> list = new ArrayList<Delete>();
268
269 for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
270 String rowKey = "row" + i;
271 Delete del = new Delete(rowKey.getBytes());
272 list.add(del);
273 }
274 ht1Source.delete(list);
275
276 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
277 String rowKey = "row" + i;
278 Delete del = new Delete(rowKey.getBytes());
279 list.add(del);
280 }
281 ht2Source.delete(list);
282
283 int rowCount_ht1Source = utility1.countRows(ht1Source);
284 assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
285 rowCount_ht1Source);
286
287 int rowCount_ht2Source = utility1.countRows(ht2Source);
288 assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
289 101, rowCount_ht2Source);
290
291 utility1.shutdownMiniHBaseCluster();
292 utility2.restartHBaseCluster(1);
293
294 Thread.sleep(SLEEP_TIME);
295
296
297 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
298 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
299 assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
300 assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
301
302
303 for (int i = 0; i < NB_RETRIES; i++) {
304 syncUp(utility1);
305 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
306 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
307 if (i == NB_RETRIES - 1) {
308 if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
309
310 utility1.restartHBaseCluster(1);
311 rowCount_ht1Source = utility1.countRows(ht1Source);
312 LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
313 rowCount_ht2Source = utility1.countRows(ht2Source);
314 LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
315 }
316 assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
317 rowCount_ht1TargetAtPeer1);
318 assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
319 rowCount_ht2TargetAtPeer1);
320 }
321 if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) {
322 LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
323 break;
324 } else {
325 LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
326 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
327 + rowCount_ht2TargetAtPeer1);
328 }
329 Thread.sleep(SLEEP_TIME);
330 }
331 }
332
333 private void mimicSyncUpAfterPut() throws Exception {
334 LOG.debug("mimicSyncUpAfterPut");
335 utility1.restartHBaseCluster(1);
336 utility2.shutdownMiniHBaseCluster();
337
338 Put p;
339
340
341 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
342 p = new Put(Bytes.toBytes("row" + i));
343 p.add(famName, qualName, Bytes.toBytes("val" + i));
344 ht1Source.put(p);
345 }
346 p = new Put(Bytes.toBytes("row" + 9998));
347 p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
348 ht1Source.put(p);
349
350
351
352 for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
353 p = new Put(Bytes.toBytes("row" + i));
354 p.add(famName, qualName, Bytes.toBytes("val" + i));
355 ht2Source.put(p);
356 }
357 p = new Put(Bytes.toBytes("row" + 9998));
358 p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
359 ht2Source.put(p);
360
361 int rowCount_ht1Source = utility1.countRows(ht1Source);
362 assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
363 int rowCount_ht2Source = utility1.countRows(ht2Source);
364 assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
365
366 utility1.shutdownMiniHBaseCluster();
367 utility2.restartHBaseCluster(1);
368
369 Thread.sleep(SLEEP_TIME);
370
371
372 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
373 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
374 assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
375 rowCount_ht1TargetAtPeer1);
376 assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
377 rowCount_ht2TargetAtPeer1);
378
379
380 for (int i = 0; i < NB_RETRIES; i++) {
381 syncUp(utility1);
382 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
383 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
384 if (i == NB_RETRIES - 1) {
385 if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
386
387 utility1.restartHBaseCluster(1);
388 rowCount_ht1Source = utility1.countRows(ht1Source);
389 LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
390 rowCount_ht2Source = utility1.countRows(ht2Source);
391 LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
392 }
393 assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
394 rowCount_ht1TargetAtPeer1);
395 assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
396 rowCount_ht2TargetAtPeer1);
397 }
398 if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) {
399 LOG.info("SyncUpAfterPut succeeded at retry = " + i);
400 break;
401 } else {
402 LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
403 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
404 + rowCount_ht2TargetAtPeer1);
405 }
406 Thread.sleep(SLEEP_TIME);
407 }
408 }
409
410 private void syncUp(HBaseTestingUtility ut) throws Exception {
411 ReplicationSyncUp.setConfigure(ut.getConfiguration());
412 String[] arguments = new String[] { null };
413 new ReplicationSyncUp().run(arguments);
414 }
415
416 }