View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.*;
28  import org.apache.hadoop.hbase.client.Delete;
29  import org.apache.hadoop.hbase.client.HBaseAdmin;
30  import org.apache.hadoop.hbase.client.HTable;
31  import org.apache.hadoop.hbase.client.Put;
32  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
35  import org.junit.Before;
36  import org.junit.Test;
37  import org.junit.experimental.categories.Category;
38  
39  @Category(LargeTests.class)
40  public class TestReplicationSyncUpTool extends TestReplicationBase {
41  
42    private static final Log LOG = LogFactory.getLog(TestReplicationSyncUpTool.class);
43  
44    private static final byte[] t1_su = Bytes.toBytes("t1_syncup");
45    private static final byte[] t2_su = Bytes.toBytes("t2_syncup");
46  
47    private static final byte[] famName = Bytes.toBytes("cf1");
48    private static final byte[] qualName = Bytes.toBytes("q1");
49  
50    private static final byte[] noRepfamName = Bytes.toBytes("norep");
51  
52    private HTableDescriptor t1_syncupSource, t1_syncupTarget;
53    private HTableDescriptor t2_syncupSource, t2_syncupTarget;
54  
55    private HTable ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
56  
57    @Before
58    public void setUp() throws Exception {
59  
60      HColumnDescriptor fam;
61  
62      t1_syncupSource = new HTableDescriptor(TableName.valueOf(t1_su));
63      fam = new HColumnDescriptor(famName);
64      fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
65      t1_syncupSource.addFamily(fam);
66      fam = new HColumnDescriptor(noRepfamName);
67      t1_syncupSource.addFamily(fam);
68  
69      t1_syncupTarget = new HTableDescriptor(TableName.valueOf(t1_su));
70      fam = new HColumnDescriptor(famName);
71      t1_syncupTarget.addFamily(fam);
72      fam = new HColumnDescriptor(noRepfamName);
73      t1_syncupTarget.addFamily(fam);
74  
75      t2_syncupSource = new HTableDescriptor(TableName.valueOf(t2_su));
76      fam = new HColumnDescriptor(famName);
77      fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
78      t2_syncupSource.addFamily(fam);
79      fam = new HColumnDescriptor(noRepfamName);
80      t2_syncupSource.addFamily(fam);
81  
82      t2_syncupTarget = new HTableDescriptor(TableName.valueOf(t2_su));
83      fam = new HColumnDescriptor(famName);
84      t2_syncupTarget.addFamily(fam);
85      fam = new HColumnDescriptor(noRepfamName);
86      t2_syncupTarget.addFamily(fam);
87  
88    }
89  
90    /**
91     * Add a row to a table in each cluster, check it's replicated, delete it,
92     * check's gone Also check the puts and deletes are not replicated back to
93     * the originating cluster.
94     */
95    @Test(timeout = 300000)
96    public void testSyncUpTool() throws Exception {
97  
98      /**
99       * Set up Replication: on Master and one Slave
100      * Table: t1_syncup and t2_syncup
101      * columnfamily:
102      *    'cf1'  : replicated
103      *    'norep': not replicated
104      */
105     setupReplication();
106 
107     /**
108      * at Master:
109      * t1_syncup: put 100 rows into cf1, and 1 rows into norep
110      * t2_syncup: put 200 rows into cf1, and 1 rows into norep
111      *
112      * verify correctly replicated to slave
113      */
114     putAndReplicateRows();
115 
116     /**
117      * Verify delete works
118      *
119      * step 1: stop hbase on Slave
120      *
121      * step 2: at Master:
122      *  t1_syncup: delete 50 rows  from cf1
123      *  t2_syncup: delete 100 rows from cf1
124      *  no change on 'norep'
125      *
126      * step 3: stop hbase on master, restart hbase on Slave
127      *
128      * step 4: verify Slave still have the rows before delete
129      *      t1_syncup: 100 rows from cf1
130      *      t2_syncup: 200 rows from cf1
131      *
132      * step 5: run syncup tool on Master
133      *
134      * step 6: verify that delete show up on Slave
135      *      t1_syncup: 50 rows from cf1
136      *      t2_syncup: 100 rows from cf1
137      *
138      * verify correctly replicated to Slave
139      */
140     mimicSyncUpAfterDelete();
141 
142     /**
143      * Verify put works
144      *
145      * step 1: stop hbase on Slave
146      *
147      * step 2: at Master:
148      *  t1_syncup: put 100 rows  from cf1
149      *  t2_syncup: put 200 rows  from cf1
150      *  and put another row on 'norep'
151      *  ATTN: put to 'cf1' will overwrite existing rows, so end count will
152      *        be 100 and 200 respectively
153      *      put to 'norep' will add a new row.
154      *
155      * step 3: stop hbase on master, restart hbase on Slave
156      *
157      * step 4: verify Slave still has the rows before put
158      *      t1_syncup: 50 rows from cf1
159      *      t2_syncup: 100 rows from cf1
160      *
161      * step 5: run syncup tool on Master
162      *
163      * step 6: verify that put show up on Slave
164      *         and 'norep' does not
165      *      t1_syncup: 100 rows from cf1
166      *      t2_syncup: 200 rows from cf1
167      *
168      * verify correctly replicated to Slave
169      */
170     mimicSyncUpAfterPut();
171 
172   }
173 
174   private void setupReplication() throws Exception {
175     ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
176     ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
177 
178     HBaseAdmin ha = new HBaseAdmin(conf1);
179     ha.createTable(t1_syncupSource);
180     ha.createTable(t2_syncupSource);
181     ha.close();
182 
183     ha = new HBaseAdmin(conf2);
184     ha.createTable(t1_syncupTarget);
185     ha.createTable(t2_syncupTarget);
186     ha.close();
187 
188     // Get HTable from Master
189     ht1Source = new HTable(conf1, t1_su);
190     ht1Source.setWriteBufferSize(1024);
191     ht2Source = new HTable(conf1, t2_su);
192     ht1Source.setWriteBufferSize(1024);
193 
194     // Get HTable from Peer1
195     ht1TargetAtPeer1 = new HTable(conf2, t1_su);
196     ht1TargetAtPeer1.setWriteBufferSize(1024);
197     ht2TargetAtPeer1 = new HTable(conf2, t2_su);
198     ht2TargetAtPeer1.setWriteBufferSize(1024);
199 
200     /**
201      * set M-S : Master: utility1 Slave1: utility2
202      */
203     admin1.addPeer("1", utility2.getClusterKey());
204 
205     admin1.close();
206     admin2.close();
207   }
208 
209   private void putAndReplicateRows() throws Exception {
210     LOG.debug("putAndReplicateRows");
211     // add rows to Master cluster,
212     Put p;
213 
214     // 100 + 1 row to t1_syncup
215     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
216       p = new Put(Bytes.toBytes("row" + i));
217       p.add(famName, qualName, Bytes.toBytes("val" + i));
218       ht1Source.put(p);
219     }
220     p = new Put(Bytes.toBytes("row" + 9999));
221     p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
222     ht1Source.put(p);
223 
224     // 200 + 1 row to t2_syncup
225     for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
226       p = new Put(Bytes.toBytes("row" + i));
227       p.add(famName, qualName, Bytes.toBytes("val" + i));
228       ht2Source.put(p);
229     }
230     p = new Put(Bytes.toBytes("row" + 9999));
231     p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
232     ht2Source.put(p);
233 
234     // ensure replication completed
235     Thread.sleep(SLEEP_TIME);
236     int rowCount_ht1Source = utility1.countRows(ht1Source);
237     for (int i = 0; i < NB_RETRIES; i++) {
238       int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
239       if (i==NB_RETRIES-1) {
240         assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
241             rowCount_ht1TargetAtPeer1);
242       }
243       if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) {
244         break;
245       }
246       Thread.sleep(SLEEP_TIME);
247     }
248 
249     int rowCount_ht2Source = utility1.countRows(ht2Source);
250     for (int i = 0; i < NB_RETRIES; i++) {
251       int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
252       if (i==NB_RETRIES-1) {
253         assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
254             rowCount_ht2TargetAtPeer1);
255       }
256       if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) {
257         break;
258       }
259       Thread.sleep(SLEEP_TIME);
260     }
261   }
262 
263   private void mimicSyncUpAfterDelete() throws Exception {
264     LOG.debug("mimicSyncUpAfterDelete");
265     utility2.shutdownMiniHBaseCluster();
266 
267     List<Delete> list = new ArrayList<Delete>();
268     // delete half of the rows
269     for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
270       String rowKey = "row" + i;
271       Delete del = new Delete(rowKey.getBytes());
272       list.add(del);
273     }
274     ht1Source.delete(list);
275 
276     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
277       String rowKey = "row" + i;
278       Delete del = new Delete(rowKey.getBytes());
279       list.add(del);
280     }
281     ht2Source.delete(list);
282 
283     int rowCount_ht1Source = utility1.countRows(ht1Source);
284     assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
285       rowCount_ht1Source);
286 
287     int rowCount_ht2Source = utility1.countRows(ht2Source);
288     assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
289       101, rowCount_ht2Source);
290 
291     utility1.shutdownMiniHBaseCluster();
292     utility2.restartHBaseCluster(1);
293 
294     Thread.sleep(SLEEP_TIME);
295 
296     // before sync up
297     int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
298     int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
299     assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
300     assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
301 
302     // After sync up
303     for (int i = 0; i < NB_RETRIES; i++) {
304       syncUp(utility1);
305       rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
306       rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
307       if (i == NB_RETRIES - 1) {
308         if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
309           // syncUP still failed. Let's look at the source in case anything wrong there
310           utility1.restartHBaseCluster(1);
311           rowCount_ht1Source = utility1.countRows(ht1Source);
312           LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
313           rowCount_ht2Source = utility1.countRows(ht2Source);
314           LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
315         }
316         assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
317           rowCount_ht1TargetAtPeer1);
318         assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
319           rowCount_ht2TargetAtPeer1);
320       }
321       if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) {
322         LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
323         break;
324       } else {
325         LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
326             + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
327             + rowCount_ht2TargetAtPeer1);
328       }
329       Thread.sleep(SLEEP_TIME);
330     }
331   }
332 
333   private void mimicSyncUpAfterPut() throws Exception {
334     LOG.debug("mimicSyncUpAfterPut");
335     utility1.restartHBaseCluster(1);
336     utility2.shutdownMiniHBaseCluster();
337 
338     Put p;
339     // another 100 + 1 row to t1_syncup
340     // we should see 100 + 2 rows now
341     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
342       p = new Put(Bytes.toBytes("row" + i));
343       p.add(famName, qualName, Bytes.toBytes("val" + i));
344       ht1Source.put(p);
345     }
346     p = new Put(Bytes.toBytes("row" + 9998));
347     p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
348     ht1Source.put(p);
349 
350     // another 200 + 1 row to t1_syncup
351     // we should see 200 + 2 rows now
352     for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
353       p = new Put(Bytes.toBytes("row" + i));
354       p.add(famName, qualName, Bytes.toBytes("val" + i));
355       ht2Source.put(p);
356     }
357     p = new Put(Bytes.toBytes("row" + 9998));
358     p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
359     ht2Source.put(p);
360 
361     int rowCount_ht1Source = utility1.countRows(ht1Source);
362     assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
363     int rowCount_ht2Source = utility1.countRows(ht2Source);
364     assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
365 
366     utility1.shutdownMiniHBaseCluster();
367     utility2.restartHBaseCluster(1);
368 
369     Thread.sleep(SLEEP_TIME);
370 
371     // before sync up
372     int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
373     int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
374     assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
375       rowCount_ht1TargetAtPeer1);
376     assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
377       rowCount_ht2TargetAtPeer1);
378 
379     // after syun up
380     for (int i = 0; i < NB_RETRIES; i++) {
381       syncUp(utility1);
382       rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
383       rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
384       if (i == NB_RETRIES - 1) {
385         if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
386           // syncUP still failed. Let's look at the source in case anything wrong there
387           utility1.restartHBaseCluster(1);
388           rowCount_ht1Source = utility1.countRows(ht1Source);
389           LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
390           rowCount_ht2Source = utility1.countRows(ht2Source);
391           LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
392         }
393         assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
394           rowCount_ht1TargetAtPeer1);
395         assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
396           rowCount_ht2TargetAtPeer1);
397       }
398       if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) {
399         LOG.info("SyncUpAfterPut succeeded at retry = " + i);
400         break;
401       } else {
402         LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
403             + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
404             + rowCount_ht2TargetAtPeer1);
405       }
406       Thread.sleep(SLEEP_TIME);
407     }
408   }
409 
410   private void syncUp(HBaseTestingUtility ut) throws Exception {
411     ReplicationSyncUp.setConfigure(ut.getConfiguration());
412     String[] arguments = new String[] { null };
413     new ReplicationSyncUp().run(arguments);
414   }
415 
416 }