package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

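/**
 * Reads version 2 HFile blocks and writes version 2 blocks. Each block,
 * header included, is laid out on disk as follows (see
 * {@link #overwriteHeader()} and the {@link Writer}):
 * <ul>
 * <li>8 bytes: magic record identifying the {@link BlockType}</li>
 * <li>4 bytes: on-disk size of the block, excluding the header</li>
 * <li>4 bytes: uncompressed size of the block, excluding the header</li>
 * <li>8 bytes: offset of the previous block of the same type, used for
 * seeking backwards</li>
 * <li>For HBase-checksummed blocks only: 1 byte of checksum type, 4 bytes of
 * bytes-per-checksum, and 4 bytes of on-disk data size including the header</li>
 * <li>The (possibly compressed and/or encrypted) data, followed by checksum
 * bytes when HBase checksums are in use</li>
 * </ul>
 */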
@InterfaceAudience.Private
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="HE_EQUALS_USE_HASHCODE",
    justification="Fix!!! Fine for now but FIXXXXXXX!!!!")
public class HFileBlock implements Cacheable {

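  /**
   * After an HBase checksum failure, this many subsequent reads are routed
   * through HDFS checksum verification before HBase checksum verification is
   * re-enabled; see the fallbackToFsChecksum call in readBlockData below.
   */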
  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;

  public static final boolean FILL_HEADER = true;
  public static final boolean DONT_FILL_HEADER = false;

  /**
   * The on-disk size of an encoded data block's header: the regular block
   * header plus the two-byte data block encoding id.
   */
  public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
      + DataBlockEncoding.ID_SIZE;

  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];

  /** Estimated heap overhead of a ByteBuffer object, used in heapSize() accounting. */
  public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
      ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);

  /**
   * Space reserved, beyond the block bytes themselves, when serializing a block
   * into the cache: a checksum-support flag (byte), the block offset (long),
   * and the next block's on-disk size with header (int). See
   * {@link #serializeExtraInfo(ByteBuffer)}.
   */
  public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;

  /** Each checksum value is an integer that can be stored in 4 bytes. */
  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;

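  /**
   * Deserializer used when reading serialized blocks back out of the block
   * cache; the matching write side is {@link #serialize(ByteBuffer)} and
   * {@link #serializeExtraInfo(ByteBuffer)}.
   */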
  static final CacheableDeserializer<Cacheable> blockDeserializer =
      new CacheableDeserializer<Cacheable>() {
        @Override
        public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException {
          buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
          ByteBuffer newByteBuffer;
          if (reuse) {
            newByteBuffer = buf.slice();
          } else {
            newByteBuffer = ByteBuffer.allocate(buf.limit());
            newByteBuffer.put(buf);
          }
          buf.position(buf.limit());
          buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
          boolean usesChecksum = buf.get() == (byte) 1;
          HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
          hFileBlock.offset = buf.getLong();
          hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
          if (hFileBlock.hasNextBlockHeader()) {
            hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
          }
          return hFileBlock;
        }

        @Override
        public int getDeserialiserIdentifier() {
          return deserializerIdentifier;
        }

        @Override
        public HFileBlock deserialize(ByteBuffer b) throws IOException {
          return deserialize(b, false);
        }
      };

  private static final int deserializerIdentifier;
  static {
    deserializerIdentifier = CacheableDeserializerIdManager
        .registerDeserializer(blockDeserializer);
  }

  /** Type of block. Header field 0. */
  private BlockType blockType;

  /** Size on disk excluding header, including checksum. Header field 1. */
  private int onDiskSizeWithoutHeader;

  /** Size of pure data, excluding header and checksum. Header field 2. */
  private final int uncompressedSizeWithoutHeader;

  /** The offset of the previous block of the same type. Header field 3. */
  private final long prevBlockOffset;

  /**
   * Size on disk of header plus data, excluding checksum. Header field 6 when
   * HBase checksums are in use; otherwise derived from
   * {@link #onDiskSizeWithoutHeader}.
   */
  private final int onDiskDataSizeWithHeader;

  /** The in-memory representation of the hfile block, header included. */
  private ByteBuffer buf;

  /** Meta data that holds information about the hfile block. */
  private HFileContext fileContext;

  /**
   * The offset of this block in the file. Populated by the reader for
   * convenience of access. This offset is not part of the block header.
   */
  private long offset = -1;

  /**
   * The on-disk size of the next block, header included, obtained by peeking
   * into the next block's header when reading this block; -1 if unknown.
   */
  private int nextBlockOnDiskSizeWithHeader = -1;

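  /**
   * Creates a new HFile block from the given fields. Used when the block data
   * has already been read into a buffer.
   *
   * @param blockType the type of this block, see {@link BlockType}
   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
   * @param prevBlockOffset see {@link #prevBlockOffset}
   * @param buf block header followed by uncompressed data
   * @param fillHeader when true, rewrite the header fields into the buffer
   * @param offset the file offset the block was read from
   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
   * @param fileContext HFile meta data
   */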
  HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
      long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
      int onDiskDataSizeWithHeader, HFileContext fileContext) {
    this.blockType = blockType;
    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
    this.prevBlockOffset = prevBlockOffset;
    this.buf = buf;
    this.offset = offset;
    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
    this.fileContext = fileContext;
    if (fillHeader)
      overwriteHeader();
    this.buf.rewind();
  }

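  /**
   * Copy constructor. Creates a shallow copy of {@code that}'s block: the
   * data buffer is duplicated, not deep-copied, so the content is shared.
   */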
  HFileBlock(HFileBlock that) {
    this.blockType = that.blockType;
    this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
    this.prevBlockOffset = that.prevBlockOffset;
    this.buf = that.buf.duplicate();
    this.offset = that.offset;
    this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
    this.fileContext = that.fileContext;
    this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
  }

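  /**
   * Creates a block from an existing buffer that starts with a header. The
   * header fields are parsed out of the buffer, and the buffer itself is kept
   * as this block's backing data. Rewinds the buffer before parsing.
   */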
  HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
    b.rewind();
    blockType = BlockType.read(b);
    onDiskSizeWithoutHeader = b.getInt();
    uncompressedSizeWithoutHeader = b.getInt();
    prevBlockOffset = b.getLong();
    HFileContextBuilder contextBuilder = new HFileContextBuilder();
    contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
    if (usesHBaseChecksum) {
      contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
      contextBuilder.withBytesPerCheckSum(b.getInt());
      this.onDiskDataSizeWithHeader = b.getInt();
    } else {
      contextBuilder.withChecksumType(ChecksumType.NULL);
      contextBuilder.withBytesPerCheckSum(0);
      this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
          HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
    }
    this.fileContext = contextBuilder.build();
    buf = b;
    buf.rewind();
  }

  public BlockType getBlockType() {
    return blockType;
  }

  /**
   * @return the data block encoding id stored just past the header; only
   *         valid for blocks of type {@link BlockType#ENCODED_DATA}
   */
  public short getDataBlockEncodingId() {
    if (blockType != BlockType.ENCODED_DATA) {
      throw new IllegalArgumentException("Querying encoder ID of a block " +
          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
    }
    return buf.getShort(headerSize());
  }

  /** @return the on-disk size of the block, header included */
  public int getOnDiskSizeWithHeader() {
    return onDiskSizeWithoutHeader + headerSize();
  }

  /** @return the on-disk size of data plus checksum, header excluded */
  public int getOnDiskSizeWithoutHeader() {
    return onDiskSizeWithoutHeader;
  }

  /** @return the uncompressed size of data, header and checksum excluded */
  public int getUncompressedSizeWithoutHeader() {
    return uncompressedSizeWithoutHeader;
  }

  /**
   * @return the offset of the previous block of the same type in the file, or
   *         -1 if unknown
   */
  public long getPrevBlockOffset() {
    return prevBlockOffset;
  }

  /**
   * Rewrites the header fields into the first {@link #headerSize()} bytes of
   * the buffer; used when the buffer's header placeholder may not reflect the
   * current field values (see the {@code fillHeader} constructor argument).
   */
  private void overwriteHeader() {
    buf.rewind();
    blockType.write(buf);
    buf.putInt(onDiskSizeWithoutHeader);
    buf.putInt(uncompressedSizeWithoutHeader);
    buf.putLong(prevBlockOffset);
    if (this.fileContext.isUseHBaseChecksum()) {
      buf.put(fileContext.getChecksumType().getCode());
      buf.putInt(fileContext.getBytesPerChecksum());
      buf.putInt(onDiskDataSizeWithHeader);
    }
  }

  /**
   * Returns a buffer that does not include the header or checksum.
   *
   * @return the buffer with header skipped and checksum omitted
   */
  public ByteBuffer getBufferWithoutHeader() {
    ByteBuffer dup = this.buf.duplicate();
    dup.position(headerSize());
    dup.limit(buf.limit() - totalChecksumBytes());
    return dup.slice();
  }

  /**
   * Returns the buffer this block stores internally. Clients must not modify
   * the buffer content, though they may set position and limit on the
   * returned duplicate. Checksum data is not included in the returned buffer
   * but header data is.
   *
   * @return the buffer of this block for read-only operations
   */
  public ByteBuffer getBufferReadOnly() {
    ByteBuffer dup = this.buf.duplicate();
    dup.limit(buf.limit() - totalChecksumBytes());
    return dup.slice();
  }

  /**
   * Returns the buffer of this block, including header and checksum data, for
   * read-only operations.
   *
   * @return the buffer with header and checksum included
   */
  public ByteBuffer getBufferReadOnlyWithHeader() {
    ByteBuffer dup = this.buf.duplicate();
    return dup.slice();
  }

  /**
   * Returns a byte buffer of this block, including header data and checksum,
   * positioned at the beginning of the header.
   *
   * @return the byte buffer with header and checksum included
   */
  ByteBuffer getBufferWithHeader() {
    ByteBuffer dupBuf = buf.duplicate();
    dupBuf.rewind();
    return dupBuf;
  }

  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
      String fieldName) throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
          + ") is different from that in the field (" + valueFromField + ")");
    }
  }

  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
      throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new IOException("Block type stored in the buffer: " +
          valueFromBuf + ", block type field: " + valueFromField);
    }
  }

  /**
   * Checks that the header fields stored in the buffer match the values
   * parsed into this object, and that the buffer limit and capacity are
   * consistent with the stated sizes. Meant to be called after
   * deserialization.
   */
  void sanityCheck() throws IOException {
    buf.rewind();

    sanityCheckAssertion(BlockType.read(buf), blockType);

    sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
        "onDiskSizeWithoutHeader");

    sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
        "uncompressedSizeWithoutHeader");

    sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
    if (this.fileContext.isUseHBaseChecksum()) {
      sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
      sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
      sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
    }

    int cksumBytes = totalChecksumBytes();
    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
    if (buf.limit() != expectedBufLimit) {
      throw new AssertionError("Expected buffer limit " + expectedBufLimit
          + ", got " + buf.limit());
    }

    // We might optionally allocate a header-size chunk of extra capacity to hold the
    // next block's header, so the capacity may be the limit or the limit plus a header.
    int hdrSize = headerSize();
    if (buf.capacity() != expectedBufLimit &&
        buf.capacity() != expectedBufLimit + hdrSize) {
      throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
          ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder()
        .append("HFileBlock [")
        .append(" fileOffset=").append(offset)
        .append(" headerSize()=").append(headerSize())
        .append(" blockType=").append(blockType)
        .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
        .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
        .append(" prevBlockOffset=").append(prevBlockOffset)
        .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
    if (fileContext.isUseHBaseChecksum()) {
      // Checksum type and bytesPerChecksum sit right after the 24-byte base header.
      sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
          .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
          .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
    } else {
      sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
          .append("(").append(onDiskSizeWithoutHeader)
          .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
    }
    String dataBegin = null;
    if (buf.hasArray()) {
      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
    } else {
      ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
      byte[] dataBeginBytes = new byte[Math.min(32,
          bufWithoutHeader.limit() - bufWithoutHeader.position())];
      bufWithoutHeader.get(dataBeginBytes);
      dataBegin = Bytes.toStringBinary(dataBeginBytes);
    }
    sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
        .append(" totalChecksumBytes()=").append(totalChecksumBytes())
        .append(" isUnpacked()=").append(isUnpacked())
        .append(" buf=[ ").append(buf).append(" ]")
        .append(" dataBeginsWith=").append(dataBegin)
        .append(" fileContext=").append(fileContext)
        .append(" ]");
    return sb.toString();
  }

  /**
   * Called after reading a block with a provided size to verify that the size
   * recorded in the block header matches the expected on-disk size.
   */
  private void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader)
      throws IOException {
    if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
      String dataBegin = null;
      if (buf.hasArray()) {
        dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
      } else {
        ByteBuffer bufDup = getBufferReadOnly();
        byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
        bufDup.get(dataBeginBytes);
        dataBegin = Bytes.toStringBinary(dataBeginBytes);
      }
      String blockInfoMsg =
          "Block offset: " + offset + ", data starts with: " + dataBegin;
      throw new IOException("On-disk size without header provided is "
          + expectedOnDiskSizeWithoutHeader + ", but block "
          + "header contains " + onDiskSizeWithoutHeader + ". " +
          blockInfoMsg);
    }
  }

  /**
   * Retrieves the decompressed/decrypted view of this block. An encoded block
   * remains in its on-disk encoding. If the block is neither compressed nor
   * encrypted, this block is returned unchanged; otherwise an unpacked copy
   * is created and returned.
   */
  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
    if (!fileContext.isCompressedOrEncrypted()) {
      // There is nothing to unpack: the block is already in its in-memory form.
      return this;
    }

    HFileBlock unpacked = new HFileBlock(this);
    unpacked.allocateBuffer(); // allocates space for the decompressed block

    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
        reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();

    ByteBuffer dup = this.buf.duplicate();
    dup.position(this.headerSize());
    dup = dup.slice();
    ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
        dup);

    // Preserve the next block's prefetched header bytes in the new block, if present.
    if (unpacked.hasNextBlockHeader()) {
      // Both buffers' limits currently exclude the next block's header, which sits
      // just past the limit; extend the limits so that it can be copied across.
      ByteBuffer inDup = this.buf.duplicate();
      inDup.limit(inDup.limit() + headerSize());
      ByteBuffer outDup = unpacked.buf.duplicate();
      outDup.limit(outDup.limit() + unpacked.headerSize());
      ByteBufferUtils.copyFromBufferToBuffer(
          outDup,
          inDup,
          this.onDiskDataSizeWithHeader,
          unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
              + unpacked.totalChecksumBytes(), unpacked.headerSize());
    }
    return unpacked;
  }

  /**
   * Return true when this buffer includes the next block's header.
   */
  private boolean hasNextBlockHeader() {
    return nextBlockOnDiskSizeWithHeader > 0;
  }

  /**
   * Always allocates a new buffer of the correct size. Copies header bytes
   * from the existing buffer. Does not change header fields. Reserves room to
   * keep the next block's header when it was read along with this block.
   */
  private void allocateBuffer() {
    int cksumBytes = totalChecksumBytes();
    int headerSize = headerSize();
    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
        cksumBytes + (hasNextBlockHeader() ? headerSize : 0);

    ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);

    // Copy the header bytes into the new buffer.
    ByteBuffer dup = buf.duplicate();
    dup.position(0);
    dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);

    buf = newBuf;
    // Set the limit to exclude the next block's header, if present.
    buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
  }

  /**
   * Return true when this block's buffer has been unpacked, false otherwise.
   * Note this is a calculated heuristic, not a tracked attribute of the block.
   */
  public boolean isUnpacked() {
    final int cksumBytes = totalChecksumBytes();
    final int headerSize = headerSize();
    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
    final int bufCapacity = buf.capacity();
    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
  }

  /** An additional sanity-check in case no compression or encryption is being used. */
  public void assumeUncompressed() throws IOException {
    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
        totalChecksumBytes()) {
      throw new IOException("Using no compression but "
          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
          + ", numChecksumBytes=" + totalChecksumBytes());
    }
  }

  /**
   * @param expectedType the expected type of this block
   * @throws IOException if this block's type is different than expected
   */
  public void expectType(BlockType expectedType) throws IOException {
    if (blockType != expectedType) {
      throw new IOException("Invalid block type: expected=" + expectedType
          + ", actual=" + blockType);
    }
  }

  /** @return the offset of this block in the file it was read from */
  public long getOffset() {
    if (offset < 0) {
      throw new IllegalStateException(
          "HFile block offset not initialized properly");
    }
    return offset;
  }

  /**
   * @return a byte stream reading the data section of this block, header skipped
   */
  public DataInputStream getByteStream() {
    ByteBuffer dup = this.buf.duplicate();
    dup.position(this.headerSize());
    return new DataInputStream(new ByteBufferInputStream(dup));
  }

  @Override
  public long heapSize() {
    long size = ClassSize.align(
        ClassSize.OBJECT +
        // Block type, byte buffer and meta references
        3 * ClassSize.REFERENCE +
        // On-disk size, uncompressed size, next block's on-disk size
        // and on-disk data size with header
        4 * Bytes.SIZEOF_INT +
        // This and previous block offset
        2 * Bytes.SIZEOF_LONG +
        // Heap size of the meta object; meta is never null here
        fileContext.heapSize()
    );

    if (buf != null) {
      // Deep overhead of the byte buffer. Needs to be aligned separately.
      size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
    }

    return ClassSize.align(size);
  }

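  /**
   * Read from an input stream. Analogous to
   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
   * number of "extra" bytes that would be desirable but not absolutely
   * necessary to read.
   *
   * @param in the input stream to read from
   * @param buf the buffer to read into
   * @param bufOffset the destination offset in the buffer
   * @param necessaryLen the number of bytes that are absolutely necessary to read
   * @param extraLen the number of extra bytes that would be nice to read
   * @return true if succeeded reading the extra bytes
   * @throws IOException if failed to read the necessary bytes
   */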
  public static boolean readWithExtra(InputStream in, byte[] buf,
      int bufOffset, int necessaryLen, int extraLen) throws IOException {
    int bytesRemaining = necessaryLen + extraLen;
    while (bytesRemaining > 0) {
      int ret = in.read(buf, bufOffset, bytesRemaining);
      if (ret == -1 && bytesRemaining <= extraLen) {
        // We could not read the "extra data", but that is OK.
        break;
      }

      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (read "
            + "returned " + ret + ", was trying to read " + necessaryLen
            + " necessary bytes and " + extraLen + " extra bytes, "
            + "successfully read "
            + (necessaryLen + extraLen - bytesRemaining));
      }
      bufOffset += ret;
      bytesRemaining -= ret;
    }
    return bytesRemaining <= 0;
  }

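  /**
   * Positional read with extra. Reads at least {@code necessaryLen} and up to
   * {@code necessaryLen + extraLen} bytes starting at {@code position},
   * without moving the underlying stream's own position.
   *
   * @return true if and only if extraLen is &gt; 0 and reading those extra
   *         bytes was successful
   * @throws IOException if failed to read the necessary bytes
   */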
  @VisibleForTesting
  static boolean positionalReadWithExtra(FSDataInputStream in,
      long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
      throws IOException {
    int bytesRemaining = necessaryLen + extraLen;
    int bytesRead = 0;
    while (bytesRead < necessaryLen) {
      int ret = in.read(position, buf, bufOffset, bytesRemaining);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (positional read "
            + "returned " + ret + ", was trying to read " + necessaryLen
            + " necessary bytes and " + extraLen + " extra bytes, "
            + "successfully read " + bytesRead);
      }
      position += ret;
      bufOffset += ret;
      bytesRemaining -= ret;
      bytesRead += ret;
    }
    // True only when extra bytes were requested and everything, necessary
    // plus extra, was read.
    return bytesRead != necessaryLen && bytesRemaining <= 0;
  }

  /**
   * @return the on-disk size of the next block (including the header size)
   *         obtained by peeking into the next block's header, or -1 if unknown
   */
  public int getNextBlockOnDiskSizeWithHeader() {
    return nextBlockOnDiskSizeWithHeader;
  }

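  /**
   * Unified version 2 HFile block writer. The intended usage pattern is:
   * <ol>
   * <li>Construct an {@link HFileBlock.Writer}, providing a block encoder.
   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
   * <li>Write your data into the stream.
   * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} to store
   * the serialized block into an external stream.
   * <li>Repeat to write more blocks.
   * </ol>
   */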
  public static class Writer {

    private enum State {
      INIT,
      WRITING,
      BLOCK_READY
    };

    /** Writer state. Used to ensure the correct usage protocol. */
    private State state = State.INIT;

    /** Data block encoder used for data blocks. */
    private final HFileDataBlockEncoder dataBlockEncoder;

    private HFileBlockEncodingContext dataBlockEncodingCtx;

    /** Block encoding context used for non-data blocks. */
    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;

    /**
     * The stream we use to accumulate data in uncompressed format for each
     * block. We reset this stream at the end of each block and reuse it. The
     * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE}
     * bytes into this stream.
     */
    private ByteArrayOutputStream baosInMemory;

    /**
     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
     * changed in {@link #finishBlock()} from {@link BlockType#DATA} to
     * {@link BlockType#ENCODED_DATA}.
     */
    private BlockType blockType;

    /**
     * A stream that we write uncompressed bytes to, which compresses them and
     * writes them to {@link #baosInMemory}.
     */
    private DataOutputStream userDataStream;

    /**
     * Number of unencoded bytes written into the current block, not counting
     * the header or any encoding/compression.
     */
    private int unencodedDataSizeWritten;

    /**
     * Bytes to be written to the file system, including the header.
     * Compressed if compression is turned on. Does not include the checksum
     * data, which is kept in {@link #onDiskChecksum} and written right after
     * these bytes.
     */
    private byte[] onDiskBytesWithHeader;

    /**
     * Checksum bytes for the block, written right after
     * {@link #onDiskBytesWithHeader}.
     */
    private byte[] onDiskChecksum;

    /**
     * Valid in the BLOCK_READY state. Contains the header and the
     * uncompressed (but potentially encoded, if this is a data block) bytes.
     * Does not store checksums.
     */
    private byte[] uncompressedBytesWithHeader;

    /**
     * Current block's start offset in the HFile. Set in
     * {@link #writeHeaderAndData(FSDataOutputStream)}.
     */
    private long startOffset;

    /**
     * Offset of the previous block by block type. Updated when the next block
     * is started.
     */
    private long[] prevOffsetByType;

    /** The offset of the previous block of the same type. */
    private long prevOffset;

    /** Meta data that holds information about the hfile block. */
    private HFileContext fileContext;

    /**
     * @param dataBlockEncoder data block encoding algorithm to use, or null for none
     * @param fileContext HFile meta data
     */
    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
      this.dataBlockEncoder = dataBlockEncoder != null
          ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
      defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
          HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
      dataBlockEncodingCtx = this.dataBlockEncoder
          .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);

      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
            " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
            fileContext.getBytesPerChecksum());
      }

      baosInMemory = new ByteArrayOutputStream();

      prevOffsetByType = new long[BlockType.values().length];
      for (int i = 0; i < prevOffsetByType.length; ++i)
        prevOffsetByType[i] = -1;

      this.fileContext = fileContext;
    }

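    /**
     * Starts writing into the block. The previous block's data is discarded.
     *
     * @return the stream the user can write their data into
     */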
    public DataOutputStream startWriting(BlockType newBlockType)
        throws IOException {
      if (state == State.BLOCK_READY && startOffset != -1) {
        // We had a previous block that was written to a stream at a specific
        // offset. Save that offset as the last offset of a block of that type.
        prevOffsetByType[blockType.getId()] = startOffset;
      }

      startOffset = -1;
      blockType = newBlockType;

      baosInMemory.reset();
      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);

      state = State.WRITING;

      // We will compress it later in finishBlock()
      userDataStream = new DataOutputStream(baosInMemory);
      if (newBlockType == BlockType.DATA) {
        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
      }
      this.unencodedDataSizeWritten = 0;
      return userDataStream;
    }

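    /**
     * Writes the Cell to this block. Must be called while the writer is in
     * the WRITING state.
     */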
    public void write(Cell cell) throws IOException {
      expectState(State.WRITING);
      this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
          this.userDataStream);
    }

    /**
     * Returns the stream the user can write to. The block writer takes care
     * of handling compression and buffering for caching on write. Can only be
     * called in the WRITING state.
     *
     * @return the data output stream for the user to write to
     */
    DataOutputStream getUserDataStream() {
      expectState(State.WRITING);
      return userDataStream;
    }

    /**
     * Transitions the block writer from the WRITING state to the BLOCK_READY
     * state. Does nothing if the block is already finished.
     */
    void ensureBlockReady() throws IOException {
      Preconditions.checkState(state != State.INIT,
          "Unexpected state: " + state);

      if (state == State.BLOCK_READY)
        return;

      // This will set state to BLOCK_READY.
      finishBlock();
    }

    /**
     * Finishes the current block: runs any block encoder's end-of-block hook,
     * compresses/encrypts the accumulated bytes, writes headers into both the
     * on-disk and uncompressed byte arrays, and computes checksums.
     * Transitions the writer to BLOCK_READY.
     */
    private void finishBlock() throws IOException {
      if (blockType == BlockType.DATA) {
        BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
            new BufferGrabbingByteArrayOutputStream();
        baosInMemory.writeTo(baosInMemoryCopy);
        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
            baosInMemoryCopy.buf, blockType);
        blockType = dataBlockEncodingCtx.getBlockType();
      }
      userDataStream.flush();
      // toByteArray() performs an array copy, so it is safe to cache this byte array.
      uncompressedBytesWithHeader = baosInMemory.toByteArray();
      prevOffset = prevOffsetByType[blockType.getId()];

      // We need to set state before we can compress or encrypt the block; in
      // a sense the block is ready, but its bytes are not yet packaged.
      state = State.BLOCK_READY;
      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
        onDiskBytesWithHeader = dataBlockEncodingCtx
            .compressAndEncrypt(uncompressedBytesWithHeader);
      } else {
        onDiskBytesWithHeader = defaultBlockEncodingCtx
            .compressAndEncrypt(uncompressedBytesWithHeader);
      }
      int numBytes = (int) ChecksumUtil.numBytes(
          onDiskBytesWithHeader.length,
          fileContext.getBytesPerChecksum());

      // Put the header into the on-disk bytes (it is currently a dummy header).
      putHeader(onDiskBytesWithHeader, 0,
          onDiskBytesWithHeader.length + numBytes,
          uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
      // Set the header of the uncompressed bytes too, for cache-on-write.
      putHeader(uncompressedBytesWithHeader, 0,
          onDiskBytesWithHeader.length + numBytes,
          uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);

      onDiskChecksum = new byte[numBytes];
      ChecksumUtil.generateChecksums(
          onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
          onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
    }

    /**
     * A ByteArrayOutputStream that grabs a reference to the written array
     * instead of copying it; used with
     * {@link ByteArrayOutputStream#writeTo(java.io.OutputStream)} to get at
     * the underlying buffer without an extra copy.
     */
    public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
      private byte[] buf;

      @Override
      public void write(byte[] b, int off, int len) {
        this.buf = b;
      }

      public byte[] getBuffer() {
        return this.buf;
      }
    }

    /**
     * Put the header into the given byte array at the given offset.
     *
     * @param onDiskSize size of the block on disk: header + data + checksum
     * @param uncompressedSize size of the block after decompression (but
     *          before optional data block decoding), including header
     * @param onDiskDataSize size of the block on disk with header and data,
     *          but not including the checksums
     */
    private void putHeader(byte[] dest, int offset, int onDiskSize,
        int uncompressedSize, int onDiskDataSize) {
      offset = blockType.put(dest, offset);
      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      offset = Bytes.putLong(dest, offset, prevOffset);
      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
      Bytes.putInt(dest, offset, onDiskDataSize);
    }

    /**
     * Writes the header and data of this block into the given stream, and
     * records the offset of this block so it can be referenced as the
     * previous block of the same type. Throws if the same block is written to
     * the stream twice at different offsets.
     */
    public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
      long offset = out.getPos();
      if (startOffset != -1 && offset != startOffset) {
        throw new IOException("A " + blockType + " block written to a "
            + "stream twice, first at offset " + startOffset + ", then at "
            + offset);
      }
      startOffset = offset;

      finishBlockAndWriteHeaderAndData((DataOutputStream) out);
    }

    /**
     * Writes the header and the compressed data of this block (or
     * uncompressed data when not using compression) into the given stream.
     * Can be called in the WRITING state or in the BLOCK_READY state. If
     * called in the WRITING state, transitions the writer to BLOCK_READY.
     *
     * @param out the output stream to write the block to
     */
    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
        throws IOException {
      ensureBlockReady();
      out.write(onDiskBytesWithHeader);
      out.write(onDiskChecksum);
    }

    /**
     * Returns the header and data of this block as they would be stored on
     * disk, checksums included, as a byte array. Can be called in the WRITING
     * or BLOCK_READY state; if called while writing, transitions the writer
     * to BLOCK_READY.
     *
     * @return header, data and checksums as a byte array
     */
    byte[] getHeaderAndDataForTest() throws IOException {
      ensureBlockReady();
      // This is not very optimal, because we are doing an extra copy.
      // But this method is used only by unit tests.
      byte[] output =
          new byte[onDiskBytesWithHeader.length
              + onDiskChecksum.length];
      System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
          onDiskBytesWithHeader.length);
      System.arraycopy(onDiskChecksum, 0, output,
          onDiskBytesWithHeader.length, onDiskChecksum.length);
      return output;
    }

    /**
     * Releases resources used by this writer.
     */
    public void release() {
      if (dataBlockEncodingCtx != null) {
        dataBlockEncodingCtx.close();
        dataBlockEncodingCtx = null;
      }
      if (defaultBlockEncodingCtx != null) {
        defaultBlockEncodingCtx.close();
        defaultBlockEncodingCtx = null;
      }
    }

    /**
     * Returns the on-disk size of the data portion of the block. This is the
     * compressed size if compression is enabled. Can only be called in the
     * BLOCK_READY state. The header is not compressed, and its size is not
     * included in the return value.
     *
     * @return the on-disk size of the block, not including the header
     */
    int getOnDiskSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length
          + onDiskChecksum.length
          - HConstants.HFILEBLOCK_HEADER_SIZE;
    }

    /**
     * Returns the on-disk size of the block. Can only be called in the
     * BLOCK_READY state.
     *
     * @return the on-disk size of the block, including header and checksums
     */
    int getOnDiskSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length + onDiskChecksum.length;
    }

    /**
     * The uncompressed size of the block data, not including the header.
     */
    int getUncompressedSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
    }

    /**
     * The uncompressed size of the block data, including the header.
     */
    int getUncompressedSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length;
    }

    /** @return true if a block is being written */
    public boolean isWriting() {
      return state == State.WRITING;
    }

    /**
     * Returns the number of bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the BLOCK_READY state as well.
     *
     * @return the number of bytes written
     */
    public int blockSizeWritten() {
      if (state != State.WRITING) return 0;
      return this.unencodedDataSizeWritten;
    }

    /**
     * Returns the header followed by the uncompressed data, even when
     * compression is in use. This is needed for storing uncompressed blocks
     * in the block cache. Can only be called in the BLOCK_READY state.
     * Returns only the header and data, without checksum data.
     *
     * @return uncompressed block bytes for caching on write
     */
    ByteBuffer getUncompressedBufferWithHeader() {
      expectState(State.BLOCK_READY);
      return ByteBuffer.wrap(uncompressedBytesWithHeader);
    }

    /**
     * Returns the header followed by the on-disk (compressed, encoded and/or
     * encrypted) data. This is needed for storing packed blocks in the block
     * cache. Can only be called in the BLOCK_READY state. Returns only the
     * header and data, without checksum data.
     *
     * @return packed block bytes for caching on write
     */
    ByteBuffer getOnDiskBufferWithHeader() {
      expectState(State.BLOCK_READY);
      return ByteBuffer.wrap(onDiskBytesWithHeader);
    }

    private void expectState(State expectedState) {
      if (state != expectedState) {
        throw new IllegalStateException("Expected state: " + expectedState +
            ", actual state: " + state);
      }
    }

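    /**
     * Takes the given {@link BlockWritable} instance, creates a new block of
     * its appropriate type, writes the writable into this block, and flushes
     * the block into the output stream.
     *
     * @param bw the block-writable object to write as a block
     * @param out the file system output stream
     */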
    public void writeBlock(BlockWritable bw, FSDataOutputStream out)
        throws IOException {
      bw.writeToBlock(startWriting(bw.getBlockType()));
      writeHeaderAndData(out);
    }

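    /**
     * Creates a new HFileBlock to be cached on write. Cached blocks never
     * carry checksum data, so the new block's context uses
     * {@link ChecksumType#NULL} and zero bytes per checksum; checksums are
     * only verified when reading from disk.
     */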
    public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
      HFileContext newContext = new HFileContextBuilder()
          .withBlockSize(fileContext.getBlocksize())
          .withBytesPerCheckSum(0)
          .withChecksumType(ChecksumType.NULL)
          .withCompression(fileContext.getCompression())
          .withDataBlockEncoding(fileContext.getDataBlockEncoding())
          .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
          .withCompressTags(fileContext.isCompressTags())
          .withIncludesMvcc(fileContext.isIncludesMvcc())
          .withIncludesTags(fileContext.isIncludesTags())
          .build();
      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
          getUncompressedSizeWithoutHeader(), prevOffset,
          cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
              getOnDiskBufferWithHeader() :
              getUncompressedBufferWithHeader(),
          FILL_HEADER, startOffset,
          onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
    }
  }

  /** Something that can be written into a block. */
  public interface BlockWritable {

    /** The type of block this data should use. */
    BlockType getBlockType();

    /**
     * Writes the block to the provided stream. Must not write any magic
     * records.
     *
     * @param out a stream to write uncompressed data into
     */
    void writeToBlock(DataOutput out) throws IOException;
  }

  /** An interface allowing iteration over {@link HFileBlock}s. */
  public interface BlockIterator {

    /**
     * Get the next block, or null if there are no more blocks to iterate.
     */
    HFileBlock nextBlock() throws IOException;

    /**
     * Similar to {@link #nextBlock()}, but checks the block type and throws
     * an exception if it does not match.
     */
    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
  }

  /** A full-fledged reader with an iteration ability. */
  public interface FSReader {

    /**
     * Reads the block at the given offset in the file with the given on-disk
     * size and uncompressed size.
     *
     * @param offset the offset in the file to read at
     * @param onDiskSize the on-disk size of the entire block, including all
     *          applicable headers, or -1 if unknown
     * @param uncompressedSize the uncompressed size of the compressed part of
     *          the block, or -1 if unknown
     * @param pread true to use positional read instead of seek+read
     * @return the newly read block
     */
    HFileBlock readBlockData(long offset, long onDiskSize,
        int uncompressedSize, boolean pread) throws IOException;

    /**
     * Creates a block iterator over the given portion of the file. Returned
     * blocks are always unpacked.
     *
     * @param startOffset the offset of the block to start iteration with
     * @param endOffset the offset to end iteration at (exclusive)
     * @return an iterator of blocks between the two given offsets
     */
    BlockIterator blockRange(long startOffset, long endOffset);

    /** Closes the backing streams. */
    void closeStreams() throws IOException;

    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
    HFileBlockDecodingContext getBlockDecodingContext();

    /** Get the default decoder for blocks from this file. */
    HFileBlockDecodingContext getDefaultBlockDecodingContext();
  }

  /**
   * A common implementation of some methods of {@link FSReader} and some
   * tools for implementing HFile format version-specific block readers.
   */
  private abstract static class AbstractFSReader implements FSReader {

    /** The size of the file we are reading from, or -1 if unknown. */
    protected long fileSize;

    /** The size of the header. */
    protected final int hdrSize;

    /** The filesystem used to access data. */
    protected HFileSystem hfs;

    /** The path (if any) where this data is coming from. */
    protected Path path;

    /** Lock used to serialize seek+read on the shared input stream. */
    private final Lock streamLock = new ReentrantLock();

    /** The default buffer size for our buffered streams. */
    public static final int DEFAULT_BUFFER_SIZE = 1 << 20;

    protected HFileContext fileContext;

    public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
        throws IOException {
      this.fileSize = fileSize;
      this.hfs = hfs;
      this.path = path;
      this.fileContext = fileContext;
      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
    }

    @Override
    public BlockIterator blockRange(final long startOffset,
        final long endOffset) {
      final FSReader owner = this; // handle for inner class
      return new BlockIterator() {
        private long offset = startOffset;

        @Override
        public HFileBlock nextBlock() throws IOException {
          if (offset >= endOffset)
            return null;
          HFileBlock b = readBlockData(offset, -1, -1, false);
          offset += b.getOnDiskSizeWithHeader();
          return b.unpack(fileContext, owner);
        }

        @Override
        public HFileBlock nextBlockWithBlockType(BlockType blockType)
            throws IOException {
          HFileBlock blk = nextBlock();
          if (blk.getBlockType() != blockType) {
            throw new IOException("Expected block of type " + blockType
                + " but found " + blk.getBlockType());
          }
          return blk;
        }
      };
    }

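    /**
     * Does a positional read or a seek+read of {@code size} bytes at
     * {@code fileOffset} into the given buffer. When
     * {@code peekIntoNextBlock} is set, also reads the next block's header
     * and derives that block's on-disk size from it.
     *
     * @param istream the input source of data
     * @param dest destination buffer
     * @param destOffset offset in the destination buffer
     * @param size size of the block to be read
     * @param peekIntoNextBlock whether to also read the next block's header
     * @param fileOffset position in the stream to read at
     * @param pread whether to use a positional read
     * @return the on-disk size of the next block with header size included,
     *         or -1 if it could not be determined
     */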
    protected int readAtOffset(FSDataInputStream istream,
        byte[] dest, int destOffset, int size,
        boolean peekIntoNextBlock, long fileOffset, boolean pread)
        throws IOException {
      if (peekIntoNextBlock &&
          destOffset + size + hdrSize > dest.length) {
        // We are asked to read the next block's header as well, but there is
        // not enough room in the array.
        throw new IOException("Attempted to read " + size + " bytes and " +
            hdrSize + " bytes of next header into a " + dest.length +
            "-byte array at offset " + destOffset);
      }

      if (!pread && streamLock.tryLock()) {
        // Seek + read. Better for scanning.
        try {
          istream.seek(fileOffset);

          long realOffset = istream.getPos();
          if (realOffset != fileOffset) {
            throw new IOException("Tried to seek to " + fileOffset + " to "
                + "read " + size + " bytes, but pos=" + realOffset
                + " after seek");
          }

          if (!peekIntoNextBlock) {
            IOUtils.readFully(istream, dest, destOffset, size);
            return -1;
          }

          // Try to read the next block's header.
          if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
            return -1;
        } finally {
          streamLock.unlock();
        }
      } else {
        // Positional read. Better for random reads; or when the streamLock is already locked.
        int extraSize = peekIntoNextBlock ? hdrSize : 0;
        if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
            size, extraSize)) {
          return -1;
        }
      }

      assert peekIntoNextBlock;
      return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
    }

  }

  /**
   * We always prefetch the header of the next block when reading the current
   * block; the header bytes are kept in this thread-local holder so the next
   * read can skip a separate header read.
   */
  private static class PrefetchedHeader {
    long offset = -1;
    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
  }

  /** Reads version 2 blocks from the filesystem. */
  static class FSReaderImpl extends AbstractFSReader {
    /** The file system stream of the underlying HFile that does or doesn't
     * do checksum validations in the filesystem. */
    protected FSDataInputStreamWrapper streamWrapper;

    private HFileBlockDecodingContext encodedBlockDecodingCtx;

    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;

    private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
        new ThreadLocal<PrefetchedHeader>() {
          @Override
          public PrefetchedHeader initialValue() {
            return new PrefetchedHeader();
          }
        };

    public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
        HFileContext fileContext) throws IOException {
      super(fileSize, hfs, path, fileContext);
      this.streamWrapper = stream;
      // Older versions of HBase didn't support checksums.
      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
      encodedBlockDecodingCtx = defaultDecodingCtx;
    }

    /**
     * A constructor that reads files with the latest minor version.
     * This is used by unit tests only.
     */
    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
        throws IOException {
      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
    }

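    /**
     * Reads a version 2 block. Tries to read with HBase checksums enabled
     * first; on a verification failure it transparently retries the read with
     * HDFS checksums.
     *
     * @param offset the offset in the stream to read at
     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
     *          the header, or -1 if unknown
     * @param uncompressedSize the uncompressed size of this block; always -1
     *          in the version 2 reader API
     * @param pread whether to use a positional read
     */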
    @Override
    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
        int uncompressedSize, boolean pread)
        throws IOException {
      // doVerificationThruHBaseChecksum starts out true when HBase checksums
      // are in use; a failed verification on the first attempt drops us down
      // to HDFS checksums for the retry.
      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);

      HFileBlock blk = readBlockDataInternal(is, offset,
          onDiskSizeWithHeaderL,
          uncompressedSize, pread,
          doVerificationThruHBaseChecksum);
      if (blk == null) {
        HFile.LOG.warn("HBase checksum verification failed for file " +
            path + " at offset " +
            offset + " filesize " + fileSize +
            ". Retrying read with HDFS checksums turned on...");

        if (!doVerificationThruHBaseChecksum) {
          String msg = "HBase checksum verification failed for file " +
              path + " at offset " +
              offset + " filesize " + fileSize +
              " but this cannot happen because doVerify is " +
              doVerificationThruHBaseChecksum;
          HFile.LOG.warn(msg);
          throw new IOException(msg); // cannot happen case here
        }
        HFile.checksumFailures.incrementAndGet(); // update metrics

        // If we have a checksum failure, we fall back into a mode where the
        // next few reads use HDFS level checksums. We aim to make the next
        // CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid HBase checksum
        // verification, but since this value is set without holding a lock,
        // slightly more reads than that may skip verification.
        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
        doVerificationThruHBaseChecksum = false;
        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
            uncompressedSize, pread,
            doVerificationThruHBaseChecksum);
        if (blk != null) {
          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
              path + " at offset " +
              offset + " filesize " + fileSize);
        }
      }
      if (blk == null && !doVerificationThruHBaseChecksum) {
        String msg = "readBlockData failed, possibly due to " +
            "checksum verification failed for file " + path +
            " at offset " + offset + " filesize " + fileSize;
        HFile.LOG.warn(msg);
        throw new IOException(msg);
      }

      // The read (with whichever checksum mode ended up in use) succeeded.
      // Tell the wrapper so it can count down toward re-enabling HBase
      // checksum verification after a fallback.
      streamWrapper.checksumOk();
      return blk;
    }

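    /**
     * Reads a version 2 block. Does the actual I/O for
     * {@link #readBlockData(long, long, int, boolean)}.
     *
     * @param verifyChecksum whether to verify HBase checksums; when
     *          verification fails, returns null so the caller can retry with
     *          HDFS checksums
     */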
    private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
        long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
        boolean verifyChecksum)
        throws IOException {
      if (offset < 0) {
        throw new IOException("Invalid offset=" + offset + " trying to read "
            + "block (onDiskSize=" + onDiskSizeWithHeaderL
            + ", uncompressedSize=" + uncompressedSize + ")");
      }

      if (uncompressedSize != -1) {
        throw new IOException("Version 2 block reader API does not need " +
            "the uncompressed size parameter");
      }

      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
        throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
            + ": expected to be at least " + hdrSize
            + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
            + offset + ", uncompressedSize=" + uncompressedSize + ")");
      }

      int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;

      // See if we can avoid reading the header. This is desirable, because we
      // will not incur a backward seek operation if we have already read this
      // block's header as part of the previous read's look-ahead.
      PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
      ByteBuffer headerBuf = prefetchedHeader.offset == offset ? prefetchedHeader.buf : null;

      int nextBlockOnDiskSize = 0;
      byte[] onDiskBlock = null;

      HFileBlock b = null;
      if (onDiskSizeWithHeader > 0) {
        // We know the total on-disk size. Read the entire block into memory,
        // then parse the header. This code path is used when doing a random
        // read operation relying on the block index, as well as when the
        // caller knows the on-disk size from peeking into the next block's
        // header while reading the previous block. This is the faster and
        // more preferable case.

        // Size to skip in case we have already read the header.
        int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
        // Allocate enough space to fit the next block's header too.
        onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];

        nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
            preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
            true, offset + preReadHeaderSize, pread);
        if (headerBuf != null) {
          // The header has been read when reading the previous block; copy it
          // into the array we use for the block bytes.
          assert headerBuf.hasArray();
          System.arraycopy(headerBuf.array(),
              headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
        } else {
          headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
        }
        // Parse the header from the freshly read or prefetched bytes.
        try {
          b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
        } catch (IOException ex) {
          // Seen in load testing. Provide comprehensive debug info.
          throw new IOException("Failed to read compressed block at "
              + offset
              + ", onDiskSizeWithoutHeader="
              + onDiskSizeWithHeader
              + ", preReadHeaderSize="
              + hdrSize
              + ", header.length="
              + prefetchedHeader.header.length
              + ", header bytes: "
              + Bytes.toStringBinary(prefetchedHeader.header, 0,
                  hdrSize), ex);
        }
        // The caller specified an onDiskSizeWithHeader; validate it.
        int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
        assert onDiskSizeWithoutHeader >= 0;
        b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
      } else {
        // We don't know the on-disk size. Check headerBuf to see if we have
        // already read this block's header as part of reading the previous
        // block; if not, we have to do a separate read for the header before
        // reading the block's data.
        if (headerBuf == null) {
          // From the header, determine the on-disk size of the given hfile
          // block, and read the remaining data, thereby incurring two read
          // operations. This might happen when we are doing the first read in
          // a series of reads or a random read, and we don't have access to
          // the block index. This is costly and should happen very rarely.
          headerBuf = ByteBuffer.allocate(hdrSize);
          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
              hdrSize, false, offset, pread);
        }
        b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
        onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
        // headerBuf is heap-backed here.
        System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
        nextBlockOnDiskSize =
            readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
                - hdrSize, true, offset + hdrSize, pread);
        onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
      }

      if (!fileContext.isCompressedOrEncrypted()) {
        b.assumeUncompressed();
      }

      if (verifyChecksum && !validateBlockChecksum(b, offset, onDiskBlock, hdrSize)) {
        return null; // checksum mismatch
      }

      // The onDiskBlock will become the header-and-data buffer for this block.
      // If nextBlockOnDiskSize is not zero, onDiskBlock already contains the
      // next block's header just past the wrapped limit.
      b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
          this.fileContext.isUseHBaseChecksum());

      b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;

      // Cache the next block's header if we read it.
      if (b.hasNextBlockHeader()) {
        prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
        System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
      }

      b.offset = offset;
      b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
      b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
      return b;
    }

    void setIncludesMemstoreTS(boolean includesMemstoreTS) {
      this.fileContext.setIncludesMvcc(includesMemstoreTS);
    }

    void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
    }

    @Override
    public HFileBlockDecodingContext getBlockDecodingContext() {
      return this.encodedBlockDecodingCtx;
    }

    @Override
    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
      return this.defaultDecodingCtx;
    }

    /**
     * Generates the checksum for the header as well as the data and then
     * validates it against the value stored in the block. If the block does
     * not use checksums, this is a no-op.
     */
    protected boolean validateBlockChecksum(HFileBlock block, long offset, byte[] data,
        int hdrSize)
        throws IOException {
      return ChecksumUtil.validateBlockChecksum(path, offset, block, data, hdrSize);
    }

    @Override
    public void closeStreams() throws IOException {
      streamWrapper.close();
    }

    @Override
    public String toString() {
      return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
    }
  }

  @Override
  public int getSerializedLength() {
    if (buf != null) {
      // Include the next block's header when it has been prefetched into this buffer.
      int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
      return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
    }
    return 0;
  }

  @Override
  public void serialize(ByteBuffer destination) {
    ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
        - EXTRA_SERIALIZATION_SPACE);
    serializeExtraInfo(destination);
  }

  public void serializeExtraInfo(ByteBuffer destination) {
    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
    destination.putLong(this.offset);
    destination.putInt(this.nextBlockOnDiskSizeWithHeader);
    destination.rewind();
  }

  @Override
  public CacheableDeserializer<Cacheable> getDeserializer() {
    return HFileBlock.blockDeserializer;
  }

  @Override
  public boolean equals(Object comparison) {
    if (this == comparison) {
      return true;
    }
    if (comparison == null) {
      return false;
    }
    if (comparison.getClass() != this.getClass()) {
      return false;
    }

    HFileBlock castedComparison = (HFileBlock) comparison;

    if (castedComparison.blockType != this.blockType) {
      return false;
    }
    if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
      return false;
    }
    if (castedComparison.offset != this.offset) {
      return false;
    }
    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
      return false;
    }
    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
      return false;
    }
    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
      return false;
    }
    if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
        castedComparison.buf.limit()) != 0) {
      return false;
    }
    return true;
  }

  public DataBlockEncoding getDataBlockEncoding() {
    if (blockType == BlockType.ENCODED_DATA) {
      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
    }
    return DataBlockEncoding.NONE;
  }

  byte getChecksumType() {
    return this.fileContext.getChecksumType().getCode();
  }

  int getBytesPerChecksum() {
    return this.fileContext.getBytesPerChecksum();
  }

  /** @return the size of data on disk + header, excluding checksum */
  int getOnDiskDataSizeWithHeader() {
    return this.onDiskDataSizeWithHeader;
  }

  /**
   * Calculate the number of bytes required to store all the checksums for
   * this block. Each checksum value is a 4-byte integer.
   */
  int totalChecksumBytes() {
    // Blocks written without HBase checksums carry no checksum data.
    // Similarly, a zero value of bytesPerChecksum indicates that cached
    // blocks do not have checksum data, because checksums were already
    // validated when the block was read from disk.
    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
      return 0;
    }
    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
        this.fileContext.getBytesPerChecksum());
  }

  /**
   * Returns the size of this block's header.
   */
  public int headerSize() {
    return headerSize(this.fileContext.isUseHBaseChecksum());
  }

  /**
   * Maps the checksum mode to the size of the header.
   */
  public static int headerSize(boolean usesHBaseChecksum) {
    if (usesHBaseChecksum) {
      return HConstants.HFILEBLOCK_HEADER_SIZE;
    }
    return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
  }

  /**
   * Return the appropriate dummy header for this block's checksum mode.
   */
  public byte[] getDummyHeaderForVersion() {
    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
  }

  /**
   * Return the appropriate dummy header for the given checksum mode.
   */
  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
    if (usesHBaseChecksum) {
      return HConstants.HFILEBLOCK_DUMMY_HEADER;
    }
    return DUMMY_HEADER_NO_CHECKSUM;
  }

  /**
   * @return the HFileContext used to create this HFileBlock. Not necessarily
   *         the fileContext for the file from which this block's data was
   *         originally read.
   */
  public HFileContext getHFileContext() {
    return this.fileContext;
  }

  /**
   * Convert the contents of the block header into a human-readable string.
   * Mostly helpful for debugging; assumes the block carries HBase checksum
   * header fields.
   */
  static String toStringHeader(ByteBuffer buf) throws IOException {
    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
    buf.get(magicBuf);
    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
    int compressedBlockSizeNoHeader = buf.getInt();
    int uncompressedBlockSizeNoHeader = buf.getInt();
    long prevBlockOffset = buf.getLong();
    byte cksumtype = buf.get();
    long bytesPerChecksum = buf.getInt();
    long onDiskDataSizeWithHeader = buf.getInt();
    return " Header dump: magic: " + Bytes.toString(magicBuf) +
        " blockType " + bt +
        " compressedBlockSizeNoHeader " +
        compressedBlockSizeNoHeader +
        " uncompressedBlockSizeNoHeader " +
        uncompressedBlockSizeNoHeader +
        " prevBlockOffset " + prevBlockOffset +
        " checksumType " + ChecksumType.codeToType(cksumtype) +
        " bytesPerChecksum " + bytesPerChecksum +
        " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
  }
}