/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

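/**
 * Tests {@link HFileBlock} writing and reading: uncompressed and gzip-compressed
 * blocks, data block encoding, previous-offset tracking, concurrent reads and
 * heap-size accounting.
 */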
@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlock {

  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;

  // Used by the data block encoding tests below.
  private static final int NUM_KEYVALUES = 50;
  private static final int FIELD_LENGTH = 10;
  private static final float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;
  private int uncompressedSizeV1;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

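  /** Writes 1000 ints (each {@code i / 100}), i.e. highly repetitive, compressible data. */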
  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    for (int i = 0; i < 1000; ++i)
      dos.writeInt(i / 100);
  }

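  /**
   * Writes {@link #NUM_KEYVALUES} randomly generated KeyValues (some rows,
   * qualifiers, values and timestamps are deliberately repeated) to the given
   * stream in {@link KeyValue#COMPARATOR} order, optionally followed by tags
   * and a vint-encoded memstore timestamp per KeyValue.
   * @return the sum of the KeyValue lengths, plus the size of any
   *         vint-encoded memstore timestamps written
   */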
  static int writeTestKeyValues(OutputStream dos, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<KeyValue>();
    Random randomizer = new Random(42L + seed); // just any fixed number

    // Generate a mix of fresh and repeated rows, qualifiers, values and
    // timestamps so the data block encoders have redundancy to exploit.
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        // Use the same column family for every KeyValue.
        family = keyValues.get(0).getFamily();
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = keyValues.get(
            randomizer.nextInt(keyValues.size())).getQualifier();
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(
            randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new Tag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // Sort the KeyValues and write them to the stream.
    int totalSize = 0;
    Collections.sort(keyValues, KeyValue.COMPARATOR);
    DataOutputStream dataOutputStream = new DataOutputStream(dos);

    for (KeyValue kv : keyValues) {
      dataOutputStream.writeInt(kv.getKeyLength());
      dataOutputStream.writeInt(kv.getValueLength());
      dataOutputStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
      dataOutputStream.write(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());

      totalSize += kv.getLength();
      if (useTag) {
        dataOutputStream.writeShort(kv.getTagsLengthUnsigned());
        dataOutputStream.write(kv.getBuffer(), kv.getTagsOffset(), kv.getTagsLengthUnsigned());
      }
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        WritableUtils.writeVLong(dataOutputStream, memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
    }
    return totalSize;
  }

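  /**
   * Serializes a version-1 style block: the META block magic followed by the
   * test contents, run through the given compression algorithm.
   */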
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    uncompressedSizeV1 = dos.size();
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

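  /**
   * Writes a single version-2 DATA block of test contents and returns the
   * writer, which still holds the finished block.
   */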
  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
        .withCompression(algo)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    // Ask for the serialized form to make sure the block is finished before
    // asserting on its size; the returned bytes are not needed here.
    hbw.getHeaderAndDataForTest();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

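  /**
   * Returns a printable ({@code Bytes.toStringBinary}) form of a freshly
   * written test block. When the block has the expected length, the gzip OS
   * byte is normalized so the result does not depend on the platform's codec.
   */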
  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" byte of the gzip header (the tenth byte of the gzip
      // stream, hence header size + 9) to 3 ("Unix"), so the expected output
      // does not depend on which gzip implementation produced the stream.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    assertEquals(4000, createTestV2Block(NONE, includesMemstoreTS, false).
        getBlockForCaching().getUncompressedSizeWithoutHeader());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x01\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip stream (see RFC 1952):
            + "\\x1F\\x8B"            // gzip magic signature
            + "\\x08"                 // compression method: 8 = "deflate"
            + "\\x00"                 // flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"                 // extra flags (XFL)
            // OS byte: normalized to 3 ("Unix") by createTestBlockStr, since
            // different gzip codecs fill it in differently.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00";
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // We ignore the block checksum because createTestBlockStr can change the
    // gzip header after the block is produced.
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
        testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i)
            dos.writeInt(i);
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
            .withHBaseCheckSum(true)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        is.close();
        assertEquals(0, HFile.getChecksumFailuresCount());

        b.sanityCheck();
        // Each block holds 1234 ints, i.e. 4936 uncompressed bytes.
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        String blockStr = b.toString();

        if (algo == GZ) {
          // Re-read the block with the correct on-disk size, then with a
          // deliberately wrong size, which must fail with a clear message.
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
              b.totalChecksumBytes(), -1, pread);
          assertEquals(blockStr, b.toString());
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "On-disk size without header provided is "
                + wrongCompressedSize + ", but block header contains "
                + b.getOnDiskSizeWithoutHeader() + ".";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

  /**
   * Test encoding/decoding data blocks.
   * @throws IOException a bug or a problem with temporary files.
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder =
              new HFileDataBlockEncoderImpl(encoding);
          HFileContext meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
              .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<Integer>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
            writeEncodedBlock(algo, encoding, dos, encodedSizes, encodedBlocks,
                blockId, includesMemstoreTS, HConstants.HFILEBLOCK_DUMMY_HEADER, includesTag);
            hbw.writeHeaderAndData(os);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .build();
          HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemstoreTS(includesMemstoreTS);
          HFileBlock b;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            b = hbr.readBlockData(pos, -1, -1, pread);
            assertEquals(0, HFile.getChecksumFailuresCount());
            b.sanityCheck();
            pos += b.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
                b.getUncompressedSizeWithoutHeader());
            ByteBuffer actualBuffer = b.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // Encoded blocks start with a two-byte, big-endian encoding id.
              assertEquals(0, actualBuffer.get(0));
              assertEquals(encoding.getId(), actualBuffer.get(1));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // Check that the content matches, producing a detailed message if not.
            assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);
          }
          is.close();
        }
      }
    }
  }

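  /**
   * Writes test KeyValues through the given encoding (or through the default
   * encoding context when the encoding has no encoder) to {@code dos}, and
   * records the encoded size and the encoded bytes for later comparison.
   */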
  static void writeEncodedBlock(Algorithm algo, DataBlockEncoding encoding,
      DataOutputStream dos, final List<Integer> encodedSizes,
      final List<ByteBuffer> encodedBlocks, int blockId,
      boolean includesMemstoreTS, byte[] dummyHeader, boolean useTag) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DoubleOutputStream doubleOutputStream =
        new DoubleOutputStream(dos, baos);
    writeTestKeyValues(doubleOutputStream, blockId, includesMemstoreTS, useTag);
    ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
    rawBuf.rewind();

    DataBlockEncoder encoder = encoding.getEncoder();
    int headerLen = dummyHeader.length;
    byte[] encodedResultWithHeader = null;
    HFileContext meta = new HFileContextBuilder()
        .withCompression(algo)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(useTag)
        .build();
    if (encoder != null) {
      HFileBlockEncodingContext encodingCtx = encoder.newDataBlockEncodingContext(encoding,
          dummyHeader, meta);
      encoder.encodeKeyValues(rawBuf, encodingCtx);
      encodedResultWithHeader =
          encodingCtx.getUncompressedBytesWithHeader();
    } else {
      HFileBlockDefaultEncodingContext defaultEncodingCtx = new HFileBlockDefaultEncodingContext(
          encoding, dummyHeader, meta);
      byte[] rawBufWithHeader =
          new byte[rawBuf.array().length + headerLen];
      System.arraycopy(rawBuf.array(), 0, rawBufWithHeader,
          headerLen, rawBuf.array().length);
      defaultEncodingCtx.compressAfterEncodingWithBlockType(rawBufWithHeader,
          BlockType.DATA);
      encodedResultWithHeader =
          defaultEncodingCtx.getUncompressedBytesWithHeader();
    }
    final int encodedSize =
        encodedResultWithHeader.length - headerLen;
    if (encoder != null) {
      // The encoder places a two-byte encoding id between the dummy header and
      // the encoded key/values; skip it when extracting the data section.
      headerLen += DataBlockEncoding.ID_SIZE;
    }
    byte[] encodedDataSection =
        new byte[encodedResultWithHeader.length - headerLen];
    System.arraycopy(encodedResultWithHeader, headerLen,
        encodedDataSection, 0, encodedDataSection.length);
    final ByteBuffer encodedBuf =
        ByteBuffer.wrap(encodedDataSection);
    encodedSizes.add(encodedSize);
    encodedBlocks.add(encodedBuf);
  }

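  /**
   * Fails with a message pointing at the first byte of divergence when the
   * two buffers are not equal.
   */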
  static void assertBuffersEqual(ByteBuffer expectedBuffer,
      ByteBuffer actualBuffer, Compression.Algorithm compression,
      DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit &&
          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for compression %s, encoding %s, " +
          "pread %s, commonPrefix %d, expected %s, got %s",
          compression, encoding, pread, prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Converts up to 16 bytes of the given buffer, starting at the given
   * position, to a printable string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuffer buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
        numBytes) + (numBytes < maxBytes ? "..." : "");
  }

  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

  protected void testPreviousOffsetInternals() throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm: " + algo +
              ", pread=" + pread +
              ", cacheOnWrite=" + cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<Long>();
          List<Long> expectedPrevOffsets = new ArrayList<Long>();
          List<BlockType> expectedTypes = new ArrayList<BlockType>();
          List<ByteBuffer> expectedContents = cacheOnWrite
              ? new ArrayList<ByteBuffer>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withCompression(algo).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                  HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Re-read the same block, this time passing the known on-disk size,
            // and make sure we get back an identical block.
            HFileBlock b2 = hbr.readBlockData(curOffset,
                b.getOnDiskSizeWithHeader(), -1, pread);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(),
                b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(),
                b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(),
                b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getChecksumFailuresCount());

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // In the cache-on-write case the writer recorded the uncompressed
              // bytes (with header), so compare them, minus the checksum bytes,
              // to what was read back.
              ByteBuffer bufRead = b.getBufferWithHeader();
              ByteBuffer bufExpected = expectedContents.get(i);
              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                  bufRead.arrayOffset(),
                  bufRead.limit() - b.totalChecksumBytes(),
                  bufExpected.array(), bufExpected.arrayOffset(),
                  bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Only build the (expensive) failure message when the
                // comparison has actually failed.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                    bufExpected.arrayOffset(), Math.min(32, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                        bufRead.arrayOffset(), Math.min(32, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                      HFileBlock.toStringHeader(bufExpected) +
                      "\nfound header" +
                      HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                      " limit " + bufRead.limit() +
                      " expected offset " + bufExpected.arrayOffset() +
                      " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
            }
          }

          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

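  /**
   * Reads random blocks from a pre-written file for ten seconds, verifying
   * the type, on-disk size and offset of every block read.
   */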
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        boolean pread = rand.nextBoolean();
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
                : offsets.get(blockId + 1)) - offset;

        HFileBlock b;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at "
              + offset + ", pread=" + pread + ", withOnDiskSize=" +
              withOnDiskSize, ex);
          return false;
        }

        assertEquals(types.get(blockId), b.getBlockType());
        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
        assertEquals(offset, b.getOffset());

        ++numBlocksRead;
        if (pread)
          ++numPositionalRead;
        if (withOnDiskSize)
          ++numWithOnDiskSize;
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
          " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
          "specified: " + numWithOnDiskSize + ")");

      return true;
    }
  }

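  /** Exercises a single reader from {@link #NUM_READER_THREADS} threads at once. */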
  @Test
  public void testConcurrentReading() throws Exception {
    testConcurrentReadingInternals();
  }

  protected void testConcurrentReadingInternals() throws IOException,
      InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<Long>();
      List<BlockType> types = new ArrayList<BlockType>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(true)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withCompression(compressAlgo)
          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, fileSize, meta);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs =
          new ExecutorCompletionService<Boolean>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
              + " reader threads finished successfully (algo=" + compressAlgo
              + ")");
        }
      }

      is.close();
    }
  }

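  /**
   * Writes {@link #NUM_TEST_BLOCKS} blocks of random types and sizes to
   * {@code path}, recording block offsets, previous offsets per type, block
   * types and (when requested) uncompressed contents for later verification.
   * @return the total on-disk size of all blocks written
   */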
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
      ) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(true)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withCompression(compressAlgo)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        // Don't ask the writer for ENCODED_DATA directly; write DATA instead.
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null)
        expectedOffsets.add(os.getPos());

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite)
        expectedContents.add(hbw.getUncompressedBufferWithHeader());

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " bytes, compression=" +
        compressAlgo);
    return totalSize;
  }

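  /** Checks {@link HFileBlock#heapSize()} against a hand-computed estimate. */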
  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertEquals(64, HFileBlock.BYTE_BUFFER_HEAP_SIZE);
    } else {
      assertEquals(80, HFileBlock.BYTE_BUFFER_HEAP_SIZE);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withHBaseCheckSum(false)
          .withCompression(Algorithm.NONE)
          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
          HFileBlock.FILL_HEADER, -1, 0, meta);
      long byteBufferExpectedSize =
          ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
              + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize =
          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + ";", expected,
          block.heapSize());
    }
  }

}