1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
52
53
54
55
56 @Category(SmallTests.class)
57 public class TestClientScanner {
58
59 Scan scan;
60 ExecutorService pool;
61 Configuration conf;
62
63 ClusterConnection clusterConn;
64 RpcRetryingCallerFactory rpcFactory;
65 RpcControllerFactory controllerFactory;
66
67 @Before
68 @SuppressWarnings("deprecation")
69 public void setup() throws IOException {
70 clusterConn = Mockito.mock(ClusterConnection.class);
71 rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
72 controllerFactory = Mockito.mock(RpcControllerFactory.class);
73 pool = Executors.newSingleThreadExecutor();
74 scan = new Scan();
75 conf = new Configuration();
76 Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
77 }
78
79 @After
80 public void teardown() {
81 if (null != pool) {
82 pool.shutdownNow();
83 }
84 }
85
86 private static class MockClientScanner extends ClientScanner {
87
88 private boolean rpcFinished = false;
89 private boolean rpcFinishedFired = false;
90
91 public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
92 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
93 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
94 throws IOException {
95 super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
96 primaryOperationTimeout);
97 }
98
99 @Override
100 protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
101 if (!rpcFinished) {
102 return super.nextScanner(nbRows, done);
103 }
104
105
106 if (rpcFinishedFired) {
107 throw new RuntimeException("Expected nextScanner to only be called once after " +
108 " short-circuit was triggered.");
109 }
110 rpcFinishedFired = true;
111 return false;
112 }
113
114 @Override
115 protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
116 int nbRows) {
117 scan.setStartRow(localStartKey);
118 ScannerCallable s =
119 new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
120 this.rpcControllerFactory);
121 s.setCaching(nbRows);
122 ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(),
123 s, pool, primaryOperationTimeout, scan,
124 getRetries(), scannerTimeout, caching, conf, caller);
125 return sr;
126 }
127
128 public void setRpcFinished(boolean rpcFinished) {
129 this.rpcFinished = rpcFinished;
130 }
131 }
132
133 @Test
134 @SuppressWarnings("unchecked")
135 public void testNoResultsHint() throws IOException {
136 final Result[] results = new Result[1];
137 KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
138 Type.Maximum);
139 results[0] = Result.create(new Cell[] {kv1});
140
141 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
142
143 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
144 Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
145 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
146 private int count = 0;
147 @Override
148 public Result[] answer(InvocationOnMock invocation) throws Throwable {
149 ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
150 ScannerCallableWithReplicas.class);
151 switch (count) {
152 case 0:
153 case 2:
154 count++;
155 return null;
156 case 1:
157 count++;
158 callable.setHasMoreResultsContext(false);
159 return results;
160 default:
161 throw new RuntimeException("Expected only 2 invocations");
162 }
163 }
164 });
165
166
167 scan.setCaching(100);
168 scan.setMaxResultSize(1000*1000);
169
170 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
171 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
172
173 scanner.setRpcFinished(true);
174
175 InOrder inOrder = Mockito.inOrder(caller);
176
177 scanner.loadCache();
178
179
180 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
181 Mockito.any(RetryingCallable.class), Mockito.anyInt());
182
183 assertEquals(1, scanner.cache.size());
184 Result r = scanner.cache.poll();
185 assertNotNull(r);
186 CellScanner cs = r.cellScanner();
187 assertTrue(cs.advance());
188 assertEquals(kv1, cs.current());
189 assertFalse(cs.advance());
190 }
191 }
192
193 @Test
194 @SuppressWarnings("unchecked")
195 public void testSizeLimit() throws IOException {
196 final Result[] results = new Result[1];
197 KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
198 Type.Maximum);
199 results[0] = Result.create(new Cell[] {kv1});
200
201 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
202
203 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
204 Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
205 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
206 private int count = 0;
207 @Override
208 public Result[] answer(InvocationOnMock invocation) throws Throwable {
209 ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
210 ScannerCallableWithReplicas.class);
211 switch (count) {
212 case 0:
213 case 2:
214 count++;
215 return null;
216 case 1:
217 count++;
218 callable.setHasMoreResultsContext(true);
219 callable.setServerHasMoreResults(false);
220 return results;
221 default:
222 throw new RuntimeException("Expected only 2 invocations");
223 }
224 }
225 });
226
227 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
228
229
230 scan.setCaching(100);
231
232 scan.setMaxResultSize(1);
233
234 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
235 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
236
237
238 Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
239 Mockito.anyInt());
240
241 InOrder inOrder = Mockito.inOrder(caller);
242
243 scanner.loadCache();
244
245 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
246 Mockito.any(RetryingCallable.class), Mockito.anyInt());
247
248 assertEquals(1, scanner.cache.size());
249 Result r = scanner.cache.poll();
250 assertNotNull(r);
251 CellScanner cs = r.cellScanner();
252 assertTrue(cs.advance());
253 assertEquals(kv1, cs.current());
254 assertFalse(cs.advance());
255 }
256 }
257
258 @Test
259 @SuppressWarnings("unchecked")
260 public void testCacheLimit() throws IOException {
261 KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
262 Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
263 Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
264 Type.Maximum);
265 final Result[] results = new Result[] {Result.create(new Cell[] {kv1}),
266 Result.create(new Cell[] {kv2}), Result.create(new Cell[] {kv3})};
267
268 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
269
270 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
271 Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
272 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
273 private int count = 0;
274 @Override
275 public Result[] answer(InvocationOnMock invocation) throws Throwable {
276 ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
277 ScannerCallableWithReplicas.class);
278 switch (count) {
279 case 0:
280 case 2:
281 count++;
282 return null;
283 case 1:
284 count++;
285 callable.setHasMoreResultsContext(true);
286 callable.setServerHasMoreResults(false);
287 return results;
288 default:
289 throw new RuntimeException("Expected only 2 invocations");
290 }
291 }
292 });
293
294 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
295
296
297 scan.setCaching(1);
298
299 scan.setMaxResultSize(1000*1000);
300
301 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
302 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
303
304
305 Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
306 Mockito.anyInt());
307
308 InOrder inOrder = Mockito.inOrder(caller);
309
310 scanner.loadCache();
311
312
313
314 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
315 Mockito.any(RetryingCallable.class), Mockito.anyInt());
316
317 assertEquals(3, scanner.cache.size());
318 Result r = scanner.cache.poll();
319 assertNotNull(r);
320 CellScanner cs = r.cellScanner();
321 assertTrue(cs.advance());
322 assertEquals(kv1, cs.current());
323 assertFalse(cs.advance());
324
325 r = scanner.cache.poll();
326 assertNotNull(r);
327 cs = r.cellScanner();
328 assertTrue(cs.advance());
329 assertEquals(kv2, cs.current());
330 assertFalse(cs.advance());
331
332 r = scanner.cache.poll();
333 assertNotNull(r);
334 cs = r.cellScanner();
335 assertTrue(cs.advance());
336 assertEquals(kv3, cs.current());
337 assertFalse(cs.advance());
338 }
339 }
340
341 @Test
342 @SuppressWarnings("unchecked")
343 public void testNoMoreResults() throws IOException {
344 final Result[] results = new Result[1];
345 KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
346 Type.Maximum);
347 results[0] = Result.create(new Cell[] {kv1});
348
349 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
350
351 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
352 Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
353 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
354 private int count = 0;
355 @Override
356 public Result[] answer(InvocationOnMock invocation) throws Throwable {
357 ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
358 ScannerCallableWithReplicas.class);
359 switch (count) {
360 case 0:
361 case 2:
362 count++;
363 return null;
364 case 1:
365 count++;
366 callable.setHasMoreResultsContext(true);
367 callable.setServerHasMoreResults(false);
368 return results;
369 default:
370 throw new RuntimeException("Expected only 2 invocations");
371 }
372 }
373 });
374
375 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
376
377
378 scan.setCaching(100);
379 scan.setMaxResultSize(1000*1000);
380
381 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
382 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
383
384
385 Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
386 Mockito.anyInt());
387
388 scanner.setRpcFinished(true);
389
390 InOrder inOrder = Mockito.inOrder(caller);
391
392 scanner.loadCache();
393
394 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
395 Mockito.any(RetryingCallable.class), Mockito.anyInt());
396
397 assertEquals(1, scanner.cache.size());
398 Result r = scanner.cache.poll();
399 assertNotNull(r);
400 CellScanner cs = r.cellScanner();
401 assertTrue(cs.advance());
402 assertEquals(kv1, cs.current());
403 assertFalse(cs.advance());
404 }
405 }
406
407 @Test
408 @SuppressWarnings("unchecked")
409 public void testMoreResults() throws IOException {
410 final Result[] results1 = new Result[1];
411 KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
412 Type.Maximum);
413 results1[0] = Result.create(new Cell[] {kv1});
414
415 final Result[] results2 = new Result[1];
416 KeyValue kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
417 Type.Maximum);
418 results2[0] = Result.create(new Cell[] {kv2});
419
420
421 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
422
423 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
424 Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
425 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
426 private int count = 0;
427 @Override
428 public Result[] answer(InvocationOnMock invocation) throws Throwable {
429 ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
430 ScannerCallableWithReplicas.class);
431 switch (count) {
432 case 0:
433 case 3:
434 count++;
435 return null;
436 case 1:
437 count++;
438 callable.setHasMoreResultsContext(true);
439 callable.setServerHasMoreResults(true);
440 return results1;
441 case 2:
442 count++;
443
444 callable.setHasMoreResultsContext(true);
445 callable.setServerHasMoreResults(false);
446 return results2;
447 default:
448 throw new RuntimeException("Expected only 2 invocations");
449 }
450 }
451 });
452
453
454 scan.setCaching(100);
455 scan.setMaxResultSize(1000*1000);
456
457 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
458 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
459
460
461 Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
462 Mockito.anyInt());
463
464 InOrder inOrder = Mockito.inOrder(caller);
465
466 scanner.loadCache();
467
468 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
469 Mockito.any(RetryingCallable.class), Mockito.anyInt());
470
471 assertEquals(1, scanner.cache.size());
472 Result r = scanner.cache.poll();
473 assertNotNull(r);
474 CellScanner cs = r.cellScanner();
475 assertTrue(cs.advance());
476 assertEquals(kv1, cs.current());
477 assertFalse(cs.advance());
478
479 scanner.setRpcFinished(true);
480
481 inOrder = Mockito.inOrder(caller);
482
483 scanner.loadCache();
484
485 inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
486 Mockito.any(RetryingCallable.class), Mockito.anyInt());
487
488 r = scanner.cache.poll();
489 assertNotNull(r);
490 cs = r.cellScanner();
491 assertTrue(cs.advance());
492 assertEquals(kv2, cs.current());
493 assertFalse(cs.advance());
494 }
495 }
496
497
498
499
500
501 @Test (timeout = 30000)
502 public void testExceptionsFromReplicasArePropagated() throws IOException {
503 scan.setConsistency(Consistency.TIMELINE);
504
505
506
507 rpcFactory = new MockRpcRetryingCallerFactory(conf);
508 conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
509 MockRpcRetryingCallerFactory.class.getName());
510
511
512 when(clusterConn.locateRegion((TableName)any(), (byte[])any(), anyBoolean(),
513 anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null));
514
515 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
516 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
517 Iterator<Result> iter = scanner.iterator();
518 while (iter.hasNext()) {
519 iter.next();
520 }
521 fail("Should have failed with RetriesExhaustedException");
522 } catch (RetriesExhaustedException expected) {
523
524 }
525 }
526
527 public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
528
529 public MockRpcRetryingCallerFactory(Configuration conf) {
530 super(conf);
531 }
532
533 @Override
534 public <T> RpcRetryingCaller<T> newCaller() {
535 return new RpcRetryingCaller<T>(0, 0, 0) {
536 @Override
537 public void cancel() {
538 }
539 @Override
540 public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
541 throws IOException, RuntimeException {
542 throw new IOException("Scanner exception");
543 }
544
545 @Override
546 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
547 throws IOException, RuntimeException {
548 try {
549 return callable.call(callTimeout);
550 } catch (IOException e) {
551 throw e;
552 } catch (Exception e) {
553 throw new RuntimeException(e);
554 }
555 }
556 };
557 }
558
559 }
560
561 }