1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.compactions;
20
21 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
34 import org.apache.hadoop.hbase.regionserver.StoreFile;
35 import org.apache.hadoop.hbase.regionserver.StoreUtils;
36 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
37 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.hbase.util.ConcatenatedLists;
40 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
41 import org.apache.hadoop.hbase.util.Pair;
42
43 import com.google.common.collect.ImmutableList;
44
45
46
47
48 @InterfaceAudience.Private
49 public class StripeCompactionPolicy extends CompactionPolicy {
/** Logger shared by the policy and its nested request classes. */
private static final Log LOG = LogFactory.getLog(StripeCompactionPolicy.class);

/** Policy used to pick the files to compact inside one stripe. */
private final ExploringCompactionPolicy stripePolicy;

/** Stripe settings: file-count thresholds, split sizes and the initial stripe count. */
private final StripeStoreConfig config;

/**
 * Builds the policy and the per-stripe exploring policy it relies on.
 *
 * @param conf general configuration, handed to the superclass and to the per-stripe policy
 * @param storeConfigInfo store-level settings, handed to the superclass and the per-stripe policy
 * @param config stripe-specific settings consulted by the selection methods
 */
public StripeCompactionPolicy(
    Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
  super(conf, storeConfigInfo);
  this.config = config;
  this.stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
}
62
63 public List<StoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
64 List<StoreFile> filesCompacting) {
65
66
67
68 ArrayList<StoreFile> candidateFiles = new ArrayList<StoreFile>(si.getStorefiles());
69 candidateFiles.removeAll(filesCompacting);
70 return candidateFiles;
71 }
72
73 public StripeCompactionRequest createEmptyRequest(
74 StripeInformationProvider si, CompactionRequest request) {
75
76 if (si.getStripeCount() > 0) {
77 return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
78 }
79 Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
80 request.getFiles(), this.config.getInitialCount());
81 return new SplitStripeCompactionRequest(
82 request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
83 }
84
85 public StripeStoreFlusher.StripeFlushRequest selectFlush(
86 StripeInformationProvider si, int kvCount) {
87 if (this.config.isUsingL0Flush()) {
88 return new StripeStoreFlusher.StripeFlushRequest();
89 }
90 if (si.getStripeCount() == 0) {
91
92 int initialCount = this.config.getInitialCount();
93 return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
94 }
95
96 return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
97 }
98
99 public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
100 List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
101
102
103 if (!filesCompacting.isEmpty()) {
104 LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
105 return null;
106 }
107
108
109
110
111
112
113
114 Collection<StoreFile> allFiles = si.getStorefiles();
115 if (StoreUtils.hasReferences(allFiles)) {
116 LOG.debug("There are references in the store; compacting all files");
117 long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
118 SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
119 allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
120 request.setMajorRangeFull();
121 return request;
122 }
123
124 int stripeCount = si.getStripeCount();
125 List<StoreFile> l0Files = si.getLevel0Files();
126
127
128 boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
129 if (stripeCount == 0) {
130 if (!shouldCompactL0) return null;
131 return selectNewStripesCompaction(si);
132 }
133
134 boolean canDropDeletesNoL0 = l0Files.size() == 0;
135 if (shouldCompactL0) {
136 if (!canDropDeletesNoL0) {
137
138 StripeCompactionRequest result = selectSingleStripeCompaction(
139 si, true, canDropDeletesNoL0, isOffpeak);
140 if (result != null) return result;
141 }
142 LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
143 return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
144 }
145
146
147 StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
148 if (result != null) return result;
149
150
151
152 return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
153 }
154
155 public boolean needsCompactions(StripeInformationProvider si, List<StoreFile> filesCompacting) {
156
157 return filesCompacting.isEmpty()
158 && (StoreUtils.hasReferences(si.getStorefiles())
159 || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
160 || needsSingleStripeCompaction(si));
161 }
162
163 @Override
164 public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
165 return false;
166 }
167
168 @Override
169 public boolean throttleCompaction(long compactionSize) {
170 return compactionSize > comConf.getThrottlePoint();
171 }
172
173
174
175
176
177 protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
178 int minFiles = this.config.getStripeCompactMinFiles();
179 for (List<StoreFile> stripe : si.getStripes()) {
180 if (stripe.size() >= minFiles) return true;
181 }
182 return false;
183 }
184
185 protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
186 boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
187 ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
188
189 int bqIndex = -1;
190 List<StoreFile> bqSelection = null;
191 int stripeCount = stripes.size();
192 long bqTotalSize = -1;
193 for (int i = 0; i < stripeCount; ++i) {
194
195
196 List<StoreFile> selection = selectSimpleCompaction(stripes.get(i),
197 !canDropDeletesWithoutL0 && includeL0, isOffpeak);
198 if (selection.isEmpty()) continue;
199 long size = 0;
200 for (StoreFile sf : selection) {
201 size += sf.getReader().length();
202 }
203 if (bqSelection == null || selection.size() > bqSelection.size() ||
204 (selection.size() == bqSelection.size() && size < bqTotalSize)) {
205 bqSelection = selection;
206 bqIndex = i;
207 bqTotalSize = size;
208 }
209 }
210 if (bqSelection == null) {
211 LOG.debug("No good compaction is possible in any stripe");
212 return null;
213 }
214 List<StoreFile> filesToCompact = new ArrayList<StoreFile>(bqSelection);
215
216 int targetCount = 1;
217 long targetKvs = Long.MAX_VALUE;
218 boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
219 String splitString = "";
220 if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
221 if (includeL0) {
222
223
224 return null;
225 }
226 Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
227 targetKvs = kvsAndCount.getFirst();
228 targetCount = kvsAndCount.getSecond();
229 splitString = "; the stripe will be split into at most "
230 + targetCount + " stripes with " + targetKvs + " target KVs";
231 }
232
233 LOG.debug("Found compaction in a stripe with end key ["
234 + Bytes.toString(si.getEndRow(bqIndex)) + "], with "
235 + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);
236
237
238 StripeCompactionRequest req;
239 if (includeL0) {
240 assert hasAllFiles;
241 List<StoreFile> l0Files = si.getLevel0Files();
242 LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
243 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
244 sfs.addSublist(filesToCompact);
245 sfs.addSublist(l0Files);
246 req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
247 } else {
248 req = new SplitStripeCompactionRequest(
249 filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
250 }
251 if (canDropDeletesWithoutL0 || includeL0) {
252 req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
253 }
254 req.getRequest().setOffPeak(isOffpeak);
255 return req;
256 }
257
258
259
260
261
262
263
264 private List<StoreFile> selectSimpleCompaction(
265 List<StoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
266 int minFilesLocal = Math.max(
267 allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
268 int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
269 return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
270 }
271
272
273
274
275
276
277
278
279 private StripeCompactionRequest selectCompactionOfAllFiles(StripeInformationProvider si,
280 int targetStripeCount, long targetSize) {
281 Collection<StoreFile> allFiles = si.getStorefiles();
282 SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
283 allFiles, OPEN_KEY, OPEN_KEY, targetStripeCount, targetSize);
284 request.setMajorRangeFull();
285 LOG.debug("Selecting a compaction that includes all " + allFiles.size() + " files");
286 return request;
287 }
288
289 private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
290 List<StoreFile> l0Files = si.getLevel0Files();
291 Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
292 LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
293 + kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
294 SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
295 si.getLevel0Files(), OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
296 request.setMajorRangeFull();
297 return request;
298 }
299
300 private StripeCompactionRequest selectExpiredMergeCompaction(
301 StripeInformationProvider si, boolean canDropDeletesNoL0) {
302 long cfTtl = this.storeConfigInfo.getStoreFileTtl();
303 if (cfTtl == Long.MAX_VALUE) {
304 return null;
305 }
306 long timestampCutoff = EnvironmentEdgeManager.currentTimeMillis() - cfTtl;
307
308 int start = -1, bestStart = -1, length = 0, bestLength = 0;
309 ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
310 OUTER: for (int i = 0; i < stripes.size(); ++i) {
311 for (StoreFile storeFile : stripes.get(i)) {
312 if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
313
314 if (length > bestLength) {
315 bestStart = start;
316 bestLength = length;
317 }
318 start = -1;
319 length = 0;
320 continue OUTER;
321 }
322 if (start == -1) {
323 start = i;
324 }
325 ++length;
326 }
327 if (length > bestLength) {
328 bestStart = start;
329 bestLength = length;
330 }
331 if (bestLength == 0) return null;
332 if (bestLength == 1) {
333
334
335
336
337 if (bestStart == (stripes.size() - 1)) return null;
338 ++bestLength;
339 }
340 LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
341 int endIndex = bestStart + bestLength - 1;
342 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
343 sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
344 SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
345 si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
346 if (canDropDeletesNoL0) {
347 result.setMajorRangeFull();
348 }
349 return result;
350 }
351
352 private static long getTotalKvCount(final Collection<StoreFile> candidates) {
353 long totalSize = 0;
354 for (StoreFile storeFile : candidates) {
355 totalSize += storeFile.getReader().getEntries();
356 }
357 return totalSize;
358 }
359
360 public static long getTotalFileSize(final Collection<StoreFile> candidates) {
361 long totalSize = 0;
362 for (StoreFile storeFile : candidates) {
363 totalSize += storeFile.getReader().length();
364 }
365 return totalSize;
366 }
367
368 private Pair<Long, Integer> estimateTargetKvs(Collection<StoreFile> files, double splitCount) {
369
370
371
372
373 long totalSize = getTotalFileSize(files);
374 long targetPartSize = config.getSplitPartSize();
375 assert targetPartSize > 0 && splitCount > 0;
376 double ratio = totalSize / (splitCount * targetPartSize);
377 while (ratio > 1.0) {
378
379 double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
380 if ((1.0 / newRatio) >= ratio) break;
381 ratio = newRatio;
382 splitCount += 1.0;
383 }
384 long kvCount = (long)(getTotalKvCount(files) / splitCount);
385 return new Pair<Long, Integer>(kvCount, (int)Math.ceil(splitCount));
386 }
387
388
389 public abstract static class StripeCompactionRequest {
390 protected CompactionRequest request;
391 protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
392
393
394
395
396
397
398
399 public abstract List<Path> execute(StripeCompactor compactor) throws IOException;
400
401 public StripeCompactionRequest(CompactionRequest request) {
402 this.request = request;
403 }
404
405
406
407
408
409
410
411 public void setMajorRange(byte[] startRow, byte[] endRow) {
412 this.majorRangeFromRow = startRow;
413 this.majorRangeToRow = endRow;
414 }
415
416 public CompactionRequest getRequest() {
417 return this.request;
418 }
419
420 public void setRequest(CompactionRequest request) {
421 assert request != null;
422 this.request = request;
423 this.majorRangeFromRow = this.majorRangeToRow = null;
424 }
425 }
426
427
428
429
430
431 private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
432 private final List<byte[]> targetBoundaries;
433
434
435
436
437
438 public BoundaryStripeCompactionRequest(CompactionRequest request,
439 List<byte[]> targetBoundaries) {
440 super(request);
441 this.targetBoundaries = targetBoundaries;
442 }
443
444 public BoundaryStripeCompactionRequest(Collection<StoreFile> files,
445 List<byte[]> targetBoundaries) {
446 this(new CompactionRequest(files), targetBoundaries);
447 }
448
449 @Override
450 public List<Path> execute(StripeCompactor compactor) throws IOException {
451 return compactor.compact(
452 this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow);
453 }
454 }
455
456
457
458
459
460
461
462 private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
463 private final byte[] startRow, endRow;
464 private final int targetCount;
465 private final long targetKvs;
466
467
468
469
470
471
472
473
474
475 public SplitStripeCompactionRequest(CompactionRequest request,
476 byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
477 super(request);
478 this.startRow = startRow;
479 this.endRow = endRow;
480 this.targetCount = targetCount;
481 this.targetKvs = targetKvs;
482 }
483
484 public SplitStripeCompactionRequest(
485 CompactionRequest request, byte[] startRow, byte[] endRow, long targetKvs) {
486 this(request, startRow, endRow, Integer.MAX_VALUE, targetKvs);
487 }
488
489 public SplitStripeCompactionRequest(
490 Collection<StoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
491 this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
492 }
493
494 public SplitStripeCompactionRequest(Collection<StoreFile> files,
495 byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
496 this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
497 }
498
499 @Override
500 public List<Path> execute(StripeCompactor compactor) throws IOException {
501 return compactor.compact(this.request, this.targetCount, this.targetKvs,
502 this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow);
503 }
504
505
506
507 public void setMajorRangeFull() {
508 setMajorRange(this.startRow, this.endRow);
509 }
510 }
511
512
513 public static interface StripeInformationProvider {
514 public Collection<StoreFile> getStorefiles();
515
516
517
518
519
520
521 public byte[] getStartRow(int stripeIndex);
522
523
524
525
526
527
528 public byte[] getEndRow(int stripeIndex);
529
530
531
532
533 public List<StoreFile> getLevel0Files();
534
535
536
537
538 public List<byte[]> getStripeBoundaries();
539
540
541
542
543 public ArrayList<ImmutableList<StoreFile>> getStripes();
544
545
546
547
548 public int getStripeCount();
549 }
550 }