1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
22  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
23  import static org.junit.Assert.assertEquals;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.io.ByteArrayOutputStream;
28  import java.io.DataOutputStream;
29  import java.io.IOException;
30  import java.io.OutputStream;
31  import java.nio.ByteBuffer;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.List;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.fs.FSDataInputStream;
39  import org.apache.hadoop.fs.FSDataOutputStream;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.SmallTests;
44  import org.apache.hadoop.hbase.fs.HFileSystem;
45  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
46  import org.apache.hadoop.hbase.io.compress.Compression;
47  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
48  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
49  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
50  import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
51  import org.apache.hadoop.hbase.util.Bytes;
52  import org.apache.hadoop.hbase.util.ChecksumType;
53  import org.apache.hadoop.io.compress.Compressor;
54  import org.junit.Before;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  import org.junit.runner.RunWith;
58  import org.junit.runners.Parameterized;
59  import org.junit.runners.Parameterized.Parameters;
60  
61  import com.google.common.base.Preconditions;
62  
63  /**
64   * This class has unit tests to prove that older versions of
65   * HFiles (without checksums) are compatible with current readers.
66   */
67  @Category(SmallTests.class)
68  @RunWith(Parameterized.class)
69  public class TestHFileBlockCompatibility {
70    // change this value to activate more logs
71    private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
72  
73    private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
74  
75    private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
76        NONE, GZ };
77  
78    // The minor version for pre-checksum files
79    private static int MINOR_VERSION = 0;
80  
81    private static final HBaseTestingUtility TEST_UTIL =
82      new HBaseTestingUtility();
83    private HFileSystem fs;
84    private int uncompressedSizeV1;
85  
86    private final boolean includesMemstoreTS;
87    private final boolean includesTag;
88  
89    public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) {
90      this.includesMemstoreTS = includesMemstoreTS;
91      this.includesTag = includesTag;
92    }
93  
94    @Parameters
95    public static Collection<Object[]> parameters() {
96      return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
97    }
98  
99    @Before
100   public void setUp() throws IOException {
101     fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration());
102   }
103 
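      /**
       * Builds the raw bytes of a version-1-style block: the META block magic
       * followed by the test contents written through the compression stream,
       * with no version-2 block header. Also records the uncompressed size in
       * {@link #uncompressedSizeV1}.
       */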
104   public byte[] createTestV1Block(Compression.Algorithm algo)
105       throws IOException {
106     Compressor compressor = algo.getCompressor();
107     ByteArrayOutputStream baos = new ByteArrayOutputStream();
108     OutputStream os = algo.createCompressionStream(baos, compressor, 0);
109     DataOutputStream dos = new DataOutputStream(os);
110     BlockType.META.write(dos); // Let's make this a meta block.
111     TestHFileBlock.writeTestBlockContents(dos);
112     uncompressedSizeV1 = dos.size();
113     dos.flush();
114     algo.returnCompressor(compressor);
115     return baos.toByteArray();
116   }
117 
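      /**
       * Writes the standard test contents into a minor-version-0 {@link Writer},
       * finalizes the block via {@link Writer#getHeaderAndData()}, and returns
       * the writer so the caller can inspect the serialized bytes.
       */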
118   private Writer createTestV2Block(Compression.Algorithm algo)
119       throws IOException {
120     final BlockType blockType = BlockType.DATA;
121     Writer hbw = new Writer(algo, null,
122         includesMemstoreTS, includesTag);
123     DataOutputStream dos = hbw.startWriting(blockType);
124     TestHFileBlock.writeTestBlockContents(dos);
125     // make sure the block is ready by calling hbw.getHeaderAndData()
126     hbw.getHeaderAndData();
127     assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
128     hbw.releaseCompressor();
129     return hbw;
130   }
131 
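      /**
       * Serializes a test block with the given algorithm and returns its bytes
       * as a printable string. For gzip output the OS byte of the gzip header
       * is forced to a fixed value so the expected string is stable across
       * operating systems.
       */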
132   private String createTestBlockStr(Compression.Algorithm algo,
133       int correctLength) throws IOException {
134     Writer hbw = createTestV2Block(algo);
135     byte[] testV2Block = hbw.getHeaderAndData();
136     int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9;
137     if (testV2Block.length == correctLength) {
138       // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
139       // variations across operating systems.
140       // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
141       testV2Block[osOffset] = 3;
142     }
143     return Bytes.toStringBinary(testV2Block);
144   }
145 
146   @Test
147   public void testNoCompression() throws IOException {
148     assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
149         getUncompressedSizeWithoutHeader());
150   }
151 
152   @Test
153   public void testGzipCompression() throws IOException {
154     final String correctTestBlockStr =
155         "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
156             + "\\xFF\\xFF\\xFF\\xFF"
157             // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
158             + "\\x1F\\x8B"  // gzip magic signature
159             + "\\x08"  // Compression method: 8 = "deflate"
160             + "\\x00"  // Flags
161             + "\\x00\\x00\\x00\\x00"  // mtime
162             + "\\x00"  // XFL (extra flags)
163             // OS (0 = FAT filesystems, 3 = Unix). However, this field
164             // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
165             + "\\x03"
166             + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
167             + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
168             + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00";
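        // 82 bytes total: the 24-byte minor-version-0 block header plus 58
        // (0x3A) bytes of gzip output, matching the on-disk size recorded in
        // the expected header above.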
169     final int correctGzipBlockLength = 82;
170 
171     String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
172     assertEquals(correctTestBlockStr, returnedStr);
173   }
174 
175   @Test
176   public void testReaderV2() throws IOException {
177     if(includesTag) {
178       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
179     }
180     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
181       for (boolean pread : new boolean[] { false, true }) {
182         LOG.info("testReaderV2: Compression algorithm: " + algo +
183             ", pread=" + pread);
184         Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
185             + algo);
186         FSDataOutputStream os = fs.create(path);
187         Writer hbw = new Writer(algo, null,
188             includesMemstoreTS, includesTag);
189         long totalSize = 0;
190         for (int blockId = 0; blockId < 2; ++blockId) {
191           DataOutputStream dos = hbw.startWriting(BlockType.DATA);
192           for (int i = 0; i < 1234; ++i)
193             dos.writeInt(i);
194           hbw.writeHeaderAndData(os);
195           totalSize += hbw.getOnDiskSizeWithHeader();
196         }
197         os.close();
198 
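            // Read the blocks back. HBase checksums are disabled in the reader
            // context because minor-version-0 blocks carry no checksum data.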
199         FSDataInputStream is = fs.open(path);
200         HFileContext meta = new HFileContextBuilder()
201                            .withHBaseCheckSum(false)
202                            .withIncludesMvcc(includesMemstoreTS)
203                            .withIncludesTags(includesTag)
204                            .withCompression(algo)
205                            .build();
206         HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
207             totalSize, fs, path, meta);
208         HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
209         is.close();
210 
211         b.sanityCheck();
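            // Each block holds 1234 ints, i.e. 1234 * 4 = 4936 uncompressed bytes.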
212         assertEquals(4936, b.getUncompressedSizeWithoutHeader());
213         assertEquals(algo == GZ ? 2173 : 4936,
214                      b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
215         String blockStr = b.toString();
216 
217         if (algo == GZ) {
218           is = fs.open(path);
219           hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is), totalSize, fs, path,
220               meta);
221           b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
222                                 b.totalChecksumBytes(), -1, pread);
223           assertEquals(blockStr, b.toString());
224           int wrongCompressedSize = 2172;
225           try {
226             b = hbr.readBlockData(0, wrongCompressedSize
227                 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread);
228             fail("Exception expected");
229           } catch (IOException ex) {
230             String expectedPrefix = "On-disk size without header provided is "
231                 + wrongCompressedSize + ", but block header contains "
232                 + b.getOnDiskSizeWithoutHeader() + ".";
233             assertTrue("Invalid exception message: '" + ex.getMessage()
234                 + "'.\nMessage is expected to start with: '" + expectedPrefix
235                 + "'", ex.getMessage().startsWith(expectedPrefix));
236           }
237           is.close();
238         }
239       }
240     }
241   }
242 
243   /**
244    * Test encoding/decoding data blocks.
245    * @throws IOException if there is a bug or a problem with temporary files.
246    */
247   @Test
248   public void testDataBlockEncoding() throws IOException {
249     if(includesTag) {
250       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
251     }
252     final int numBlocks = 5;
253     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
254       for (boolean pread : new boolean[] { false, true }) {
255         for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
256           LOG.info("testDataBlockEncoding algo " + algo +
257                    " pread = " + pread +
258                    " encoding " + encoding);
259           Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
260               + algo + "_" + encoding.toString());
261           FSDataOutputStream os = fs.create(path);
262           HFileDataBlockEncoder dataBlockEncoder =
263               new HFileDataBlockEncoderImpl(encoding);
264           TestHFileBlockCompatibility.Writer hbw =
265               new TestHFileBlockCompatibility.Writer(algo,
266                   dataBlockEncoder, includesMemstoreTS, includesTag);
267           long totalSize = 0;
268           final List<Integer> encodedSizes = new ArrayList<Integer>();
269           final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
270           for (int blockId = 0; blockId < numBlocks; ++blockId) {
271             DataOutputStream dos = hbw.startWriting(BlockType.DATA);
272             TestHFileBlock.writeEncodedBlock(algo, encoding, dos, encodedSizes,
273                 encodedBlocks, blockId, includesMemstoreTS,
274                 TestHFileBlockCompatibility.Writer.DUMMY_HEADER, includesTag);
275 
276             hbw.writeHeaderAndData(os);
277             totalSize += hbw.getOnDiskSizeWithHeader();
278           }
279           os.close();
280 
281           FSDataInputStream is = fs.open(path);
282           HFileContext meta = new HFileContextBuilder()
283                               .withHBaseCheckSum(false)
284                               .withIncludesMvcc(includesMemstoreTS)
285                               .withIncludesTags(includesTag)
286                               .withCompression(algo)
287                               .build();
288           HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
289               totalSize, fs, path, meta);
290           hbr.setDataBlockEncoder(dataBlockEncoder);
291           hbr.setIncludesMemstoreTS(includesMemstoreTS);
292 
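              // Read every block back and compare it against the reference
              // encoding produced by writeEncodedBlock() above.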
293           HFileBlock b;
294           int pos = 0;
295           for (int blockId = 0; blockId < numBlocks; ++blockId) {
296             b = hbr.readBlockData(pos, -1, -1, pread);
297             b.sanityCheck();
298             pos += b.getOnDiskSizeWithHeader();
299 
300             assertEquals((int) encodedSizes.get(blockId),
301                 b.getUncompressedSizeWithoutHeader());
302             ByteBuffer actualBuffer = b.getBufferWithoutHeader();
303             if (encoding != DataBlockEncoding.NONE) {
304               // We expect a two-byte big-endian encoding id.
305               assertEquals(0, actualBuffer.get(0));
306               assertEquals(encoding.getId(), actualBuffer.get(1));
307               actualBuffer.position(2);
308               actualBuffer = actualBuffer.slice();
309             }
310 
311             ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
312             expectedBuffer.rewind();
313 
314             // test if content matches, produce nice message
315             TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
316               algo, encoding, pread);
317           }
318           is.close();
319         }
320       }
321     }
322   }
323   /**
324    * This is the version of the HFileBlock.Writer that is used to
325    * create V2 blocks with minor version 0. These blocks do not
326    * have hbase-level checksums. The code is here to test
327    * backward compatibility. The reason we do not inherit from
328    * HFileBlock.Writer is that we never want to change the code in this
329    * class, whereas the code in HFileBlock.Writer will continue to
330    * evolve.
331    */
332   public static final class Writer {
333 
334     // These constants are as they were in minorVersion 0.
335     private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
336     private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
337     private static final byte[] DUMMY_HEADER =
338       HFileBlock.DUMMY_HEADER_NO_CHECKSUM;
339 
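        // State transitions: startWriting() moves the writer to WRITING and
        // finishBlock() moves it to BLOCK_READY; startWriting() may then be
        // called again to begin the next block.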
340     private enum State {
341       INIT,
342       WRITING,
343       BLOCK_READY
344     };
345 
346     /** Writer state. Used to ensure the correct usage protocol. */
347     private State state = State.INIT;
348 
349     /** Compression algorithm for all blocks this instance writes. */
350     private final Compression.Algorithm compressAlgo;
351 
352     /** Data block encoder used for data blocks */
353     private final HFileDataBlockEncoder dataBlockEncoder;
354 
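        /** Block encoding context used for data blocks */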
355     private HFileBlockEncodingContext dataBlockEncodingCtx;
356     /** block encoding context for non-data blocks */
357     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
358 
359     /**
360      * The stream we use to accumulate data in uncompressed format for each
361      * block. We reset this stream at the end of each block and reuse it. The
362      * header is written as the first {@link #HEADER_SIZE} bytes into this
363      * stream.
364      */
365     private ByteArrayOutputStream baosInMemory;
366 
367     /** Compressor, which is also reused between consecutive blocks. */
368     private Compressor compressor;
369 
370     /**
371      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
372      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
373      * to {@link BlockType#ENCODED_DATA}.
374      */
375     private BlockType blockType;
376 
377     /**
378      * A stream that we write uncompressed bytes to, which compresses them and
379      * writes them to {@link #baosInMemory}.
380      */
381     private DataOutputStream userDataStream;
382 
383     /**
384      * Bytes to be written to the file system, including the header. Compressed
385      * if compression is turned on.
386      */
387     private byte[] onDiskBytesWithHeader;
388 
389     /**
390      * Valid in the "block ready" state. Contains the header and the
391      * uncompressed (but potentially encoded, if this is a data block) bytes,
392      * so the length is {@link #getUncompressedSizeWithoutHeader()} + {@link #HEADER_SIZE}.
393      */
394     private byte[] uncompressedBytesWithHeader;
395 
396     /**
397      * Current block's start offset in the {@link HFile}. Set in
398      * {@link #writeHeaderAndData(FSDataOutputStream)}.
399      */
400     private long startOffset;
401 
402     /**
403      * Offset of previous block by block type. Updated when the next block is
404      * started.
405      */
406     private long[] prevOffsetByType;
407 
408     /** The offset of the previous block of the same type */
409     private long prevOffset;
410 
411     private HFileContext meta;
412 
413     /**
414      * @param compressionAlgorithm compression algorithm to use
415      * @param dataBlockEncoder data block encoder to use
416      */
417     public Writer(Compression.Algorithm compressionAlgorithm,
418           HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
419       compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
420       this.dataBlockEncoder = dataBlockEncoder != null
421           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
422 
423       meta = new HFileContextBuilder()
424               .withHBaseCheckSum(false)
425               .withIncludesMvcc(includesMemstoreTS)
426               .withIncludesTags(includesTag)
427               .withCompression(compressionAlgorithm)
428               .build();
429       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
430       dataBlockEncodingCtx =
431           this.dataBlockEncoder.newDataBlockEncodingContext(
432               DUMMY_HEADER, meta);
433       baosInMemory = new ByteArrayOutputStream();
434 
435       prevOffsetByType = new long[BlockType.values().length];
436       for (int i = 0; i < prevOffsetByType.length; ++i)
437         prevOffsetByType[i] = -1;
438 
439     }
440 
441     /**
442      * Starts writing into the block. The previous block's data is discarded.
443      *
444      * @return the stream the user can write their data into
445      * @throws IOException
446      */
447     public DataOutputStream startWriting(BlockType newBlockType)
448         throws IOException {
449       if (state == State.BLOCK_READY && startOffset != -1) {
450         // We had a previous block that was written to a stream at a specific
451         // offset. Save that offset as the last offset of a block of that type.
452         prevOffsetByType[blockType.getId()] = startOffset;
453       }
454 
455       startOffset = -1;
456       blockType = newBlockType;
457 
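          // Reserve space for the header with a dummy placeholder; the real
          // header is written by putHeader() in finishBlock(), once the block
          // sizes are known.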
458       baosInMemory.reset();
459       baosInMemory.write(DUMMY_HEADER);
460 
461       state = State.WRITING;
462 
463       // We will compress it later in finishBlock()
464       userDataStream = new DataOutputStream(baosInMemory);
465       return userDataStream;
466     }
467 
468     /**
469      * Returns the stream for the user to write to. The block writer takes care
470      * of handling compression and buffering for caching on write. Can only be
471      * called in the "writing" state.
472      *
473      * @return the data output stream for the user to write to
474      */
475     DataOutputStream getUserDataStream() {
476       expectState(State.WRITING);
477       return userDataStream;
478     }
479 
480     /**
481      * Transitions the block writer from the "writing" state to the "block
482      * ready" state.  Does nothing if a block is already finished.
483      */
484     private void ensureBlockReady() throws IOException {
485       Preconditions.checkState(state != State.INIT,
486           "Unexpected state: " + state);
487 
488       if (state == State.BLOCK_READY)
489         return;
490 
491       // This will set state to BLOCK_READY.
492       finishBlock();
493     }
494 
495     /**
496      * An internal method that flushes the compressing stream (if using
497      * compression), serializes the header, and takes care of the separate
498      * uncompressed stream for caching on write, if applicable. Sets block
499      * write state to "block ready".
500      */
501     private void finishBlock() throws IOException {
502       userDataStream.flush();
503       // This does an array copy, so it is safe to cache this byte array.
504       uncompressedBytesWithHeader = baosInMemory.toByteArray();
505       prevOffset = prevOffsetByType[blockType.getId()];
506 
507       // We need to set state before we can package the block up for
508       // cache-on-write. In a way, the block is ready, but not yet encoded or
509       // compressed.
510       state = State.BLOCK_READY;
511       if (blockType == BlockType.DATA) {
512         encodeDataBlockForDisk();
513       } else {
514         defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
515             uncompressedBytesWithHeader, blockType);
516         onDiskBytesWithHeader =
517           defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
518       }
519 
520       // Put the header for the on-disk bytes
521       putHeader(onDiskBytesWithHeader, 0,
522           onDiskBytesWithHeader.length,
523           uncompressedBytesWithHeader.length);
524       // Set the header for the uncompressed bytes (for cache-on-write)
525       putHeader(uncompressedBytesWithHeader, 0,
526           onDiskBytesWithHeader.length,
527           uncompressedBytesWithHeader.length);
528     }
529 
530     /**
531      * Encodes this block if it is a data block and encoding is turned on in
532      * {@link #dataBlockEncoder}.
533      */
534     private void encodeDataBlockForDisk() throws IOException {
535       // do data block encoding, if data block encoder is set
536       ByteBuffer rawKeyValues =
537           ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
538               uncompressedBytesWithHeader.length - HEADER_SIZE).slice();
539 
540       //do the encoding
541       dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);
542 
543       uncompressedBytesWithHeader =
544           dataBlockEncodingCtx.getUncompressedBytesWithHeader();
545       onDiskBytesWithHeader =
546           dataBlockEncodingCtx.getOnDiskBytesWithHeader();
547       blockType = dataBlockEncodingCtx.getBlockType();
548     }
549 
550     /**
551      * Put the header into the given byte array at the given offset.
552      * @param onDiskSize size of the block on disk
553      * @param uncompressedSize size of the block after decompression (but
554      *          before optional data block decoding)
555      */
556     private void putHeader(byte[] dest, int offset, int onDiskSize,
557         int uncompressedSize) {
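      // Minor-version-0 header layout (HEADER_SIZE = 24 bytes, no checksum
      // fields): 8-byte block magic, 4-byte on-disk size without header,
      // 4-byte uncompressed size without header, 8-byte previous block offset.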
558       offset = blockType.put(dest, offset);
559       offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
560       offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
561       Bytes.putLong(dest, offset, prevOffset);
562     }
563 
564     /**
565      * Similar to {@link #writeHeaderAndData(DataOutputStream)}, but records
566      * the offset of this block so that it can be referenced in the next block
567      * of the same type.
568      *
569      * @param out the output stream to write the block to
570      * @throws IOException
571      */
572     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
573       long offset = out.getPos();
574       if (startOffset != -1 && offset != startOffset) {
575         throw new IOException("A " + blockType + " block written to a "
576             + "stream twice, first at offset " + startOffset + ", then at "
577             + offset);
578       }
579       startOffset = offset;
580 
581       writeHeaderAndData((DataOutputStream) out);
582     }
583 
584     /**
585      * Writes the header and the compressed data of this block (or uncompressed
586      * data when not using compression) into the given stream. Can be called in
587      * the "writing" state or in the "block ready" state. If called in the
588      * "writing" state, transitions the writer to the "block ready" state.
589      *
590      * @param out the output stream to write the header and data to
591      * @throws IOException
592      */
593     private void writeHeaderAndData(DataOutputStream out) throws IOException {
594       ensureBlockReady();
595       out.write(onDiskBytesWithHeader);
596     }
597 
598     /**
599      * Returns the header followed by the compressed data (or uncompressed data
600      * when not using compression) as a byte array. Can be called in the "writing" state
601      * or in the "block ready" state. If called in the "writing" state,
602      * transitions the writer to the "block ready" state.
603      *
604      * @return header and data as they would be stored on disk in a byte array
605      * @throws IOException
606      */
607     public byte[] getHeaderAndData() throws IOException {
608       ensureBlockReady();
609       return onDiskBytesWithHeader;
610     }
611 
612     /**
613      * Releases the compressor this writer uses to compress blocks into the
614      * compressor pool. Needs to be called before the writer is discarded.
615      */
616     public void releaseCompressor() {
617       if (compressor != null) {
618         compressAlgo.returnCompressor(compressor);
619         compressor = null;
620       }
621     }
622 
623     /**
624      * Returns the on-disk size of the data portion of the block. This is the
625      * compressed size if compression is enabled. Can only be called in the
626      * "block ready" state. Header is not compressed, and its size is not
627      * included in the return value.
628      *
629      * @return the on-disk size of the block, not including the header.
630      */
631     public int getOnDiskSizeWithoutHeader() {
632       expectState(State.BLOCK_READY);
633       return onDiskBytesWithHeader.length - HEADER_SIZE;
634     }
635 
636     /**
637      * Returns the on-disk size of the block. Can only be called in the
638      * "block ready" state.
639      *
640      * @return the on-disk size of the block ready to be written, including the
641      *         header size
642      */
643     public int getOnDiskSizeWithHeader() {
644       expectState(State.BLOCK_READY);
645       return onDiskBytesWithHeader.length;
646     }
647 
648     /**
649      * The uncompressed size of the block data. Does not include header size.
650      */
651     public int getUncompressedSizeWithoutHeader() {
652       expectState(State.BLOCK_READY);
653       return uncompressedBytesWithHeader.length - HEADER_SIZE;
654     }
655 
656     /**
657      * The uncompressed size of the block data, including header size.
658      */
659     public int getUncompressedSizeWithHeader() {
660       expectState(State.BLOCK_READY);
661       return uncompressedBytesWithHeader.length;
662     }
663 
664     /** @return true if a block is being written  */
665     public boolean isWriting() {
666       return state == State.WRITING;
667     }
668 
669     /**
670      * Returns the number of bytes written into the current block so far, or
671      * zero if not writing the block at the moment. Note that this will return
672      * zero in the "block ready" state as well.
673      *
674      * @return the number of bytes written
675      */
676     public int blockSizeWritten() {
677       if (state != State.WRITING)
678         return 0;
679       return userDataStream.size();
680     }
681 
682     /**
683      * Returns the header followed by the uncompressed data, even if using
684      * compression. This is needed for storing uncompressed blocks in the block
685      * cache. Can be called in the "writing" state or the "block ready" state.
686      *
687      * @return uncompressed block bytes for caching on write
688      */
689     private byte[] getUncompressedDataWithHeader() {
690       expectState(State.BLOCK_READY);
691 
692       return uncompressedBytesWithHeader;
693     }
694 
695     private void expectState(State expectedState) {
696       if (state != expectedState) {
697         throw new IllegalStateException("Expected state: " + expectedState +
698             ", actual state: " + state);
699       }
700     }
701 
702     /**
703      * Similar to {@link #getUncompressedDataWithHeader()} but returns a byte
704      * buffer.
705      *
706      * @return uncompressed block for caching on write in the form of a buffer
707      */
708     public ByteBuffer getUncompressedBufferWithHeader() {
709       byte[] b = getUncompressedDataWithHeader();
710       return ByteBuffer.wrap(b, 0, b.length);
711     }
712 
713     /**
714      * Takes the given {@link BlockWritable} instance, creates a new block of
715      * its appropriate type, writes the writable into this block, and flushes
716      * the block into the output stream. The writer is instructed not to buffer
717      * uncompressed bytes for cache-on-write.
718      *
719      * @param bw the block-writable object to write as a block
720      * @param out the file system output stream
721      * @throws IOException
722      */
723     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
724         throws IOException {
725       bw.writeToBlock(startWriting(bw.getBlockType()));
726       writeHeaderAndData(out);
727     }
728 
729     /**
730      * Creates a new HFileBlock from the current writer state, for caching on write.
731      */
732     public HFileBlock getBlockForCaching() {
733       HFileContext meta = new HFileContextBuilder()
734              .withHBaseCheckSum(false)
735              .withChecksumType(ChecksumType.NULL)
736              .withBytesPerCheckSum(0)
737              .build();
738       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
739           getUncompressedSizeWithoutHeader(), prevOffset,
740           getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset, 
741           getOnDiskSizeWithoutHeader(), meta);
742     }
743   }
744 
745 }
746