1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24 import static org.mockito.Matchers.anyObject;
25 import static org.mockito.Mockito.doAnswer;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.doThrow;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.spy;
30
31 import java.io.IOException;
32 import java.util.Arrays;
33 import java.util.Map;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.*;
38 import org.apache.hadoop.hbase.client.HTable;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.ResultScanner;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Test;
49 import org.junit.experimental.categories.Category;
50 import org.mockito.invocation.InvocationOnMock;
51 import org.mockito.stubbing.Answer;
52
53
54
55
56
57 @Category(LargeTests.class)
58 public class TestTableInputFormat {
59
60 private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
61
62 private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
63 static final byte[] FAMILY = Bytes.toBytes("family");
64
65 private static final byte[][] columns = new byte[][] { FAMILY };
66
67 @BeforeClass
68 public static void beforeClass() throws Exception {
69 UTIL.startMiniCluster();
70 }
71
72 @AfterClass
73 public static void afterClass() throws Exception {
74 UTIL.shutdownMiniCluster();
75 }
76
77 @Before
78 public void before() throws IOException {
79 LOG.info("before");
80 UTIL.ensureSomeRegionServersAvailable(1);
81 LOG.info("before done");
82 }
83
84
85
86
87
88
89
90
91 public static HTable createTable(byte[] tableName) throws IOException {
92 HTable table = UTIL.createTable(tableName, FAMILY);
93 Put p = new Put("aaa".getBytes());
94 p.add(FAMILY, null, "value aaa".getBytes());
95 table.put(p);
96 p = new Put("bbb".getBytes());
97 p.add(FAMILY, null, "value bbb".getBytes());
98 table.put(p);
99 return table;
100 }
101
102
103
104
105
106
107
108
109
110
111 static boolean checkResult(Result r, ImmutableBytesWritable key,
112 byte[] expectedKey, byte[] expectedValue) {
113 assertEquals(0, key.compareTo(expectedKey));
114 Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
115 byte[] value = vals.values().iterator().next();
116 assertTrue(Arrays.equals(value, expectedValue));
117 return true;
118 }
119
120
121
122
123
124
125
126
127 static void runTestMapred(HTable table) throws IOException {
128 org.apache.hadoop.hbase.mapred.TableRecordReader trr =
129 new org.apache.hadoop.hbase.mapred.TableRecordReader();
130 trr.setStartRow("aaa".getBytes());
131 trr.setEndRow("zzz".getBytes());
132 trr.setHTable(table);
133 trr.setInputColumns(columns);
134
135 trr.init();
136 Result r = new Result();
137 ImmutableBytesWritable key = new ImmutableBytesWritable();
138
139 boolean more = trr.next(key, r);
140 assertTrue(more);
141 checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
142
143 more = trr.next(key, r);
144 assertTrue(more);
145 checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
146
147
148 more = trr.next(key, r);
149 assertFalse(more);
150 }
151
152
153
154
155
156
157
158
159
160 static void runTestMapreduce(HTable table) throws IOException,
161 InterruptedException {
162 org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
163 new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
164 Scan s = new Scan();
165 s.setStartRow("aaa".getBytes());
166 s.setStopRow("zzz".getBytes());
167 s.addFamily(FAMILY);
168 trr.setScan(s);
169 trr.setHTable(table);
170
171 trr.initialize(null, null);
172 Result r = new Result();
173 ImmutableBytesWritable key = new ImmutableBytesWritable();
174
175 boolean more = trr.nextKeyValue();
176 assertTrue(more);
177 key = trr.getCurrentKey();
178 r = trr.getCurrentValue();
179 checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
180
181 more = trr.nextKeyValue();
182 assertTrue(more);
183 key = trr.getCurrentKey();
184 r = trr.getCurrentValue();
185 checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
186
187
188 more = trr.nextKeyValue();
189 assertFalse(more);
190 }
191
192
193
194
195
196
197 static HTable createIOEScannerTable(byte[] name, final int failCnt)
198 throws IOException {
199
200 Answer<ResultScanner> a = new Answer<ResultScanner>() {
201 int cnt = 0;
202
203 @Override
204 public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
205
206 if (cnt++ < failCnt) {
207
208 Scan scan = mock(Scan.class);
209 doReturn("bogus".getBytes()).when(scan).getStartRow();
210 ResultScanner scanner = mock(ResultScanner.class);
211
212 doThrow(new IOException("Injected exception")).when(scanner).next();
213 return scanner;
214 }
215
216
217 return (ResultScanner) invocation.callRealMethod();
218 }
219 };
220
221 HTable htable = spy(createTable(name));
222 doAnswer(a).when(htable).getScanner((Scan) anyObject());
223 return htable;
224 }
225
226
227
228
229
230
231
232 static HTable createDNRIOEScannerTable(byte[] name, final int failCnt)
233 throws IOException {
234
235 Answer<ResultScanner> a = new Answer<ResultScanner>() {
236 int cnt = 0;
237
238 @Override
239 public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
240
241 if (cnt++ < failCnt) {
242
243 Scan scan = mock(Scan.class);
244 doReturn("bogus".getBytes()).when(scan).getStartRow();
245 ResultScanner scanner = mock(ResultScanner.class);
246
247 invocation.callRealMethod();
248 doThrow(
249 new UnknownScannerException("Injected simulated TimeoutException"))
250 .when(scanner).next();
251 return scanner;
252 }
253
254
255 return (ResultScanner) invocation.callRealMethod();
256 }
257 };
258
259 HTable htable = spy(createTable(name));
260 doAnswer(a).when(htable).getScanner((Scan) anyObject());
261 return htable;
262 }
263
264
265
266
267
268
269 @Test
270 public void testTableRecordReader() throws IOException {
271 HTable table = createTable("table1".getBytes());
272 runTestMapred(table);
273 }
274
275
276
277
278
279
280 @Test
281 public void testTableRecordReaderScannerFail() throws IOException {
282 HTable htable = createIOEScannerTable("table2".getBytes(), 1);
283 runTestMapred(htable);
284 }
285
286
287
288
289
290
291 @Test(expected = IOException.class)
292 public void testTableRecordReaderScannerFailTwice() throws IOException {
293 HTable htable = createIOEScannerTable("table3".getBytes(), 2);
294 runTestMapred(htable);
295 }
296
297
298
299
300
301
302
303 @Test
304 public void testTableRecordReaderScannerTimeout() throws IOException {
305 HTable htable = createDNRIOEScannerTable("table4".getBytes(), 1);
306 runTestMapred(htable);
307 }
308
309
310
311
312
313
314
315 @Test(expected = org.apache.hadoop.hbase.DoNotRetryIOException.class)
316 public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
317 HTable htable = createDNRIOEScannerTable("table5".getBytes(), 2);
318 runTestMapred(htable);
319 }
320
321
322
323
324
325
326
327 @Test
328 public void testTableRecordReaderMapreduce() throws IOException,
329 InterruptedException {
330 HTable table = createTable("table1-mr".getBytes());
331 runTestMapreduce(table);
332 }
333
334
335
336
337
338
339
340 @Test
341 public void testTableRecordReaderScannerFailMapreduce() throws IOException,
342 InterruptedException {
343 HTable htable = createIOEScannerTable("table2-mr".getBytes(), 1);
344 runTestMapreduce(htable);
345 }
346
347
348
349
350
351
352
353 @Test(expected = IOException.class)
354 public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
355 InterruptedException {
356 HTable htable = createIOEScannerTable("table3-mr".getBytes(), 2);
357 runTestMapreduce(htable);
358 }
359
360
361
362
363
364
365
366
367 @Test
368 public void testTableRecordReaderScannerTimeoutMapreduce()
369 throws IOException, InterruptedException {
370 HTable htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
371 runTestMapreduce(htable);
372 }
373
374
375
376
377
378
379
380
381 @Test(expected = org.apache.hadoop.hbase.DoNotRetryIOException.class)
382 public void testTableRecordReaderScannerTimeoutMapreduceTwice()
383 throws IOException, InterruptedException {
384 HTable htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
385 runTestMapreduce(htable);
386 }
387
388 }
389