1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 import org.apache.hadoop.hbase.client.Table;
29 import org.apache.hadoop.hbase.util.ByteStringer;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.CellUtil;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.testclassification.MediumTests;
39 import org.apache.hadoop.hbase.Stoppable;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.client.Get;
42 import org.apache.hadoop.hbase.client.Result;
43 import org.apache.hadoop.hbase.client.ResultScanner;
44 import org.apache.hadoop.hbase.client.Scan;
45 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
46 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
47 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.junit.AfterClass;
50 import org.junit.Before;
51 import org.junit.BeforeClass;
52 import org.junit.Test;
53 import org.junit.experimental.categories.Category;
54
55 @Category(MediumTests.class)
56 public class TestReplicationSink {
57 private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
58 private static final int BATCH_SIZE = 10;
59
60 private final static HBaseTestingUtility TEST_UTIL =
61 new HBaseTestingUtility();
62
63 private static ReplicationSink SINK;
64
65 private static final TableName TABLE_NAME1 =
66 TableName.valueOf("table1");
67 private static final TableName TABLE_NAME2 =
68 TableName.valueOf("table2");
69
70 private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
71 private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
72
73 private static Table table1;
74 private static Stoppable STOPPABLE = new Stoppable() {
75 final AtomicBoolean stop = new AtomicBoolean(false);
76
77 @Override
78 public boolean isStopped() {
79 return this.stop.get();
80 }
81
82 @Override
83 public void stop(String why) {
84 LOG.info("STOPPING BECAUSE: " + why);
85 this.stop.set(true);
86 }
87
88 };
89
90 private static Table table2;
91
92
93
94
95 @BeforeClass
96 public static void setUpBeforeClass() throws Exception {
97 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
98 TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
99 HConstants.REPLICATION_ENABLE_DEFAULT);
100 TEST_UTIL.startMiniCluster(3);
101 SINK =
102 new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
103 table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
104 table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
105 }
106
107
108
109
110 @AfterClass
111 public static void tearDownAfterClass() throws Exception {
112 STOPPABLE.stop("Shutting down");
113 TEST_UTIL.shutdownMiniCluster();
114 }
115
116
117
118
119 @Before
120 public void setUp() throws Exception {
121 table1 = TEST_UTIL.deleteTableData(TABLE_NAME1);
122 table2 = TEST_UTIL.deleteTableData(TABLE_NAME2);
123 }
124
125
126
127
128
129 @Test
130 public void testBatchSink() throws Exception {
131 List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE);
132 List<Cell> cells = new ArrayList<Cell>();
133 for(int i = 0; i < BATCH_SIZE; i++) {
134 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
135 }
136 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
137 Scan scan = new Scan();
138 ResultScanner scanRes = table1.getScanner(scan);
139 assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
140 }
141
142
143
144
145
146 @Test
147 public void testMixedPutDelete() throws Exception {
148 List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
149 List<Cell> cells = new ArrayList<Cell>();
150 for(int i = 0; i < BATCH_SIZE/2; i++) {
151 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
152 }
153 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells));
154
155 entries = new ArrayList<WALEntry>(BATCH_SIZE);
156 cells = new ArrayList<Cell>();
157 for(int i = 0; i < BATCH_SIZE; i++) {
158 entries.add(createEntry(TABLE_NAME1, i,
159 i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
160 }
161
162 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
163 Scan scan = new Scan();
164 ResultScanner scanRes = table1.getScanner(scan);
165 assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
166 }
167
168
169
170
171
172 @Test
173 public void testMixedPutTables() throws Exception {
174 List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
175 List<Cell> cells = new ArrayList<Cell>();
176 for(int i = 0; i < BATCH_SIZE; i++) {
177 entries.add(createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
178 i, KeyValue.Type.Put, cells));
179 }
180
181 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
182 Scan scan = new Scan();
183 ResultScanner scanRes = table2.getScanner(scan);
184 for(Result res : scanRes) {
185 assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
186 }
187 }
188
189
190
191
192
193 @Test
194 public void testMixedDeletes() throws Exception {
195 List<WALEntry> entries = new ArrayList<WALEntry>(3);
196 List<Cell> cells = new ArrayList<Cell>();
197 for(int i = 0; i < 3; i++) {
198 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
199 }
200 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
201 entries = new ArrayList<WALEntry>(3);
202 cells = new ArrayList<Cell>();
203 entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
204 entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
205 entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));
206
207 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
208
209 Scan scan = new Scan();
210 ResultScanner scanRes = table1.getScanner(scan);
211 assertEquals(0, scanRes.next(3).length);
212 }
213
214
215
216
217
218
219 @Test
220 public void testApplyDeleteBeforePut() throws Exception {
221 List<WALEntry> entries = new ArrayList<WALEntry>(5);
222 List<Cell> cells = new ArrayList<Cell>();
223 for(int i = 0; i < 2; i++) {
224 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
225 }
226 entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
227 for(int i = 3; i < 5; i++) {
228 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
229 }
230 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
231 Get get = new Get(Bytes.toBytes(1));
232 Result res = table1.get(get);
233 assertEquals(0, res.size());
234 }
235
236 private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
237 byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
238 byte[] rowBytes = Bytes.toBytes(row);
239
240
241 try {
242 Thread.sleep(1);
243 } catch (InterruptedException e) {
244 LOG.info("Was interrupted while sleep, meh", e);
245 }
246 final long now = System.currentTimeMillis();
247 KeyValue kv = null;
248 if(type.getCode() == KeyValue.Type.Put.getCode()) {
249 kv = new KeyValue(rowBytes, fam, fam, now,
250 KeyValue.Type.Put, Bytes.toBytes(row));
251 } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
252 kv = new KeyValue(rowBytes, fam, fam,
253 now, KeyValue.Type.DeleteColumn);
254 } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
255 kv = new KeyValue(rowBytes, fam, null,
256 now, KeyValue.Type.DeleteFamily);
257 }
258 WALEntry.Builder builder = WALEntry.newBuilder();
259 builder.setAssociatedCellCount(1);
260 WALKey.Builder keyBuilder = WALKey.newBuilder();
261 UUID.Builder uuidBuilder = UUID.newBuilder();
262 uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
263 uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
264 keyBuilder.setClusterId(uuidBuilder.build());
265 keyBuilder.setTableName(ByteStringer.wrap(table.getName()));
266 keyBuilder.setWriteTime(now);
267 keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
268 keyBuilder.setLogSequenceNumber(-1);
269 builder.setKey(keyBuilder.build());
270 cells.add(kv);
271
272 return builder.build();
273 }
274
275 }