1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hfile;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.nio.ByteBuffer;
29 import java.util.Arrays;
30 import java.util.HashSet;
31 import java.util.Random;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.MultithreadedTestUtil;
38 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
39 import org.apache.hadoop.hbase.io.HeapSize;
40 import org.apache.hadoop.hbase.io.compress.Compression;
41 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
42 import org.apache.hadoop.hbase.util.ChecksumType;
43
44 public class CacheTestUtils {
45
46 private static final boolean includesMemstoreTS = true;
47
48
49
50
51
52
53 public static void testHeapSizeChanges(final BlockCache toBeTested,
54 final int blockSize) {
55 HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
56 long heapSize = ((HeapSize) toBeTested).heapSize();
57 toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
58
59
60 assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());
61
62 toBeTested.evictBlock(blocks[0].blockName);
63
64
65 assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
66 }
67 public static void testCacheMultiThreaded(final BlockCache toBeTested,
68 final int blockSize, final int numThreads, final int numQueries,
69 final double passingScore) throws Exception {
70
71 Configuration conf = new Configuration();
72 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
73 conf);
74
75 final AtomicInteger totalQueries = new AtomicInteger();
76 final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<HFileBlockPair>();
77 final AtomicInteger hits = new AtomicInteger();
78 final AtomicInteger miss = new AtomicInteger();
79
80 HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize);
81 blocksToTest.addAll(Arrays.asList(blocks));
82
83 for (int i = 0; i < numThreads; i++) {
84 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
85 @Override
86 public void doAnAction() throws Exception {
87 if (!blocksToTest.isEmpty()) {
88 HFileBlockPair ourBlock = blocksToTest.poll();
89
90 if (ourBlock == null) {
91 ctx.setStopFlag(true);
92 return;
93 }
94 toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
95 Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
96 false, false, true);
97 if (retrievedBlock != null) {
98 assertEquals(ourBlock.block, retrievedBlock);
99 toBeTested.evictBlock(ourBlock.blockName);
100 hits.incrementAndGet();
101 assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
102 } else {
103 miss.incrementAndGet();
104 }
105 totalQueries.incrementAndGet();
106 }
107 }
108 };
109 t.setDaemon(true);
110 ctx.addThread(t);
111 }
112 ctx.startThreads();
113 while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
114 Thread.sleep(10);
115 }
116 ctx.stop();
117 if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
118 fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
119 + miss.get());
120 }
121 }
122
123 public static void testCacheSimple(BlockCache toBeTested, int blockSize,
124 int numBlocks) throws Exception {
125
126 HFileBlockPair[] blocks = generateHFileBlocks(numBlocks, blockSize);
127
128 for (HFileBlockPair block : blocks) {
129 assertNull(toBeTested.getBlock(block.blockName, true, false, true));
130 }
131
132
133 for (HFileBlockPair block : blocks) {
134 toBeTested.cacheBlock(block.blockName, block.block);
135 }
136
137
138
139
140
141 for (HFileBlockPair block : blocks) {
142 HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
143 if (buf != null) {
144 assertEquals(block.block, buf);
145 }
146
147 }
148
149
150
151 for (HFileBlockPair block : blocks) {
152 try {
153 if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
154 toBeTested.cacheBlock(block.blockName, block.block);
155 if (!(toBeTested instanceof BucketCache)) {
156
157
158 fail("Cache should not allow re-caching a block");
159 }
160 }
161 } catch (RuntimeException re) {
162
163 }
164 }
165
166 }
167
168 public static void hammerSingleKey(final BlockCache toBeTested,
169 int BlockSize, int numThreads, int numQueries) throws Exception {
170 final BlockCacheKey key = new BlockCacheKey("key", 0);
171 final byte[] buf = new byte[5 * 1024];
172 Arrays.fill(buf, (byte) 5);
173
174 final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
175 Configuration conf = new Configuration();
176 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
177 conf);
178
179 final AtomicInteger totalQueries = new AtomicInteger();
180 toBeTested.cacheBlock(key, bac);
181
182 for (int i = 0; i < numThreads; i++) {
183 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
184 @Override
185 public void doAnAction() throws Exception {
186 ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
187 .getBlock(key, false, false, true);
188 assertArrayEquals(buf, returned.buf);
189 totalQueries.incrementAndGet();
190 }
191 };
192
193 t.setDaemon(true);
194 ctx.addThread(t);
195 }
196
197 ctx.startThreads();
198 while (totalQueries.get() < numQueries && ctx.shouldRun()) {
199 Thread.sleep(10);
200 }
201 ctx.stop();
202 }
203
204 public static void hammerEviction(final BlockCache toBeTested, int BlockSize,
205 int numThreads, int numQueries) throws Exception {
206
207 Configuration conf = new Configuration();
208 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
209 conf);
210
211 final AtomicInteger totalQueries = new AtomicInteger();
212
213 for (int i = 0; i < numThreads; i++) {
214 final int finalI = i;
215
216 final byte[] buf = new byte[5 * 1024];
217 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
218 @Override
219 public void doAnAction() throws Exception {
220 for (int j = 0; j < 100; j++) {
221 BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
222 Arrays.fill(buf, (byte) (finalI * j));
223 final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
224
225 ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
226 .getBlock(key, true, false, true);
227 if (gotBack != null) {
228 assertArrayEquals(gotBack.buf, bac.buf);
229 } else {
230 toBeTested.cacheBlock(key, bac);
231 }
232 }
233 totalQueries.incrementAndGet();
234 }
235 };
236
237 t.setDaemon(true);
238 ctx.addThread(t);
239 }
240
241 ctx.startThreads();
242 while (totalQueries.get() < numQueries && ctx.shouldRun()) {
243 Thread.sleep(10);
244 }
245 ctx.stop();
246
247 assertTrue(toBeTested.getStats().getEvictedCount() > 0);
248 }
249
250 private static class ByteArrayCacheable implements Cacheable {
251
252 static final CacheableDeserializer<Cacheable> blockDeserializer =
253 new CacheableDeserializer<Cacheable>() {
254
255 @Override
256 public Cacheable deserialize(ByteBuffer b) throws IOException {
257 int len = b.getInt();
258 Thread.yield();
259 byte buf[] = new byte[len];
260 b.get(buf);
261 return new ByteArrayCacheable(buf);
262 }
263
264 @Override
265 public int getDeserialiserIdentifier() {
266 return deserializerIdentifier;
267 }
268
269 @Override
270 public Cacheable deserialize(ByteBuffer b, boolean reuse)
271 throws IOException {
272 return deserialize(b);
273 }
274 };
275
276 final byte[] buf;
277
278 public ByteArrayCacheable(byte[] buf) {
279 this.buf = buf;
280 }
281
282 @Override
283 public long heapSize() {
284 return 4 + buf.length;
285 }
286
287 @Override
288 public int getSerializedLength() {
289 return 4 + buf.length;
290 }
291
292 @Override
293 public void serialize(ByteBuffer destination) {
294 destination.putInt(buf.length);
295 Thread.yield();
296 destination.put(buf);
297 destination.rewind();
298 }
299
300 @Override
301 public CacheableDeserializer<Cacheable> getDeserializer() {
302 return blockDeserializer;
303 }
304
305 private static final int deserializerIdentifier;
306 static {
307 deserializerIdentifier = CacheableDeserializerIdManager
308 .registerDeserializer(blockDeserializer);
309 }
310
311 @Override
312 public BlockType getBlockType() {
313 return BlockType.DATA;
314 }
315 }
316
317
318 private static HFileBlockPair[] generateHFileBlocks(int blockSize,
319 int numBlocks) {
320 HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
321 Random rand = new Random();
322 HashSet<String> usedStrings = new HashSet<String>();
323 for (int i = 0; i < numBlocks; i++) {
324
325
326
327
328
329 ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize
330 - HFileBlock.EXTRA_SERIALIZATION_SPACE);
331 rand.nextBytes(cachedBuffer.array());
332 cachedBuffer.rewind();
333 int onDiskSizeWithoutHeader = blockSize
334 - HFileBlock.EXTRA_SERIALIZATION_SPACE;
335 int uncompressedSizeWithoutHeader = blockSize
336 - HFileBlock.EXTRA_SERIALIZATION_SPACE;
337 long prevBlockOffset = rand.nextLong();
338 BlockType.DATA.write(cachedBuffer);
339 cachedBuffer.putInt(onDiskSizeWithoutHeader);
340 cachedBuffer.putInt(uncompressedSizeWithoutHeader);
341 cachedBuffer.putLong(prevBlockOffset);
342 cachedBuffer.rewind();
343 HFileContext meta = new HFileContextBuilder()
344 .withHBaseCheckSum(false)
345 .withIncludesMvcc(includesMemstoreTS)
346 .withIncludesTags(false)
347 .withCompression(Compression.Algorithm.NONE)
348 .withBytesPerCheckSum(0)
349 .withChecksumType(ChecksumType.NULL)
350 .build();
351 HFileBlock generated = new HFileBlock(BlockType.DATA,
352 onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
353 prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
354 blockSize,
355 onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, meta);
356
357 String strKey;
358
359 for (strKey = new Long(rand.nextLong()).toString(); !usedStrings
360 .add(strKey); strKey = new Long(rand.nextLong()).toString())
361 ;
362
363 returnedBlocks[i] = new HFileBlockPair();
364 returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
365 returnedBlocks[i].block = generated;
366 }
367 return returnedBlocks;
368 }
369
370 private static class HFileBlockPair {
371 BlockCacheKey blockName;
372 HFileBlock block;
373 }
374 }