1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.snapshot;
20
21 import java.io.BufferedInputStream;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.net.URI;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.Comparator;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Random;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.classification.InterfaceAudience;
36 import org.apache.hadoop.classification.InterfaceStability;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.conf.Configured;
39 import org.apache.hadoop.fs.FSDataInputStream;
40 import org.apache.hadoop.fs.FSDataOutputStream;
41 import org.apache.hadoop.fs.FileChecksum;
42 import org.apache.hadoop.fs.FileStatus;
43 import org.apache.hadoop.fs.FileSystem;
44 import org.apache.hadoop.fs.FileUtil;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.hadoop.fs.permission.FsPermission;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.HBaseConfiguration;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.io.HFileLink;
51 import org.apache.hadoop.hbase.io.HLogLink;
52 import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
53 import org.apache.hadoop.hbase.mapreduce.JobUtil;
54 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
55 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
56 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
57 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58 import org.apache.hadoop.hbase.util.FSUtils;
59 import org.apache.hadoop.hbase.util.Pair;
60 import org.apache.hadoop.io.NullWritable;
61 import org.apache.hadoop.io.SequenceFile;
62 import org.apache.hadoop.io.Text;
63 import org.apache.hadoop.mapreduce.Job;
64 import org.apache.hadoop.mapreduce.Mapper;
65 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
66 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
67 import org.apache.hadoop.mapreduce.security.TokenCache;
68 import org.apache.hadoop.util.StringUtils;
69 import org.apache.hadoop.util.Tool;
70 import org.apache.hadoop.util.ToolRunner;
71
72
73
74
75
76
77
78
79 @InterfaceAudience.Public
80 @InterfaceStability.Evolving
81 public final class ExportSnapshot extends Configured implements Tool {
/** Logger shared by the tool and by the map tasks. */
private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);

// Configuration keys written by runCopyJob() and read back by ExportMapper.setup()/copyFile().
private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
// Optional override of the copy buffer size; read in ExportMapper.setup().
private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
// Number of files per mapper used by run() when -mappers is not given (default 10).
private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
// When true, run() copies the manifest straight into the completed snapshot directory
// instead of going through the working (tmp) snapshot directory.
protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";

// Failure-injection switches read by ExportMapper (setup() and injectTestFailure()).
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";

// Name prefix of the staging folder that holds the MR input sequence files.
private static final String INPUT_FOLDER_PREFIX = "export-files.";


