1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.regionserver.Store;
30 import org.apache.hadoop.hbase.regionserver.InternalScanner;
31 import org.apache.hadoop.hbase.regionserver.ScanType;
32 import org.apache.hadoop.hbase.regionserver.StoreFile;
33 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
34
35
36
37
/**
 * The "default" compactor: reads the selected store files through a single
 * merge scanner and rewrites them into one new store file in the store's
 * tmp directory. Compaction state/progress is tracked by the parent
 * {@link Compactor}.
 */
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {

  /**
   * @param conf configuration the parent {@link Compactor} reads its compaction
   *          tunables from
   * @param store the store whose files this compactor will compact
   */
  public DefaultCompactor(final Configuration conf, final Store store) {
    super(conf, store);
  }

  /**
   * Runs the compaction described by {@code request}, writing the merged
   * output as a single new file in the store's tmp directory.
   *
   * @param request supplies the files to compact and whether this is a major
   *          compaction
   * @return paths of the newly written tmp file(s); empty if a coprocessor
   *         hook returned a null scanner, bypassing the compaction entirely
   * @throws InterruptedIOException if the compaction was interrupted mid-write
   *           (the partial output file is deleted first)
   * @throws IOException if reading the input files or writing the output fails
   */
  public List<Path> compact(final CompactionRequest request) throws IOException {
    // Aggregate details (max key count, earliest put ts, max seq id, max MVCC
    // readpoint, tag length) over the selected files; drives writer sizing
    // and metadata below.
    FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Cells at or above the smallest read point still in use by a reader must
    // keep their MVCC information so concurrent scanners stay consistent.
    long smallestReadPoint = getSmallestReadPoint();
    List<StoreFileScanner> scanners = createFileScanners(request.getFiles(), smallestReadPoint);

    StoreFile.Writer writer = null;
    List<Path> newFiles = new ArrayList<Path>();
    try {
      InternalScanner scanner = null;
      try {
        // Delete markers can only be dropped on a major compaction: a minor
        // compaction may not see every file a delete marker applies to.
        ScanType scanType =
            request.isMajor() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
        // Let coprocessors supply their own scanner; otherwise build the
        // default merge scanner over the file scanners.
        scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
        if (scanner == null) {
          scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
        }
        // Post-hook may wrap the scanner, or return null to bypass.
        scanner = postCreateCoprocScanner(request, scanType, scanner);
        if (scanner == null) {
          // Coprocessor asked us to skip this compaction; nothing written.
          return newFiles;
        }

        // Create the output writer in the tmp dir. MVCC readpoints are kept in
        // the output only while some reader may still need them.
        writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
            fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
        boolean finished = performCompaction(scanner, writer, smallestReadPoint);
        if (!finished) {
          // Interrupted (e.g. region closing): discard the partial file, then
          // null out writer BEFORE throwing so the outer finally does not
          // seal/record the deleted file.
          // NOTE(review): the delete() return value is ignored — a failed
          // delete would silently leave a stray tmp file behind; confirm
          // whether that is acceptable here.
          writer.close();
          store.getFileSystem().delete(writer.getPath(), false);
          writer = null;
          throw new InterruptedIOException( "Aborting compaction of store " + store +
              " in region " + store.getRegionInfo().getRegionNameAsString() +
              " because it was interrupted.");
        }
      } finally {
        // Always release the (possibly coprocessor-supplied) scanner, which in
        // turn closes the underlying store file scanners.
        if (scanner != null) {
          scanner.close();
        }
      }
    } finally {
      // Seal the output (append metadata, close, record its path) whether or
      // not we are unwinding from an exception.
      // NOTE(review): if performCompaction threw something other than the
      // interrupt handled above, this still finalizes a partially written
      // file and adds its path before the exception propagates past the
      // return below — looks intentional for this version, but verify.
      if (writer != null) {
        writer.appendMetadata(fd.maxSeqId, request.isMajor());
        writer.close();
        newFiles.add(writer.getPath());
      }
    }
    return newFiles;
  }

  /**
   * Test helper: compacts an explicit collection of store files by wrapping
   * them in a synthetic {@link CompactionRequest} and delegating to
   * {@link #compact(CompactionRequest)}.
   *
   * @param filesToCompact the store files to compact
   * @param isMajor true to run as a major compaction (delete markers dropped)
   * @return paths of the newly written tmp file(s)
   * @throws IOException if the compaction fails
   */
  public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
      throws IOException {
    CompactionRequest cr = new CompactionRequest(filesToCompact);
    cr.setIsMajor(isMajor);
    return this.compact(cr);
  }
}