1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.KeyValue.MetaComparator;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.CellComparator;
35 import org.apache.hadoop.hbase.CellUtil;
36 import org.apache.hadoop.hbase.DoNotRetryIOException;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.NotServingRegionException;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.UnknownScannerException;
43 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
44 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
45 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
46 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
47 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
48 import org.apache.hadoop.hbase.util.Bytes;
49
50 import com.google.common.annotations.VisibleForTesting;
51
52
53
54
55
56
57 @InterfaceAudience.Private
58 public class ClientScanner extends AbstractClientScanner {
59 private static final Log LOG = LogFactory.getLog(ClientScanner.class);
60
61
62 static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
63 protected Scan scan;
64 protected boolean closed = false;
65
66
67 protected HRegionInfo currentRegion = null;
68 protected ScannerCallableWithReplicas callable = null;
69 protected final LinkedList<Result> cache = new LinkedList<Result>();
70
71
72
73
74
75 protected final LinkedList<Result> partialResults = new LinkedList<Result>();
76
77
78
79
80
81 protected byte[] partialResultsRow = null;
82
83
84
85 protected Cell lastCellLoadedToCache = null;
86 protected final int caching;
87 protected long lastNext;
88
89 protected Result lastResult = null;
90 protected final long maxScannerResultSize;
91 private final ClusterConnection connection;
92 private final TableName tableName;
93 protected final int scannerTimeout;
94 protected boolean scanMetricsPublished = false;
95 protected RpcRetryingCaller<Result []> caller;
96 protected RpcControllerFactory rpcControllerFactory;
97 protected Configuration conf;
98
99
100
101
102
103 protected final int primaryOperationTimeout;
104 private int retries;
105 protected final ExecutorService pool;
106 private static MetaComparator metaComparator = new MetaComparator();
107
108
109
110
111
112
113
114
115
116
117 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
118 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
119 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
120 throws IOException {
121 if (LOG.isTraceEnabled()) {
122 LOG.trace("Scan table=" + tableName
123 + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
124 }
125 this.scan = scan;
126 this.tableName = tableName;
127 this.lastNext = System.currentTimeMillis();
128 this.connection = connection;
129 this.pool = pool;
130 this.primaryOperationTimeout = primaryOperationTimeout;
131 this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
132 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
133 if (scan.getMaxResultSize() > 0) {
134 this.maxScannerResultSize = scan.getMaxResultSize();
135 } else {
136 this.maxScannerResultSize = conf.getLong(
137 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
138 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
139 }
140 this.scannerTimeout = HBaseConfiguration.getInt(conf,
141 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
142 HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
143 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
144
145
146 initScanMetrics(scan);
147
148
149 if (this.scan.getCaching() > 0) {
150 this.caching = this.scan.getCaching();
151 } else {
152 this.caching = conf.getInt(
153 HConstants.HBASE_CLIENT_SCANNER_CACHING,
154 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
155 }
156
157 this.caller = rpcFactory.<Result[]> newCaller();
158 this.rpcControllerFactory = controllerFactory;
159
160 this.conf = conf;
161 initializeScannerInConstruction();
162 }
163
164 protected void initializeScannerInConstruction() throws IOException{
165
166 nextScanner(this.caching, false);
167 }
168
169 protected ClusterConnection getConnection() {
170 return this.connection;
171 }
172
173
174
175
176
177
178
179 @Deprecated
180 protected byte [] getTableName() {
181 return this.tableName.getName();
182 }
183
184 protected TableName getTable() {
185 return this.tableName;
186 }
187
188 protected int getRetries() {
189 return this.retries;
190 }
191
192 protected int getScannerTimeout() {
193 return this.scannerTimeout;
194 }
195
196 protected Configuration getConf() {
197 return this.conf;
198 }
199
200 protected Scan getScan() {
201 return scan;
202 }
203
204 protected ExecutorService getPool() {
205 return pool;
206 }
207
208 protected int getPrimaryOperationTimeout() {
209 return primaryOperationTimeout;
210 }
211
212 protected int getCaching() {
213 return caching;
214 }
215
216 protected long getTimestamp() {
217 return lastNext;
218 }
219
220 @VisibleForTesting
221 protected long getMaxResultSize() {
222 return maxScannerResultSize;
223 }
224
225
226 protected boolean checkScanStopRow(final byte [] endKey) {
227 if (this.scan.getStopRow().length > 0) {
228
229 byte [] stopRow = scan.getStopRow();
230 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
231 endKey, 0, endKey.length);
232 if (cmp <= 0) {
233
234
235 return true;
236 }
237 }
238 return false;
239 }
240
241 private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
242
243
244
245
246 if (callable != null && callable.switchedToADifferentReplica()) return true;
247 return nextScanner(nbRows, done);
248 }
249
250
251
252
253
254
255
256
257
258
259 protected boolean nextScanner(int nbRows, final boolean done)
260 throws IOException {
261
262 if (this.callable != null) {
263 this.callable.setClose();
264 call(callable, caller, scannerTimeout);
265 this.callable = null;
266 }
267
268
269 byte [] localStartKey;
270
271
272 if (this.currentRegion != null) {
273 byte [] endKey = this.currentRegion.getEndKey();
274 if (endKey == null ||
275 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
276 checkScanStopRow(endKey) ||
277 done) {
278 close();
279 if (LOG.isTraceEnabled()) {
280 LOG.trace("Finished " + this.currentRegion);
281 }
282 return false;
283 }
284 localStartKey = endKey;
285 if (LOG.isTraceEnabled()) {
286 LOG.trace("Finished " + this.currentRegion);
287 }
288 } else {
289 localStartKey = this.scan.getStartRow();
290 }
291
292 if (LOG.isDebugEnabled() && this.currentRegion != null) {
293
294 LOG.debug("Advancing internal scanner to startKey at '" +
295 Bytes.toStringBinary(localStartKey) + "'");
296 }
297 try {
298 callable = getScannerCallable(localStartKey, nbRows);
299
300
301 call(callable, caller, scannerTimeout);
302 this.currentRegion = callable.getHRegionInfo();
303 if (this.scanMetrics != null) {
304 this.scanMetrics.countOfRegions.incrementAndGet();
305 }
306 } catch (IOException e) {
307 close();
308 throw e;
309 }
310 return true;
311 }
312
313 @VisibleForTesting
314 boolean isAnyRPCcancelled() {
315 return callable.isAnyRPCcancelled();
316 }
317
318 Result[] call(ScannerCallableWithReplicas callable,
319 RpcRetryingCaller<Result[]> caller, int scannerTimeout)
320 throws IOException, RuntimeException {
321 if (Thread.interrupted()) {
322 throw new InterruptedIOException();
323 }
324
325
326 return caller.callWithoutRetries(callable, scannerTimeout);
327 }
328
329 @InterfaceAudience.Private
330 protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
331 int nbRows) {
332 scan.setStartRow(localStartKey);
333 ScannerCallable s =
334 new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
335 this.rpcControllerFactory);
336 s.setCaching(nbRows);
337 ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
338 s, pool, primaryOperationTimeout, scan,
339 retries, scannerTimeout, caching, conf, caller);
340 return sr;
341 }
342
343
344
345
346
347
348
349
350
351
352
353
354 protected void writeScanMetrics() {
355 if (this.scanMetrics == null || scanMetricsPublished) {
356 return;
357 }
358 MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
359 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
360 scanMetricsPublished = true;
361 }
362
363 @Override
364 public Result next() throws IOException {
365
366 if (cache.size() == 0 && this.closed) {
367 return null;
368 }
369 if (cache.size() == 0) {
370 loadCache();
371 }
372
373 if (cache.size() > 0) {
374 return cache.poll();
375 }
376
377
378 writeScanMetrics();
379 return null;
380 }
381
382 @VisibleForTesting
383 public int getCacheSize() {
384 return cache != null ? cache.size() : 0;
385 }
386
387
388
389
390 protected void loadCache() throws IOException {
391 Result[] values = null;
392 long remainingResultSize = maxScannerResultSize;
393 int countdown = this.caching;
394
395 callable.setCaching(this.caching);
396
397
398 boolean retryAfterOutOfOrderException = true;
399
400
401 boolean serverHasMoreResults = false;
402 boolean allResultsSkipped = false;
403 do {
404 allResultsSkipped = false;
405 try {
406
407
408
409 values = call(callable, caller, scannerTimeout);
410
411
412
413
414
415 if (values == null && callable.switchedToADifferentReplica()) {
416
417
418 clearPartialResults();
419 this.currentRegion = callable.getHRegionInfo();
420 continue;
421 }
422 retryAfterOutOfOrderException = true;
423 } catch (DoNotRetryIOException | NeedUnmanagedConnectionException e) {
424
425
426 clearPartialResults();
427
428
429 if (e instanceof UnknownScannerException) {
430 long timeout = lastNext + scannerTimeout;
431
432
433
434 if (timeout < System.currentTimeMillis()) {
435 LOG.info("For hints related to the following exception, please try taking a look at: "
436 + "https://hbase.apache.org/book.html#trouble.client.scantimeout");
437 long elapsed = System.currentTimeMillis() - lastNext;
438 ScannerTimeoutException ex =
439 new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
440 + "timeout is currently set to " + scannerTimeout);
441 ex.initCause(e);
442 throw ex;
443 }
444 } else {
445
446
447 Throwable cause = e.getCause();
448 if ((cause != null && cause instanceof NotServingRegionException) ||
449 (cause != null && cause instanceof RegionServerStoppedException) ||
450 e instanceof OutOfOrderScannerNextException) {
451
452
453 } else {
454 throw e;
455 }
456 }
457
458 if (this.lastResult != null) {
459
460
461
462
463 if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
464 if (scan.isReversed()) {
465 scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
466 } else {
467 scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
468 }
469 } else {
470
471 scan.setStartRow(lastResult.getRow());
472 }
473 }
474 if (e instanceof OutOfOrderScannerNextException) {
475 if (retryAfterOutOfOrderException) {
476 retryAfterOutOfOrderException = false;
477 } else {
478
479 throw new DoNotRetryIOException("Failed after retry of " +
480 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
481 }
482 }
483
484 this.currentRegion = null;
485
486
487 callable = null;
488
489 continue;
490 }
491 long currentTime = System.currentTimeMillis();
492 if (this.scanMetrics != null) {
493 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
494 }
495 lastNext = currentTime;
496
497
498
499 List<Result> resultsToAddToCache =
500 getResultsToAddToCache(values, callable.isHeartbeatMessage());
501 if (!resultsToAddToCache.isEmpty()) {
502 for (Result rs : resultsToAddToCache) {
503 rs = filterLoadedCell(rs);
504 if (rs == null) {
505 continue;
506 }
507 cache.add(rs);
508 for (Cell cell : rs.rawCells()) {
509 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
510 }
511 countdown--;
512 this.lastResult = rs;
513 if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
514 updateLastCellLoadedToCache(this.lastResult);
515 } else {
516 this.lastCellLoadedToCache = null;
517 }
518 }
519 if (cache.isEmpty()) {
520
521 allResultsSkipped = true;
522 continue;
523 }
524 }
525 if (callable.isHeartbeatMessage()) {
526 if (cache.size() > 0) {
527
528
529
530
531 if (LOG.isTraceEnabled()) {
532 LOG.trace("Heartbeat message received and cache contains Results."
533 + " Breaking out of scan loop");
534 }
535 break;
536 }
537 continue;
538 }
539
540
541
542
543
544 if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
545
546
547 serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
548 }
549
550
551
552
553 } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
554 || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
555 && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
556 }
557
558
559
560
561
562
563
564
565 private boolean doneWithRegion(long remainingResultSize, int remainingRows,
566 boolean regionHasMoreResults) {
567 return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
568 }
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583 protected List<Result>
584 getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
585 throws IOException {
586 int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
587 List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
588
589 final boolean isBatchSet = scan != null && scan.getBatch() > 0;
590 final boolean allowPartials = scan != null && scan.getAllowPartialResults();
591
592
593
594
595
596
597
598 if (allowPartials || isBatchSet) {
599 addResultsToList(resultsToAddToCache, resultsFromServer, 0,
600 (null == resultsFromServer ? 0 : resultsFromServer.length));
601 return resultsToAddToCache;
602 }
603
604
605
606
607 if (resultsFromServer == null || resultsFromServer.length == 0) {
608
609
610
611 if (!partialResults.isEmpty() && !heartbeatMessage) {
612 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
613 clearPartialResults();
614 }
615
616 return resultsToAddToCache;
617 }
618
619
620
621 Result last = resultsFromServer[resultsFromServer.length - 1];
622 Result partial = last.isPartial() ? last : null;
623
624 if (LOG.isTraceEnabled()) {
625 StringBuilder sb = new StringBuilder();
626 sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
627 sb.append("partial != null: ").append(partial != null).append(",");
628 sb.append("number of partials so far: ").append(partialResults.size());
629 LOG.trace(sb.toString());
630 }
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652 if (partial != null && partialResults.isEmpty()) {
653 addToPartialResults(partial);
654
655
656 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
657 } else if (!partialResults.isEmpty()) {
658 for (int i = 0; i < resultsFromServer.length; i++) {
659 Result result = resultsFromServer[i];
660
661
662
663 if (Bytes.equals(partialResultsRow, result.getRow())) {
664 addToPartialResults(result);
665
666
667
668 if (!result.isPartial()) {
669 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
670 clearPartialResults();
671 }
672 } else {
673
674
675
676 if (!partialResults.isEmpty()) {
677 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
678 clearPartialResults();
679 }
680
681
682
683
684 if (result.isPartial()) {
685 addToPartialResults(result);
686 } else {
687 resultsToAddToCache.add(result);
688 }
689 }
690 }
691 } else {
692 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
693 }
694
695 return resultsToAddToCache;
696 }
697
698
699
700
701
702
703
704 private void addToPartialResults(final Result result) throws IOException {
705 final byte[] row = result.getRow();
706 if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
707 throw new IOException("Partial result row does not match. All partial results must come "
708 + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: "
709 + Bytes.toString(row));
710 }
711 partialResultsRow = row;
712 partialResults.add(result);
713 }
714
715
716
717
718 private void clearPartialResults() {
719 partialResults.clear();
720 partialResultsRow = null;
721 }
722
723
724
725
726
727
728
729
730 private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
731 if (inputArray == null || start < 0 || end > inputArray.length) return;
732
733 for (int i = start; i < end; i++) {
734 outputList.add(inputArray[i]);
735 }
736 }
737
738 @Override
739 public void close() {
740 if (!scanMetricsPublished) writeScanMetrics();
741 if (callable != null) {
742 callable.setClose();
743 try {
744 call(callable, caller, scannerTimeout);
745 } catch (UnknownScannerException e) {
746
747
748
749 } catch (IOException e) {
750
751 LOG.warn("scanner failed to close. Exception follows: " + e);
752 }
753 callable = null;
754 }
755 closed = true;
756 }
757
758
759
760
761
762
763 protected static byte[] createClosestRowBefore(byte[] row) {
764 if (row == null) {
765 throw new IllegalArgumentException("The passed row is empty");
766 }
767 if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
768 return MAX_BYTE_ARRAY;
769 }
770 if (row[row.length - 1] == 0) {
771 return Arrays.copyOf(row, row.length - 1);
772 } else {
773 byte[] closestFrontRow = Arrays.copyOf(row, row.length);
774 closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
775 closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
776 return closestFrontRow;
777 }
778 }
779
780 @Override
781 public boolean renewLease() {
782 if (callable != null) {
783
784 callable.setRenew(true);
785 try {
786 this.caller.callWithoutRetries(callable, this.scannerTimeout);
787 } catch (Exception e) {
788 return false;
789 } finally {
790 callable.setRenew(false);
791 }
792 return true;
793 }
794 return false;
795 }
796
797 protected void updateLastCellLoadedToCache(Result result) {
798 if (result.rawCells().length == 0) {
799 return;
800 }
801 this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
802 }
803
804
805
806
807
808 private int compare(Cell a, Cell b) {
809 int r = 0;
810 if (currentRegion != null && currentRegion.isMetaRegion()) {
811 r = metaComparator.compareRows(a, b);
812 } else {
813 r = CellComparator.compareRows(a, b);
814 }
815 if (r != 0) {
816 return this.scan.isReversed() ? -r : r;
817 }
818 return CellComparator.compareWithoutRow(a, b);
819 }
820
821 private Result filterLoadedCell(Result result) {
822
823
824
825
826 if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
827 return result;
828 }
829 if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
830
831
832 return result;
833 }
834 if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
835
836 return null;
837 }
838
839
840 int index = 1;
841 while (index < result.rawCells().length) {
842 if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
843 break;
844 }
845 index++;
846 }
847 Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
848 return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
849 }
850 }