View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.classification.InterfaceAudience;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  import org.apache.hadoop.hbase.util.HasThread;
47  import org.apache.hadoop.hbase.util.Threads;
48  import org.apache.hadoop.util.StringUtils;
49  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
50   
51  import com.google.common.annotations.VisibleForTesting;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for
61   * scan-resistance and in-memory families.  A block is added with an inMemory
62   * flag if necessary, otherwise a block becomes a single access priority.  Once
63   * a block is accessed again, it changes to multiple access.  This is used
64   * to prevent scans from thrashing the cache, adding a least-frequently-used
65   * element to the eviction algorithm.<p>
66   *
67   * Each priority is given its own chunk of the total cache to ensure
68   * fairness during eviction.  Each priority will retain close to its maximum
69   * size, however, if any priority is not using its entire chunk the others
70   * are able to grow beyond their chunk size.<p>
71   *
72   * Instantiated at a minimum with the total size and average block size.
73   * All sizes are in bytes.  The block size is not especially important as this
74   * cache is fully dynamic in its sizing of blocks.  It is only used for
75   * pre-allocating data structures and in initial heap estimation of the map.<p>
76   *
77   * The detailed constructor defines the sizes for the three priorities (they
78   * should total to the maximum size defined).  It also sets the levels that
79   * trigger and control the eviction thread.<p>
80   *
81   * The acceptable size is the cache size level which triggers the eviction
82   * process to start.  It evicts enough blocks to get the size below the
83   * minimum size specified.<p>
84   *
85   * Eviction happens in a separate thread and involves a single full-scan
86   * of the map.  It determines how many bytes must be freed to reach the minimum
87   * size, and then while scanning determines the fewest least-recently-used
88   * blocks necessary from each of the three priorities (would be 3 times bytes
89   * to free).  It then uses the priority chunk sizes to evict fairly according
90   * to the relative sizes and usage.
91   */
92  @InterfaceAudience.Private
93  @JsonIgnoreProperties({"encodingCountsForTest"})
94  public class LruBlockCache implements BlockCache, HeapSize {
95  
  static final Log LOG = LogFactory.getLog(LruBlockCache.class);

  // Configuration keys read by the Configuration-taking constructor; each one
  // falls back to the matching DEFAULT_* constant below when unset.
  static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
  static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
  static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
  static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
  static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key that turns on "in-memory force mode": blocks with
   * in-memory priority are only evicted once the single-access and
   * multi-access buckets cannot supply enough bytes.  Unlike inMemory, which
   * is a column-family setting, inMemoryForceMode is a cluster-wide setting.
   */
  static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";

  /** Default Configuration Parameters*/

  /** Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /** Eviction thresholds (fractions of maxSize) */
  static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /** Priority buckets (fractions of the cache; the constructor requires them to total 1.0) */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /** Statistics thread period; scheduled in seconds by the constructor, i.e. every 5 minutes */
  static final int statThreadPeriod = 60 * 5;

  /** Concurrent map (the cache) */
  private final Map<BlockCacheKey,LruCachedBlock> map;

  /** Eviction lock (locked when eviction in process); created in fair mode */
  private final ReentrantLock evictionLock = new ReentrantLock(true);

  /** Volatile boolean to track if we are in an eviction process or not */
  private volatile boolean evictionInProgress = false;

  /** Eviction thread; null when the cache was built with background eviction disabled */
  private final EvictionThread evictionThread;

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat("LruStats #%d")
        .setDaemon(true)
        .build());

  /** Current size of cache, in bytes; starts at the structure overhead, not at zero */
  private final AtomicLong size;

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Cache access count (sequential ID) */
  private final AtomicLong count;

  /** Cache statistics */
  private final CacheStats stats;

  /** Maximum allowable size of cache (block put if size > max, evict) */
  private long maxSize;

  /** Approximate block size */
  private long blockSize;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  /** Overhead of the structure itself */
  private long overhead;

  /** Whether in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /** Where to send victims (blocks evicted from the cache); null means evicted blocks are dropped */
  private BucketCache victimHandler = null;
