View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.DataInput;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.KeyValue.KVComparator;
34  import org.apache.hadoop.hbase.fs.HFileSystem;
35  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
36  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
37  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
38  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
39  import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
40  import org.apache.hadoop.hbase.util.ByteBufferUtils;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.IdLock;
43  import org.apache.hadoop.io.WritableUtils;
44  import org.cloudera.htrace.Trace;
45  import org.cloudera.htrace.TraceScope;
46  
47  import com.google.common.annotations.VisibleForTesting;
48  
49  /**
50   * {@link HFile} reader for version 2.
51   */
52  @InterfaceAudience.Private
53  public class HFileReaderV2 extends AbstractHFileReader {
54  
55    private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
56  
57    /** Minor versions in HFile V2 starting with this number have HBase checksums. */
58    public static final int MINOR_VERSION_WITH_CHECKSUM = 1;
59    /** The HFile V2 minor version that does not support checksums. */
60    public static final int MINOR_VERSION_NO_CHECKSUM = 0;
61  
62    /** HFile minor version that introduced the pbuf file trailer. */
63    public static final int PBUF_TRAILER_MINOR_VERSION = 2;
64  
65    /**
66     * The size of a (key length, value length) tuple that prefixes each entry in
67     * a data block: two ints.
68     */
69    public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
70  
      /**
       * Set by the constructor: true when the file info's KEY_VALUE_VERSION entry
       * equals HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE.
       */
71    protected boolean includesMemstoreTS = false;
      /**
       * Set by the constructor: true when includesMemstoreTS is set and the file
       * info's MAX_MEMSTORE_TS_KEY value is greater than zero.
       */
72    protected boolean decodeMemstoreTS = false;
73    protected boolean shouldIncludeMemstoreTS() {
74      return includesMemstoreTS;
75    }
76  
77    /** Filesystem-level block reader. */
78    protected HFileBlock.FSReader fsBlockReader;
79  
80    /**
81     * A "sparse lock" implementation that allows locking on a particular block
82     * identified by its offset. The purpose of this is to avoid two clients loading
83     * the same block, and have all but one client wait to get the block from the
84     * cache.
85     */
86    private IdLock offsetLock = new IdLock();
87  
88    /**
89     * Blocks read from the load-on-open section, excluding data root index, meta
90     * index, and file info.
91     */
92    private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
93  
94    /** Minimum minor version supported by this HFile format */
95    static final int MIN_MINOR_VERSION = 0;
96  
97    /** Maximum minor version supported by this HFile format */
98    // We went to version 2 when we moved to pb'ing fileinfo and the trailer on
99    // the file. This version can read Writables version 1.
      // NOTE(review): the constant is 3 (equal to MINOR_VERSION_WITH_FAKED_KEY) while
      // the remark above talks about version 2 -- confirm the remark is still current.
100   static final int MAX_MINOR_VERSION = 3;
101 
102   /** Minor versions starting with this number have faked index key */
103   static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
104 
      /**
       * File context produced by createHFileContext() in the constructor; it is handed
       * to the filesystem block reader and to encoded scanners.
       */
105   protected HFileContext hfileContext;
106 
/**
 * Opens a HFile. You must load the index before you can use it by calling
 * {@link #loadFileInfo()}.
 * <p>
 * The constructor validates the trailer's major and minor version, creates the
 * filesystem block reader and then walks the load-on-open section in a fixed
 * order: data root index, meta root index, file info, and finally every
 * remaining block, which is kept in {@code loadOnOpenBlocks}. When the cache
 * configuration asks for it, a background prefetch of the blocks preceding the
 * load-on-open section is scheduled.
 *
 * @param path Path to HFile.
 * @param trailer File trailer.
 * @param fsdis input stream.
 * @param size Length of the stream.
 * @param cacheConf Cache configuration.
 * @param hfs filesystem wrapper, passed to the superclass and to the block reader
 * @param conf configuration, passed to the superclass and stored on this reader
 * @throws IOException if the trailer version is not accepted or the
 *           load-on-open section cannot be read
 */
public HFileReaderV2(final Path path, final FixedFileTrailer trailer,
    final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
    final HFileSystem hfs, final Configuration conf) throws IOException {
  super(path, trailer, size, cacheConf, hfs, conf);
  this.conf = conf;
  trailer.expectMajorVersion(getMajorVersion());
  validateMinorVersion(path, trailer.getMinorVersion());
  // NOTE(review): includesMemstoreTS has not been read from the file info at this
  // point, so the context is created with the field's default; the real value is
  // pushed to the block reader via setIncludesMemstoreTS() further down.
  this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
  HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis, fileSize, hfs, path,
      hfileContext);
  this.fsBlockReader = fsBlockReaderV2; // upcast

  // Comparator class name is stored in the trailer in version 2.
  comparator = trailer.createComparator();
  dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
      trailer.getNumDataIndexLevels(), this);
  metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
      KeyValue.RAW_COMPARATOR, 1);

  // Parse load-on-open data. The iterator is consumed sequentially, so the order
  // of the reads below must not change.
  HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
      trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

  // Data index. We also read statistics about the block index written after
  // the root level.
  dataBlockIndexReader.readMultiLevelIndexRoot(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getDataIndexCount());

  // Meta index.
  metaBlockIndexReader.readRootIndex(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getMetaIndexCount());

  // File info
  fileInfo = new FileInfo();
  fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
  lastKey = fileInfo.get(FileInfo.LASTKEY);
  avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
  avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
  byte [] keyValueFormatVersion =
      fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
  includesMemstoreTS = keyValueFormatVersion != null &&
      Bytes.toInt(keyValueFormatVersion) ==
          HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
  fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
  if (includesMemstoreTS) {
    decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
  }

  // Read data block encoding algorithm name from file info.
  dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
  fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);

  // Store all other load-on-open blocks for further consumption.
  HFileBlock b;
  while ((b = blockIter.nextBlock()) != null) {
    loadOnOpenBlocks.add(b);
  }

  // Prefetch file blocks upon open if requested
  if (cacheConf.shouldPrefetchOnOpen()) {
    PrefetchExecutor.request(path, new Runnable() {
      @Override
      public void run() {
        try {
          long offset = 0;
          // readBlock() refuses any offset at or beyond the load-on-open section,
          // so stop there instead of running into its "out of range" IOException
          // at the end of every prefetch.
          long end = getTrailer().getLoadOnOpenDataOffset();
          HFileBlock prevBlock = null;
          while (offset < end) {
            if (Thread.interrupted()) {
              break;
            }
            long onDiskSize = -1;
            if (prevBlock != null) {
              onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
            }
            // cacheBlock=true, everything else off: no pread, not a compaction,
            // no cache metrics, no block type check.
            HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, null);
            prevBlock = block;
            offset += block.getOnDiskSizeWithHeader();
          }
        } catch (IOException e) {
          // IOExceptions are probably due to region closes (relocation, etc.)
          if (LOG.isTraceEnabled()) {
            LOG.trace("Exception encountered while prefetching " + path + ":", e);
          }
        } catch (Exception e) {
          // Other exceptions are interesting
          LOG.warn("Exception encountered while prefetching " + path + ":", e);
        } finally {
          PrefetchExecutor.complete(path);
        }
      }
    });
  }
}
216 
217   protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
218       HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
219     return new HFileContextBuilder()
220       .withIncludesMvcc(this.includesMemstoreTS)
221       .withCompression(this.compressAlgo)
222       .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM)
223       .build();
224   }
225 
226   /**
227    * Create a Scanner on this file. No seeks or reads are done on creation. Call
228    * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
229    * nothing to clean up in a Scanner. Letting go of your references to the
230    * scanner is sufficient.
231    *
232    * @param cacheBlocks True if we should cache blocks read in by this scanner.
233    * @param pread Use positional read rather than seek+read if true (pread is
234    *          better for random reads, seek+read is better scanning).
235    * @param isCompaction is scanner being used for a compaction?
236    * @return Scanner on this file.
237    */
238    @Override
239    public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
240       final boolean isCompaction) {
241     if (dataBlockEncoder.useEncodedScanner()) {
242       return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
243           hfileContext);
244     }
245 
246     return new ScannerV2(this, cacheBlocks, pread, isCompaction);
247   }
248 
249   /**
250    * @param metaBlockName
251    * @param cacheBlock Add block to cache, if found
252    * @return block wrapped in a ByteBuffer, with header skipped
253    * @throws IOException
254    */
255   @Override
256   public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
257       throws IOException {
258     if (trailer.getMetaIndexCount() == 0) {
259       return null; // there are no meta blocks
260     }
261     if (metaBlockIndexReader == null) {
262       throw new IOException("Meta index not loaded");
263     }
264 
265     byte[] mbname = Bytes.toBytes(metaBlockName);
266     int block = metaBlockIndexReader.rootBlockContainingKey(mbname, 0,
267         mbname.length);
268     if (block == -1)
269       return null;
270     long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
271     long startTimeNs = System.nanoTime();
272 
273     // Per meta key from any given file, synchronize reads for said block. This
274     // is OK to do for meta blocks because the meta block index is always
275     // single-level.
276     synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
277       // Check cache for block. If found return.
278       long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
279       BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
280           DataBlockEncoding.NONE, BlockType.META);
281 
282       cacheBlock &= cacheConf.shouldCacheDataOnRead();
283       if (cacheConf.isBlockCacheEnabled()) {
284         HFileBlock cachedBlock =
285           (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false, true);
286         if (cachedBlock != null) {
287           // Return a distinct 'shallow copy' of the block,
288           // so pos does not get messed by the scanner
289           return cachedBlock.getBufferWithoutHeader();
290         }
291         // Cache Miss, please load.
292       }
293 
294       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
295           blockSize, -1, true);
296 
297       final long delta = System.nanoTime() - startTimeNs;
298       HFile.offerReadLatency(delta, true);
299 
300       // Cache the block
301       if (cacheBlock) {
302         cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
303             cacheConf.isInMemory());
304       }
305 
306       return metaBlock.getBufferWithoutHeader();
307     }
308   }
309 
310   /**
311    * Read in a file block.
312    * @param dataBlockOffset offset to read.
313    * @param onDiskBlockSize size of the block
314    * @param cacheBlock
315    * @param pread Use positional read instead of seek+read (positional is
316    *          better doing random reads whereas seek+read is better scanning).
317    * @param isCompaction is this block being read as part of a compaction
318    * @param expectedBlockType the block type we are expecting to read with this
319    *          read operation, or null to read whatever block type is available
320    *          and avoid checking (that might reduce caching efficiency of
321    *          encoded data blocks)
322    * @return Block wrapped in a ByteBuffer.
323    * @throws IOException
324    */
325   @Override
326   public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
327       final boolean cacheBlock, boolean pread, final boolean isCompaction,
328       final boolean updateCacheMetrics, BlockType expectedBlockType)
329       throws IOException {
330     if (dataBlockIndexReader == null) {
331       throw new IOException("Block index not loaded");
332     }
333     if (dataBlockOffset < 0
334         || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
335       throw new IOException("Requested block is out of range: "
336           + dataBlockOffset + ", lastDataBlockOffset: "
337           + trailer.getLastDataBlockOffset());
338     }
339     // For any given block from any given file, synchronize reads for said
340     // block.
341     // Without a cache, this synchronizing is needless overhead, but really
342     // the other choice is to duplicate work (which the cache would prevent you
343     // from doing).
344 
345     BlockCacheKey cacheKey =
346         new BlockCacheKey(name, dataBlockOffset,
347             dataBlockEncoder.getDataBlockEncoding(),
348             expectedBlockType);
349 
350     boolean useLock = false;
351     IdLock.Entry lockEntry = null;
352     TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
353     try {
354       while (true) {
355         if (useLock) {
356           lockEntry = offsetLock.getLockEntry(dataBlockOffset);
357         }
358 
359         // Check cache for block. If found return.
360         if (cacheConf.isBlockCacheEnabled()) {
361           // Try and get the block from the block cache. If the useLock variable is true then this
362           // is the second time through the loop and it should not be counted as a block cache miss.
363           HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, 
364             cacheBlock, useLock, updateCacheMetrics);
365           if (cachedBlock != null) {
366             validateBlockType(cachedBlock, expectedBlockType);
367             if (cachedBlock.getBlockType().isData()) {
368               HFile.dataBlockReadCnt.incrementAndGet();
369 
370               // Validate encoding type for data blocks. We include encoding
371               // type in the cache key, and we expect it to match on a cache hit.
372               if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
373                 throw new IOException("Cached block under key " + cacheKey + " "
374                   + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
375                   + dataBlockEncoder.getDataBlockEncoding() + ")");
376               }
377             }
378             return cachedBlock;
379           }
380           // Carry on, please load.
381         }
382         if (!useLock) {
383           // check cache again with lock
384           useLock = true;
385           continue;
386         }
387         if (Trace.isTracing()) {
388           traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
389         }
390         // Load block from filesystem.
391         long startTimeNs = System.nanoTime();
392         HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
393             pread);
394         validateBlockType(hfileBlock, expectedBlockType);
395 
396         final long delta = System.nanoTime() - startTimeNs;
397         HFile.offerReadLatency(delta, pread);
398 
399         // Cache the block if necessary
400         if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) {
401           cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
402         }
403 
404         if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
405           HFile.dataBlockReadCnt.incrementAndGet();
406         }
407 
408         return hfileBlock;
409       }
410     } finally {
411       traceScope.close();
412       if (lockEntry != null) {
413         offsetLock.releaseLockEntry(lockEntry);
414       }
415     }
416   }
417 
418   @Override
419   public boolean hasMVCCInfo() {
420     return includesMemstoreTS && decodeMemstoreTS;
421   }
422 
423   /**
424    * Compares the actual type of a block retrieved from cache or disk with its
425    * expected type and throws an exception in case of a mismatch. Expected
426    * block type of {@link BlockType#DATA} is considered to match the actual
427    * block type [@link {@link BlockType#ENCODED_DATA} as well.
428    * @param block a block retrieved from cache or disk
429    * @param expectedBlockType the expected block type, or null to skip the
430    *          check
431    */
432   private void validateBlockType(HFileBlock block,
433       BlockType expectedBlockType) throws IOException {
434     if (expectedBlockType == null) {
435       return;
436     }
437     BlockType actualBlockType = block.getBlockType();
438     if (actualBlockType == BlockType.ENCODED_DATA &&
439         expectedBlockType == BlockType.DATA) {
440       // We consider DATA to match ENCODED_DATA for the purpose of this
441       // verification.
442       return;
443     }
444     if (actualBlockType != expectedBlockType) {
445       throw new IOException("Expected block type " + expectedBlockType + ", " +
446           "but got " + actualBlockType + ": " + block);
447     }
448   }
449 
450   /**
451    * @return Last key in the file. May be null if file has no entries. Note that
452    *         this is not the last row key, but rather the byte form of the last
453    *         KeyValue.
454    */
455   @Override
456   public byte[] getLastKey() {
457     return dataBlockIndexReader.isEmpty() ? null : lastKey;
458   }
459 
460   /**
461    * @return Midkey for this file. We work with block boundaries only so
462    *         returned midkey is an approximation only.
463    * @throws IOException
464    */
465   @Override
466   public byte[] midkey() throws IOException {
467     return dataBlockIndexReader.midkey();
468   }
469 
470   @Override
471   public void close() throws IOException {
472     close(cacheConf.shouldEvictOnClose());
473   }
474 
475   public void close(boolean evictOnClose) throws IOException {
476     PrefetchExecutor.cancel(path);
477     if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
478       int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
479       if (LOG.isTraceEnabled()) {
480         LOG.trace("On close, file=" + name + " evicted=" + numEvicted
481           + " block(s)");
482       }
483     }
484     fsBlockReader.closeStreams();
485   }
486 
487   /** For testing */
488   @Override
489   HFileBlock.FSReader getUncachedBlockReader() {
490     return fsBlockReader;
491   }
492 
493 
494   protected abstract static class AbstractScannerV2
495       extends AbstractHFileReader.Scanner {
496     protected HFileBlock block;
497 
498     /**
499      * The next indexed key is to keep track of the indexed key of the next data block.
500      * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the
501      * current data block is the last data block.
502      *
503      * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
504      */
505     protected byte[] nextIndexedKey;
506 
507     public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
508         final boolean pread, final boolean isCompaction) {
509       super(r, cacheBlocks, pread, isCompaction);
510     }
511 
512     /**
513      * An internal API function. Seek to the given key, optionally rewinding to
514      * the first key of the block before doing the seek.
515      *
516      * @param key key byte array
517      * @param offset key offset in the key byte array
518      * @param length key length
519      * @param rewind whether to rewind to the first key of the block before
520      *        doing the seek. If this is false, we are assuming we never go
521      *        back, otherwise the result is undefined.
522      * @return -1 if the key is earlier than the first key of the file,
523      *         0 if we are at the given key, 1 if we are past the given key
524      *         -2 if the key is earlier than the first key of the file while
525      *         using a faked index key
526      * @throws IOException
527      */
528     protected int seekTo(byte[] key, int offset, int length, boolean rewind)
529         throws IOException {
530       HFileBlockIndex.BlockIndexReader indexReader =
531           reader.getDataBlockIndexReader();
532       BlockWithScanInfo blockWithScanInfo =
533         indexReader.loadDataBlockWithScanInfo(key, offset, length, block,
534             cacheBlocks, pread, isCompaction);
535       if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
536         // This happens if the key e.g. falls before the beginning of the file.
537         return -1;
538       }
539       return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
540           blockWithScanInfo.getNextIndexedKey(), rewind, key, offset, length, false);
541     }
542 
543     protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
544 
545     protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
546         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
547         throws IOException;
548 
549     @Override
550     public int seekTo(byte[] key, int offset, int length) throws IOException {
551       // Always rewind to the first key of the block, because the given key
552       // might be before or after the current key.
553       return seekTo(key, offset, length, true);
554     }
555 
556     @Override
557     public int reseekTo(byte[] key, int offset, int length) throws IOException {
558       int compared;
559       if (isSeeked()) {
560         compared = compareKey(reader.getComparator(), key, offset, length);
561         if (compared < 1) {
562           // If the required key is less than or equal to current key, then
563           // don't do anything.
564           return compared;
565         } else {
566           if (this.nextIndexedKey != null &&
567               (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
568                reader.getComparator().compareFlatKey(key, offset, length,
569                    nextIndexedKey, 0, nextIndexedKey.length) < 0)) {
570             // The reader shall continue to scan the current data block instead of querying the
571             // block index as long as it knows the target key is strictly smaller than
572             // the next indexed key or the current data block is the last data block.
573             return loadBlockAndSeekToKey(this.block, this.nextIndexedKey,
574                 false, key, offset, length, false);
575           }
576         }
577       }
578       // Don't rewind on a reseek operation, because reseek implies that we are
579       // always going forward in the file.
580       return seekTo(key, offset, length, false);
581     }
582 
583     @Override
584     public boolean seekBefore(byte[] key, int offset, int length)
585         throws IOException {
586       HFileBlock seekToBlock =
587           reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
588               block, cacheBlocks, pread, isCompaction);
589       if (seekToBlock == null) {
590         return false;
591       }
592       ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
593 
594       if (reader.getComparator().compareFlatKey(firstKey.array(),
595           firstKey.arrayOffset(), firstKey.limit(), key, offset, length) >= 0)
596       {
597         long previousBlockOffset = seekToBlock.getPrevBlockOffset();
598         // The key we are interested in
599         if (previousBlockOffset == -1) {
600           // we have a 'problem', the key we want is the first of the file.
601           return false;
602         }
603 
604         // It is important that we compute and pass onDiskSize to the block
605         // reader so that it does not have to read the header separately to
606         // figure out the size.
607         seekToBlock = reader.readBlock(previousBlockOffset,
608             seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
609             pread, isCompaction, true, BlockType.DATA);
610         // TODO shortcut: seek forward in this block to the last key of the
611         // block.
612       }
613       byte[] firstKeyInCurrentBlock = Bytes.getBytes(firstKey);
614       loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, offset, length, true);
615       return true;
616     }
617 
618 
619     /**
620      * Scans blocks in the "scanned" section of the {@link HFile} until the next
621      * data block is found.
622      *
623      * @return the next block, or null if there are no more data blocks
624      * @throws IOException
625      */
626     protected HFileBlock readNextDataBlock() throws IOException {
627       long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
628       if (block == null)
629         return null;
630 
631       HFileBlock curBlock = block;
632 
633       do {
634         if (curBlock.getOffset() >= lastDataBlockOffset)
635           return null;
636 
637         if (curBlock.getOffset() < 0) {
638           throw new IOException("Invalid block file offset: " + block);
639         }
640 
641         // We are reading the next block without block type validation, because
642         // it might turn out to be a non-data block.
643         curBlock = reader.readBlock(curBlock.getOffset()
644             + curBlock.getOnDiskSizeWithHeader(),
645             curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
646             isCompaction, true, null);
647       } while (!curBlock.getBlockType().isData());
648 
649       return curBlock;
650     }
651     /**
652      * Compare the given key against the current key
653      * @param comparator
654      * @param key
655      * @param offset
656      * @param length
657      * @return -1 is the passed key is smaller than the current key, 0 if equal and 1 if greater
658      */
659     public abstract int compareKey(KVComparator comparator, byte[] key, int offset,
660         int length);
661   }
662 
663   /**
664    * Implementation of {@link HFileScanner} interface.
665    */
666   protected static class ScannerV2 extends AbstractScannerV2 {
667     private HFileReaderV2 reader;
668 
669     public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
670         final boolean pread, final boolean isCompaction) {
671       super(r, cacheBlocks, pread, isCompaction);
672       this.reader = r;
673     }
674 
675     @Override
676     public KeyValue getKeyValue() {
677       if (!isSeeked())
678         return null;
679 
680       KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
681           + blockBuffer.position(), getCellBufSize());
682       if (this.reader.shouldIncludeMemstoreTS()) {
683         ret.setMvccVersion(currMemstoreTS);
684       }
685       return ret;
686     }
687 
688     protected int getCellBufSize() {
689       return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
690     }
691 
692     @Override
693     public ByteBuffer getKey() {
694       assertSeeked();
695       return ByteBuffer.wrap(
696           blockBuffer.array(),
697           blockBuffer.arrayOffset() + blockBuffer.position()
698               + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
699     }
700 
701     @Override
702     public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
703       return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
704           blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
705     }
706 
707     @Override
708     public ByteBuffer getValue() {
709       assertSeeked();
710       return ByteBuffer.wrap(
711           blockBuffer.array(),
712           blockBuffer.arrayOffset() + blockBuffer.position()
713               + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
714     }
715 
716     protected void setNonSeekedState() {
717       block = null;
718       blockBuffer = null;
719       currKeyLen = 0;
720       currValueLen = 0;
721       currMemstoreTS = 0;
722       currMemstoreTSLen = 0;
723     }
724 
725     /**
726      * Go to the next key/value in the block section. Loads the next block if
727      * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
728      * be called.
729      *
730      * @return true if successfully navigated to the next key/value
731      */
732     @Override
733     public boolean next() throws IOException {
734       assertSeeked();
735 
736       try {
737         blockBuffer.position(getNextCellStartPosition());
738       } catch (IllegalArgumentException e) {
739         LOG.error("Current pos = " + blockBuffer.position()
740             + "; currKeyLen = " + currKeyLen + "; currValLen = "
741             + currValueLen + "; block limit = " + blockBuffer.limit()
742             + "; HFile name = " + reader.getName()
743             + "; currBlock currBlockOffset = " + block.getOffset());
744         throw e;
745       }
746 
747       if (blockBuffer.remaining() <= 0) {
748         long lastDataBlockOffset =
749             reader.getTrailer().getLastDataBlockOffset();
750 
751         if (block.getOffset() >= lastDataBlockOffset) {
752           setNonSeekedState();
753           return false;
754         }
755 
756         // read the next block
757         HFileBlock nextBlock = readNextDataBlock();
758         if (nextBlock == null) {
759           setNonSeekedState();
760           return false;
761         }
762 
763         updateCurrBlock(nextBlock);
764         return true;
765       }
766 
767       // We are still in the same block.
768       readKeyValueLen();
769       return true;
770     }
771 
772     protected int getNextCellStartPosition() {
773       return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
774           + currMemstoreTSLen;
775     }
776 
777     /**
778      * Positions this scanner at the start of the file.
779      *
780      * @return false if empty file; i.e. a call to next would return false and
781      *         the current key and value are undefined.
782      * @throws IOException
783      */
784     @Override
785     public boolean seekTo() throws IOException {
786       if (reader == null) {
787         return false;
788       }
789 
790       if (reader.getTrailer().getEntryCount() == 0) {
791         // No data blocks.
792         return false;
793       }
794 
795       long firstDataBlockOffset =
796           reader.getTrailer().getFirstDataBlockOffset();
797       if (block != null && block.getOffset() == firstDataBlockOffset) {
798         blockBuffer.rewind();
799         readKeyValueLen();
800         return true;
801       }
802 
803       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
804           isCompaction, true, BlockType.DATA);
805       if (block.getOffset() < 0) {
806         throw new IOException("Invalid block offset: " + block.getOffset());
807       }
808       updateCurrBlock(block);
809       return true;
810     }
811 
812     @Override
813     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
814         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
815         throws IOException {
816       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
817         updateCurrBlock(seekToBlock);
818       } else if (rewind) {
819         blockBuffer.rewind();
820       }
821 
822       // Update the nextIndexedKey
823       this.nextIndexedKey = nextIndexedKey;
824       return blockSeek(key, offset, length, seekBefore);
825     }
826 
827     /**
828      * Updates the current block to be the given {@link HFileBlock}. Seeks to
829      * the the first key/value pair.
830      *
831      * @param newBlock the block to make current
832      */
833     protected void updateCurrBlock(HFileBlock newBlock) {
834       block = newBlock;
835 
836       // sanity check
837       if (block.getBlockType() != BlockType.DATA) {
838         throw new IllegalStateException("ScannerV2 works only on data " +
839             "blocks, got " + block.getBlockType() + "; " +
840             "fileName=" + reader.name + ", " +
841             "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
842             "isCompaction=" + isCompaction);
843       }
844 
845       blockBuffer = block.getBufferWithoutHeader();
846       readKeyValueLen();
847       blockFetches++;
848 
849       // Reset the next indexed key
850       this.nextIndexedKey = null;
851     }
852 
853     protected void readKeyValueLen() {
854       blockBuffer.mark();
855       currKeyLen = blockBuffer.getInt();
856       currValueLen = blockBuffer.getInt();
857       ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
858       readMvccVersion();
859       if (currKeyLen < 0 || currValueLen < 0
860           || currKeyLen > blockBuffer.limit()
861           || currValueLen > blockBuffer.limit()) {
862         throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
863             + " or currValueLen " + currValueLen + ". Block offset: "
864             + block.getOffset() + ", block length: " + blockBuffer.limit()
865             + ", position: " + blockBuffer.position() + " (without header).");
866       }
867       blockBuffer.reset();
868     }
869 
870     protected void readMvccVersion() {
871       if (this.reader.shouldIncludeMemstoreTS()) {
872         if (this.reader.decodeMemstoreTS) {
873           try {
874             currMemstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
875                 + blockBuffer.position());
876             currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
877           } catch (Exception e) {
878             throw new RuntimeException("Error reading memstore timestamp", e);
879           }
880         } else {
881           currMemstoreTS = 0;
882           currMemstoreTSLen = 1;
883         }
884       }
885     }
886 
887     /**
888      * Within a loaded block, scan forward from the buffer's current position
889      * for the last key that is smaller than or equal to the key we are
         * interested in (strictly smaller when {@code seekBefore} is set), and
         * leave the scanner positioned on that cell.
890      *
891      * A note on the seekBefore: if you have seekBefore = true, AND the first
892      * key in the block = key, then you'll get thrown exceptions. The caller has
893      * to check for that case and load the previous block as appropriate.
894      *
895      * @param key the key to find
         * @param offset start of the key within {@code key}
         * @param length length of the key in bytes
896      * @param seekBefore find the key before the given key in case of exact
897      *          match.
898      * @return 0 in case of an exact key match, 1 in case of an inexact match,
899      *         -2 in case of an inexact match and furthermore, the input key less
900      *         than the first key of current block(e.g. using a faked index key);
         *         the code returns {@link HConstants#INDEX_KEY_MAGIC} for that case
         * @throws IllegalStateException if {@code seekBefore} is set and the exact
         *         match is the first cell visited by this call
901      */
902     protected int blockSeek(byte[] key, int offset, int length,
903         boolean seekBefore) {
904       int klen, vlen;
905       long memstoreTS = 0;
906       int memstoreTSLen = 0;
          // Total size of the cell stepped over in the previous iteration;
          // stays -1 until at least one cell has been passed.
907       int lastKeyValueSize = -1;
908       do {
            // Peek at the key and value lengths without consuming them.
909         blockBuffer.mark();
910         klen = blockBuffer.getInt();
911         vlen = blockBuffer.getInt();
912         blockBuffer.reset();
            // Work out how many timestamp bytes trail the value so the full
            // cell size is known.
913         if (this.reader.shouldIncludeMemstoreTS()) {
914           if (this.reader.decodeMemstoreTS) {
915             try {
916               int memstoreTSOffset = blockBuffer.arrayOffset()
917                   + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
918               memstoreTS = Bytes.readVLong(blockBuffer.array(),
919                   memstoreTSOffset);
920               memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
921             } catch (Exception e) {
922               throw new RuntimeException("Error reading memstore timestamp", e);
923             }
924           } else {
                // Decoding is off: use timestamp 0 and count one byte for it.
925             memstoreTS = 0;
926             memstoreTSLen = 1;
927           }
928         }
929 
            // Compare the search key with the key of the cell under the cursor.
930         int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
931             + KEY_VALUE_LEN_SIZE;
932         int comp = reader.getComparator().compareFlatKey(key, offset, length,
933             blockBuffer.array(), keyOffset, klen);
934 
935         if (comp == 0) {
936           if (seekBefore) {
                // Exact match, but the caller wants the cell before it. That is
                // only possible if a cell was already stepped over in this call.
937             if (lastKeyValueSize < 0) {
938               throw new IllegalStateException("blockSeek with seekBefore "
939                   + "at the first key of the block: key="
940                   + Bytes.toStringBinary(key) + ", blockOffset="
941                   + block.getOffset() + ", onDiskSize="
942                   + block.getOnDiskSizeWithHeader());
943             }
                // Step back one cell and load its lengths.
944             blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
945             readKeyValueLen();
946             return 1; // non exact match.
947           }
              // Exact match: the buffer is already at this cell, so just publish
              // the lengths decoded above.
948           currKeyLen = klen;
949           currValueLen = vlen;
950           if (this.reader.shouldIncludeMemstoreTS()) {
951             currMemstoreTS = memstoreTS;
952             currMemstoreTSLen = memstoreTSLen;
953           }
954           return 0; // indicate exact match
955         } else if (comp < 0) {
              // NOTE(review): presumably comp < 0 means the search key sorts
              // before this cell's key -- confirm against compareFlatKey. The
              // scanner settles on the previous cell if there is one, otherwise
              // it stays on the current cell.
956           if (lastKeyValueSize > 0)
957             blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
958           readKeyValueLen();
              // No cell was stepped over and the buffer is at position 0: report
              // the magic value, but only for files whose minor version is at
              // least MINOR_VERSION_WITH_FAKED_KEY.
959           if (lastKeyValueSize == -1 && blockBuffer.position() == 0
960               && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
961             return HConstants.INDEX_KEY_MAGIC;
962           }
963           return 1;
964         }
965 
966         // The size of this key/value tuple, including key/value length fields.
967         lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
            // Move the cursor to the start of the next cell.
968         blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
969       } while (blockBuffer.remaining() > 0);
970 
971       // Seek to the last key we successfully read. This will happen if this is
972       // the last key/value pair in the file, in which case the following call
973       // to next() has to return false.
974       blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
975       readKeyValueLen();
976       return 1; // didn't exactly find it.
977     }
978 
979     @Override
980     protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
981       ByteBuffer buffer = curBlock.getBufferWithoutHeader();
982       // It is safe to manipulate this buffer because we own the buffer object.
983       buffer.rewind();
984       int klen = buffer.getInt();
985       buffer.getInt();
986       ByteBuffer keyBuff = buffer.slice();
987       keyBuff.limit(klen);
988       keyBuff.rewind();
989       return keyBuff;
990     }
991 
992     @Override
993     public String getKeyString() {
994       return Bytes.toStringBinary(blockBuffer.array(),
995           blockBuffer.arrayOffset() + blockBuffer.position()
996               + KEY_VALUE_LEN_SIZE, currKeyLen);
997     }
998 
999     @Override
1000     public String getValueString() {
1001       return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
1002           + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
1003           currValueLen);
1004     }
1005   }
1006 
1007   /**
1008    * ScannerV2 that operates on encoded data blocks.
1009    */
1010   protected static class EncodedScannerV2 extends AbstractScannerV2 {
1011     private final HFileBlockDecodingContext decodingCtx;
1012     private final DataBlockEncoder.EncodedSeeker seeker;
1013     private final DataBlockEncoder dataBlockEncoder;
1014     protected final HFileContext meta;
1015 
1016     public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
1017         boolean pread, boolean isCompaction, HFileContext meta) {
1018       super(reader, cacheBlocks, pread, isCompaction);
1019       DataBlockEncoding encoding = reader.dataBlockEncoder.getDataBlockEncoding();
1020       dataBlockEncoder = encoding.getEncoder();
1021       decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta);
1022       seeker = dataBlockEncoder.createSeeker(
1023         reader.getComparator(), decodingCtx);
1024       this.meta = meta;
1025     }
1026 
1027     @Override
1028     public boolean isSeeked(){
1029       return this.block != null;
1030     }
1031 
1032     /**
1033      * Updates the current block to be the given {@link HFileBlock}. Seeks to
1034      * the the first key/value pair.
1035      *
1036      * @param newBlock the block to make current
1037      * @throws CorruptHFileException
1038      */
1039     private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
1040       block = newBlock;
1041 
1042       // sanity checks
1043       if (block.getBlockType() != BlockType.ENCODED_DATA) {
1044         throw new IllegalStateException(
1045             "EncodedScanner works only on encoded data blocks");
1046       }
1047       short dataBlockEncoderId = block.getDataBlockEncodingId();
1048       if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
1049         String encoderCls = dataBlockEncoder.getClass().getName();
1050         throw new CorruptHFileException("Encoder " + encoderCls
1051           + " doesn't support data block encoding "
1052           + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
1053       }
1054 
1055       seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
1056       blockFetches++;
1057 
1058       // Reset the next indexed key
1059       this.nextIndexedKey = null;
1060     }
1061 
1062     private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
1063       ByteBuffer origBlock = newBlock.getBufferReadOnly();
1064       ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
1065           origBlock.arrayOffset() + newBlock.headerSize() +
1066           DataBlockEncoding.ID_SIZE,
1067           newBlock.getUncompressedSizeWithoutHeader() -
1068           DataBlockEncoding.ID_SIZE).slice();
1069       return encodedBlock;
1070     }
1071 
1072     @Override
1073     public boolean seekTo() throws IOException {
1074       if (reader == null) {
1075         return false;
1076       }
1077 
1078       if (reader.getTrailer().getEntryCount() == 0) {
1079         // No data blocks.
1080         return false;
1081       }
1082 
1083       long firstDataBlockOffset =
1084           reader.getTrailer().getFirstDataBlockOffset();
1085       if (block != null && block.getOffset() == firstDataBlockOffset) {
1086         seeker.rewind();
1087         return true;
1088       }
1089 
1090       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
1091           isCompaction, true, BlockType.DATA);
1092       if (block.getOffset() < 0) {
1093         throw new IOException("Invalid block offset: " + block.getOffset());
1094       }
1095       updateCurrentBlock(block);
1096       return true;
1097     }
1098 
1099     @Override
1100     public boolean next() throws IOException {
1101       boolean isValid = seeker.next();
1102       if (!isValid) {
1103         block = readNextDataBlock();
1104         isValid = block != null;
1105         if (isValid) {
1106           updateCurrentBlock(block);
1107         }
1108       }
1109       return isValid;
1110     }
1111 
1112     @Override
1113     public ByteBuffer getKey() {
1114       assertValidSeek();
1115       return seeker.getKeyDeepCopy();
1116     }
1117 
1118     @Override
1119     public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
1120       return seeker.compareKey(comparator, key, offset, length);
1121     }
1122 
1123     @Override
1124     public ByteBuffer getValue() {
1125       assertValidSeek();
1126       return seeker.getValueShallowCopy();
1127     }
1128 
1129     @Override
1130     public KeyValue getKeyValue() {
1131       if (block == null) {
1132         return null;
1133       }
1134       return seeker.getKeyValue();
1135     }
1136 
1137     @Override
1138     public String getKeyString() {
1139       ByteBuffer keyBuffer = getKey();
1140       return Bytes.toStringBinary(keyBuffer.array(),
1141           keyBuffer.arrayOffset(), keyBuffer.limit());
1142     }
1143 
1144     @Override
1145     public String getValueString() {
1146       ByteBuffer valueBuffer = getValue();
1147       return Bytes.toStringBinary(valueBuffer.array(),
1148           valueBuffer.arrayOffset(), valueBuffer.limit());
1149     }
1150 
1151     private void assertValidSeek() {
1152       if (block == null) {
1153         throw new NotSeekedException();
1154       }
1155     }
1156 
1157     @Override
1158     protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1159       return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
1160     }
1161 
1162     @Override
1163     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
1164         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
1165         throws IOException  {
1166       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
1167         updateCurrentBlock(seekToBlock);
1168       } else if (rewind) {
1169         seeker.rewind();
1170       }
1171       this.nextIndexedKey = nextIndexedKey;
1172       return seeker.seekToKeyInBlock(key, offset, length, seekBefore);
1173     }
1174   }
1175 
1176   /**
1177    * Returns a stream over the general Bloom filter metadata block. The caller
1178    * takes ownership of the returned stream.
        *
        * @return the byte stream of the first load-on-open block of type
        *         {@code GENERAL_BLOOM_META}, or null if there is no such block
        * @throws IOException declared by the interface; the lookup itself is a
        *         scan over already loaded blocks
1179    */
1180   @Override
1181   public DataInput getGeneralBloomFilterMetadata() throws IOException {
1182     return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
1183   }
1184 
       /**
        * Returns a stream over the delete-family Bloom filter metadata block.
        *
        * @return the byte stream of the first load-on-open block of type
        *         {@code DELETE_FAMILY_BLOOM_META}, or null if there is no such block
        */
