View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.DoNotRetryIOException;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.KeyValue.KVComparator;
38  import org.apache.hadoop.hbase.KeyValueUtil;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.client.IsolationLevel;
41  import org.apache.hadoop.hbase.client.Scan;
42  import org.apache.hadoop.hbase.executor.ExecutorService;
43  import org.apache.hadoop.hbase.filter.Filter;
44  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
45  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
46  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
47  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  
53  /**
54   * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
55   * into List<KeyValue> for a single row.
56   */
57  @InterfaceAudience.Private
58  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Log LOG = LogFactory.getLog(StoreScanner.class);
  // In unit tests, the store could be null
  protected final Store store;
  // Decides, cell by cell, what next() does (include, skip, seek, done).
  protected ScanQueryMatcher matcher;
  // Merged view over the underlying scanners; set to null by close() and updateReaders().
  protected KeyValueHeap heap;
  protected boolean cacheBlocks;

  // Number of cells matched as INCLUDE* for the current row; compared against
  // storeLimit + storeOffset in next().
  protected long countPerRow = 0;
  // Taken from Scan#getMaxResultsPerColumnFamily(); next() only enforces it when it is > -1.
  protected int storeLimit = -1;
  // Taken from Scan#getRowOffsetPerColumnFamily(); that many matched cells are skipped per row.
  protected int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  // Doesn't need to be volatile: close() and updateReaders() only touch it while holding 'lock'.
  protected boolean closing = false;
  // Value of Scan#isGetScan() captured at construction.
  protected final boolean get;
  // True when at least one column was passed to the constructor.
  protected final boolean explicitColumnQuery;
  // Passed to KeyValueHeap#requestSeek by reseek(); see the constructor for how it is derived.
  protected final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  protected boolean parallelSeekEnabled = false;
  // Executor used by parallelSeek(); only assigned when parallel seeking is enabled.
  protected ExecutorService executor;
  protected final Scan scan;
  protected final NavigableSet<byte[]> columns;
  // Computed at construction as now - ScanInfo#getTtl().
  protected final long oldestUnexpiredTS;
  // EnvironmentEdgeManager.currentTime() captured at construction.
  protected final long now;
  protected final int minVersions;
  // Limit checked by seekScanners() and next(); exceeding it raises RowTooBigException.
  protected final long maxRowSize;
  // next() re-checks the time limit whenever kvsScanned is a multiple of this value.
  protected final long cellsPerHeartbeatCheck;

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
   * KVs skipped via seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  // Last cell examined by next(); compared by identity to avoid double counting in kvsScanned.
  private Cell prevCell = null;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  /** Configuration key for parallel seeking (not read in this class; NOTE(review): confirm use). */
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
      "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  protected static boolean lazySeekEnabledGlobally =
      LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
      "hbase.cells.scanned.per.heartbeat.check";

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  // If heap == null and lastTop != null, the scanner stack must be rebuilt and re-sought to
  // this key before it is used again (see checkReseek()).
  protected Cell lastTop = null;

  // Whether to use pread for the scan
  private boolean scanUsePread = false;
  // Guards peek/close/seek/next/reseek/updateReaders.
  protected ReentrantLock lock = new ReentrantLock();
  
  // Read point handed to Store#getScanners and to the parallel seek handlers.
  private final long readPt;
127 
  /**
   * Used by the injection framework to test the race between StoreScanner construction and
   * compaction. The constants are not referenced elsewhere in this class.
   */
  enum StoreScannerCompactionRace {
    BEFORE_SEEK,
    AFTER_SEEK,
    COMPACT_COMPLETE
  }
