/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read
 * ({@link BlockIndexReader}) single-level and multi-level block indexes.
 *
 * An example of how to use the block index writer can be found in
 * {@link CompoundBloomFilterWriter}; the reader is used by the {@link HFile}
 * reader implementations.
 */
@InterfaceAudience.Private
public class HFileBlockIndex {

  private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);

  /** Default maximum size (in bytes) of each index block chunk. */
  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;

  /**
   * Configuration key for the maximum size guideline of index blocks (leaf,
   * intermediate, and root). If not specified,
   * {@link #DEFAULT_MAX_CHUNK_SIZE} is used.
   */
  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";

  /**
   * The number of bytes stored in each "secondary index" entry in addition to
   * the key bytes in the non-root index block format: the offset of the
   * referenced block (a long) followed by its on-disk size (an int).
   */
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;

  /** Error message when the inline-block API is used in single-level mode. */
  private static final String INLINE_BLOCKS_NOT_ALLOWED =
      "Inline blocks are not allowed in the single-level-only mode";

  /**
   * The size of the metadata record used for finding the mid-key in a
   * multi-level index: the offset of the middle leaf-level index block (a
   * long), its on-disk size without header (an int), and the mid-key entry's
   * index within that leaf block (an int).
   */
  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
      2 * Bytes.SIZEOF_INT;

  /**
   * The reader will always hold the root-level index in memory. Index blocks
   * at all other levels will be cached in the LRU block cache in practice,
   * although this API does not enforce that.
   *
   * All non-root (leaf and intermediate) index blocks contain what we call a
   * "secondary index": an array of offsets to the entries within the block.
   * This allows us to do a binary search for the entry corresponding to the
   * given key without having to deserialize the block.
   */
  public static class BlockIndexReader implements HeapSize {
    /** Needed for key lookups on blocks. */
    private final KVComparator comparator;

    // Root-level data.
    private byte[][] blockKeys;
    private long[] blockOffsets;
    private int[] blockDataSizes;
    private int rootCount = 0;

    // Mid-key metadata.
    private long midLeafBlockOffset = -1;
    private int midLeafBlockOnDiskSize = -1;
    private int midKeyEntry = -1;

    /** The mid-key, lazily computed and cached on first access. */
    private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();

    /**
     * The number of levels in the block index tree. One if there is only the
     * root level, two for root and leaf levels, etc.
     */
    private int searchTreeLevel;

    /** Used to read deeper-level index and data blocks on demand. */
    private CachingBlockReader cachingBlockReader;

    public BlockIndexReader(final KVComparator c, final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(c, treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public BlockIndexReader(final KVComparator c, final int treeLevel) {
      comparator = c;
      searchTreeLevel = treeLevel;
    }

    /**
     * @return true if the block index is empty.
     */
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * Verifies that the block index is non-empty and throws an
     * {@link IllegalStateException} otherwise.
     */
    public void ensureNonEmpty() {
      if (blockKeys.length == 0) {
        throw new IllegalStateException("Block index is empty or not loaded");
      }
    }

    /**
     * Returns the data block which contains the given key.
     *
     * @param key the key we are looking for
     * @param keyOffset the offset of the key in its byte array
     * @param keyLength the length of the key
     * @param currentBlock the current block, to avoid re-reading the same
     *          block
     * @param cacheBlocks whether to cache the blocks read on this lookup
     * @param pread whether to use positional reads
     * @param isCompaction whether this read is on behalf of a compaction
     * @return the data block containing the key, or null if the key sorts
     *         before the first key in this file
     * @throws IOException
     */
    public HFileBlock seekToDataBlock(final byte[] key, int keyOffset,
        int keyLength, HFileBlock currentBlock, boolean cacheBlocks,
        boolean pread, boolean isCompaction)
        throws IOException {
      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, keyOffset, keyLength,
          currentBlock, cacheBlocks, pread, isCompaction);
      if (blockWithScanInfo == null) {
        return null;
      } else {
        return blockWithScanInfo.getHFileBlock();
      }
    }
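
    /*
     * A minimal usage sketch (hypothetical caller; the reader instance and
     * key bytes below are assumptions, not part of this class):
     *
     *   BlockIndexReader idx = ...;  // loaded via readMultiLevelIndexRoot
     *   byte[] key = ...;            // a flat HFile key
     *   HFileBlock data = idx.seekToDataBlock(key, 0, key.length,
     *       null,    // no current block to reuse
     *       true,    // cache blocks read along the way
     *       false,   // no positional read
     *       false);  // not a compaction
     *   if (data == null) {
     *     // the key sorts before the first key of the file
     *   }
     */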

    /**
     * Returns a BlockWithScanInfo, which contains the data block together
     * with other scan info, such as the key of the next indexed block. The
     * index tree is descended from the root, re-using {@code currentBlock}
     * if it happens to be the block being looked up.
     *
     * @param key the key we are looking for
     * @param keyOffset the offset of the key in its byte array
     * @param keyLength the length of the key
     * @param currentBlock the current block, to avoid re-reading the same
     *          block
     * @param cacheBlocks whether to cache the blocks read on this lookup
     * @param pread whether to use positional reads
     * @param isCompaction whether this read is on behalf of a compaction
     * @return the BlockWithScanInfo containing the data block and scan info,
     *         or null if the key sorts before the first key in this file
     * @throws IOException
     */
    public BlockWithScanInfo loadDataBlockWithScanInfo(final byte[] key, int keyOffset,
        int keyLength, HFileBlock currentBlock, boolean cacheBlocks,
        boolean pread, boolean isCompaction)
        throws IOException {
      int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);
      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
        return null;
      }

      // The first key of the next indexed block, used for scans that cross
      // block boundaries.
      byte[] nextIndexedKey = null;

      // Read the next-level (intermediate, leaf or data) index block.
      long currentOffset = blockOffsets[rootLevelIndex];
      int currentOnDiskSize = blockDataSizes[rootLevelIndex];

      if (rootLevelIndex < blockKeys.length - 1) {
        nextIndexedKey = blockKeys[rootLevelIndex + 1];
      } else {
        nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
      }

      int lookupLevel = 1; // How many levels deep we are in our lookup.
      int index = -1;

      HFileBlock block;
      while (true) {
        if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
          // Avoid reading the same block again, even with caching turned off.
          // This is crucial for compaction-type workloads, which might have
          // caching turned off. It also saves a block-cache lookup.
          block = currentBlock;
        } else {
          // Call HFile's caching block reader API. We always cache index
          // blocks, otherwise we might get terrible performance.
          boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
          BlockType expectedBlockType;
          if (lookupLevel < searchTreeLevel - 1) {
            expectedBlockType = BlockType.INTERMEDIATE_INDEX;
          } else if (lookupLevel == searchTreeLevel - 1) {
            expectedBlockType = BlockType.LEAF_INDEX;
          } else {
            // This also accounts for ENCODED_DATA.
            expectedBlockType = BlockType.DATA;
          }
          block = cachingBlockReader.readBlock(currentOffset,
              currentOnDiskSize, shouldCache, pread, isCompaction, true,
              expectedBlockType);
        }

        if (block == null) {
          throw new IOException("Failed to read block at offset " +
              currentOffset + ", onDiskSize=" + currentOnDiskSize);
        }

        // Found a data block: break the loop and check our level in the tree.
        if (block.getBlockType().isData()) {
          break;
        }

        // Not a data block, so this must be a leaf-level or intermediate-level
        // index block. We don't allow going deeper than searchTreeLevel.
        if (++lookupLevel > searchTreeLevel) {
          throw new IOException("Search Tree Level overflow: lookupLevel=" +
              lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
        }

        // Locate the entry corresponding to the given key in the non-root
        // (leaf or intermediate-level) index block.
        ByteBuffer buffer = block.getBufferWithoutHeader();
        index = locateNonRootIndexEntry(buffer, key, keyOffset, keyLength, comparator);
        if (index == -1) {
          throw new IOException("The key "
              + Bytes.toStringBinary(key, keyOffset, keyLength)
              + " is before the first key of the non-root index block "
              + block);
        }

        currentOffset = buffer.getLong();
        currentOnDiskSize = buffer.getInt();

        // Only update the next indexed key if there is a next indexed key in
        // the current level.
        byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1);
        if (tmpNextIndexedKey != null) {
          nextIndexedKey = tmpNextIndexedKey;
        }
      }

      if (lookupLevel != searchTreeLevel) {
        throw new IOException("Reached a data block at level " + lookupLevel +
            " but the number of levels is " + searchTreeLevel);
      }

      return new BlockWithScanInfo(block, nextIndexedKey);
    }

    /**
     * An approximation to the {@link HFile}'s mid-key. Operates on block
     * boundaries, and does not go inside blocks. In other words, returns the
     * first key of the middle block of the file.
     *
     * @return the first key of the middle block
     */
    public byte[] midkey() throws IOException {
      if (rootCount == 0)
        throw new IOException("HFile empty");

      byte[] targetMidKey = this.midKey.get();
      if (targetMidKey != null) {
        return targetMidKey;
      }

      if (midLeafBlockOffset >= 0) {
        if (cachingBlockReader == null) {
          throw new IOException("Have to read the middle leaf block but " +
              "no block reader available");
        }

        // Caching, using pread, assuming this is not a compaction.
        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
            BlockType.LEAF_INDEX);

        ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
        int numDataBlocks = b.getInt();
        int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
        int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
            keyRelOffset;
        int keyOffset = b.arrayOffset() +
            Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset +
            SECONDARY_INDEX_ENTRY_OVERHEAD;
        targetMidKey = Arrays.copyOfRange(b.array(), keyOffset, keyOffset + keyLen);
      } else {
        // The middle of the root-level index.
        targetMidKey = blockKeys[rootCount / 2];
      }

      this.midKey.set(targetMidKey);
      return targetMidKey;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public byte[] getRootBlockKey(int i) {
      return blockKeys[i];
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public long getRootBlockOffset(int i) {
      return blockOffsets[i];
    }

    /**
     * @param i zero-based index of a root-level block
     * @return the stored data size of the root-level block
     */
    public int getRootBlockDataSize(int i) {
      return blockDataSizes[i];
    }

    /**
     * @return the number of root-level blocks in this block index
     */
    public int getRootBlockCount() {
      return rootCount;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key the key to find
     * @param offset the offset of the key in its byte array
     * @param length the length of the key
     * @return the index of the root-level block containing the key (between 0
     *         and the number of blocks - 1), or -1 if the key sorts before the
     *         first block's key
     */
    public int rootBlockContainingKey(final byte[] key, int offset,
        int length) {
      int pos = Bytes.binarySearch(blockKeys, key, offset, length,
          comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // the javadoc of Bytes.binarySearch.

      if (pos >= 0) {
        // This is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where i is the insertion point, so that
      // blockKeys[i - 1] < key < blockKeys[i], with i in
      // [0, blockKeys.length]. We return i - 1, so that
      // blockKeys[i - 1] <= key < blockKeys[i]. In particular, i - 1 is -1
      // if the key is smaller than blockKeys[0], meaning the file does not
      // contain the given key.
      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }
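
    /*
     * For illustration (hypothetical keys, not part of this class): if the
     * root-level keys are { "d", "k", "s" }, then
     *
     *   rootBlockContainingKey("a") == -1  ("a" sorts before the first key)
     *   rootBlockContainingKey("d") ==  0  (exact match)
     *   rootBlockContainingKey("m") ==  1  ("k" <= "m" < "s")
     *   rootBlockContainingKey("x") ==  2  (the last block covers the tail)
     */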

    /**
     * Adds a new entry to this block index. Only used when reading the root
     * level of the index.
     *
     * @param key first key of the block
     * @param offset file offset where the block is stored
     * @param dataSize the stored data size of the block
     */
    private void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      blockKeys[rootCount] = key;
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    /**
     * Returns the indexed key at the i'th position in the given non-root
     * index block. Positions start at 0.
     *
     * @param nonRootIndex a non-root index block buffer, without the header
     * @param i the position of the key to retrieve
     * @return the i'th key, or null if i is out of range
     */
    private byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
      int numEntries = nonRootIndex.getInt(0);
      if (i < 0 || i >= numEntries) {
        return null;
      }

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // The target key's offset relative to the end of the secondary index.
      int targetKeyRelOffset = nonRootIndex.getInt(
          Bytes.SIZEOF_INT * (i + 1));

      // The offset of the target key in the block index buffer.
      int targetKeyOffset = entriesOffset     // Skip secondary index
          + targetKeyRelOffset                // Skip preceding entries
          + SECONDARY_INDEX_ENTRY_OVERHEAD;   // Skip offset and on-disk-size

      // We subtract two consecutive secondary index elements, which gives us
      // the size of the whole (offset, onDiskSize, key) tuple. We then
      // subtract the overhead of the offset and onDiskSize fields.
      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
          targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

      int from = nonRootIndex.arrayOffset() + targetKeyOffset;
      int to = from + targetKeyLength;
      return Arrays.copyOfRange(nonRootIndex.array(), from, to);
    }

    /**
     * Performs a binary search over a non-root level index block. Utilizes
     * the secondary index, which records the offsets of the
     * (offset, onDiskSize, firstKey) tuples of all entries.
     *
     * @param key the key we are searching for
     * @param keyOffset the offset of the key in its byte array
     * @param keyLength the length of the key
     * @param nonRootIndex the non-root index block buffer, starting with the
     *          secondary index. The position is ignored.
     * @return the index i in [0, numEntries - 1] such that
     *         keys[i] <= key < keys[i + 1], if keys is the array of all keys
     *         being searched, or -1 otherwise
     */
    static int binarySearchNonRootIndex(byte[] key, int keyOffset,
        int keyLength, ByteBuffer nonRootIndex,
        KVComparator comparator) {

      int numEntries = nonRootIndex.getInt(0);
      int low = 0;
      int high = numEntries - 1;
      int mid = 0;

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // If we imagine that keys[-1] = -Infinity and
      // keys[numEntries] = Infinity, then we are maintaining the invariant
      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
      while (low <= high) {
        mid = (low + high) >>> 1;

        // The mid key's offset relative to the end of the secondary index.
        int midKeyRelOffset = nonRootIndex.getInt(
            Bytes.SIZEOF_INT * (mid + 1));

        // The offset of the middle key in the block index buffer.
        int midKeyOffset = entriesOffset       // Skip secondary index
            + midKeyRelOffset                  // Skip preceding entries
            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

        // We subtract two consecutive secondary index elements, which gives
        // us the size of the whole (offset, onDiskSize, key) tuple. We then
        // subtract the overhead of the offset and onDiskSize fields.
        int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

        int cmp = comparator.compareFlatKey(key, keyOffset, keyLength,
            nonRootIndex.array(), nonRootIndex.arrayOffset() + midKeyOffset,
            midLength);

        // key lives above the midpoint
        if (cmp > 0)
          low = mid + 1; // Maintain the invariant keys[low - 1] < key
        // key lives below the midpoint
        else if (cmp < 0)
          high = mid - 1; // Maintain the invariant key < keys[high + 1]
        else
          return mid; // exact match
      }

      // As per our invariant, keys[low - 1] < key < keys[high + 1], and per
      // the loop break condition, low >= high + 1. Therefore low = high + 1.
      if (low != high + 1) {
        throw new IllegalStateException("Binary search broken: low=" + low
            + " instead of " + (high + 1));
      }

      // Our invariant says keys[low - 1] < key < keys[low]. We need to return
      // i = low - 1.
      int i = low - 1;

      // Some extra validation on the result.
      if (i < -1 || i >= numEntries) {
        throw new IllegalStateException("Binary search broken: result is " +
            i + " but expected to be between -1 and (numEntries - 1) = " +
            (numEntries - 1));
      }

      return i;
    }
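
    /*
     * On-disk layout of a non-root index block body (produced by
     * BlockIndexChunk#writeNonRoot below), which the two methods above
     * navigate:
     *
     *   int numEntries
     *   int secondaryIndex[numEntries + 1]  // entry offsets relative to the
     *                                       // end of the secondary index; the
     *                                       // last element is the total size
     *                                       // of all entries
     *   entries: (long blockOffset, int onDiskSize, byte[] firstKey) each
     *
     * For example (hypothetical values), with two entries whose keys are 5
     * and 8 bytes long, the secondary index would read { 0, 17, 37 }: each
     * entry costs SECONDARY_INDEX_ENTRY_OVERHEAD (12) bytes plus its key
     * length.
     */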

    /**
     * Searches for the given key in the given non-root index block using the
     * secondary index. In case of success, positions the provided buffer at
     * the entry of interest, where the file offset and the on-disk size can
     * be read.
     *
     * @param nonRootBlock a non-root block without header. The initial
     *          position does not matter.
     * @param key the byte array containing the key
     * @param keyOffset the offset of the key in its byte array
     * @param keyLength the length of the key
     * @return the index position where the given key was found, or -1 if the
     *         key is before the first key of the block
     */
    static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, byte[] key,
        int keyOffset, int keyLength, KVComparator comparator) {
      int entryIndex = binarySearchNonRootIndex(key, keyOffset, keyLength,
          nonRootBlock, comparator);

      if (entryIndex != -1) {
        int numEntries = nonRootBlock.getInt(0);

        // The end of the secondary index and the beginning of the entries.
        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

        // The offset of the entry we are interested in, relative to the end
        // of the secondary index.
        int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT
            * (1 + entryIndex));

        nonRootBlock.position(entriesOffset + entryRelOffset);
      }

      return entryIndex;
    }

    /**
     * Reads the root-level index from the given input stream. Must match what
     * was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param in the data input stream to read the root index from
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readRootIndex(DataInput in, final int numEntries)
        throws IOException {
      blockOffsets = new long[numEntries];
      blockKeys = new byte[numEntries][];
      blockDataSizes = new int[numEntries];

      // If index size is zero, no index was written.
      if (numEntries > 0) {
        for (int i = 0; i < numEntries; ++i) {
          long offset = in.readLong();
          int dataSize = in.readInt();
          byte[] key = Bytes.readByteArray(in);
          add(key, offset, dataSize);
        }
      }
    }

    /**
     * Reads the root-level index from the given block. Must match what was
     * written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param blk the HFile block containing the root index
     * @param numEntries the number of root-level index entries
     * @return the input stream positioned just after the root index, so that
     *         callers can continue reading (e.g. the mid-key metadata)
     * @throws IOException
     */
    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = blk.getByteStream();
      readRootIndex(in, numEntries);
      return in;
    }

    /**
     * Reads the root-level metadata of a multi-level block index. Based on
     * {@link #readRootIndex(DataInput, int)}, but also reads the metadata
     * necessary to compute the mid-key in a multi-level index.
     *
     * @param blk the HFile block containing the root index
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readMultiLevelIndexRoot(HFileBlock blk,
        final int numEntries) throws IOException {
      DataInputStream in = readRootIndex(blk, numEntries);
      // After reading the root index, the checksum bytes have to be
      // subtracted to know if the mid-key metadata exists.
      int checkSumBytes = blk.totalChecksumBytes();
      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
        // No mid-key metadata available.
        return;
      }
      midLeafBlockOffset = in.readLong();
      midLeafBlockOnDiskSize = in.readInt();
      midKeyEntry = in.readInt();
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=").append(rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
            .append("\n offset=").append(blockOffsets[i])
            .append(", dataSize=").append(blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }

    @Override
    public long heapSize() {
      // Object header, references, and primitive int fields not covered by
      // the mid-key metadata below.
      long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
          2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);

      // Mid-key metadata.
      heapSize += MID_KEY_METADATA_SIZE;

      // Calculating the size of blockKeys
      if (blockKeys != null) {
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
            * ClassSize.REFERENCE);

        // Adding the byte arrays themselves
        for (byte[] key : blockKeys) {
          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
        }
      }

      if (blockOffsets != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
            * Bytes.SIZEOF_LONG);
      }

      if (blockDataSizes != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
            * Bytes.SIZEOF_INT);
      }

      return ClassSize.align(heapSize);
    }

  }

  /**
   * Writes the block index into the output stream, generating the tree from
   * bottom up. The leaf level is written to disk as a sequence of inline
   * blocks, if it is larger than a certain number of bytes. If the leaf level
   * is not large enough, we write all entries to the root level instead.
   *
   * After all leaf blocks have been written, we end up with an index
   * referencing the resulting leaf index blocks. If that index is larger than
   * the allowed root index size, the writer breaks it up into
   * reasonable-size intermediate-level index block chunks, writes those
   * chunks out, and creates another index referencing those chunks. This is
   * repeated until the remaining index is small enough to become the root
   * index. However, in most practical cases we will only have leaf-level
   * blocks and the root index, or just the root index.
   */
  public static class BlockIndexWriter implements InlineBlockWriter {
    /**
     * While the index is being written, this represents the current block
     * index referencing all leaf blocks, with one exception. If the file is
     * being closed and there are not enough blocks to complete even a single
     * leaf block, no leaf blocks get written and this contains the entire
     * block index. After all levels of the index were written by
     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
     * root-level index.
     */
    private BlockIndexChunk rootChunk = new BlockIndexChunk();

    /**
     * Current leaf-level chunk. New entries referencing data blocks get added
     * to this chunk until it grows large enough to be written to disk.
     */
    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();

    /**
     * The number of block index levels. This is one if there is only the root
     * level (even empty), two if there is a leaf level and a root level, and
     * higher if there are intermediate levels. This is only final after
     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
     * initial value accounts for the root level, and will be increased to two
     * as soon as we find out there is a leaf level in
     * {@link #blockWritten(long, int, int)}.
     */
    private int numLevels = 1;

    /** The underlying block writer used to write index blocks. */
    private HFileBlock.Writer blockWriter;

    /** First key of the pending inline chunk, saved in writeInlineBlock. */
    private byte[] firstKey = null;

    /**
     * The total number of leaf-level entries, i.e. entries referenced by
     * leaf-level blocks. For the data block index this is equal to the number
     * of data blocks.
     */
    private long totalNumEntries;

    /** Total compressed size of all index blocks. */
    private long totalBlockOnDiskSize;

    /** Total uncompressed size of all index blocks. */
    private long totalBlockUncompressedSize;

    /** The maximum size guideline of all multi-level index blocks. */
    private int maxChunkSize;

    /** Whether we require this block index to always be single-level. */
    private boolean singleLevelOnly;

    /** Block cache, or null if cache-on-write is disabled. */
    private BlockCache blockCache;

    /** Name to use for computing cache keys. */
    private String nameForCaching;

    /** Creates a single-level block index writer. */
    public BlockIndexWriter() {
      this(null, null, null);
      singleLevelOnly = true;
    }

    /**
     * Creates a multi-level block index writer.
     *
     * @param blockWriter the block writer to use to write index blocks
     * @param blockCache if this is not null, index blocks will be cached on
     *          write into this block cache
     * @param nameForCaching the file name to use when computing cache keys
     */
    public BlockIndexWriter(HFileBlock.Writer blockWriter,
        BlockCache blockCache, String nameForCaching) {
      if ((blockCache == null) != (nameForCaching == null)) {
        throw new IllegalArgumentException("Block cache and file name for " +
            "caching must be both specified or both null");
      }

      this.blockWriter = blockWriter;
      this.blockCache = blockCache;
      this.nameForCaching = nameForCaching;
      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
    }

    public void setMaxChunkSize(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("Invalid maximum index block size");
      }
      this.maxChunkSize = maxChunkSize;
    }

    /**
     * Writes the root level and intermediate levels of the block index into
     * the output stream, generating the tree from bottom up. Assumes that the
     * leaf level has already been inline-written to the data stream in a
     * series of suffixes of the file's data. To read the index back in, only
     * the position of its root level needs to be stored.
     *
     * Supports both single-level and multi-level indexes.
     *
     * @param out the output stream to write the index to
     * @return the position at which the root-level index starts
     * @throws IOException
     */
    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " +
            "but there are " + curInlineChunk.getNumEntries() + " entries " +
            "in the last inline chunk.");
      }

      // We need to calculate the mid-key metadata before we write the
      // intermediate levels, because the root chunk is replaced as
      // intermediate levels are written.
      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
          : null;

      if (curInlineChunk != null) {
        while (rootChunk.getRootSize() > maxChunkSize) {
          rootChunk = writeIntermediateLevel(out, rootChunk);
          numLevels += 1;
        }
      }

      // Write the root level.
      long rootLevelIndexPos = out.getPos();

      {
        DataOutput blockStream =
            blockWriter.startWriting(BlockType.ROOT_INDEX);
        rootChunk.writeRoot(blockStream);
        if (midKeyMetadata != null)
          blockStream.write(midKeyMetadata);
        blockWriter.writeHeaderAndData(out);
      }

      // Add the root index block size to the totals.
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
            + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
            + " root-level entries, " + totalNumEntries + " total entries, "
            + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
            " on-disk size, "
            + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
            " total uncompressed size.");
      }
      return rootLevelIndexPos;
    }
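
    /*
     * Sketch of the resulting file layout (illustrative only; D = data block,
     * L = inline leaf index block, I = intermediate index block):
     *
     *   D D L D D L ... D D L I I ... I root-index
     *                                   ^
     *                                   `-- offset returned by
     *                                       writeIndexBlocks(out)
     *
     * Leaf blocks are interleaved with data blocks as the file is written;
     * only the intermediate levels (if any) and the root level are written
     * here, at close time.
     */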

    /**
     * Writes the block index data as a single level only. Does not do any
     * block framing.
     *
     * @param out the buffered output stream to write the index to. Typically
     *          a stream writing into an {@link HFile} block.
     * @param description a short description of the index being written. Used
     *          in a log message.
     * @throws IOException
     */
    public void writeSingleLevelIndex(DataOutput out, String description)
        throws IOException {
      expectNumLevels(1);

      if (!singleLevelOnly)
        throw new IOException("Single-level mode is turned off");

      if (rootChunk.getNumEntries() > 0)
        throw new IOException("Root-level entries already added in " +
            "single-level mode");

      rootChunk = curInlineChunk;
      curInlineChunk = new BlockIndexChunk();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a single-level " + description + " index with "
            + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
            + " bytes");
      }
      rootChunk.writeRoot(out);
    }

    /**
     * Splits the current level of the block index into intermediate index
     * blocks of permitted size and writes those blocks to disk. Returns the
     * next level of the block index referencing those intermediate-level
     * blocks.
     *
     * @param out the output stream to write the intermediate blocks to
     * @param currentLevel the current level of the block index, such as the
     *          chunk referencing all leaf-level index blocks
     * @return the parent-level block index, which becomes the root index
     *         after a few (usually zero) iterations
     * @throws IOException
     */
    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
        BlockIndexChunk currentLevel) throws IOException {
      // Entries referencing the intermediate-level blocks we are about to
      // create.
      BlockIndexChunk parent = new BlockIndexChunk();

      // The current intermediate-level block index chunk.
      BlockIndexChunk curChunk = new BlockIndexChunk();

      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
        curChunk.add(currentLevel.getBlockKey(i),
            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));

        if (curChunk.getRootSize() >= maxChunkSize)
          writeIntermediateBlock(out, parent, curChunk);
      }

      if (curChunk.getNumEntries() > 0) {
        writeIntermediateBlock(out, parent, curChunk);
      }

      return parent;
    }

    private void writeIntermediateBlock(FSDataOutputStream out,
        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
      long beginOffset = out.getPos();
      DataOutputStream dos = blockWriter.startWriting(
          BlockType.INTERMEDIATE_INDEX);
      curChunk.writeNonRoot(dos);
      byte[] curFirstKey = curChunk.getBlockKey(0);
      blockWriter.writeHeaderAndData(out);

      if (blockCache != null) {
        HFileBlock blockForCaching = blockWriter.getBlockForCaching();
        blockCache.cacheBlock(new BlockCacheKey(nameForCaching,
            beginOffset, DataBlockEncoding.NONE,
            blockForCaching.getBlockType()), blockForCaching);
      }

      // Add the intermediate index block size to the totals.
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      // Add an entry to the parent-level index: the beginning offset of the
      // chunk of block index entries just written, its total on-disk size,
      // and its first key.
      parent.add(curFirstKey, beginOffset,
          blockWriter.getOnDiskSizeWithHeader());

      // Clear the current block index chunk.
      curChunk.clear();
    }

    /**
     * @return how many block index entries there are in the root level
     */
    public final int getNumRootEntries() {
      return rootChunk.getNumEntries();
    }

    /**
     * @return the number of levels in this block index
     */
    public int getNumLevels() {
      return numLevels;
    }

    private void expectNumLevels(int expectedNumLevels) {
      if (numLevels != expectedNumLevels) {
        throw new IllegalStateException("Number of block index levels is "
            + numLevels + " but is expected to be " + expectedNumLevels);
      }
    }

    /**
     * Whether there is an inline block ready to be written. In general, we
     * write a leaf-level index block as an inline block as soon as its size,
     * as serialized in the non-root format, reaches a certain threshold.
     */
    @Override
    public boolean shouldWriteBlock(boolean closing) {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (curInlineChunk == null) {
        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
            "called with closing=true and then called again?");
      }

      if (curInlineChunk.getNumEntries() == 0) {
        return false;
      }

      // We do have some entries in the current inline chunk.
      if (closing) {
        if (rootChunk.getNumEntries() == 0) {
          // We did not add any leaf-level blocks yet. Instead of creating a
          // leaf level with one block, move these entries to the root level.
          expectNumLevels(1);
          rootChunk = curInlineChunk;
          curInlineChunk = null;  // Disallow adding any more index entries.
          return false;
        }

        return true;
      } else {
        return curInlineChunk.getNonRootSize() >= maxChunkSize;
      }
    }

    /**
     * Writes out the current inline index block. Inline blocks are non-root
     * blocks, so the non-root index format is used.
     *
     * @param out the stream to write the inline block to
     */
    @Override
    public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      // Write the inline block index to the output stream in the non-root
      // index block format.
      curInlineChunk.writeNonRoot(out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index once the block's offset is known.
      firstKey = curInlineChunk.getBlockKey(0);

      // Start a new inline index block.
      curInlineChunk.clear();
    }

    /**
     * Called after an inline block has been written so that we can add an
     * entry referring to that block to the parent-level index.
     */
    @Override
    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
      // Add the leaf index block size to the totals.
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      if (firstKey == null) {
        throw new IllegalStateException("Trying to add second-level index " +
            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
            " but the first key was not set in writeInlineBlock");
      }

      if (rootChunk.getNumEntries() == 0) {
        // We are writing the first leaf block, so increase the index level.
        expectNumLevels(1);
        numLevels = 2;
      }

      // Add another entry to the second-level index. Include the number of
      // entries in all previous leaf-level chunks for mid-key calculation.
      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
      firstKey = null;
    }

    @Override
    public BlockType getInlineBlockType() {
      return BlockType.LEAF_INDEX;
    }

    /**
     * Adds one index entry to the current leaf-level block. When the
     * leaf-level block gets large enough, it will be flushed to disk as an
     * inline block.
     *
     * @param firstKey the first key of the data block
     * @param blockOffset the offset of the data block
     * @param blockDataSize the on-disk size of the data block
     */
    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
      ++totalNumEntries;
    }

    /**
     * @throws IOException if we happened to write a multi-level index
     */
    public void ensureSingleLevel() throws IOException {
      if (numLevels > 1) {
        throw new IOException("Wrote a " + numLevels + "-level index with " +
            rootChunk.getNumEntries() + " root-level entries, but " +
            "this is expected to be a single-level block index.");
      }
    }

    /**
     * @return true if we are using cache-on-write. This is configured by the
     *         caller of the constructor by either passing a valid block cache
     *         or null.
     */
    @Override
    public boolean getCacheOnWrite() {
      return blockCache != null;
    }

    /**
     * The total uncompressed size of the root index block, intermediate-level
     * index blocks, and leaf-level index blocks.
     *
     * @return the total uncompressed size of all index blocks
     */
    public long getTotalUncompressedSize() {
      return totalBlockUncompressedSize;
    }

  }
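
  /*
   * A minimal sketch of the writer lifecycle (hypothetical caller; the
   * surrounding file-writing code is an assumption, not part of this class):
   *
   *   BlockIndexWriter w = new BlockIndexWriter(hfileBlockWriter, null, null);
   *   // for each data block written:
   *   //   w.addEntry(firstKeyOfBlock, blockOffset, onDiskSize);
   *   //   the enclosing writer periodically consults shouldWriteBlock(false)
   *   //   and, when it returns true, calls writeInlineBlock(out) followed by
   *   //   blockWritten(offset, onDiskSize, uncompressedSize)
   *   // at close time:
   *   long rootIndexOffset = w.writeIndexBlocks(out);
   *   // rootIndexOffset is recorded so that readers can locate the root
   *   // index when the file is opened
   */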

  /**
   * A single chunk of the block index in the process of writing. The data in
   * this chunk can become a leaf-level, intermediate-level, or root index
   * block.
   */
  static class BlockIndexChunk {

    /** First keys of the key range corresponding to each index entry. */
    private final List<byte[]> blockKeys = new ArrayList<byte[]>();

    /** Block offsets in the backing stream. */
    private final List<Long> blockOffsets = new ArrayList<Long>();

    /** On-disk data sizes of lower-level data or index blocks. */
    private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();

    /**
     * The cumulative number of sub-entries, i.e. entries on deeper levels, in
     * the blocks referenced by this chunk's entries, up to and including each
     * entry. Only populated for the root-level chunk of a multi-level index,
     * where it is used to calculate the mid-key.
     */
    private final List<Long> numSubEntriesAt = new ArrayList<Long>();

    /**
     * The offset of the next entry to be added, relative to the end of the
     * "secondary index" in the "non-root" format representation of this index
     * chunk. This is the total size of all entries added so far, excluding
     * the secondary index and the header.
     */
    private int curTotalNonRootEntrySize = 0;

    /**
     * The accumulated size of this chunk if stored in the root index format.
     */
    private int curTotalRootSize = 0;

    /**
     * The "secondary index" used for binary search over variable-length
     * records in a "non-root" format block. These offsets from the start of
     * the entries section point to the first byte of each entry.
     */
    private final List<Integer> secondaryIndexOffsetMarks =
        new ArrayList<Integer>();

    /**
     * Adds a new entry to this block index chunk.
     *
     * @param firstKey the first key in the block pointed to by this entry
     * @param blockOffset the offset of the next-level block pointed to by
     *          this entry
     * @param onDiskDataSize the on-disk data size of the block pointed to by
     *          this entry, including header size
     * @param curTotalNumSubEntries if this chunk is the root index chunk
     *          under construction, this specifies the current total number of
     *          sub-entries in all leaf-level chunks, including the one
     *          corresponding to the second-level entry being added; pass -1
     *          otherwise
     */
    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
        long curTotalNumSubEntries) {
      // Record the offset for the secondary index.
      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
          + firstKey.length;

      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;

      blockKeys.add(firstKey);
      blockOffsets.add(blockOffset);
      onDiskDataSizes.add(onDiskDataSize);

      if (curTotalNumSubEntries != -1) {
        numSubEntriesAt.add(curTotalNumSubEntries);

        // Make sure the parallel arrays are in sync.
        if (numSubEntriesAt.size() != blockKeys.size()) {
          throw new IllegalStateException("Only have key/value count " +
              "stats for " + numSubEntriesAt.size() + " block index " +
              "entries out of " + blockKeys.size());
        }
      }
    }

    /**
     * The same as {@link #add(byte[], long, int, long)}, but without the
     * sub-entry count. Used for chunks that are not the root-level chunk of a
     * multi-level index.
     *
     * @see #add(byte[], long, int, long)
     */
    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
      add(firstKey, blockOffset, onDiskDataSize, -1);
    }

    public void clear() {
      blockKeys.clear();
      blockOffsets.clear();
      onDiskDataSizes.clear();
      secondaryIndexOffsetMarks.clear();
      numSubEntriesAt.clear();
      curTotalNonRootEntrySize = 0;
      curTotalRootSize = 0;
    }

    /**
     * Finds the entry corresponding to the deeper-level index block
     * containing the given deeper-level entry (a "sub-entry"), assuming a
     * global 0-based ordering of sub-entries.
     *
     * <p>
     * <i>Implementation note.</i> We are looking for i such that
     * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a
     * deeper-level block #i (0-based) contains sub-entries
     * #numSubEntriesAt[i - 1] through numSubEntriesAt[i] - 1. i is by
     * definition the insertion point of k in numSubEntriesAt.
     *
     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
     * @return the 0-based index of the entry corresponding to the given
     *         sub-entry
     */
    public int getEntryBySubEntry(long k) {
      int i = Collections.binarySearch(numSubEntriesAt, k);

      // Exact match: numSubEntriesAt[i] == k, meaning blocks #0 through #i
      // contain exactly k sub-entries, so sub-entry #k (0-based) is the
      // first one of block #(i + 1).
      if (i >= 0)
        return i + 1;

      // Inexact match: return the insertion point.
      return -i - 1;
    }
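
    /*
     * Worked example (hypothetical values): with numSubEntriesAt = {3, 7, 10},
     * blocks #0, #1, #2 contain sub-entries 0..2, 3..6, and 7..9 respectively:
     *
     *   getEntryBySubEntry(4) == 1  (binarySearch returns -2, insertion
     *                                point 1)
     *   getEntryBySubEntry(7) == 2  (exact match at index 1, so block #2)
     */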

    /**
     * Used when writing the root block index of a multi-level block index.
     * Serializes additional information allowing the reader to efficiently
     * identify the mid-key.
     *
     * @return a few serialized fields for finding the mid-key
     * @throws IOException if the metadata could not be created
     */
    public byte[] getMidKeyMetadata() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(
          MID_KEY_METADATA_SIZE);
      DataOutputStream baosDos = new DataOutputStream(baos);
      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
      if (totalNumSubEntries == 0) {
        throw new IOException("No leaf-level entries, mid-key unavailable");
      }
      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);

      baosDos.writeLong(blockOffsets.get(midKeyEntry));
      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));

      long numSubEntriesBefore = midKeyEntry > 0
          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
        throw new IOException("Could not identify mid-key index within the "
            + "leaf-level block containing mid-key: out of range ("
            + subEntryWithinEntry + ", numSubEntriesBefore="
            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
            + ")");
      }

      baosDos.writeInt((int) subEntryWithinEntry);

      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
        throw new IOException("Could not write mid-key metadata: size=" +
            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
      }

      // Close just to be good citizens, although this has no effect.
      baos.close();

      return baos.toByteArray();
    }
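
    /*
     * Layout of the mid-key metadata produced above, appended to the root
     * index block and consumed by BlockIndexReader#readMultiLevelIndexRoot
     * and BlockIndexReader#midkey():
     *
     *   long midLeafBlockOffset      // file offset of the leaf index block
     *                                // containing the mid-key
     *   int  midLeafBlockOnDiskSize  // its on-disk size, without header
     *   int  midKeyEntry             // index of the mid-key entry within
     *                                // that leaf block
     *
     * This is exactly MID_KEY_METADATA_SIZE (16) bytes.
     */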

    /**
     * Writes the block index chunk in the non-root index block format. This
     * format contains the number of entries, an index of integer offsets for
     * quick binary search on variable-length records, and tuples of block
     * offset, on-disk block size, and the first key for each entry.
     *
     * @param out the data output stream to write the block index to
     * @throws IOException
     */
    void writeNonRoot(DataOutput out) throws IOException {
      // The number of entries in the block.
      out.writeInt(blockKeys.size());

      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
        throw new IOException("Corrupted block index chunk writer: " +
            blockKeys.size() + " entries but " +
            secondaryIndexOffsetMarks.size() + " secondary index items");
      }

      // For each entry, write a "secondary index" of relative offsets to the
      // entries from the end of the secondary index. This works because at
      // read time we read the number of entries and know where the secondary
      // index ends.
      for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
        out.writeInt(currentSecondaryIndex);

      // We include one more element in the secondary index so that readers
      // can calculate the size of each entry.
      out.writeInt(curTotalNonRootEntrySize);

      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        out.write(blockKeys.get(i));
      }
    }

    /**
     * @return the size of this chunk if stored in the non-root index block
     *         format
     */
    int getNonRootSize() {
      return Bytes.SIZEOF_INT                          // Number of entries
          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
          + curTotalNonRootEntrySize;                  // All entries
    }

    /**
     * Writes this chunk into the given output stream in the root block index
     * format. This format is similar to the {@link HFile} version 1 block
     * index format, except that we store the on-disk size of the block
     * instead of its uncompressed size.
     *
     * @param out the data output stream to write the block index to.
     *          Typically a stream writing into an {@link HFile} block.
     * @throws IOException
     */
    void writeRoot(DataOutput out) throws IOException {
      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        Bytes.writeByteArray(out, blockKeys.get(i));
      }
    }
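
    /*
     * Root format, per entry (no secondary index, since the root level is
     * deserialized eagerly by BlockIndexReader#readRootIndex and held in
     * memory):
     *
     *   long blockOffset
     *   int  onDiskDataSize
     *   vint keyLength, followed by the key bytes
     *        (the vint prefix is what Bytes.writeByteArray adds, and what
     *        curTotalRootSize accounts for via WritableUtils.getVIntSize)
     */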

    /**
     * @return the size of this chunk if stored in the root index block format
     */
    int getRootSize() {
      return curTotalRootSize;
    }

    /**
     * @return the number of entries in this block index chunk
     */
    public int getNumEntries() {
      return blockKeys.size();
    }

    public byte[] getBlockKey(int i) {
      return blockKeys.get(i);
    }

    public long getBlockOffset(int i) {
      return blockOffsets.get(i);
    }

    public int getOnDiskDataSize(int i) {
      return onDiskDataSizes.get(i);
    }

    /**
     * @return the cumulative number of sub-entries (key/values) up to and
     *         including entry i, or 0 if i is negative
     */
    public long getCumulativeNumKV(int i) {
      if (i < 0)
        return 0;
      return numSubEntriesAt.get(i);
    }

  }

  /**
   * @return the maximum index block size (in bytes) from the given
   *         configuration, or {@link #DEFAULT_MAX_CHUNK_SIZE} if unset
   */
  public static int getMaxChunkSize(Configuration conf) {
    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
  }
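
  /*
   * For example, to cap index blocks at 64 KB (a hypothetical setting, shown
   * for illustration):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
   *   int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf);  // 65536
   */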
}