/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import com.google.common.collect.Multimap;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * Test cases for the atomic load error handling of the bulk load functionality.
 */
@Category(LargeTests.class)
public class TestLoadIncrementalHFilesSplitRecovery {
  private static final Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);

  static HBaseTestingUtility util;
  // used by the secure subclass
  static boolean useSecure = false;

  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

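  // Deterministic helpers: every test derives its row keys, family names, and
  // cell values from these, so loaded data can be verified exactly.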
  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }

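  /**
   * Creates one HFile per column family under {@code dir}, each holding
   * ROWCOUNT rows whose cells all carry the given value.
   */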
  public static void buildHFiles(FileSystem fs, Path dir, int value)
      throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }

  /**
   * Creates a table with the given name and the specified number of column
   * families if the table does not already exist.
   */
  private void setupTable(final Connection connection, TableName table, int cfs)
      throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }
      try (Admin admin = connection.getAdmin()) {
        admin.createTable(htd);
      }
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Creates a table with the given name, the specified number of column
   * families, and the given split keys if the table does not already exist.
   * @param table name of the table to create
   * @param cfs number of column families
   * @param SPLIT_KEYS keys to pre-split the table on
   */
  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
      throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      util.createTable(htd, SPLIT_KEYS);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

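  /**
   * Builds a per-test bulk load directory on the test filesystem, populated
   * via {@link #buildHFiles(FileSystem, Path, int)}.
   */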
  private Path buildBulkFiles(TableName table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
    Path bulk1 = new Path(dir, table.getNameAsString() + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }

  /**
   * Populate table with known values.
   */
  private void populateTable(final Connection connection, TableName table, int value)
      throws Exception {
    // create HFiles for different column families
    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
    Path bulk1 = buildBulkFiles(table, value);
    try (Table t = connection.getTable(table)) {
      lih.doBulkLoad(bulk1, (HTable) t);
    }
  }

  /**
   * Splits the known table in half. (The split point is hard coded for this
   * test suite.)
   */
  private void forceSplit(TableName table) {
    try {
      // Go through the region server so the split is synchronous; that API
      // isn't otherwise visible here.
      HRegionServer hrs = util.getRSForFirstRegionInTable(table);

      for (HRegionInfo hri :
          ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
        if (hri.getTable().equals(table)) {
          // splitRegion doesn't work if startkey/endkey are null
          ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
        }
      }

      // verify that the split completed.
      int regions;
      do {
        regions = 0;
        for (HRegionInfo hri :
            ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
          if (hri.getTable().equals(table)) {
            regions++;
          }
        }
        if (regions != 2) {
          LOG.info("Taking some time to complete split...");
          Thread.sleep(250);
        }
      } while (regions != 2);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtility();
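    // Run the mini cluster without any region coprocessors configured.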
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Checks that all columns have the expected value and that there is the
   * expected number of rows.
   * @throws IOException if reading the table fails
   */
  void assertExpectedTable(TableName table, int count, int value) throws IOException {
    HTableDescriptor[] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
    assertEquals(1, htds.length);
    try (Table t = new HTable(util.getConfiguration(), table);
        ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r : sr) {
        i++;
        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
          for (byte[] val : nm.values()) {
            assertTrue(Bytes.equals(val, value(value)));
          }
        }
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception: " + e.getMessage());
    }
  }

  /**
   * Test that shows that an exception thrown on the region server side will
   * result in an exception on the LoadIncrementalHFiles client.
   */
  @Test(expected = IOException.class, timeout = 120000)
  public void testBulkLoadPhaseFailure() throws Exception {
    TableName table = TableName.valueOf("bulkLoadPhaseFailure");
    final AtomicInteger attemptedCalls = new AtomicInteger();
    final AtomicInteger failedCalls = new AtomicInteger();
    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) {
      setupTable(connection, table, 10);
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
                throws IOException {
          int i = attemptedCalls.incrementAndGet();
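          // Route the first attempt through a mocked connection whose client
          // always rejects bulk loads; later attempts use the real connection.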
          if (i == 1) {
            Connection errConn = null;
            try {
              errConn = getMockedConnection(util.getConfiguration());
            } catch (Exception e) {
              LOG.fatal("mocking cruft, should never happen", e);
              throw new RuntimeException("mocking cruft, should never happen");
            }
            failedCalls.incrementAndGet();
            return super.tryAtomicRegionLoad((HConnection) errConn, tableName, first, lqis);
          }

          return super.tryAtomicRegionLoad((HConnection) conn, tableName, first, lqis);
        }
      };
      try {
        // create HFiles for different column families
        Path dir = buildBulkFiles(table, 1);
        try (Table t = connection.getTable(table)) {
          lih.doBulkLoad(dir, (HTable) t);
        }
      } finally {
        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      }
      fail("doBulkLoad should have thrown an exception");
    }
  }

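  /**
   * Returns a mocked HConnection that resolves every region location, but
   * whose client service fails each bulkLoadHFile call with an injected
   * IOException.
   */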
  @SuppressWarnings("deprecation")
  private HConnection getMockedConnection(final Configuration conf)
      throws IOException, ServiceException {
    HConnection c = Mockito.mock(HConnection.class);
    Mockito.when(c.getConfiguration()).thenReturn(conf);
    Mockito.doNothing().when(c).close();
    // Make it so we return a particular location when asked.
    final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
        ServerName.valueOf("example.org", 1234, 0));
    Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
        (byte[]) Mockito.any(), Mockito.anyBoolean())).thenReturn(loc);
    Mockito.when(c.locateRegion((TableName) Mockito.any(),
        (byte[]) Mockito.any())).thenReturn(loc);
    ClientProtos.ClientService.BlockingInterface hri =
        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
    Mockito.when(hri.bulkLoadHFile((RpcController) Mockito.any(),
        (BulkLoadHFileRequest) Mockito.any()))
      .thenThrow(new ServiceException(new IOException("injecting bulk load error")));
    Mockito.when(c.getClient(Mockito.any(ServerName.class))).thenReturn(hri);
    return c;
  }

  /**
   * This test exercises the path where there is a split after initial
   * validation but before the atomic bulk load call. We cannot use presplitting
   * to test this path, so we actually inject a split just before the atomic
   * region load.
   */
  @Test(timeout = 120000)
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final TableName table = TableName.valueOf("splitWhileBulkloadPhase");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(table, ROWCOUNT, 1);

      // Now let's cause trouble. The split occurs after the initial checks and
      // causes the bulk files to fail when we attempt to atomically import
      // them. This is recoverable.
      final AtomicInteger attemptedCalls = new AtomicInteger();
      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected void bulkLoadPhase(final Table htable, final Connection conn,
            ExecutorService pool, Deque<LoadQueueItem> queue,
            final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            // On the first attempt force a split.
            forceSplit(table);
          }
          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups);
        }
      };

      // create HFiles for different column families
      try (Table t = connection.getTable(table)) {
        Path bulk = buildBulkFiles(table, 2);
        lih2.doBulkLoad(bulk, (HTable) t);
      }

      // Check that the data was loaded. The three expected attempts are:
      // 1) failure because the region needs to split, 2) load of the split
      // top half, and 3) load of the split bottom half.
      assertEquals(3, attemptedCalls.get());
      assertExpectedTable(table, ROWCOUNT, 2);
    }
  }

  /**
   * This test splits a table and attempts to bulk load. The bulk import files
   * should be split before atomically importing.
   */
  @Test(timeout = 120000)
  public void testGroupOrSplitPresplit() throws Exception {
    final TableName table = TableName.valueOf("groupOrSplitPresplit");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(connection, table, ROWCOUNT, 1);
      forceSplit(table);

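      // Count the LoadQueueItems produced by groupOrSplit: after the split,
      // each of the 10 families' HFiles should be split in two, giving 20.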
      final AtomicInteger countedLqis = new AtomicInteger();
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
          util.getConfiguration()) {
        @Override
        protected List<LoadQueueItem> groupOrSplit(
            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
            final LoadQueueItem item, final Table htable,
            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
          if (lqis != null) {
            countedLqis.addAndGet(lqis.size());
          }
          return lqis;
        }
      };

      // create HFiles for different column families
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(bulk, (HTable) t);
      }
      assertExpectedTable(connection, table, ROWCOUNT, 2);
      assertEquals(20, countedLqis.get());
    }
  }

  /**
   * This simulates a remote exception which should cause LoadIncrementalHFiles
   * to exit with an exception.
   */
  @Test(expected = IOException.class, timeout = 120000)
  public void testGroupOrSplitFailure() throws Exception {
    TableName table = TableName.valueOf("groupOrSplitFailure");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
          util.getConfiguration()) {
        int i = 0;

        @Override
        protected List<LoadQueueItem> groupOrSplit(
            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
            final LoadQueueItem item, final Table table,
            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          i++;

          if (i == 5) {
            throw new IOException("failure");
          }
          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
        }
      };

      // create HFiles for different column families
      Path dir = buildBulkFiles(table, 1);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(dir, (HTable) t);
      }
    }

    fail("doBulkLoad should have thrown an exception");
  }

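  /**
   * Tests that bulk load succeeds while hbase:meta is intact, and that it
   * fails with an IOException once a region's row has been deleted from
   * hbase:meta, leaving a hole in the table's region chain.
   */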
  @Test(timeout = 120000)
  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
    TableName tableName = TableName.valueOf("testGroupOrSplitWhenRegionHoleExistsInMeta");
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
    // Share the connection. We were failing to find the table with our new
    // reverse scan because it looks for the first region, not any region --
    // that is how it works now. The code below removes the first region in the
    // test; it used to rely on the Connection cache holding the first region.
    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
    Table table = connection.getTable(tableName);

    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
    Path dir = buildBulkFiles(tableName, 2);

    final AtomicInteger countedLqis = new AtomicInteger();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {

      @Override
      protected List<LoadQueueItem> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
          final LoadQueueItem item, final Table htable,
          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
        List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
        if (lqis != null) {
          countedLqis.addAndGet(lqis.size());
        }
        return lqis;
      }
    };

    // Do a bulk load while there is no region hole in hbase:meta.
    try {
      loader.doBulkLoad(dir, (HTable) table);
    } catch (Exception e) {
      LOG.error("exception=", e);
    }
    // Check that all the data was loaded into the table.
    this.assertExpectedTable(tableName, ROWCOUNT, 2);

    dir = buildBulkFiles(tableName, 3);

    // Mess it up by leaving a hole in hbase:meta.
    List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(util.getZooKeeperWatcher(),
      connection, tableName);
    for (HRegionInfo regionInfo : regionInfos) {
      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
        MetaTableAccessor.deleteRegion(connection, regionInfo);
        break;
      }
    }

    try {
      loader.doBulkLoad(dir, (HTable) table);
    } catch (Exception e) {
      LOG.error("exception=", e);
      assertTrue("IOException expected", e instanceof IOException);
    }

    table.close();

    // Make sure at least the one region that still exists can be found.
    regionInfos = MetaTableAccessor.getTableRegions(util.getZooKeeperWatcher(),
      connection, tableName);
    assertTrue(regionInfos.size() >= 1);

    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
    connection.close();
  }

  /**
   * Checks that all columns have the expected value and that there is the
   * expected number of rows.
   * @throws IOException if reading the table fails
   */
  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
      throws IOException {
    HTableDescriptor[] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
    assertEquals(1, htds.length);
    try (Table t = connection.getTable(table);
        ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r : sr) {
        i++;
        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
          for (byte[] val : nm.values()) {
            assertTrue(Bytes.equals(val, value(value)));
          }
        }
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception: " + e.getMessage());
    }
  }
}