1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.util.LinkedList;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.DoNotRetryIOException;
30 import org.apache.hadoop.hbase.HBaseConfiguration;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.KeyValueUtil;
34 import org.apache.hadoop.hbase.NotServingRegionException;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.UnknownScannerException;
37 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
38 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
39 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
41 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
42 import org.apache.hadoop.hbase.util.Bytes;
43
44
45
46
47
48
/**
 * Client-side scanner over one table. {@link #next()} hands out results one
 * at a time from a local cache; when the cache is empty it is refilled
 * through a {@link ScannerCallable}, and when the current region is finished
 * the scanner moves on to the region that starts at that region's end key.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ClientScanner extends AbstractClientScanner {
  // Per-instance logger named after the runtime class, so subclasses log under their own name.
  private final Log LOG = LogFactory.getLog(this.getClass());
  // The scan being executed; its start row is rewritten whenever a new scanner is opened.
  protected Scan scan;
  // Set by close(); once true and the cache is drained, next() returns null.
  protected boolean closed = false;
  // Region the current callable was opened on; null before the first scanner is
  // opened and after next() resets the scanner following a recoverable error.
  protected HRegionInfo currentRegion = null;
  // Callable used for the open/next/close calls of the current region; null when no scanner is open.
  protected ScannerCallable callable = null;
  // Results already fetched but not yet returned by next().
  protected final LinkedList<Result> cache = new LinkedList<Result>();
  // Number of rows requested per fetch: the scan's caching if positive, else the configured default.
  protected final int caching;
  // Wall-clock millis of construction or of the last completed fetch; used for timeout detection.
  protected long lastNext;
  // Last result placed in the cache; next() restarts the scan from its row after a scanner reset.
  protected Result lastResult = null;
  // Cell heap-size budget for one cache refill: the scan's max result size if positive,
  // else the configured default.
  protected final long maxScannerResultSize;
  private final HConnection connection;
  private final TableName tableName;
  // Scanner timeout read from configuration; compared against millis elapsed since lastNext.
  protected final int scannerTimeout;
  // True once writeScanMetrics() has stored the serialized metrics on the scan.
  protected boolean scanMetricsPublished = false;
  // Executes the scanner callables with retries.
  protected RpcRetryingCaller<Result []> caller;
  // Supplies the controller handed to each new ScannerCallable.
  protected RpcControllerFactory rpcControllerFactory;
71
72
73
74
75
76
77
78
79
80
81
/**
 * Creates a scanner for {@code tableName} on the connection that
 * {@link HConnectionManager#getConnection(Configuration)} returns for
 * {@code conf}.
 *
 * @param conf configuration used to obtain the connection and the scanner settings
 * @param scan the scan to execute
 * @param tableName the table to scan
 * @throws IOException if obtaining the connection or the delegated constructor fails
 */
