1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FSDataOutputStream;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValue.KVComparator;
37 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
38 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
39 import org.apache.hadoop.hbase.util.BloomFilterWriter;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.io.Writable;
42 import org.apache.hadoop.io.WritableUtils;
43
44
45
46
47 @InterfaceAudience.Private
48 public class HFileWriterV2 extends AbstractHFileWriter {
49 static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
50
51
52 public static final byte [] MAX_MEMSTORE_TS_KEY =
53 Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
54
55
56 public static final byte [] KEY_VALUE_VERSION =
57 Bytes.toBytes("KEY_VALUE_VERSION");
58
59
60 public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
61
62
63 private List<InlineBlockWriter> inlineBlockWriters =
64 new ArrayList<InlineBlockWriter>();
65
66
67 protected HFileBlock.Writer fsBlockWriter;
68
69 private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
70 private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
71
72
73 private long firstDataBlockOffset = -1;
74
75
76 protected long lastDataBlockOffset;
77
78
79 private byte[] lastKeyOfPreviousBlock = null;
80
81
82 private List<BlockWritable> additionalLoadOnOpenData =
83 new ArrayList<BlockWritable>();
84
85 protected long maxMemstoreTS = 0;
86
87 static class WriterFactoryV2 extends HFile.WriterFactory {
88 WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
89 super(conf, cacheConf);
90 }
91
92 @Override
93 public Writer createWriter(FileSystem fs, Path path,
94 FSDataOutputStream ostream,
95 KVComparator comparator, HFileContext context) throws IOException {
96 context.setIncludesTags(false);
97 return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
98 comparator, context);
99 }
100 }
101
102
/**
 * Creates a version 2 writer.
 *
 * @param conf configuration used to open a stream when none is supplied, and to
 *          finish initialization
 * @param cacheConf cache configuration handed to the superclass
 * @param fs file system used when the stream has to be created
 * @param path destination path
 * @param ostream stream to write to; when {@code null} one is created for {@code path}
 * @param comparator key comparator handed to the superclass
 * @param context file context handed to the superclass
 * @throws IOException if the output stream cannot be created
 */
