1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Map;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.KeyValueUtil;
34 import org.apache.hadoop.hbase.client.Scan;
35 import org.apache.hadoop.hbase.io.compress.Compression;
36 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
37 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
38 import org.apache.hadoop.hbase.regionserver.HStore;
39 import org.apache.hadoop.hbase.regionserver.InternalScanner;
40 import org.apache.hadoop.hbase.regionserver.ScanType;
41 import org.apache.hadoop.hbase.regionserver.Store;
42 import org.apache.hadoop.hbase.regionserver.StoreFile;
43 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
44 import org.apache.hadoop.hbase.regionserver.StoreScanner;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.util.StringUtils;
47
48
49
50
51
52 @InterfaceAudience.Private
53 public abstract class Compactor {
54 private static final Log LOG = LogFactory.getLog(Compactor.class);
55 protected CompactionProgress progress;
56 protected Configuration conf;
57 protected Store store;
58
59 private int compactionKVMax;
60 protected Compression.Algorithm compactionCompression;
61
62
63 Compactor(final Configuration conf, final Store store) {
64 this.conf = conf;
65 this.store = store;
66 this.compactionKVMax =
67 this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
68 this.compactionCompression = (this.store.getFamily() == null) ?
69 Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
70 }
71
72
73
74
75 public interface CellSink {
76 void append(KeyValue kv) throws IOException;
77 }
78
79 public CompactionProgress getProgress() {
80 return this.progress;
81 }
82
83
84 protected static class FileDetails {
85
86 public long maxKeyCount = 0;
87
88 public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
89
90 public long maxSeqId = 0;
91
92 public long maxMVCCReadpoint = 0;
93
94 public int maxTagsLength = 0;
95 }
96
97
98
99
100
101
102
103 protected FileDetails getFileDetails(
104 Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
105 FileDetails fd = new FileDetails();
106
107 for (StoreFile file : filesToCompact) {
108 long seqNum = file.getMaxSequenceId();
109 fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
110 StoreFile.Reader r = file.getReader();
111 if (r == null) {
112 LOG.warn("Null reader for " + file.getPath());
113 continue;
114 }
115
116
117 long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
118 ? r.getFilterEntries() : r.getEntries();
119 fd.maxKeyCount += keyCount;
120
121 Map<byte[], byte[]> fileInfo = r.loadFileInfo();
122 byte tmp[] = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
123 if (tmp != null) {
124 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
125 }
126 tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
127 if (tmp != null) {
128 fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
129 }
130
131
132 long earliestPutTs = 0;
133 if (calculatePutTs) {
134 tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
135 if (tmp == null) {
136
137
138 fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
139 } else {
140 earliestPutTs = Bytes.toLong(tmp);
141 fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
142 }
143 }
144 if (LOG.isDebugEnabled()) {
145 LOG.debug("Compacting " + file +
146 ", keycount=" + keyCount +
147 ", bloomtype=" + r.getBloomFilterType().toString() +
148 ", size=" + StringUtils.humanReadableInt(r.length()) +
149 ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
150 ", seqNum=" + seqNum +
151 (calculatePutTs ? ", earliestPutTs=" + earliestPutTs: ""));
152 }
153 }
154 return fd;
155 }
156
157
158
159
160
161
162 protected List<StoreFileScanner> createFileScanners(
163 final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
164 return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
165 smallestReadPoint);
166 }
167
168 protected long getSmallestReadPoint() {
169 return store.getSmallestReadPoint();
170 }
171
172
173
174
175
176
177
178
179
180 protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
181 ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
182 if (store.getCoprocessorHost() == null) return null;
183 return store.getCoprocessorHost()
184 .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
185 }
186
187
188
189
190
191
192
193
194 protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
195 ScanType scanType, InternalScanner scanner) throws IOException {
196 if (store.getCoprocessorHost() == null) return scanner;
197 return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
198 }
199
200
201
202
203
204
205
206
207 protected boolean performCompaction(InternalScanner scanner,
208 CellSink writer, long smallestReadPoint) throws IOException {
209 int bytesWritten = 0;
210
211
212 List<Cell> kvs = new ArrayList<Cell>();
213
214 int closeCheckInterval = HStore.getCloseCheckInterval();
215 boolean hasMore;
216 do {
217 hasMore = scanner.next(kvs, compactionKVMax);
218
219 for (Cell c : kvs) {
220 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
221 if (kv.getMvccVersion() <= smallestReadPoint) {
222 kv.setMvccVersion(0);
223 }
224 writer.append(kv);
225 ++progress.currentCompactedKVs;
226
227
228 if (closeCheckInterval > 0) {
229 bytesWritten += kv.getLength();
230 if (bytesWritten > closeCheckInterval) {
231 bytesWritten = 0;
232 if (!store.areWritesEnabled()) {
233 progress.cancel();
234 return false;
235 }
236 }
237 }
238 }
239 kvs.clear();
240 } while (hasMore);
241 progress.complete();
242 return true;
243 }
244
245
246
247
248
249
250
251
252
253 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
254 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
255 Scan scan = new Scan();
256 scan.setMaxVersions(store.getFamily().getMaxVersions());
257 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
258 scanType, smallestReadPoint, earliestPutTs);
259 }
260
261
262
263
264
265
266
267
268
269
270 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
271 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
272 byte[] dropDeletesToRow) throws IOException {
273 Scan scan = new Scan();
274 scan.setMaxVersions(store.getFamily().getMaxVersions());
275 return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
276 earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
277 }
278 }