/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.SequenceInputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * File format for HBase. A file of sorted key/value pairs; both keys and
 * values are byte arrays.
 * <p>
 * The format is modeled on Hadoop's TFile and the SSTable format described in
 * the BigTable paper. An HFile consists of data blocks (optionally compressed
 * and/or encoded), optional meta blocks such as Bloom filter data, a file-info
 * block of key/value metadata, block indexes for the data and meta blocks, and
 * a fixed-size trailer that records the offsets of the other sections along
 * with the file's format version.
 * <p>
 * Files are written with a {@link Writer} obtained from a
 * {@link WriterFactory} and read back with a {@link Reader} obtained via
 * {@link #createReader(FileSystem, Path, CacheConfig, Configuration)}.
 */
@InterfaceAudience.Private
public class HFile {
  static final Log LOG = LogFactory.getLog(HFile.class);

  /**
   * Maximum length of key in HFile.
   */
  public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE;

  /**
   * Default compression: none.
   */
  public final static Compression.Algorithm DEFAULT_COMPRESSION_ALGORITHM =
    Compression.Algorithm.NONE;

  /** Minimum supported HFile format version */
  public static final int MIN_FORMAT_VERSION = 2;

  /** Maximum supported HFile format version */
  public static final int MAX_FORMAT_VERSION = 3;

  /** Minimum HFile format version with support for persisting cell tags */
  public static final int MIN_FORMAT_VERSION_WITH_TAGS = 3;

  /** Default compression name: none. */
  public final static String DEFAULT_COMPRESSION =
    DEFAULT_COMPRESSION_ALGORITHM.getName();

  /** Meta data block name for bloom filter bits. */
  public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";

  /**
   * We assume that HFile path ends with
   * ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
   * many levels of nesting. This is needed for identifying table and CF name
   * from an HFile path.
   */
  public final static int MIN_NUM_HFILE_PATH_LEVELS = 5;

  /**
   * The number of bytes per checksum.
   */
  public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
  public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;

  // For measuring latency of "sequential" reads and writes
  private static final AtomicInteger readOps = new AtomicInteger();
  private static final AtomicLong readTimeNano = new AtomicLong();
  private static final AtomicInteger writeOps = new AtomicInteger();
  private static final AtomicLong writeTimeNano = new AtomicLong();

  // For measuring latency of pread (positional reads)
  private static final AtomicInteger preadOps = new AtomicInteger();
  private static final AtomicLong preadTimeNano = new AtomicLong();

  // For measuring number of checksum failures
  static final AtomicLong checksumFailures = new AtomicLong();

  // For more detailed stats on FS latencies. These queues are bounded so that
  // latency samples cannot pile up into a memory leak if the metrics
  // subsystem stops polling: once LATENCY_BUFFER_SIZE items are queued,
  // further offers are silently dropped until a consumer drains the queue.
  private static final int LATENCY_BUFFER_SIZE = 5000;
  private static final BlockingQueue<Long> fsReadLatenciesNanos =
      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
  private static final BlockingQueue<Long> fsWriteLatenciesNanos =
      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
  private static final BlockingQueue<Long> fsPreadLatenciesNanos =
      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);

  public static final void offerReadLatency(long latencyNanos, boolean pread) {
    if (pread) {
      fsPreadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
      preadOps.incrementAndGet();
      preadTimeNano.addAndGet(latencyNanos);
    } else {
      fsReadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
      readTimeNano.addAndGet(latencyNanos);
      readOps.incrementAndGet();
    }
  }

  public static final void offerWriteLatency(long latencyNanos) {
    fsWriteLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
    writeTimeNano.addAndGet(latencyNanos);
    writeOps.incrementAndGet();
  }

  public static final Collection<Long> getReadLatenciesNanos() {
    final List<Long> latencies =
        Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size());
    fsReadLatenciesNanos.drainTo(latencies);
    return latencies;
  }

  public static final Collection<Long> getPreadLatenciesNanos() {
    final List<Long> latencies =
        Lists.newArrayListWithCapacity(fsPreadLatenciesNanos.size());
    fsPreadLatenciesNanos.drainTo(latencies);
    return latencies;
  }

  public static final Collection<Long> getWriteLatenciesNanos() {
    final List<Long> latencies =
        Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size());
    fsWriteLatenciesNanos.drainTo(latencies);
    return latencies;
  }

  // Count of data blocks read from disk or cache.
  public static final AtomicLong dataBlockReadCnt = new AtomicLong(0);

  // Each getter below also resets its counter, so callers see the value
  // accumulated since the previous poll.
  public static final int getReadOps() {
    return readOps.getAndSet(0);
  }

  public static final long getReadTimeMs() {
    return readTimeNano.getAndSet(0) / 1000000;
  }

  public static final int getPreadOps() {
    return preadOps.getAndSet(0);
  }

  public static final long getPreadTimeMs() {
    return preadTimeNano.getAndSet(0) / 1000000;
  }

  public static final int getWriteOps() {
    return writeOps.getAndSet(0);
  }

  public static final long getWriteTimeMs() {
    return writeTimeNano.getAndSet(0) / 1000000;
  }

  /**
   * Number of checksum verification failures. It also clears the counter.
   */
  public static final long getChecksumFailuresCount() {
    return checksumFailures.getAndSet(0);
  }
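
  // A minimal polling sketch (illustrative only): the counters and latency
  // queues above are meant to be drained periodically by a metrics thread.
  // The "histogram" sink below is a hypothetical stand-in, not part of HBase.
  //
  //   int ops = HFile.getReadOps();          // ops since the last poll (resets)
  //   long totalMs = HFile.getReadTimeMs();  // read time since the last poll (resets)
  //   for (long nanos : HFile.getReadLatenciesNanos()) {
  //     histogram.update(nanos);             // hypothetical latency sink
  //   }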

  /** API required to write an {@link HFile} */
  public interface Writer extends Closeable {

    /** Add an element to the file info map. */
    void appendFileInfo(byte[] key, byte[] value) throws IOException;

    void append(KeyValue kv) throws IOException;

    void append(byte[] key, byte[] value) throws IOException;

    void append(byte[] key, byte[] value, byte[] tag) throws IOException;

    /** @return the path to this {@link HFile} */
    Path getPath();

    /**
     * Adds an inline block writer such as a multi-level block index writer or
     * a compound Bloom filter writer.
     */
    void addInlineBlockWriter(InlineBlockWriter bloomWriter);

    /**
     * Add a meta block to the end of the file. Call before close(). Metadata
     * blocks are expensive: fill one with a bunch of serialized data rather
     * than writing a metadata block per metadata instance.
     *
     * @param bloomFilterMetaKey name of the block
     * @param metaWriter the writable to dump to the block
     */
    void appendMetaBlock(String bloomFilterMetaKey, Writable metaWriter);

    /**
     * Store general Bloom filter in the file. This does not deal with Bloom
     * filter internals but is necessary, since Bloom filters are stored
     * differently in HFile version 1 and version 2.
     */
    void addGeneralBloomFilter(BloomFilterWriter bfw);

    /**
     * Store delete family Bloom filter in the file, which is only supported
     * in HFile V2.
     */
    void addDeleteFamilyBloomFilter(BloomFilterWriter bfw) throws IOException;

    /** Return the file context for the HFile this writer belongs to. */
    HFileContext getFileContext();
  }

  /**
   * This variety of ways to construct writers is used throughout the code, and
   * we want to be able to swap writer implementations.
   */
  public static abstract class WriterFactory {
    protected final Configuration conf;
    protected final CacheConfig cacheConf;
    protected FileSystem fs;
    protected Path path;
    protected FSDataOutputStream ostream;
    protected KVComparator comparator = KeyValue.COMPARATOR;
    protected InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;

    WriterFactory(Configuration conf, CacheConfig cacheConf) {
      this.conf = conf;
      this.cacheConf = cacheConf;
    }

    public WriterFactory withPath(FileSystem fs, Path path) {
      Preconditions.checkNotNull(fs);
      Preconditions.checkNotNull(path);
      this.fs = fs;
      this.path = path;
      return this;
    }

    public WriterFactory withOutputStream(FSDataOutputStream ostream) {
      Preconditions.checkNotNull(ostream);
      this.ostream = ostream;
      return this;
    }

    public WriterFactory withComparator(KVComparator comparator) {
      Preconditions.checkNotNull(comparator);
      this.comparator = comparator;
      return this;
    }

    public WriterFactory withFavoredNodes(InetSocketAddress[] favoredNodes) {
      // Deliberately not checking for null here; null means no favored nodes.
      this.favoredNodes = favoredNodes;
      return this;
    }

    public WriterFactory withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Writer create() throws IOException {
      if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
        throw new AssertionError("Please specify exactly one of " +
            "filesystem/path or output stream");
      }
      if (path != null) {
        ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
      }
      return createWriter(fs, path, ostream, comparator, fileContext);
    }

    protected abstract Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
        KVComparator comparator, HFileContext fileContext) throws IOException;
  }

  /** The configuration key for HFile version to use for new files */
  public static final String FORMAT_VERSION_KEY = "hfile.format.version";

  public static int getFormatVersion(Configuration conf) {
    int version = conf.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
    checkFormatVersion(version);
    return version;
  }

  /**
   * Returns the factory to be used to create {@link HFile} writers.
   * Disables block cache access for all writers created through the
   * returned factory.
   */
  public static final WriterFactory getWriterFactoryNoCache(Configuration conf) {
    // Build the CacheConfig from a copy of the conf with the block cache
    // sized to zero, so writers from this factory never cache on write.
    Configuration tempConf = new Configuration(conf);
    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    return HFile.getWriterFactory(conf, new CacheConfig(tempConf));
  }

  /**
   * Returns the factory to be used to create {@link HFile} writers.
   */
  public static final WriterFactory getWriterFactory(Configuration conf,
      CacheConfig cacheConf) {
    int version = getFormatVersion(conf);
    switch (version) {
    case 2:
      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
    case 3:
      return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
    default:
      throw new IllegalArgumentException("Cannot create writer for HFile " +
          "format version " + version);
    }
  }
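
  // A minimal write-side sketch (illustrative, not prescriptive): obtain a
  // writer through the factory, append sorted KeyValues, then close. Assumes
  // an existing Configuration 'conf', FileSystem 'fs', Path 'path' and a
  // sorted KeyValue 'kv'.
  //
  //   HFileContext context = new HFileContextBuilder()
  //       .withBlockSize(64 * 1024)
  //       .withCompression(Compression.Algorithm.NONE)
  //       .build();
  //   HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
  //       .withPath(fs, path)
  //       .withFileContext(context)
  //       .create();
  //   try {
  //     writer.append(kv); // KeyValues must be appended in sorted order
  //   } finally {
  //     writer.close();
  //   }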

  /**
   * An abstraction over block readers that consult the block cache before
   * going to the filesystem. Used by the block index.
   */
  public interface CachingBlockReader {
    HFileBlock readBlock(long offset, long onDiskBlockSize,
        boolean cacheBlock, final boolean pread, final boolean isCompaction,
        final boolean updateCacheMetrics, BlockType expectedBlockType)
        throws IOException;
  }

  /** An interface used by clients to open and iterate an {@link HFile}. */
  public interface Reader extends Closeable, CachingBlockReader {

    /**
     * Returns this reader's "name", usually the last component of the path.
     * Needs to be constant as the file is being moved to support caching on
     * write.
     */
    String getName();

    KVComparator getComparator();

    HFileScanner getScanner(boolean cacheBlocks,
        final boolean pread, final boolean isCompaction);

    ByteBuffer getMetaBlock(String metaBlockName,
        boolean cacheBlock) throws IOException;

    Map<byte[], byte[]> loadFileInfo() throws IOException;

    byte[] getLastKey();

    byte[] midkey() throws IOException;

    long length();

    long getEntries();

    byte[] getFirstKey();

    long indexSize();

    byte[] getFirstRowKey();

    byte[] getLastRowKey();

    FixedFileTrailer getTrailer();

    HFileBlockIndex.BlockIndexReader getDataBlockIndexReader();

    HFileScanner getScanner(boolean cacheBlocks, boolean pread);

    Compression.Algorithm getCompressionAlgorithm();

    /**
     * Retrieves general Bloom filter metadata as appropriate for each
     * {@link HFile} version. Knows nothing about how that metadata is
     * structured.
     */
    DataInput getGeneralBloomFilterMetadata() throws IOException;

    /**
     * Retrieves delete family Bloom filter metadata as appropriate for each
     * {@link HFile} version. Knows nothing about how that metadata is
     * structured.
     */
    DataInput getDeleteBloomFilterMetadata() throws IOException;

    Path getPath();

    /** Close method with optional evictOnClose. */
    void close(boolean evictOnClose) throws IOException;

    DataBlockEncoding getDataBlockEncoding();

    boolean hasMVCCInfo();

    /** Return the file context of the HFile this reader belongs to. */
    HFileContext getFileContext();
  }

  /**
   * Method returns the reader given the specified arguments.
   *
   * @param path hfile's path
   * @param fsdis stream of path's file
   * @param size max size of the trailer
   * @param cacheConf cache configuration values, cannot be null
   * @param hfs the filesystem the file lives on
   * @return an appropriate instance of HFileReader
   * @throws IOException if the file is invalid; the cause is wrapped in a
   *           CorruptHFileException
   */
  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
      long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
    FixedFileTrailer trailer = null;
    try {
      boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
      assert !isHBaseChecksum; // Initially we must read with FS checksum.
      trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
      switch (trailer.getMajorVersion()) {
      case 2:
        return new HFileReaderV2(path, trailer, fsdis, size, cacheConf, hfs, conf);
      case 3:
        return new HFileReaderV3(path, trailer, fsdis, size, cacheConf, hfs, conf);
      default:
        throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
      }
    } catch (Throwable t) {
      try {
        fsdis.close();
      } catch (Throwable t2) {
        LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
      }
      throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
    }
  }

  /**
   * @param fs filesystem
   * @param path Path to file to read
   * @param fsdis a stream of path's file
   * @param size max size of the trailer
   * @param cacheConf cache configuration for hfile's contents
   * @param conf configuration
   * @return a version-specific HFile Reader
   * @throws IOException if the file is invalid; the cause is wrapped in a
   *           CorruptHFileException
   */
  public static Reader createReader(FileSystem fs, Path path,
      FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
      throws IOException {
    HFileSystem hfs = null;

    // If the fs is not an instance of HFileSystem, then create an
    // instance of HFileSystem that wraps over the specified fs.
    // In this case, we will not be able to avoid checksumming inside
    // the filesystem.
    if (!(fs instanceof HFileSystem)) {
      hfs = new HFileSystem(fs);
    } else {
      hfs = (HFileSystem) fs;
    }
    return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf);
  }

  /**
   * @param fs filesystem
   * @param path Path to file to read
   * @param cacheConf must not be null
   * @param conf configuration
   * @return an active Reader instance
   * @throws IOException if the hfile is corrupt or invalid; throws a
   *           CorruptHFileException (a DoNotRetryIOException subtype)
   */
  public static Reader createReader(
      FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
    Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
    FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
        cacheConf, stream.getHfs(), conf);
  }
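
  // A minimal read-side sketch (illustrative only). Assumes an existing
  // Configuration 'conf', FileSystem 'fs' and Path 'path' to a finished HFile.
  //
  //   HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
  //   reader.loadFileInfo();
  //   HFileScanner scanner = reader.getScanner(false, false); // no caching, no pread
  //   if (scanner.seekTo()) {            // positions at the first key, if any
  //     do {
  //       KeyValue kv = scanner.getKeyValue();
  //       // ... use kv ...
  //     } while (scanner.next());
  //   }
  //   reader.close();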

  /**
   * This factory method is used only by unit tests.
   */
  static Reader createReaderFromStream(Path path,
      FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
      throws IOException {
    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
    return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
  }

  /**
   * Metadata for this file. Conjured by the writer. Read in by the reader.
   */
  public static class FileInfo implements SortedMap<byte[], byte[]> {
    static final String RESERVED_PREFIX = "hfile.";
    static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
    static final byte[] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
    static final byte[] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
    static final byte[] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
    static final byte[] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
    static final byte[] TAGS_COMPRESSED = Bytes.toBytes(RESERVED_PREFIX + "TAGS_COMPRESSED");
    public static final byte[] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
    private final SortedMap<byte[], byte[]> map =
        new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);

    public FileInfo() {
      super();
    }

    /**
     * Append the given key/value pair to the file info, optionally checking
     * the key prefix.
     *
     * @param k key to add
     * @param v value to add
     * @param checkPrefix whether to check that the provided key does not
     *          start with the reserved prefix
     * @return this file info object
     * @throws IOException if the key or value is invalid
     */
    public FileInfo append(final byte[] k, final byte[] v,
        final boolean checkPrefix) throws IOException {
      if (k == null || v == null) {
        throw new NullPointerException("Key and value may not be null");
      }
      if (checkPrefix && isReservedFileInfoKey(k)) {
        throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX
            + " are reserved");
      }
      put(k, v);
      return this;
    }

    public void clear() {
      this.map.clear();
    }

    public Comparator<? super byte[]> comparator() {
      return map.comparator();
    }

    public boolean containsKey(Object key) {
      return map.containsKey(key);
    }

    public boolean containsValue(Object value) {
      return map.containsValue(value);
    }

    public Set<java.util.Map.Entry<byte[], byte[]>> entrySet() {
      return map.entrySet();
    }

    public boolean equals(Object o) {
      return map.equals(o);
    }

    public byte[] firstKey() {
      return map.firstKey();
    }

    public byte[] get(Object key) {
      return map.get(key);
    }

    public int hashCode() {
      return map.hashCode();
    }

    public SortedMap<byte[], byte[]> headMap(byte[] toKey) {
      return this.map.headMap(toKey);
    }

    public boolean isEmpty() {
      return map.isEmpty();
    }

    public Set<byte[]> keySet() {
      return map.keySet();
    }

    public byte[] lastKey() {
      return map.lastKey();
    }

    public byte[] put(byte[] key, byte[] value) {
      return this.map.put(key, value);
    }

    public void putAll(Map<? extends byte[], ? extends byte[]> m) {
      this.map.putAll(m);
    }

    public byte[] remove(Object key) {
      return this.map.remove(key);
    }

    public int size() {
      return map.size();
    }

    public SortedMap<byte[], byte[]> subMap(byte[] fromKey, byte[] toKey) {
      return this.map.subMap(fromKey, toKey);
    }

    public SortedMap<byte[], byte[]> tailMap(byte[] fromKey) {
      return this.map.tailMap(fromKey);
    }

    public Collection<byte[]> values() {
      return map.values();
    }

    /**
     * Write out this instance on the passed in <code>out</code> stream.
     * We write it as a protobuf.
     *
     * @see #read(DataInputStream)
     */
    void write(final DataOutputStream out) throws IOException {
      HFileProtos.FileInfoProto.Builder builder = HFileProtos.FileInfoProto.newBuilder();
      for (Map.Entry<byte[], byte[]> e : this.map.entrySet()) {
        HBaseProtos.BytesBytesPair.Builder bbpBuilder = HBaseProtos.BytesBytesPair.newBuilder();
        bbpBuilder.setFirst(ByteStringer.wrap(e.getKey()));
        bbpBuilder.setSecond(ByteStringer.wrap(e.getValue()));
        builder.addMapEntry(bbpBuilder.build());
      }
      out.write(ProtobufUtil.PB_MAGIC);
      builder.build().writeDelimitedTo(out);
    }

    /**
     * Populate this instance with what we find on the passed in <code>in</code>
     * stream. Can deserialize protobuf as well as the old Writables format.
     *
     * @see #write(DataOutputStream)
     */
    void read(final DataInputStream in) throws IOException {
      // Read the magic that identifies a protobuf-serialized file info.
      int pblen = ProtobufUtil.lengthOfPBMagic();
      byte[] pbuf = new byte[pblen];
      if (in.markSupported()) in.mark(pblen);
      int read = in.read(pbuf);
      if (read != pblen) throw new IOException("read=" + read + ", wanted=" + pblen);
      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
        parsePB(HFileProtos.FileInfoProto.parseDelimitedFrom(in));
      } else {
        if (in.markSupported()) {
          in.reset();
          parseWritable(in);
        } else {
          // We cannot use BufferedInputStream; it consumes more than we read from the underlying IS.
          ByteArrayInputStream bais = new ByteArrayInputStream(pbuf);
          SequenceInputStream sis = new SequenceInputStream(bais, in); // Concatenate input streams
          // Concatenating effectively "un-reads" the magic bytes consumed
          // above before handing the stream to the old Writable parser.
          parseWritable(new DataInputStream(sis));
        }
      }
    }

    /**
     * Parse the old Writable format: an int count of entries followed by that
     * many key/value pairs. The old map format wrote a type-code byte between
     * each key and value; we know it is always byte[] in an hfile, so we read
     * and discard it.
     */
    void parseWritable(final DataInputStream in) throws IOException {
      // First clear the map. Otherwise we will just accumulate entries every time this method is called.
      this.map.clear();
      // Read the number of entries in the map.
      int entries = in.readInt();
      // Then read each key/value pair.
      for (int i = 0; i < entries; i++) {
        byte[] key = Bytes.readByteArray(in);
        // We used to serialize a byte that encoded the class type.
        // Read and ignore it because it is always byte[] in hfile.
        in.readByte();
        byte[] value = Bytes.readByteArray(in);
        this.map.put(key, value);
      }
    }

    /**
     * Fill our map with the content of the protobuf-deserialized file info.
     */
    void parsePB(final HFileProtos.FileInfoProto fip) {
      this.map.clear();
      for (BytesBytesPair pair : fip.getMapEntryList()) {
        this.map.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
      }
    }
  }
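
  // A minimal round-trip sketch for FileInfo (illustrative only; write() and
  // read() are package-private, so this only compiles within this package):
  //
  //   FileInfo info = new FileInfo();
  //   info.append(Bytes.toBytes("mykey"), Bytes.toBytes("myvalue"), true);
  //   ByteArrayOutputStream baos = new ByteArrayOutputStream();
  //   info.write(new DataOutputStream(baos));
  //   FileInfo copy = new FileInfo();
  //   copy.read(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
  //   assert Bytes.equals(copy.get(Bytes.toBytes("mykey")), Bytes.toBytes("myvalue"));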

  /** Return true if the given file info key is reserved for internal use. */
  public static boolean isReservedFileInfoKey(byte[] key) {
    return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
  }

  /**
   * Get names of supported compression algorithms. The names are acceptable by
   * HFile.Writer.
   *
   * @return Array of strings, each represents a supported compression
   *         algorithm. Currently, the following compression algorithms are
   *         supported.
   *         <ul>
   *         <li>"none" - No compression.
   *         <li>"gz" - GZIP compression.
   *         </ul>
   */
  public static String[] getSupportedCompressionAlgorithms() {
    return Compression.getSupportedAlgorithms();
  }

  // Utility methods.
  /*
   * @param l Long to convert to an int.
   * @return <code>l</code> cast as an int.
   */
  static int longToInt(final long l) {
    // Expecting the size() of a block not exceeding 4GB. Assuming the
    // size() will wrap to negative integer if it exceeds 2GB (from tfile).
    return (int) (l & 0x00000000ffffffffL);
  }

  /**
   * Returns all files belonging to the given region directory. Could return an
   * empty list.
   *
   * @param fs The file system reference.
   * @param regionDir The region directory to scan.
   * @return The list of files found.
   * @throws IOException When scanning the files fails.
   */
  static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
      throws IOException {
    List<Path> res = new ArrayList<Path>();
    PathFilter dirFilter = new FSUtils.DirFilter(fs);
    FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter);
    for (FileStatus dir : familyDirs) {
      FileStatus[] files = fs.listStatus(dir.getPath());
      for (FileStatus file : files) {
        if (!file.isDir()) {
          res.add(file.getPath());
        }
      }
    }
    return res;
  }

  public static void main(String[] args) throws IOException {
    // Delegate to the hfile pretty-printer tool.
    HFilePrettyPrinter prettyPrinter = new HFilePrettyPrinter();
    System.exit(prettyPrinter.run(args));
  }

  /**
   * Checks the given {@link HFile} format version, and throws an exception if
   * invalid. Note that if the version number comes from an input file and has
   * not been verified, the caller needs to re-throw an {@link IOException} to
   * indicate that this is not a software error, but corrupted input.
   *
   * @param version an HFile version
   * @throws IllegalArgumentException if the version is invalid
   */
  public static void checkFormatVersion(int version)
      throws IllegalArgumentException {
    if (version < MIN_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
      throw new IllegalArgumentException("Invalid HFile version: " + version
          + " (expected to be between " + MIN_FORMAT_VERSION + " and "
          + MAX_FORMAT_VERSION + ")");
    }
  }
}