1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26 import java.util.Collection;
27 import java.util.Deque;
28 import java.util.List;
29 import java.util.NavigableMap;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.HBaseTestingUtility;
41 import org.apache.hadoop.hbase.HColumnDescriptor;
42 import org.apache.hadoop.hbase.HRegionInfo;
43 import org.apache.hadoop.hbase.HRegionLocation;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.LargeTests;
46 import org.apache.hadoop.hbase.ServerName;
47 import org.apache.hadoop.hbase.TableExistsException;
48 import org.apache.hadoop.hbase.catalog.CatalogTracker;
49 import org.apache.hadoop.hbase.catalog.MetaEditor;
50 import org.apache.hadoop.hbase.catalog.MetaReader;
51 import org.apache.hadoop.hbase.client.HConnection;
52 import org.apache.hadoop.hbase.client.HTable;
53 import org.apache.hadoop.hbase.client.Result;
54 import org.apache.hadoop.hbase.client.ResultScanner;
55 import org.apache.hadoop.hbase.client.Scan;
56 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
57 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
58 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
59 import org.apache.hadoop.hbase.regionserver.HRegionServer;
60 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.Pair;
63 import org.junit.AfterClass;
64 import org.junit.BeforeClass;
65 import org.junit.Test;
66 import org.junit.experimental.categories.Category;
67 import org.mockito.Mockito;
68
69 import com.google.common.collect.Multimap;
70 import com.google.protobuf.RpcController;
71 import com.google.protobuf.ServiceException;
72
73
74
75
76 @Category(LargeTests.class)
77 public class TestLoadIncrementalHFilesSplitRecovery {
/** Logger for this test class (bound to this class so log lines are attributed correctly). */
final static Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);

/** Shared testing utility; assigned in {@code setupCluster()}. */
static HBaseTestingUtility util;

/** Not read anywhere in this file; kept because code outside this file may reference it. */
static boolean useSecure = false;

/** Number of column families used for generated HFiles. */
final static int NUM_CFS = 10;
/** Qualifier passed to the HFile generator. */
final static byte[] QUAL = Bytes.toBytes("qual");
/** Row count passed to the HFile generator and expected by the table assertions. */
final static int ROWCOUNT = 100;

