1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.NavigableSet;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.DoNotRetryIOException;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.KeyValue.KVComparator;
38 import org.apache.hadoop.hbase.KeyValueUtil;
39 import org.apache.hadoop.hbase.client.IsolationLevel;
40 import org.apache.hadoop.hbase.client.Scan;
41 import org.apache.hadoop.hbase.executor.ExecutorService;
42 import org.apache.hadoop.hbase.filter.Filter;
43 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46
47
48
49
50
51 @InterfaceAudience.Private
52 public class StoreScanner extends NonReversedNonLazyKeyValueScanner
53 implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
54 static final Log LOG = LogFactory.getLog(StoreScanner.class);
55 protected Store store;
56 protected ScanQueryMatcher matcher;
57 protected KeyValueHeap heap;
58 protected boolean cacheBlocks;
59
60 protected int countPerRow = 0;
61 protected int storeLimit = -1;
62 protected int storeOffset = 0;
63
64
65
66 protected boolean closing = false;
67 protected final boolean isGet;
68 protected final boolean explicitColumnQuery;
69 protected final boolean useRowColBloom;
70
71
72
73 protected boolean isParallelSeekEnabled = false;
74 protected ExecutorService executor;
75 protected final Scan scan;
76 protected final NavigableSet<byte[]> columns;
77 protected final long oldestUnexpiredTS;
78 protected final int minVersions;
79
80
81
82
83
84 private long kvsScanned = 0;
85 private KeyValue prevKV = null;
86
87
88 static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
89 public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
90 "hbase.storescanner.parallel.seek.enable";
91
92
93 protected static boolean lazySeekEnabledGlobally =
94 LAZY_SEEK_ENABLED_BY_DEFAULT;
95
96
97 protected KeyValue lastTop = null;
98
99
100 private boolean scanUsePread = false;
101 protected ReentrantLock lock = new ReentrantLock();
102
103 private final long readPt;
104
105
/**
 * Named points relative to a scanner seek and a compaction completing.
 * NOTE(review): nothing in this part of the file references these constants; they look
 * like hooks for race-condition tests — confirm against the callers.
 */
enum StoreScannerCompactionRace { BEFORE_SEEK, AFTER_SEEK, COMPACT_COMPLETE }
111
112
113 protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
114 final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
115 this.readPt = readPt;
116 this.store = store;
117 this.cacheBlocks = cacheBlocks;
118 isGet = scan.isGetScan();
119 int numCol = columns == null ? 0 : columns.size();
120 explicitColumnQuery = numCol > 0;
121 this.scan = scan;
122 this.columns = columns;
123 oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
124 this.minVersions = minVersions;
125
126
127
128
129
130 useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
131 this.scanUsePread = scan.isSmall();
132
133
134
135 if (store != null && ((HStore)store).getHRegion() != null
136 && store.getStorefilesCount() > 1) {
137 RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
138 if (rsService == null || !rsService.getConfiguration().getBoolean(
139 STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
140 isParallelSeekEnabled = true;
141 executor = rsService.getExecutorService();
142 }
143 }
144
145
146
147
148
149
150
151
152
153
154 public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
155 long readPt)
156 throws IOException {
157 this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
158 scanInfo.getMinVersions(), readPt);
159 if (columns != null && scan.isRaw()) {
160 throw new DoNotRetryIOException(
161 "Cannot specify any column for a raw scan");
162 }
163 matcher = new ScanQueryMatcher(scan, scanInfo, columns,
164 ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
165 oldestUnexpiredTS, store.getCoprocessorHost());
166
167 this.store.addChangedReaderObserver(this);
168
169
170 List<KeyValueScanner> scanners = getScannersNoCompaction();
171
172
173
174
175
176 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
177 && lazySeekEnabledGlobally, isParallelSeekEnabled);
178
179
180 this.storeLimit = scan.getMaxResultsPerColumnFamily();
181
182
183 this.storeOffset = scan.getRowOffsetPerColumnFamily();
184
185
186 resetKVHeap(scanners, store.getComparator());
187 }
188
189
190
191
192
193
194
195
196
197
198
/**
 * Opens a scanner over an already opened list of scanners with an explicit scan type and
 * no drop-deletes row range (both range bounds are passed on as null).
 *
 * @param store store the scanners belong to
 * @param scanInfo supplies TTL, minimum versions and matcher settings
 * @param scan the scan to execute
 * @param scanners scanners to merge
 * @param scanType scan type handed to the query matcher
 * @param smallestReadPoint read point handed to the query matcher
 * @param earliestPutTs earliest put timestamp handed to the query matcher
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, ScanType scanType,
    long smallestReadPoint, long earliestPutTs) throws IOException {
  this(store, scanInfo, scan, scanners, scanType,
      smallestReadPoint, earliestPutTs,
      null, null);
}
204
205
206
207
208
209
210
211
212
213
214
215
/**
 * Opens a scanner over an already opened list of scanners using
 * {@link ScanType#COMPACT_RETAIN_DELETES} together with a drop-deletes row range.
 *
 * @param store store the scanners belong to
 * @param scanInfo supplies TTL, minimum versions and matcher settings
 * @param scan the scan to execute
 * @param scanners scanners to merge
 * @param smallestReadPoint read point handed to the query matcher
 * @param earliestPutTs earliest put timestamp handed to the query matcher
 * @param dropDeletesFromRow start of the drop-deletes range handed to the matcher
 * @param dropDeletesToRow end of the drop-deletes range handed to the matcher
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
  this(store, scanInfo, scan, scanners,
      ScanType.COMPACT_RETAIN_DELETES,
      smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
}
222
223 private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
224 List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
225 long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
226 this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
227 ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
228 if (dropDeletesFromRow == null) {
229 matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
230 earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
231 } else {
232 matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
233 oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
234 }
235
236
237 scanners = selectScannersFrom(scanners);
238
239
240 seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
241
242
243 resetKVHeap(scanners, store.getComparator());
244 }
245
246
/**
 * Package-private constructor without a store: uses
 * {@link HConstants#LATEST_TIMESTAMP} as the earliest put timestamp and a read point of 0.
 */
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners) throws IOException {
  this(scan, scanInfo, scanType, columns, scanners, HConstants.LATEST_TIMESTAMP, 0);
}
255
256
/**
 * Package-private constructor without a store: takes the earliest put timestamp from the
 * caller and uses a read point of 0.
 */
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
    throws IOException {
  this(scan, scanInfo, scanType, columns, scanners, earliestPutTs, 0);
}
265
/**
 * Implementation behind the package-private constructors. The base constructor is given a
 * null store, the matcher gets no coprocessor host, and the comparator comes from
 * {@code scanInfo}.
 */
private StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
    throws IOException {
  this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
      scanInfo.getMinVersions(), readPt);
  this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
      Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);

  // Observer registration is skipped when there is no store to register with.
  if (this.store != null) {
    this.store.addChangedReaderObserver(this);
  }

  // The supplied scanners are used as they are: seek them eagerly and merge them.
  seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
  resetKVHeap(scanners, scanInfo.getComparator());
}
283
284
285
286
287
288 protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
289 final boolean isCompaction = false;
290 boolean usePread = isGet || scanUsePread;
291 return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
292 isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
293 }
294
295
296
297
298
299
300
301
302
303 protected void seekScanners(List<? extends KeyValueScanner> scanners,
304 KeyValue seekKey, boolean isLazy, boolean isParallelSeek)
305 throws IOException {
306
307
308
309
310 if (isLazy) {
311 for (KeyValueScanner scanner : scanners) {
312 scanner.requestSeek(seekKey, false, true);
313 }
314 } else {
315 if (!isParallelSeek) {
316 for (KeyValueScanner scanner : scanners) {
317 scanner.seek(seekKey);
318 }
319 } else {
320 parallelSeek(scanners, seekKey);
321 }
322 }
323 }
324
325 protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
326 KVComparator comparator) throws IOException {
327
328 heap = new KeyValueHeap(scanners, comparator);
329 }
330
331
332
333
334
335 protected List<KeyValueScanner> selectScannersFrom(
336 final List<? extends KeyValueScanner> allScanners) {
337 boolean memOnly;
338 boolean filesOnly;
339 if (scan instanceof InternalScan) {
340 InternalScan iscan = (InternalScan)scan;
341 memOnly = iscan.isCheckOnlyMemStore();
342 filesOnly = iscan.isCheckOnlyStoreFiles();
343 } else {
344 memOnly = false;
345 filesOnly = false;
346 }
347
348 List<KeyValueScanner> scanners =
349 new ArrayList<KeyValueScanner>(allScanners.size());
350
351
352
353 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
354 Long.MIN_VALUE;
355
356
357 for (KeyValueScanner kvs : allScanners) {
358 boolean isFile = kvs.isFileScanner();
359 if ((!isFile && filesOnly) || (isFile && memOnly)) {
360 continue;
361 }
362
363 if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
364 scanners.add(kvs);
365 }
366 }
367 return scanners;
368 }
369
370 @Override
371 public KeyValue peek() {
372 lock.lock();
373 try {
374 if (this.heap == null) {
375 return this.lastTop;
376 }
377 return this.heap.peek();
378 } finally {
379 lock.unlock();
380 }
381 }
382
383 @Override
384 public KeyValue next() {
385
386 throw new RuntimeException("Never call StoreScanner.next()");
387 }
388
389 @Override
390 public void close() {
391 lock.lock();
392 try {
393 if (this.closing) return;
394 this.closing = true;
395
396 if (this.store != null)
397 this.store.deleteChangedReaderObserver(this);
398 if (this.heap != null)
399 this.heap.close();
400 this.heap = null;
401 this.lastTop = null;
402 } finally {
403 lock.unlock();
404 }
405 }
406
407 @Override
408 public boolean seek(KeyValue key) throws IOException {
409 lock.lock();
410 try {
411
412 checkReseek();
413 return this.heap.seek(key);
414 } finally {
415 lock.unlock();
416 }
417 }
418
419
420
421
422
423
424
425 @Override
426 public boolean next(List<Cell> outResult, int limit) throws IOException {
427 lock.lock();
428 try {
429 if (checkReseek()) {
430 return true;
431 }
432
433
434
435 if (this.heap == null) {
436 close();
437 return false;
438 }
439
440 KeyValue peeked = this.heap.peek();
441 if (peeked == null) {
442 close();
443 return false;
444 }
445
446
447
448 byte[] row = peeked.getBuffer();
449 int offset = peeked.getRowOffset();
450 short length = peeked.getRowLength();
451 if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
452 matcher.rowOffset, matcher.rowLength)) {
453 this.countPerRow = 0;
454 matcher.setRow(row, offset, length);
455 }
456
457 KeyValue kv;
458
459
460 KeyValue.KVComparator comparator =
461 store != null ? store.getComparator() : null;
462
463 int count = 0;
464 LOOP: while((kv = this.heap.peek()) != null) {
465 if (prevKV != kv) ++kvsScanned;
466 checkScanOrder(prevKV, kv, comparator);
467 prevKV = kv;
468
469 ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
470 switch(qcode) {
471 case INCLUDE:
472 case INCLUDE_AND_SEEK_NEXT_ROW:
473 case INCLUDE_AND_SEEK_NEXT_COL:
474
475 Filter f = matcher.getFilter();
476 if (f != null) {
477
478 kv = KeyValueUtil.ensureKeyValue(f.transformCell(kv));
479 }
480
481 this.countPerRow++;
482 if (storeLimit > -1 &&
483 this.countPerRow > (storeLimit + storeOffset)) {
484
485 if (!matcher.moreRowsMayExistAfter(kv)) {
486 return false;
487 }
488 seekToNextRow(kv);
489 break LOOP;
490 }
491
492
493
494 if (this.countPerRow > storeOffset) {
495 outResult.add(kv);
496 count++;
497 }
498
499 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
500 if (!matcher.moreRowsMayExistAfter(kv)) {
501 return false;
502 }
503 seekToNextRow(kv);
504 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
505 seekAsDirection(matcher.getKeyForNextColumn(kv));
506 } else {
507 this.heap.next();
508 }
509
510 if (limit > 0 && (count == limit)) {
511 break LOOP;
512 }
513 continue;
514
515 case DONE:
516 return true;
517
518 case DONE_SCAN:
519 close();
520 return false;
521
522 case SEEK_NEXT_ROW:
523
524
525 if (!matcher.moreRowsMayExistAfter(kv)) {
526 return false;
527 }
528
529 seekToNextRow(kv);
530 break;
531
532 case SEEK_NEXT_COL:
533 seekAsDirection(matcher.getKeyForNextColumn(kv));
534 break;
535
536 case SKIP:
537 this.heap.next();
538 break;
539
540 case SEEK_NEXT_USING_HINT:
541
542 KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv));
543 if (nextKV != null) {
544 seekAsDirection(nextKV);
545 } else {
546 heap.next();
547 }
548 break;
549
550 default:
551 throw new RuntimeException("UNEXPECTED");
552 }
553 }
554
555 if (count > 0) {
556 return true;
557 }
558
559
560 close();
561 return false;
562 } finally {
563 lock.unlock();
564 }
565 }
566
567 @Override
568 public boolean next(List<Cell> outResult) throws IOException {
569 return next(outResult, -1);
570 }
571
572
573 @Override
574 public void updateReaders() throws IOException {
575 lock.lock();
576 try {
577 if (this.closing) return;
578
579
580
581
582
583
584 if (this.heap == null) return;
585
586
587 this.lastTop = this.peek();
588
589
590
591
592 this.heap.close();
593 this.heap = null;
594
595
596 } finally {
597 lock.unlock();
598 }
599 }
600
601
602
603
604
605
606 protected boolean checkReseek() throws IOException {
607 if (this.heap == null && this.lastTop != null) {
608 resetScannerStack(this.lastTop);
609 if (this.heap.peek() == null
610 || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
611 LOG.debug("Storescanner.peek() is changed where before = "
612 + this.lastTop.toString() + ",and after = " + this.heap.peek());
613 this.lastTop = null;
614 return true;
615 }
616 this.lastTop = null;
617 }
618
619 return false;
620 }
621
622 protected void resetScannerStack(KeyValue lastTopKey) throws IOException {
623 if (heap != null) {
624 throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
625 }
626
627
628
629
630 List<KeyValueScanner> scanners = getScannersNoCompaction();
631
632
633 seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
634
635
636 resetKVHeap(scanners, store.getComparator());
637
638
639
640
641 KeyValue kv = heap.peek();
642 if (kv == null) {
643 kv = lastTopKey;
644 }
645 byte[] row = kv.getBuffer();
646 int offset = kv.getRowOffset();
647 short length = kv.getRowLength();
648 if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
649 matcher.rowOffset, matcher.rowLength)) {
650 this.countPerRow = 0;
651 matcher.reset();
652 matcher.setRow(row, offset, length);
653 }
654 }
655
656
657
658
659
660
661
662
663 protected void checkScanOrder(KeyValue prevKV, KeyValue kv,
664 KeyValue.KVComparator comparator) throws IOException {
665
666 assert prevKV == null || comparator == null
667 || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
668 + " followed by a " + "smaller key " + kv + " in cf " + store;
669 }
670
671 protected boolean seekToNextRow(KeyValue kv) throws IOException {
672 return reseek(matcher.getKeyForNextRow(kv));
673 }
674
675
676
677
678
679
680
681 protected boolean seekAsDirection(KeyValue kv)
682 throws IOException {
683 return reseek(kv);
684 }
685
686 @Override
687 public boolean reseek(KeyValue kv) throws IOException {
688 lock.lock();
689 try {
690
691
692
693 checkReseek();
694 if (explicitColumnQuery && lazySeekEnabledGlobally) {
695 return heap.requestSeek(kv, true, useRowColBloom);
696 }
697 return heap.reseek(kv);
698 } finally {
699 lock.unlock();
700 }
701 }
702
703 @Override
704 public long getSequenceID() {
705 return 0;
706 }
707
708
709
710
711
712
713
714 private void parallelSeek(final List<? extends KeyValueScanner>
715 scanners, final KeyValue kv) throws IOException {
716 if (scanners.isEmpty()) return;
717 int storeFileScannerCount = scanners.size();
718 CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
719 List<ParallelSeekHandler> handlers =
720 new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
721 for (KeyValueScanner scanner : scanners) {
722 if (scanner instanceof StoreFileScanner) {
723 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
724 this.readPt, latch);
725 executor.submit(seekHandler);
726 handlers.add(seekHandler);
727 } else {
728 scanner.seek(kv);
729 latch.countDown();
730 }
731 }
732
733 try {
734 latch.await();
735 } catch (InterruptedException ie) {
736 throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
737 }
738
739 for (ParallelSeekHandler handler : handlers) {
740 if (handler.getErr() != null) {
741 throw new IOException(handler.getErr());
742 }
743 }
744 }
745
746
747
748
749
750 List<KeyValueScanner> getAllScannersForTesting() {
751 List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
752 KeyValueScanner current = heap.getCurrentForTesting();
753 if (current != null)
754 allScanners.add(current);
755 for (KeyValueScanner scanner : heap.getHeap())
756 allScanners.add(scanner);
757 return allScanners;
758 }
759
760 static void enableLazySeekGlobally(boolean enable) {
761 lazySeekEnabledGlobally = enable;
762 }
763
764
765
766
767 public long getEstimatedNumberOfKvsScanned() {
768 return this.kvsScanned;
769 }
770 }
771