public ClientScanner(final Configuration conf, final Scan scan,
    final TableName tableName) throws IOException {
  this(conf, scan, tableName, HConnectionManager.getConnection(conf));
}
86
87
88
89
/**
 * Creates a scanner for the table whose name is given as raw bytes; the name
 * is converted with {@link TableName#valueOf(byte[])} and the call is
 * forwarded to the {@link TableName}-based constructor.
 *
 * @param conf configuration used to obtain the connection and the scanner settings
 * @param scan the scan to execute
 * @param tableName the table name as bytes
 * @throws IOException if the delegated constructor fails
 * @deprecated use the constructor taking a {@link TableName} instead
 */
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan,
    final byte [] tableName) throws IOException {
  this(conf, scan, TableName.valueOf(tableName));
}
95
96
97
98
99
100
101
102
103
104
105
106
/**
 * Creates a scanner for {@code tableName} on the supplied connection. The
 * retrying-caller factory and the RPC controller factory are both obtained
 * from {@code conf} via their {@code instantiate} methods.
 *
 * @param conf configuration used for the factories and the scanner settings
 * @param scan the scan to execute
 * @param tableName the table to scan
 * @param connection connection used to reach the table
 * @throws IOException if the delegated constructor fails
 */
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
    HConnection connection) throws IOException {
  this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf),
      RpcControllerFactory.instantiate(conf));
}
112
113
114
115
/**
 * Creates a scanner for the table whose name is given as raw bytes, on the
 * supplied connection. The name is converted with
 * {@link TableName#valueOf(byte[])}.
 *
 * <p>The retrying-caller factory is obtained with
 * {@code RpcRetryingCallerFactory.instantiate(conf)}, exactly as the
 * {@link TableName}-based overload does, so both overloads build the same
 * kind of scanner for the same configuration instead of this one always
 * hard-wiring the base factory class.
 *
 * @param conf configuration used for the factories and the scanner settings
 * @param scan the scan to execute
 * @param tableName the table name as bytes
 * @param connection connection used to reach the table
 * @throws IOException if the delegated constructor fails
 * @deprecated use the constructor taking a {@link TableName} instead
 */
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
    HConnection connection) throws IOException {
  this(conf, scan, TableName.valueOf(tableName), connection,
      RpcRetryingCallerFactory.instantiate(conf),
      RpcControllerFactory.instantiate(conf));
}
122
123
124
125
126
127
128
/**
 * Creates a scanner with a caller-supplied retrying-caller factory; the RPC
 * controller factory is obtained from {@code conf}.
 *
 * @param conf configuration used for the controller factory and the scanner settings
 * @param scan the scan to execute
 * @param tableName the table to scan
 * @param connection connection used to reach the table
 * @param rpcFactory factory for the retrying caller that runs the scanner calls
 * @throws IOException if the delegated constructor fails
 * @deprecated presumably superseded by the overload that also takes an
 *     {@link RpcControllerFactory}; the intended replacement is not stated here
 */
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
    HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
  this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
}
134
135
136
137
138
139
140
141
142
143
144 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
145 HConnection connection, RpcRetryingCallerFactory rpcFactory,
146 RpcControllerFactory controllerFactory) throws IOException {
147 if (LOG.isTraceEnabled()) {
148 LOG.trace("Scan table=" + tableName
149 + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
150 }
151 this.scan = scan;
152 this.tableName = tableName;
153 this.lastNext = System.currentTimeMillis();
154 this.connection = connection;
155 if (scan.getMaxResultSize() > 0) {
156 this.maxScannerResultSize = scan.getMaxResultSize();
157 } else {
158 this.maxScannerResultSize = conf.getLong(
159 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
160 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
161 }
162 this.scannerTimeout = HBaseConfiguration.getInt(conf,
163 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
164 HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
165 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
166
167
168 initScanMetrics(scan);
169
170
171 if (this.scan.getCaching() > 0) {
172 this.caching = this.scan.getCaching();
173 } else {
174 this.caching = conf.getInt(
175 HConstants.HBASE_CLIENT_SCANNER_CACHING,
176 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
177 }
178
179 this.caller = rpcFactory.<Result[]> newCaller();
180 this.rpcControllerFactory = controllerFactory;
181
182 initializeScannerInConstruction();
183 }
184
/**
 * Opens the first scanner. Called as the last step of the main constructor;
 * being protected and non-final, a subclass can override it to change what
 * happens at construction time.
 *
 * @throws IOException if the first scanner cannot be opened
 */
protected void initializeScannerInConstruction() throws IOException{
  // No region has been scanned yet, so this positions the scanner at the scan's start row.
  nextScanner(this.caching, false);
}
189
/**
 * @return the connection this scanner was constructed with
 */
protected HConnection getConnection() {
  return this.connection;
}
193
194
195
196
197
/**
 * @return the name of the scanned table as returned by {@link TableName#getName()}
 * @deprecated use {@link #getTable()} instead
 */
@Deprecated
protected byte [] getTableName() {
  return this.tableName.getName();
}
202
/**
 * @return the table this scanner reads from
 */
protected TableName getTable() {
  return this.tableName;
}
206
/**
 * @return the scan being executed (the live instance, not a copy)
 */
protected Scan getScan() {
  return scan;
}
210
/**
 * @return the value of {@code lastNext}: wall-clock millis taken at
 *     construction or after the most recent completed fetch in {@link #next()}
 */
protected long getTimestamp() {
  return lastNext;
}
214
215
216 protected boolean checkScanStopRow(final byte [] endKey) {
217 if (this.scan.getStopRow().length > 0) {
218
219 byte [] stopRow = scan.getStopRow();
220 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
221 endKey, 0, endKey.length);
222 if (cmp <= 0) {
223
224
225 return true;
226 }
227 }
228 return false;
229 }
230
231
232
233
234
235
236
237
238
239
240 protected boolean nextScanner(int nbRows, final boolean done)
241 throws IOException {
242
243 if (this.callable != null) {
244 this.callable.setClose();
245 this.caller.callWithRetries(callable);
246 this.callable = null;
247 }
248
249
250 byte [] localStartKey;
251
252
253 if (this.currentRegion != null) {
254 byte [] endKey = this.currentRegion.getEndKey();
255 if (endKey == null ||
256 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
257 checkScanStopRow(endKey) ||
258 done) {
259 close();
260 if (LOG.isTraceEnabled()) {
261 LOG.trace("Finished " + this.currentRegion);
262 }
263 return false;
264 }
265 localStartKey = endKey;
266 if (LOG.isTraceEnabled()) {
267 LOG.trace("Finished " + this.currentRegion);
268 }
269 } else {
270 localStartKey = this.scan.getStartRow();
271 }
272
273 if (LOG.isDebugEnabled() && this.currentRegion != null) {
274
275 LOG.debug("Advancing internal scanner to startKey at '" +
276 Bytes.toStringBinary(localStartKey) + "'");
277 }
278 try {
279 callable = getScannerCallable(localStartKey, nbRows);
280
281
282 this.caller.callWithRetries(callable);
283 this.currentRegion = callable.getHRegionInfo();
284 if (this.scanMetrics != null) {
285 this.scanMetrics.countOfRegions.incrementAndGet();
286 }
287 } catch (IOException e) {
288 close();
289 throw e;
290 }
291 return true;
292 }
293
294 @InterfaceAudience.Private
295 protected ScannerCallable getScannerCallable(byte [] localStartKey,
296 int nbRows) {
297 scan.setStartRow(localStartKey);
298 ScannerCallable s = new ScannerCallable(getConnection(),
299 getTable(), scan, this.scanMetrics, rpcControllerFactory.newController());
300 s.setCaching(nbRows);
301 return s;
302 }
303
304
305
306
307
308
309
310
311
312
313
314
315 protected void writeScanMetrics() {
316 if (this.scanMetrics == null || scanMetricsPublished) {
317 return;
318 }
319 MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
320 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
321 scanMetricsPublished = true;
322 }
323
324 @Override
325 public Result next() throws IOException {
326
327 if (cache.size() == 0 && this.closed) {
328 return null;
329 }
330 if (cache.size() == 0) {
331 Result [] values = null;
332 long remainingResultSize = maxScannerResultSize;
333 int countdown = this.caching;
334
335
336 callable.setCaching(this.caching);
337
338
339 boolean skipFirst = false;
340 boolean retryAfterOutOfOrderException = true;
341 do {
342 try {
343 if (skipFirst) {
344
345
346 callable.setCaching(1);
347 values = this.caller.callWithRetries(callable);
348 callable.setCaching(this.caching);
349 skipFirst = false;
350 }
351
352
353
354 values = this.caller.callWithRetries(callable);
355 if (skipFirst && values != null && values.length == 1) {
356 skipFirst = false;
357 values = this.caller.callWithRetries(callable);
358 }
359 retryAfterOutOfOrderException = true;
360 } catch (DoNotRetryIOException e) {
361
362
363 if (e instanceof UnknownScannerException) {
364 long timeout = lastNext + scannerTimeout;
365
366
367
368 if (timeout < System.currentTimeMillis()) {
369 long elapsed = System.currentTimeMillis() - lastNext;
370 ScannerTimeoutException ex = new ScannerTimeoutException(
371 elapsed + "ms passed since the last invocation, " +
372 "timeout is currently set to " + scannerTimeout);
373 ex.initCause(e);
374 throw ex;
375 }
376 } else {
377
378
379 Throwable cause = e.getCause();
380 if ((cause != null && cause instanceof NotServingRegionException) ||
381 (cause != null && cause instanceof RegionServerStoppedException) ||
382 e instanceof OutOfOrderScannerNextException) {
383
384
385
386 } else {
387 throw e;
388 }
389 }
390
391 if (this.lastResult != null) {
392 this.scan.setStartRow(this.lastResult.getRow());
393
394
395 skipFirst = true;
396 }
397 if (e instanceof OutOfOrderScannerNextException) {
398 if (retryAfterOutOfOrderException) {
399 retryAfterOutOfOrderException = false;
400 } else {
401
402 throw new DoNotRetryIOException("Failed after retry of " +
403 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
404 }
405 }
406
407 this.currentRegion = null;
408
409
410 callable = null;
411
412 continue;
413 }
414 long currentTime = System.currentTimeMillis();
415 if (this.scanMetrics != null ) {
416 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
417 }
418 lastNext = currentTime;
419 if (values != null && values.length > 0) {
420 for (Result rs : values) {
421 cache.add(rs);
422 for (Cell kv : rs.rawCells()) {
423
424 remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
425 }
426 countdown--;
427 this.lastResult = rs;
428 }
429 }
430
431 } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
432 }
433
434 if (cache.size() > 0) {
435 return cache.poll();
436 }
437
438
439 writeScanMetrics();
440 return null;
441 }
442
443 @Override
444 public void close() {
445 if (!scanMetricsPublished) writeScanMetrics();
446 if (callable != null) {
447 callable.setClose();
448 try {
449 this.caller.callWithRetries(callable);
450 } catch (UnknownScannerException e) {
451
452
453
454 } catch (IOException e) {
455
456 LOG.warn("scanner failed to close. Exception follows: " + e);
457 }
458 callable = null;
459 }
460 closed = true;
461 }
462 }