1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotEquals;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.EnumMap;
31 import java.util.List;
32 import java.util.Random;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.KeyValue;
43 import org.apache.hadoop.hbase.testclassification.MediumTests;
44 import org.apache.hadoop.hbase.Tag;
45 import org.apache.hadoop.hbase.client.Durability;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.fs.HFileSystem;
48 import org.apache.hadoop.hbase.io.compress.Compression;
49 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
50 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
51 import org.apache.hadoop.hbase.regionserver.BloomType;
52 import org.apache.hadoop.hbase.regionserver.HRegion;
53 import org.apache.hadoop.hbase.regionserver.Region;
54 import org.apache.hadoop.hbase.regionserver.StoreFile;
55 import org.apache.hadoop.hbase.util.BloomFilterFactory;
56 import org.apache.hadoop.hbase.util.Bytes;
57 import org.apache.hadoop.hbase.util.ChecksumType;
58 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
59 import org.junit.After;
60 import org.junit.AfterClass;
61 import org.junit.Before;
62 import org.junit.Test;
63 import org.junit.experimental.categories.Category;
64 import org.junit.runner.RunWith;
65 import org.junit.runners.Parameterized;
66 import org.junit.runners.Parameterized.Parameters;
67
68 import com.google.common.collect.Lists;
69
70
71
72
73
74 @RunWith(Parameterized.class)
75 @Category(MediumTests.class)
76 public class TestCacheOnWrite {
77
78 private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class);
79
80 private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
81 private Configuration conf;
82 private CacheConfig cacheConf;
83 private FileSystem fs;
84 private Random rand = new Random(12983177L);
85 private Path storeFilePath;
86 private BlockCache blockCache;
87 private String testDescription;
88
89 private final CacheOnWriteType cowType;
90 private final Compression.Algorithm compress;
91 private final boolean cacheCompressedData;
92
93 private static final int DATA_BLOCK_SIZE = 2048;
94 private static final int NUM_KV = 25000;
95 private static final int INDEX_BLOCK_SIZE = 512;
96 private static final int BLOOM_BLOCK_SIZE = 4096;
97 private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
98 private static final int CKBYTES = 512;
99
100
101 private static final int NUM_VALID_KEY_TYPES =
102 KeyValue.Type.values().length - 2;
103
104 private static enum CacheOnWriteType {
105 DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
106 BlockType.DATA, BlockType.ENCODED_DATA),
107 BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
108 BlockType.BLOOM_CHUNK),
109 INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
110 BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);
111
112 private final String confKey;
113 private final BlockType blockType1;
114 private final BlockType blockType2;
115
116 private CacheOnWriteType(String confKey, BlockType blockType) {
117 this(confKey, blockType, blockType);
118 }
119
120 private CacheOnWriteType(String confKey, BlockType blockType1,
121 BlockType blockType2) {
122 this.blockType1 = blockType1;
123 this.blockType2 = blockType2;
124 this.confKey = confKey;
125 }
126
127 public boolean shouldBeCached(BlockType blockType) {
128 return blockType == blockType1 || blockType == blockType2;
129 }
130
131 public void modifyConf(Configuration conf) {
132 for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
133 conf.setBoolean(cowType.confKey, cowType == this);
134 }
135 }
136 }
137
138 public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
139 boolean cacheCompressedData, BlockCache blockCache) {
140 this.cowType = cowType;
141 this.compress = compress;
142 this.cacheCompressedData = cacheCompressedData;
143 this.blockCache = blockCache;
144 testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
145 ", cacheCompressedData=" + cacheCompressedData + "]";
146 LOG.info(testDescription);
147 }
148
149 private static List<BlockCache> getBlockCaches() throws IOException {
150 Configuration conf = TEST_UTIL.getConfiguration();
151 List<BlockCache> blockcaches = new ArrayList<BlockCache>();
152
153 blockcaches.add(new CacheConfig(conf).getBlockCache());
154
155
156 BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration());
157 blockcaches.add(lru);
158
159
160 FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
161 int[] bucketSizes =
162 { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
163 BlockCache bucketcache =
164 new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
165 blockcaches.add(bucketcache);
166 return blockcaches;
167 }
168
169 @Parameters
170 public static Collection<Object[]> getParameters() throws IOException {
171 List<Object[]> params = new ArrayList<Object[]>();
172 for (BlockCache blockCache : getBlockCaches()) {
173 for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
174 for (Compression.Algorithm compress : HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
175 for (boolean cacheCompressedData : new boolean[] { false, true }) {
176 params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache });
177 }
178 }
179 }
180 }
181 return params;
182 }
183
184 private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
185 if (blockCache instanceof LruBlockCache) {
186 ((LruBlockCache) blockCache).clearCache();
187 } else {
188
189 for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
190 if (clearCount > 0) {
191 LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
192 + blockCache.getBlockCount() + " blocks remaining");
193 Thread.sleep(10);
194 }
195 for (CachedBlock block : Lists.newArrayList(blockCache)) {
196 BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
197
198 for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
199 if (evictCount > 1) {
200 LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
201 + " times, maybe a bug here");
202 }
203 }
204 }
205 }
206 }
207 }
208
209 @Before
210 public void setUp() throws IOException {
211 conf = TEST_UTIL.getConfiguration();
212 this.conf.set("dfs.datanode.data.dir.perm", "700");
213 conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
214 conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
215 conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
216 BLOOM_BLOCK_SIZE);
217 conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
218 cowType.modifyConf(conf);
219 fs = HFileSystem.get(conf);
220 CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = blockCache;
221 cacheConf =
222 new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
223 cowType.shouldBeCached(BlockType.LEAF_INDEX),
224 cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData,
225 false, false, false);
226 }
227
228 @After
229 public void tearDown() throws IOException, InterruptedException {
230 clearBlockCache(blockCache);
231 }
232
233 @AfterClass
234 public static void afterClass() throws IOException {
235 TEST_UTIL.cleanupTestDir();
236 }
237
238 private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
239 writeStoreFile(useTags);
240 readStoreFile(useTags);
241 }
242
243 private void readStoreFile(boolean useTags) throws IOException {
244 AbstractHFileReader reader;
245 if (useTags) {
246 reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
247 } else {
248 reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
249 }
250 LOG.info("HFile information: " + reader);
251 HFileContext meta = new HFileContextBuilder().withCompression(compress)
252 .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
253 .withBlockSize(DATA_BLOCK_SIZE)
254 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
255 .withIncludesTags(useTags).build();
256 final boolean cacheBlocks = false;
257 final boolean pread = false;
258 HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
259 assertTrue(testDescription, scanner.seekTo());
260
261 long offset = 0;
262 HFileBlock prevBlock = null;
263 EnumMap<BlockType, Integer> blockCountByType =
264 new EnumMap<BlockType, Integer>(BlockType.class);
265
266 DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding();
267 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
268 long onDiskSize = -1;
269 if (prevBlock != null) {
270 onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
271 }
272
273
274 HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
275 false, true, null, encodingInCache);
276 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
277 offset);
278 HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
279 boolean isCached = fromCache != null;
280 boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
281 assertTrue("shouldBeCached: " + shouldBeCached+ "\n" +
282 "isCached: " + isCached + "\n" +
283 "Test description: " + testDescription + "\n" +
284 "block: " + block + "\n" +
285 "encodingInCache: " + encodingInCache + "\n" +
286 "blockCacheKey: " + blockCacheKey,
287 shouldBeCached == isCached);
288 if (isCached) {
289 if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
290 if (compress != Compression.Algorithm.NONE) {
291 assertFalse(fromCache.isUnpacked());
292 }
293 fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
294 } else {
295 assertTrue(fromCache.isUnpacked());
296 }
297
298 assertEquals(block.getChecksumType(), fromCache.getChecksumType());
299 assertEquals(block.getBlockType(), fromCache.getBlockType());
300 assertNotEquals(block.getBlockType(), BlockType.ENCODED_DATA);
301 assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
302 assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
303 assertEquals(
304 block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
305 }
306 prevBlock = block;
307 offset += block.getOnDiskSizeWithHeader();
308 BlockType bt = block.getBlockType();
309 Integer count = blockCountByType.get(bt);
310 blockCountByType.put(bt, (count == null ? 0 : count) + 1);
311 }
312
313 LOG.info("Block count by type: " + blockCountByType);
314 String countByType = blockCountByType.toString();
315 if (useTags) {
316 assertEquals("{" + BlockType.DATA
317 + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=34}", countByType);
318 } else {
319 assertEquals("{" + BlockType.DATA
320 + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType);
321 }
322
323
324 while (scanner.next()) {
325 scanner.getKeyValue();
326 }
327 reader.close();
328 }
329
330 public static KeyValue.Type generateKeyType(Random rand) {
331 if (rand.nextBoolean()) {
332
333 return KeyValue.Type.Put;
334 } else {
335 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
336 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
337 throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
338 + "Probably the layout of KeyValue.Type has changed.");
339 }
340 return keyType;
341 }
342 }
343
344 private void writeStoreFile(boolean useTags) throws IOException {
345 if(useTags) {
346 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
347 } else {
348 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
349 }
350 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
351 "test_cache_on_write");
352 HFileContext meta = new HFileContextBuilder().withCompression(compress)
353 .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
354 .withBlockSize(DATA_BLOCK_SIZE)
355 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
356 .withIncludesTags(useTags).build();
357 StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
358 .withOutputDir(storeFileParentDir).withComparator(KeyValue.COMPARATOR)
359 .withFileContext(meta)
360 .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
361 byte[] cf = Bytes.toBytes("fam");
362 for (int i = 0; i < NUM_KV; ++i) {
363 byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i);
364 byte[] qualifier = TestHFileWriterV2.randomRowOrQualifier(rand);
365 byte[] value = TestHFileWriterV2.randomValue(rand);
366 KeyValue kv;
367 if(useTags) {
368 Tag t = new Tag((byte) 1, "visibility");
369 List<Tag> tagList = new ArrayList<Tag>();
370 tagList.add(t);
371 Tag[] tags = new Tag[1];
372 tags[0] = t;
373 kv =
374 new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
375 rand.nextLong(), generateKeyType(rand), value, 0, value.length, tagList);
376 } else {
377 kv =
378 new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
379 rand.nextLong(), generateKeyType(rand), value, 0, value.length);
380 }
381 sfw.append(kv);
382 }
383
384 sfw.close();
385 storeFilePath = sfw.getPath();
386 }
387
388 private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
389 throws IOException, InterruptedException {
390 if (useTags) {
391 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
392 } else {
393 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
394 }
395
396
397
398 final String table = "CompactionCacheOnWrite";
399 final String cf = "myCF";
400 final byte[] cfBytes = Bytes.toBytes(cf);
401 final int maxVersions = 3;
402 Region region = TEST_UTIL.createTestRegion(table,
403 new HColumnDescriptor(cf)
404 .setCompressionType(compress)
405 .setBloomFilterType(BLOOM_TYPE)
406 .setMaxVersions(maxVersions)
407 .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
408 );
409 int rowIdx = 0;
410 long ts = EnvironmentEdgeManager.currentTime();
411 for (int iFile = 0; iFile < 5; ++iFile) {
412 for (int iRow = 0; iRow < 500; ++iRow) {
413 String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
414 iRow;
415 Put p = new Put(Bytes.toBytes(rowStr));
416 ++rowIdx;
417 for (int iCol = 0; iCol < 10; ++iCol) {
418 String qualStr = "col" + iCol;
419 String valueStr = "value_" + rowStr + "_" + qualStr;
420 for (int iTS = 0; iTS < 5; ++iTS) {
421 if (useTags) {
422 Tag t = new Tag((byte) 1, "visibility");
423 Tag[] tags = new Tag[1];
424 tags[0] = t;
425 KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
426 HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
427 p.add(kv);
428 } else {
429 p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
430 }
431 }
432 }
433 p.setDurability(Durability.ASYNC_WAL);
434 region.put(p);
435 }
436 region.flush(true);
437 }
438 clearBlockCache(blockCache);
439 assertEquals(0, blockCache.getBlockCount());
440 region.compact(false);
441 LOG.debug("compactStores() returned");
442
443 for (CachedBlock block: blockCache) {
444 assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
445 assertNotEquals(BlockType.DATA, block.getBlockType());
446 }
447 ((HRegion)region).close();
448 }
449
450 @Test
451 public void testStoreFileCacheOnWrite() throws IOException {
452 testStoreFileCacheOnWriteInternals(false);
453 testStoreFileCacheOnWriteInternals(true);
454 }
455
456 @Test
457 public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
458 testNotCachingDataBlocksDuringCompactionInternals(false);
459 testNotCachingDataBlocksDuringCompactionInternals(true);
460 }
461 }