/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile.slab;

import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * SlabCache is composed of multiple SingleSizeCaches. It uses a TreeMap to
 * determine which SingleSizeCache a given element fits into, and redirects
 * gets and puts to that cache.
 *
 * @deprecated As of 1.0, replaced by {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache}.
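 *
 * <p>A minimal usage sketch; the sizes are illustrative, and {@code conf},
 * {@code cacheKey} and {@code block} stand in for a real {@link Configuration},
 * {@link BlockCacheKey} and {@link Cacheable}:
 * <pre>
 * // 64 MB total cache, tuned for roughly 64 KB blocks
 * SlabCache cache = new SlabCache(64 * 1024 * 1024, 64 * 1024);
 * cache.addSlabByConf(conf);          // carve the space into slabs
 * cache.cacheBlock(cacheKey, block);  // stored in the smallest slab that fits
 * Cacheable hit = cache.getBlock(cacheKey, true, false, true);
 * cache.shutdown();
 * </pre>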
 */
@InterfaceAudience.Private
@Deprecated
public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize {

  private final ConcurrentHashMap<BlockCacheKey, SingleSizeCache> backingStore;
  private final TreeMap<Integer, SingleSizeCache> slabs;
  static final Log LOG = LogFactory.getLog(SlabCache.class);
  static final int STAT_THREAD_PERIOD_SECS = 60 * 5;

  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Slab Statistics #%d").build());

  long size;
  private final CacheStats stats;
  final SlabStats requestStats;
  final SlabStats successfullyCachedStats;
  private final long avgBlockSize;
  private static final long CACHE_FIXED_OVERHEAD = ClassSize.estimateBase(
      SlabCache.class, false);

  /**
   * Constructs an empty SlabCache.
   *
   * @param size Total size allocated to the SlabCache, in bytes.
   * @param avgBlockSize Average size of a block being cached, in bytes.
   */
  public SlabCache(long size, long avgBlockSize) {
    this.avgBlockSize = avgBlockSize;
    this.size = size;
    this.stats = new CacheStats();
    this.requestStats = new SlabStats();
    this.successfullyCachedStats = new SlabStats();

    backingStore = new ConcurrentHashMap<BlockCacheKey, SingleSizeCache>();
    slabs = new TreeMap<Integer, SingleSizeCache>();
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        STAT_THREAD_PERIOD_SECS, STAT_THREAD_PERIOD_SECS, TimeUnit.SECONDS);
  }

  public Map<Integer, SingleSizeCache> getSizer() {
    return slabs;
  }

  /**
   * Allocates the desired number of slabs of each configured size.
   *
   * This reads two lists from conf: hbase.offheapcache.slab.proportions and
   * hbase.offheapcache.slab.sizes.
   *
   * The first list gives the proportion of the total cache space allocated to
   * each slab.
   *
   * The second list gives the block size of each slab in bytes (i.e. each slab
   * holds blocks of this size).
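   *
   * For example (the values are illustrative; with a 64 KB average block size
   * the default sizes work out to 72089 and 137625 bytes):
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * conf.setStrings("hbase.offheapcache.slab.proportions", "0.80", "0.20");
   * conf.setStrings("hbase.offheapcache.slab.sizes", "72089", "137625");
   * slabCache.addSlabByConf(conf);
   * </pre>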
   *
   * @param conf Configuration to read the slab proportions and sizes from.
   */
  public void addSlabByConf(Configuration conf) {
    // Proportions we allocate to each slab of the total size.
    String[] proportions = conf.getStrings(
        "hbase.offheapcache.slab.proportions", "0.80", "0.20");
    String[] sizes = conf.getStrings("hbase.offheapcache.slab.sizes",
        Long.valueOf(avgBlockSize * 11 / 10).toString(),
        Long.valueOf(avgBlockSize * 21 / 10).toString());

    if (proportions.length != sizes.length) {
      throw new IllegalArgumentException(
          "SlabCache conf not initialized: hbase.offheapcache.slab.proportions specifies "
              + proportions.length
              + " slabs while hbase.offheapcache.slab.sizes specifies "
              + sizes.length + " slabs; the two lists must be the same length");
    }
    /*
     * We use BigDecimals instead of floats because float rounding is annoying.
     */

    BigDecimal[] parsedProportions = stringArrayToBigDecimalArray(proportions);
    BigDecimal[] parsedSizes = stringArrayToBigDecimalArray(sizes);

    BigDecimal sumProportions = new BigDecimal(0);
    for (BigDecimal b : parsedProportions) {
      /* Make sure all proportions are greater than 0 */
      Preconditions
          .checkArgument(b.compareTo(BigDecimal.ZERO) > 0,
              "Proportions in hbase.offheapcache.slab.proportions must be greater than 0!");
      sumProportions = sumProportions.add(b);
    }

    /* The sum of the proportions must not exceed 1 */
    Preconditions
        .checkArgument(sumProportions.compareTo(BigDecimal.ONE) <= 0,
            "Sum of all proportions in hbase.offheapcache.slab.proportions must not exceed 1");

    /* Warn if the sum of all proportions is less than 0.99 */
    if (sumProportions.compareTo(new BigDecimal("0.99")) < 0) {
      LOG.warn("Sum of hbase.offheapcache.slab.proportions is less than 0.99! Memory is being wasted");
    }
    for (int i = 0; i < parsedProportions.length; i++) {
      int blockSize = parsedSizes[i].intValue();
      int numBlocks = new BigDecimal(this.size).multiply(parsedProportions[i])
          .divide(parsedSizes[i], BigDecimal.ROUND_DOWN).intValue();
      addSlab(blockSize, numBlocks);
    }
  }

  /**
   * Gets the slab that a ByteBuffer of the given size would be allocated to.
   *
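   * For example, with slabs of block size 72089 and 137625 bytes (hypothetical
   * values), a 70000-byte buffer maps to the 72089 slab, a 100000-byte buffer
   * maps to the 137625 slab, and a 200000-byte buffer maps to no slab (null).
   *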
   * @param size Size of the ByteBuffer we are checking.
   *
   * @return the entry for the slab that the buffer would be allocated to, or
   *         null if the object is too large to fit in any slab.
   */
  Entry<Integer, SingleSizeCache> getHigherBlock(int size) {
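    // TreeMap.higherEntry(size - 1) returns the entry with the least key strictly
    // greater than size - 1, i.e. the smallest slab whose block size is >= size.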
    return slabs.higherEntry(size - 1);
  }

  private BigDecimal[] stringArrayToBigDecimalArray(String[] parsee) {
    BigDecimal[] parsed = new BigDecimal[parsee.length];
    for (int i = 0; i < parsee.length; i++) {
      parsed[i] = new BigDecimal(parsee[i].trim());
    }
    return parsed;
  }

  private void addSlab(int blockSize, int numBlocks) {
    LOG.info("Creating a slab of blockSize " + blockSize + " with " + numBlocks
        + " blocks, " + StringUtils.humanReadableInt(blockSize * (long) numBlocks) + " bytes.");
    slabs.put(blockSize, new SingleSizeCache(blockSize, numBlocks, this));
  }

  /**
   * Cache the block with the specified key and buffer. First finds the
   * smallest SingleSizeCache the block fits in. If the block doesn't fit in
   * any, it returns without doing anything.
   * <p>
   * This must NEVER be called on an already cached block. If it is, we assume
   * you are reinserting the exact same block due to a race condition, and a
   * runtime exception will be thrown.
   *
   * @param cacheKey block cache key
   * @param cachedItem block buffer
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem) {
    Entry<Integer, SingleSizeCache> scacheEntry = getHigherBlock(cachedItem
        .getSerializedLength());

    this.requestStats.addin(cachedItem.getSerializedLength());

    if (scacheEntry == null) {
      return; // we can't cache, something too big.
    }

    this.successfullyCachedStats.addin(cachedItem.getSerializedLength());
    SingleSizeCache scache = scacheEntry.getValue();

    /*
     * This will throw a runtime exception if we try to cache the same value
     * twice.
     */
    scache.cacheBlock(cacheKey, cachedItem);
  }

  /**
   * We don't care whether the block is in-memory or not, so we just pass the
   * call through.
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    cacheBlock(cacheKey, buf);
  }

  public CacheStats getStats() {
    return this.stats;
  }

  /**
   * Get the buffer of the block with the specified key.
   *
   * @return buffer of the specified block, or null if not in cache
   */
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    SingleSizeCache cachedBlock = backingStore.get(key);
    if (cachedBlock == null) {
      if (!repeat && updateCacheMetrics) stats.miss(caching);
      return null;
    }

    Cacheable contentBlock = cachedBlock.getBlock(key, caching, false, updateCacheMetrics);

    if (contentBlock != null) {
      if (updateCacheMetrics) stats.hit(caching);
    } else if (!repeat) {
      if (updateCacheMetrics) stats.miss(caching);
    }
    return contentBlock;
  }

  /**
   * Evicts a block from the cache. This is public, and thus contributes to
   * the evict counter.
   */
  public boolean evictBlock(BlockCacheKey cacheKey) {
    SingleSizeCache cacheEntry = backingStore.get(cacheKey);
    if (cacheEntry == null) {
      return false;
    } else {
      cacheEntry.evictBlock(cacheKey);
      return true;
    }
  }

  @Override
  public void onEviction(BlockCacheKey key, SingleSizeCache notifier) {
    stats.evicted();
    backingStore.remove(key);
  }

  @Override
  public void onInsertion(BlockCacheKey key, SingleSizeCache notifier) {
    backingStore.put(key, notifier);
  }

  /**
   * Sends a shutdown to all SingleSizeCaches contained by this cache.
   *
   * Also terminates the scheduleThreadPool.
   */
  public void shutdown() {
    for (SingleSizeCache s : slabs.values()) {
      s.shutdown();
    }
    this.scheduleThreadPool.shutdown();
  }

  public long heapSize() {
    long childCacheSize = 0;
    for (SingleSizeCache s : slabs.values()) {
      childCacheSize += s.heapSize();
    }
    return SlabCache.CACHE_FIXED_OVERHEAD + childCacheSize;
  }

  public long size() {
    return this.size;
  }

  public long getFreeSize() {
    long childFreeSize = 0;
    for (SingleSizeCache s : slabs.values()) {
      childFreeSize += s.getFreeSize();
    }
    return childFreeSize;
  }

  @Override
  public long getBlockCount() {
    long count = 0;
    for (SingleSizeCache cache : slabs.values()) {
      count += cache.getBlockCount();
    }
    return count;
  }

  public long getCurrentSize() {
    return size;
  }

  public long getEvictedCount() {
    return stats.getEvictedCount();
  }

  /*
   * Statistics thread. Periodically prints the cache statistics to the log.
   */
  static class StatisticsThread extends HasThread {
    SlabCache ourcache;

    public StatisticsThread(SlabCache slabCache) {
      super("SlabCache.StatisticsThread");
      setDaemon(true);
      this.ourcache = slabCache;
    }

    @Override
    public void run() {
      for (SingleSizeCache s : ourcache.slabs.values()) {
        s.logStats();
      }

      SlabCache.LOG.info("Current heap size is: "
          + StringUtils.humanReadableInt(ourcache.heapSize()));

      LOG.info("Request Stats");
      ourcache.requestStats.logStats();
      LOG.info("Successfully Cached Stats");
      ourcache.successfullyCachedStats.logStats();
    }
  }

  /**
   * Just like CacheStats, but more slab-specific: finer-grained profiling of
   * the sizes we cache, bucketed on a log scale.
   *
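   * For example, a 64 KB block (65536 bytes) has ln(65536) of about 11.09, so
   * addin(65536) increments bucket floor(11.09 * 10) = 110.
   *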
   */
  static class SlabStats {
    // Multiplier on the natural log so we get finer-grained buckets.
    static final int MULTIPLIER = 10;
    // Number of buckets: the natural log of the maximum size anybody will ever
    // try to cache (Integer.MAX_VALUE), times the multiplier.
    final int NUMDIVISIONS = (int) (Math.log(Integer.MAX_VALUE) * MULTIPLIER);
    private final AtomicLong[] counts = new AtomicLong[NUMDIVISIONS];

    public SlabStats() {
      for (int i = 0; i < NUMDIVISIONS; i++) {
        counts[i] = new AtomicLong();
      }
    }

    public void addin(int size) {
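      // Bucket index is floor(ln(size) * MULTIPLIER); consecutive buckets differ
      // by a factor of e^(1/MULTIPLIER), roughly 10% in size.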
      int index = (int) (Math.log(size) * MULTIPLIER);
      counts[index].incrementAndGet();
    }

    public AtomicLong[] getUsage() {
      return counts;
    }

    double getUpperBound(int index) {
      return Math.pow(Math.E, ((index + 0.5) / MULTIPLIER));
    }

    double getLowerBound(int index) {
      return Math.pow(Math.E, ((index - 0.5) / MULTIPLIER));
    }

    public void logStats() {
      AtomicLong[] fineGrainedStats = getUsage();
      for (int i = 0; i < fineGrainedStats.length; i++) {
        if (fineGrainedStats[i].get() > 0) {
          SlabCache.LOG.info("From "
              + StringUtils.humanReadableInt((long) getLowerBound(i)) + " to "
              + StringUtils.humanReadableInt((long) getUpperBound(i)) + ": "
              + StringUtils.humanReadableInt(fineGrainedStats[i].get())
              + " requests");
        }
      }
    }
  }

  public int evictBlocksByHfileName(String hfileName) {
    int numEvicted = 0;
    for (BlockCacheKey key : backingStore.keySet()) {
      if (key.getHfileName().equals(hfileName)) {
        if (evictBlock(key)) {
          ++numEvicted;
        }
      }
    }
    return numEvicted;
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    // Don't bother with ramcache since stuff is in here only a little while.
    final Iterator<Map.Entry<BlockCacheKey, SingleSizeCache>> i =
        this.backingStore.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, SingleSizeCache> e = i.next();
        final Cacheable cacheable = e.getValue().getBlock(e.getKey(), false, false, false);
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return null;
          }
          @Override
          public BlockType getBlockType() {
            // The block may have been evicted since we fetched it; guard against
            // null the same way getSize() does.
            return cacheable == null ? null : cacheable.getBlockType();
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return cacheable == null ? 0 : cacheable.getSerializedLength();
          }

          @Override
          public long getCachedTime() {
            return -1;
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            // Compare offsets directly; casting the long difference to int can overflow.
            long mine = this.getOffset();
            long theirs = other.getOffset();
            return mine < theirs ? -1 : (mine > theirs ? 1 : 0);
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }
}