View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.apache.hadoop.hbase.util.ByteStringer;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.MediumTests;
38  import org.apache.hadoop.hbase.Stoppable;
39  import org.apache.hadoop.hbase.client.Get;
40  import org.apache.hadoop.hbase.client.HTable;
41  import org.apache.hadoop.hbase.client.Result;
42  import org.apache.hadoop.hbase.client.ResultScanner;
43  import org.apache.hadoop.hbase.client.Scan;
44  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
45  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
46  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.junit.AfterClass;
49  import org.junit.Before;
50  import org.junit.BeforeClass;
51  import org.junit.Test;
52  import org.junit.experimental.categories.Category;
53  
54  @Category(MediumTests.class)
55  public class TestReplicationSink {
56    private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
57    private static final int BATCH_SIZE = 10;
58  
59    private final static HBaseTestingUtility TEST_UTIL =
60        new HBaseTestingUtility();
61  
62    private static ReplicationSink SINK;
63  
64    private static final byte[] TABLE_NAME1 =
65        Bytes.toBytes("table1");
66    private static final byte[] TABLE_NAME2 =
67        Bytes.toBytes("table2");
68  
69    private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
70    private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
71  
72    private static HTable table1;
73    private static Stoppable STOPPABLE = new Stoppable() {
74      final AtomicBoolean stop = new AtomicBoolean(false);
75  
76      @Override
77      public boolean isStopped() {
78        return this.stop.get();
79      }
80  
81      @Override
82      public void stop(String why) {
83        LOG.info("STOPPING BECAUSE: " + why);
84        this.stop.set(true);
85      }
86      
87    };
88  
89    private static HTable table2;
90  
91     /**
92     * @throws java.lang.Exception
93     */
94    @BeforeClass
95    public static void setUpBeforeClass() throws Exception {
96      TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
97      TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
98          HConstants.REPLICATION_ENABLE_DEFAULT);
99      TEST_UTIL.startMiniCluster(3);
100     SINK =
101       new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
102     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
103     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
104   }
105 
106   /**
107    * @throws java.lang.Exception
108    */
109   @AfterClass
110   public static void tearDownAfterClass() throws Exception {
111     STOPPABLE.stop("Shutting down");
112     TEST_UTIL.shutdownMiniCluster();
113   }
114 
115   /**
116    * @throws java.lang.Exception
117    */
118   @Before
119   public void setUp() throws Exception {
120     table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
121     table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
122   }
123 
124   /**
125    * Insert a whole batch of entries
126    * @throws Exception
127    */
128   @Test
129   public void testBatchSink() throws Exception {
130     List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE);
131     List<Cell> cells = new ArrayList<Cell>();
132     for(int i = 0; i < BATCH_SIZE; i++) {
133       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
134     }
135     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
136     Scan scan = new Scan();
137     ResultScanner scanRes = table1.getScanner(scan);
138     assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
139   }
140 
141   /**
142    * Insert a mix of puts and deletes
143    * @throws Exception
144    */
145   @Test
146   public void testMixedPutDelete() throws Exception {
147     List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
148     List<Cell> cells = new ArrayList<Cell>();
149     for(int i = 0; i < BATCH_SIZE/2; i++) {
150       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
151     }
152     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells));
153 
154     entries = new ArrayList<WALEntry>(BATCH_SIZE);
155     cells = new ArrayList<Cell>();
156     for(int i = 0; i < BATCH_SIZE; i++) {
157       entries.add(createEntry(TABLE_NAME1, i,
158           i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
159     }
160 
161     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
162     Scan scan = new Scan();
163     ResultScanner scanRes = table1.getScanner(scan);
164     assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
165   }
166 
167   /**
168    * Insert to 2 different tables
169    * @throws Exception
170    */
171   @Test
172   public void testMixedPutTables() throws Exception {
173     List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
174     List<Cell> cells = new ArrayList<Cell>();
175     for(int i = 0; i < BATCH_SIZE; i++) {
176       entries.add(createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
177               i, KeyValue.Type.Put, cells));
178     }
179 
180     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
181     Scan scan = new Scan();
182     ResultScanner scanRes = table2.getScanner(scan);
183     for(Result res : scanRes) {
184       assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
185     }
186   }
187 
188   /**
189    * Insert then do different types of deletes
190    * @throws Exception
191    */
192   @Test
193   public void testMixedDeletes() throws Exception {
194     List<WALEntry> entries = new ArrayList<WALEntry>(3);
195     List<Cell> cells = new ArrayList<Cell>();
196     for(int i = 0; i < 3; i++) {
197       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
198     }
199     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
200     entries = new ArrayList<WALEntry>(3);
201     cells = new ArrayList<Cell>();
202     entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
203     entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
204     entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));
205 
206     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
207 
208     Scan scan = new Scan();
209     ResultScanner scanRes = table1.getScanner(scan);
210     assertEquals(0, scanRes.next(3).length);
211   }
212 
213   /**
214    * Puts are buffered, but this tests when a delete (not-buffered) is applied
215    * before the actual Put that creates it.
216    * @throws Exception
217    */
218   @Test
219   public void testApplyDeleteBeforePut() throws Exception {
220     List<WALEntry> entries = new ArrayList<WALEntry>(5);
221     List<Cell> cells = new ArrayList<Cell>();
222     for(int i = 0; i < 2; i++) {
223       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
224     }
225     entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
226     for(int i = 3; i < 5; i++) {
227       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
228     }
229     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
230     Get get = new Get(Bytes.toBytes(1));
231     Result res = table1.get(get);
232     assertEquals(0, res.size());
233   }
234 
235   private WALEntry createEntry(byte [] table, int row,  KeyValue.Type type, List<Cell> cells) {
236     byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
237     byte[] rowBytes = Bytes.toBytes(row);
238     // Just make sure we don't get the same ts for two consecutive rows with
239     // same key
240     try {
241       Thread.sleep(1);
242     } catch (InterruptedException e) {
243       LOG.info("Was interrupted while sleep, meh", e);
244     }
245     final long now = System.currentTimeMillis();
246     KeyValue kv = null;
247     if(type.getCode() == KeyValue.Type.Put.getCode()) {
248       kv = new KeyValue(rowBytes, fam, fam, now,
249           KeyValue.Type.Put, Bytes.toBytes(row));
250     } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
251         kv = new KeyValue(rowBytes, fam, fam,
252             now, KeyValue.Type.DeleteColumn);
253     } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
254         kv = new KeyValue(rowBytes, fam, null,
255             now, KeyValue.Type.DeleteFamily);
256     }
257     WALEntry.Builder builder = WALEntry.newBuilder();
258     builder.setAssociatedCellCount(1);
259     WALKey.Builder keyBuilder = WALKey.newBuilder();
260     UUID.Builder uuidBuilder = UUID.newBuilder();
261     uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
262     uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
263     keyBuilder.setClusterId(uuidBuilder.build());
264     keyBuilder.setTableName(ByteStringer.wrap(table));
265     keyBuilder.setWriteTime(now);
266     keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
267     keyBuilder.setLogSequenceNumber(-1);
268     builder.setKey(keyBuilder.build());
269     cells.add(kv);
270 
271     return builder.build();
272   }
273 
274 }