View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.HashMap;
29  import java.util.List;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HColumnDescriptor;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.ServerLoad;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.client.HTable;
44  import org.apache.hadoop.hbase.client.Admin;
45  import org.apache.hadoop.hbase.client.Delete;
46  import org.apache.hadoop.hbase.client.Get;
47  import org.apache.hadoop.hbase.client.HBaseAdmin;
48  import org.apache.hadoop.hbase.client.Put;
49  import org.apache.hadoop.hbase.client.Result;
50  import org.apache.hadoop.hbase.client.ResultScanner;
51  import org.apache.hadoop.hbase.client.Scan;
52  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
53  import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
54  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
55  import org.apache.hadoop.hbase.testclassification.LargeTests;
56  import org.apache.hadoop.hbase.wal.WALKey;
57  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
58  import org.apache.hadoop.hbase.replication.regionserver.Replication;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
61  import org.apache.hadoop.hbase.util.JVMClusterUtil;
62  import org.apache.hadoop.mapreduce.Job;
63  import org.junit.Before;
64  import org.junit.Test;
65  import org.junit.experimental.categories.Category;
66  
67  import com.google.protobuf.ByteString;
68  import com.sun.tools.javac.code.Attribute.Array;
69  
70  @Category(LargeTests.class)
71  public class TestReplicationSmallTests extends TestReplicationBase {
72  
73    private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
74  
75    /**
76     * @throws java.lang.Exception
77     */
78    @Before
79    public void setUp() throws Exception {
80      // Starting and stopping replication can make us miss new logs,
81      // rolling like this makes sure the most recent one gets added to the queue
82      for ( JVMClusterUtil.RegionServerThread r :
83          utility1.getHBaseCluster().getRegionServerThreads()) {
84        utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
85      }
86      utility1.deleteTableData(tableName);
87      // truncating the table will send one Delete per row to the slave cluster
88      // in an async fashion, which is why we cannot just call deleteTableData on
89      // utility2 since late writes could make it to the slave in some way.
90      // Instead, we truncate the first table and wait for all the Deletes to
91      // make it to the slave.
92      Scan scan = new Scan();
93      int lastCount = 0;
94      for (int i = 0; i < NB_RETRIES; i++) {
95        if (i==NB_RETRIES-1) {
96          fail("Waited too much time for truncate");
97        }
98        ResultScanner scanner = htable2.getScanner(scan);
99        Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
100       scanner.close();
101       if (res.length != 0) {
102         if (res.length < lastCount) {
103           i--; // Don't increment timeout if we make progress
104         }
105         lastCount = res.length;
106         LOG.info("Still got " + res.length + " rows");
107         Thread.sleep(SLEEP_TIME);
108       } else {
109         break;
110       }
111     }
112   }
113 
114   /**
115    * Verify that version and column delete marker types are replicated
116    * correctly.
117    * @throws Exception
118    */
119   @Test(timeout=300000)
120   public void testDeleteTypes() throws Exception {
121     LOG.info("testDeleteTypes");
122     final byte[] v1 = Bytes.toBytes("v1");
123     final byte[] v2 = Bytes.toBytes("v2");
124     final byte[] v3 = Bytes.toBytes("v3");
125     htable1 = new HTable(conf1, tableName);
126 
127     long t = EnvironmentEdgeManager.currentTime();
128     // create three versions for "row"
129     Put put = new Put(row);
130     put.add(famName, row, t, v1);
131     htable1.put(put);
132 
133     put = new Put(row);
134     put.add(famName, row, t+1, v2);
135     htable1.put(put);
136 
137     put = new Put(row);
138     put.add(famName, row, t+2, v3);
139     htable1.put(put);
140 
141     Get get = new Get(row);
142     get.setMaxVersions();
143     for (int i = 0; i < NB_RETRIES; i++) {
144       if (i==NB_RETRIES-1) {
145         fail("Waited too much time for put replication");
146       }
147       Result res = htable2.get(get);
148       if (res.size() < 3) {
149         LOG.info("Rows not available");
150         Thread.sleep(SLEEP_TIME);
151       } else {
152         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
153         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
154         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
155         break;
156       }
157     }
158     // place a version delete marker (delete last version)
159     Delete d = new Delete(row);
160     d.deleteColumn(famName, row, t);
161     htable1.delete(d);
162 
163     get = new Get(row);
164     get.setMaxVersions();
165     for (int i = 0; i < NB_RETRIES; i++) {
166       if (i==NB_RETRIES-1) {
167         fail("Waited too much time for put replication");
168       }
169       Result res = htable2.get(get);
170       if (res.size() > 2) {
171         LOG.info("Version not deleted");
172         Thread.sleep(SLEEP_TIME);
173       } else {
174         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
175         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
176         break;
177       }
178     }
179 
180     // place a column delete marker
181     d = new Delete(row);
182     d.deleteColumns(famName, row, t+2);
183     htable1.delete(d);
184 
185     // now *both* of the remaining version should be deleted
186     // at the replica
187     get = new Get(row);
188     for (int i = 0; i < NB_RETRIES; i++) {
189       if (i==NB_RETRIES-1) {
190         fail("Waited too much time for del replication");
191       }
192       Result res = htable2.get(get);
193       if (res.size() >= 1) {
194         LOG.info("Rows not deleted");
195         Thread.sleep(SLEEP_TIME);
196       } else {
197         break;
198       }
199     }
200   }
201 
202   /**
203    * Add a row, check it's replicated, delete it, check's gone
204    * @throws Exception
205    */
206   @Test(timeout=300000)
207   public void testSimplePutDelete() throws Exception {
208     LOG.info("testSimplePutDelete");
209     Put put = new Put(row);
210     put.add(famName, row, row);
211 
212     htable1 = new HTable(conf1, tableName);
213     htable1.put(put);
214 
215     Get get = new Get(row);
216     for (int i = 0; i < NB_RETRIES; i++) {
217       if (i==NB_RETRIES-1) {
218         fail("Waited too much time for put replication");
219       }
220       Result res = htable2.get(get);
221       if (res.size() == 0) {
222         LOG.info("Row not available");
223         Thread.sleep(SLEEP_TIME);
224       } else {
225         assertArrayEquals(res.value(), row);
226         break;
227       }
228     }
229 
230     Delete del = new Delete(row);
231     htable1.delete(del);
232 
233     get = new Get(row);
234     for (int i = 0; i < NB_RETRIES; i++) {
235       if (i==NB_RETRIES-1) {
236         fail("Waited too much time for del replication");
237       }
238       Result res = htable2.get(get);
239       if (res.size() >= 1) {
240         LOG.info("Row not deleted");
241         Thread.sleep(SLEEP_TIME);
242       } else {
243         break;
244       }
245     }
246   }
247 
248   /**
249    * Try a small batch upload using the write buffer, check it's replicated
250    * @throws Exception
251    */
252   @Test(timeout=300000)
253   public void testSmallBatch() throws Exception {
254     LOG.info("testSmallBatch");
255     // normal Batch tests
256     List<Put> puts = new ArrayList<>();
257     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
258       Put put = new Put(Bytes.toBytes(i));
259       put.add(famName, row, row);
260       puts.add(put);
261     }
262     htable1.put(puts);
263 
264     Scan scan = new Scan();
265 
266     ResultScanner scanner1 = htable1.getScanner(scan);
267     Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
268     scanner1.close();
269     assertEquals(NB_ROWS_IN_BATCH, res1.length);
270 
271     for (int i = 0; i < NB_RETRIES; i++) {
272       scan = new Scan();
273       if (i==NB_RETRIES-1) {
274         fail("Waited too much time for normal batch replication");
275       }
276       ResultScanner scanner = htable2.getScanner(scan);
277       Result[] res = scanner.next(NB_ROWS_IN_BATCH);
278       scanner.close();
279       if (res.length != NB_ROWS_IN_BATCH) {
280         LOG.info("Only got " + res.length + " rows");
281         Thread.sleep(SLEEP_TIME);
282       } else {
283         break;
284       }
285     }
286   }
287 
288   /**
289    * Test disable/enable replication, trying to insert, make sure nothing's
290    * replicated, enable it, the insert should be replicated
291    *
292    * @throws Exception
293    */
294   @Test(timeout = 300000)
295   public void testDisableEnable() throws Exception {
296 
297     // Test disabling replication
298     admin.disablePeer("2");
299 
300     byte[] rowkey = Bytes.toBytes("disable enable");
301     Put put = new Put(rowkey);
302     put.add(famName, row, row);
303     htable1.put(put);
304 
305     Get get = new Get(rowkey);
306     for (int i = 0; i < NB_RETRIES; i++) {
307       Result res = htable2.get(get);
308       if (res.size() >= 1) {
309         fail("Replication wasn't disabled");
310       } else {
311         LOG.info("Row not replicated, let's wait a bit more...");
312         Thread.sleep(SLEEP_TIME);
313       }
314     }
315 
316     // Test enable replication
317     admin.enablePeer("2");
318 
319     for (int i = 0; i < NB_RETRIES; i++) {
320       Result res = htable2.get(get);
321       if (res.size() == 0) {
322         LOG.info("Row not available");
323         Thread.sleep(SLEEP_TIME);
324       } else {
325         assertArrayEquals(res.value(), row);
326         return;
327       }
328     }
329     fail("Waited too much time for put replication");
330   }
331 
332   /**
333    * Integration test for TestReplicationAdmin, removes and re-add a peer
334    * cluster
335    *
336    * @throws Exception
337    */
338   @Test(timeout=300000)
339   public void testAddAndRemoveClusters() throws Exception {
340     LOG.info("testAddAndRemoveClusters");
341     admin.removePeer("2");
342     Thread.sleep(SLEEP_TIME);
343     byte[] rowKey = Bytes.toBytes("Won't be replicated");
344     Put put = new Put(rowKey);
345     put.add(famName, row, row);
346     htable1.put(put);
347 
348     Get get = new Get(rowKey);
349     for (int i = 0; i < NB_RETRIES; i++) {
350       if (i == NB_RETRIES-1) {
351         break;
352       }
353       Result res = htable2.get(get);
354       if (res.size() >= 1) {
355         fail("Not supposed to be replicated");
356       } else {
357         LOG.info("Row not replicated, let's wait a bit more...");
358         Thread.sleep(SLEEP_TIME);
359       }
360     }
361 
362     admin.addPeer("2", utility2.getClusterKey());
363     Thread.sleep(SLEEP_TIME);
364     rowKey = Bytes.toBytes("do rep");
365     put = new Put(rowKey);
366     put.add(famName, row, row);
367     LOG.info("Adding new row");
368     htable1.put(put);
369 
370     get = new Get(rowKey);
371     for (int i = 0; i < NB_RETRIES; i++) {
372       if (i==NB_RETRIES-1) {
373         fail("Waited too much time for put replication");
374       }
375       Result res = htable2.get(get);
376       if (res.size() == 0) {
377         LOG.info("Row not available");
378         Thread.sleep(SLEEP_TIME*i);
379       } else {
380         assertArrayEquals(res.value(), row);
381         break;
382       }
383     }
384   }
385 
386 
387   /**
388    * Do a more intense version testSmallBatch, one  that will trigger
389    * wal rolling and other non-trivial code paths
390    * @throws Exception
391    */
392   @Test(timeout=300000)
393   public void testLoading() throws Exception {
394     LOG.info("Writing out rows to table1 in testLoading");
395     List<Put> puts = new ArrayList<Put>();
396     for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
397       Put put = new Put(Bytes.toBytes(i));
398       put.add(famName, row, row);
399       puts.add(put);
400     }
401     htable1.setWriteBufferSize(1024);
402     // The puts will be iterated through and flushed only when the buffer
403     // size is reached.
404     htable1.put(puts);
405 
406     Scan scan = new Scan();
407 
408     ResultScanner scanner = htable1.getScanner(scan);
409     Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
410     scanner.close();
411 
412     assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
413 
414     LOG.info("Looking in table2 for replicated rows in testLoading");
415     long start = System.currentTimeMillis();
416     // Retry more than NB_RETRIES.  As it was, retries were done in 5 seconds and we'd fail
417     // sometimes.
418     final long retries = NB_RETRIES * 10;
419     for (int i = 0; i < retries; i++) {
420       scan = new Scan();
421       scanner = htable2.getScanner(scan);
422       res = scanner.next(NB_ROWS_IN_BIG_BATCH);
423       scanner.close();
424       if (res.length != NB_ROWS_IN_BIG_BATCH) {
425         if (i == retries - 1) {
426           int lastRow = -1;
427           for (Result result : res) {
428             int currentRow = Bytes.toInt(result.getRow());
429             for (int row = lastRow+1; row < currentRow; row++) {
430               LOG.error("Row missing: " + row);
431             }
432             lastRow = currentRow;
433           }
434           LOG.error("Last row: " + lastRow);
435           fail("Waited too much time for normal batch replication, " +
436             res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
437             (System.currentTimeMillis() - start) + "ms");
438         } else {
439           LOG.info("Only got " + res.length + " rows... retrying");
440           Thread.sleep(SLEEP_TIME);
441         }
442       } else {
443         break;
444       }
445     }
446   }
447 
448   /**
449    * Do a small loading into a table, make sure the data is really the same,
450    * then run the VerifyReplication job to check the results. Do a second
451    * comparison where all the cells are different.
452    * @throws Exception
453    */
454   @Test(timeout=300000)
455   public void testVerifyRepJob() throws Exception {
456     // Populate the tables, at the same time it guarantees that the tables are
457     // identical since it does the check
458     testSmallBatch();
459 
460     String[] args = new String[] {"2", tableName.getNameAsString()};
461     Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
462     if (job == null) {
463       fail("Job wasn't created, see the log");
464     }
465     if (!job.waitForCompletion(true)) {
466       fail("Job failed, see the log");
467     }
468     assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
469         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
470     assertEquals(0, job.getCounters().
471         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
472 
473     Scan scan = new Scan();
474     ResultScanner rs = htable2.getScanner(scan);
475     Put put = null;
476     for (Result result : rs) {
477       put = new Put(result.getRow());
478       Cell firstVal = result.rawCells()[0];
479       put.add(CellUtil.cloneFamily(firstVal),
480           CellUtil.cloneQualifier(firstVal), Bytes.toBytes("diff data"));
481       htable2.put(put);
482     }
483     Delete delete = new Delete(put.getRow());
484     htable2.delete(delete);
485     job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
486     if (job == null) {
487       fail("Job wasn't created, see the log");
488     }
489     if (!job.waitForCompletion(true)) {
490       fail("Job failed, see the log");
491     }
492     assertEquals(0, job.getCounters().
493         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
494     assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
495         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
496   }
497 
498   @Test(timeout=300000)
499   // VerifyReplication should honor versions option
500   public void testHBase14905() throws Exception {
501     // normal Batch tests
502     byte[] qualifierName = Bytes.toBytes("f1");
503     Put put = new Put(Bytes.toBytes("r1"));
504     put.addColumn(famName, qualifierName, Bytes.toBytes("v1002"));
505     htable1.put(put);
506     put.addColumn(famName, qualifierName, Bytes.toBytes("v1001"));
507     htable1.put(put);
508     put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
509     htable1.put(put);
510 
511     Scan scan = new Scan();
512     scan.setMaxVersions(100);
513     ResultScanner scanner1 = htable1.getScanner(scan);
514     Result[] res1 = scanner1.next(1);
515     scanner1.close();
516 
517     assertEquals(1, res1.length);
518     assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
519 
520     for (int i = 0; i < NB_RETRIES; i++) {
521       scan = new Scan();
522       scan.setMaxVersions(100);
523       scanner1 = htable2.getScanner(scan);
524       res1 = scanner1.next(1);
525       scanner1.close();
526       if (res1.length != 1) {
527         LOG.info("Only got " + res1.length + " rows");
528         Thread.sleep(SLEEP_TIME);
529       } else {
530         int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
531         if (cellNumber != 3) {
532           LOG.info("Only got " + cellNumber + " cells");
533           Thread.sleep(SLEEP_TIME);
534         } else {
535           break;
536         }
537       }
538       if (i == NB_RETRIES-1) {
539         fail("Waited too much time for normal batch replication");
540       }
541     }
542 
543     put.addColumn(famName, qualifierName, Bytes.toBytes("v1111"));
544     htable2.put(put);
545     put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
546     htable2.put(put);
547 
548     scan = new Scan();
549     scan.setMaxVersions(100);
550     scanner1 = htable2.getScanner(scan);
551     res1 = scanner1.next(NB_ROWS_IN_BATCH);
552     scanner1.close();
553 
554     assertEquals(1, res1.length);
555     assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
556 
557     String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
558     Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
559     if (job == null) {
560       fail("Job wasn't created, see the log");
561     }
562     if (!job.waitForCompletion(true)) {
563       fail("Job failed, see the log");
564     }
565     assertEquals(0, job.getCounters().
566       findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
567     assertEquals(1, job.getCounters().
568       findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
569   }
570 
571   @Test(timeout=300000)
572   // VerifyReplication should honor versions option
573   public void testVersionMismatchHBase14905() throws Exception {
574     // normal Batch tests
575     byte[] qualifierName = Bytes.toBytes("f1");
576     Put put = new Put(Bytes.toBytes("r1"));
577     long ts = System.currentTimeMillis();
578     put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
579     htable1.put(put);
580     put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
581     htable1.put(put);
582     put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
583     htable1.put(put);
584        
585     Scan scan = new Scan();
586     scan.setMaxVersions(100);
587     ResultScanner scanner1 = htable1.getScanner(scan);
588     Result[] res1 = scanner1.next(1);
589     scanner1.close();
590 
591     assertEquals(1, res1.length);
592     assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
593     
594     for (int i = 0; i < NB_RETRIES; i++) {
595       scan = new Scan();
596       scan.setMaxVersions(100);
597       scanner1 = htable2.getScanner(scan);
598       res1 = scanner1.next(1);
599       scanner1.close();
600       if (res1.length != 1) {
601         LOG.info("Only got " + res1.length + " rows");
602         Thread.sleep(SLEEP_TIME);
603       } else {
604         int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
605         if (cellNumber != 3) {
606           LOG.info("Only got " + cellNumber + " cells");
607           Thread.sleep(SLEEP_TIME);
608         } else {
609           break;
610         }
611       }
612       if (i == NB_RETRIES-1) {
613         fail("Waited too much time for normal batch replication");
614       }
615     }
616    
617     try {
618       // Disabling replication and modifying the particular version of the cell to validate the feature.  
619       admin.disablePeer("2");
620       Put put2 = new Put(Bytes.toBytes("r1"));
621       put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
622       htable2.put(put2);
623       
624       scan = new Scan();
625       scan.setMaxVersions(100);
626       scanner1 = htable2.getScanner(scan);
627       res1 = scanner1.next(NB_ROWS_IN_BATCH);
628       scanner1.close();
629       assertEquals(1, res1.length);
630       assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
631     
632       String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
633       Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
634       if (job == null) {
635         fail("Job wasn't created, see the log");
636       }
637       if (!job.waitForCompletion(true)) {
638         fail("Job failed, see the log");
639       }    
640       assertEquals(0, job.getCounters().
641         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
642       assertEquals(1, job.getCounters().
643         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
644       }
645     finally {
646       admin.enablePeer("2");
647     }
648   }
649 
650   /**
651    * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out
652    * the compaction WALEdit
653    * @throws Exception
654    */
655   @Test(timeout=300000)
656   public void testCompactionWALEdits() throws Exception {
657     WALProtos.CompactionDescriptor compactionDescriptor =
658         WALProtos.CompactionDescriptor.getDefaultInstance();
659     HRegionInfo hri = new HRegionInfo(htable1.getName(),
660       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
661     WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
662     Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit);
663   }
664 
665   /**
666    * Test for HBASE-8663
667    * Create two new Tables with colfamilies enabled for replication then run
668    * ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note:
669    * TestReplicationAdmin is a better place for this testing but it would need mocks.
670    * @throws Exception
671    */
672   @Test(timeout = 300000)
673   public void testVerifyListReplicatedTable() throws Exception {
674     LOG.info("testVerifyListReplicatedTable");
675 
676     final String tName = "VerifyListReplicated_";
677     final String colFam = "cf1";
678     final int numOfTables = 3;
679 
680     HBaseAdmin hadmin = new HBaseAdmin(conf1);
681 
682     // Create Tables
683     for (int i = 0; i < numOfTables; i++) {
684       HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
685       HColumnDescriptor cfd = new HColumnDescriptor(colFam);
686       cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
687       ht.addFamily(cfd);
688       hadmin.createTable(ht);
689     }
690 
691     // verify the result
692     List<HashMap<String, String>> replicationColFams = admin.listReplicated();
693     int[] match = new int[numOfTables]; // array of 3 with init value of zero
694 
695     for (int i = 0; i < replicationColFams.size(); i++) {
696       HashMap<String, String> replicationEntry = replicationColFams.get(i);
697       String tn = replicationEntry.get(ReplicationAdmin.TNAME);
698       if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
699         int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
700         match[m]++; // should only increase once
701       }
702     }
703 
704     // check the matching result
705     for (int i = 0; i < match.length; i++) {
706       assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
707     }
708 
709     // drop tables
710     for (int i = 0; i < numOfTables; i++) {
711       String ht = tName + i;
712       hadmin.disableTable(ht);
713       hadmin.deleteTable(ht);
714     }
715 
716     hadmin.close();
717   }
718 
719   /**
720    * Test for HBASE-9531
721    * put a few rows into htable1, which should be replicated to htable2
722    * create a ClusterStatus instance 'status' from HBaseAdmin
723    * test : status.getLoad(server).getReplicationLoadSourceList()
724    * test : status.getLoad(server).getReplicationLoadSink()
725    * * @throws Exception
726    */
727   @Test(timeout = 300000)
728   public void testReplicationStatus() throws Exception {
729     LOG.info("testReplicationStatus");
730 
731     try (Admin admin = utility1.getConnection().getAdmin()) {
732 
733       final byte[] qualName = Bytes.toBytes("q");
734       Put p;
735 
736       for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
737         p = new Put(Bytes.toBytes("row" + i));
738         p.add(famName, qualName, Bytes.toBytes("val" + i));
739         htable1.put(p);
740       }
741 
742       ClusterStatus status = admin.getClusterStatus();
743 
744       for (ServerName server : status.getServers()) {
745         ServerLoad sl = status.getLoad(server);
746         List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
747         ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
748 
749         // check SourceList has at least one entry
750         assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));
751 
752         // check Sink exist only as it is difficult to verify the value on the fly
753         assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
754           (rLoadSink.getAgeOfLastAppliedOp() >= 0));
755         assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
756           (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
757       }
758     }
759   }
760 }