/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

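/**
 * Unit tests for {@link HFileBlock}: writing and reading blocks with and
 * without compression, data block encoding, previous-offset tracking,
 * concurrent reads, and heap size estimation.
 */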
@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlock {
  // change this value to activate more logs
  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;

  // Used to generate KeyValues
  private static int NUM_KEYVALUES = 50;
  private static int FIELD_LENGTH = 10;
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();
  private FileSystem fs;
  private int uncompressedSizeV1;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses really well.
    for (int i = 0; i < 1000; ++i)
      dos.writeInt(i / 100);
  }

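  /*
   * Writes a deterministic, pseudo-random batch of KeyValues to the given
   * stream in the raw (unencoded) data block layout: key length, value
   * length, key, value, optional tags, and an optional vlong memstore
   * timestamp. Some rows, qualifiers, values and timestamps are repeated so
   * that the output compresses well. Returns the accumulated KeyValue size.
   */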
  static int writeTestKeyValues(OutputStream dos, int seed,
      boolean includesMemstoreTS, boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<KeyValue>();
    Random randomizer = new Random(42L + seed); // just any fixed number

    // generate keyValues
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // generate it or repeat, it should compress well
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        family = keyValues.get(0).getFamily();
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = keyValues.get(
            randomizer.nextInt(keyValues.size())).getQualifier();
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(
            randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new Tag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // sort it and write to stream
    int totalSize = 0;
    Collections.sort(keyValues, KeyValue.COMPARATOR);
    DataOutputStream dataOutputStream = new DataOutputStream(dos);

    for (KeyValue kv : keyValues) {
      dataOutputStream.writeInt(kv.getKeyLength());
      dataOutputStream.writeInt(kv.getValueLength());
      dataOutputStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
      dataOutputStream.write(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
      // Count the full KeyValue length; when tags are in use, also write the
      // additional tag length followed by the tag bytes.
      totalSize += kv.getLength();
      if (useTag) {
        dataOutputStream.writeShort(kv.getTagsLengthUnsigned());
        dataOutputStream.write(kv.getBuffer(), kv.getTagsOffset(), kv.getTagsLengthUnsigned());
      }
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        WritableUtils.writeVLong(dataOutputStream, memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
    }
    return totalSize;
  }

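  /*
   * Builds a version-1-style block: a META block magic record followed by the
   * standard test contents, run through the algorithm's compression stream.
   * Records the uncompressed size in uncompressedSizeV1 and returns the
   * compressed bytes.
   */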
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    uncompressedSizeV1 = dos.size();
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

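  /*
   * Writes the standard test contents into a fresh v2 block writer configured
   * with the given compression, MVCC and tag settings, verifies the
   * uncompressed payload size (1000 ints = 4000 bytes), and returns the
   * writer so callers can inspect its header and data.
   */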
  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
                        .withCompression(algo)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    byte[] headerAndData = hbw.getHeaderAndDataForTest();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
      // variations across operating systems.
      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
      // We only make this change when the compressed block length matches.
      // Otherwise, there are obviously other inconsistencies.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    assertEquals(4000, createTestV2Block(NONE, includesMemstoreTS, false).
                 getBlockForCaching().getUncompressedSizeWithoutHeader());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x01\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
            + "\\x1F\\x8B"  // gzip magic signature
            + "\\x08"  // Compression method: 8 = "deflate"
            + "\\x00"  // Flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"  // XFL (extra flags)
            // OS (0 = FAT filesystems, 3 = Unix). However, this field
            // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
            // This appears to be a difference caused by the availability
            // (and use) of the native GZ codec.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00"; //  4 byte checksum (ignored)
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // We ignore the block checksum because createTestBlockStr can change the
    // gzip header after the block is produced
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
      testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

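  /*
   * testReaderV2/testReaderV2Internals: writes two plain DATA blocks per
   * compression algorithm, reads the first one back through FSReaderV2,
   * verifies its sizes, and, for GZ, re-reads it with an explicit on-disk
   * size and checks that a deliberately wrong size is rejected with a
   * descriptive exception.
   */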
  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
                 ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(includesMemstoreTS)
                            .withIncludesTags(includesTag)
                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                            .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i)
            dos.writeInt(i);
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
            .withHBaseCheckSum(true)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        is.close();
        assertEquals(0, HFile.getChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        String blockStr = b.toString();

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
                                b.totalChecksumBytes(), -1, pread);
          assertEquals(blockStr, b.toString());
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "On-disk size without header provided is "
                + wrongCompressedSize + ", but block header contains "
                + b.getOnDiskSizeWithoutHeader() + ".";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

  /**
   * Test encoding/decoding data blocks.
   * @throws IOException a bug or a problem with temporary files.
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder =
              new HFileDataBlockEncoderImpl(encoding);
          HFileContext meta = new HFileContextBuilder()
                              .withCompression(algo)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                              .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
                              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<Integer>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
            writeEncodedBlock(algo, encoding, dos, encodedSizes, encodedBlocks,
                blockId, includesMemstoreTS, HConstants.HFILEBLOCK_DUMMY_HEADER, includesTag);
            hbw.writeHeaderAndData(os);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
                .withHBaseCheckSum(true)
                .withCompression(algo)
                .withIncludesMvcc(includesMemstoreTS)
                .withIncludesTags(includesTag)
                .build();
          HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemstoreTS(includesMemstoreTS);
          HFileBlock b;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            b = hbr.readBlockData(pos, -1, -1, pread);
            assertEquals(0, HFile.getChecksumFailuresCount());
            b.sanityCheck();
            pos += b.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
                b.getUncompressedSizeWithoutHeader());
            ByteBuffer actualBuffer = b.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(0, actualBuffer.get(0));
              assertEquals(encoding.getId(), actualBuffer.get(1));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // test if content matches, produce nice message
            assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding,
                pread);
          }
          is.close();
        }
      }
    }
  }

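  /*
   * Serializes test KeyValues through the given encoding (or through the
   * default encoding context when the encoding has no encoder), strips the
   * dummy header and, if present, the two-byte encoding id, and records both
   * the encoded size and the encoded bytes for later comparison against what
   * the reader returns.
   */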
  static void writeEncodedBlock(Algorithm algo, DataBlockEncoding encoding,
      DataOutputStream dos, final List<Integer> encodedSizes,
      final List<ByteBuffer> encodedBlocks, int blockId,
      boolean includesMemstoreTS, byte[] dummyHeader, boolean useTag) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DoubleOutputStream doubleOutputStream =
        new DoubleOutputStream(dos, baos);
    writeTestKeyValues(doubleOutputStream, blockId, includesMemstoreTS, useTag);
    ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
    rawBuf.rewind();

    DataBlockEncoder encoder = encoding.getEncoder();
    int headerLen = dummyHeader.length;
    byte[] encodedResultWithHeader = null;
    HFileContext meta = new HFileContextBuilder()
                        .withCompression(algo)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(useTag)
                        .build();
    if (encoder != null) {
      HFileBlockEncodingContext encodingCtx = encoder.newDataBlockEncodingContext(encoding,
          dummyHeader, meta);
      encoder.encodeKeyValues(rawBuf, encodingCtx);
      encodedResultWithHeader =
          encodingCtx.getUncompressedBytesWithHeader();
    } else {
      HFileBlockDefaultEncodingContext defaultEncodingCtx = new HFileBlockDefaultEncodingContext(
          encoding, dummyHeader, meta);
      byte[] rawBufWithHeader =
          new byte[rawBuf.array().length + headerLen];
      System.arraycopy(rawBuf.array(), 0, rawBufWithHeader,
          headerLen, rawBuf.array().length);
      defaultEncodingCtx.compressAfterEncodingWithBlockType(rawBufWithHeader,
          BlockType.DATA);
      encodedResultWithHeader =
          defaultEncodingCtx.getUncompressedBytesWithHeader();
    }
    final int encodedSize =
        encodedResultWithHeader.length - headerLen;
    if (encoder != null) {
      // We need to account for the two-byte encoding algorithm ID that
      // comes after the 24-byte block header but before encoded KVs.
      headerLen += DataBlockEncoding.ID_SIZE;
    }
    byte[] encodedDataSection =
        new byte[encodedResultWithHeader.length - headerLen];
    System.arraycopy(encodedResultWithHeader, headerLen,
        encodedDataSection, 0, encodedDataSection.length);
    final ByteBuffer encodedBuf =
        ByteBuffer.wrap(encodedDataSection);
    encodedSizes.add(encodedSize);
    encodedBlocks.add(encodedBuf);
  }

  static void assertBuffersEqual(ByteBuffer expectedBuffer,
      ByteBuffer actualBuffer, Compression.Algorithm compression,
      DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit &&
          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for compression %s, encoding %s, " +
          "pread %s, commonPrefix %d, expected %s, got %s",
          compression, encoding, pread, prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Convert the next few bytes in the given buffer, starting at the given
   * position, to a string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuffer buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
        numBytes) + (numBytes < maxBytes ? "..." : "");
  }

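  /*
   * testPreviousOffset/testPreviousOffsetInternals: writes a file of blocks
   * of random types and walks it block by block, checking offsets, the
   * per-type previous-offset bookkeeping, and that re-reading a block with
   * its known on-disk size yields identical metadata and, in cache-on-write
   * mode, identical uncompressed bytes.
   */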
  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

  protected void testPreviousOffsetInternals() throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm: " + algo +
                   ", pread=" + pread +
                   ", cacheOnWrite=" + cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<Long>();
          List<Long> expectedPrevOffsets = new ArrayList<Long>();
          List<BlockType> expectedTypes = new ArrayList<BlockType>();
          List<ByteBuffer> expectedContents = cacheOnWrite
              ? new ArrayList<ByteBuffer>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
                              .withHBaseCheckSum(true)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withCompression(algo).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                  HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of " + "type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Now re-load this block knowing the on-disk size. This tests a
            // different branch in the loader.
            HFileBlock b2 = hbr.readBlockData(curOffset,
                b.getOnDiskSizeWithHeader(), -1, pread);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(),
                b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(),
                b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(),
                         b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getChecksumFailuresCount());

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // In the cache-on-write mode we store uncompressed bytes so we
              // can compare them to what was read by the block reader.
              // b's buffer has header + data + checksum while
              // expectedContents have header + data only
              ByteBuffer bufRead = b.getBufferWithHeader();
              ByteBuffer bufExpected = expectedContents.get(i);
              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                  bufRead.arrayOffset(),
                  bufRead.limit() - b.totalChecksumBytes(),
                  bufExpected.array(), bufExpected.arrayOffset(),
                  bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Optimization: only construct an error message in case we
                // will need it.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                    bufExpected.arrayOffset(), Math.min(32,
                        bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                        bufRead.arrayOffset(), Math.min(32, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                           HFileBlock.toStringHeader(bufExpected) +
                           "\nfound    header" +
                           HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                           " limit " + bufRead.limit() +
                           " expected offset " + bufExpected.arrayOffset() +
                           " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
            }
          }

          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

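  /*
   * Callable used by testConcurrentReading: for about ten seconds it reads
   * blocks at random offsets, alternating between positional and seek-based
   * reads and between known and unknown on-disk sizes, verifying block type,
   * size and offset on every read.
   */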
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        boolean pread = rand.nextBoolean();
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
                : offsets.get(blockId + 1)) - offset;

        HFileBlock b;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at "
              + offset + ", pread=" + pread + ", withOnDiskSize=" +
              withOnDiskSize, ex);
          return false;
        }

        assertEquals(types.get(blockId), b.getBlockType());
        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
        assertEquals(offset, b.getOffset());

        ++numBlocksRead;
        if (pread)
          ++numPositionalRead;
        if (withOnDiskSize)
          ++numWithOnDiskSize;
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
          " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
          "specified: " + numWithOnDiskSize + ")");

      return true;
    }
  }

  @Test
  public void testConcurrentReading() throws Exception {
    testConcurrentReadingInternals();
  }

  protected void testConcurrentReadingInternals() throws IOException,
      InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path =
          new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<Long>();
      List<BlockType> types = new ArrayList<BlockType>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
                          .withHBaseCheckSum(true)
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withCompression(compressAlgo)
                          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, fileSize, meta);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs =
          new ExecutorCompletionService<Boolean>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
              + " reader threads finished successfully (algo=" + compressAlgo
              + ")");
        }
      }

      is.close();
    }
  }

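  /*
   * Writes NUM_TEST_BLOCKS blocks of random types and sizes to a temporary
   * file, recording (when the corresponding list is provided) each block's
   * offset, the previous offset of the same block type, its type, and its
   * uncompressed contents. Returns the total on-disk size written.
   */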
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
      ) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
                        .withHBaseCheckSum(true)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withCompression(compressAlgo)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null)
        expectedOffsets.add(os.getPos());

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite)
        expectedContents.add(hbw.getUncompressedBufferWithHeader());

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " bytes, compression=" +
        compressAlgo);
    return totalSize;
  }

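  /*
   * testBlockHeapSize/testBlockHeapSizeInternals: checks the JVM-dependent
   * ByteBuffer overhead constant and verifies that HFileBlock.heapSize()
   * equals the aligned sizes of the HFileBlock instance, its backing buffer
   * (header + data), and its HFileContext.
   */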
  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 64);
    } else {
      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 80);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withHBaseCheckSum(false)
                          .withCompression(Algorithm.NONE)
                          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
          HFileBlock.FILL_HEADER, -1, 0, meta);
      long byteBufferExpectedSize =
          ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
              + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize =
          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + ";", expected,
          block.heapSize());
    }
  }

}