1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.net.URI;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.Comparator;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Random;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.classification.InterfaceAudience;
36  import org.apache.hadoop.classification.InterfaceStability;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.conf.Configured;
39  import org.apache.hadoop.fs.FSDataInputStream;
40  import org.apache.hadoop.fs.FSDataOutputStream;
41  import org.apache.hadoop.fs.FileChecksum;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.FileUtil;
45  import org.apache.hadoop.fs.Path;
46  import org.apache.hadoop.fs.permission.FsPermission;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.io.HFileLink;
51  import org.apache.hadoop.hbase.io.HLogLink;
52  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
53  import org.apache.hadoop.hbase.mapreduce.JobUtil;
54  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
55  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
56  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
57  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58  import org.apache.hadoop.hbase.util.FSUtils;
59  import org.apache.hadoop.hbase.util.Pair;
60  import org.apache.hadoop.io.NullWritable;
61  import org.apache.hadoop.io.SequenceFile;
62  import org.apache.hadoop.io.Text;
63  import org.apache.hadoop.mapreduce.Job;
64  import org.apache.hadoop.mapreduce.Mapper;
65  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
66  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
67  import org.apache.hadoop.mapreduce.security.TokenCache;
68  import org.apache.hadoop.util.StringUtils;
69  import org.apache.hadoop.util.Tool;
70  import org.apache.hadoop.util.ToolRunner;
71  
72  /**
73   * Export the specified snapshot to a given FileSystem.
74   *
75   * The .snapshot/name folder is copied to the destination cluster
76   * and then all the hfiles/hlogs are copied, using a Map-Reduce Job, into the .archive/ location.
77   * When everything is done, the second cluster can restore the snapshot.
78   */
79  @InterfaceAudience.Public
80  @InterfaceStability.Evolving
81  public final class ExportSnapshot extends Configured implements Tool {
82    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
83  
84    private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
85    private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
86    private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
87    private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
88    private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
89    private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
90    private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
91    private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
92    private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
93    protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
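      // Most of the snapshot.export.* keys above are filled in by runCopyJob() from the
      // command-line flags parsed in run(); the remaining ones (snapshot.export.skip.tmp,
      // snapshot.export.buffer.size, snapshot.export.default.map.group) are read straight
      // from the configuration, so they can also be supplied with -D since the tool runs
      // through ToolRunner. A sketch, reusing the sample snapshot and cluster from the
      // usage text further below:
      //
      //   hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
      //       -Dsnapshot.export.skip.tmp=true \
      //       -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase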
94  
95    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
96    static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
97  
98    private static final String INPUT_FOLDER_PREFIX = "export-files.";
99  
100   // Export Map-Reduce Counters, to keep track of the progress
101   public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED, FILES_COPIED };
102 
103   private static class ExportMapper extends Mapper<Text, NullWritable, NullWritable, NullWritable> {
104     final static int REPORT_SIZE = 1 * 1024 * 1024;
105     final static int BUFFER_SIZE = 64 * 1024;
106 
107     private boolean testFailures;
108     private Random random;
109 
110     private boolean verifyChecksum;
111     private String filesGroup;
112     private String filesUser;
113     private short filesMode;
114     private int bufferSize;
115 
116     private FileSystem outputFs;
117     private Path outputArchive;
118     private Path outputRoot;
119 
120     private FileSystem inputFs;
121     private Path inputArchive;
122     private Path inputRoot;
123 
124     @Override
125     public void setup(Context context) throws IOException {
126       Configuration conf = context.getConfiguration();
127       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
128 
129       filesGroup = conf.get(CONF_FILES_GROUP);
130       filesUser = conf.get(CONF_FILES_USER);
131       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
132       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
133       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
134 
135       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
136       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
137 
138       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
139 
140       try {
141         inputFs = FileSystem.get(inputRoot.toUri(), conf);
142       } catch (IOException e) {
143         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
144       }
145 
146       try {
147         outputFs = FileSystem.get(outputRoot.toUri(), conf);
148       } catch (IOException e) {
149         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
150       }
151 
152       // Use the default block size of the outputFs as the copy buffer size, if it is bigger than BUFFER_SIZE
153       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(), BUFFER_SIZE);
154       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
155       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
156     }
157 
158     @Override
159     public void map(Text key, NullWritable value, Context context)
160         throws InterruptedException, IOException {
161       Path inputPath = new Path(key.toString());
162       Path outputPath = getOutputPath(inputPath);
163 
164       LOG.info("copy file input=" + inputPath + " output=" + outputPath);
165       copyFile(context, inputPath, outputPath);
166     }
167 
168     /**
169      * Returns the location where the inputPath will be copied.
170      *  - hfiles are encoded as hfile links hfile-region-table
171      *  - logs are encoded as serverName/logName
172      */
173     private Path getOutputPath(final Path inputPath) throws IOException {
174       Path path;
175       if (HFileLink.isHFileLink(inputPath) || StoreFileInfo.isReference(inputPath)) {
176         String family = inputPath.getParent().getName();
177         TableName table =
178             HFileLink.getReferencedTableName(inputPath.getName());
179         String region = HFileLink.getReferencedRegionName(inputPath.getName());
180         String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
181         path = new Path(FSUtils.getTableDir(new Path("./"), table),
182             new Path(region, new Path(family, hfile)));
183       } else if (isHLogLinkPath(inputPath)) {
184         String logName = inputPath.getName();
185         path = new Path(new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME), logName);
186       } else {
187         path = inputPath;
188       }
189       return new Path(outputArchive, path);
190     }
191 
192     /*
193      * Used by TestExportSnapshot to simulate a failure
194      */
195     private void injectTestFailure(final Context context, final Path inputPath)
196         throws IOException {
197       if (testFailures) {
198         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
199           if (random == null) {
200             random = new Random();
201           }
202 
203           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
204           // retry, but at least we reduce the number of test failures due to
205           // this test exception from the same map task.
206           if (random.nextFloat() < 0.03) {
207             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputPath
208                                   + " time=" + System.currentTimeMillis());
209           }
210         } else {
211           context.getCounter(Counter.COPY_FAILED).increment(1);
212           throw new IOException("TEST FAILURE: Unable to copy input=" + inputPath);
213         }
214       }
215     }
216 
217     private void copyFile(final Context context, final Path inputPath, final Path outputPath)
218         throws IOException {
219       injectTestFailure(context, inputPath);
220 
221       // Get the file information
222       FileStatus inputStat = getSourceFileStatus(context, inputPath);
223 
224       // Verify if the output file exists and is the same as the one we want to copy
225       if (outputFs.exists(outputPath)) {
226         FileStatus outputStat = outputFs.getFileStatus(outputPath);
227         if (outputStat != null && sameFile(inputStat, outputStat)) {
228           LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file.");
229           return;
230         }
231       }
232 
233       InputStream in = openSourceFile(context, inputPath);
234       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
235       if (Integer.MAX_VALUE != bandwidthMB) {
236         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
237       }
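          // The -bandwidth flag reaches this point through snapshot.export.map.bandwidth.mb.
          // When it is anything other than Integer.MAX_VALUE (the default used by run()),
          // the ThrottledInputStream wrapper caps this map task's read rate at
          // bandwidthMB * 1024 * 1024 bytes per second; e.g. "-bandwidth 100" limits each
          // mapper to roughly 100 MB/sec.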
238       try {
239         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
240 
241         // Ensure that the output folder is there and copy the file
242         outputFs.mkdirs(outputPath.getParent());
243         FSDataOutputStream out = outputFs.create(outputPath, true);
244         try {
245           copyData(context, inputPath, in, outputPath, out, inputStat.getLen());
246         } finally {
247           out.close();
248         }
249 
250         // Try to preserve attributes
251         if (!preserveAttributes(outputPath, inputStat)) {
252           LOG.warn("You may have to run manually chown on: " + outputPath);
253         }
254       } finally {
255         in.close();
256       }
257     }
258 
259     /**
260      * Try to preserve the file attributes selected by the user, copying them from the source file.
261      * This is only required when you are exporting as a different user than "hbase" or on a system
262      * that doesn't have the "hbase" user.
263      *
264      * This is not considered a blocking failure since the user can force a chmod afterwards,
265      * using a user/group that is known to be available on the system.
266      */
267     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
268       FileStatus stat;
269       try {
270         stat = outputFs.getFileStatus(path);
271       } catch (IOException e) {
272         LOG.warn("Unable to get the status for file=" + path);
273         return false;
274       }
275 
276       try {
277         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
278           outputFs.setPermission(path, new FsPermission(filesMode));
279         } else if (!stat.getPermission().equals(refStat.getPermission())) {
280           outputFs.setPermission(path, refStat.getPermission());
281         }
282       } catch (IOException e) {
283         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
284         return false;
285       }
286 
287       String user = stringIsNotEmpty(filesUser) ? filesUser : refStat.getOwner();
288       String group = stringIsNotEmpty(filesGroup) ? filesGroup : refStat.getGroup();
289       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
290         try {
291           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
292             outputFs.setOwner(path, user, group);
293           }
294         } catch (IOException e) {
295           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
296           LOG.warn("The user/group may not exist on the destination cluster: user=" +
297                    user + " group=" + group);
298           return false;
299         }
300       }
301 
302       return true;
303     }
304 
305     private boolean stringIsNotEmpty(final String str) {
306       return str != null && str.length() > 0;
307     }
308 
309     private void copyData(final Context context,
310         final Path inputPath, final InputStream in,
311         final Path outputPath, final FSDataOutputStream out,
312         final long inputFileSize)
313         throws IOException {
314       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
315                                    " (%.1f%%)";
316 
317       try {
318         byte[] buffer = new byte[bufferSize];
319         long totalBytesWritten = 0;
320         int reportBytes = 0;
321         int bytesRead;
322 
323         long stime = System.currentTimeMillis();
324         while ((bytesRead = in.read(buffer)) > 0) {
325           out.write(buffer, 0, bytesRead);
326           totalBytesWritten += bytesRead;
327           reportBytes += bytesRead;
328 
329           if (reportBytes >= REPORT_SIZE) {
330             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
331             context.setStatus(String.format(statusMessage,
332                               StringUtils.humanReadableInt(totalBytesWritten),
333                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
334                               " from " + inputPath + " to " + outputPath);
335             reportBytes = 0;
336           }
337         }
338         long etime = System.currentTimeMillis();
339 
340         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
341         context.setStatus(String.format(statusMessage,
342                           StringUtils.humanReadableInt(totalBytesWritten),
343                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
344                           " from " + inputPath + " to " + outputPath);
345 
346         // Verify that the written size matches
347         if (totalBytesWritten != inputFileSize) {
348           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
349                        " expected=" + inputFileSize + " for file=" + inputPath;
350           throw new IOException(msg);
351         }
352 
353         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
354         LOG.info("size=" + totalBytesWritten +
355             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
356             " time=" + StringUtils.formatTimeDiff(etime, stime) +
357             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
358         context.getCounter(Counter.FILES_COPIED).increment(1);
359       } catch (IOException e) {
360         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
361         context.getCounter(Counter.COPY_FAILED).increment(1);
362         throw e;
363       }
364     }
365 
366     /**
367      * Try to open the "source" file.
368      * Throws an IOException if the communication with the inputFs fails or
369      * if the file is not found.
370      */
371     private FSDataInputStream openSourceFile(Context context, final Path path) throws IOException {
372       try {
373         if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
374           return new HFileLink(inputRoot, inputArchive, path).open(inputFs);
375         } else if (isHLogLinkPath(path)) {
376           String serverName = path.getParent().getName();
377           String logName = path.getName();
378           return new HLogLink(inputRoot, serverName, logName).open(inputFs);
379         }
380         return inputFs.open(path);
381       } catch (IOException e) {
382         context.getCounter(Counter.MISSING_FILES).increment(1);
383         LOG.error("Unable to open source file=" + path, e);
384         throw e;
385       }
386     }
387 
388     private FileStatus getSourceFileStatus(Context context, final Path path) throws IOException {
389       try {
390         if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
391           HFileLink link = new HFileLink(inputRoot, inputArchive, path);
392           return link.getFileStatus(inputFs);
393         } else if (isHLogLinkPath(path)) {
394           String serverName = path.getParent().getName();
395           String logName = path.getName();
396           return new HLogLink(inputRoot, serverName, logName).getFileStatus(inputFs);
397         }
398         return inputFs.getFileStatus(path);
399       } catch (FileNotFoundException e) {
400         context.getCounter(Counter.MISSING_FILES).increment(1);
401         LOG.error("Unable to get the status for source file=" + path, e);
402         throw e;
403       } catch (IOException e) {
404         LOG.error("Unable to get the status for source file=" + path, e);
405         throw e;
406       }
407     }
408 
409     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
410       try {
411         return fs.getFileChecksum(path);
412       } catch (IOException e) {
413         LOG.warn("Unable to get checksum for file=" + path, e);
414         return null;
415       }
416     }
417 
418     /**
419      * Check if the two files are equal by looking at the file length,
420      * and at the checksum (if the user has specified the verifyChecksum flag).
421      */
422     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
423       // Not matching length
424       if (inputStat.getLen() != outputStat.getLen()) return false;
425 
426       // Mark files as equal, since the user asked for no checksum verification
427       if (!verifyChecksum) return true;
428 
429       // If checksums are not available, files are not the same.
430       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
431       if (inChecksum == null) return false;
432 
433       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
434       if (outChecksum == null) return false;
435 
436       return inChecksum.equals(outChecksum);
437     }
438 
439     /**
440      * HLog files are encoded as serverName/logName
441      * and since all the other files should be in /hbase/table/..path..
442      * we can rely on the depth, for now.
443      */
444     private static boolean isHLogLinkPath(final Path path) {
445       return path.depth() == 2;
446     }
447   }
448 
449   /**
450    * Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
451    * @return list of files referenced by the snapshot (pair of path and size)
452    */
453   private List<Pair<Path, Long>> getSnapshotFiles(final FileSystem fs, final Path snapshotDir)
454       throws IOException {
455     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
456 
457     final List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
458     final TableName table =
459         TableName.valueOf(snapshotDesc.getTable());
460     final Configuration conf = getConf();
461 
462     // Get snapshot files
463     SnapshotReferenceUtil.visitReferencedFiles(fs, snapshotDir,
464       new SnapshotReferenceUtil.FileVisitor() {
465         public void storeFile (final String region, final String family, final String hfile)
466             throws IOException {
467           Path path = HFileLink.createPath(table, region, family, hfile);
468           long size = new HFileLink(conf, path).getFileStatus(fs).getLen();
469           files.add(new Pair<Path, Long>(path, size));
470         }
471 
472         public void recoveredEdits (final String region, final String logfile)
473             throws IOException {
474           // copied with the snapshot references
475         }
476 
477         public void logFile (final String server, final String logfile)
478             throws IOException {
479           long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
480           files.add(new Pair<Path, Long>(new Path(server, logfile), size));
481         }
482     });
483 
484     return files;
485   }
486 
487   /**
488    * Given a list of file paths and sizes, create around ngroups groups in as balanced a way as possible.
489    * The groups created will have similar amounts of bytes.
490    * <p>
491    * The algorithm used is pretty straightforward; the file list is sorted by size,
492    * and then each group fetches the biggest file available, iterating through the groups,
493    * alternating the direction.
494    */
495   static List<List<Path>> getBalancedSplits(final List<Pair<Path, Long>> files, int ngroups) {
496     // Sort files by size, from small to big
497     Collections.sort(files, new Comparator<Pair<Path, Long>>() {
498       public int compare(Pair<Path, Long> a, Pair<Path, Long> b) {
499         long r = a.getSecond() - b.getSecond();
500         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
501       }
502     });
503 
504     // create balanced groups
505     List<List<Path>> fileGroups = new LinkedList<List<Path>>();
506     long[] sizeGroups = new long[ngroups];
507     int hi = files.size() - 1;
508     int lo = 0;
509 
510     List<Path> group;
511     int dir = 1;
512     int g = 0;
513 
514     while (hi >= lo) {
515       if (g == fileGroups.size()) {
516         group = new LinkedList<Path>();
517         fileGroups.add(group);
518       } else {
519         group = fileGroups.get(g);
520       }
521 
522       Pair<Path, Long> fileInfo = files.get(hi--);
523 
524       // add the hi one
525       sizeGroups[g] += fileInfo.getSecond();
526       group.add(fileInfo.getFirst());
527 
528       // change direction when at the end or the beginning
529       g += dir;
530       if (g == ngroups) {
531         dir = -1;
532         g = ngroups - 1;
533       } else if (g < 0) {
534         dir = 1;
535         g = 0;
536       }
537     }
538 
539     if (LOG.isDebugEnabled()) {
540       for (int i = 0; i < sizeGroups.length; ++i) {
541         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
542       }
543     }
544 
545     return fileGroups;
546   }
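      // A small worked example of the assignment above, with illustrative sizes only:
      // given file sizes {100, 80, 60, 40, 20, 10} and ngroups = 3, files are handed out
      // from largest to smallest while the group index bounces between the two ends:
      //   group 0: {100, 10} -> 110
      //   group 1: {80, 20}  -> 100
      //   group 2: {60, 40}  -> 100
      // so every split ends up with a comparable number of bytes to copy.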
547 
548   private static Path getInputFolderPath(Configuration conf)
549       throws IOException, InterruptedException {
550     Path stagingDir = JobUtil.getStagingDir(conf);
551     return new Path(stagingDir, INPUT_FOLDER_PREFIX +
552       String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
553   }
554 
555   /**
556    * Create the input files, with the paths to copy, for the MR job.
557    * Each input file lists a subset of the files to copy, and each carries a similar amount of data.
558    * The number of input files created is based on the number of mappers provided as an argument
559    * and the number of files to copy.
560    */
561   private static Path[] createInputFiles(final Configuration conf, final Path inputFolderPath,
562       final List<Pair<Path, Long>> snapshotFiles, int mappers)
563       throws IOException, InterruptedException {
564     FileSystem fs = inputFolderPath.getFileSystem(conf);
565     LOG.debug("Input folder location: " + inputFolderPath);
566 
567     List<List<Path>> splits = getBalancedSplits(snapshotFiles, mappers);
568     Path[] inputFiles = new Path[splits.size()];
569 
570     Text key = new Text();
571     for (int i = 0; i < inputFiles.length; i++) {
572       List<Path> files = splits.get(i);
573       inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i));
574       SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i],
575         Text.class, NullWritable.class);
576       LOG.debug("Input split: " + i);
577       try {
578         for (Path file: files) {
579           LOG.debug(file.toString());
580           key.set(file.toString());
581           writer.append(key, NullWritable.get());
582         }
583       } finally {
584         writer.close();
585       }
586     }
587 
588     return inputFiles;
589   }
590 
591   /**
592    * Run Map-Reduce Job to perform the files copy.
593    */
594   private void runCopyJob(final Path inputRoot, final Path outputRoot,
595       final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
596       final String filesUser, final String filesGroup, final int filesMode,
597       final int mappers, final int bandwidthMB)
598           throws IOException, InterruptedException, ClassNotFoundException {
599     Configuration conf = getConf();
600     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
601     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
602     conf.setInt(CONF_FILES_MODE, filesMode);
603     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
604     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
605     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
606     conf.setInt("mapreduce.job.maps", mappers);
607     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
608 
609     Job job = new Job(conf);
610     job.setJobName("ExportSnapshot");
611     job.setJarByClass(ExportSnapshot.class);
612     TableMapReduceUtil.addDependencyJars(job);
613     job.setMapperClass(ExportMapper.class);
614     job.setInputFormatClass(SequenceFileInputFormat.class);
615     job.setOutputFormatClass(NullOutputFormat.class);
616     job.setMapSpeculativeExecution(false);
617     job.setNumReduceTasks(0);
618 
619     // Create MR Input
620     Path inputFolderPath = getInputFolderPath(conf);
621     for (Path path: createInputFiles(conf, inputFolderPath, snapshotFiles, mappers)) {
622       LOG.debug("Add Input Path=" + path);
623       SequenceFileInputFormat.addInputPath(job, path);
624     }
625 
626     try {
627       // Acquire the delegation Tokens
628       TokenCache.obtainTokensForNamenodes(job.getCredentials(),
629         new Path[] { inputRoot, outputRoot }, conf);
630 
631       // Run the MR Job
632       if (!job.waitForCompletion(true)) {
633         // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
634         // when it will be available on all the supported versions.
635         throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
636       }
637     } finally {
638       // Remove MR Input
639       try {
640         inputFolderPath.getFileSystem(conf).delete(inputFolderPath, true);
641       } catch (IOException e) {
642         LOG.warn("Unable to remove MR input folder: " + inputFolderPath, e);
643       }
644     }
645   }
646 
647   private void verifySnapshot(final Configuration baseConf,
648       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
649     // Update the conf with the current root dir, since it may be a different cluster
650     Configuration conf = new Configuration(baseConf);
651     FSUtils.setRootDir(conf, rootDir);
652     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
653     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
654     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
655   }
656 
657   /**
658    * Execute the export snapshot by copying the snapshot metadata, hfiles and hlogs.
659    * @return 0 on success, and != 0 upon failure.
660    */
661   @Override
662   public int run(String[] args) throws IOException {
663     boolean verifyTarget = true;
664     boolean verifyChecksum = true;
665     String snapshotName = null;
666     String targetName = null;
667     boolean overwrite = false;
668     String filesGroup = null;
669     String filesUser = null;
670     Path outputRoot = null;
671     int bandwidthMB = Integer.MAX_VALUE;
672     int filesMode = 0;
673     int mappers = 0;
674 
675     Configuration conf = getConf();
676 
677     // Process command line args
678     for (int i = 0; i < args.length; i++) {
679       String cmd = args[i];
680       try {
681         if (cmd.equals("-snapshot")) {
682           snapshotName = args[++i];
683         } else if (cmd.equals("-target")) {
684           targetName = args[++i];
685         } else if (cmd.equals("-copy-to")) {
686           outputRoot = new Path(args[++i]);
687         } else if (cmd.equals("-copy-from")) {
688           Path sourceDir = new Path(args[++i]);
689           URI defaultFs = sourceDir.getFileSystem(conf).getUri();
690           FSUtils.setFsDefault(conf, new Path(defaultFs));
691           FSUtils.setRootDir(conf, sourceDir);
692         } else if (cmd.equals("-no-checksum-verify")) {
693           verifyChecksum = false;
694         } else if (cmd.equals("-no-target-verify")) {
695           verifyTarget = false;
696         } else if (cmd.equals("-mappers")) {
697           mappers = Integer.parseInt(args[++i]);
698         } else if (cmd.equals("-chuser")) {
699           filesUser = args[++i];
700         } else if (cmd.equals("-chgroup")) {
701           filesGroup = args[++i];
702         } else if (cmd.equals("-bandwidth")) {
703           bandwidthMB = Integer.parseInt(args[++i]);
704         } else if (cmd.equals("-chmod")) {
705           filesMode = Integer.parseInt(args[++i], 8);
706         } else if (cmd.equals("-overwrite")) {
707           overwrite = true;
708         } else if (cmd.equals("-h") || cmd.equals("--help")) {
709           printUsageAndExit();
710         } else {
711           System.err.println("UNEXPECTED: " + cmd);
712           printUsageAndExit();
713         }
714       } catch (Exception e) {
715         printUsageAndExit();
716       }
717     }
718 
719     // Check user options
720     if (snapshotName == null) {
721       System.err.println("Snapshot name not provided.");
722       printUsageAndExit();
723     }
724 
725     if (outputRoot == null) {
726       System.err.println("Destination file-system not provided.");
727       printUsageAndExit();
728     }
729 
730     if (targetName == null) {
731       targetName = snapshotName;
732     }
733 
734     Path inputRoot = FSUtils.getRootDir(conf);
735     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
736     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
737     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
738     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
739 
740     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
741 
742     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
743     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
744     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
745     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
746 
747     // Check if the snapshot already exists
748     if (outputFs.exists(outputSnapshotDir)) {
749       if (overwrite) {
750         if (!outputFs.delete(outputSnapshotDir, true)) {
751           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
752           return 1;
753         }
754       } else {
755         System.err.println("The snapshot '" + targetName +
756           "' already exists in the destination: " + outputSnapshotDir);
757         return 1;
758       }
759     }
760 
761     if (!skipTmp) {
762       // Check if the snapshot export is already in progress
763       if (outputFs.exists(snapshotTmpDir)) {
764         if (overwrite) {
765           if (!outputFs.delete(snapshotTmpDir, true)) {
766             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
767             return 1;
768           }
769         } else {
770           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
771           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
772           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
773           return 1;
774         }
775       }
776     }
777 
778     // Step 0 - Extract snapshot files to copy
779     LOG.info("Loading Snapshot hfile list");
780     final List<Pair<Path, Long>> files = getSnapshotFiles(inputFs, snapshotDir);
781     if (mappers == 0 && files.size() > 0) {
782       mappers = 1 + (files.size() / conf.getInt(CONF_MAP_GROUP, 10));
783       mappers = Math.min(mappers, files.size());
784     }
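        // For example, with the default snapshot.export.default.map.group of 10, a snapshot
        // referencing 45 files gets 1 + 45/10 = 5 mappers; the Math.min keeps the mapper
        // count from exceeding the number of files when the group size is set very low.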
785 
786     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
787     // The snapshot references must be copied before the hfiles, otherwise the cleaner
788     // will remove the hfiles because they appear to be unreferenced.
789     try {
790       LOG.info("Copy Snapshot Manifest");
791       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
792     } catch (IOException e) {
793       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
794         snapshotDir + " to=" + initialOutputSnapshotDir, e);
795     }
796 
797     // Write a new .snapshotinfo if the target name is different from the source name
798     if (!targetName.equals(snapshotName)) {
799       SnapshotDescription snapshotDesc =
800         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
801           .toBuilder()
802           .setName(targetName)
803           .build();
804       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
805     }
806 
807     // Step 2 - Start MR Job to copy files
808     // The snapshot references must be copied before the files, otherwise the files get removed
809     // by the HFileArchiver, since they have no references.
810     try {
811       if (files.size() == 0) {
812         LOG.warn("There are 0 store files to be copied. There may be no data in the table.");
813       } else {
814         runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
815                    filesUser, filesGroup, filesMode, mappers, bandwidthMB);
816       }
817 
818 
819       LOG.info("Finalize the Snapshot Export");
820       if (!skipTmp) {
821         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> to fs2:/.snapshot/<snapshot>
822         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
823           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
824             snapshotTmpDir + " to=" + outputSnapshotDir);
825         }
826       }
827 
828       // Step 4 - Verify snapshot integrity
829       if (verifyTarget) {
830         LOG.info("Verify snapshot integrity");
831         verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
832       }
833 
834       LOG.info("Export Completed: " + targetName);
835       return 0;
836     } catch (Exception e) {
837       LOG.error("Snapshot export failed", e);
838       if (!skipTmp) {
839         outputFs.delete(snapshotTmpDir, true);
840       }
841       outputFs.delete(outputSnapshotDir, true);
842       return 1;
843     }
844   }
845 
846   // ExportSnapshot
847   private void printUsageAndExit() {
848     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
849     System.err.println(" where [options] are:");
850     System.err.println("  -h|--help               Show this help and exit.");
851     System.err.println("  -snapshot NAME          Snapshot to export.");
852     System.err.println("  -copy-to NAME           Remote destination hdfs://");
853     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
854     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
855     System.err.println("  -no-target-verify       Do not verify the integrity of the " +
856         "exported snapshot.");
857     System.err.println("  -overwrite              Rewrite the snapshot manifest if it already exists.");
858     System.err.println("  -chuser USERNAME        Change the owner of the files to the specified one.");
859     System.err.println("  -chgroup GROUP          Change the group of the files to the specified one.");
860     System.err.println("  -chmod MODE             Change the permission of the files to the specified one.");
861     System.err.println("  -mappers                Number of mappers to use during the copy (mapreduce.job.maps).");
862     System.err.println();
863     System.err.println("Examples:");
864     System.err.println("  hbase " + getClass().getName() + " \\");
865     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
866     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
867     System.err.println();
868     System.err.println("  hbase " + getClass().getName() + " \\");
869     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
870     System.err.println("    -copy-to hdfs://srv1:50070/hbase");
871     System.exit(1);
872   }
873 
874   /**
875    * The guts of the {@link #main} method.
876    * Call this method to avoid the {@link #main(String[])} System.exit.
877    * @param args
878    * @return errCode
879    * @throws Exception
880    */
881   static int innerMain(final Configuration conf, final String [] args) throws Exception {
882     return ToolRunner.run(conf, new ExportSnapshot(), args);
883   }
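      // A minimal sketch of driving the tool from other code in this package via innerMain
      // above; the snapshot name and destination are just the sample values from
      // printUsageAndExit():
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   int rc = ExportSnapshot.innerMain(conf, new String[] {
      //       "-snapshot", "MySnapshot",
      //       "-copy-to", "hdfs://srv2:8082/hbase",
      //       "-mappers", "16" });
      //   // rc is 0 on success, non-zero on failure (see run() above)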
884 
885   public static void main(String[] args) throws Exception {
886     System.exit(innerMain(HBaseConfiguration.create(), args));
887   }
888 }