1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.EnumMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Random;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.KeyValue;
43 import org.apache.hadoop.hbase.MediumTests;
44 import org.apache.hadoop.hbase.Tag;
45 import org.apache.hadoop.hbase.client.Put;
46 import org.apache.hadoop.hbase.fs.HFileSystem;
47 import org.apache.hadoop.hbase.io.compress.Compression;
48 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
49 import org.apache.hadoop.hbase.regionserver.BloomType;
50 import org.apache.hadoop.hbase.regionserver.HRegion;
51 import org.apache.hadoop.hbase.regionserver.StoreFile;
52 import org.apache.hadoop.hbase.util.BloomFilterFactory;
53 import org.apache.hadoop.hbase.util.Bytes;
54 import org.apache.hadoop.hbase.util.ChecksumType;
55 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56 import org.junit.After;
57 import org.junit.Before;
58 import org.junit.Test;
59 import org.junit.experimental.categories.Category;
60 import org.junit.runner.RunWith;
61 import org.junit.runners.Parameterized;
62 import org.junit.runners.Parameterized.Parameters;
63
64
65
66
67
68 @RunWith(Parameterized.class)
69 @Category(MediumTests.class)
70 public class TestCacheOnWrite {
71
72 private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class);
73
74 private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
75 private Configuration conf;
76 private CacheConfig cacheConf;
77 private FileSystem fs;
78 private Random rand = new Random(12983177L);
79 private Path storeFilePath;
80 private BlockCache blockCache;
81 private String testDescription;
82
83 private final CacheOnWriteType cowType;
84 private final Compression.Algorithm compress;
85 private final BlockEncoderTestType encoderType;
86 private final HFileDataBlockEncoder encoder;
87
88 private static final int DATA_BLOCK_SIZE = 2048;
89 private static final int NUM_KV = 25000;
90 private static final int INDEX_BLOCK_SIZE = 512;
91 private static final int BLOOM_BLOCK_SIZE = 4096;
92 private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
93 private static final int CKBYTES = 512;
94
95
96 private static final int NUM_VALID_KEY_TYPES =
97 KeyValue.Type.values().length - 2;
98
99 private static enum CacheOnWriteType {
100 DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
101 BlockType.DATA, BlockType.ENCODED_DATA),
102 BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
103 BlockType.BLOOM_CHUNK),
104 INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
105 BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);
106
107 private final String confKey;
108 private final BlockType blockType1;
109 private final BlockType blockType2;
110
111 private CacheOnWriteType(String confKey, BlockType blockType) {
112 this(confKey, blockType, blockType);
113 }
114
115 private CacheOnWriteType(String confKey, BlockType blockType1,
116 BlockType blockType2) {
117 this.blockType1 = blockType1;
118 this.blockType2 = blockType2;
119 this.confKey = confKey;
120 }
121
122 public boolean shouldBeCached(BlockType blockType) {
123 return blockType == blockType1 || blockType == blockType2;
124 }
125
126 public void modifyConf(Configuration conf) {
127 for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
128 conf.setBoolean(cowType.confKey, cowType == this);
129 }
130 }
131
132 }
133
134 private static final DataBlockEncoding ENCODING_ALGO =
135 DataBlockEncoding.PREFIX;
136
137
138 private static enum BlockEncoderTestType {
139 NO_BLOCK_ENCODING_NOOP(true, false),
140 NO_BLOCK_ENCODING(false, false),
141 BLOCK_ENCODING_EVERYWHERE(false, true);
142
143 private final boolean noop;
144 private final boolean encode;
145
146 BlockEncoderTestType(boolean noop, boolean encode) {
147 this.encode = encode;
148 this.noop = noop;
149 }
150
151 public HFileDataBlockEncoder getEncoder() {
152 return noop ? NoOpDataBlockEncoder.INSTANCE : new HFileDataBlockEncoderImpl(
153 encode ? ENCODING_ALGO : DataBlockEncoding.NONE);
154 }
155 }
156
157 public TestCacheOnWrite(CacheOnWriteType cowType,
158 Compression.Algorithm compress, BlockEncoderTestType encoderType) {
159 this.cowType = cowType;
160 this.compress = compress;
161 this.encoderType = encoderType;
162 this.encoder = encoderType.getEncoder();
163 testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
164 ", encoderType=" + encoderType + "]";
165 System.out.println(testDescription);
166 }
167
168 @Parameters
169 public static Collection<Object[]> getParameters() {
170 List<Object[]> cowTypes = new ArrayList<Object[]>();
171 for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
172 for (Compression.Algorithm compress :
173 HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
174 for (BlockEncoderTestType encoderType :
175 BlockEncoderTestType.values()) {
176 cowTypes.add(new Object[] { cowType, compress, encoderType });
177 }
178 }
179 }
180 return cowTypes;
181 }
182
183 @Before
184 public void setUp() throws IOException {
185 conf = TEST_UTIL.getConfiguration();
186 this.conf.set("dfs.datanode.data.dir.perm", "700");
187 conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
188 conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
189 conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
190 BLOOM_BLOCK_SIZE);
191 conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
192 cowType.shouldBeCached(BlockType.DATA));
193 conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
194 cowType.shouldBeCached(BlockType.LEAF_INDEX));
195 conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
196 cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
197 cowType.modifyConf(conf);
198 fs = HFileSystem.get(conf);
199 cacheConf = new CacheConfig(conf);
200 blockCache = cacheConf.getBlockCache();
201 }
202
203 @After
204 public void tearDown() {
205 cacheConf = new CacheConfig(conf);
206 blockCache = cacheConf.getBlockCache();
207 }
208
209 @Test
210 public void testStoreFileCacheOnWrite() throws IOException {
211 testStoreFileCacheOnWriteInternals(false);
212 testStoreFileCacheOnWriteInternals(true);
213 }
214
215 protected void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
216 writeStoreFile(useTags);
217 readStoreFile(useTags);
218 }
219
220 private void readStoreFile(boolean useTags) throws IOException {
221 AbstractHFileReader reader;
222 if (useTags) {
223 reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
224 } else {
225 reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
226 }
227 LOG.info("HFile information: " + reader);
228 final boolean cacheBlocks = false;
229 final boolean pread = false;
230 HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
231 assertTrue(testDescription, scanner.seekTo());
232
233 long offset = 0;
234 HFileBlock prevBlock = null;
235 EnumMap<BlockType, Integer> blockCountByType =
236 new EnumMap<BlockType, Integer>(BlockType.class);
237
238 DataBlockEncoding encodingInCache =
239 encoderType.getEncoder().getDataBlockEncoding();
240 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
241 long onDiskSize = -1;
242 if (prevBlock != null) {
243 onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
244 }
245
246
247 HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
248 false, true, null);
249 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
250 offset, encodingInCache, block.getBlockType());
251 boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
252 boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
253 if (shouldBeCached != isCached) {
254 throw new AssertionError(
255 "shouldBeCached: " + shouldBeCached+ "\n" +
256 "isCached: " + isCached + "\n" +
257 "Test description: " + testDescription + "\n" +
258 "block: " + block + "\n" +
259 "encodingInCache: " + encodingInCache + "\n" +
260 "blockCacheKey: " + blockCacheKey);
261 }
262 prevBlock = block;
263 offset += block.getOnDiskSizeWithHeader();
264 BlockType bt = block.getBlockType();
265 Integer count = blockCountByType.get(bt);
266 blockCountByType.put(bt, (count == null ? 0 : count) + 1);
267 }
268
269 LOG.info("Block count by type: " + blockCountByType);
270 String countByType = blockCountByType.toString();
271 BlockType cachedDataBlockType =
272 encoderType.encode ? BlockType.ENCODED_DATA : BlockType.DATA;
273 if (useTags) {
274 assertEquals("{" + cachedDataBlockType
275 + "=1550, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=20}", countByType);
276 } else {
277 assertEquals("{" + cachedDataBlockType
278 + "=1379, LEAF_INDEX=154, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=18}", countByType);
279 }
280 reader.close();
281 }
282
283 public static KeyValue.Type generateKeyType(Random rand) {
284 if (rand.nextBoolean()) {
285
286 return KeyValue.Type.Put;
287 } else {
288 KeyValue.Type keyType =
289 KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
290 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum)
291 {
292 throw new RuntimeException("Generated an invalid key type: " + keyType
293 + ". " + "Probably the layout of KeyValue.Type has changed.");
294 }
295 return keyType;
296 }
297 }
298
299 public void writeStoreFile(boolean useTags) throws IOException {
300 if(useTags) {
301 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
302 } else {
303 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
304 }
305 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
306 "test_cache_on_write");
307 HFileContext meta = new HFileContextBuilder().withCompression(compress)
308 .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
309 .withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding())
310 .withIncludesTags(useTags).build();
311 StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
312 .withOutputDir(storeFileParentDir).withComparator(KeyValue.COMPARATOR)
313 .withFileContext(meta)
314 .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
315
316 final int rowLen = 32;
317 for (int i = 0; i < NUM_KV; ++i) {
318 byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i);
319 byte[] v = TestHFileWriterV2.randomValue(rand);
320 int cfLen = rand.nextInt(k.length - rowLen + 1);
321 KeyValue kv;
322 if(useTags) {
323 Tag t = new Tag((byte) 1, "visibility");
324 List<Tag> tagList = new ArrayList<Tag>();
325 tagList.add(t);
326 Tag[] tags = new Tag[1];
327 tags[0] = t;
328 kv = new KeyValue(
329 k, 0, rowLen,
330 k, rowLen, cfLen,
331 k, rowLen + cfLen, k.length - rowLen - cfLen,
332 rand.nextLong(),
333 generateKeyType(rand),
334 v, 0, v.length, tagList);
335 } else {
336 kv = new KeyValue(
337 k, 0, rowLen,
338 k, rowLen, cfLen,
339 k, rowLen + cfLen, k.length - rowLen - cfLen,
340 rand.nextLong(),
341 generateKeyType(rand),
342 v, 0, v.length);
343 }
344 sfw.append(kv);
345 }
346
347 sfw.close();
348 storeFilePath = sfw.getPath();
349 }
350
351 @Test
352 public void testNotCachingDataBlocksDuringCompaction() throws IOException {
353 testNotCachingDataBlocksDuringCompactionInternals(false);
354 testNotCachingDataBlocksDuringCompactionInternals(true);
355 }
356
357 protected void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags) throws IOException {
358 if (useTags) {
359 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
360 } else {
361 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
362 }
363
364
365
366 final String table = "CompactionCacheOnWrite";
367 final String cf = "myCF";
368 final byte[] cfBytes = Bytes.toBytes(cf);
369 final int maxVersions = 3;
370 HRegion region = TEST_UTIL.createTestRegion(table,
371 new HColumnDescriptor(cf)
372 .setCompressionType(compress)
373 .setBloomFilterType(BLOOM_TYPE)
374 .setMaxVersions(maxVersions)
375 .setDataBlockEncoding(encoder.getDataBlockEncoding())
376 );
377 int rowIdx = 0;
378 long ts = EnvironmentEdgeManager.currentTimeMillis();
379 for (int iFile = 0; iFile < 5; ++iFile) {
380 for (int iRow = 0; iRow < 500; ++iRow) {
381 String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
382 iRow;
383 Put p = new Put(Bytes.toBytes(rowStr));
384 ++rowIdx;
385 for (int iCol = 0; iCol < 10; ++iCol) {
386 String qualStr = "col" + iCol;
387 String valueStr = "value_" + rowStr + "_" + qualStr;
388 for (int iTS = 0; iTS < 5; ++iTS) {
389 if (useTags) {
390 Tag t = new Tag((byte) 1, "visibility");
391 Tag[] tags = new Tag[1];
392 tags[0] = t;
393 KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
394 HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
395 p.add(kv);
396 } else {
397 p.add(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
398 }
399 }
400 }
401 region.put(p);
402 }
403 region.flushcache();
404 }
405 LruBlockCache blockCache =
406 (LruBlockCache) new CacheConfig(conf).getBlockCache();
407 blockCache.clearCache();
408 assertEquals(0, blockCache.getBlockTypeCountsForTest().size());
409 region.compactStores();
410 LOG.debug("compactStores() returned");
411
412 Map<BlockType, Integer> blockTypesInCache =
413 blockCache.getBlockTypeCountsForTest();
414 LOG.debug("Block types in cache: " + blockTypesInCache);
415 assertNull(blockTypesInCache.get(BlockType.ENCODED_DATA));
416 assertNull(blockTypesInCache.get(BlockType.DATA));
417 region.close();
418 blockCache.shutdown();
419 }
420 }
421