View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.util.HashMap;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HColumnDescriptor;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.HRegionInfo;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.LargeTests;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Delete;
40  import org.apache.hadoop.hbase.client.Get;
41  import org.apache.hadoop.hbase.client.HBaseAdmin;
42  import org.apache.hadoop.hbase.client.HTable;
43  import org.apache.hadoop.hbase.client.Put;
44  import org.apache.hadoop.hbase.client.Result;
45  import org.apache.hadoop.hbase.client.ResultScanner;
46  import org.apache.hadoop.hbase.client.Scan;
47  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
48  import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
49  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
50  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
51  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
52  import org.apache.hadoop.hbase.replication.regionserver.Replication;
53  import org.apache.hadoop.hbase.util.Bytes;
54  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55  import org.apache.hadoop.hbase.util.JVMClusterUtil;
56  import org.apache.hadoop.mapreduce.Job;
57  import org.junit.Before;
58  import org.junit.Test;
59  import org.junit.experimental.categories.Category;
60  
61  @Category(LargeTests.class)
62  public class TestReplicationSmallTests extends TestReplicationBase {
63  
64    private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
65  
66    /**
67     * @throws java.lang.Exception
68     */
69    @Before
70    public void setUp() throws Exception {
71      htable1.setAutoFlush(true, true);
72      // Starting and stopping replication can make us miss new logs,
73      // rolling like this makes sure the most recent one gets added to the queue
74      for ( JVMClusterUtil.RegionServerThread r :
75          utility1.getHBaseCluster().getRegionServerThreads()) {
76        r.getRegionServer().getWAL().rollWriter();
77      }
78      utility1.truncateTable(tableName);
79      // truncating the table will send one Delete per row to the slave cluster
80      // in an async fashion, which is why we cannot just call truncateTable on
81      // utility2 since late writes could make it to the slave in some way.
82      // Instead, we truncate the first table and wait for all the Deletes to
83      // make it to the slave.
84      Scan scan = new Scan();
85      int lastCount = 0;
86      for (int i = 0; i < NB_RETRIES; i++) {
87        if (i==NB_RETRIES-1) {
88          fail("Waited too much time for truncate");
89        }
90        ResultScanner scanner = htable2.getScanner(scan);
91        Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
92        scanner.close();
93        if (res.length != 0) {
94          if (res.length < lastCount) {
95            i--; // Don't increment timeout if we make progress
96          }
97          lastCount = res.length;
98          LOG.info("Still got " + res.length + " rows");
99          Thread.sleep(SLEEP_TIME);
100       } else {
101         break;
102       }
103     }
104   }
105 
106   /**
107    * Verify that version and column delete marker types are replicated
108    * correctly.
109    * @throws Exception
110    */
111   @Test(timeout=300000)
112   public void testDeleteTypes() throws Exception {
113     LOG.info("testDeleteTypes");
114     final byte[] v1 = Bytes.toBytes("v1");
115     final byte[] v2 = Bytes.toBytes("v2");
116     final byte[] v3 = Bytes.toBytes("v3");
117     htable1 = new HTable(conf1, tableName);
118 
119     long t = EnvironmentEdgeManager.currentTimeMillis();
120     // create three versions for "row"
121     Put put = new Put(row);
122     put.add(famName, row, t, v1);
123     htable1.put(put);
124 
125     put = new Put(row);
126     put.add(famName, row, t+1, v2);
127     htable1.put(put);
128 
129     put = new Put(row);
130     put.add(famName, row, t+2, v3);
131     htable1.put(put);
132 
133     Get get = new Get(row);
134     get.setMaxVersions();
135     for (int i = 0; i < NB_RETRIES; i++) {
136       if (i==NB_RETRIES-1) {
137         fail("Waited too much time for put replication");
138       }
139       Result res = htable2.get(get);
140       if (res.size() < 3) {
141         LOG.info("Rows not available");
142         Thread.sleep(SLEEP_TIME);
143       } else {
144         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
145         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
146         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
147         break;
148       }
149     }
150     // place a version delete marker (delete last version)
151     Delete d = new Delete(row);
152     d.deleteColumn(famName, row, t);
153     htable1.delete(d);
154 
155     get = new Get(row);
156     get.setMaxVersions();
157     for (int i = 0; i < NB_RETRIES; i++) {
158       if (i==NB_RETRIES-1) {
159         fail("Waited too much time for put replication");
160       }
161       Result res = htable2.get(get);
162       if (res.size() > 2) {
163         LOG.info("Version not deleted");
164         Thread.sleep(SLEEP_TIME);
165       } else {
166         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
167         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
168         break;
169       }
170     }
171 
172     // place a column delete marker
173     d = new Delete(row);
174     d.deleteColumns(famName, row, t+2);
175     htable1.delete(d);
176 
177     // now *both* of the remaining version should be deleted
178     // at the replica
179     get = new Get(row);
180     for (int i = 0; i < NB_RETRIES; i++) {
181       if (i==NB_RETRIES-1) {
182         fail("Waited too much time for del replication");
183       }
184       Result res = htable2.get(get);
185       if (res.size() >= 1) {
186         LOG.info("Rows not deleted");
187         Thread.sleep(SLEEP_TIME);
188       } else {
189         break;
190       }
191     }
192   }
193 
194   /**
195    * Add a row, check it's replicated, delete it, check's gone
196    * @throws Exception
197    */
198   @Test(timeout=300000)
199   public void testSimplePutDelete() throws Exception {
200     LOG.info("testSimplePutDelete");
201     Put put = new Put(row);
202     put.add(famName, row, row);
203 
204     htable1 = new HTable(conf1, tableName);
205     htable1.put(put);
206 
207     Get get = new Get(row);
208     for (int i = 0; i < NB_RETRIES; i++) {
209       if (i==NB_RETRIES-1) {
210         fail("Waited too much time for put replication");
211       }
212       Result res = htable2.get(get);
213       if (res.size() == 0) {
214         LOG.info("Row not available");
215         Thread.sleep(SLEEP_TIME);
216       } else {
217         assertArrayEquals(res.value(), row);
218         break;
219       }
220     }
221 
222     Delete del = new Delete(row);
223     htable1.delete(del);
224 
225     get = new Get(row);
226     for (int i = 0; i < NB_RETRIES; i++) {
227       if (i==NB_RETRIES-1) {
228         fail("Waited too much time for del replication");
229       }
230       Result res = htable2.get(get);
231       if (res.size() >= 1) {
232         LOG.info("Row not deleted");
233         Thread.sleep(SLEEP_TIME);
234       } else {
235         break;
236       }
237     }
238   }
239 
240   /**
241    * Try a small batch upload using the write buffer, check it's replicated
242    * @throws Exception
243    */
244   @Test(timeout=300000)
245   public void testSmallBatch() throws Exception {
246     LOG.info("testSmallBatch");
247     Put put;
248     // normal Batch tests
249     htable1.setAutoFlush(false, true);
250     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
251       put = new Put(Bytes.toBytes(i));
252       put.add(famName, row, row);
253       htable1.put(put);
254     }
255     htable1.flushCommits();
256 
257     Scan scan = new Scan();
258 
259     ResultScanner scanner1 = htable1.getScanner(scan);
260     Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
261     scanner1.close();
262     assertEquals(NB_ROWS_IN_BATCH, res1.length);
263 
264     for (int i = 0; i < NB_RETRIES; i++) {
265       scan = new Scan();
266       if (i==NB_RETRIES-1) {
267         fail("Waited too much time for normal batch replication");
268       }
269       ResultScanner scanner = htable2.getScanner(scan);
270       Result[] res = scanner.next(NB_ROWS_IN_BATCH);
271       scanner.close();
272       if (res.length != NB_ROWS_IN_BATCH) {
273         LOG.info("Only got " + res.length + " rows");
274         Thread.sleep(SLEEP_TIME);
275       } else {
276         break;
277       }
278     }
279   }
280 
281   /**
282    * Test disable/enable replication, trying to insert, make sure nothing's
283    * replicated, enable it, the insert should be replicated
284    *
285    * @throws Exception
286    */
287   @Test(timeout = 300000)
288   public void testDisableEnable() throws Exception {
289 
290     // Test disabling replication
291     admin.disablePeer("2");
292 
293     byte[] rowkey = Bytes.toBytes("disable enable");
294     Put put = new Put(rowkey);
295     put.add(famName, row, row);
296     htable1.put(put);
297 
298     Get get = new Get(rowkey);
299     for (int i = 0; i < NB_RETRIES; i++) {
300       Result res = htable2.get(get);
301       if (res.size() >= 1) {
302         fail("Replication wasn't disabled");
303       } else {
304         LOG.info("Row not replicated, let's wait a bit more...");
305         Thread.sleep(SLEEP_TIME);
306       }
307     }
308 
309     // Test enable replication
310     admin.enablePeer("2");
311 
312     for (int i = 0; i < NB_RETRIES; i++) {
313       Result res = htable2.get(get);
314       if (res.size() == 0) {
315         LOG.info("Row not available");
316         Thread.sleep(SLEEP_TIME);
317       } else {
318         assertArrayEquals(res.value(), row);
319         return;
320       }
321     }
322     fail("Waited too much time for put replication");
323   }
324 
325   /**
326    * Integration test for TestReplicationAdmin, removes and re-add a peer
327    * cluster
328    *
329    * @throws Exception
330    */
331   @Test(timeout=300000)
332   public void testAddAndRemoveClusters() throws Exception {
333     LOG.info("testAddAndRemoveClusters");
334     admin.removePeer("2");
335     Thread.sleep(SLEEP_TIME);
336     byte[] rowKey = Bytes.toBytes("Won't be replicated");
337     Put put = new Put(rowKey);
338     put.add(famName, row, row);
339     htable1.put(put);
340 
341     Get get = new Get(rowKey);
342     for (int i = 0; i < NB_RETRIES; i++) {
343       if (i == NB_RETRIES-1) {
344         break;
345       }
346       Result res = htable2.get(get);
347       if (res.size() >= 1) {
348         fail("Not supposed to be replicated");
349       } else {
350         LOG.info("Row not replicated, let's wait a bit more...");
351         Thread.sleep(SLEEP_TIME);
352       }
353     }
354 
355     admin.addPeer("2", utility2.getClusterKey());
356     Thread.sleep(SLEEP_TIME);
357     rowKey = Bytes.toBytes("do rep");
358     put = new Put(rowKey);
359     put.add(famName, row, row);
360     LOG.info("Adding new row");
361     htable1.put(put);
362 
363     get = new Get(rowKey);
364     for (int i = 0; i < NB_RETRIES; i++) {
365       if (i==NB_RETRIES-1) {
366         fail("Waited too much time for put replication");
367       }
368       Result res = htable2.get(get);
369       if (res.size() == 0) {
370         LOG.info("Row not available");
371         Thread.sleep(SLEEP_TIME*i);
372       } else {
373         assertArrayEquals(res.value(), row);
374         break;
375       }
376     }
377   }
378 
379 
380   /**
381    * Do a more intense version testSmallBatch, one  that will trigger
382    * hlog rolling and other non-trivial code paths
383    * @throws Exception
384    */
385   @Test(timeout=300000)
386   public void loadTesting() throws Exception {
387     htable1.setWriteBufferSize(1024);
388     htable1.setAutoFlush(false, true);
389     for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
390       Put put = new Put(Bytes.toBytes(i));
391       put.add(famName, row, row);
392       htable1.put(put);
393     }
394     htable1.flushCommits();
395 
396     Scan scan = new Scan();
397 
398     ResultScanner scanner = htable1.getScanner(scan);
399     Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
400     scanner.close();
401 
402     assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
403 
404 
405     long start = System.currentTimeMillis();
406     for (int i = 0; i < NB_RETRIES; i++) {
407       scan = new Scan();
408 
409       scanner = htable2.getScanner(scan);
410       res = scanner.next(NB_ROWS_IN_BIG_BATCH);
411       scanner.close();
412       if (res.length != NB_ROWS_IN_BIG_BATCH) {
413         if (i == NB_RETRIES - 1) {
414           int lastRow = -1;
415           for (Result result : res) {
416             int currentRow = Bytes.toInt(result.getRow());
417             for (int row = lastRow+1; row < currentRow; row++) {
418               LOG.error("Row missing: " + row);
419             }
420             lastRow = currentRow;
421           }
422           LOG.error("Last row: " + lastRow);
423           fail("Waited too much time for normal batch replication, " +
424             res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
425             (System.currentTimeMillis() - start) + "ms");
426         } else {
427           LOG.info("Only got " + res.length + " rows");
428           Thread.sleep(SLEEP_TIME);
429         }
430       } else {
431         break;
432       }
433     }
434   }
435 
436   /**
437    * Do a small loading into a table, make sure the data is really the same,
438    * then run the VerifyReplication job to check the results. Do a second
439    * comparison where all the cells are different.
440    * @throws Exception
441    */
442   @Test(timeout=300000)
443   public void testVerifyRepJob() throws Exception {
444     // Populate the tables, at the same time it guarantees that the tables are
445     // identical since it does the check
446     testSmallBatch();
447 
448     String[] args = new String[] {"2", Bytes.toString(tableName)};
449     Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
450     if (job == null) {
451       fail("Job wasn't created, see the log");
452     }
453     if (!job.waitForCompletion(true)) {
454       fail("Job failed, see the log");
455     }
456     assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
457         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
458     assertEquals(0, job.getCounters().
459         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
460 
461     Scan scan = new Scan();
462     ResultScanner rs = htable2.getScanner(scan);
463     Put put = null;
464     for (Result result : rs) {
465       put = new Put(result.getRow());
466       Cell firstVal = result.rawCells()[0];
467       put.add(CellUtil.cloneFamily(firstVal),
468           CellUtil.cloneQualifier(firstVal), Bytes.toBytes("diff data"));
469       htable2.put(put);
470     }
471     Delete delete = new Delete(put.getRow());
472     htable2.delete(delete);
473     job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
474     if (job == null) {
475       fail("Job wasn't created, see the log");
476     }
477     if (!job.waitForCompletion(true)) {
478       fail("Job failed, see the log");
479     }
480     assertEquals(0, job.getCounters().
481         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
482     assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
483         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
484   }
485 
486   /**
487    * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out
488    * the compaction WALEdit
489    * @throws Exception
490    */
491   @Test(timeout=300000)
492   public void testCompactionWALEdits() throws Exception {
493     WALProtos.CompactionDescriptor compactionDescriptor =
494         WALProtos.CompactionDescriptor.getDefaultInstance();
495     HRegionInfo hri = new HRegionInfo(htable1.getName(),
496       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
497     WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
498     Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit);
499   }
500 
501   /**
502    * Test for HBASE-8663
503    * Create two new Tables with colfamilies enabled for replication then run
504    * ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note:
505    * TestReplicationAdmin is a better place for this testing but it would need mocks.
506    * @throws Exception
507    */
508   @Test(timeout = 300000)
509   public void testVerifyListReplicatedTable() throws Exception {
510 	LOG.info("testVerifyListReplicatedTable");
511 
512     final String tName = "VerifyListReplicated_";
513     final String colFam = "cf1";
514     final int numOfTables = 3;
515 
516     HBaseAdmin hadmin = new HBaseAdmin(conf1);
517 
518     // Create Tables
519     for (int i = 0; i < numOfTables; i++) {
520       HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
521       HColumnDescriptor cfd = new HColumnDescriptor(colFam);
522       cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
523       ht.addFamily(cfd);
524       hadmin.createTable(ht);
525     }
526 
527     // verify the result
528     List<HashMap<String, String>> replicationColFams = admin.listReplicated();
529     int[] match = new int[numOfTables]; // array of 3 with init value of zero
530 
531     for (int i = 0; i < replicationColFams.size(); i++) {
532       HashMap<String, String> replicationEntry = replicationColFams.get(i);
533       String tn = replicationEntry.get(ReplicationAdmin.TNAME);
534       if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
535         int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
536         match[m]++; // should only increase once
537       }
538     }
539 
540     // check the matching result
541     for (int i = 0; i < match.length; i++) {
542       assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
543     }
544 
545     // drop tables
546     for (int i = 0; i < numOfTables; i++) {
547       String ht = tName + i;
548       hadmin.disableTable(ht);
549       hadmin.deleteTable(ht);
550     }
551 
552     hadmin.close();
553   }
554 
555 }