/** Map-reduce counters incremented by ExportMapper while copying. */
public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED, FILES_COPIED };
102
103 private static class ExportMapper extends Mapper<Text, NullWritable, NullWritable, NullWritable> {
// Number of copied bytes after which copyData() updates the counter and the task status.
final static int REPORT_SIZE = 1 * 1024 * 1024;
// Lower bound for the default copy buffer size computed in setup().
final static int BUFFER_SIZE = 64 * 1024;

// Failure injection: enabled flag (from CONF_TEST_FAILURE) and lazily created random source.
private boolean testFailures;
private Random random;

// Export options, loaded from the job configuration in setup().
private boolean verifyChecksum;
private String filesGroup;
private String filesUser;
private short filesMode;
private int bufferSize;

// Destination file-system, its archive directory and its root directory.
private FileSystem outputFs;
private Path outputArchive;
private Path outputRoot;

// Source file-system, its archive directory and its root directory.
private FileSystem inputFs;
private Path inputArchive;
private Path inputRoot;
123
124 @Override
125 public void setup(Context context) throws IOException {
126 Configuration conf = context.getConfiguration();
127 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
128
129 filesGroup = conf.get(CONF_FILES_GROUP);
130 filesUser = conf.get(CONF_FILES_USER);
131 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
132 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
133 inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
134
135 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
136 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
137
138 testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
139
140 try {
141 inputFs = FileSystem.get(inputRoot.toUri(), conf);
142 } catch (IOException e) {
143 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
144 }
145
146 try {
147 outputFs = FileSystem.get(outputRoot.toUri(), conf);
148 } catch (IOException e) {
149 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
150 }
151
152
153 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(), BUFFER_SIZE);
154 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
155 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
156 }
157
158 @Override
159 public void map(Text key, NullWritable value, Context context)
160 throws InterruptedException, IOException {
161 Path inputPath = new Path(key.toString());
162 Path outputPath = getOutputPath(inputPath);
163
164 LOG.info("copy file input=" + inputPath + " output=" + outputPath);
165 copyFile(context, inputPath, outputPath);
166 }
167
168
169
170
171
172
173 private Path getOutputPath(final Path inputPath) throws IOException {
174 Path path;
175 if (HFileLink.isHFileLink(inputPath) || StoreFileInfo.isReference(inputPath)) {
176 String family = inputPath.getParent().getName();
177 TableName table =
178 HFileLink.getReferencedTableName(inputPath.getName());
179 String region = HFileLink.getReferencedRegionName(inputPath.getName());
180 String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
181 path = new Path(FSUtils.getTableDir(new Path("./"), table),
182 new Path(region, new Path(family, hfile)));
183 } else if (isHLogLinkPath(inputPath)) {
184 String logName = inputPath.getName();
185 path = new Path(new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME), logName);
186 } else {
187 path = inputPath;
188 }
189 return new Path(outputArchive, path);
190 }
191
192
193
194
195 private void injectTestFailure(final Context context, final Path inputPath)
196 throws IOException {
197 if (testFailures) {
198 if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
199 if (random == null) {
200 random = new Random();
201 }
202
203
204
205
206 if (random.nextFloat() < 0.03) {
207 throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputPath
208 + " time=" + System.currentTimeMillis());
209 }
210 } else {
211 context.getCounter(Counter.COPY_FAILED).increment(1);
212 throw new IOException("TEST FAILURE: Unable to copy input=" + inputPath);
213 }
214 }
215 }
216
217 private void copyFile(final Context context, final Path inputPath, final Path outputPath)
218 throws IOException {
219 injectTestFailure(context, inputPath);
220
221
222 FileStatus inputStat = getSourceFileStatus(context, inputPath);
223
224
225 if (outputFs.exists(outputPath)) {
226 FileStatus outputStat = outputFs.getFileStatus(outputPath);
227 if (outputStat != null && sameFile(inputStat, outputStat)) {
228 LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file.");
229 return;
230 }
231 }
232
233 InputStream in = openSourceFile(context, inputPath);
234 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
235 if (Integer.MAX_VALUE != bandwidthMB) {
236 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
237 }
238 try {
239 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
240
241
242 outputFs.mkdirs(outputPath.getParent());
243 FSDataOutputStream out = outputFs.create(outputPath, true);
244 try {
245 copyData(context, inputPath, in, outputPath, out, inputStat.getLen());
246 } finally {
247 out.close();
248 }
249
250
251 if (!preserveAttributes(outputPath, inputStat)) {
252 LOG.warn("You may have to run manually chown on: " + outputPath);
253 }
254 } finally {
255 in.close();
256 }
257 }
258
259
260
261
262
263
264
265
266
267 private boolean preserveAttributes(final Path path, final FileStatus refStat) {
268 FileStatus stat;
269 try {
270 stat = outputFs.getFileStatus(path);
271 } catch (IOException e) {
272 LOG.warn("Unable to get the status for file=" + path);
273 return false;
274 }
275
276 try {
277 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
278 outputFs.setPermission(path, new FsPermission(filesMode));
279 } else if (!stat.getPermission().equals(refStat.getPermission())) {
280 outputFs.setPermission(path, refStat.getPermission());
281 }
282 } catch (IOException e) {
283 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
284 return false;
285 }
286
287 String user = stringIsNotEmpty(filesUser) ? filesUser : refStat.getOwner();
288 String group = stringIsNotEmpty(filesGroup) ? filesGroup : refStat.getGroup();
289 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
290 try {
291 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
292 outputFs.setOwner(path, user, group);
293 }
294 } catch (IOException e) {
295 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
296 LOG.warn("The user/group may not exist on the destination cluster: user=" +
297 user + " group=" + group);
298 return false;
299 }
300 }
301
302 return true;
303 }
304
305 private boolean stringIsNotEmpty(final String str) {
306 return str != null && str.length() > 0;
307 }
308
309 private void copyData(final Context context,
310 final Path inputPath, final InputStream in,
311 final Path outputPath, final FSDataOutputStream out,
312 final long inputFileSize)
313 throws IOException {
314 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
315 " (%.1f%%)";
316
317 try {
318 byte[] buffer = new byte[bufferSize];
319 long totalBytesWritten = 0;
320 int reportBytes = 0;
321 int bytesRead;
322
323 long stime = System.currentTimeMillis();
324 while ((bytesRead = in.read(buffer)) > 0) {
325 out.write(buffer, 0, bytesRead);
326 totalBytesWritten += bytesRead;
327 reportBytes += bytesRead;
328
329 if (reportBytes >= REPORT_SIZE) {
330 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
331 context.setStatus(String.format(statusMessage,
332 StringUtils.humanReadableInt(totalBytesWritten),
333 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
334 " from " + inputPath + " to " + outputPath);
335 reportBytes = 0;
336 }
337 }
338 long etime = System.currentTimeMillis();
339
340 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
341 context.setStatus(String.format(statusMessage,
342 StringUtils.humanReadableInt(totalBytesWritten),
343 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
344 " from " + inputPath + " to " + outputPath);
345
346
347 if (totalBytesWritten != inputFileSize) {
348 String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
349 " expected=" + inputFileSize + " for file=" + inputPath;
350 throw new IOException(msg);
351 }
352
353 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
354 LOG.info("size=" + totalBytesWritten +
355 " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
356 " time=" + StringUtils.formatTimeDiff(etime, stime) +
357 String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
358 context.getCounter(Counter.FILES_COPIED).increment(1);
359 } catch (IOException e) {
360 LOG.error("Error copying " + inputPath + " to " + outputPath, e);
361 context.getCounter(Counter.COPY_FAILED).increment(1);
362 throw e;
363 }
364 }
365
366
367
368
369
370
371 private FSDataInputStream openSourceFile(Context context, final Path path) throws IOException {
372 try {
373 if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
374 return new HFileLink(inputRoot, inputArchive, path).open(inputFs);
375 } else if (isHLogLinkPath(path)) {
376 String serverName = path.getParent().getName();
377 String logName = path.getName();
378 return new HLogLink(inputRoot, serverName, logName).open(inputFs);
379 }
380 return inputFs.open(path);
381 } catch (IOException e) {
382 context.getCounter(Counter.MISSING_FILES).increment(1);
383 LOG.error("Unable to open source file=" + path, e);
384 throw e;
385 }
386 }
387
388 private FileStatus getSourceFileStatus(Context context, final Path path) throws IOException {
389 try {
390 if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
391 HFileLink link = new HFileLink(inputRoot, inputArchive, path);
392 return link.getFileStatus(inputFs);
393 } else if (isHLogLinkPath(path)) {
394 String serverName = path.getParent().getName();
395 String logName = path.getName();
396 return new HLogLink(inputRoot, serverName, logName).getFileStatus(inputFs);
397 }
398 return inputFs.getFileStatus(path);
399 } catch (FileNotFoundException e) {
400 context.getCounter(Counter.MISSING_FILES).increment(1);
401 LOG.error("Unable to get the status for source file=" + path, e);
402 throw e;
403 } catch (IOException e) {
404 LOG.error("Unable to get the status for source file=" + path, e);
405 throw e;
406 }
407 }
408
409 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
410 try {
411 return fs.getFileChecksum(path);
412 } catch (IOException e) {
413 LOG.warn("Unable to get checksum for file=" + path, e);
414 return null;
415 }
416 }
417
418
419
420
421
422 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
423
424 if (inputStat.getLen() != outputStat.getLen()) return false;
425
426
427 if (!verifyChecksum) return true;
428
429
430 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
431 if (inChecksum == null) return false;
432
433 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
434 if (outChecksum == null) return false;
435
436 return inChecksum.equals(outChecksum);
437 }
438
439
440
441
442
443
444 private static boolean isHLogLinkPath(final Path path) {
445 return path.depth() == 2;
446 }
447 }
448
449
450
451
452
453 private List<Pair<Path, Long>> getSnapshotFiles(final FileSystem fs, final Path snapshotDir)
454 throws IOException {
455 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
456
457 final List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
458 final TableName table =
459 TableName.valueOf(snapshotDesc.getTable());
460 final Configuration conf = getConf();
461
462
463 SnapshotReferenceUtil.visitReferencedFiles(fs, snapshotDir,
464 new SnapshotReferenceUtil.FileVisitor() {
465 public void storeFile (final String region, final String family, final String hfile)
466 throws IOException {
467 Path path = HFileLink.createPath(table, region, family, hfile);
468 long size = new HFileLink(conf, path).getFileStatus(fs).getLen();
469 files.add(new Pair<Path, Long>(path, size));
470 }
471
472 public void recoveredEdits (final String region, final String logfile)
473 throws IOException {
474
475 }
476
477 public void logFile (final String server, final String logfile)
478 throws IOException {
479 long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
480 files.add(new Pair<Path, Long>(new Path(server, logfile), size));
481 }
482 });
483
484 return files;
485 }
486
487
488
489
490
491
492
493
494
495 static List<List<Path>> getBalancedSplits(final List<Pair<Path, Long>> files, int ngroups) {
496
497 Collections.sort(files, new Comparator<Pair<Path, Long>>() {
498 public int compare(Pair<Path, Long> a, Pair<Path, Long> b) {
499 long r = a.getSecond() - b.getSecond();
500 return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
501 }
502 });
503
504
505 List<List<Path>> fileGroups = new LinkedList<List<Path>>();
506 long[] sizeGroups = new long[ngroups];
507 int hi = files.size() - 1;
508 int lo = 0;
509
510 List<Path> group;
511 int dir = 1;
512 int g = 0;
513
514 while (hi >= lo) {
515 if (g == fileGroups.size()) {
516 group = new LinkedList<Path>();
517 fileGroups.add(group);
518 } else {
519 group = fileGroups.get(g);
520 }
521
522 Pair<Path, Long> fileInfo = files.get(hi--);
523
524
525 sizeGroups[g] += fileInfo.getSecond();
526 group.add(fileInfo.getFirst());
527
528
529 g += dir;
530 if (g == ngroups) {
531 dir = -1;
532 g = ngroups - 1;
533 } else if (g < 0) {
534 dir = 1;
535 g = 0;
536 }
537 }
538
539 if (LOG.isDebugEnabled()) {
540 for (int i = 0; i < sizeGroups.length; ++i) {
541 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
542 }
543 }
544
545 return fileGroups;
546 }
547
548 private static Path getInputFolderPath(Configuration conf)
549 throws IOException, InterruptedException {
550 Path stagingDir = JobUtil.getStagingDir(conf);
551 return new Path(stagingDir, INPUT_FOLDER_PREFIX +
552 String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
553 }
554
555
556
557
558
559
560
561 private static Path[] createInputFiles(final Configuration conf, final Path inputFolderPath,
562 final List<Pair<Path, Long>> snapshotFiles, int mappers)
563 throws IOException, InterruptedException {
564 FileSystem fs = inputFolderPath.getFileSystem(conf);
565 LOG.debug("Input folder location: " + inputFolderPath);
566
567 List<List<Path>> splits = getBalancedSplits(snapshotFiles, mappers);
568 Path[] inputFiles = new Path[splits.size()];
569
570 Text key = new Text();
571 for (int i = 0; i < inputFiles.length; i++) {
572 List<Path> files = splits.get(i);
573 inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i));
574 SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i],
575 Text.class, NullWritable.class);
576 LOG.debug("Input split: " + i);
577 try {
578 for (Path file: files) {
579 LOG.debug(file.toString());
580 key.set(file.toString());
581 writer.append(key, NullWritable.get());
582 }
583 } finally {
584 writer.close();
585 }
586 }
587
588 return inputFiles;
589 }
590
591
592
593
594 private void runCopyJob(final Path inputRoot, final Path outputRoot,
595 final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
596 final String filesUser, final String filesGroup, final int filesMode,
597 final int mappers, final int bandwidthMB)
598 throws IOException, InterruptedException, ClassNotFoundException {
599 Configuration conf = getConf();
600 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
601 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
602 conf.setInt(CONF_FILES_MODE, filesMode);
603 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
604 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
605 conf.set(CONF_INPUT_ROOT, inputRoot.toString());
606 conf.setInt("mapreduce.job.maps", mappers);
607 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
608
609 Job job = new Job(conf);
610 job.setJobName("ExportSnapshot");
611 job.setJarByClass(ExportSnapshot.class);
612 TableMapReduceUtil.addDependencyJars(job);
613 job.setMapperClass(ExportMapper.class);
614 job.setInputFormatClass(SequenceFileInputFormat.class);
615 job.setOutputFormatClass(NullOutputFormat.class);
616 job.setMapSpeculativeExecution(false);
617 job.setNumReduceTasks(0);
618
619
620 Path inputFolderPath = getInputFolderPath(conf);
621 for (Path path: createInputFiles(conf, inputFolderPath, snapshotFiles, mappers)) {
622 LOG.debug("Add Input Path=" + path);
623 SequenceFileInputFormat.addInputPath(job, path);
624 }
625
626 try {
627
628 TokenCache.obtainTokensForNamenodes(job.getCredentials(),
629 new Path[] { inputRoot, outputRoot }, conf);
630
631
632 if (!job.waitForCompletion(true)) {
633
634
635 throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
636 }
637 } finally {
638
639 try {
640 inputFolderPath.getFileSystem(conf).delete(inputFolderPath, true);
641 } catch (IOException e) {
642 LOG.warn("Unable to remove MR input folder: " + inputFolderPath, e);
643 }
644 }
645 }
646
647 private void verifySnapshot(final Configuration baseConf,
648 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
649
650 Configuration conf = new Configuration(baseConf);
651 FSUtils.setRootDir(conf, rootDir);
652 FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
653 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
654 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
655 }
656
657
658
659
660
661 @Override
662 public int run(String[] args) throws IOException {
663 boolean verifyTarget = true;
664 boolean verifyChecksum = true;
665 String snapshotName = null;
666 String targetName = null;
667 boolean overwrite = false;
668 String filesGroup = null;
669 String filesUser = null;
670 Path outputRoot = null;
671 int bandwidthMB = Integer.MAX_VALUE;
672 int filesMode = 0;
673 int mappers = 0;
674
675 Configuration conf = getConf();
676
677
678 for (int i = 0; i < args.length; i++) {
679 String cmd = args[i];
680 try {
681 if (cmd.equals("-snapshot")) {
682 snapshotName = args[++i];
683 } else if (cmd.equals("-target")) {
684 targetName = args[++i];
685 } else if (cmd.equals("-copy-to")) {
686 outputRoot = new Path(args[++i]);
687 } else if (cmd.equals("-copy-from")) {
688 Path sourceDir = new Path(args[++i]);
689 URI defaultFs = sourceDir.getFileSystem(conf).getUri();
690 FSUtils.setFsDefault(conf, new Path(defaultFs));
691 FSUtils.setRootDir(conf, sourceDir);
692 } else if (cmd.equals("-no-checksum-verify")) {
693 verifyChecksum = false;
694 } else if (cmd.equals("-no-target-verify")) {
695 verifyTarget = false;
696 } else if (cmd.equals("-mappers")) {
697 mappers = Integer.parseInt(args[++i]);
698 } else if (cmd.equals("-chuser")) {
699 filesUser = args[++i];
700 } else if (cmd.equals("-chgroup")) {
701 filesGroup = args[++i];
702 } else if (cmd.equals("-bandwidth")) {
703 bandwidthMB = Integer.parseInt(args[++i]);
704 } else if (cmd.equals("-chmod")) {
705 filesMode = Integer.parseInt(args[++i], 8);
706 } else if (cmd.equals("-overwrite")) {
707 overwrite = true;
708 } else if (cmd.equals("-h") || cmd.equals("--help")) {
709 printUsageAndExit();
710 } else {
711 System.err.println("UNEXPECTED: " + cmd);
712 printUsageAndExit();
713 }
714 } catch (Exception e) {
715 printUsageAndExit();
716 }
717 }
718
719
720 if (snapshotName == null) {
721 System.err.println("Snapshot name not provided.");
722 printUsageAndExit();
723 }
724
725 if (outputRoot == null) {
726 System.err.println("Destination file-system not provided.");
727 printUsageAndExit();
728 }
729
730 if (targetName == null) {
731 targetName = snapshotName;
732 }
733
734 Path inputRoot = FSUtils.getRootDir(conf);
735 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
736 LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
737 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
738 LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
739
740 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
741
742 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
743 Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
744 Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
745 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
746
747
748 if (outputFs.exists(outputSnapshotDir)) {
749 if (overwrite) {
750 if (!outputFs.delete(outputSnapshotDir, true)) {
751 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
752 return 1;
753 }
754 } else {
755 System.err.println("The snapshot '" + targetName +
756 "' already exists in the destination: " + outputSnapshotDir);
757 return 1;
758 }
759 }
760
761 if (!skipTmp) {
762
763 if (outputFs.exists(snapshotTmpDir)) {
764 if (overwrite) {
765 if (!outputFs.delete(snapshotTmpDir, true)) {
766 System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
767 return 1;
768 }
769 } else {
770 System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
771 System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
772 System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
773 return 1;
774 }
775 }
776 }
777
778
779 LOG.info("Loading Snapshot hfile list");
780 final List<Pair<Path, Long>> files = getSnapshotFiles(inputFs, snapshotDir);
781 if (mappers == 0 && files.size() > 0) {
782 mappers = 1 + (files.size() / conf.getInt(CONF_MAP_GROUP, 10));
783 mappers = Math.min(mappers, files.size());
784 }
785
786
787
788
789 try {
790 LOG.info("Copy Snapshot Manifest");
791 FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
792 } catch (IOException e) {
793 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
794 snapshotDir + " to=" + initialOutputSnapshotDir, e);
795 }
796
797
798 if (!targetName.equals(snapshotName)) {
799 SnapshotDescription snapshotDesc =
800 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
801 .toBuilder()
802 .setName(targetName)
803 .build();
804 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
805 }
806
807
808
809
810 try {
811 if (files.size() == 0) {
812 LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
813 } else {
814 runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
815 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
816 }
817
818
819 LOG.info("Finalize the Snapshot Export");
820 if (!skipTmp) {
821
822 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
823 throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
824 snapshotTmpDir + " to=" + outputSnapshotDir);
825 }
826 }
827
828
829 if (verifyTarget) {
830 LOG.info("Verify snapshot integrity");
831 verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
832 }
833
834 LOG.info("Export Completed: " + targetName);
835 return 0;
836 } catch (Exception e) {
837 LOG.error("Snapshot export failed", e);
838 if (!skipTmp) {
839 outputFs.delete(snapshotTmpDir, true);
840 }
841 outputFs.delete(outputSnapshotDir, true);
842 return 1;
843 }
844 }
845
846
847 private void printUsageAndExit() {
848 System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
849 System.err.println(" where [options] are:");
850 System.err.println(" -h|-help Show this help and exit.");
851 System.err.println(" -snapshot NAME Snapshot to restore.");
852 System.err.println(" -copy-to NAME Remote destination hdfs://");
853 System.err.println(" -copy-from NAME Input folder hdfs:// (default hbase.rootdir)");
854 System.err.println(" -no-checksum-verify Do not verify checksum, use name+length only.");
855 System.err.println(" -no-target-verify Do not verify the integrity of the \\" +
856 "exported snapshot.");
857 System.err.println(" -overwrite Rewrite the snapshot manifest if already exists");
858 System.err.println(" -chuser USERNAME Change the owner of the files to the specified one.");
859 System.err.println(" -chgroup GROUP Change the group of the files to the specified one.");
860 System.err.println(" -chmod MODE Change the permission of the files to the specified one.");
861 System.err.println(" -mappers Number of mappers to use during the copy (mapreduce.job.maps).");
862 System.err.println();
863 System.err.println("Examples:");
864 System.err.println(" hbase " + getClass().getName() + " \\");
865 System.err.println(" -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
866 System.err.println(" -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
867 System.err.println();
868 System.err.println(" hbase " + getClass().getName() + " \\");
869 System.err.println(" -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
870 System.err.println(" -copy-to hdfs://srv1:50070/hbase \\");
871 System.exit(1);
872 }
873
874
875
876
877
878
879
880
881 static int innerMain(final Configuration conf, final String [] args) throws Exception {
882 return ToolRunner.run(conf, new ExportSnapshot(), args);
883 }
884
885 public static void main(String[] args) throws Exception {
886 System.exit(innerMain(HBaseConfiguration.create(), args));
887 }
888 }