134 
135   /** An internal constructor. */
136   protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
137       final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
138     this.readPt = readPt;
139     this.store = store;
140     this.cacheBlocks = cacheBlocks;
141     get = scan.isGetScan();
142     int numCol = columns == null ? 0 : columns.size();
143     explicitColumnQuery = numCol > 0;
144     this.scan = scan;
145     this.columns = columns;
146     this.now = EnvironmentEdgeManager.currentTime();
147     this.oldestUnexpiredTS = now - scanInfo.getTtl();
148     this.minVersions = scanInfo.getMinVersions();
149 
150      // We look up row-column Bloom filters for multi-column queries as part of
151      // the seek operation. However, we also look the row-column Bloom filter
152      // for multi-row (non-"get") scans because this is not done in
153      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
154      this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
155 
156      this.maxRowSize = scanInfo.getTableMaxRowSize();
157      this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
158      this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
159      // Parallel seeking is on if the config allows and more there is more than one store file.
160      if (this.store != null && this.store.getStorefilesCount() > 1) {
161        RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
162        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
163          this.parallelSeekEnabled = true;
164          this.executor = rsService.getExecutorService();
165        }
166      }
167   }
168 
169   /**
170    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
171    * are not in a compaction.
172    *
173    * @param store who we scan
174    * @param scan the spec
175    * @param columns which columns we are scanning
176    * @throws IOException
177    */
178   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
179       long readPt)
180   throws IOException {
181     this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
182     if (columns != null && scan.isRaw()) {
183       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
184     }
185     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
186         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
187         oldestUnexpiredTS, now, store.getCoprocessorHost());
188 
189     this.store.addChangedReaderObserver(this);
190 
191     // Pass columns to try to filter out unnecessary StoreFiles.
192     List<KeyValueScanner> scanners = getScannersNoCompaction();
193 
194     // Seek all scanners to the start of the Row (or if the exact matching row
195     // key does not exist, then to the start of the next matching Row).
196     // Always check bloom filter to optimize the top row seek for delete
197     // family marker.
198     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
199         && lazySeekEnabledGlobally, parallelSeekEnabled);
200 
201     // set storeLimit
202     this.storeLimit = scan.getMaxResultsPerColumnFamily();
203 
204     // set rowOffset
205     this.storeOffset = scan.getRowOffsetPerColumnFamily();
206 
207     // Combine all seeked scanners with a heap
208     resetKVHeap(scanners, store.getComparator());
209   }
210 
211   /**
212    * Used for compactions.<p>
213    *
214    * Opens a scanner across specified StoreFiles.
215    * @param store who we scan
216    * @param scan the spec
217    * @param scanners ancillary scanners
218    * @param smallestReadPoint the readPoint that we should use for tracking
219    *          versions
220    */
221   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
222       List<? extends KeyValueScanner> scanners, ScanType scanType,
223       long smallestReadPoint, long earliestPutTs) throws IOException {
224     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
225   }
226 
227   /**
228    * Used for compactions that drop deletes from a limited range of rows.<p>
229    *
230    * Opens a scanner across specified StoreFiles.
231    * @param store who we scan
232    * @param scan the spec
233    * @param scanners ancillary scanners
234    * @param smallestReadPoint the readPoint that we should use for tracking versions
235    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
236    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
237    */
238   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
239       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
240       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
241     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
242         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
243   }
244 
245   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
246       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
247       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
248     this(store, scan, scanInfo, null,
249       ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false);
250     if (dropDeletesFromRow == null) {
251       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
252           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
253     } else {
254       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
255           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
256     }
257 
258     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
259     scanners = selectScannersFrom(scanners);
260 
261     // Seek all scanners to the initial key
262     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
263 
264     // Combine all seeked scanners with a heap
265     resetKVHeap(scanners, store.getComparator());
266   }
267 
268   @VisibleForTesting
269   StoreScanner(final Scan scan, ScanInfo scanInfo,
270       ScanType scanType, final NavigableSet<byte[]> columns,
271       final List<KeyValueScanner> scanners) throws IOException {
272     this(scan, scanInfo, scanType, columns, scanners,
273         HConstants.LATEST_TIMESTAMP,
274         // 0 is passed as readpoint because the test bypasses Store
275         0);
276   }
277 
278   @VisibleForTesting
279   StoreScanner(final Scan scan, ScanInfo scanInfo,
280     ScanType scanType, final NavigableSet<byte[]> columns,
281     final List<KeyValueScanner> scanners, long earliestPutTs)
282         throws IOException {
283     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
284       // 0 is passed as readpoint because the test bypasses Store
285       0);
286   }
287   
288   private StoreScanner(final Scan scan, ScanInfo scanInfo,
289       ScanType scanType, final NavigableSet<byte[]> columns,
290       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
291   throws IOException {
292     this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
293     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
294         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
295 
296     // In unit tests, the store could be null
297     if (this.store != null) {
298       this.store.addChangedReaderObserver(this);
299     }
300     // Seek all scanners to the initial key
301     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
302     resetKVHeap(scanners, scanInfo.getComparator());
303   }
304 
305   /**
306    * Get a filtered list of scanners. Assumes we are not in a compaction.
307    * @return list of scanners to seek
308    */
309   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
310     final boolean isCompaction = false;
311     boolean usePread = get || scanUsePread;
312     return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
313         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
314   }
315 
316   /**
317    * Seek the specified scanners with the given key
318    * @param scanners
319    * @param seekKey
320    * @param isLazy true if using lazy seek
321    * @param isParallelSeek true if using parallel seek
322    * @throws IOException
323    */
324   protected void seekScanners(List<? extends KeyValueScanner> scanners,
325       Cell seekKey, boolean isLazy, boolean isParallelSeek)
326       throws IOException {
327     // Seek all scanners to the start of the Row (or if the exact matching row
328     // key does not exist, then to the start of the next matching Row).
329     // Always check bloom filter to optimize the top row seek for delete
330     // family marker.
331     if (isLazy) {
332       for (KeyValueScanner scanner : scanners) {
333         scanner.requestSeek(seekKey, false, true);
334       }
335     } else {
336       if (!isParallelSeek) {
337         long totalScannersSoughtBytes = 0;
338         for (KeyValueScanner scanner : scanners) {
339           if (totalScannersSoughtBytes >= maxRowSize) {
340             throw new RowTooBigException("Max row size allowed: " + maxRowSize
341               + ", but row is bigger than that");
342           }
343           scanner.seek(seekKey);
344           Cell c = scanner.peek();
345           if (c != null) {
346             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
347           }
348         }
349       } else {
350         parallelSeek(scanners, seekKey);
351       }
352     }
353   }
354 
355   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
356       KVComparator comparator) throws IOException {
357     // Combine all seeked scanners with a heap
358     heap = new KeyValueHeap(scanners, comparator);
359   }
360 
361   /**
362    * Filters the given list of scanners using Bloom filter, time range, and
363    * TTL.
364    */
365   protected List<KeyValueScanner> selectScannersFrom(
366       final List<? extends KeyValueScanner> allScanners) {
367     boolean memOnly;
368     boolean filesOnly;
369     if (scan instanceof InternalScan) {
370       InternalScan iscan = (InternalScan)scan;
371       memOnly = iscan.isCheckOnlyMemStore();
372       filesOnly = iscan.isCheckOnlyStoreFiles();
373     } else {
374       memOnly = false;
375       filesOnly = false;
376     }
377 
378     List<KeyValueScanner> scanners =
379         new ArrayList<KeyValueScanner>(allScanners.size());
380 
381     // We can only exclude store files based on TTL if minVersions is set to 0.
382     // Otherwise, we might have to return KVs that have technically expired.
383     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
384         Long.MIN_VALUE;
385 
386     // include only those scan files which pass all filters
387     for (KeyValueScanner kvs : allScanners) {
388       boolean isFile = kvs.isFileScanner();
389       if ((!isFile && filesOnly) || (isFile && memOnly)) {
390         continue;
391       }
392 
393       if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
394         scanners.add(kvs);
395       }
396     }
397     return scanners;
398   }
399 
400   @Override
401   public Cell peek() {
402     lock.lock();
403     try {
404     if (this.heap == null) {
405       return this.lastTop;
406     }
407     return this.heap.peek();
408     } finally {
409       lock.unlock();
410     }
411   }
412 
413   @Override
414   public KeyValue next() {
415     // throw runtime exception perhaps?
416     throw new RuntimeException("Never call StoreScanner.next()");
417   }
418 
419   @Override
420   public void close() {
421     lock.lock();
422     try {
423     if (this.closing) return;
424     this.closing = true;
425     // Under test, we dont have a this.store
426     if (this.store != null)
427       this.store.deleteChangedReaderObserver(this);
428     if (this.heap != null)
429       this.heap.close();
430     this.heap = null; // CLOSED!
431     this.lastTop = null; // If both are null, we are closed.
432     } finally {
433       lock.unlock();
434     }
435   }
436 
437   @Override
438   public boolean seek(Cell key) throws IOException {
439     lock.lock();
440     try {
441     // reset matcher state, in case that underlying store changed
442     checkReseek();
443     return this.heap.seek(key);
444     } finally {
445       lock.unlock();
446     }
447   }
448 
449   @Override
450   public boolean next(List<Cell> outResult) throws IOException {
451     return next(outResult, NoLimitScannerContext.getInstance());
452   }
453 
  /**
   * Get the next row of values from this Store.
   * <p>
   * Runs entirely under {@link #lock}. Cells the matcher includes are appended to
   * {@code outResult} until one of these happens: the matcher reports the row (or the scan)
   * is done, a batch, size or time limit of {@code scannerContext} is reached, the per-row
   * {@code storeLimit} is exceeded, or the heap runs out of cells.
   *
   * @param outResult list the included cells are appended to
   * @param scannerContext limits and progress tracking for this call; must not be null
   * @return true if there are more rows, false if scanner is done (the value is
   *         {@code hasMoreValues()} of the scanner state set just before returning)
   * @throws IllegalArgumentException if {@code scannerContext} is null
   * @throws RowTooBigException if the cells added by this call exceed {@code maxRowSize}
   * @throws IOException if reading from or seeking the underlying scanners fails
   */
  @Override
  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
    lock.lock();

    try {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // checkReseek() returns true when the scanner stack had to be rebuilt and its top is no
    // longer on the row of the previous top; report MORE_VALUES and let the caller call again.
    if (checkReseek()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell cell = this.heap.peek();
    if (cell == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row
    byte[] row = cell.getRowArray();
    int offset = cell.getRowOffset();
    short length = cell.getRowLength();

    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null) {
      this.countPerRow = 0;
      matcher.setRow(row, offset, length);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
    
    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    // Per-call tallies: cells added to outResult and their estimated serialized size.
    int count = 0;
    long totalBytesRead = 0;

    LOOP: do {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;

      // Ask the matcher what to do with this cell, then let optimize() downgrade a seek to a
      // plain skip/include when it judges the seek unnecessary.
      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      qcode = optimize(qcode, cell);
      switch(qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          if (f != null) {
            // TODO convert Scan Query Matcher to be Cell instead of KV based ?
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          if (storeLimit > -1 &&
              this.countPerRow > (storeLimit + storeOffset)) {
            // do what SEEK_NEXT_ROW does.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow()
            // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do
            // another compareRow to say the current row is DONE
            matcher.row = null;
            seekToNextRow(cell);
            break LOOP;
          }

          // add to results only if we have skipped #storeOffset kvs
          // also update metric accordingly
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Update local tracking information
            count++;
            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

            // Update the progress of the scanner context
            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
            scannerContext.incrementBatchProgress(1);

            if (totalBytesRead > maxRowSize) {
              throw new RowTooBigException("Max row size allowed: " + maxRowSize
                  + ", but the row is bigger than that.");
            }
          }

          // Advance the heap according to the specific INCLUDE* code.
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow()
            // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do
            // another compareRow to say the current row is DONE
            matcher.row = null;
            seekToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekAsDirection(matcher.getKeyForNextColumn(cell));
          } else {
            this.heap.next();
          }

          // Stop collecting for this call once a between-cells batch or size limit is hit.
          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          // We are sure that this row is done and we are in the next row.
          // So subsequent StoresScanner.next() call need not do another compare
          // and set the matcher.row
          matcher.row = null;
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          close();
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow()
          // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do
          // another compareRow to say the current row is DONE
          matcher.row = null;
          seekToNextRow(cell);
          break;

        case SEEK_NEXT_COL:
          seekAsDirection(matcher.getKeyForNextColumn(cell));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          // TODO convert reseek to Cell?
          // Without a hint from the matcher, fall back to stepping one cell forward.
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null) {
            seekAsDirection(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    } while((cell = this.heap.peek()) != null);

    // Reached when the heap ran out or a limit broke out of LOOP: report MORE_VALUES only if
    // this call produced at least one cell.
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No more keys
    close();
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      lock.unlock();
    }
  }
650 
651   /*
652    * See if we should actually SEEK or rather just SKIP to the next Cell.
653    * (see HBASE-13109)
654    */
655   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
656     switch(qcode) {
657     case INCLUDE_AND_SEEK_NEXT_COL:
658     case SEEK_NEXT_COL:
659     {
660       Cell nextIndexedKey = getNextIndexedKey();
661       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
662           && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
663         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
664       }
665       break;
666     }
667     case INCLUDE_AND_SEEK_NEXT_ROW:
668     case SEEK_NEXT_ROW:
669     {
670       Cell nextIndexedKey = getNextIndexedKey();
671       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
672           && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
673         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
674       }
675       break;
676     }
677     default:
678       break;
679     }
680     return qcode;
681   }
682 
683   // Implementation of ChangedReadersObserver
684   @Override
685   public void updateReaders() throws IOException {
686     lock.lock();
687     try {
688     if (this.closing) return;
689 
690     // All public synchronized API calls will call 'checkReseek' which will cause
691     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
692     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
693     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
694     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
695     if (this.heap == null) return;
696 
697     // this could be null.
698     this.lastTop = this.peek();
699 
700     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
701 
702     // close scanners to old obsolete Store files
703     this.heap.close(); // bubble thru and close all scanners.
704     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
705 
706     // Let the next() call handle re-creating and seeking
707     } finally {
708       lock.unlock();
709     }
710   }
711 
712   /**
713    * @return true if top of heap has changed (and KeyValueHeap has to try the
714    *         next KV)
715    * @throws IOException
716    */
717   protected boolean checkReseek() throws IOException {
718     if (this.heap == null && this.lastTop != null) {
719       resetScannerStack(this.lastTop);
720       if (this.heap.peek() == null
721           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
722         LOG.debug("Storescanner.peek() is changed where before = "
723             + this.lastTop.toString() + ",and after = " + this.heap.peek());
724         this.lastTop = null;
725         return true;
726       }
727       this.lastTop = null; // gone!
728     }
729     // else dont need to reseek
730     return false;
731   }
732 
733   protected void resetScannerStack(Cell lastTopKey) throws IOException {
734     if (heap != null) {
735       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
736     }
737 
738     /* When we have the scan object, should we not pass it to getScanners()
739      * to get a limited set of scanners? We did so in the constructor and we
740      * could have done it now by storing the scan object from the constructor */
741     List<KeyValueScanner> scanners = getScannersNoCompaction();
742 
743     // Seek all scanners to the initial key
744     seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
745 
746     // Combine all seeked scanners with a heap
747     resetKVHeap(scanners, store.getComparator());
748 
749     // Reset the state of the Query Matcher and set to top row.
750     // Only reset and call setRow if the row changes; avoids confusing the
751     // query matcher if scanning intra-row.
752     Cell kv = heap.peek();
753     if (kv == null) {
754       kv = lastTopKey;
755     }
756     byte[] row = kv.getRowArray();
757     int offset = kv.getRowOffset();
758     short length = kv.getRowLength();
759     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
760         matcher.rowOffset, matcher.rowLength)) {
761       this.countPerRow = 0;
762       matcher.reset();
763       matcher.setRow(row, offset, length);
764     }
765   }
766 
767   /**
768    * Check whether scan as expected order
769    * @param prevKV
770    * @param kv
771    * @param comparator
772    * @throws IOException
773    */
774   protected void checkScanOrder(Cell prevKV, Cell kv,
775       KeyValue.KVComparator comparator) throws IOException {
776     // Check that the heap gives us KVs in an increasing order.
777     assert prevKV == null || comparator == null
778         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
779         + " followed by a " + "smaller key " + kv + " in cf " + store;
780   }
781 
782   protected boolean seekToNextRow(Cell kv) throws IOException {
783     return reseek(KeyValueUtil.createLastOnRow(kv));
784   }
785 
786   /**
787    * Do a reseek in a normal StoreScanner(scan forward)
788    * @param kv
789    * @return true if scanner has values left, false if end of scanner
790    * @throws IOException
791    */
792   protected boolean seekAsDirection(Cell kv)
793       throws IOException {
794     return reseek(kv);
795   }
796 
797   @Override
798   public boolean reseek(Cell kv) throws IOException {
799     lock.lock();
800     try {
801     //Heap will not be null, if this is called from next() which.
802     //If called from RegionScanner.reseek(...) make sure the scanner
803     //stack is reset if needed.
804     checkReseek();
805     if (explicitColumnQuery && lazySeekEnabledGlobally) {
806       return heap.requestSeek(kv, true, useRowColBloom);
807     }
808     return heap.reseek(kv);
809     } finally {
810       lock.unlock();
811     }
812   }
813 
814   @Override
815   public long getSequenceID() {
816     return 0;
817   }
818 
819   /**
820    * Seek storefiles in parallel to optimize IO latency as much as possible
821    * @param scanners the list {@link KeyValueScanner}s to be read from
822    * @param kv the KeyValue on which the operation is being requested
823    * @throws IOException
824    */
825   private void parallelSeek(final List<? extends KeyValueScanner>
826       scanners, final Cell kv) throws IOException {
827     if (scanners.isEmpty()) return;
828     int storeFileScannerCount = scanners.size();
829     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
830     List<ParallelSeekHandler> handlers = 
831         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
832     for (KeyValueScanner scanner : scanners) {
833       if (scanner instanceof StoreFileScanner) {
834         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
835           this.readPt, latch);
836         executor.submit(seekHandler);
837         handlers.add(seekHandler);
838       } else {
839         scanner.seek(kv);
840         latch.countDown();
841       }
842     }
843 
844     try {
845       latch.await();
846     } catch (InterruptedException ie) {
847       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
848     }
849 
850     for (ParallelSeekHandler handler : handlers) {
851       if (handler.getErr() != null) {
852         throw new IOException(handler.getErr());
853       }
854     }
855   }
856 
857   /**
858    * Used in testing.
859    * @return all scanners in no particular order
860    */
861   List<KeyValueScanner> getAllScannersForTesting() {
862     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
863     KeyValueScanner current = heap.getCurrentForTesting();
864     if (current != null)
865       allScanners.add(current);
866     for (KeyValueScanner scanner : heap.getHeap())
867       allScanners.add(scanner);
868     return allScanners;
869   }
870 
871   static void enableLazySeekGlobally(boolean enable) {
872     lazySeekEnabledGlobally = enable;
873   }
874 
875   /**
876    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
877    */
878   public long getEstimatedNumberOfKvsScanned() {
879     return this.kvsScanned;
880   }
881 
882   @Override
883   public Cell getNextIndexedKey() {
884     return this.heap.getNextIndexedKey();
885   }
886 }
887