/** Byte form of {@code family(0) .. family(NUM_CFS - 1)}, filled by the static initializer. */
private final static byte[][] families = new byte[NUM_CFS][];
static {
  for (int i = 0; i < NUM_CFS; i++) {
    families[i] = Bytes.toBytes(family(i));
  }
}
94
95 static byte[] rowkey(int i) {
96 return Bytes.toBytes(String.format("row_%08d", i));
97 }
98
99 static String family(int i) {
100 return String.format("family_%04d", i);
101 }
102
103 static byte[] value(int i) {
104 return Bytes.toBytes(String.format("%010d", i));
105 }
106
107 public static void buildHFiles(FileSystem fs, Path dir, int value)
108 throws IOException {
109 byte[] val = value(value);
110 for (int i = 0; i < NUM_CFS; i++) {
111 Path testIn = new Path(dir, family(i));
112
113 TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
114 Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
115 }
116 }
117
118
119
120
121
122 private void setupTable(String table, int cfs) throws IOException {
123 try {
124 LOG.info("Creating table " + table);
125 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
126 for (int i = 0; i < cfs; i++) {
127 htd.addFamily(new HColumnDescriptor(family(i)));
128 }
129
130 util.getHBaseAdmin().createTable(htd);
131 } catch (TableExistsException tee) {
132 LOG.info("Table " + table + " already exists");
133 }
134 }
135
136
137
138
139
140
141
142
143 private void setupTableWithSplitkeys(String table, int cfs, byte[][] SPLIT_KEYS)
144 throws IOException {
145 try {
146 LOG.info("Creating table " + table);
147 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
148 for (int i = 0; i < cfs; i++) {
149 htd.addFamily(new HColumnDescriptor(family(i)));
150 }
151
152 util.getHBaseAdmin().createTable(htd, SPLIT_KEYS);
153 } catch (TableExistsException tee) {
154 LOG.info("Table " + table + " already exists");
155 }
156 }
157
158 private Path buildBulkFiles(String table, int value) throws Exception {
159 Path dir = util.getDataTestDirOnTestFS(table);
160 Path bulk1 = new Path(dir, table+value);
161 FileSystem fs = util.getTestFileSystem();
162 buildHFiles(fs, bulk1, value);
163 return bulk1;
164 }
165
166
167
168
169 private void populateTable(String table, int value) throws Exception {
170
171 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
172 Path bulk1 = buildBulkFiles(table, value);
173 HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
174 lih.doBulkLoad(bulk1, t);
175 }
176
177
178
179
180 private void forceSplit(String table) {
181 try {
182
183 HRegionServer hrs = util.getRSForFirstRegionInTable(Bytes
184 .toBytes(table));
185
186 for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs)) {
187 if (Bytes.equals(hri.getTable().getName(), Bytes.toBytes(table))) {
188
189 ProtobufUtil.split(hrs, hri, rowkey(ROWCOUNT / 2));
190 }
191 }
192
193
194 int regions;
195 do {
196 regions = 0;
197 for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs)) {
198 if (Bytes.equals(hri.getTable().getName(), Bytes.toBytes(table))) {
199 regions++;
200 }
201 }
202 if (regions != 2) {
203 LOG.info("Taking some time to complete split...");
204 Thread.sleep(250);
205 }
206 } while (regions != 2);
207 } catch (IOException e) {
208 e.printStackTrace();
209 } catch (InterruptedException e) {
210 e.printStackTrace();
211 }
212 }
213
214 @BeforeClass
215 public static void setupCluster() throws Exception {
216 util = new HBaseTestingUtility();
217 util.startMiniCluster(1);
218 }
219
220 @AfterClass
221 public static void teardownCluster() throws Exception {
222 util.shutdownMiniCluster();
223 }
224
225
226
227
228
229
230 void assertExpectedTable(String table, int count, int value) throws IOException {
231 HTable t = null;
232 try {
233 assertEquals(util.getHBaseAdmin().listTables(table).length, 1);
234 t = new HTable(util.getConfiguration(), table);
235 Scan s = new Scan();
236 ResultScanner sr = t.getScanner(s);
237 int i = 0;
238 for (Result r : sr) {
239 i++;
240 for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
241 for (byte[] val : nm.values()) {
242 assertTrue(Bytes.equals(val, value(value)));
243 }
244 }
245 }
246 assertEquals(count, i);
247 } catch (IOException e) {
248 fail("Failed due to exception");
249 } finally {
250 if (t != null) t.close();
251 }
252 }
253
254
255
256
257
258 @Test(expected=IOException.class)
259 public void testBulkLoadPhaseFailure() throws Exception {
260 String table = "bulkLoadPhaseFailure";
261 setupTable(table, 10);
262
263 final AtomicInteger attmptedCalls = new AtomicInteger();
264 final AtomicInteger failedCalls = new AtomicInteger();
265 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
266 try {
267 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
268
269 protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
270 TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
271 throws IOException {
272 int i = attmptedCalls.incrementAndGet();
273 if (i == 1) {
274 HConnection errConn = null;
275 try {
276 errConn = getMockedConnection(util.getConfiguration());
277 } catch (Exception e) {
278 LOG.fatal("mocking cruft, should never happen", e);
279 throw new RuntimeException("mocking cruft, should never happen");
280 }
281 failedCalls.incrementAndGet();
282 return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
283 }
284
285 return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
286 }
287 };
288
289
290 Path dir = buildBulkFiles(table, 1);
291 HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
292 lih.doBulkLoad(dir, t);
293 } finally {
294 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
295 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
296 }
297
298 fail("doBulkLoad should have thrown an exception");
299 }
300
301 private HConnection getMockedConnection(final Configuration conf)
302 throws IOException, ServiceException {
303 HConnection c = Mockito.mock(HConnection.class);
304 Mockito.when(c.getConfiguration()).thenReturn(conf);
305 Mockito.doNothing().when(c).close();
306
307 final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
308 ServerName.valueOf("example.org", 1234, 0));
309 Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
310 (byte[]) Mockito.any(), Mockito.anyBoolean())).
311 thenReturn(loc);
312 Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
313 thenReturn(loc);
314 ClientProtos.ClientService.BlockingInterface hri =
315 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
316 Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())).
317 thenThrow(new ServiceException(new IOException("injecting bulk load error")));
318 Mockito.when(c.getClient(Mockito.any(ServerName.class))).
319 thenReturn(hri);
320 return c;
321 }
322
323
324
325
326
327
328
329 @Test
330 public void testSplitWhileBulkLoadPhase() throws Exception {
331 final String table = "splitWhileBulkloadPhase";
332 setupTable(table, 10);
333 populateTable(table,1);
334 assertExpectedTable(table, ROWCOUNT, 1);
335
336
337
338 final AtomicInteger attemptedCalls = new AtomicInteger();
339 LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(
340 util.getConfiguration()) {
341
342 protected void bulkLoadPhase(final HTable htable, final HConnection conn,
343 ExecutorService pool, Deque<LoadQueueItem> queue,
344 final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
345 int i = attemptedCalls.incrementAndGet();
346 if (i == 1) {
347
348 forceSplit(table);
349 }
350
351 super.bulkLoadPhase(htable, conn, pool, queue, regionGroups);
352 }
353 };
354
355
356 HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
357 Path bulk = buildBulkFiles(table, 2);
358 lih2.doBulkLoad(bulk, t);
359
360
361
362
363 assertEquals(attemptedCalls.get(), 3);
364 assertExpectedTable(table, ROWCOUNT, 2);
365 }
366
367
368
369
370
371 @Test
372 public void testGroupOrSplitPresplit() throws Exception {
373 final String table = "groupOrSplitPresplit";
374 setupTable(table, 10);
375 populateTable(table, 1);
376 assertExpectedTable(table, ROWCOUNT, 1);
377 forceSplit(table);
378
379 final AtomicInteger countedLqis= new AtomicInteger();
380 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
381 util.getConfiguration()) {
382 protected List<LoadQueueItem> groupOrSplit(
383 Multimap<ByteBuffer, LoadQueueItem> regionGroups,
384 final LoadQueueItem item, final HTable htable,
385 final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
386 List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
387 if (lqis != null) {
388 countedLqis.addAndGet(lqis.size());
389 }
390 return lqis;
391 }
392 };
393
394
395 Path bulk = buildBulkFiles(table, 2);
396 HTable ht = new HTable(util.getConfiguration(), Bytes.toBytes(table));
397 lih.doBulkLoad(bulk, ht);
398
399 assertExpectedTable(table, ROWCOUNT, 2);
400 assertEquals(20, countedLqis.get());
401 }
402
403
404
405
406
407 @Test(expected = IOException.class)
408 public void testGroupOrSplitFailure() throws Exception {
409 String table = "groupOrSplitFailure";
410 setupTable(table, 10);
411
412 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
413 util.getConfiguration()) {
414 int i = 0;
415
416 protected List<LoadQueueItem> groupOrSplit(
417 Multimap<ByteBuffer, LoadQueueItem> regionGroups,
418 final LoadQueueItem item, final HTable table,
419 final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
420 i++;
421
422 if (i == 5) {
423 throw new IOException("failure");
424 }
425 return super.groupOrSplit(regionGroups, item, table, startEndKeys);
426 }
427 };
428
429
430 Path dir = buildBulkFiles(table,1);
431 HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
432 lih.doBulkLoad(dir, t);
433
434 fail("doBulkLoad should have thrown an exception");
435 }
436
437 @Test
438 public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
439 String tableName = "testGroupOrSplitWhenRegionHoleExistsInMeta";
440 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
441 HTable table = new HTable(util.getConfiguration(), Bytes.toBytes(tableName));
442
443 setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
444 Path dir = buildBulkFiles(tableName, 2);
445
446 final AtomicInteger countedLqis = new AtomicInteger();
447 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
448
449 protected List<LoadQueueItem>
450 groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
451 final HTable htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
452 List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
453 if (lqis != null) {
454 countedLqis.addAndGet(lqis.size());
455 }
456 return lqis;
457 }
458 };
459
460
461 try {
462 loader.doBulkLoad(dir, table);
463 } catch (Exception e) {
464 LOG.error("exeception=", e);
465 }
466
467 this.assertExpectedTable(tableName, ROWCOUNT, 2);
468
469 dir = buildBulkFiles(tableName, 3);
470
471
472 CatalogTracker ct = new CatalogTracker(util.getConfiguration());
473 List<HRegionInfo> regionInfos = MetaReader.getTableRegions(ct, TableName.valueOf(tableName));
474 for (HRegionInfo regionInfo : regionInfos) {
475 if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
476 MetaEditor.deleteRegion(ct, regionInfo);
477 break;
478 }
479 }
480
481 try {
482 loader.doBulkLoad(dir, table);
483 } catch (Exception e) {
484 LOG.error("exeception=", e);
485 assertTrue("IOException expected", e instanceof IOException);
486 }
487
488 table.close();
489
490 this.assertExpectedTable(tableName, ROWCOUNT, 2);
491 }
492 }
493