/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import com.google.common.collect.Multimap;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
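/**
 * Test cases for the atomic load error handling of the bulk load
 * functionality. These tests inject failures and region splits between the
 * validation and bulk load phases of LoadIncrementalHFiles to verify that
 * loads either complete atomically or fail cleanly.
 */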
@Category(LargeTests.class)
public class TestLoadIncrementalHFilesSplitRecovery {
  private static final Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);

  static HBaseTestingUtility util;

  static boolean useSecure = false;

  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }
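  /**
   * Creates one HFile per column family under {@code dir}, each containing
   * ROWCOUNT rows with the given value in every cell.
   */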
  public static void buildHFiles(FileSystem fs, Path dir, int value)
      throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }
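  /**
   * Creates a table with the given name and the specified number of column
   * families if it does not already exist.
   */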
  private void setupTable(final Connection connection, TableName table, int cfs)
      throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }
      try (Admin admin = connection.getAdmin()) {
        admin.createTable(htd);
      }
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }
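  /**
   * Creates a table with the given name, the specified number of column
   * families, and the given split keys if it does not already exist.
   */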
  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
      throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      util.createTable(htd, SPLIT_KEYS);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }
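  /**
   * Writes a set of HFiles (one per column family) for the given table and
   * value under the test filesystem, and returns the staging directory to
   * hand to doBulkLoad.
   */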
  private Path buildBulkFiles(TableName table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
    Path bulk1 = new Path(dir, table.getNameAsString() + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }
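  /**
   * Populates the table with known values via a bulk load.
   */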
  private void populateTable(final Connection connection, TableName table, int value)
      throws Exception {
    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
    Path bulk1 = buildBulkFiles(table, value);
    try (Table t = connection.getTable(table)) {
      lih.doBulkLoad(bulk1, (HTable) t);
    }
  }
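  /**
   * Forces a split of the table's region at the midpoint row and blocks until
   * two daughter regions are online. (The split point is hard-coded for this
   * test suite.)
   */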
  private void forceSplit(TableName table) {
    try {
      // Find the region server hosting the table and ask it to split the
      // table's region at the middle of the known row range.
      HRegionServer hrs = util.getRSForFirstRegionInTable(table);

      for (HRegionInfo hri :
          ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
        if (hri.getTable().equals(table)) {
          ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
        }
      }

      // Wait for the split to complete: poll until the region server reports
      // two online regions for the table.
      int regions;
      do {
        regions = 0;
        for (HRegionInfo hri :
            ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
          if (hri.getTable().equals(table)) {
            regions++;
          }
        }
        if (regions != 2) {
          LOG.info("Taking some time to complete split...");
          Thread.sleep(250);
        }
      } while (regions != 2);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtility();
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }
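  /**
   * Checks that all columns have the expected value and that there is the
   * expected number of rows.
   */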
  void assertExpectedTable(TableName table, int count, int value) throws IOException {
    HTableDescriptor[] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
    assertEquals(1, htds.length);
    Table t = null;
    try {
      t = new HTable(util.getConfiguration(), table);
      Scan s = new Scan();
      ResultScanner sr = t.getScanner(s);
      int i = 0;
      for (Result r : sr) {
        i++;
        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
          for (byte[] val : nm.values()) {
            assertTrue(Bytes.equals(val, value(value)));
          }
        }
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception: " + e);
    } finally {
      if (t != null) t.close();
    }
  }
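  /**
   * Tests that an exception thrown from the region server side during the
   * bulk load phase surfaces as an IOException on the LoadIncrementalHFiles
   * client.
   */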
  @Test(expected = IOException.class, timeout = 120000)
  public void testBulkLoadPhaseFailure() throws Exception {
    TableName table = TableName.valueOf("bulkLoadPhaseFailure");
    final AtomicInteger attemptedCalls = new AtomicInteger();
    final AtomicInteger failedCalls = new AtomicInteger();
    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
            throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            // On the first attempt, route the load through a mocked connection
            // whose bulkLoadHFile call always fails.
            Connection errConn = null;
            try {
              errConn = getMockedConnection(util.getConfiguration());
            } catch (Exception e) {
              LOG.fatal("mocking cruft, should never happen", e);
              throw new RuntimeException("mocking cruft, should never happen");
            }
            failedCalls.incrementAndGet();
            return super.tryAtomicRegionLoad((HConnection) errConn, tableName, first, lqis);
          }

          return super.tryAtomicRegionLoad((HConnection) conn, tableName, first, lqis);
        }
      };
      try {
        // With the retry count lowered to 2, the injected failure is expected
        // to make doBulkLoad throw.
        Path dir = buildBulkFiles(table, 1);
        try (Table t = connection.getTable(table)) {
          lih.doBulkLoad(dir, (HTable) t);
        }
      } finally {
        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      }
      fail("doBulkLoad should have thrown an exception");
    }
  }
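  /**
   * Returns a mocked connection whose client fails every bulkLoadHFile RPC
   * with an injected IOException.
   */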
  @SuppressWarnings("deprecation")
  private HConnection getMockedConnection(final Configuration conf)
      throws IOException, ServiceException {
    HConnection c = Mockito.mock(HConnection.class);
    Mockito.when(c.getConfiguration()).thenReturn(conf);
    Mockito.doNothing().when(c).close();
    // Make it so we return a particular location when asked.
    final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
        ServerName.valueOf("example.org", 1234, 0));
    Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
        (byte[]) Mockito.any(), Mockito.anyBoolean())).
      thenReturn(loc);
    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
      thenReturn(loc);
    ClientProtos.ClientService.BlockingInterface hri =
        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
    Mockito.when(hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any())).
      thenThrow(new ServiceException(new IOException("injecting bulk load error")));
    Mockito.when(c.getClient(Mockito.any(ServerName.class))).
      thenReturn(hri);
    return c;
  }
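  /**
   * This test exercises the path where there is a split after initial
   * validation but before the atomic bulk load call. We cannot use
   * presplitting to test this path, so we actually inject a split just
   * before the atomic region load.
   */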
  @Test(timeout = 120000)
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final TableName table = TableName.valueOf("splitWhileBulkloadPhase");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(table, ROWCOUNT, 1);

      // Now let's cause trouble: split the table while a second bulk load is
      // in flight, just before the atomic region load.
      final AtomicInteger attemptedCalls = new AtomicInteger();
      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected void bulkLoadPhase(final Table htable, final Connection conn,
            ExecutorService pool, Deque<LoadQueueItem> queue,
            final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            // On the first attempt, force a split of the target table.
            forceSplit(table);
          }
          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups);
        }
      };

      // Attempt to bulk load a second set of values.
      try (Table t = connection.getTable(table)) {
        Path bulk = buildBulkFiles(table, 2);
        lih2.doBulkLoad(bulk, (HTable) t);
      }

      // Check that the data was loaded. The three expected attempts are:
      // 1) the failure because a split is needed, 2) the load of the split
      // top, and 3) the load of the split bottom.
      assertEquals(3, attemptedCalls.get());
      assertExpectedTable(table, ROWCOUNT, 2);
    }
  }
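  /**
   * This test splits a table and attempts to bulk load. The bulk import
   * files should be split before being atomically imported.
   */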
  @Test(timeout = 120000)
  public void testGroupOrSplitPresplit() throws Exception {
    final TableName table = TableName.valueOf("groupOrSplitPresplit");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(connection, table, ROWCOUNT, 1);
      forceSplit(table);

      final AtomicInteger countedLqis = new AtomicInteger();
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
          util.getConfiguration()) {
        @Override
        protected List<LoadQueueItem> groupOrSplit(
            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
            final LoadQueueItem item, final Table htable,
            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
          if (lqis != null) {
            countedLqis.addAndGet(lqis.size());
          }
          return lqis;
        }
      };

      // Attempt to bulk load a second set of values.
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(bulk, (HTable) t);
      }
      assertExpectedTable(connection, table, ROWCOUNT, 2);
      // Each of the 10 family HFiles straddles the split point and is split
      // in two, producing 20 load queue items.
      assertEquals(20, countedLqis.get());
    }
  }
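  /**
   * Simulates a failure during the groupOrSplit phase, which should cause
   * LoadIncrementalHFiles to exit with an IOException.
   */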
  @Test(expected = IOException.class, timeout = 120000)
  public void testGroupOrSplitFailure() throws Exception {
    TableName table = TableName.valueOf("groupOrSplitFailure");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
          util.getConfiguration()) {
        int i = 0;

        @Override
        protected List<LoadQueueItem> groupOrSplit(
            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
            final LoadQueueItem item, final Table table,
            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          i++;

          if (i == 5) {
            // Fail the fifth groupOrSplit call.
            throw new IOException("failure");
          }
          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
        }
      };

      // Attempt to bulk load; the injected failure should propagate.
      Path dir = buildBulkFiles(table, 1);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(dir, (HTable) t);
      }
    }

    fail("doBulkLoad should have thrown an exception");
  }
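  /**
   * Simulates a hole in hbase:meta (a missing region row) and verifies that
   * the bulk load fails cleanly without losing previously loaded data.
   */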
  @Test(timeout = 120000)
  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
    TableName tableName = TableName.valueOf("testGroupOrSplitWhenRegionHoleExistsInMeta");
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
    Table table = connection.getTable(tableName);

    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
    Path dir = buildBulkFiles(tableName, 2);

    final AtomicInteger countedLqis = new AtomicInteger();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {

      @Override
      protected List<LoadQueueItem> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
          final LoadQueueItem item, final Table htable,
          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
        List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
        if (lqis != null) {
          countedLqis.addAndGet(lqis.size());
        }
        return lqis;
      }
    };

    // Bulk load the initial set of files; this should succeed.
    try {
      loader.doBulkLoad(dir, (HTable) table);
    } catch (Exception e) {
      LOG.error("exception=", e);
    }

    this.assertExpectedTable(tableName, ROWCOUNT, 2);

    dir = buildBulkFiles(tableName, 3);

    // Mess it up by leaving a hole in hbase:meta: delete the row for the
    // region that starts at the empty key.
    List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(util.getZooKeeperWatcher(),
        connection, tableName);
    for (HRegionInfo regionInfo : regionInfos) {
      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
        MetaTableAccessor.deleteRegion(connection, regionInfo);
        break;
      }
    }

    // The second load should fail because of the hole in meta.
    try {
      loader.doBulkLoad(dir, (HTable) table);
    } catch (Exception e) {
      LOG.error("exception=", e);
      assertTrue("IOException expected", e instanceof IOException);
    }

    table.close();

    // The table should still have at least one region registered in meta.
    regionInfos = MetaTableAccessor.getTableRegions(util.getZooKeeperWatcher(),
        connection, tableName);
    assertTrue(regionInfos.size() >= 1);

    // Data from the failed load must not be visible; the table should still
    // hold the values from the first load.
    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
    connection.close();
  }
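  /**
   * Checks that all columns have the expected value and that there is the
   * expected number of rows, using the given connection.
   */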
  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
      throws IOException {
    HTableDescriptor[] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
    assertEquals(1, htds.length);
    Table t = null;
    try {
      t = connection.getTable(table);
      Scan s = new Scan();
      ResultScanner sr = t.getScanner(s);
      int i = 0;
      for (Result r : sr) {
        i++;
        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
          for (byte[] val : nm.values()) {
            assertTrue(Bytes.equals(val, value(value)));
          }
        }
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception: " + e);
    } finally {
      if (t != null) t.close();
    }
  }
}