/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import com.google.common.collect.Multimap;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * Test cases for the atomic load error handling of the bulk load functionality.
 */
@Category(LargeTests.class)
public class TestLoadIncrementalHFilesSplitRecovery {
  final static Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);

  static HBaseTestingUtility util;
  // used by secure subclass
  static boolean useSecure = false;

  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }
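
  // For example: rowkey(5) is "row_00000005", family(3) is "family_0003", and
  // value(7) encodes "0000000007". The fixed-width formats keep row keys and
  // family names in lexicographic order.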

  public static void buildHFiles(FileSystem fs, Path dir, int value)
      throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }
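
  // A sketch of what buildHFiles(fs, dir, 1) produces, one HFile per family:
  //   dir/family_0000/hfile_0
  //   dir/family_0001/hfile_1
  //   ...
  //   dir/family_0009/hfile_9
  // each holding ROWCOUNT rows with the ten-digit value "0000000001" (row keys
  // as written by TestHRegionServerBulkLoad.createHFile).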

  /**
   * Creates a table with given table name and specified number of column
   * families if the table does not already exist.
   */
  private void setupTable(String table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      util.getHBaseAdmin().createTable(htd);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Creates a table with the given table name, the specified number of column
   * families, and the given split keys if the table does not already exist.
   * @param table the table name
   * @param cfs the number of column families
   * @param SPLIT_KEYS the row keys to presplit the table on
   */
  private void setupTableWithSplitkeys(String table, int cfs, byte[][] SPLIT_KEYS)
      throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      util.getHBaseAdmin().createTable(htd, SPLIT_KEYS);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  private Path buildBulkFiles(String table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table);
    Path bulk1 = new Path(dir, table + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }
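
  // Note: each (table, value) pair gets its own subdirectory (table + value,
  // e.g. "groupOrSplitPresplit2"), so HFiles from successive loads against the
  // same table do not collide.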

  /**
   * Populate table with known values.
   */
  private void populateTable(String table, int value) throws Exception {
    // create HFiles for different column families
    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
    Path bulk1 = buildBulkFiles(table, value);
    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
    lih.doBulkLoad(bulk1, t);
  }

  /**
   * Split the known table in half.  (this is hard coded for this test suite)
   */
  private void forceSplit(String table) {
    try {
      // we would like a synchronous split call on the region server, but one
      // isn't visible here, so issue the split and poll below until it is done
      HRegionServer hrs = util.getRSForFirstRegionInTable(Bytes
          .toBytes(table));

      for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs)) {
        if (Bytes.equals(hri.getTable().getName(), Bytes.toBytes(table))) {
          // splitRegion doesn't work if startkey/endkey are null
          ProtobufUtil.split(hrs, hri, rowkey(ROWCOUNT / 2)); // hard code split
        }
      }

      // verify that the split completed.
      int regions;
      do {
        regions = 0;
        for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs)) {
          if (Bytes.equals(hri.getTable().getName(), Bytes.toBytes(table))) {
            regions++;
          }
        }
        if (regions != 2) {
          LOG.info("Taking some time to complete split...");
          Thread.sleep(250);
        }
      } while (regions != 2);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtility();
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Checks that all columns have the expected value and that there is the
   * expected number of rows.
   * @throws IOException
   */
  void assertExpectedTable(String table, int count, int value) throws IOException {
    HTable t = null;
    try {
      assertEquals(1, util.getHBaseAdmin().listTables(table).length);
      t = new HTable(util.getConfiguration(), table);
      Scan s = new Scan();
      ResultScanner sr = t.getScanner(s);
      int i = 0;
      for (Result r : sr) {
        i++;
        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
          for (byte[] val : nm.values()) {
            assertTrue(Bytes.equals(val, value(value)));
          }
        }
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception: " + e);
    } finally {
      if (t != null) t.close();
    }
  }

  /**
   * Test that an exception thrown from the RS side will result in an
   * exception on the LIHFile client.
   */
  @Test(expected = IOException.class)
  public void testBulkLoadPhaseFailure() throws Exception {
    String table = "bulkLoadPhaseFailure";
    setupTable(table, 10);

    final AtomicInteger attemptedCalls = new AtomicInteger();
    final AtomicInteger failedCalls = new AtomicInteger();
    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    try {
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {

        @Override
        protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
            throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            // on the first attempt, swap in a connection whose bulk load calls
            // always fail; see getMockedConnection below
            HConnection errConn = null;
            try {
              errConn = getMockedConnection(util.getConfiguration());
            } catch (Exception e) {
              LOG.fatal("mocking cruft, should never happen", e);
              throw new RuntimeException("mocking cruft, should never happen");
            }
            failedCalls.incrementAndGet();
            return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
          }

          return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
        }
      };

      // create HFiles for different column families
      Path dir = buildBulkFiles(table, 1);
      HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
      lih.doBulkLoad(dir, t);
    } finally {
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    }

    fail("doBulkLoad should have thrown an exception");
  }

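  // Returns a mocked HConnection that always reports the same region location
  // and whose client stub fails every bulkLoadHFile call with an injected
  // IOException, so any load attempt made through it errors out.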
  private HConnection getMockedConnection(final Configuration conf)
      throws IOException, ServiceException {
    HConnection c = Mockito.mock(HConnection.class);
    Mockito.when(c.getConfiguration()).thenReturn(conf);
    Mockito.doNothing().when(c).close();
    // Make it so we return a particular location when asked.
    final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
        ServerName.valueOf("example.org", 1234, 0));
    Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
        (byte[]) Mockito.any(), Mockito.anyBoolean())).thenReturn(loc);
    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any()))
        .thenReturn(loc);
    ClientProtos.ClientService.BlockingInterface hri =
        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
    Mockito.when(hri.bulkLoadHFile((RpcController) Mockito.any(),
        (BulkLoadHFileRequest) Mockito.any()))
        .thenThrow(new ServiceException(new IOException("injecting bulk load error")));
    Mockito.when(c.getClient(Mockito.any(ServerName.class))).thenReturn(hri);
    return c;
  }

  /**
   * This test exercises the path where there is a split after initial
   * validation but before the atomic bulk load call. We cannot use presplitting
   * to test this path, so we actually inject a split just before the atomic
   * region load.
   */
  @Test
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final String table = "splitWhileBulkloadPhase";
    setupTable(table, 10);
    populateTable(table, 1);
    assertExpectedTable(table, ROWCOUNT, 1);

    // Now let's cause trouble.  This will occur after checks and cause bulk
    // files to fail when we attempt to atomically import.  This is recoverable.
    final AtomicInteger attemptedCalls = new AtomicInteger();
    LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(
        util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(final HTable htable, final HConnection conn,
          ExecutorService pool, Deque<LoadQueueItem> queue,
          final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
        int i = attemptedCalls.incrementAndGet();
        if (i == 1) {
          // On the first attempt force a split.
          forceSplit(table);
        }

        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups);
      }
    };

    // create HFiles for different column families
    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
    Path bulk = buildBulkFiles(table, 2);
    lih2.doBulkLoad(bulk, t);

    // check that data was loaded
    // The three expected attempts are: 1) a failure because the region needs
    // to split, 2) the load of the split top half, and 3) the load of the
    // split bottom half.
    assertEquals(3, attemptedCalls.get());
    assertExpectedTable(table, ROWCOUNT, 2);
  }

  /**
   * This test splits a table and attempts to bulk load.  The bulk import files
   * should be split before atomically importing.
   */
  @Test
  public void testGroupOrSplitPresplit() throws Exception {
    final String table = "groupOrSplitPresplit";
    setupTable(table, 10);
    populateTable(table, 1);
    assertExpectedTable(table, ROWCOUNT, 1);
    forceSplit(table);

    final AtomicInteger countedLqis = new AtomicInteger();
    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
        util.getConfiguration()) {

      @Override
      protected List<LoadQueueItem> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
          final LoadQueueItem item, final HTable htable,
          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
        List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
        if (lqis != null) {
          countedLqis.addAndGet(lqis.size());
        }
        return lqis;
      }
    };

    // create HFiles for different column families
    Path bulk = buildBulkFiles(table, 2);
    HTable ht = new HTable(util.getConfiguration(), Bytes.toBytes(table));
    lih.doBulkLoad(bulk, ht);

    assertExpectedTable(table, ROWCOUNT, 2);
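    // Each of the 10 family HFiles straddles the split point, so groupOrSplit
    // breaks each into a top and a bottom half for the two daughter regions:
    // 2 * NUM_CFS = 20 items counted in total.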
    assertEquals(20, countedLqis.get());
  }

  /**
   * This simulates a remote exception which should cause LIHF to exit with an
   * exception.
   */
  @Test(expected = IOException.class)
  public void testGroupOrSplitFailure() throws Exception {
    String table = "groupOrSplitFailure";
    setupTable(table, 10);

    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
        util.getConfiguration()) {
      int i = 0;

      @Override
      protected List<LoadQueueItem> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
          final LoadQueueItem item, final HTable table,
          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
        i++;

        if (i == 5) {
          // fail partway through grouping the ten families' HFiles
          throw new IOException("failure");
        }
        return super.groupOrSplit(regionGroups, item, table, startEndKeys);
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 1);
    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
    lih.doBulkLoad(dir, t);

    fail("doBulkLoad should have thrown an exception");
  }

  @Test
  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
    String tableName = "testGroupOrSplitWhenRegionHoleExistsInMeta";
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
    HTable table = new HTable(util.getConfiguration(), Bytes.toBytes(tableName));

    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
    Path dir = buildBulkFiles(tableName, 2);

    final AtomicInteger countedLqis = new AtomicInteger();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {

      @Override
      protected List<LoadQueueItem> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
          final HTable htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
        List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
        if (lqis != null) {
          countedLqis.addAndGet(lqis.size());
        }
        return lqis;
      }
    };

    // do a bulk load when there is no region hole in hbase:meta.
    try {
      loader.doBulkLoad(dir, table);
    } catch (Exception e) {
      LOG.error("exception=", e);
    }
    // check that all the data is loaded into the table.
    this.assertExpectedTable(tableName, ROWCOUNT, 2);

    dir = buildBulkFiles(tableName, 3);

    // Mess it up by leaving a hole in hbase:meta
    CatalogTracker ct = new CatalogTracker(util.getConfiguration());
    List<HRegionInfo> regionInfos = MetaReader.getTableRegions(ct, TableName.valueOf(tableName));
    for (HRegionInfo regionInfo : regionInfos) {
      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
        MetaEditor.deleteRegion(ct, regionInfo);
        break;
      }
    }
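    // With the first region (empty start key) gone from hbase:meta, no region
    // covers the key range up to row_00000100, so the next bulk load is
    // expected to fail with an IOException.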

    try {
      loader.doBulkLoad(dir, table);
    } catch (Exception e) {
      LOG.error("exception=", e);
      assertTrue("IOException expected", e instanceof IOException);
    }

    table.close();

    // the failed load must not have changed the data: still ROWCOUNT rows with
    // the value-2 payload from the earlier successful load.
    this.assertExpectedTable(tableName, ROWCOUNT, 2);
  }
}