View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import com.google.common.base.Objects;
29  import com.google.common.base.Preconditions;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
34  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
35  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
36  
37  /**
38   * This class is used to allocate a block with specified size and free the block
39   * when evicting. It manages an array of buckets, each bucket is associated with
40   * a size and caches elements up to this size. For completely empty bucket, this
41   * size could be re-specified dynamically.
42   * 
43   * This class is not thread safe.
44   */
45  @InterfaceAudience.Private
46  @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
47  public final class BucketAllocator {
48    static final Log LOG = LogFactory.getLog(BucketAllocator.class);
49  
50    @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
51    public final static class Bucket {
52      private long baseOffset;
53      private int itemAllocationSize, sizeIndex;
54      private int itemCount;
55      private int freeList[];
56      private int freeCount, usedCount;
57  
58      public Bucket(long offset) {
59        baseOffset = offset;
60        sizeIndex = -1;
61      }
62  
63      void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
64        Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
65        this.sizeIndex = sizeIndex;
66        itemAllocationSize = bucketSizes[sizeIndex];
67        itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
68        freeCount = itemCount;
69        usedCount = 0;
70        freeList = new int[itemCount];
71        for (int i = 0; i < freeCount; ++i)
72          freeList[i] = i;
73      }
74  
75      public boolean isUninstantiated() {
76        return sizeIndex == -1;
77      }
78  
79      public int sizeIndex() {
80        return sizeIndex;
81      }
82  
83      public int getItemAllocationSize() {
84        return itemAllocationSize;
85      }
86  
87      public boolean hasFreeSpace() {
88        return freeCount > 0;
89      }
90  
91      public boolean isCompletelyFree() {
92        return usedCount == 0;
93      }
94  
95      public int freeCount() {
96        return freeCount;
97      }
98  
99      public int usedCount() {
100       return usedCount;
101     }
102 
103     public int getFreeBytes() {
104       return freeCount * itemAllocationSize;
105     }
106 
107     public int getUsedBytes() {
108       return usedCount * itemAllocationSize;
109     }
110 
111     public long getBaseOffset() {
112       return baseOffset;
113     }
114 
115     /**
116      * Allocate a block in this bucket, return the offset representing the
117      * position in physical space
118      * @return the offset in the IOEngine
119      */
120     public long allocate() {
121       assert freeCount > 0; // Else should not have been called
122       assert sizeIndex != -1;
123       ++usedCount;
124       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
125       assert offset >= 0;
126       return offset;
127     }
128 
129     public void addAllocation(long offset) throws BucketAllocatorException {
130       offset -= baseOffset;
131       if (offset < 0 || offset % itemAllocationSize != 0)
132         throw new BucketAllocatorException(
133             "Attempt to add allocation for bad offset: " + offset + " base="
134                 + baseOffset + ", bucket size=" + itemAllocationSize);
135       int idx = (int) (offset / itemAllocationSize);
136       boolean matchFound = false;
137       for (int i = 0; i < freeCount; ++i) {
138         if (matchFound) freeList[i - 1] = freeList[i];
139         else if (freeList[i] == idx) matchFound = true;
140       }
141       if (!matchFound)
142         throw new BucketAllocatorException("Couldn't find match for index "
143             + idx + " in free list");
144       ++usedCount;
145       --freeCount;
146     }
147 
148     private void free(long offset) {
149       offset -= baseOffset;
150       assert offset >= 0;
151       assert offset < itemCount * itemAllocationSize;
152       assert offset % itemAllocationSize == 0;
153       assert usedCount > 0;
154       assert freeCount < itemCount; // Else duplicate free
155       int item = (int) (offset / (long) itemAllocationSize);
156       assert !freeListContains(item);
157       --usedCount;
158       freeList[freeCount++] = item;
159     }
160 
161     private boolean freeListContains(int blockNo) {
162       for (int i = 0; i < freeCount; ++i) {
163         if (freeList[i] == blockNo) return true;
164       }
165       return false;
166     }
167   }
168 
169   final class BucketSizeInfo {
170     // Free bucket means it has space to allocate a block;
171     // Completely free bucket means it has no block.
172     private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
173     private int sizeIndex;
174 
175     BucketSizeInfo(int sizeIndex) {
176       bucketList = new ArrayList<Bucket>();
177       freeBuckets = new ArrayList<Bucket>();
178       completelyFreeBuckets = new ArrayList<Bucket>();
179       this.sizeIndex = sizeIndex;
180     }
181 
182     public void instantiateBucket(Bucket b) {
183       assert b.isUninstantiated() || b.isCompletelyFree();
184       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
185       bucketList.add(b);
186       freeBuckets.add(b);
187       completelyFreeBuckets.add(b);
188     }
189 
190     public int sizeIndex() {
191       return sizeIndex;
192     }
193 
194     /**
195      * Find a bucket to allocate a block
196      * @return the offset in the IOEngine
197      */
198     public long allocateBlock() {
199       Bucket b = null;
200       if (freeBuckets.size() > 0) // Use up an existing one first...
201         b = freeBuckets.get(freeBuckets.size() - 1);
202       if (b == null) {
203         b = grabGlobalCompletelyFreeBucket();
204         if (b != null) instantiateBucket(b);
205       }
206       if (b == null) return -1;
207       long result = b.allocate();
208       blockAllocated(b);
209       return result;
210     }
211 
212     void blockAllocated(Bucket b) {
213       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
214       if (!b.hasFreeSpace()) freeBuckets.remove(b);
215     }
216 
217     public Bucket findAndRemoveCompletelyFreeBucket() {
218       Bucket b = null;
219       assert bucketList.size() > 0;
220       if (bucketList.size() == 1) {
221         // So we never get complete starvation of a bucket for a size
222         return null;
223       }
224 
225       if (completelyFreeBuckets.size() > 0) {
226         b = completelyFreeBuckets.get(0);
227         removeBucket(b);
228       }
229       return b;
230     }
231 
232     private void removeBucket(Bucket b) {
233       assert b.isCompletelyFree();
234       bucketList.remove(b);
235       freeBuckets.remove(b);
236       completelyFreeBuckets.remove(b);
237     }
238 
239     public void freeBlock(Bucket b, long offset) {
240       assert bucketList.contains(b);
241       // else we shouldn't have anything to free...
242       assert (!completelyFreeBuckets.contains(b));
243       b.free(offset);
244       if (!freeBuckets.contains(b)) freeBuckets.add(b);
245       if (b.isCompletelyFree()) completelyFreeBuckets.add(b);
246     }
247 
248     public IndexStatistics statistics() {
249       long free = 0, used = 0;
250       for (Bucket b : bucketList) {
251         free += b.freeCount();
252         used += b.usedCount();
253       }
254       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
255     }
256 
257     @Override
258     public String toString() {
259       return Objects.toStringHelper(this.getClass())
260         .add("sizeIndex", sizeIndex)
261         .add("bucketSize", bucketSizes[sizeIndex])
262         .toString();
263     }
264   }
265 
266   // Default block size is 64K, so we choose more sizes near 64K, you'd better
267   // reset it according to your cluster's block size distribution
268   // TODO Support the view of block size distribution statistics
269   private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
270       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
271       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
272       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
273       512 * 1024 + 1024 };
274 
275   /**
276    * Round up the given block size to bucket size, and get the corresponding
277    * BucketSizeInfo
278    */
279   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
280     for (int i = 0; i < bucketSizes.length; ++i)
281       if (blockSize <= bucketSizes[i])
282         return bucketSizeInfos[i];
283     return null;
284   }
285 
286   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
287 
288   private final int[] bucketSizes;
289   private final int bigItemSize;
290   // The capacity size for each bucket
291   private final long bucketCapacity;
292   private Bucket[] buckets;
293   private BucketSizeInfo[] bucketSizeInfos;
294   private final long totalSize;
295   private long usedSize = 0;
296 
297   BucketAllocator(long availableSpace, int[] bucketSizes)
298       throws BucketAllocatorException {
299     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
300     int largestBucket = this.bucketSizes[0];
301     for (int i : this.bucketSizes) {
302       largestBucket = Math.max(largestBucket, i);
303     }
304     this.bigItemSize = largestBucket;
305     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
306     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
307     if (buckets.length < this.bucketSizes.length)
308       throw new BucketAllocatorException(
309           "Bucket allocator size too small - must have room for at least "
310               + this.bucketSizes.length + " buckets");
311     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
312     for (int i = 0; i < this.bucketSizes.length; ++i) {
313       bucketSizeInfos[i] = new BucketSizeInfo(i);
314     }
315     for (int i = 0; i < buckets.length; ++i) {
316       buckets[i] = new Bucket(bucketCapacity * i);
317       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
318           .instantiateBucket(buckets[i]);
319     }
320     this.totalSize = ((long) buckets.length) * bucketCapacity;
321   }
322 
323   /**
324    * Rebuild the allocator's data structures from a persisted map.
325    * @param availableSpace capacity of cache
326    * @param map A map stores the block key and BucketEntry(block's meta data
327    *          like offset, length)
328    * @param realCacheSize cached data size statistics for bucket cache
329    * @throws BucketAllocatorException
330    */
331   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
332       AtomicLong realCacheSize) throws BucketAllocatorException {
333     this(availableSpace, bucketSizes);
334 
335     // each bucket has an offset, sizeindex. probably the buckets are too big
336     // in our default state. so what we do is reconfigure them according to what
337     // we've found. we can only reconfigure each bucket once; if more than once,
338     // we know there's a bug, so we just log the info, throw, and start again...
339     boolean[] reconfigured = new boolean[buckets.length];
340     for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
341       long foundOffset = entry.getValue().offset();
342       int foundLen = entry.getValue().getLength();
343       int bucketSizeIndex = -1;
344       for (int i = 0; i < bucketSizes.length; ++i) {
345         if (foundLen <= bucketSizes[i]) {
346           bucketSizeIndex = i;
347           break;
348         }
349       }
350       if (bucketSizeIndex == -1) {
351         throw new BucketAllocatorException(
352             "Can't match bucket size for the block with size " + foundLen);
353       }
354       int bucketNo = (int) (foundOffset / bucketCapacity);
355       if (bucketNo < 0 || bucketNo >= buckets.length)
356         throw new BucketAllocatorException("Can't find bucket " + bucketNo
357             + ", total buckets=" + buckets.length
358             + "; did you shrink the cache?");
359       Bucket b = buckets[bucketNo];
360       if (reconfigured[bucketNo]) {
361         if (b.sizeIndex() != bucketSizeIndex)
362           throw new BucketAllocatorException(
363               "Inconsistent allocation in bucket map;");
364       } else {
365         if (!b.isCompletelyFree())
366           throw new BucketAllocatorException("Reconfiguring bucket "
367               + bucketNo + " but it's already allocated; corrupt data");
368         // Need to remove the bucket from whichever list it's currently in at
369         // the moment...
370         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
371         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
372         oldbsi.removeBucket(b);
373         bsi.instantiateBucket(b);
374         reconfigured[bucketNo] = true;
375       }
376       realCacheSize.addAndGet(foundLen);
377       buckets[bucketNo].addAllocation(foundOffset);
378       usedSize += buckets[bucketNo].getItemAllocationSize();
379       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
380     }
381   }
382 
383   public String toString() {
384     StringBuilder sb = new StringBuilder(1024);
385     for (int i = 0; i < buckets.length; ++i) {
386       Bucket b = buckets[i];
387       if (i > 0) sb.append(", ");
388       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
389       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
390     }
391     return sb.toString();
392   }
393 
394   public long getUsedSize() {
395     return this.usedSize;
396   }
397 
398   public long getFreeSize() {
399     return this.totalSize - getUsedSize();
400   }
401 
402   public long getTotalSize() {
403     return this.totalSize;
404   }
405 
406   /**
407    * Allocate a block with specified size. Return the offset
408    * @param blockSize size of block
409    * @throws BucketAllocatorException,CacheFullException
410    * @return the offset in the IOEngine
411    */
412   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
413       BucketAllocatorException {
414     assert blockSize > 0;
415     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
416     if (bsi == null) {
417       throw new BucketAllocatorException("Allocation too big size=" + blockSize);
418     }
419     long offset = bsi.allocateBlock();
420 
421     // Ask caller to free up space and try again!
422     if (offset < 0)
423       throw new CacheFullException(blockSize, bsi.sizeIndex());
424     usedSize += bucketSizes[bsi.sizeIndex()];
425     return offset;
426   }
427 
428   private Bucket grabGlobalCompletelyFreeBucket() {
429     for (BucketSizeInfo bsi : bucketSizeInfos) {
430       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
431       if (b != null) return b;
432     }
433     return null;
434   }
435 
436   /**
437    * Free a block with the offset
438    * @param offset block's offset
439    * @return size freed
440    */
441   public synchronized int freeBlock(long offset) {
442     int bucketNo = (int) (offset / bucketCapacity);
443     assert bucketNo >= 0 && bucketNo < buckets.length;
444     Bucket targetBucket = buckets[bucketNo];
445     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
446     usedSize -= targetBucket.getItemAllocationSize();
447     return targetBucket.getItemAllocationSize();
448   }
449 
450   public int sizeIndexOfAllocation(long offset) {
451     int bucketNo = (int) (offset / bucketCapacity);
452     assert bucketNo >= 0 && bucketNo < buckets.length;
453     Bucket targetBucket = buckets[bucketNo];
454     return targetBucket.sizeIndex();
455   }
456 
457   public int sizeOfAllocation(long offset) {
458     int bucketNo = (int) (offset / bucketCapacity);
459     assert bucketNo >= 0 && bucketNo < buckets.length;
460     Bucket targetBucket = buckets[bucketNo];
461     return targetBucket.getItemAllocationSize();
462   }
463 
464   static class IndexStatistics {
465     private long freeCount, usedCount, itemSize, totalCount;
466 
467     public long freeCount() {
468       return freeCount;
469     }
470 
471     public long usedCount() {
472       return usedCount;
473     }
474 
475     public long totalCount() {
476       return totalCount;
477     }
478 
479     public long freeBytes() {
480       return freeCount * itemSize;
481     }
482 
483     public long usedBytes() {
484       return usedCount * itemSize;
485     }
486 
487     public long totalBytes() {
488       return totalCount * itemSize;
489     }
490 
491     public long itemSize() {
492       return itemSize;
493     }
494 
495     public IndexStatistics(long free, long used, long itemSize) {
496       setTo(free, used, itemSize);
497     }
498 
499     public IndexStatistics() {
500       setTo(-1, -1, 0);
501     }
502 
503     public void setTo(long free, long used, long itemSize) {
504       this.itemSize = itemSize;
505       this.freeCount = free;
506       this.usedCount = used;
507       this.totalCount = free + used;
508     }
509   }
510 
511   public Bucket [] getBuckets() {
512     return this.buckets;
513   }
514 
515   public void dumpToLog() {
516     logStatistics();
517     StringBuilder sb = new StringBuilder();
518     for (Bucket b : buckets) {
519       sb.append("Bucket:").append(b.baseOffset).append('\n');
520       sb.append("  Size index: " + b.sizeIndex() + "; Free:" + b.freeCount
521           + "; used:" + b.usedCount + "; freelist\n");
522       for (int i = 0; i < b.freeCount(); ++i)
523         sb.append(b.freeList[i]).append(',');
524       sb.append('\n');
525     }
526     LOG.info(sb);
527   }
528 
529   public void logStatistics() {
530     IndexStatistics total = new IndexStatistics();
531     IndexStatistics[] stats = getIndexStatistics(total);
532     LOG.info("Bucket allocator statistics follow:\n");
533     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
534         + total.usedBytes() + "; total bytes=" + total.totalBytes());
535     for (IndexStatistics s : stats) {
536       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
537           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
538     }
539   }
540 
541   public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
542     IndexStatistics[] stats = getIndexStatistics();
543     long totalfree = 0, totalused = 0;
544     for (IndexStatistics stat : stats) {
545       totalfree += stat.freeBytes();
546       totalused += stat.usedBytes();
547     }
548     grandTotal.setTo(totalfree, totalused, 1);
549     return stats;
550   }
551 
552   public IndexStatistics[] getIndexStatistics() {
553     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
554     for (int i = 0; i < stats.length; ++i)
555       stats[i] = bucketSizeInfos[i].statistics();
556     return stats;
557   }
558 
559   public long freeBlock(long freeList[]) {
560     long sz = 0;
561     for (int i = 0; i < freeList.length; ++i)
562       sz += freeBlock(freeList[i]);
563     return sz;
564   }
565 
566 }