1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import static org.junit.Assert.*;
22 import static org.mockito.Mockito.mock;
23 import static org.mockito.Mockito.when;
24
25 import java.io.IOException;
26 import java.util.List;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.commons.logging.impl.Log4JLogger;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HRegionLocation;
37 import org.apache.hadoop.hbase.HTableDescriptor;
38 import org.apache.hadoop.hbase.testclassification.MediumTests;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.Waiter;
41 import org.apache.hadoop.hbase.client.ClusterConnection;
42 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
43 import org.apache.hadoop.hbase.client.Connection;
44 import org.apache.hadoop.hbase.client.ConnectionFactory;
45 import org.apache.hadoop.hbase.client.RegionLocator;
46 import org.apache.hadoop.hbase.client.Table;
47 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
48 import org.apache.hadoop.hbase.regionserver.HRegionServer;
49 import org.apache.hadoop.hbase.regionserver.Region;
50 import org.apache.hadoop.hbase.wal.WAL.Entry;
51 import org.apache.hadoop.hbase.wal.WALKey;
52 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
53 import org.apache.hadoop.hbase.replication.ReplicationException;
54 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
55 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
56 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
57 import org.apache.log4j.Level;
58 import org.junit.AfterClass;
59 import org.junit.BeforeClass;
60 import org.junit.Test;
61 import org.junit.experimental.categories.Category;
62
63 import com.google.common.collect.Lists;
64
65
66
67
68
69 @Category(MediumTests.class)
70 public class TestRegionReplicaReplicationEndpoint {
71
72 private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
73
74 static {
75 ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
76 }
77
78 private static final int NB_SERVERS = 2;
79
80 private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
81
82 @BeforeClass
83 public static void beforeClass() throws Exception {
84 Configuration conf = HTU.getConfiguration();
85 conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
86 conf.setInt("replication.source.size.capacity", 10240);
87 conf.setLong("replication.source.sleepforretries", 100);
88 conf.setInt("hbase.regionserver.maxlogs", 10);
89 conf.setLong("hbase.master.logcleaner.ttl", 10);
90 conf.setInt("zookeeper.recovery.retry", 1);
91 conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
92 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
93 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
94 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
95 conf.setInt("replication.stats.thread.period.seconds", 5);
96 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
97 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
98 conf.setInt("hbase.client.serverside.retries.multiplier", 1);
99
100 HTU.startMiniCluster(NB_SERVERS);
101 }
102
103 @AfterClass
104 public static void afterClass() throws Exception {
105 HTU.shutdownMiniCluster();
106 }
107
108 @Test
109 public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
110
111
112 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
113 String peerId = "region_replica_replication";
114
115 if (admin.getPeerConfig(peerId) != null) {
116 admin.removePeer(peerId);
117 }
118
119 HTableDescriptor htd = HTU.createTableDescriptor(
120 "testReplicationPeerIsCreated_no_region_replicas");
121 HTU.getHBaseAdmin().createTable(htd);
122 ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId);
123 assertNull(peerConfig);
124
125 htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
126 htd.setRegionReplication(2);
127 HTU.getHBaseAdmin().createTable(htd);
128
129
130 peerConfig = admin.getPeerConfig(peerId);
131 assertNotNull(peerConfig);
132 assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
133 HTU.getConfiguration()));
134 assertEquals(peerConfig.getReplicationEndpointImpl(),
135 RegionReplicaReplicationEndpoint.class.getName());
136 admin.close();
137 }
138
139 @Test (timeout=240000)
140 public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
141
142
143 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
144 String peerId = "region_replica_replication";
145
146 if (admin.getPeerConfig(peerId) != null) {
147 admin.removePeer(peerId);
148 }
149
150 HTableDescriptor htd
151 = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable");
152 HTU.getHBaseAdmin().createTable(htd);
153
154
155 ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId);
156 assertNull(peerConfig);
157
158 HTU.getHBaseAdmin().disableTable(htd.getTableName());
159 htd.setRegionReplication(2);
160 HTU.getHBaseAdmin().modifyTable(htd.getTableName(), htd);
161 HTU.getHBaseAdmin().enableTable(htd.getTableName());
162
163
164 peerConfig = admin.getPeerConfig(peerId);
165 assertNotNull(peerConfig);
166 assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
167 HTU.getConfiguration()));
168 assertEquals(peerConfig.getReplicationEndpointImpl(),
169 RegionReplicaReplicationEndpoint.class.getName());
170 admin.close();
171 }
172
173 public void testRegionReplicaReplication(int regionReplication) throws Exception {
174
175
176 TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
177 + regionReplication);
178 HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
179 htd.setRegionReplication(regionReplication);
180 HTU.getHBaseAdmin().createTable(htd);
181 TableName tableNameNoReplicas =
182 TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
183 HTU.deleteTableIfAny(tableNameNoReplicas);
184 HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);
185
186 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
187 Table table = connection.getTable(tableName);
188 Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
189
190 try {
191
192 HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000);
193
194
195 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
196
197 verifyReplication(tableName, regionReplication, 0, 1000);
198
199 } finally {
200 table.close();
201 tableNoReplicas.close();
202 HTU.deleteTableIfAny(tableNameNoReplicas);
203 connection.close();
204 }
205 }
206
207 private void verifyReplication(TableName tableName, int regionReplication,
208 final int startRow, final int endRow) throws Exception {
209 verifyReplication(tableName, regionReplication, startRow, endRow, true);
210 }
211
212 private void verifyReplication(TableName tableName, int regionReplication,
213 final int startRow, final int endRow, final boolean present) throws Exception {
214
215 final Region[] regions = new Region[regionReplication];
216
217 for (int i=0; i < NB_SERVERS; i++) {
218 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
219 List<Region> onlineRegions = rs.getOnlineRegions(tableName);
220 for (Region region : onlineRegions) {
221 regions[region.getRegionInfo().getReplicaId()] = region;
222 }
223 }
224
225 for (Region region : regions) {
226 assertNotNull(region);
227 }
228
229 for (int i = 1; i < regionReplication; i++) {
230 final Region region = regions[i];
231
232 Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() {
233 @Override
234 public boolean evaluate() throws Exception {
235 LOG.info("verifying replication for region replica:" + region.getRegionInfo());
236 try {
237 HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present);
238 } catch(Throwable ex) {
239 LOG.warn("Verification from secondary region is not complete yet", ex);
240
241 return false;
242 }
243 return true;
244 }
245 });
246 }
247 }
248
249 @Test(timeout = 240000)
250 public void testRegionReplicaReplicationWith2Replicas() throws Exception {
251 testRegionReplicaReplication(2);
252 }
253
254 @Test(timeout = 240000)
255 public void testRegionReplicaReplicationWith3Replicas() throws Exception {
256 testRegionReplicaReplication(3);
257 }
258
259 @Test(timeout = 240000)
260 public void testRegionReplicaReplicationWith10Replicas() throws Exception {
261 testRegionReplicaReplication(10);
262 }
263
264 @Test (timeout = 240000)
265 public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
266 int regionReplication = 3;
267 TableName tableName = TableName.valueOf("testRegionReplicaWithoutMemstoreReplication");
268 HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
269 htd.setRegionReplication(regionReplication);
270 htd.setRegionMemstoreReplication(false);
271 HTU.getHBaseAdmin().createTable(htd);
272
273 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
274 Table table = connection.getTable(tableName);
275 try {
276
277 final int STEP = 100;
278 for (int i = 0; i < 3; ++i) {
279 final int startRow = i * STEP;
280 final int endRow = (i + 1) * STEP;
281 LOG.info("Writing data from " + startRow + " to " + endRow);
282 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow);
283 verifyReplication(tableName, regionReplication, startRow, endRow, false);
284
285
286 LOG.info("flushing table");
287 HTU.flush(tableName);
288 verifyReplication(tableName, regionReplication, 0, endRow, true);
289 }
290 } finally {
291 table.close();
292 connection.close();
293 }
294 }
295
296 @Test (timeout = 240000)
297 public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
298
299
300
301
302 int regionReplication = 3;
303 TableName tableName = TableName.valueOf("testRegionReplicaReplicationForFlushAndCompaction");
304 HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
305 htd.setRegionReplication(regionReplication);
306 HTU.getHBaseAdmin().createTable(htd);
307
308 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
309 Table table = connection.getTable(tableName);
310
311 try {
312
313
314 for (int i = 0; i < 6000; i += 1000) {
315 LOG.info("Writing data from " + i + " to " + (i+1000));
316 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000);
317 LOG.info("flushing table");
318 HTU.flush(tableName);
319 LOG.info("compacting table");
320 HTU.compact(tableName, false);
321 }
322
323 verifyReplication(tableName, regionReplication, 0, 1000);
324 } finally {
325 table.close();
326 connection.close();
327 }
328 }
329
330 @Test (timeout = 240000)
331 public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
332 testRegionReplicaReplicationIgnoresDisabledTables(false);
333 }
334
335 @Test (timeout = 240000)
336 public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
337 testRegionReplicaReplicationIgnoresDisabledTables(true);
338 }
339
340 public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
341 throws Exception {
342
343
344
345 TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables"
346 + dropTable);
347 HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
348 int regionReplication = 3;
349 htd.setRegionReplication(regionReplication);
350 HTU.deleteTableIfAny(tableName);
351 HTU.getHBaseAdmin().createTable(htd);
352 TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
353 HTU.deleteTableIfAny(toBeDisabledTable);
354 htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
355 htd.setRegionReplication(regionReplication);
356 HTU.getHBaseAdmin().createTable(htd);
357
358
359 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
360 admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
361
362
363
364 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
365 Table table = connection.getTable(tableName);
366 Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
367
368 HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
369
370 AtomicLong skippedEdits = new AtomicLong();
371 RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
372 mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
373 when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
374 RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
375 new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
376 (ClusterConnection) connection,
377 Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
378 RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
379 HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
380 byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
381
382 Entry entry = new Entry(
383 new WALKey(encodedRegionName, toBeDisabledTable, 1),
384 new WALEdit());
385
386 HTU.getHBaseAdmin().disableTable(toBeDisabledTable);
387 if (dropTable) {
388 HTU.getHBaseAdmin().deleteTable(toBeDisabledTable);
389 }
390
391 sinkWriter.append(toBeDisabledTable, encodedRegionName,
392 HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
393
394 assertEquals(2, skippedEdits.get());
395
396 try {
397
398
399
400 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
401
402
403 admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
404
405 verifyReplication(tableName, regionReplication, 0, 1000);
406
407 } finally {
408 admin.close();
409 table.close();
410 rl.close();
411 tableToBeDisabled.close();
412 HTU.deleteTableIfAny(toBeDisabledTable);
413 connection.close();
414 }
415 }
416 }