1185   @Override
1186   public DataInput getDeleteBloomFilterMetadata() throws IOException {
1187     return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
1188   }
1189 
1190   private DataInput getBloomFilterMetadata(BlockType blockType)
1191   throws IOException {
1192     if (blockType != BlockType.GENERAL_BLOOM_META &&
1193         blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
1194       throw new RuntimeException("Block Type: " + blockType.toString() +
1195           " is not supported") ;
1196     }
1197 
1198     for (HFileBlock b : loadOnOpenBlocks)
1199       if (b.getBlockType() == blockType)
1200         return b.getByteStream();
1201     return null;
1202   }
1203 
       /**
        * Reports whether the file info has been loaded.
        *
        * @return always true for this reader
        */
1204   @Override
1205   public boolean isFileInfoLoaded() {
1206     return true; // We load file info in constructor in version 2.
1207   }
1208 
1209   /**
1210    * Validates that the minor version is within acceptable limits.
1211    * Otherwise throws an Runtime exception
1212    */
1213   private void validateMinorVersion(Path path, int minorVersion) {
1214     if (minorVersion < MIN_MINOR_VERSION ||
1215         minorVersion > MAX_MINOR_VERSION) {
1216       String msg = "Minor version for path " + path + 
1217                    " is expected to be between " +
1218                    MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
1219                    " but is found to be " + minorVersion;
1220       LOG.error(msg);
1221       throw new RuntimeException(msg);
1222     }
1223   }
1224 
       /**
        * Reports the major version handled by this reader.
        *
        * @return the constant 2
        */
1225   @Override
1226   public int getMajorVersion() {
1227     return 2;
1228   }
1229 
       /**
        * Returns the file context held by this reader.
        *
        * @return the {@code hfileContext} field
        */
1230   @Override
1231   public HFileContext getFileContext() {
1232     return hfileContext;
1233   }
1234 
1235   /**
1236    * Returns false if block prefetching was requested for this file and has
1237    * not completed, true otherwise.
        *
        * @return the result of {@code PrefetchExecutor.isCompleted(path)} for this
        *         reader's path
1238    */
1239   @VisibleForTesting
1240   boolean prefetchComplete() {
1241     return PrefetchExecutor.isCompleted(path);
1242   }
1243 }