1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.nio.charset.StandardCharsets;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.KeyValue.Type;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
37 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
38 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
39 import org.apache.hadoop.hbase.testclassification.SmallTests;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 import org.mockito.Mockito;
45 import org.mockito.invocation.InvocationOnMock;
46 import org.mockito.stubbing.Answer;
47
48
49
50
51 @Category(SmallTests.class)
52 public class TestClientSmallScanner {
53
54 Scan scan;
55 ExecutorService pool;
56 Configuration conf;
57
58 ClusterConnection clusterConn;
59 RpcRetryingCallerFactory rpcFactory;
60 RpcControllerFactory controllerFactory;
61 RpcRetryingCaller<Result[]> caller;
62
63 @Before
64 @SuppressWarnings({"deprecation", "unchecked"})
65 public void setup() throws IOException {
66 clusterConn = Mockito.mock(ClusterConnection.class);
67 rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
68 controllerFactory = Mockito.mock(RpcControllerFactory.class);
69 pool = Executors.newSingleThreadExecutor();
70 scan = new Scan();
71 conf = new Configuration();
72 Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
73
74 caller = Mockito.mock(RpcRetryingCaller.class);
75
76 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
77 }
78
79 @After
80 public void teardown() {
81 if (null != pool) {
82 pool.shutdownNow();
83 }
84 }
85
86
87
88
89 private Answer<Boolean> createTrueThenFalseAnswer() {
90 return new Answer<Boolean>() {
91 boolean first = true;
92
93 @Override
94 public Boolean answer(InvocationOnMock invocation) {
95 if (first) {
96 first = false;
97 return true;
98 }
99 return false;
100 }
101 };
102 }
103
104 private SmallScannerCallableFactory getFactory(
105 final ScannerCallableWithReplicas callableWithReplicas) {
106 return new SmallScannerCallableFactory() {
107 @Override
108 public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
109 Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
110 RpcControllerFactory controllerFactory, ExecutorService pool,
111 int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
112 RpcRetryingCaller<Result[]> caller) {
113 return callableWithReplicas;
114 }
115 };
116 }
117
118 @Test
119 public void testContextPresent() throws Exception {
120 final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
121 Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
122 Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
123 Type.Maximum);
124
125 ScannerCallableWithReplicas callableWithReplicas = Mockito
126 .mock(ScannerCallableWithReplicas.class);
127
128
129 @SuppressWarnings("unchecked")
130 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
131
132 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
133
134 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
135
136
137
138
139 try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
140 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
141
142 css.setScannerCallableFactory(factory);
143
144
145 Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
146 .thenAnswer(new Answer<Result[]>() {
147 int count = 0;
148
149 @Override
150 public Result[] answer(InvocationOnMock invocation) {
151 Result[] results;
152 if (0 == count) {
153 results = new Result[] {Result.create(new Cell[] {kv1}),
154 Result.create(new Cell[] {kv2})};
155 } else if (1 == count) {
156 results = new Result[] {Result.create(new Cell[] {kv3})};
157 } else {
158 results = new Result[0];
159 }
160 count++;
161 return results;
162 }
163 });
164
165
166 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
167
168 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
169 createTrueThenFalseAnswer());
170
171
172 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
173 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
174
175 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
176
177 css.loadCache();
178
179 List<Result> results = css.cache;
180 assertEquals(3, results.size());
181 for (int i = 1; i <= 3; i++) {
182 Result result = results.get(i - 1);
183 byte[] row = result.getRow();
184 assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
185 assertEquals(1, result.getMap().size());
186 }
187
188 assertTrue(css.closed);
189 }
190 }
191
192 @Test
193 public void testNoContextFewerRecords() throws Exception {
194 final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
195 Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
196 Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
197 Type.Maximum);
198
199 ScannerCallableWithReplicas callableWithReplicas = Mockito
200 .mock(ScannerCallableWithReplicas.class);
201
202
203 scan.setCaching(2);
204 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
205
206 try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
207 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
208
209 css.setScannerCallableFactory(factory);
210
211 Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
212 .thenAnswer(new Answer<Result[]>() {
213 int count = 0;
214
215 @Override
216 public Result[] answer(InvocationOnMock invocation) {
217 Result[] results;
218 if (0 == count) {
219 results = new Result[] {Result.create(new Cell[] {kv1}),
220 Result.create(new Cell[] {kv2})};
221 } else if (1 == count) {
222
223 results = new Result[] {Result.create(new Cell[] {kv3})};
224 } else {
225 throw new RuntimeException("Should not fetch a third batch from the server");
226 }
227 count++;
228 return results;
229 }
230 });
231
232
233 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
234
235 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
236 new RuntimeException("Should not be called"));
237
238
239 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
240 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
241
242 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
243
244 css.loadCache();
245
246 List<Result> results = css.cache;
247 assertEquals(2, results.size());
248 for (int i = 1; i <= 2; i++) {
249 Result result = results.get(i - 1);
250 byte[] row = result.getRow();
251 assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
252 assertEquals(1, result.getMap().size());
253 }
254
255
256 results.clear();
257
258 css.loadCache();
259
260 assertEquals(1, results.size());
261 Result result = results.get(0);
262 assertEquals("row3", new String(result.getRow(), StandardCharsets.UTF_8));
263 assertEquals(1, result.getMap().size());
264 assertTrue(css.closed);
265 }
266 }
267
268 @Test
269 public void testNoContextNoRecords() throws Exception {
270 ScannerCallableWithReplicas callableWithReplicas = Mockito
271 .mock(ScannerCallableWithReplicas.class);
272
273
274 scan.setCaching(2);
275
276 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
277
278 try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
279 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
280
281 css.setScannerCallableFactory(factory);
282
283
284 Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
285 .thenReturn(new Result[0]);
286
287
288 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
289
290 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
291 new RuntimeException("Should not be called"));
292
293
294 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
295 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
296
297 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
298
299 css.loadCache();
300
301 assertEquals(0, css.cache.size());
302 assertTrue(css.closed);
303 }
304 }
305
306 @Test
307 public void testContextNoRecords() throws Exception {
308 ScannerCallableWithReplicas callableWithReplicas = Mockito
309 .mock(ScannerCallableWithReplicas.class);
310
311 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
312
313 try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
314 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
315
316 css.setScannerCallableFactory(factory);
317
318
319 Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
320 .thenReturn(new Result[0]);
321
322
323 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
324
325 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenReturn(false);
326
327
328 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
329 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
330
331 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
332
333 css.loadCache();
334
335 assertEquals(0, css.cache.size());
336 assertTrue(css.closed);
337 }
338 }
339 }