/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;

/**
 * The default algorithm for selecting files for compaction. Combines the
 * compaction configuration and the provisional file selection that it is
 * given to produce the selection.
 */
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends CompactionPolicy {
  private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf,
      StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

  private ArrayList<StoreFile> getCurrentEligibleFiles(
      ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
    // candidates = all store files not already in the compaction queue
    if (!filesCompacting.isEmpty()) {
      // Exclude all files older than the newest file currently being
      // compacted; this preserves the contiguity of the selection.
      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
      int idx = candidateFiles.indexOf(last);
      Preconditions.checkArgument(idx != -1);
      candidateFiles.subList(0, idx + 1).clear();
    }
    return candidateFiles;
  }

  public List<StoreFile> preSelectCompactionForCoprocessor(
      final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
    return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
  }

  /**
   * @param candidateFiles candidate files, ordered from oldest to newest
   * @param filesCompacting files currently being compacted
   * @param isUserCompaction whether the compaction was requested by the user
   * @param mayUseOffPeak whether the off-peak compaction ratio may be used
   * @param forceMajor whether a major compaction is being forced
   * @return a subset copy of the candidate list that meets compaction criteria
   * @throws java.io.IOException
   */
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    // Preliminary compaction subject to filters.
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    // Stuck and not compacting enough (estimate). It is not guaranteed that we
    // will be able to compact more if stuck and compacting, because the ratio
    // policy excludes some files from candidate selection.
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
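    // Worked example (hypothetical numbers): with 12 store files, 3 of them
    // already compacting, and a blocking file count of 10, the estimate is
    // 12 - 3 + 1 = 10 >= 10, so the store is treated as (nearly) stuck and
    // the policy below relaxes its selection criteria.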
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
        filesCompacting.size() + " compacting, " + candidateSelection.size() +
        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (!forceMajor) {
      // If there are expired files, only select them so that compaction deletes them.
      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
        ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
            candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
        if (expiredSelection != null) {
          return new CompactionRequest(expiredSelection);
        }
      }
      candidateSelection = skipLargeFiles(candidateSelection);
    }

    // Force a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested
    // as a major compaction.
    // Or, if there are any references among the candidates.
    boolean majorCompaction = (
      (forceMajor && isUserCompaction)
      || ((forceMajor || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
      || StoreUtils.hasReferences(candidateSelection)
    );
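    // Note: a user-requested major compaction stays major even when the
    // candidate count exceeds maxFilesToCompact; removeExcessFiles() below
    // logs a warning instead of trimming the selection in that case.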

    if (!majorCompaction) {
      // We're doing a minor compaction; let's see what files are applicable.
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
    CompactionRequest result = new CompactionRequest(candidateSelection);
    result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
    return result;
  }

  /**
   * Select the expired store files to compact.
   *
   * @param candidates the initial set of store files
   * @param maxExpiredTimeStamp a store file is marked as expired if its max
   *          timestamp is less than this value
   * @return the expired store files to compact, or null if none qualify
   */
  private ArrayList<StoreFile> selectExpiredStoreFiles(
      ArrayList<StoreFile> candidates, long maxExpiredTimeStamp) {
    if (candidates == null || candidates.size() == 0) return null;
    ArrayList<StoreFile> expiredStoreFiles = null;

    for (StoreFile storeFile : candidates) {
      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
        LOG.info("Deleting the expired store file by compaction: "
            + storeFile.getPath() + " whose maxTimeStamp is "
            + storeFile.getReader().getMaxTimestamp()
            + " while the max expired timestamp is " + maxExpiredTimeStamp);
        if (expiredStoreFiles == null) {
          expiredStoreFiles = new ArrayList<StoreFile>();
        }
        expiredStoreFiles.add(storeFile);
      }
    }
    if (expiredStoreFiles != null && expiredStoreFiles.size() == 1
        && expiredStoreFiles.get(0).getReader().getEntries() == 0) {
      // If just one empty store file, do not select for compaction.
      return null;
    }
    return expiredStoreFiles;
  }

  /**
   * Excludes all files above maxCompactSize from the candidates. Reference
   * files are never skipped: they must be compacted.
   *
   * @param candidates pre-filtered candidate list
   * @return the filtered subset
   */
  private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates) {
    int pos = 0;
    while (pos < candidates.size() && !candidates.get(pos).isReference()
        && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) {
      ++pos;
    }
    if (pos > 0) {
      LOG.debug("Some files are too large. Excluding " + pos
          + " files from compaction candidates");
      candidates.subList(0, pos).clear();
    }
    return candidates;
  }

  /**
   * Excludes all bulk-load files that are flagged to be skipped by minor
   * compactions.
   *
   * @param candidates pre-filtered candidate list
   * @return the filtered subset
   */
  private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
    candidates.removeAll(Collections2.filter(candidates,
        new Predicate<StoreFile>() {
          @Override
          public boolean apply(StoreFile input) {
            return input.excludeFromMinorCompaction();
          }
        }));
    return candidates;
  }

  /**
   * Takes at most maxFilesToCompact from the start of the candidate list,
   * unless this is a user-requested major compaction.
   *
   * @param candidates pre-filtered candidate list
   * @return the filtered subset
   */
  private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
      boolean isUserCompaction, boolean isMajorCompaction) {
    int excess = candidates.size() - comConf.getMaxFilesToCompact();
    if (excess > 0) {
      if (isMajorCompaction && isUserCompaction) {
        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
            " files because of a user-requested major compaction");
      } else {
        LOG.debug("Too many admissible files. Excluding " + excess
            + " files from compaction candidates");
        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
      }
    }
    return candidates;
  }

  /**
   * Drops the whole selection if it contains fewer than minFilesToCompact files.
   *
   * @param candidates pre-filtered candidate list
   * @return the filtered subset
   */
  private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
    int minFiles = comConf.getMinFilesToCompact();
    if (candidates.size() < minFiles) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have " + candidates.size() +
            " files ready for compaction. Need " + minFiles + " to initiate.");
      }
      candidates.clear();
    }
    return candidates;
  }

  /**
   * Default minor compaction selection algorithm: choose the compaction
   * selection from the candidate list.
   * <p/>
   * Starts at the oldest file and stops at the first file whose size does not
   * exceed max(minCompactSize, compactionRatio * sum of the sizes of the newer
   * files that would be compacted with it). Given normal size skew, any newer
   * files will also meet this criteria, so that file and everything newer is
   * selected.
   * <p/>
   * Example: for files of sizes 100, 33, and 33 (oldest first) and a ratio of
   * 1.0, the file of size 100 is skipped (100 > (33 + 33) * 1.0) and the two
   * files of size 33 are selected. The ratio lets slightly larger files join
   * the selection: with a ratio of 1.2, a file of size 75 ahead of the two
   * 33s would also be selected (75 <= 66 * 1.2 = 79.2).
   *
   * @param candidates pre-filtered candidate list, ordered oldest to newest
   * @param mayUseOffPeak whether the off-peak compaction ratio may be used
   * @param mayBeStuck whether the store may be approaching the blocking file
   *          count, in which case the latest files are compacted regardless
   * @return the filtered subset
   */
  ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // We're doing a minor compaction; pick the applicable selection ratio.
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // Get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // Calculate the sum of fileSizes[i, i + maxFilesToCompact - 1) for the algorithm.
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
          + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
          - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }
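    // Worked example of the sliding-window sum above (hypothetical sizes,
    // maxFilesToCompact = 3): for fileSizes [100, 33, 33, 10],
    // sumSize[3] = 10, sumSize[2] = 33 + 10 = 43, sumSize[1] = 33 + 33 = 66,
    // and sumSize[0] = 100 + 33 = 133; each entry covers a window of at most
    // maxFilesToCompact - 1 files starting at that index.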

    // Skip files whose size exceeds max(minCompactSize, ratio * sum of the
    // sizes of the newer files that would be compacted with them).
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
        fileSizes[start] > Math.max(comConf.getMinCompactSize(),
            (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
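    // Continuing the example with ratio = 1.2 and a small minCompactSize:
    // start = 0 is skipped because 100 > 66 * 1.2 = 79, and the loop stops at
    // start = 1 because 33 <= 43 * 1.2, so the files of sizes 33, 33, and 10
    // are selected.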
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
          + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

  /**
   * @param filesToCompact the files to compact; can be null
   * @return true if we should run a major compaction
   */
  public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
      throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }

    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      // Major compaction time has elapsed.
      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
      if (filesToCompact.size() == 1) {
        // Single file.
        StoreFile sf = filesToCompact.iterator().next();
        Long minTimestamp = sf.getMinimumTimestamp();
        long oldest = (minTimestamp == null)
            ? Long.MIN_VALUE
            : now - minTimestamp.longValue();
        if (sf.isMajorCompaction() &&
            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Skipping major compaction of " + this +
                " because one (major) compacted file only and oldestTime " +
                oldest + "ms is < ttl=" + cfTtl);
          }
        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
          LOG.debug("Major compaction triggered on store " + this +
              ", because keyvalues outdated; time since last major compaction " +
              (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Major compaction triggered on store " + this +
              "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true;
      }
    }
    return result;
  }

  public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
    // Base major compaction period from configuration.
    long ret = comConf.getMajorCompactionPeriod();
    if (ret > 0) {
      // Apply jitter to spread major compactions out over time.
      double jitterPct = comConf.getMajorCompactionJitter();
      if (jitterPct > 0) {
        long jitter = Math.round(ret * jitterPct);
        // Deterministic jitter avoids a major compaction storm on restart.
        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
        if (seed != null) {
          double rnd = (new Random(seed)).nextDouble();
          ret += jitter - Math.round(2L * jitter * rnd);
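          // With rnd uniform in [0, 1), this shifts ret by a value in
          // (-jitter, +jitter]. For example (hypothetical config), a 7-day
          // period with jitterPct = 0.5 yields a next-compaction time
          // between 3.5 and 10.5 days.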
        } else {
          ret = 0;
        }
      }
    }
    return ret;
  }

  /**
   * @param compactionSize total size of some compaction
   * @return whether this should be a large or small compaction
   */
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }
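
  /**
   * A store needs compaction when the number of files not already being
   * compacted reaches the configured minimum. For example (hypothetical
   * numbers), with 5 store files, 2 of them compacting, and
   * minFilesToCompact = 3: 5 - 2 = 3 >= 3, so compaction is needed.
   */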
  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
      final List<StoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }
}