
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import com.google.common.base.Objects;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.HasThread;
48  import org.apache.hadoop.util.StringUtils;
49  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for
61   * scan-resistance and in-memory column families
62   * ({@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)}; an
63   * in-memory column family is a column family that should be served from memory if possible):
64   * single-access, multiple-access, and in-memory priority.
65   * A block is added with the in-memory priority flag if
66   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()} is set;
67   * otherwise a block is given single-access
68   * priority the first time it is read into this block cache.  If a block is accessed again while
69   * in the cache, it is marked as a multiple-access priority block.  This delineation of blocks is
70   * used to prevent scans from thrashing the cache, effectively adding a least-frequently-used
71   * element to the eviction algorithm.<p>
72   *
73   * Each priority is given its own chunk of the total cache to ensure
74   * fairness during eviction.  Each priority will retain close to its maximum
75   * size; however, if any priority is not using its entire chunk, the others
76   * are able to grow beyond their chunk size.<p>
77   *
78   * Instantiated at a minimum with the total size and average block size.
79   * All sizes are in bytes.  The block size is not especially important as this
80   * cache is fully dynamic in its sizing of blocks.  It is only used for
81   * pre-allocating data structures and in initial heap estimation of the map.<p>
82   *
83   * The detailed constructor defines the sizes for the three priorities (they
84   * should total to the <code>maximum size</code> defined).  It also sets the levels that
85   * trigger and control the eviction thread.<p>
86   *
87   * The <code>acceptable size</code> is the cache size level which triggers the eviction
88   * process to start.  It evicts enough blocks to get the size below the
89   * minimum size specified.<p>
90   *
91   * Eviction happens in a separate thread and involves a single full scan
92   * of the map.  It determines how many bytes must be freed to reach the minimum
93   * size, and then while scanning determines the fewest least-recently-used
94   * blocks necessary from each of the three priorities (together this would amount to
95   * three times the bytes to free).  It then uses the priority chunk sizes to evict fairly according
96   * to the relative sizes and usage.
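 *
 * <p>A minimal usage sketch (sizes are illustrative only; a two-argument
 * {@code BlockCacheKey(String, long)} constructor and an existing {@link Cacheable}
 * block {@code block} are assumed to be available to the caller):
 * <pre>{@code
 *   LruBlockCache cache = new LruBlockCache(512L * 1024 * 1024, 64 * 1024); // 512 MB cache, ~64 KB blocks
 *   BlockCacheKey key = new BlockCacheKey("somehfile", 0L);
 *   cache.cacheBlock(key, block);                             // enters at single-access priority
 *   Cacheable hit = cache.getBlock(key, true, false, true);   // a re-read promotes it to multi-access
 * }</pre>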
97   */
98  @InterfaceAudience.Private
99  @JsonIgnoreProperties({"encodingCountsForTest"})
100 public class LruBlockCache implements ResizableBlockCache, HeapSize {
101 
102   private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
103 
104   /**
105    * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
106    * evicting during an eviction run till the cache size is down to 80% of the total.
107    */
108   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
109 
110   /**
111    * Acceptable size of cache (no evictions if size < acceptable)
112    */
113   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
114 
115   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
116   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
117   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
118 
119   /**
120    * Configuration key to force data blocks of in-memory hfiles to always be cached in
121    * memory (unless the in-memory blocks alone would exceed the cache). Unlike inMemory,
122    * which is a column-family configuration, inMemoryForceMode is a cluster-wide configuration.
123    */
124   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
125 
126   /** Default Configuration Parameters*/
127 
128   /** Backing Concurrent Map Configuration */
129   static final float DEFAULT_LOAD_FACTOR = 0.75f;
130   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
131 
132   /** Eviction thresholds */
133   static final float DEFAULT_MIN_FACTOR = 0.95f;
134   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
135 
136   /** Priority buckets */
137   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
138   static final float DEFAULT_MULTI_FACTOR = 0.50f;
139   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
140 
141   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
142 
143   /** Statistics thread */
144   static final int statThreadPeriod = 60 * 5;
145   private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
146   private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
147 
148   /** Concurrent map (the cache) */
149   private final Map<BlockCacheKey,LruCachedBlock> map;
150 
151   /** Eviction lock (locked when eviction in process) */
152   private final ReentrantLock evictionLock = new ReentrantLock(true);
153   private final long maxBlockSize;
154 
155   /** Volatile boolean to track if we are in an eviction process or not */
156   private volatile boolean evictionInProgress = false;
157 
158   /** Eviction thread */
159   private final EvictionThread evictionThread;
160 
161   /** Statistics thread schedule pool (for heavy debugging, could remove) */
162   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
163     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
164 
165   /** Current size of cache */
166   private final AtomicLong size;
167 
168   /** Current number of cached elements */
169   private final AtomicLong elements;
170 
171   /** Cache access count (sequential ID) */
172   private final AtomicLong count;
173 
174   /** Cache statistics */
175   private final CacheStats stats;
176 
177   /** Maximum allowable size of cache (if a block put takes the size over the max, eviction is triggered) */
178   private long maxSize;
179 
180   /** Approximate block size */
181   private long blockSize;
182 
183   /** Acceptable size of cache (no evictions if size < acceptable) */
184   private float acceptableFactor;
185 
186   /** Minimum threshold of cache (when evicting, evict until size < min) */
187   private float minFactor;
188 
189   /** Single access bucket size */
190   private float singleFactor;
191 
192   /** Multiple access bucket size */
193   private float multiFactor;
194 
195   /** In-memory bucket size */
196   private float memoryFactor;
197 
198   /** Overhead of the structure itself */
199   private long overhead;
200 
201   /** Whether in-memory hfile's data block has higher priority when evicting */
202   private boolean forceInMemory;
203 
204   /** Where to send victims (blocks evicted/missing from the cache) */
205   private BlockCache victimHandler = null;
206 
207   /**
208    * Default constructor.  Specify maximum size and expected average block
209    * size (approximation is fine).
210    *
211    * <p>All other factors will be calculated based on defaults specified in
212    * this class.
213    * @param maxSize maximum size of cache, in bytes
214    * @param blockSize approximate size of each block, in bytes
215    */
216   public LruBlockCache(long maxSize, long blockSize) {
217     this(maxSize, blockSize, true);
218   }
219 
220   /**
221    * Constructor used for testing.  Allows disabling of the eviction thread.
222    */
223   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
224     this(maxSize, blockSize, evictionThread,
225         (int)Math.ceil(1.2*maxSize/blockSize),
226         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
227         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
228         DEFAULT_SINGLE_FACTOR,
229         DEFAULT_MULTI_FACTOR,
230         DEFAULT_MEMORY_FACTOR,
231         false,
232         DEFAULT_MAX_BLOCK_SIZE
233         );
234   }
235 
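  /**
   * Constructor that reads the eviction and priority factors from the given Configuration,
   * falling back to the defaults defined above when a key is absent.
   *
   * <p>A hedged sketch of overriding two of the factors before constructing the cache
   * (the values are illustrative only; the keys are the ones defined at the top of this class):
   * <pre>{@code
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setFloat("hbase.lru.blockcache.min.factor", 0.90f);
   *   conf.setFloat("hbase.lru.blockcache.acceptable.factor", 0.95f);
   *   LruBlockCache cache = new LruBlockCache(maxSize, blockSize, true, conf);
   * }</pre>
   */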
236   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
237     this(maxSize, blockSize, evictionThread,
238         (int)Math.ceil(1.2*maxSize/blockSize),
239         DEFAULT_LOAD_FACTOR,
240         DEFAULT_CONCURRENCY_LEVEL,
241         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
242         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
243         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
244         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
245         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
246         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
247         conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
248         );
249   }
250 
251   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
252     this(maxSize, blockSize, true, conf);
253   }
254 
255   /**
256    * Configurable constructor.  Use this constructor if not using defaults.
257    * @param maxSize maximum size of this cache, in bytes
258    * @param blockSize expected average size of blocks, in bytes
259    * @param evictionThread whether to run evictions in a bg thread or not
260    * @param mapInitialSize initial size of backing ConcurrentHashMap
261    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
262    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
263    * @param minFactor percentage of total size that eviction will evict until
264    * @param acceptableFactor percentage of total size that triggers eviction
265    * @param singleFactor percentage of total size for single-access blocks
266    * @param multiFactor percentage of total size for multiple-access blocks
267    * @param memoryFactor percentage of total size for in-memory blocks
268    */
269   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
270       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
271       float minFactor, float acceptableFactor, float singleFactor,
272       float multiFactor, float memoryFactor, boolean forceInMemory, long maxBlockSize) {
273     this.maxBlockSize = maxBlockSize;
274     if(singleFactor + multiFactor + memoryFactor != 1 ||
275         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
276       throw new IllegalArgumentException("Single, multi, and memory factors " +
277           " should be non-negative and total 1.0");
278     }
279     if(minFactor >= acceptableFactor) {
280       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
281     }
282     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
283       throw new IllegalArgumentException("all factors must be < 1");
284     }
285     this.maxSize = maxSize;
286     this.blockSize = blockSize;
287     this.forceInMemory = forceInMemory;
288     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
289         mapLoadFactor, mapConcurrencyLevel);
290     this.minFactor = minFactor;
291     this.acceptableFactor = acceptableFactor;
292     this.singleFactor = singleFactor;
293     this.multiFactor = multiFactor;
294     this.memoryFactor = memoryFactor;
295     this.stats = new CacheStats(this.getClass().getSimpleName());
296     this.count = new AtomicLong(0);
297     this.elements = new AtomicLong(0);
298     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
299     this.size = new AtomicLong(this.overhead);
300     if(evictionThread) {
301       this.evictionThread = new EvictionThread(this);
302       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
303     } else {
304       this.evictionThread = null;
305     }
306     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
307     // every five minutes.
308     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
309         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
310   }
311 
312   @Override
313   public void setMaxSize(long maxSize) {
314     this.maxSize = maxSize;
315     if(this.size.get() > acceptableSize() && !evictionInProgress) {
316       runEviction();
317     }
318   }
319 
320   // BlockCache implementation
321 
322   /**
323    * Cache the block with the specified name and buffer.
324    * <p>
325    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
326    * this can happen, for which we compare the buffer contents.
327    * @param cacheKey block's cache key
328    * @param buf block buffer
329    * @param inMemory if block is in-memory
330    * @param cacheDataInL1 in a multi-tier (L1/L2) deployment, whether the block should be cached in the L1 tier (this cache)
331    */
332   @Override
333   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
334       final boolean cacheDataInL1) {
335 
336     if (buf.heapSize() > maxBlockSize) {
337       // If there are a lot of blocks that are too
338       // big this can make the logs way too noisy,
339       // so we only log one in every 50 of them (2%).
340       if (stats.failInsert() % 50 == 0) {
341         LOG.warn("Trying to cache too large a block "
342             + cacheKey.getHfileName() + " @ "
343             + cacheKey.getOffset()
344             + " is " + buf.heapSize()
345             + " which is larger than " + maxBlockSize);
346       }
347       return;
348     }
349 
350     LruCachedBlock cb = map.get(cacheKey);
351     if (cb != null) {
352       // compare the contents, if they are not equal, we are in big trouble
353       if (compare(buf, cb.getBuffer()) != 0) {
354         throw new RuntimeException("Cached block contents differ, which should not have happened."
355           + " cacheKey:" + cacheKey);
356       }
357       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
358       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
359       LOG.warn(msg);
360       return;
361     }
362     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
363     long newSize = updateSizeMetrics(cb, false);
364     map.put(cacheKey, cb);
365     long val = elements.incrementAndGet();
366     if (LOG.isTraceEnabled()) {
367       long size = map.size();
368       assertCounterSanity(size, val);
369     }
370     if (newSize > acceptableSize() && !evictionInProgress) {
371       runEviction();
372     }
373   }
374 
375   /**
376    * Sanity-checking for parity between actual block cache content and metrics.
377    * Intended only for use with TRACE level logging and -ea JVM.
378    */
379   private static void assertCounterSanity(long mapSize, long counterVal) {
380     if (counterVal < 0) {
381       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
382         ", mapSize=" + mapSize);
383       return;
384     }
385     if (mapSize < Integer.MAX_VALUE) {
386       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
387       if (pct_diff > 0.05) {
388         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
389           ", mapSize=" + mapSize);
390       }
391     }
392   }
393 
394   private int compare(Cacheable left, Cacheable right) {
395     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
396     left.serialize(l);
397     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
398     right.serialize(r);
399     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
400       r.array(), r.arrayOffset(), r.limit());
401   }
402 
403   /**
404    * Cache the block with the specified name and buffer.
405    * <p>
406    * @param cacheKey block's cache key
407    * @param buf block buffer
408    */
409   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
410     cacheBlock(cacheKey, buf, false, false);
411   }
412 
413   /**
414    * Helper function that updates the local size counter and also updates any
415    * per-cf or per-blocktype metrics it can discern from the given
416    * {@link LruCachedBlock}.
417    *
418    * @param cb the block being added or evicted
419    * @param evict true if the block is being evicted (its size is subtracted), false if it is being added
420    */
421   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
422     long heapsize = cb.heapSize();
423     if (evict) {
424       heapsize *= -1;
425     }
426     return size.addAndGet(heapsize);
427   }
428 
429   /**
430    * Get the buffer of the block with the specified name.
431    * @param cacheKey block's cache key
432    * @param caching true if the caller caches blocks on cache misses
433    * @param repeat Whether this is a repeat lookup for the same block
434    *        (used to avoid double counting cache misses when doing double-check locking)
435    * @param updateCacheMetrics Whether to update cache metrics or not
436    * @return buffer of specified cache key, or null if not in cache
437    */
438   @Override
439   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
440       boolean updateCacheMetrics) {
441     LruCachedBlock cb = map.get(cacheKey);
442     if (cb == null) {
443       if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());
444       // If there is another block cache then try and read there.
445       // However, if this is a retry (the second time in double-checked locking)
446       // and it's already a miss, then the L2 will also be a miss.
447       if (victimHandler != null && !repeat) {
448         Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
449 
450         // Promote this to L1.
451         if (result != null && caching) {
452           cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
453         }
454         return result;
455       }
456       return null;
457     }
458     if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
459     cb.access(count.incrementAndGet());
460     return cb.getBuffer();
461   }
462 
463   /**
464    * Whether the cache contains the block with the specified cacheKey
465    * @param cacheKey
466    * @return true if contains the block
467    */
468   public boolean containsBlock(BlockCacheKey cacheKey) {
469     return map.containsKey(cacheKey);
470   }
471 
472   @Override
473   public boolean evictBlock(BlockCacheKey cacheKey) {
474     LruCachedBlock cb = map.get(cacheKey);
475     if (cb == null) return false;
476     evictBlock(cb, false);
477     return true;
478   }
479 
480   /**
481    * Evicts all blocks for a specific HFile. This is an
482    * expensive operation implemented as a linear-time search through all blocks
483    * in the cache. Ideally this should be a search in a log-access-time map.
484    *
485    * <p>
486    * This is used for evict-on-close to remove all blocks of a specific HFile.
487    *
488    * @return the number of blocks evicted
489    */
490   @Override
491   public int evictBlocksByHfileName(String hfileName) {
492     int numEvicted = 0;
493     for (BlockCacheKey key : map.keySet()) {
494       if (key.getHfileName().equals(hfileName)) {
495         if (evictBlock(key))
496           ++numEvicted;
497       }
498     }
499     if (victimHandler != null) {
500       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
501     }
502     return numEvicted;
503   }
504 
505   /**
506    * Evict the block; it will be cached by the victim handler if one exists and the
507    * block may be read again later.
508    * @param block the block to evict
509    * @param evictedByEvictionProcess true if the given block is evicted by
510    *          EvictionThread
511    * @return the heap size of evicted block
512    */
513   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
514     map.remove(block.getCacheKey());
515     updateSizeMetrics(block, true);
516     long val = elements.decrementAndGet();
517     if (LOG.isTraceEnabled()) {
518       long size = map.size();
519       assertCounterSanity(size, val);
520     }
521     stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
522     if (evictedByEvictionProcess && victimHandler != null) {
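      // Hand the evicted block to the victim (L2) cache. For a BucketCache victim we pass a
      // "wait" flag that is true only while this cache is still below its acceptable size.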
523       if (victimHandler instanceof BucketCache) {
524         boolean wait = getCurrentSize() < acceptableSize();
525         boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
526         ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
527             inMemory, wait);
528       } else {
529         victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
530       }
531     }
532     return block.heapSize();
533   }
534 
535   /**
536    * Runs the eviction process; may be called from multiple threads. Delegates to the eviction thread if present, otherwise evicts inline.
537    */
538   private void runEviction() {
539     if(evictionThread == null) {
540       evict();
541     } else {
542       evictionThread.evict();
543     }
544   }
545 
546   /**
547    * Eviction method.
548    */
549   void evict() {
550 
551     // Ensure only one eviction at a time
552     if(!evictionLock.tryLock()) return;
553 
554     try {
555       evictionInProgress = true;
556       long currentSize = this.size.get();
557       long bytesToFree = currentSize - minSize();
558 
559       if (LOG.isTraceEnabled()) {
560         LOG.trace("Block cache LRU eviction started; Attempting to free " +
561           StringUtils.byteDesc(bytesToFree) + " of total=" +
562           StringUtils.byteDesc(currentSize));
563       }
564 
565       if(bytesToFree <= 0) return;
566 
567       // Instantiate priority buckets
568       BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
569           singleSize());
570       BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
571           multiSize());
572       BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
573           memorySize());
574 
575       // Scan entire map putting into appropriate buckets
576       for(LruCachedBlock cachedBlock : map.values()) {
577         switch(cachedBlock.getPriority()) {
578           case SINGLE: {
579             bucketSingle.add(cachedBlock);
580             break;
581           }
582           case MULTI: {
583             bucketMulti.add(cachedBlock);
584             break;
585           }
586           case MEMORY: {
587             bucketMemory.add(cachedBlock);
588             break;
589           }
590         }
591       }
592 
593       long bytesFreed = 0;
594       if (forceInMemory || memoryFactor > 0.999f) {
595         long s = bucketSingle.totalSize();
596         long m = bucketMulti.totalSize();
597         if (bytesToFree > (s + m)) {
598           // this means we need to evict blocks in memory bucket to make room,
599           // so the single and multi buckets will be emptied
600           bytesFreed = bucketSingle.free(s);
601           bytesFreed += bucketMulti.free(m);
602           if (LOG.isTraceEnabled()) {
603             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
604               " from single and multi buckets");
605           }
606           bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
607           if (LOG.isTraceEnabled()) {
608             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
609               " total from all three buckets ");
610           }
611         } else {
612           // this means no blocks in the memory bucket need to be evicted,
613           // and we try our best to keep the ratio between the single bucket and
614           // the multi bucket at 1:2
615           long bytesRemain = s + m - bytesToFree;
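          // Worked example (illustrative numbers): if s = 300MB, m = 500MB and bytesToFree = 200MB,
          // then bytesRemain = 600MB; neither 3*s <= bytesRemain nor 3*m <= 2*bytesRemain holds, so
          // the single bucket frees s - bytesRemain/3 = 100MB and the multi bucket frees the
          // remaining 100MB, leaving 200MB single and 400MB multi -- the desired 1:2 ratio.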
616           if (3 * s <= bytesRemain) {
617             // single-bucket is small enough that no eviction happens for it
618             // hence all eviction goes from multi-bucket
619             bytesFreed = bucketMulti.free(bytesToFree);
620           } else if (3 * m <= 2 * bytesRemain) {
621             // multi-bucket is small enough that no eviction happens for it
622             // hence all eviction goes from single-bucket
623             bytesFreed = bucketSingle.free(bytesToFree);
624           } else {
625             // both buckets need to evict some blocks
626             bytesFreed = bucketSingle.free(s - bytesRemain / 3);
627             if (bytesFreed < bytesToFree) {
628               bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
629             }
630           }
631         }
632       } else {
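        // Default path: evict fairly across the three buckets. Buckets are polled in order of
        // increasing overflow (bytes above their configured chunk), and each frees at most its
        // own overflow or an equal split of the bytes still to be freed, whichever is smaller.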
633         PriorityQueue<BlockBucket> bucketQueue =
634           new PriorityQueue<BlockBucket>(3);
635 
636         bucketQueue.add(bucketSingle);
637         bucketQueue.add(bucketMulti);
638         bucketQueue.add(bucketMemory);
639 
640         int remainingBuckets = 3;
641 
642         BlockBucket bucket;
643         while((bucket = bucketQueue.poll()) != null) {
644           long overflow = bucket.overflow();
645           if(overflow > 0) {
646             long bucketBytesToFree = Math.min(overflow,
647                 (bytesToFree - bytesFreed) / remainingBuckets);
648             bytesFreed += bucket.free(bucketBytesToFree);
649           }
650           remainingBuckets--;
651         }
652       }
653 
654       if (LOG.isTraceEnabled()) {
655         long single = bucketSingle.totalSize();
656         long multi = bucketMulti.totalSize();
657         long memory = bucketMemory.totalSize();
658         LOG.trace("Block cache LRU eviction completed; " +
659           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
660           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
661           "single=" + StringUtils.byteDesc(single) + ", " +
662           "multi=" + StringUtils.byteDesc(multi) + ", " +
663           "memory=" + StringUtils.byteDesc(memory));
664       }
665     } finally {
666       stats.evict();
667       evictionInProgress = false;
668       evictionLock.unlock();
669     }
670   }
671 
672   @Override
673   public String toString() {
674     return Objects.toStringHelper(this)
675       .add("blockCount", getBlockCount())
676       .add("currentSize", getCurrentSize())
677       .add("freeSize", getFreeSize())
678       .add("maxSize", getMaxSize())
679       .add("heapSize", heapSize())
680       .add("minSize", minSize())
681       .add("minFactor", minFactor)
682       .add("multiSize", multiSize())
683       .add("multiFactor", multiFactor)
684       .add("singleSize", singleSize())
685       .add("singleFactor", singleFactor)
686       .toString();
687   }
688 
689   /**
690    * Used to group blocks into priority buckets.  There will be a BlockBucket
691    * for each priority (single, multi, memory).  Once bucketed, the eviction
692    * algorithm takes the appropriate number of elements out of each according
693    * to configuration parameters and their relative sizes.
694    */
695   private class BlockBucket implements Comparable<BlockBucket> {
696 
697     private final String name;
698     private LruCachedBlockQueue queue;
699     private long totalSize = 0;
700     private long bucketSize;
701 
702     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
703       this.name = name;
704       this.bucketSize = bucketSize;
705       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
706       totalSize = 0;
707     }
708 
709     public void add(LruCachedBlock block) {
710       totalSize += block.heapSize();
711       queue.add(block);
712     }
713 
714     public long free(long toFree) {
715       if (LOG.isTraceEnabled()) {
716         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
717       }
718       LruCachedBlock cb;
719       long freedBytes = 0;
720       while ((cb = queue.pollLast()) != null) {
721         freedBytes += evictBlock(cb, true);
722         if (freedBytes >= toFree) {
723           return freedBytes;
724         }
725       }
726       if (LOG.isTraceEnabled()) {
727         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
728       }
729       return freedBytes;
730     }
731 
732     public long overflow() {
733       return totalSize - bucketSize;
734     }
735 
736     public long totalSize() {
737       return totalSize;
738     }
739 
740     public int compareTo(BlockBucket that) {
741       if(this.overflow() == that.overflow()) return 0;
742       return this.overflow() > that.overflow() ? 1 : -1;
743     }
744 
745     @Override
746     public boolean equals(Object that) {
747       if (that == null || !(that instanceof BlockBucket)){
748         return false;
749       }
750       return compareTo((BlockBucket)that) == 0;
751     }
752 
753     @Override
754     public int hashCode() {
755       return Objects.hashCode(name, bucketSize, queue, totalSize);
756     }
757 
758     @Override
759     public String toString() {
760       return Objects.toStringHelper(this)
761         .add("name", name)
762         .add("totalSize", StringUtils.byteDesc(totalSize))
763         .add("bucketSize", StringUtils.byteDesc(bucketSize))
764         .toString();
765     }
766   }
767 
768   /**
769    * Get the maximum size of this cache.
770    * @return max size in bytes
771    */
772   public long getMaxSize() {
773     return this.maxSize;
774   }
775 
776   @Override
777   public long getCurrentSize() {
778     return this.size.get();
779   }
780 
781   @Override
782   public long getFreeSize() {
783     return getMaxSize() - getCurrentSize();
784   }
785 
786   @Override
787   public long size() {
788     return getMaxSize();
789   }
790 
791   @Override
792   public long getBlockCount() {
793     return this.elements.get();
794   }
795 
796   EvictionThread getEvictionThread() {
797     return this.evictionThread;
798   }
799 
800   /*
801    * Eviction thread.  Sits in a waiting state until an eviction is triggered,
802    * which happens when the cache size grows above the acceptable level.<p>
803    *
804    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
805    */
806   static class EvictionThread extends HasThread {
807     private WeakReference<LruBlockCache> cache;
808     private volatile boolean go = true;
809     // flag set after entering the run method, used for tests
810     private boolean enteringRun = false;
811 
812     public EvictionThread(LruBlockCache cache) {
813       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
814       setDaemon(true);
815       this.cache = new WeakReference<LruBlockCache>(cache);
816     }
817 
818     @Override
819     public void run() {
820       enteringRun = true;
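      // Loop until shutdown: wait up to 10 seconds (or until runEviction() notifies us), then run
      // one eviction pass. Exit if the cache itself has been garbage collected (weak ref cleared).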
821       while (this.go) {
822         synchronized(this) {
823           try {
824             this.wait(1000 * 10/*Don't wait forever*/);
825           } catch(InterruptedException e) {
826             LOG.warn("Interrupted eviction thread ", e);
827             Thread.currentThread().interrupt();
828           }
829         }
830         LruBlockCache cache = this.cache.get();
831         if (cache == null) break;
832         cache.evict();
833       }
834     }
835 
836     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
837         justification="This is what we want")
838     public void evict() {
839       synchronized(this) {
840         this.notifyAll();
841       }
842     }
843 
844     synchronized void shutdown() {
845       this.go = false;
846       this.notifyAll();
847     }
848 
849     /**
850      * Used for the test.
851      */
852     boolean isEnteringRun() {
853       return this.enteringRun;
854     }
855   }
856 
857   /*
858    * Statistics thread.  Periodically prints the cache statistics to the log.
859    */
860   static class StatisticsThread extends Thread {
861     private final LruBlockCache lru;
862 
863     public StatisticsThread(LruBlockCache lru) {
864       super("LruBlockCacheStats");
865       setDaemon(true);
866       this.lru = lru;
867     }
868 
869     @Override
870     public void run() {
871       lru.logStats();
872     }
873   }
874 
875   public void logStats() {
876     // Log size
877     long totalSize = heapSize();
878     long freeSize = maxSize - totalSize;
879     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
880         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
881         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
882         "blockCount=" + getBlockCount() + ", " +
883         "accesses=" + stats.getRequestCount() + ", " +
884         "hits=" + stats.getHitCount() + ", " +
885         "hitRatio=" + (stats.getHitCount() == 0 ?
886           "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
887         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
888         "cachingHits=" + stats.getHitCachingCount() + ", " +
889         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
890           "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
891         "evictions=" + stats.getEvictionCount() + ", " +
892         "evicted=" + stats.getEvictedCount() + ", " +
893         "evictedPerRun=" + stats.evictedPerEviction());
894   }
895 
896   /**
897    * Get counter statistics for this cache.
898    *
899    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
900    * of the eviction process.
901    */
902   public CacheStats getStats() {
903     return this.stats;
904   }
905 
906   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
907       (3 * Bytes.SIZEOF_LONG) + (10 * ClassSize.REFERENCE) +
908       (5 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
909       + ClassSize.OBJECT);
910 
911   @Override
912   public long heapSize() {
913     return getCurrentSize();
914   }
915 
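  /**
   * Estimates the heap overhead of the cache's own bookkeeping: the fixed per-instance overhead,
   * the empty backing ConcurrentHashMap, one map entry per expected block
   * (maxSize * 1.2 / blockSize of them), and one map segment per concurrency level.
   */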
916   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
917     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
918     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
919         ((long)Math.ceil(maxSize*1.2/blockSize)
920             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
921         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
922   }
923 
924   @Override
925   public Iterator<CachedBlock> iterator() {
926     final Iterator<LruCachedBlock> iterator = map.values().iterator();
927 
928     return new Iterator<CachedBlock>() {
929       private final long now = System.nanoTime();
930 
931       @Override
932       public boolean hasNext() {
933         return iterator.hasNext();
934       }
935 
936       @Override
937       public CachedBlock next() {
938         final LruCachedBlock b = iterator.next();
939         return new CachedBlock() {
940           @Override
941           public String toString() {
942             return BlockCacheUtil.toString(this, now);
943           }
944 
945           @Override
946           public BlockPriority getBlockPriority() {
947             return b.getPriority();
948           }
949 
950           @Override
951           public BlockType getBlockType() {
952             return b.getBuffer().getBlockType();
953           }
954 
955           @Override
956           public long getOffset() {
957             return b.getCacheKey().getOffset();
958           }
959 
960           @Override
961           public long getSize() {
962             return b.getBuffer().heapSize();
963           }
964 
965           @Override
966           public long getCachedTime() {
967             return b.getCachedTime();
968           }
969 
970           @Override
971           public String getFilename() {
972             return b.getCacheKey().getHfileName();
973           }
974 
975           @Override
976           public int compareTo(CachedBlock other) {
977             int diff = this.getFilename().compareTo(other.getFilename());
978             if (diff != 0) return diff;
979             diff = (int)(this.getOffset() - other.getOffset());
980             if (diff != 0) return diff;
981             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
982               throw new IllegalStateException("" + this.getCachedTime() + ", " +
983                 other.getCachedTime());
984             }
985             return (int)(other.getCachedTime() - this.getCachedTime());
986           }
987 
988           @Override
989           public int hashCode() {
990             return b.hashCode();
991           }
992 
993           @Override
994           public boolean equals(Object obj) {
995             if (obj instanceof CachedBlock) {
996               CachedBlock cb = (CachedBlock)obj;
997               return compareTo(cb) == 0;
998             } else {
999               return false;
1000             }
1001           }
1002         };
1003       }
1004 
1005       @Override
1006       public void remove() {
1007         throw new UnsupportedOperationException();
1008       }
1009     };
1010   }
1011 
1012   // Simple calculators of sizes given factors and maxSize
1013 
1014   long acceptableSize() {
1015     return (long)Math.floor(this.maxSize * this.acceptableFactor);
1016   }
1017   private long minSize() {
1018     return (long)Math.floor(this.maxSize * this.minFactor);
1019   }
1020   private long singleSize() {
1021     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1022   }
1023   private long multiSize() {
1024     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1025   }
1026   private long memorySize() {
1027     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1028   }
1029 
1030   public void shutdown() {
1031     if (victimHandler != null)
1032       victimHandler.shutdown();
1033     this.scheduleThreadPool.shutdown();
1034     for (int i = 0; i < 10; i++) {
1035       if (!this.scheduleThreadPool.isTerminated()) {
1036         try {
1037           Thread.sleep(10);
1038         } catch (InterruptedException e) {
1039           LOG.warn("Interrupted while sleeping");
1040           Thread.currentThread().interrupt();
1041           break;
1042         }
1043       }
1044     }
1045 
1046     if (!this.scheduleThreadPool.isTerminated()) {
1047       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1048       LOG.debug("Still running " + runnables);
1049     }
1050     this.evictionThread.shutdown();
1051   }
1052 
1053   /** Clears the cache. Used in tests. */
1054   @VisibleForTesting
1055   public void clearCache() {
1056     this.map.clear();
1057     this.elements.set(0);
1058   }
1059 
1060   /**
1061    * Used in testing. May be very inefficient.
1062    * @return the set of cached file names
1063    */
1064   @VisibleForTesting
1065   SortedSet<String> getCachedFileNamesForTest() {
1066     SortedSet<String> fileNames = new TreeSet<String>();
1067     for (BlockCacheKey cacheKey : map.keySet()) {
1068       fileNames.add(cacheKey.getHfileName());
1069     }
1070     return fileNames;
1071   }
1072 
1073   @VisibleForTesting
1074   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1075     Map<BlockType, Integer> counts =
1076         new EnumMap<BlockType, Integer>(BlockType.class);
1077     for (LruCachedBlock cb : map.values()) {
1078       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1079       Integer count = counts.get(blockType);
1080       counts.put(blockType, (count == null ? 0 : count) + 1);
1081     }
1082     return counts;
1083   }
1084 
1085   @VisibleForTesting
1086   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1087     Map<DataBlockEncoding, Integer> counts =
1088         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1089     for (LruCachedBlock block : map.values()) {
1090       DataBlockEncoding encoding =
1091               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1092       Integer count = counts.get(encoding);
1093       counts.put(encoding, (count == null ? 0 : count) + 1);
1094     }
1095     return counts;
1096   }
1097 
1098   public void setVictimCache(BlockCache handler) {
1099     assert victimHandler == null;
1100     victimHandler = handler;
1101   }
1102 
1103   @VisibleForTesting
1104   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1105     return map;
1106   }
1107 
1108   BlockCache getVictimHandler() {
1109     return this.victimHandler;
1110   }
1111 
1112   @Override
1113   public BlockCache[] getBlockCaches() {
1114     return null;
1115   }
1116 }