View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapred;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.mockito.Matchers.anyObject;
25  import static org.mockito.Mockito.doAnswer;
26  import static org.mockito.Mockito.doReturn;
27  import static org.mockito.Mockito.doThrow;
28  import static org.mockito.Mockito.mock;
29  import static org.mockito.Mockito.spy;
30  
31  import java.io.IOException;
32  import java.util.Arrays;
33  import java.util.Map;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.*;
38  import org.apache.hadoop.hbase.client.HTable;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.ResultScanner;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.junit.AfterClass;
46  import org.junit.Before;
47  import org.junit.BeforeClass;
48  import org.junit.Test;
49  import org.junit.experimental.categories.Category;
50  import org.mockito.invocation.InvocationOnMock;
51  import org.mockito.stubbing.Answer;
52  
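/**
 * Tests the table record readers used by TableInputFormat, through both the
 * older mapred API and the newer mapreduce API, including how they recover
 * when a scanner fails.
 */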
57  @Category(LargeTests.class)
58  public class TestTableInputFormat {
59  
60    private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
61  
62    private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
63    static final byte[] FAMILY = Bytes.toBytes("family");
64  
65    private static final byte[][] columns = new byte[][] { FAMILY };
66  
67    @BeforeClass
68    public static void beforeClass() throws Exception {
69      UTIL.startMiniCluster();
70    }
71  
72    @AfterClass
73    public static void afterClass() throws Exception {
74      UTIL.shutdownMiniCluster();
75    }
76  
77    @Before
78    public void before() throws IOException {
79      LOG.info("before");
80      UTIL.ensureSomeRegionServersAvailable(1);
81      LOG.info("before done");
82    }
83  
84    /**
85     * Setup a table with two rows and values.
86     * 
87     * @param tableName
88     * @return
89     * @throws IOException
90     */
91    public static HTable createTable(byte[] tableName) throws IOException {
92      HTable table = UTIL.createTable(tableName, FAMILY);
93      Put p = new Put("aaa".getBytes());
94      p.add(FAMILY, null, "value aaa".getBytes());
95      table.put(p);
96      p = new Put("bbb".getBytes());
97      p.add(FAMILY, null, "value bbb".getBytes());
98      table.put(p);
99      return table;
100   }
101 
102   /**
103    * Verify that the result and key have expected values.
104    * 
105    * @param r
106    * @param key
107    * @param expectedKey
108    * @param expectedValue
109    * @return
110    */
111   static boolean checkResult(Result r, ImmutableBytesWritable key,
112       byte[] expectedKey, byte[] expectedValue) {
113     assertEquals(0, key.compareTo(expectedKey));
114     Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
115     byte[] value = vals.values().iterator().next();
116     assertTrue(Arrays.equals(value, expectedValue));
117     return true; // if succeed
118   }
119 
120   /**
121    * Create table data and run tests on specified htable using the
122    * o.a.h.hbase.mapred API.
123    * 
124    * @param table
125    * @throws IOException
126    */
127   static void runTestMapred(HTable table) throws IOException {
128     org.apache.hadoop.hbase.mapred.TableRecordReader trr = 
129         new org.apache.hadoop.hbase.mapred.TableRecordReader();
130     trr.setStartRow("aaa".getBytes());
131     trr.setEndRow("zzz".getBytes());
132     trr.setHTable(table);
133     trr.setInputColumns(columns);
134 
135     trr.init();
136     Result r = new Result();
137     ImmutableBytesWritable key = new ImmutableBytesWritable();
138 
139     boolean more = trr.next(key, r);
140     assertTrue(more);
141     checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
142 
143     more = trr.next(key, r);
144     assertTrue(more);
145     checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
146 
147     // no more data
148     more = trr.next(key, r);
149     assertFalse(more);
150   }
151 
152   /**
153    * Create table data and run tests on specified htable using the
154    * o.a.h.hbase.mapreduce API.
155    * 
156    * @param table
157    * @throws IOException
158    * @throws InterruptedException
159    */
160   static void runTestMapreduce(HTable table) throws IOException,
161       InterruptedException {
162     org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr = 
163         new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
164     Scan s = new Scan();
165     s.setStartRow("aaa".getBytes());
166     s.setStopRow("zzz".getBytes());
167     s.addFamily(FAMILY);
168     trr.setScan(s);
169     trr.setHTable(table);
170 
171     trr.initialize(null, null);
172     Result r = new Result();
173     ImmutableBytesWritable key = new ImmutableBytesWritable();
174 
175     boolean more = trr.nextKeyValue();
176     assertTrue(more);
177     key = trr.getCurrentKey();
178     r = trr.getCurrentValue();
179     checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
180 
181     more = trr.nextKeyValue();
182     assertTrue(more);
183     key = trr.getCurrentKey();
184     r = trr.getCurrentValue();
185     checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
186 
187     // no more data
188     more = trr.nextKeyValue();
189     assertFalse(more);
190   }
191 
192   /**
193    * Create a table that IOE's on first scanner next call
194    * 
195    * @throws IOException
196    */
197   static HTable createIOEScannerTable(byte[] name, final int failCnt)
198       throws IOException {
199     // build up a mock scanner stuff to fail the first time
200     Answer<ResultScanner> a = new Answer<ResultScanner>() {
201       int cnt = 0;
202 
203       @Override
204       public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
205         // first invocation return the busted mock scanner
206         if (cnt++ < failCnt) {
207           // create mock ResultScanner that always fails.
208           Scan scan = mock(Scan.class);
209           doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
210           ResultScanner scanner = mock(ResultScanner.class);
211           // simulate TimeoutException / IOException
212           doThrow(new IOException("Injected exception")).when(scanner).next();
213           return scanner;
214         }
215 
216         // otherwise return the real scanner.
217         return (ResultScanner) invocation.callRealMethod();
218       }
219     };
220 
221     HTable htable = spy(createTable(name));
222     doAnswer(a).when(htable).getScanner((Scan) anyObject());
223     return htable;
224   }
225 
226   /**
227    * Create a table that throws a DoNoRetryIOException on first scanner next
228    * call
229    * 
230    * @throws IOException
231    */
232   static HTable createDNRIOEScannerTable(byte[] name, final int failCnt)
233       throws IOException {
234     // build up a mock scanner stuff to fail the first time
235     Answer<ResultScanner> a = new Answer<ResultScanner>() {
236       int cnt = 0;
237 
238       @Override
239       public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
240         // first invocation return the busted mock scanner
241         if (cnt++ < failCnt) {
242           // create mock ResultScanner that always fails.
243           Scan scan = mock(Scan.class);
244           doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
245           ResultScanner scanner = mock(ResultScanner.class);
246 
247           invocation.callRealMethod(); // simulate UnknownScannerException
248           doThrow(
249               new UnknownScannerException("Injected simulated TimeoutException"))
250               .when(scanner).next();
251           return scanner;
252         }
253 
254         // otherwise return the real scanner.
255         return (ResultScanner) invocation.callRealMethod();
256       }
257     };
258 
259     HTable htable = spy(createTable(name));
260     doAnswer(a).when(htable).getScanner((Scan) anyObject());
261     return htable;
262   }
263 
264   /**
265    * Run test assuming no errors using mapred api.
266    * 
267    * @throws IOException
268    */
269   @Test
270   public void testTableRecordReader() throws IOException {
271     HTable table = createTable("table1".getBytes());
272     runTestMapred(table);
273   }
274 
275   /**
276    * Run test assuming Scanner IOException failure using mapred api,
277    * 
278    * @throws IOException
279    */
280   @Test
281   public void testTableRecordReaderScannerFail() throws IOException {
282     HTable htable = createIOEScannerTable("table2".getBytes(), 1);
283     runTestMapred(htable);
284   }
285 
286   /**
287    * Run test assuming Scanner IOException failure using mapred api,
288    * 
289    * @throws IOException
290    */
291   @Test(expected = IOException.class)
292   public void testTableRecordReaderScannerFailTwice() throws IOException {
293     HTable htable = createIOEScannerTable("table3".getBytes(), 2);
294     runTestMapred(htable);
295   }
296 
297   /**
298    * Run test assuming UnknownScannerException (which is a type of
299    * DoNotRetryIOException) using mapred api.
300    * 
301    * @throws org.apache.hadoop.hbase.DoNotRetryIOException
302    */
303   @Test
304   public void testTableRecordReaderScannerTimeout() throws IOException {
305     HTable htable = createDNRIOEScannerTable("table4".getBytes(), 1);
306     runTestMapred(htable);
307   }
308 
309   /**
310    * Run test assuming UnknownScannerException (which is a type of
311    * DoNotRetryIOException) using mapred api.
312    * 
313    * @throws org.apache.hadoop.hbase.DoNotRetryIOException
314    */
315   @Test(expected = org.apache.hadoop.hbase.DoNotRetryIOException.class)
316   public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
317     HTable htable = createDNRIOEScannerTable("table5".getBytes(), 2);
318     runTestMapred(htable);
319   }
320 
321   /**
322    * Run test assuming no errors using newer mapreduce api
323    * 
324    * @throws IOException
325    * @throws InterruptedException
326    */
327   @Test
328   public void testTableRecordReaderMapreduce() throws IOException,
329       InterruptedException {
330     HTable table = createTable("table1-mr".getBytes());
331     runTestMapreduce(table);
332   }
333 
334   /**
335    * Run test assuming Scanner IOException failure using newer mapreduce api
336    * 
337    * @throws IOException
338    * @throws InterruptedException
339    */
340   @Test
341   public void testTableRecordReaderScannerFailMapreduce() throws IOException,
342       InterruptedException {
343     HTable htable = createIOEScannerTable("table2-mr".getBytes(), 1);
344     runTestMapreduce(htable);
345   }
346 
347   /**
348    * Run test assuming Scanner IOException failure using newer mapreduce api
349    * 
350    * @throws IOException
351    * @throws InterruptedException
352    */
353   @Test(expected = IOException.class)
354   public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
355       InterruptedException {
356     HTable htable = createIOEScannerTable("table3-mr".getBytes(), 2);
357     runTestMapreduce(htable);
358   }
359 
360   /**
361    * Run test assuming UnknownScannerException (which is a type of
362    * DoNotRetryIOException) using newer mapreduce api
363    * 
364    * @throws InterruptedException
365    * @throws org.apache.hadoop.hbase.DoNotRetryIOException
366    */
367   @Test
368   public void testTableRecordReaderScannerTimeoutMapreduce()
369       throws IOException, InterruptedException {
370     HTable htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
371     runTestMapreduce(htable);
372   }
373 
374   /**
375    * Run test assuming UnknownScannerException (which is a type of
376    * DoNotRetryIOException) using newer mapreduce api
377    * 
378    * @throws InterruptedException
379    * @throws org.apache.hadoop.hbase.DoNotRetryIOException
380    */
381   @Test(expected = org.apache.hadoop.hbase.DoNotRetryIOException.class)
382   public void testTableRecordReaderScannerTimeoutMapreduceTwice()
383       throws IOException, InterruptedException {
384     HTable htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
385     runTestMapreduce(htable);
386   }
387 
388 }
389