/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.base.Preconditions;

/**
 * Tests that HFile blocks written in the old "minor version 0" format (i.e.
 * without HBase checksums) can still be read by the current reader. The
 * nested {@link Writer} class below is a frozen copy of the old block writer,
 * kept here so this backward-compatibility test never changes as
 * HFileBlock.Writer evolves.
 */
@Category(SmallTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlockCompatibility {

  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);

  private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

  // Blocks in this test are written with minor version 0, i.e. without
  // HBase checksums.
  private static final int MINOR_VERSION = 0;

  private static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();
  private HFileSystem fs;
  private int uncompressedSizeV1;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration());
  }
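
  /**
   * Writes a "version 1" style meta block: the block-type magic followed by
   * the test contents, pushed through the algorithm's compression stream.
   * Note there is no V2 header (size fields and previous-block offset) here.
   */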
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    TestHFileBlock.writeTestBlockContents(dos);
    uncompressedSizeV1 = dos.size();
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

  private Writer createTestV2Block(Compression.Algorithm algo)
      throws IOException {
    final BlockType blockType = BlockType.DATA;
    Writer hbw = new Writer(algo, null,
        includesMemstoreTS, includesTag);
    DataOutputStream dos = hbw.startWriting(blockType);
    TestHFileBlock.writeTestBlockContents(dos);

    // Make sure the block is ready by calling hbw.getHeaderAndData().
    hbw.getHeaderAndData();
    // writeTestBlockContents fills the block with 1000 ints: 1000 * 4 bytes.
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.releaseCompressor();
    return hbw;
  }
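
  /**
   * Returns the string representation of the on-disk bytes of a V2 block
   * written with the given algorithm. If the block has the expected length,
   * the gzip "OS" header byte is forced to 3 (Unix) so that the comparison
   * with the expected string is deterministic across platforms.
   */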
  private String createTestBlockStr(Compression.Algorithm algo,
      int correctLength) throws IOException {
    Writer hbw = createTestV2Block(algo);
    byte[] testV2Block = hbw.getHeaderAndData();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) so that
      // it matches the expected string: this field varies between
      // operating systems.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
        getUncompressedSizeWithoutHeader());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
            + "\\x1F\\x8B"  // gzip magic signature
            + "\\x08"  // Compression method: 8 = "deflate"
            + "\\x00"  // Flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"  // XFL (extra flags)
            // OS (0 = FAT filesystems, 3 = Unix). However, this field
            // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00";
    final int correctGzipBlockLength = 82;

    String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
    assertEquals(correctTestBlockStr, returnedStr);
  }
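
  // For reference: correctGzipBlockLength (82) is the 24-byte minor-version-0
  // header plus 58 bytes of gzip output; see Writer.putHeader below for the
  // header layout.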

  @Test
  public void testReaderV2() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        Writer hbw = new Writer(algo, null,
            includesMemstoreTS, includesTag);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i)
            dos.writeInt(i);
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        HFileContext meta = new HFileContextBuilder()
            .withHBaseCheckSum(false)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo)
            .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
            totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        is.close();

        b.sanityCheck();
        // Each block holds 1234 ints, i.e. 1234 * 4 = 4936 uncompressed bytes.
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        String blockStr = b.toString();

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is), totalSize, fs, path,
              meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
              b.totalChecksumBytes(), -1, pread);
          assertEquals(blockStr, b.toString());
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "On-disk size without header provided is "
                + wrongCompressedSize + ", but block header contains "
                + b.getOnDiskSizeWithoutHeader() + ".";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

  /**
   * Test encoding/decoding data blocks.
   * @throws IOException a bug or a problem with temporary files
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    final int numBlocks = 5;
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          LOG.info("testDataBlockEncoding algo " + algo +
              " pread = " + pread +
              " encoding " + encoding);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder =
              new HFileDataBlockEncoderImpl(encoding);
          TestHFileBlockCompatibility.Writer hbw =
              new TestHFileBlockCompatibility.Writer(algo,
                  dataBlockEncoder, includesMemstoreTS, includesTag);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<Integer>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
            TestHFileBlock.writeEncodedBlock(algo, encoding, dos, encodedSizes,
                encodedBlocks, blockId, includesMemstoreTS,
                TestHFileBlockCompatibility.Writer.DUMMY_HEADER, includesTag);

            hbw.writeHeaderAndData(os);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
              .withHBaseCheckSum(false)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withCompression(algo)
              .build();
          HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
              totalSize, fs, path, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemstoreTS(includesMemstoreTS);

          HFileBlock b;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            b = hbr.readBlockData(pos, -1, -1, pread);
            b.sanityCheck();
            pos += b.getOnDiskSizeWithHeader();

            assertEquals((int) encodedSizes.get(blockId),
                b.getUncompressedSizeWithoutHeader());
            ByteBuffer actualBuffer = b.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(0, actualBuffer.get(0));
              assertEquals(encoding.getId(), actualBuffer.get(1));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // Test if the contents match, and produce a nice message if not.
            TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
                algo, encoding, pread);
          }
          is.close();
        }
      }
    }
  }

  /**
   * This is the version of HFileBlock.Writer that is used to create V2 blocks
   * with minor version 0. These blocks do not have HBase checksums. The code
   * is here to test backward compatibility. The reason we do not inherit from
   * HFileBlock.Writer is that we never want the code in this class to change,
   * while the code in HFileBlock.Writer will continue to evolve.
   */
  public static final class Writer {

    // These constants are as they were in minor version 0.
    private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
    private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
    private static final byte[] DUMMY_HEADER =
        HFileBlock.DUMMY_HEADER_NO_CHECKSUM;

    private enum State {
      INIT,
      WRITING,
      BLOCK_READY
    }
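
    // State transitions: INIT -> WRITING via startWriting(); WRITING ->
    // BLOCK_READY via ensureBlockReady()/finishBlock(); and back to WRITING
    // when startWriting() begins the next block.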

    /** Writer state. Used to ensure the correct usage protocol. */
    private State state = State.INIT;

    /** Compression algorithm for all blocks this instance writes. */
    private final Compression.Algorithm compressAlgo;

    /** Data block encoder used for data blocks. */
    private final HFileDataBlockEncoder dataBlockEncoder;

    /** Encoding context used for data blocks. */
    private HFileBlockEncodingContext dataBlockEncodingCtx;

    /** Encoding context used for all non-data blocks. */
    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;

    /**
     * The stream we use to accumulate data in uncompressed format for each
     * block. We reset this stream at the end of each block and reuse it. The
     * header is written as the first {@link #HEADER_SIZE} bytes into this
     * stream.
     */
    private ByteArrayOutputStream baosInMemory;

    /** Compressor, which is also reused between consecutive blocks. */
    private Compressor compressor;

    /**
     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
     * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
     * to {@link BlockType#ENCODED_DATA}.
     */
    private BlockType blockType;

    /**
     * A stream that we write uncompressed bytes to, which compresses them and
     * writes them to {@link #baosInMemory}.
     */
    private DataOutputStream userDataStream;

    /**
     * Bytes to be written to the file system, including the header. Compressed
     * if compression is turned on.
     */
    private byte[] onDiskBytesWithHeader;

    /**
     * Valid in the "block ready" state. Contains the header and the
     * uncompressed (but potentially encoded, if this is a data block) bytes,
     * so the length is {@link #getUncompressedSizeWithoutHeader()} plus
     * {@link #HEADER_SIZE}.
     */
    private byte[] uncompressedBytesWithHeader;

    /**
     * Current block's start offset in the {@link HFile}. Set in
     * {@link #writeHeaderAndData(FSDataOutputStream)}.
     */
    private long startOffset;

    /**
     * Offset of the previous block by block type. Updated when the next block
     * is started.
     */
    private long[] prevOffsetByType;

    /** The offset of the previous block of the same type. */
    private long prevOffset;

    private HFileContext meta;

    /**
     * @param compressionAlgorithm compression algorithm to use
     * @param dataBlockEncoder data block encoder to use, or null for no encoding
     * @param includesMemstoreTS whether blocks include memstore timestamps (MVCC)
     * @param includesTag whether blocks include cell tags
     */
    public Writer(Compression.Algorithm compressionAlgorithm,
        HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
      compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
      this.dataBlockEncoder = dataBlockEncoder != null
          ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;

      meta = new HFileContextBuilder()
          .withHBaseCheckSum(false)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withCompression(compressionAlgorithm)
          .build();
      defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
      dataBlockEncodingCtx =
          this.dataBlockEncoder.newDataBlockEncodingContext(
              DUMMY_HEADER, meta);
      baosInMemory = new ByteArrayOutputStream();

      prevOffsetByType = new long[BlockType.values().length];
      for (int i = 0; i < prevOffsetByType.length; ++i)
        prevOffsetByType[i] = -1;
    }

    /**
     * Starts writing into the block. The previous block's data is discarded.
     *
     * @return the stream the user can write their data into
     * @throws IOException
     */
    public DataOutputStream startWriting(BlockType newBlockType)
        throws IOException {
      if (state == State.BLOCK_READY && startOffset != -1) {
        // We had a previous block that was written to a stream at a specific
        // offset. Save that offset as the last offset of a block of that type.
        prevOffsetByType[blockType.getId()] = startOffset;
      }

      startOffset = -1;
      blockType = newBlockType;

      baosInMemory.reset();
      baosInMemory.write(DUMMY_HEADER);

      state = State.WRITING;

      // We will compress it later in finishBlock().
      userDataStream = new DataOutputStream(baosInMemory);
      return userDataStream;
    }
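
    // Typical write protocol, as exercised by the tests above (a sketch only;
    // "out" stands for any FSDataOutputStream):
    //
    //   Writer hbw = new Writer(NONE, null, false, false);
    //   DataOutputStream dos = hbw.startWriting(BlockType.DATA); // -> WRITING
    //   dos.writeInt(42);                                        // block payload
    //   hbw.writeHeaderAndData(out);                             // -> BLOCK_READY
    //   hbw.releaseCompressor();                                 // when done with the writer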

    /**
     * Returns the stream the user is writing block data into. Can only be
     * called in the "writing" state.
     *
     * @return the data output stream for the user to write to
     */
    DataOutputStream getUserDataStream() {
      expectState(State.WRITING);
      return userDataStream;
    }

    /**
     * Transitions the block writer from the "writing" state to the "block
     * ready" state. Does nothing if the block is already finished.
     */
    private void ensureBlockReady() throws IOException {
      Preconditions.checkState(state != State.INIT,
          "Unexpected state: " + state);

      if (state == State.BLOCK_READY)
        return;

      // This will set state to BLOCK_READY.
      finishBlock();
    }

    /**
     * An internal method that flushes the user data stream, encodes and
     * compresses the block as needed, and serializes the headers of both the
     * on-disk and the uncompressed (cache-on-write) copies. Sets the block
     * write state to "block ready".
     */
    private void finishBlock() throws IOException {
      userDataStream.flush();

      uncompressedBytesWithHeader = baosInMemory.toByteArray();
      prevOffset = prevOffsetByType[blockType.getId()];

      // We need to set state before we can package the block up for
      // cache-on-write. In a way, the block is ready, but not yet encoded or
      // compressed.
      state = State.BLOCK_READY;
      if (blockType == BlockType.DATA) {
        encodeDataBlockForDisk();
      } else {
        defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
            uncompressedBytesWithHeader, blockType);
        onDiskBytesWithHeader =
            defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
      }

      // Put the header for the on-disk bytes.
      putHeader(onDiskBytesWithHeader, 0,
          onDiskBytesWithHeader.length,
          uncompressedBytesWithHeader.length);

      // Set the header for the uncompressed bytes (for cache-on-write).
      putHeader(uncompressedBytesWithHeader, 0,
          onDiskBytesWithHeader.length,
          uncompressedBytesWithHeader.length);
    }

    /**
     * Encodes this block if it is a data block and encoding is turned on in
     * {@link #dataBlockEncoder}.
     */
    private void encodeDataBlockForDisk() throws IOException {
      // The encoder operates on the raw key/value bytes after the header.
      ByteBuffer rawKeyValues =
          ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
              uncompressedBytesWithHeader.length - HEADER_SIZE).slice();

      // Do the encoding.
      dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);

      uncompressedBytesWithHeader =
          dataBlockEncodingCtx.getUncompressedBytesWithHeader();
      onDiskBytesWithHeader =
          dataBlockEncodingCtx.getOnDiskBytesWithHeader();
      blockType = dataBlockEncodingCtx.getBlockType();
    }

    /**
     * Puts the header into the given byte array at the given offset.
     * @param onDiskSize size of the block on disk, including the header
     * @param uncompressedSize size of the block after decompression (but
     *          before optional data block decoding), including the header
     */
    private void putHeader(byte[] dest, int offset, int onDiskSize,
        int uncompressedSize) {
      offset = blockType.put(dest, offset);
      offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
      offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
      Bytes.putLong(dest, offset, prevOffset);
    }
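
    // Resulting minor-version-0 header layout, HEADER_SIZE = 24 bytes total:
    //   8 bytes  block-type magic (e.g. "DATABLK*")
    //   4 bytes  on-disk size without header
    //   4 bytes  uncompressed size without header
    //   8 bytes  offset of the previous block of the same type
    // Unlike later minor versions, there are no checksum fields.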

    /**
     * Writes the header and data of this block into the given stream,
     * recording the offset at which the block is written so that the next
     * block of the same type can reference it as its previous-block offset.
     *
     * @param out the output stream to write to
     * @throws IOException if this block has already been written to a stream
     *           at a different offset
     */
    public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
      long offset = out.getPos();
      if (startOffset != -1 && offset != startOffset) {
        throw new IOException("A " + blockType + " block written to a "
            + "stream twice, first at offset " + startOffset + ", then at "
            + offset);
      }
      startOffset = offset;

      writeHeaderAndData((DataOutputStream) out);
    }

    /**
     * Writes the header and the compressed data of this block (or uncompressed
     * data when not using compression) into the given stream. Can be called in
     * the "writing" state or in the "block ready" state. If called in the
     * "writing" state, transitions the writer to the "block ready" state.
     *
     * @param out the output stream to write to
     * @throws IOException
     */
    private void writeHeaderAndData(DataOutputStream out) throws IOException {
      ensureBlockReady();
      out.write(onDiskBytesWithHeader);
    }

    /**
     * Returns the header followed by the compressed data (or uncompressed data
     * when not using compression) as a byte array. Can be called in the
     * "writing" state or in the "block ready" state. If called in the
     * "writing" state, transitions the writer to the "block ready" state.
     *
     * @return header and data as they would be stored on disk in a byte array
     * @throws IOException
     */
    public byte[] getHeaderAndData() throws IOException {
      ensureBlockReady();
      return onDiskBytesWithHeader;
    }

    /**
     * Releases the compressor this writer uses to compress blocks into the
     * compressor pool. Needs to be called before the writer is discarded.
     */
    public void releaseCompressor() {
      if (compressor != null) {
        compressAlgo.returnCompressor(compressor);
        compressor = null;
      }
    }
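
    // Note: in this frozen copy the compressor field is never assigned
    // (compression is delegated to the encoding contexts in finishBlock()),
    // so releaseCompressor() is effectively a no-op kept for API parity.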

    /**
     * Returns the on-disk size of the data portion of the block. This is the
     * compressed size if compression is enabled. Can only be called in the
     * "block ready" state. The header is not compressed, and its size is not
     * included in the return value.
     *
     * @return the on-disk size of the block, not including the header
     */
    public int getOnDiskSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length - HEADER_SIZE;
    }

    /**
     * Returns the on-disk size of the block. Can only be called in the
     * "block ready" state.
     *
     * @return the on-disk size of the block ready to be written, including the
     *         header size
     */
    public int getOnDiskSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length;
    }

    /**
     * The uncompressed size of the block data. Does not include the header
     * size.
     */
    public int getUncompressedSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length - HEADER_SIZE;
    }

    /**
     * The uncompressed size of the block data, including the header size.
     */
    public int getUncompressedSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length;
    }

    /** @return true if a block is being written */
    public boolean isWriting() {
      return state == State.WRITING;
    }

    /**
     * Returns the number of bytes written into the current block so far, or
     * zero if not writing a block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of bytes written
     */
    public int blockSizeWritten() {
      if (state != State.WRITING)
        return 0;
      return userDataStream.size();
    }

    /**
     * Returns the header followed by the uncompressed data, even if using
     * compression. This is needed for storing uncompressed blocks in the block
     * cache. Can only be called in the "block ready" state.
     *
     * @return uncompressed block bytes for caching on write
     */
    private byte[] getUncompressedDataWithHeader() {
      expectState(State.BLOCK_READY);

      return uncompressedBytesWithHeader;
    }

    private void expectState(State expectedState) {
      if (state != expectedState) {
        throw new IllegalStateException("Expected state: " + expectedState +
            ", actual state: " + state);
      }
    }

    /**
     * Returns the header followed by the uncompressed data, even if using
     * compression. This is needed for storing uncompressed blocks in the block
     * cache.
     *
     * @return uncompressed block for caching on write, in the form of a buffer
     */
    public ByteBuffer getUncompressedBufferWithHeader() {
      byte[] b = getUncompressedDataWithHeader();
      return ByteBuffer.wrap(b, 0, b.length);
    }

    /**
     * Takes the given {@link BlockWritable} instance, creates a new block of
     * its appropriate type, writes the writable into this block, and flushes
     * the block into the output stream.
     *
     * @param bw the block-writable object to write as a block
     * @param out the file system output stream
     * @throws IOException
     */
    public void writeBlock(BlockWritable bw, FSDataOutputStream out)
        throws IOException {
      bw.writeToBlock(startWriting(bw.getBlockType()));
      writeHeaderAndData(out);
    }

    /**
     * Creates a new HFileBlock from the data in this writer, for caching on
     * write. Checksums are not used in minor version 0, so the block is built
     * with a NULL checksum type and zero bytes per checksum.
     */
    public HFileBlock getBlockForCaching() {
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(false)
          .withChecksumType(ChecksumType.NULL)
          .withBytesPerCheckSum(0)
          .build();
      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
          getUncompressedSizeWithoutHeader(), prevOffset,
          getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
          getOnDiskSizeWithoutHeader(), meta);
    }
  }

}