public HFileWriterV2(Configuration conf, CacheConfig cacheConf, FileSystem fs, Path path,
    FSDataOutputStream ostream, final KVComparator comparator, final HFileContext context)
    throws IOException {
  super(cacheConf,
      ostream != null ? ostream : createOutputStream(conf, fs, path, null),
      path, comparator, context);
  finishInit(conf);
}
111
112
113 protected void finishInit(final Configuration conf) {
114 if (fsBlockWriter != null)
115 throw new IllegalStateException("finishInit called twice");
116
117 fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
118
119
120 boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
121 dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
122 cacheIndexesOnWrite ? cacheConf.getBlockCache(): null,
123 cacheIndexesOnWrite ? name : null);
124 dataBlockIndexWriter.setMaxChunkSize(
125 HFileBlockIndex.getMaxChunkSize(conf));
126 inlineBlockWriters.add(dataBlockIndexWriter);
127
128
129 metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
130 if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
131 }
132
133
134
135
136
137
138 protected void checkBlockBoundary() throws IOException {
139 if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
140 return;
141
142 finishBlock();
143 writeInlineBlocks(false);
144 newBlock();
145 }
146
147
148 private void finishBlock() throws IOException {
149 if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
150 return;
151
152 long startTimeNs = System.nanoTime();
153
154 if (firstDataBlockOffset == -1) {
155 firstDataBlockOffset = outputStream.getPos();
156 }
157
158 lastDataBlockOffset = outputStream.getPos();
159 fsBlockWriter.writeHeaderAndData(outputStream);
160 int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
161
162 byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
163 dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);
164 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
165 HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
166 if (cacheConf.shouldCacheDataOnWrite()) {
167 doCacheOnWrite(lastDataBlockOffset);
168 }
169 }
170
171
172 private void writeInlineBlocks(boolean closing) throws IOException {
173 for (InlineBlockWriter ibw : inlineBlockWriters) {
174 while (ibw.shouldWriteBlock(closing)) {
175 long offset = outputStream.getPos();
176 boolean cacheThisBlock = ibw.getCacheOnWrite();
177 ibw.writeInlineBlock(fsBlockWriter.startWriting(
178 ibw.getInlineBlockType()));
179 fsBlockWriter.writeHeaderAndData(outputStream);
180 ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
181 fsBlockWriter.getUncompressedSizeWithoutHeader());
182 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
183
184 if (cacheThisBlock) {
185 doCacheOnWrite(offset);
186 }
187 }
188 }
189 }
190
191
192
193
194
195
196 private void doCacheOnWrite(long offset) {
197 HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
198 cacheConf.getBlockCache().cacheBlock(
199 new BlockCacheKey(name, offset, blockEncoder.getDataBlockEncoding(),
200 cacheFormatBlock.getBlockType()), cacheFormatBlock);
201 }
202
203
204
205
206
207
208 protected void newBlock() throws IOException {
209
210 fsBlockWriter.startWriting(BlockType.DATA);
211 firstKeyInBlock = null;
212 if (lastKeyLength > 0) {
213 lastKeyOfPreviousBlock = new byte[lastKeyLength];
214 System.arraycopy(lastKeyBuffer, lastKeyOffset, lastKeyOfPreviousBlock, 0, lastKeyLength);
215 }
216 }
217
218
219
220
221
222
223
224
225
226
227
228
229 @Override
230 public void appendMetaBlock(String metaBlockName, Writable content) {
231 byte[] key = Bytes.toBytes(metaBlockName);
232 int i;
233 for (i = 0; i < metaNames.size(); ++i) {
234
235 byte[] cur = metaNames.get(i);
236 if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
237 key.length) > 0) {
238 break;
239 }
240 }
241 metaNames.add(i, key);
242 metaData.add(i, content);
243 }
244
245
246
247
248
249
250
251
252
253 @Override
254 public void append(final KeyValue kv) throws IOException {
255 append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
256 kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
257 this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
258 }
259
260
261
262
263
264
265
266
267
268
269
270 @Override
271 public void append(final byte[] key, final byte[] value) throws IOException {
272 append(0, key, 0, key.length, value, 0, value.length);
273 }
274
275
276
277
278
279
280
281
282
283
284
285
286
287 protected void append(final long memstoreTS, final byte[] key, final int koffset,
288 final int klength, final byte[] value, final int voffset, final int vlength)
289 throws IOException {
290 boolean dupKey = checkKey(key, koffset, klength);
291 checkValue(value, voffset, vlength);
292 if (!dupKey) {
293 checkBlockBoundary();
294 }
295
296 if (!fsBlockWriter.isWriting())
297 newBlock();
298
299
300
301 {
302 DataOutputStream out = fsBlockWriter.getUserDataStream();
303 out.writeInt(klength);
304 totalKeyLength += klength;
305 out.writeInt(vlength);
306 totalValueLength += vlength;
307 out.write(key, koffset, klength);
308 out.write(value, voffset, vlength);
309 if (this.hFileContext.isIncludesMvcc()) {
310 WritableUtils.writeVLong(out, memstoreTS);
311 }
312 }
313
314
315 if (firstKeyInBlock == null) {
316
317 firstKeyInBlock = new byte[klength];
318 System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
319 }
320
321 lastKeyBuffer = key;
322 lastKeyOffset = koffset;
323 lastKeyLength = klength;
324 entryCount++;
325 }
326
327 @Override
328 public void close() throws IOException {
329 if (outputStream == null) {
330 return;
331 }
332
333 blockEncoder.saveMetadata(this);
334
335
336
337 finishBlock();
338 writeInlineBlocks(true);
339
340 FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
341
342
343 if (!metaNames.isEmpty()) {
344 for (int i = 0; i < metaNames.size(); ++i) {
345
346 long offset = outputStream.getPos();
347
348 DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
349 metaData.get(i).write(dos);
350
351 fsBlockWriter.writeHeaderAndData(outputStream);
352 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
353
354
355 metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
356 fsBlockWriter.getOnDiskSizeWithHeader());
357 }
358 }
359
360
361
362
363
364
365
366
367
368
369 long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
370 trailer.setLoadOnOpenOffset(rootIndexOffset);
371
372
373 metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
374 BlockType.ROOT_INDEX), "meta");
375 fsBlockWriter.writeHeaderAndData(outputStream);
376 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
377
378 if (this.hFileContext.isIncludesMvcc()) {
379 appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
380 appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
381 }
382
383
384 writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
385 fsBlockWriter.writeHeaderAndData(outputStream);
386 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
387
388
389 for (BlockWritable w : additionalLoadOnOpenData){
390 fsBlockWriter.writeBlock(w, outputStream);
391 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
392 }
393
394
395 trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
396 trailer.setUncompressedDataIndexSize(
397 dataBlockIndexWriter.getTotalUncompressedSize());
398 trailer.setFirstDataBlockOffset(firstDataBlockOffset);
399 trailer.setLastDataBlockOffset(lastDataBlockOffset);
400 trailer.setComparatorClass(comparator.getClass());
401 trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
402
403
404 finishClose(trailer);
405
406 fsBlockWriter.release();
407 }
408
409 @Override
410 public void addInlineBlockWriter(InlineBlockWriter ibw) {
411 inlineBlockWriters.add(ibw);
412 }
413
414 @Override
415 public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
416 this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
417 }
418
419 @Override
420 public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
421 this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
422 }
423
424 private void addBloomFilter(final BloomFilterWriter bfw,
425 final BlockType blockType) {
426 if (bfw.getKeyCount() <= 0)
427 return;
428
429 if (blockType != BlockType.GENERAL_BLOOM_META &&
430 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
431 throw new RuntimeException("Block Type: " + blockType.toString() +
432 "is not supported");
433 }
434 additionalLoadOnOpenData.add(new BlockWritable() {
435 @Override
436 public BlockType getBlockType() {
437 return blockType;
438 }
439
440 @Override
441 public void writeToBlock(DataOutput out) throws IOException {
442 bfw.getMetaWriter().write(out);
443 Writable dataWriter = bfw.getDataWriter();
444 if (dataWriter != null)
445 dataWriter.write(out);
446 }
447 });
448 }
449
450 @Override
451 public void append(byte[] key, byte[] value, byte[] tag) throws IOException {
452 throw new UnsupportedOperationException("KV tags are supported only from HFile V3");
453 }
454
455 protected int getMajorVersion() {
456 return 2;
457 }
458
459 protected int getMinorVersion() {
460 return HFileReaderV2.MAX_MINOR_VERSION;
461 }
462
463 @Override
464 public HFileContext getFileContext() {
465 return hFileContext;
466 }
467 }