
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;

/**
 * The default algorithm for selecting files for compaction.
 * Combines the compaction configuration with the provisional file selection
 * it is given to produce the list of suitable compaction candidates.
 */
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends CompactionPolicy {
  private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf,
                                    StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

  private ArrayList<StoreFile> getCurrentEligibleFiles(
      ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
    // candidates = all storefiles not already in compaction queue
    if (!filesCompacting.isEmpty()) {
      // Exclude all files older than the newest file we're currently
      // compacting; this allows us to preserve contiguity (HBASE-2856).
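      // Illustrative sketch (hypothetical file names): with candidates
      // [f1..f5] ordered oldest to newest and filesCompacting ending at f3,
      // f1..f3 are dropped below and only [f4, f5] stay eligible, so every
      // selection remains contiguous by sequence id.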
      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
      int idx = candidateFiles.indexOf(last);
      Preconditions.checkArgument(idx != -1);
      candidateFiles.subList(0, idx + 1).clear();
    }
    return candidateFiles;
  }

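  /**
   * Exposes the eligible-file computation to coprocessor hooks: returns a
   * copy of the candidates with files already queued for compaction (and
   * everything older) removed, mirroring getCurrentEligibleFiles above.
   */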
  public List<StoreFile> preSelectCompactionForCoprocessor(
      final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
    return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
  }

  /**
   * @param candidateFiles candidate files, ordered from oldest to newest
   * @param filesCompacting files currently in the compaction queue
   * @param isUserCompaction whether the compaction was requested by the user
   * @param mayUseOffPeak whether the off-peak compaction ratio may be applied
   * @param forceMajor whether a major compaction is being forced
   * @return subset copy of the candidate list that meets compaction criteria
   * @throws java.io.IOException if store file metadata cannot be read
   */
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    // Preliminary compaction subject to filters
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
    // able to compact more if stuck and compacting, because ratio policy excludes some
    // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
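    // Illustrative numbers (made up): 12 candidates with 3 files compacting
    // (expected to yield 1 future file) against a blocking count of 10 gives
    // 12 - 3 + 1 = 10 >= 10, so the store is treated as possibly stuck.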
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
        filesCompacting.size() + " compacting, " + candidateSelection.size() +
        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (!forceMajor) {
      // If there are expired files, only select them so that compaction deletes them
      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
        ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
            candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
        if (expiredSelection != null) {
          return new CompactionRequest(expiredSelection);
        }
      }
      candidateSelection = skipLargeFiles(candidateSelection);
    }

    // Force a major compaction if this is a user-requested major compaction,
    // or if a major compaction was requested (or is due by time) and we do
    // not have too many files to compact, or if any of the candidates are
    // reference files (references MUST be compacted).
    boolean majorCompaction = (
      (forceMajor && isUserCompaction)
      || ((forceMajor || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
      || StoreUtils.hasReferences(candidateSelection)
      );

    if (!majorCompaction) {
      // we're doing a minor compaction, let's see what files are applicable
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
    CompactionRequest result = new CompactionRequest(candidateSelection);
    result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
    return result;
  }

  /**
   * Selects the expired store files to compact.
   *
   * @param candidates the initial set of store files
   * @param maxExpiredTimeStamp a store file is considered expired if its
   *          maximum timestamp is less than this value
   * @return the expired store files to use as the files to compact, or null
   *         if there are none
   */
  private ArrayList<StoreFile> selectExpiredStoreFiles(
      ArrayList<StoreFile> candidates, long maxExpiredTimeStamp) {
    if (candidates == null || candidates.isEmpty()) return null;
    ArrayList<StoreFile> expiredStoreFiles = null;

    for (StoreFile storeFile : candidates) {
      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
        LOG.info("Deleting the expired store file by compaction: "
            + storeFile.getPath() + " whose maxTimeStamp is "
            + storeFile.getReader().getMaxTimestamp()
            + " while the max expired timestamp is " + maxExpiredTimeStamp);
        if (expiredStoreFiles == null) {
          expiredStoreFiles = new ArrayList<StoreFile>();
        }
        expiredStoreFiles.add(storeFile);
      }
    }
    if (expiredStoreFiles != null && expiredStoreFiles.size() == 1
        && expiredStoreFiles.get(0).getReader().getEntries() == 0) {
      // If just one empty store file, do not select for compaction.
      return null;
    }
    return expiredStoreFiles;
  }

  /**
   * Excludes all files above maxCompactSize, but keeps all reference files:
   * references MUST be compacted, so they are never skipped.
   * @param candidates pre-filtered candidate list
   * @return filtered subset
   */
  private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates) {
    int pos = 0;
    while (pos < candidates.size() && !candidates.get(pos).isReference()
      && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) {
      ++pos;
    }
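    // Candidates run oldest to newest, and under the normal size skew the
    // oldest files are the largest, so the scan above skips the leading run
    // of oversize files and halts at the first reference file or the first
    // file at or below maxCompactSize.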
    if (pos > 0) {
      LOG.debug("Some files are too large. Excluding " + pos
          + " files from compaction candidates");
      candidates.subList(0, pos).clear();
    }
    return candidates;
  }

  /**
   * Excludes all bulk-load files flagged for exclusion from minor compaction.
   * @param candidates pre-filtered candidate list
   * @return filtered subset
   */
  private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
    candidates.removeAll(Collections2.filter(candidates,
        new Predicate<StoreFile>() {
          @Override
          public boolean apply(StoreFile input) {
            return input.excludeFromMinorCompaction();
          }
        }));
    return candidates;
  }

  /**
   * Takes up to maxFilesToCompact files from the start of the list.
   * @param candidates pre-filtered candidate list
   * @return filtered subset
   */
  private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
      boolean isUserCompaction, boolean isMajorCompaction) {
    int excess = candidates.size() - comConf.getMaxFilesToCompact();
    if (excess > 0) {
      if (isMajorCompaction && isUserCompaction) {
        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
            " files because of a user-requested major compaction");
      } else {
        LOG.debug("Too many admissible files. Excluding " + excess
          + " files from compaction candidates");
        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
      }
    }
    return candidates;
  }

  /**
   * Abandons the compaction selection if we do not have enough files.
   * @param candidates pre-filtered candidate list
   * @return filtered subset (empty if below the minimum file count)
   */
  private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
    int minFiles = comConf.getMinFilesToCompact();
    if (candidates.size() < minFiles) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have " + candidates.size() +
          " files ready for compaction. Need " + minFiles + " to initiate.");
      }
      candidates.clear();
    }
    return candidates;
  }

  /**
   * @param candidates pre-filtered candidate list
   * @return filtered subset
   * -- Default minor compaction selection algorithm:
   * choose CompactSelection from candidates --
   * First exclude bulk-load files if indicated in configuration.
   * Start at the oldest file and stop when you find the first file that
   * meets compaction criteria:
   * (1) a recently-flushed, small file (i.e. <= minCompactSize)
   * OR
   * (2) within the compactRatio of sum(newer_files)
   * Given normal skew, any newer files will also meet this criterion.
   * <p/>
   * Additional Note:
   * If fileSizes.size() >> maxFilesToCompact, we will recurse on
   * compact().  Consider the oldest files first to avoid a
   * situation where we always compact [end-threshold,end).  Then, the
   * last file becomes an aggregate of the previous compactions.
   *
   * normal skew:
   *
   *         older ----> newer (increasing seqID)
   *     _
   *    | |   _
   *    | |  | |   _
   *  --|-|- |-|- |-|---_-------_-------  minCompactSize
   *    | |  | |  | |  | |  _  | |
   *    | |  | |  | |  | | | | | |
   *    | |  | |  | |  | | | | | |
   */
  ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }
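    // Worked example (made-up sizes): with maxFilesToCompact = 3 and
    // fileSizes = [10, 5, 3, 2], each sumSize[i] covers the window
    // [i, i + 2), so sumSize = [15, 8, 5, 2]; the right-to-left pass keeps
    // the computation O(n) instead of re-summing each window.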
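    // Skip files too large for the ratio: start advances while the file at
    // start exceeds both minCompactSize and ratio * sumSize[start + 1].
    // Illustrative numbers (assuming minCompactSize is below all of these):
    // with ratio = 1.2 and fileSizes = [100, 12, 12, 12], sumSize[1] = 36,
    // so 100 is skipped (100 > 43.2) and [12, 12, 12] remain (12 <= 28.8).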
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
          (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

  /**
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
      throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      // Major compaction time has elapsed.
      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
      if (filesToCompact.size() == 1) {
        // Single file
        StoreFile sf = filesToCompact.iterator().next();
        Long minTimestamp = sf.getMinimumTimestamp();
        long oldest = (minTimestamp == null)
            ? Long.MIN_VALUE
            : now - minTimestamp.longValue();
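        // oldest is the age in ms of the oldest cell in the file; when
        // minTimestamp is unavailable it falls back to Long.MIN_VALUE, which
        // keeps the single file in the "still within TTL" branch below.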
        if (sf.isMajorCompaction() &&
            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Skipping major compaction of " + this +
                " because one (major) compacted file only and oldestTime " +
                oldest + "ms is < ttl=" + cfTtl);
          }
        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
          LOG.debug("Major compaction triggered on store " + this +
            ", because keyvalues outdated; time since last major compaction " +
            (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Major compaction triggered on store " + this +
              "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true;
      }
    }
    return result;
  }

  public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
    // default = 24hrs
    long ret = comConf.getMajorCompactionPeriod();
    if (ret > 0) {
      // default = 20% = +/- 4.8 hrs
      double jitterPct = comConf.getMajorCompactionJitter();
      if (jitterPct > 0) {
        long jitter = Math.round(ret * jitterPct);
        // deterministic jitter avoids a major compaction storm on restart
        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
        if (seed != null) {
          double rnd = (new Random(seed)).nextDouble();
          ret += jitter - Math.round(2L * jitter * rnd);
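          // Sketch of the resulting spread (with the defaults above): rnd is
          // in [0, 1), so the adjustment ranges over (-jitter, +jitter] and a
          // 24h period with 20% jitter lands in (19.2h, 28.8h].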
        } else {
          ret = 0; // no storefiles == no major compaction
        }
      }
    }
    return ret;
  }

  /**
   * @param compactionSize Total size of some compaction
   * @return whether this should be a large or small compaction
   */
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

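  /**
   * @return true when the number of store files not already being compacted
   *         meets the configured minimum for starting a new compaction
   */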
  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
      final List<StoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }
}