/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.HashMap;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(LargeTests.class)
public class TestReplicationSmallTests extends TestReplicationBase {

  private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
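
  /**
   * Roll the WALs on the source cluster and truncate the test table, then
   * wait until the truncate has replicated so that each test starts with
   * both clusters empty and in sync.
   * @throws java.lang.Exception
   */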
  @Before
  public void setUp() throws Exception {
    htable1.setAutoFlush(true, true);
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r :
        utility1.getHBaseCluster().getRegionServerThreads()) {
      r.getRegionServer().getWAL().rollWriter();
    }
    utility1.truncateTable(tableName);
    // Truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call truncateTable on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // don't burn a retry if we are still making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
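
  /**
   * Verify that version and column delete marker types are replicated
   * correctly.
   * @throws Exception
   */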
  @Test(timeout=300000)
  public void testDeleteTypes() throws Exception {
    LOG.info("testDeleteTypes");
    final byte[] v1 = Bytes.toBytes("v1");
    final byte[] v2 = Bytes.toBytes("v2");
    final byte[] v3 = Bytes.toBytes("v3");
    htable1 = new HTable(conf1, tableName);

    long t = EnvironmentEdgeManager.currentTimeMillis();
    // create three versions for "row"
    Put put = new Put(row);
    put.add(famName, row, t, v1);
    htable1.put(put);

    put = new Put(row);
    put.add(famName, row, t+1, v2);
    htable1.put(put);

    put = new Put(row);
    put.add(famName, row, t+2, v3);
    htable1.put(put);

    Get get = new Get(row);
    get.setMaxVersions();
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() < 3) {
        LOG.info("Rows not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
        break;
      }
    }

    // place a version delete marker that covers only the oldest version
    Delete d = new Delete(row);
    d.deleteColumn(famName, row, t);
    htable1.delete(d);

    get = new Get(row);
    get.setMaxVersions();
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() > 2) {
        LOG.info("Version not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
        break;
      }
    }

    // place a column delete marker that covers the remaining versions
    d = new Delete(row);
    d.deleteColumns(famName, row, t+2);
    htable1.delete(d);

    // the row should now be gone from the slave cluster as well
    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Rows not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
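
  /**
   * Add a row, check it's replicated, delete it, check it's gone.
   * @throws Exception
   */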
  @Test(timeout=300000)
  public void testSimplePutDelete() throws Exception {
    LOG.info("testSimplePutDelete");
    Put put = new Put(row);
    put.add(famName, row, row);

    htable1 = new HTable(conf1, tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(res.value(), row);
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
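
  /**
   * Try a small batch upload using the write buffer, check it's replicated.
   * @throws Exception
   */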
  @Test(timeout=300000)
  public void testSmallBatch() throws Exception {
    LOG.info("testSmallBatch");
    Put put;
    // fill the write buffer, then flush everything in one batch
    htable1.setAutoFlush(false, true);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      put = new Put(Bytes.toBytes(i));
      put.add(famName, row, row);
      htable1.put(put);
    }
    htable1.flushCommits();

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    for (int i = 0; i < NB_RETRIES; i++) {
      scan = new Scan();
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BATCH) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
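
  /**
   * Test disable/enable replication: insert while the peer is disabled and
   * make sure nothing is replicated, then enable the peer and check that the
   * pending edit shows up on the slave.
   * @throws Exception
   */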
  @Test(timeout = 300000)
  public void testDisableEnable() throws Exception {

    // Test disabling replication
    admin.disablePeer("2");

    byte[] rowkey = Bytes.toBytes("disable enable");
    Put put = new Put(rowkey);
    put.add(famName, row, row);
    htable1.put(put);

    Get get = new Get(rowkey);
    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Replication wasn't disabled");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    // Test enabling replication
    admin.enablePeer("2");

    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(res.value(), row);
        return;
      }
    }
    fail("Waited too much time for put replication");
  }
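
  /**
   * Integration test for TestReplicationAdmin, removes and re-adds a peer
   * cluster.
   * @throws Exception
   */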
  @Test(timeout=300000)
  public void testAddAndRemoveClusters() throws Exception {
    LOG.info("testAddAndRemoveClusters");
    admin.removePeer("2");
    Thread.sleep(SLEEP_TIME);
    byte[] rowKey = Bytes.toBytes("Won't be replicated");
    Put put = new Put(rowKey);
    put.add(famName, row, row);
    htable1.put(put);

    // with the peer removed, the edit must never show up on the slave
    Get get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Not supposed to be replicated");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    // re-add the peer; edits written from now on should be replicated
    admin.addPeer("2", utility2.getClusterKey());
    Thread.sleep(SLEEP_TIME);
    rowKey = Bytes.toBytes("do rep");
    put = new Put(rowKey);
    put.add(famName, row, row);
    LOG.info("Adding new row");
    htable1.put(put);

    get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME * i); // wait a little longer on each retry
      } else {
        assertArrayEquals(res.value(), row);
        break;
      }
    }
  }
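
  /**
   * Do a more intense version of testSmallBatch, one that will trigger
   * WAL rolling and other non-trivial code paths.
   * @throws Exception
   */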
  @Test(timeout=300000)
  public void loadTesting() throws Exception {
    htable1.setWriteBufferSize(1024);
    htable1.setAutoFlush(false, true);
    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(i));
      put.add(famName, row, row);
      htable1.put(put);
    }
    htable1.flushCommits();

    Scan scan = new Scan();

    ResultScanner scanner = htable1.getScanner(scan);
    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
    scanner.close();

    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);

    long start = System.currentTimeMillis();
    for (int i = 0; i < NB_RETRIES; i++) {
      scan = new Scan();

      scanner = htable2.getScanner(scan);
      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BIG_BATCH) {
        if (i == NB_RETRIES - 1) {
          // on the last attempt, log exactly which rows never made it over
          int lastRow = -1;
          for (Result result : res) {
            int currentRow = Bytes.toInt(result.getRow());
            for (int row = lastRow + 1; row < currentRow; row++) {
              LOG.error("Row missing: " + row);
            }
            lastRow = currentRow;
          }
          LOG.error("Last row: " + lastRow);
          fail("Waited too much time for normal batch replication, " +
              res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
              (System.currentTimeMillis() - start) + "ms");
        } else {
          LOG.info("Only got " + res.length + " rows");
          Thread.sleep(SLEEP_TIME);
        }
      } else {
        break;
      }
    }
  }
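
  /**
   * Do a small loading into a table, make sure the data is really the same,
   * then run the VerifyReplication job to check the results. Do a second
   * comparison where all the cells are different.
   * @throws Exception
   */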
  @Test(timeout=300000)
  public void testVerifyRepJob() throws Exception {
    // Populate the tables, at the same time it guarantees that the tables are
    // identical since it does the check
    testSmallBatch();

    String[] args = new String[] {"2", Bytes.toString(tableName)};
    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(0, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());

    // overwrite one cell in every row on the slave, and delete the last row
    // outright, so the second run should flag every row as bad
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.add(CellUtil.cloneFamily(firstVal),
          CellUtil.cloneQualifier(firstVal), Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(0, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
  }
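
  /**
   * Compaction WALEdits are artificial meta edits that carry no replication
   * scope; Replication.scopeWALEdits must handle them without throwing.
   * @throws Exception
   */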
  @Test(timeout=300000)
  public void testCompactionWALEdits() throws Exception {
    WALProtos.CompactionDescriptor compactionDescriptor =
        WALProtos.CompactionDescriptor.getDefaultInstance();
    HRegionInfo hri = new HRegionInfo(htable1.getName(),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
    Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit);
  }
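
  /**
   * Create three tables that each have one column family with a global
   * replication scope, then verify that listReplicated() reports exactly
   * those table/column-family pairs before dropping the tables again.
   * @throws Exception
   */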
  @Test(timeout = 300000)
  public void testVerifyListReplicatedTable() throws Exception {
    LOG.info("testVerifyListReplicatedTable");

    final String tName = "VerifyListReplicated_";
    final String colFam = "cf1";
    final int numOfTables = 3;

    HBaseAdmin hadmin = new HBaseAdmin(conf1);

    // create the tables
    for (int i = 0; i < numOfTables; i++) {
      HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
      HColumnDescriptor cfd = new HColumnDescriptor(colFam);
      cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
      ht.addFamily(cfd);
      hadmin.createTable(ht);
    }

    // verify the result
    List<HashMap<String, String>> replicationColFams = admin.listReplicated();
    int[] match = new int[numOfTables]; // array of 3 with init value of zero

    for (int i = 0; i < replicationColFams.size(); i++) {
      HashMap<String, String> replicationEntry = replicationColFams.get(i);
      String tn = replicationEntry.get(ReplicationAdmin.TNAME);
      if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
        int m = Integer.parseInt(tn.substring(tn.length() - 1));
        match[m]++; // should only increase once per table
      }
    }

    // check the matching result
    for (int i = 0; i < match.length; i++) {
      assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
    }

    // drop tables
    for (int i = 0; i < numOfTables; i++) {
      String ht = tName + i;
      hadmin.disableTable(ht);
      hadmin.deleteTable(ht);
    }

    hadmin.close();
  }

}