/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;

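/*
 * CompactionTool allows to execute a compaction specifying a:
 * <ul>
 *  <li>table folder (all regions and families will be compacted)
 *  <li>region folder (all families in the region will be compacted)
 *  <li>family folder (the store files will be compacted)
 * </ul>
 */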
@InterfaceAudience.Public
public class CompactionTool extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(CompactionTool.class);

  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
  private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";

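  /**
   * Class responsible to execute the Compaction on the specified path.
   * The path can be a table, region or family directory.
   */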
  private static class CompactionWorker {
    private final boolean keepCompactedFiles;
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;
    private final Path tmpDir;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
      this.fs = fs;
    }

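    /**
     * Execute the compaction on the specified path.
     *
     * @param path Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major Request major compaction.
     */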
    public void compact(final Path path, final boolean compactOnce, final boolean major)
        throws IOException {
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
        throws IOException {
      HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final HTableDescriptor htd,
        final Path regionDir, final boolean compactOnce, final boolean major)
        throws IOException {
      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }

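    /**
     * Execute the actual compaction job.
     * If the compact once flag is not specified, execute the compaction until
     * no more compactions are needed. Uses the Configuration settings provided.
     */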
    private void compactStoreFiles(final Path tableDir, final HTableDescriptor htd,
        final HRegionInfo hri, final String familyName, final boolean compactOnce,
        final boolean major) throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
      LOG.info("Compact table=" + htd.getTableName() +
        " region=" + hri.getRegionNameAsString() +
        " family=" + familyName);
      if (major) {
        store.triggerMajorCompaction();
      }
      do {
        CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
        if (compaction == null) break;
        List<StoreFile> storeFiles = store.compact(compaction);
        if (storeFiles != null && !storeFiles.isEmpty()) {
          if (keepCompactedFiles && deleteCompacted) {
            for (StoreFile storeFile: storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
    }

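    /**
     * Create a "mock" HStore that uses the tmpDir specified by the user and
     * the store dir to compact as source.
     */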
    private static HStore getStore(final Configuration conf, final FileSystem fs,
        final Path tableDir, final HTableDescriptor htd, final HRegionInfo hri,
        final String familyName, final Path tempDir) throws IOException {
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
        @Override
        public Path getTempDir() {
          return tempDir;
        }
      };
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getFamily(Bytes.toBytes(familyName)), conf);
    }
  }

  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = FileSystem.get(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

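  /**
   * Input format that uses store files block location as input split locality.
   */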
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

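    /**
     * Returns a split for each store files directory using the block location
     * of each file as locality reference.
     */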
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<InputSplit>();
      List<FileStatus> files = listStatus(job);

      Text key = new Text();
      for (FileStatus file: files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            // Locality must come from the store dir named on this line,
            // not from the input text file itself.
            String[] hosts = getStoreDirHosts(fs, new Path(key.toString()));
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

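    /**
     * Returns the top hosts of the store files, used by the Split.
     */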
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
        throws IOException {
      FileStatus[] files = FSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus: files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

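    /**
     * Create the input file for the given directories to compact.
     * The file is a TextFile with each line corresponding to a
     * store files directory to compact.
     */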
    public static void createInputFile(final FileSystem fs, final Path path,
        final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<Path>();
      for (Path compactDir: toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
            storeDirs.add(familyDir);
          }
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
              storeDirs.add(familyDir);
            }
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write the input file, one store dir per line
      FSDataOutputStream stream = fs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir: storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
    }
  }

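  /**
   * Execute compaction, using a Map-Reduce job.
   */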
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = new Job(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // Add dependencies (including HBase ones) to the job classpath
    TableMapReduceUtil.addDependencyJars(job);
    // This job instantiates HRegions, which requires the Counter class from the high_scale library
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
      org.cliffc.high_scale_lib.Counter.class);

    Path stagingDir = JobUtil.getStagingDir(conf);
    try {
      // Create input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTimeMillis());
      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credential for secure cluster
      TableMapReduceUtil.initCredentials(job);

      // Start the MR Job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
      fs.delete(stagingDir, true);
    }
  }

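  /**
   * Execute compaction, from this client, one path at a time.
   */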
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws IOException {
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path: toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<Path>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDir()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage();
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.isEmpty()) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" -mapred        Use MapReduce to run compaction.");
    System.err.println(" -compactOnce   Execute just one compaction step. (default: while needed)");
    System.err.println(" -major         Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
    System.err.println(" To preserve input files, pass -D" + CONF_COMPLETE_COMPACTION + "=false");
    System.err.println(" To stop deletion of compacted files, pass -D" + CONF_DELETE_COMPACTED + "=false");
    System.err.println(" To set the tmp dir, pass -D" + CONF_TMP_DIR + "=ALTERNATE_DIR");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(" $ bin/hbase " + this.getClass().getName() +
      " -mapred hdfs:///hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(" $ bin/hbase " + this.getClass().getName() +
      " hdfs:///hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}