192 
193   /**
194    * Default constructor.  Specify maximum size and expected average block
195    * size (approximation is fine).
196    *
197    * <p>All other factors will be calculated based on defaults specified in
198    * this class.
199    * @param maxSize maximum size of cache, in bytes
200    * @param blockSize approximate size of each block, in bytes
201    */
202   public LruBlockCache(long maxSize, long blockSize) {
203     this(maxSize, blockSize, true);
204   }
205 
206   /**
207    * Constructor used for testing.  Allows disabling of the eviction thread.
208    */
209   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
210     this(maxSize, blockSize, evictionThread,
211         (int)Math.ceil(1.2*maxSize/blockSize),
212         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
213         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
214         DEFAULT_SINGLE_FACTOR,
215         DEFAULT_MULTI_FACTOR,
216         DEFAULT_MEMORY_FACTOR,
217         false
218         );
219   }
220 
221   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
222     this(maxSize, blockSize, evictionThread,
223         (int)Math.ceil(1.2*maxSize/blockSize),
224         DEFAULT_LOAD_FACTOR,
225         DEFAULT_CONCURRENCY_LEVEL,
226         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
227         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
228         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
229         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
230         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
231         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE)
232         );
233   }
234 
235   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
236     this(maxSize, blockSize, true, conf);
237   }
238 
239   /**
240    * Configurable constructor.  Use this constructor if not using defaults.
241    * @param maxSize maximum size of this cache, in bytes
242    * @param blockSize expected average size of blocks, in bytes
243    * @param evictionThread whether to run evictions in a bg thread or not
244    * @param mapInitialSize initial size of backing ConcurrentHashMap
245    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
246    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
247    * @param minFactor percentage of total size that eviction will evict until
248    * @param acceptableFactor percentage of total size that triggers eviction
249    * @param singleFactor percentage of total size for single-access blocks
250    * @param multiFactor percentage of total size for multiple-access blocks
251    * @param memoryFactor percentage of total size for in-memory blocks
252    */
253   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
254       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
255       float minFactor, float acceptableFactor, float singleFactor,
256       float multiFactor, float memoryFactor, boolean forceInMemory) {
257     if(singleFactor + multiFactor + memoryFactor != 1 ||
258         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
259       throw new IllegalArgumentException("Single, multi, and memory factors " +
260           " should be non-negative and total 1.0");
261     }
262     if(minFactor >= acceptableFactor) {
263       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
264     }
265     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
266       throw new IllegalArgumentException("all factors must be < 1");
267     }
268     this.maxSize = maxSize;
269     this.blockSize = blockSize;
270     this.forceInMemory = forceInMemory;
271     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
272         mapLoadFactor, mapConcurrencyLevel);
273     this.minFactor = minFactor;
274     this.acceptableFactor = acceptableFactor;
275     this.singleFactor = singleFactor;
276     this.multiFactor = multiFactor;
277     this.memoryFactor = memoryFactor;
278     this.stats = new CacheStats();
279     this.count = new AtomicLong(0);
280     this.elements = new AtomicLong(0);
281     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
282     this.size = new AtomicLong(this.overhead);
283     if(evictionThread) {
284       this.evictionThread = new EvictionThread(this);
285       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
286     } else {
287       this.evictionThread = null;
288     }
289     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
290         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
291   }
292 
293   public void setMaxSize(long maxSize) {
294     this.maxSize = maxSize;
295     if(this.size.get() > acceptableSize() && !evictionInProgress) {
296       runEviction();
297     }
298   }
299 
300   // BlockCache implementation
301 
302   /**
303    * Cache the block with the specified name and buffer.
304    * <p>
305    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
306    * this can happen, for which we compare the buffer contents.
307    * @param cacheKey block's cache key
308    * @param buf block buffer
309    * @param inMemory if block is in-memory
310    */
311   @Override
312   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
313     LruCachedBlock cb = map.get(cacheKey);
314     if(cb != null) {
315       // compare the contents, if they are not equal, we are in big trouble
316       if (compare(buf, cb.getBuffer()) != 0) {
317         throw new RuntimeException("Cached block contents differ, which should not have happened."
318           + "cacheKey:" + cacheKey);
319       }
320       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
321       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
322       LOG.warn(msg);
323       return;
324     }
325     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
326     long newSize = updateSizeMetrics(cb, false);
327     map.put(cacheKey, cb);
328     elements.incrementAndGet();
329     if(newSize > acceptableSize() && !evictionInProgress) {
330       runEviction();
331     }
332   }
333 
334   private int compare(Cacheable left, Cacheable right) {
335     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
336     left.serialize(l);
337     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
338     right.serialize(r);
339     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
340       r.array(), r.arrayOffset(), r.limit());
341   }
342 
343   /**
344    * Cache the block with the specified name and buffer.
345    * <p>
346    * It is assumed this will NEVER be called on an already cached block.  If
347    * that is done, it is assumed that you are reinserting the same exact
348    * block due to a race condition and will update the buffer but not modify
349    * the size of the cache.
350    * @param cacheKey block's cache key
351    * @param buf block buffer
352    */
353   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
354     cacheBlock(cacheKey, buf, false);
355   }
356 
357   /**
358    * Helper function that updates the local size counter and also updates any
359    * per-cf or per-blocktype metrics it can discern from given
360    * {@link LruCachedBlock}
361    *
362    * @param cb
363    * @param evict
364    */
365   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
366     long heapsize = cb.heapSize();
367     if (evict) {
368       heapsize *= -1;
369     }
370     return size.addAndGet(heapsize);
371   }
372 
373   /**
374    * Get the buffer of the block with the specified name.
375    * @param cacheKey block's cache key
376    * @param caching true if the caller caches blocks on cache misses
377    * @param repeat Whether this is a repeat lookup for the same block
378    *        (used to avoid double counting cache misses when doing double-check locking)
379    * @param updateCacheMetrics Whether to update cache metrics or not
380    * @return buffer of specified cache key, or null if not in cache
381    */
382   @Override
383   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
384       boolean updateCacheMetrics) {
385     LruCachedBlock cb = map.get(cacheKey);
386     if(cb == null) {
387       if (!repeat && updateCacheMetrics) stats.miss(caching);
388       if (victimHandler != null)
389         return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
390       return null;
391     }
392     if (updateCacheMetrics) stats.hit(caching);
393     cb.access(count.incrementAndGet());
394     return cb.getBuffer();
395   }
396 
397   /**
398    * Whether the cache contains block with specified cacheKey
399    * @param cacheKey
400    * @return true if contains the block
401    */
402   public boolean containsBlock(BlockCacheKey cacheKey) {
403     return map.containsKey(cacheKey);
404   }
405 
406   @Override
407   public boolean evictBlock(BlockCacheKey cacheKey) {
408     LruCachedBlock cb = map.get(cacheKey);
409     if (cb == null) return false;
410     evictBlock(cb, false);
411     return true;
412   }
413 
414   /**
415    * Evicts all blocks for a specific HFile. This is an
416    * expensive operation implemented as a linear-time search through all blocks
417    * in the cache. Ideally this should be a search in a log-access-time map.
418    *
419    * <p>
420    * This is used for evict-on-close to remove all blocks of a specific HFile.
421    *
422    * @return the number of blocks evicted
423    */
424   @Override
425   public int evictBlocksByHfileName(String hfileName) {
426     int numEvicted = 0;
427     for (BlockCacheKey key : map.keySet()) {
428       if (key.getHfileName().equals(hfileName)) {
429         if (evictBlock(key))
430           ++numEvicted;
431       }
432     }
433     if (victimHandler != null) {
434       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
435     }
436     return numEvicted;
437   }
438 
439   /**
440    * Evict the block, and it will be cached by the victim handler if exists &&
441    * block may be read again later
442    * @param block
443    * @param evictedByEvictionProcess true if the given block is evicted by
444    *          EvictionThread
445    * @return the heap size of evicted block
446    */
447   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
448     map.remove(block.getCacheKey());
449     updateSizeMetrics(block, true);
450     elements.decrementAndGet();
451     stats.evicted();
452     if (evictedByEvictionProcess && victimHandler != null) {
453       boolean wait = getCurrentSize() < acceptableSize();
454       boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
455       victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
456           inMemory, wait);
457     }
458     return block.heapSize();
459   }
460 
461   /**
462    * Multi-threaded call to run the eviction process.
463    */
464   private void runEviction() {
465     if(evictionThread == null) {
466       evict();
467     } else {
468       evictionThread.evict();
469     }
470   }
471 
  /**
   * Eviction method.
   * <p>
   * Runs one eviction pass unless another pass already holds the eviction
   * lock.  The pass computes how many bytes must go to bring the cache down
   * to the minimum size, sorts every cached block into a single-access,
   * multi-access or in-memory bucket, and then frees blocks from the buckets
   * either in "force in-memory" order or proportionally to each bucket's
   * overflow.
   */
  void evict() {

    // Ensure only one eviction at a time
    if(!evictionLock.tryLock()) return;

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }

      // Already at or below the minimum size: nothing to do.  The finally
      // block below still runs, so this pass is counted by stats.evict().
      if(bytesToFree <= 0) return;

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
          singleSize());
      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
          multiSize());
      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
          memorySize());

      // Scan entire map putting into appropriate buckets
      for(LruCachedBlock cachedBlock : map.values()) {
        switch(cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      // Force mode (or a cache that is essentially all in-memory share):
      // in-memory blocks are evicted only as a last resort.
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
        } else {
          // this means no need to evict block in memory bucket,
          // and we try best to make the ratio between single-bucket and
          // multi-bucket is 1:2
          long bytesRemain = s + m - bytesToFree;
          // s <= bytesRemain / 3: single is already within its one-third share
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          // m <= 2 * bytesRemain / 3: multi is already within its two-thirds share
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks
            // shrink single to one third of what remains, take the rest from multi
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        // Default mode: visit the buckets from smallest to largest overflow
        // (BlockBucket orders itself by overflow) and take from each at most
        // its overflow, capped at an even split of what is still to be freed.
        PriorityQueue<BlockBucket> bucketQueue =
          new PriorityQueue<BlockBucket>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = 3;

        BlockBucket bucket;
        while((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if(overflow > 0) {
            long bucketBytesToFree = Math.min(overflow,
                (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }

      if (LOG.isTraceEnabled()) {
        // Bucket totals are the sizes gathered during the scan above; they
        // are not reduced by the blocks freed in this pass.
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      // Always record the run, clear the in-progress flag and release the lock.
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }
589 
590   /**
591    * Used to group blocks into priority buckets.  There will be a BlockBucket
592    * for each priority (single, multi, memory).  Once bucketed, the eviction
593    * algorithm takes the appropriate number of elements out of each according
594    * to configuration parameters and their relatives sizes.
595    */
596   private class BlockBucket implements Comparable<BlockBucket> {
597 
598     private LruCachedBlockQueue queue;
599     private long totalSize = 0;
600     private long bucketSize;
601 
602     public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
603       this.bucketSize = bucketSize;
604       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
605       totalSize = 0;
606     }
607 
608     public void add(LruCachedBlock block) {
609       totalSize += block.heapSize();
610       queue.add(block);
611     }
612 
613     public long free(long toFree) {
614       LruCachedBlock cb;
615       long freedBytes = 0;
616       while ((cb = queue.pollLast()) != null) {
617         freedBytes += evictBlock(cb, true);
618         if (freedBytes >= toFree) {
619           return freedBytes;
620         }
621       }
622       return freedBytes;
623     }
624 
625     public long overflow() {
626       return totalSize - bucketSize;
627     }
628 
629     public long totalSize() {
630       return totalSize;
631     }
632 
633     public int compareTo(BlockBucket that) {
634       if(this.overflow() == that.overflow()) return 0;
635       return this.overflow() > that.overflow() ? 1 : -1;
636     }
637 
638     @Override
639     public boolean equals(Object that) {
640       if (that == null || !(that instanceof BlockBucket)){
641         return false;
642       }
643 
644       return compareTo(( BlockBucket)that) == 0;
645     }
646 
647   }
648 
649   /**
650    * Get the maximum size of this cache.
651    * @return max size in bytes
652    */
653   public long getMaxSize() {
654     return this.maxSize;
655   }
656 
657   @Override
658   public long getCurrentSize() {
659     return this.size.get();
660   }
661 
662   @Override
663   public long getFreeSize() {
664     return getMaxSize() - getCurrentSize();
665   }
666 
667   @Override
668   public long size() {
669     return getMaxSize();
670   }
671 
672   @Override
673   public long getBlockCount() {
674     return this.elements.get();
675   }
676 
677   EvictionThread getEvictionThread() {
678     return this.evictionThread;
679   }
680 
681   /*
682    * Eviction thread.  Sits in waiting state until an eviction is triggered
683    * when the cache size grows above the acceptable level.<p>
684    *
685    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
686    */
687   static class EvictionThread extends HasThread {
688     private WeakReference<LruBlockCache> cache;
689     private boolean go = true;
690     // flag set after enter the run method, used for test
691     private boolean enteringRun = false;
692 
693     public EvictionThread(LruBlockCache cache) {
694       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
695       setDaemon(true);
696       this.cache = new WeakReference<LruBlockCache>(cache);
697     }
698 
699     @Override
700     public void run() {
701       enteringRun = true;
702       while (this.go) {
703         synchronized(this) {
704           try {
705             this.wait();
706           } catch(InterruptedException e) {}
707         }
708         LruBlockCache cache = this.cache.get();
709         if(cache == null) break;
710         cache.evict();
711       }
712     }
713 
714     public void evict() {
715       synchronized(this) {
716         this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
717       }
718     }
719 
720     synchronized void shutdown() {
721       this.go = false;
722       this.notifyAll();
723     }
724 
725     /**
726      * Used for the test.
727      */
728     boolean isEnteringRun() {
729       return this.enteringRun;
730     }
731   }
732 
733   /*
734    * Statistics thread.  Periodically prints the cache statistics to the log.
735    */
736   static class StatisticsThread extends Thread {
737     LruBlockCache lru;
738 
739     public StatisticsThread(LruBlockCache lru) {
740       super("LruBlockCache.StatisticsThread");
741       setDaemon(true);
742       this.lru = lru;
743     }
744     @Override
745     public void run() {
746       lru.logStats();
747     }
748   }
749 
750   public void logStats() {
751     if (!LOG.isDebugEnabled()) return;
752     // Log size
753     long totalSize = heapSize();
754     long freeSize = maxSize - totalSize;
755     LruBlockCache.LOG.debug("Total=" + StringUtils.byteDesc(totalSize) + ", " +
756         "free=" + StringUtils.byteDesc(freeSize) + ", " +
757         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
758         "blocks=" + size() +", " +
759         "accesses=" + stats.getRequestCount() + ", " +
760         "hits=" + stats.getHitCount() + ", " +
761         "hitRatio=" +
762           (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
763         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
764         "cachingHits=" + stats.getHitCachingCount() + ", " +
765         "cachingHitsRatio=" +
766           (stats.getHitCachingCount() == 0 ? "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
767         "evictions=" + stats.getEvictionCount() + ", " +
768         "evicted=" + stats.getEvictedCount() + ", " +
769         "evictedPerRun=" + stats.evictedPerEviction());
770   }
771 
772   /**
773    * Get counter statistics for this cache.
774    *
775    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
776    * of the eviction processes.
777    */
778   public CacheStats getStats() {
779     return this.stats;
780   }
781 
  /**
   * Fixed per-instance heap estimate used by {@link #calculateOverhead}:
   * the aligned sum of three longs, nine references, five floats, one
   * boolean and the object header.
   */
  // NOTE(review): the class declares two boolean fields (evictionInProgress
  // and forceInMemory) but only one is counted here - confirm whether that
  // is intentional.
  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
      (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
      (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
      + ClassSize.OBJECT);
786 
787   // HeapSize implementation
788   public long heapSize() {
789     return getCurrentSize();
790   }
791 
792   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
793     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
794     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
795         ((long)Math.ceil(maxSize*1.2/blockSize)
796             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
797         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
798   }
799 
800   @Override
801   public Iterator<CachedBlock> iterator() {
802     final Iterator<LruCachedBlock> iterator = map.values().iterator();
803 
804     return new Iterator<CachedBlock>() {
805       private final long now = System.nanoTime();
806 
807       @Override
808       public boolean hasNext() {
809         return iterator.hasNext();
810       }
811 
812       @Override
813       public CachedBlock next() {
814         final LruCachedBlock b = iterator.next();
815         return new CachedBlock() {
816           @Override
817           public String toString() {
818             return BlockCacheUtil.toString(this, now);
819           }
820 
821           @Override
822           public BlockPriority getBlockPriority() {
823             return b.getPriority();
824           }
825 
826           @Override
827           public BlockType getBlockType() {
828             return b.getBuffer().getBlockType();
829           }
830 
831           @Override
832           public long getOffset() {
833             return b.getCacheKey().getOffset();
834           }
835 
836           @Override
837           public long getSize() {
838             return b.getBuffer().heapSize();
839           }
840 
841           @Override
842           public long getCachedTime() {
843             return b.getCachedTime();
844           }
845 
846           @Override
847           public String getFilename() {
848             return b.getCacheKey().getHfileName();
849           }
850 
851           @Override
852           public int compareTo(CachedBlock other) {
853             return (int)(other.getOffset() - this.getOffset());
854           }
855         };
856       }
857 
858       @Override
859       public void remove() {
860         throw new UnsupportedOperationException();
861       }
862     };
863   }
864 
865   // Simple calculators of sizes given factors and maxSize
866 
867   private long acceptableSize() {
868     return (long)Math.floor(this.maxSize * this.acceptableFactor);
869   }
870   private long minSize() {
871     return (long)Math.floor(this.maxSize * this.minFactor);
872   }
873   private long singleSize() {
874     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
875   }
876   private long multiSize() {
877     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
878   }
879   private long memorySize() {
880     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
881   }
882 
883   public void shutdown() {
884     if (victimHandler != null)
885       victimHandler.shutdown();
886     this.scheduleThreadPool.shutdown();
887     for (int i = 0; i < 10; i++) {
888       if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
889     }
890     if (!this.scheduleThreadPool.isShutdown()) {
891       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
892       LOG.debug("Still running " + runnables);
893     }
894     this.evictionThread.shutdown();
895   }
896 
897   /** Clears the cache. Used in tests. */
898   public void clearCache() {
899     map.clear();
900   }
901 
902   /**
903    * Used in testing. May be very inefficient.
904    * @return the set of cached file names
905    */
906   SortedSet<String> getCachedFileNamesForTest() {
907     SortedSet<String> fileNames = new TreeSet<String>();
908     for (BlockCacheKey cacheKey : map.keySet()) {
909       fileNames.add(cacheKey.getHfileName());
910     }
911     return fileNames;
912   }
913 
914   @VisibleForTesting
915   Map<BlockType, Integer> getBlockTypeCountsForTest() {
916     Map<BlockType, Integer> counts =
917         new EnumMap<BlockType, Integer>(BlockType.class);
918     for (LruCachedBlock cb : map.values()) {
919       BlockType blockType = ((HFileBlock) cb.getBuffer()).getBlockType();
920       Integer count = counts.get(blockType);
921       counts.put(blockType, (count == null ? 0 : count) + 1);
922     }
923     return counts;
924   }
925 
926   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
927     Map<DataBlockEncoding, Integer> counts =
928         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
929     for (BlockCacheKey cacheKey : map.keySet()) {
930       DataBlockEncoding encoding = cacheKey.getDataBlockEncoding();
931       Integer count = counts.get(encoding);
932       counts.put(encoding, (count == null ? 0 : count) + 1);
933     }
934     return counts;
935   }
936 
937   public void setVictimCache(BucketCache handler) {
938     assert victimHandler == null;
939     victimHandler = handler;
940   }
941 
942   @Override
943   public BlockCache[] getBlockCaches() {
944     return null;
945   }
946 }