/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests {@link HFile} cache-on-write functionality for the following block
 * types: data blocks, non-root index blocks, and Bloom filter blocks.
 */
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestCacheOnWrite {

  private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class);

  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  // A fixed seed keeps the generated data, and therefore the exact block
  // counts asserted in readStoreFile(), deterministic across runs.
  private Random rand = new Random(12983177L);
  private Path storeFilePath;
  private BlockCache blockCache;
  private String testDescription;

  private final CacheOnWriteType cowType;
  private final Compression.Algorithm compress;
  private final BlockEncoderTestType encoderType;
  private final HFileDataBlockEncoder encoder;

  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 25000;
  private static final int INDEX_BLOCK_SIZE = 512;
  private static final int BLOOM_BLOCK_SIZE = 4096;
  private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
  // Bytes per checksum chunk in the written blocks.
  private static final int CKBYTES = 512;

  /** The number of valid key types possible in a store file */
  private static final int NUM_VALID_KEY_TYPES =
      KeyValue.Type.values().length - 2;

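  /**
   * Each variant turns on cache-on-write for exactly one category of blocks;
   * {@link #shouldBeCached(BlockType)} tells the verification code which
   * block types it should then expect to find in the cache.
   */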
  private enum CacheOnWriteType {
    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
        BlockType.DATA, BlockType.ENCODED_DATA),
    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        BlockType.BLOOM_CHUNK),
    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);

    private final String confKey;
    private final BlockType blockType1;
    private final BlockType blockType2;

    private CacheOnWriteType(String confKey, BlockType blockType) {
      this(confKey, blockType, blockType);
    }

    private CacheOnWriteType(String confKey, BlockType blockType1,
        BlockType blockType2) {
      this.blockType1 = blockType1;
      this.blockType2 = blockType2;
      this.confKey = confKey;
    }

    public boolean shouldBeCached(BlockType blockType) {
      return blockType == blockType1 || blockType == blockType2;
    }

    /** Enables cache-on-write for this variant's block category only. */
    public void modifyConf(Configuration conf) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        conf.setBoolean(cowType.confKey, cowType == this);
      }
    }

  }

  private static final DataBlockEncoding ENCODING_ALGO =
      DataBlockEncoding.PREFIX;

  /** Provides fancy names for three combinations of two booleans */
  private enum BlockEncoderTestType {
    NO_BLOCK_ENCODING_NOOP(true, false),
    NO_BLOCK_ENCODING(false, false),
    BLOCK_ENCODING_EVERYWHERE(false, true);

    private final boolean noop;
    private final boolean encode;

    BlockEncoderTestType(boolean noop, boolean encode) {
      this.encode = encode;
      this.noop = noop;
    }

    public HFileDataBlockEncoder getEncoder() {
      return noop
          ? NoOpDataBlockEncoder.INSTANCE
          : new HFileDataBlockEncoderImpl(encode ? ENCODING_ALGO : DataBlockEncoding.NONE);
    }
  }

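  /** Builds one parameterized test instance; the description is echoed in assertion messages. */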
  public TestCacheOnWrite(CacheOnWriteType cowType,
      Compression.Algorithm compress, BlockEncoderTestType encoderType) {
    this.cowType = cowType;
    this.compress = compress;
    this.encoderType = encoderType;
    this.encoder = encoderType.getEncoder();
    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
        ", encoderType=" + encoderType + "]";
    LOG.info(testDescription);
  }

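  /** Runs each test over the cross product of block category, compression codec, and encoder setup. */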
  @Parameters
  public static Collection<Object[]> getParameters() {
    List<Object[]> cowTypes = new ArrayList<Object[]>();
    for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
      for (Compression.Algorithm compress :
           HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
        for (BlockEncoderTestType encoderType :
             BlockEncoderTestType.values()) {
          cowTypes.add(new Object[] { cowType, compress, encoderType });
        }
      }
    }
    return cowTypes;
  }

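  /** Applies this variant's cache-on-write flags and block/chunk sizes before each test. */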
  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.set("dfs.datanode.data.dir.perm", "700");
    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
        BLOOM_BLOCK_SIZE);
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
        cowType.shouldBeCached(BlockType.DATA));
    conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        cowType.shouldBeCached(BlockType.LEAF_INDEX));
    conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
    // modifyConf() rewrites all three flags, leaving only this variant's
    // cache-on-write flag enabled.
    cowType.modifyConf(conf);
    fs = HFileSystem.get(conf);
    cacheConf = new CacheConfig(conf);
    blockCache = cacheConf.getBlockCache();
  }

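  /** Refreshes the cache configuration and block cache reference after each test. */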
  @After
  public void tearDown() {
    cacheConf = new CacheConfig(conf);
    blockCache = cacheConf.getBlockCache();
  }

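  /** Round-trips a store file both without tags (HFile v2) and with tags (HFile v3). */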
  @Test
  public void testStoreFileCacheOnWrite() throws IOException {
    testStoreFileCacheOnWriteInternals(false);
    testStoreFileCacheOnWriteInternals(true);
  }

  protected void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
    writeStoreFile(useTags);
    readStoreFile(useTags);
  }

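  /**
   * Scans every block in the written store file and verifies that exactly the
   * block types selected by the current {@link CacheOnWriteType} ended up in
   * the block cache.
   */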
  private void readStoreFile(boolean useTags) throws IOException {
    AbstractHFileReader reader;
    if (useTags) {
      reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    } else {
      reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    }
    LOG.info("HFile information: " + reader);
    final boolean cacheBlocks = false;
    final boolean pread = false;
    HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
    assertTrue(testDescription, scanner.seekTo());

    long offset = 0;
    HFileBlock prevBlock = null;
    EnumMap<BlockType, Integer> blockCountByType =
        new EnumMap<BlockType, Integer>(BlockType.class);

    DataBlockEncoding encodingInCache =
        encoderType.getEncoder().getDataBlockEncoding();
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      long onDiskSize = -1;
      if (prevBlock != null) {
        onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
      }
      // Flags: don't cache the block, use pread, this is not a compaction.
      // Also, pass null for expected block type to avoid checking it.
      HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
          false, true, null);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
          offset, encodingInCache, block.getBlockType());
      boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
      boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
      if (shouldBeCached != isCached) {
        throw new AssertionError(
            "shouldBeCached: " + shouldBeCached + "\n" +
            "isCached: " + isCached + "\n" +
            "Test description: " + testDescription + "\n" +
            "block: " + block + "\n" +
            "encodingInCache: " + encodingInCache + "\n" +
            "blockCacheKey: " + blockCacheKey);
      }
      prevBlock = block;
      offset += block.getOnDiskSizeWithHeader();
      BlockType bt = block.getBlockType();
      Integer count = blockCountByType.get(bt);
      blockCountByType.put(bt, (count == null ? 0 : count) + 1);
    }

    LOG.info("Block count by type: " + blockCountByType);
    String countByType = blockCountByType.toString();
    BlockType cachedDataBlockType =
        encoderType.encode ? BlockType.ENCODED_DATA : BlockType.DATA;
    // The counts below are deterministic because the file is generated from a
    // fixed random seed; tags enlarge each KeyValue, so the v3 (tagged) file
    // contains more data and index blocks.
    if (useTags) {
      assertEquals("{" + cachedDataBlockType
          + "=1550, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=20}", countByType);
    } else {
      assertEquals("{" + cachedDataBlockType
          + "=1379, LEAF_INDEX=154, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=18}", countByType);
    }
    reader.close();
  }

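  /**
   * Picks a random KeyValue type: Put half of the time, otherwise any valid
   * type other than the Minimum and Maximum sentinels.
   */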
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType =
          KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType
            + ". Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

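  /**
   * Writes a store file of NUM_KV random KeyValues (with tags when requested)
   * using the parameterized compression and data block encoding.
   */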
  public void writeStoreFile(boolean useTags) throws IOException {
    // Tags require the v3 HFile format.
    TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, useTags ? 3 : 2);
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
        "test_cache_on_write");
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
        .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
        .withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
        .withOutputDir(storeFileParentDir).withComparator(KeyValue.COMPARATOR)
        .withFileContext(meta)
        .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();

    final int rowLen = 32;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i);
      byte[] v = TestHFileWriterV2.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv;
      if (useTags) {
        Tag t = new Tag((byte) 1, "visibility");
        List<Tag> tagList = new ArrayList<Tag>();
        tagList.add(t);
        kv = new KeyValue(
            k, 0, rowLen,
            k, rowLen, cfLen,
            k, rowLen + cfLen, k.length - rowLen - cfLen,
            rand.nextLong(),
            generateKeyType(rand),
            v, 0, v.length, tagList);
      } else {
        kv = new KeyValue(
            k, 0, rowLen,
            k, rowLen, cfLen,
            k, rowLen + cfLen, k.length - rowLen - cfLen,
            rand.nextLong(),
            generateKeyType(rand),
            v, 0, v.length);
      }
      sfw.append(kv);
    }

    sfw.close();
    storeFilePath = sfw.getPath();
  }

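  /**
   * Verifies that data blocks written during a compaction are not cached on
   * write, whatever the cache-on-write settings say.
   */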
  @Test
  public void testNotCachingDataBlocksDuringCompaction() throws IOException {
    testNotCachingDataBlocksDuringCompactionInternals(false);
    testNotCachingDataBlocksDuringCompactionInternals(true);
  }

  protected void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
      throws IOException {
    // Tags require the v3 HFile format.
    TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, useTags ? 3 : 2);
    // TODO: need to change this test if we add a cache size threshold for
    // compactions, or if we implement some other kind of intelligent logic for
    // deciding what blocks to cache-on-write on compaction.
    final String table = "CompactionCacheOnWrite";
    final String cf = "myCF";
    final byte[] cfBytes = Bytes.toBytes(cf);
    final int maxVersions = 3;
    HRegion region = TEST_UTIL.createTestRegion(table,
        new HColumnDescriptor(cf)
            .setCompressionType(compress)
            .setBloomFilterType(BLOOM_TYPE)
            .setMaxVersions(maxVersions)
            .setDataBlockEncoding(encoder.getDataBlockEncoding())
    );
    int rowIdx = 0;
    long ts = EnvironmentEdgeManager.currentTimeMillis();
    for (int iFile = 0; iFile < 5; ++iFile) {
      for (int iRow = 0; iRow < 500; ++iRow) {
        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
            iRow;
        Put p = new Put(Bytes.toBytes(rowStr));
        ++rowIdx;
        for (int iCol = 0; iCol < 10; ++iCol) {
          String qualStr = "col" + iCol;
          String valueStr = "value_" + rowStr + "_" + qualStr;
          for (int iTS = 0; iTS < 5; ++iTS) {
            if (useTags) {
              Tag t = new Tag((byte) 1, "visibility");
              Tag[] tags = new Tag[] { t };
              KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
              p.add(kv);
            } else {
              p.add(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
            }
          }
        }
        region.put(p);
      }
      region.flushcache();
    }
    // Start from an empty cache, then compact and make sure no data blocks
    // (encoded or not) show up in it.
    LruBlockCache blockCache =
        (LruBlockCache) new CacheConfig(conf).getBlockCache();
    blockCache.clearCache();
    assertEquals(0, blockCache.getBlockTypeCountsForTest().size());
    region.compactStores();
    LOG.debug("compactStores() returned");

    Map<BlockType, Integer> blockTypesInCache =
        blockCache.getBlockTypeCountsForTest();
    LOG.debug("Block types in cache: " + blockTypesInCache);
    assertNull(blockTypesInCache.get(BlockType.ENCODED_DATA));
    assertNull(blockTypesInCache.get(BlockType.DATA));
    region.close();
    blockCache.shutdown();
  }
}