1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.classification.InterfaceAudience;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.BlockLocation;
50  import org.apache.hadoop.fs.FSDataInputStream;
51  import org.apache.hadoop.fs.FSDataOutputStream;
52  import org.apache.hadoop.fs.FileStatus;
53  import org.apache.hadoop.fs.FileSystem;
54  import org.apache.hadoop.fs.Path;
55  import org.apache.hadoop.fs.PathFilter;
56  import org.apache.hadoop.fs.permission.FsAction;
57  import org.apache.hadoop.fs.permission.FsPermission;
58  import org.apache.hadoop.hbase.ClusterId;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
62  import org.apache.hadoop.hbase.HRegionInfo;
63  import org.apache.hadoop.hbase.RemoteExceptionHandler;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.exceptions.DeserializationException;
66  import org.apache.hadoop.hbase.fs.HFileSystem;
67  import org.apache.hadoop.hbase.master.HMaster;
68  import org.apache.hadoop.hbase.master.RegionPlacementMaintainer;
69  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
70  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
71  import org.apache.hadoop.hbase.regionserver.HRegion;
72  import org.apache.hadoop.hdfs.DistributedFileSystem;
73  import org.apache.hadoop.hdfs.protocol.FSConstants;
74  import org.apache.hadoop.io.IOUtils;
75  import org.apache.hadoop.io.SequenceFile;
76  import org.apache.hadoop.security.AccessControlException;
77  import org.apache.hadoop.security.UserGroupInformation;
78  import org.apache.hadoop.util.Progressable;
79  import org.apache.hadoop.util.ReflectionUtils;
80  import org.apache.hadoop.util.StringUtils;
81  
82  import com.google.common.primitives.Ints;
83  import com.google.protobuf.InvalidProtocolBufferException;
84  
85  /**
86   * Utility methods for interacting with the underlying file system.
87   */
88  @InterfaceAudience.Private
89  public abstract class FSUtils {
90    private static final Log LOG = LogFactory.getLog(FSUtils.class);
91  
92    /** Full access permissions (starting point for a umask) */
93    public static final String FULL_RWX_PERMISSIONS = "777";
94    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
95    private static final int DEFAULT_THREAD_POOLSIZE = 2;
96  
97    /** Set to true on Windows platforms */
98    public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
99  
100   protected FSUtils() {
101     super();
102   }
103 
104   /**
105    * Compare path components. Does not consider schema; i.e. if schemas differ but <code>path</code>
106    * starts with <code>rootPath</code>, then the function returns true.
107    * @param rootPath
108    * @param path
109    * @return True if <code>path</code> starts with <code>rootPath</code>
110    */
111   public static boolean isStartingWithPath(final Path rootPath, final String path) {
112     String uriRootPath = rootPath.toUri().getPath();
113     String tailUriPath = (new Path(path)).toUri().getPath();
114     return tailUriPath.startsWith(uriRootPath);
115   }
116 
117   /**
118    * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
119    * '/a/b/c' part. Does not consider schema; i.e. if schemas differ but the path or subpath matches,
120    * the two are considered equal.
121    * @param pathToSearch Path we will be trying to match.
122    * @param pathTail
123    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
124    */
125   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
126     return isMatchingTail(pathToSearch, new Path(pathTail));
127   }
128 
129   /**
130    * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
131    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
132    * schema; i.e. if schemas differ but the path or subpath matches, the two are considered equal.
133    * @param pathToSearch Path we will be trying to match.
134    * @param pathTail
135    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
136    */
137   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
138     if (pathToSearch.depth() != pathTail.depth()) return false;
139     Path tailPath = pathTail;
140     String tailName;
141     Path toSearch = pathToSearch;
142     String toSearchName;
143     boolean result = false;
144     do {
145       tailName = tailPath.getName();
146       if (tailName == null || tailName.length() <= 0) {
147         result = true;
148         break;
149       }
150       toSearchName = toSearch.getName();
151       if (toSearchName == null || toSearchName.length() <= 0) break;
152       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
153       tailPath = tailPath.getParent();
154       toSearch = toSearch.getParent();
155     } while(tailName.equals(toSearchName));
156     return result;
157   }
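
  // Illustrative sketch (paths are hypothetical): per the javadoc above, scheme and
  // authority are ignored and only the trailing path components are compared.
  //   boolean sameTail = FSUtils.isMatchingTail(new Path("hdfs://a/b/c"), "b/c");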
158 
159   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
160     String scheme = fs.getUri().getScheme();
161     if (scheme == null) {
162       LOG.warn("Could not find scheme for uri " +
163           fs.getUri() + ", default to hdfs");
164       scheme = "hdfs";
165     }
166     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
167         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
168     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
169     return fsUtils;
170   }
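
  // Illustrative sketch: the concrete FSUtils implementation is picked per filesystem
  // scheme via "hbase.fsutil.<scheme>.impl" (e.g. "hbase.fsutil.hdfs.impl"), defaulting
  // to FSHDFSUtils. The WAL path below is hypothetical.
  //   FileSystem fs = FileSystem.get(conf);
  //   FSUtils fsUtils = FSUtils.getInstance(fs, conf);
  //   fsUtils.recoverFileLease(fs, someWalPath, conf, null);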
171 
172   /**
173    * Delete if exists.
174    * @param fs filesystem object
175    * @param dir directory to delete
176    * @return True if deleted <code>dir</code>
177    * @throws IOException e
178    */
179   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
180   throws IOException {
181     return fs.exists(dir) && fs.delete(dir, true);
182   }
183 
184   /**
185    * Return the number of bytes that large input files should optimally
186    * be split into to minimize i/o time.
187    *
188    * Uses reflection to search for getDefaultBlockSize(Path f);
189    * if the method doesn't exist, falls back to getDefaultBlockSize().
190    *
191    * @param fs filesystem object
192    * @return the default block size for the path's filesystem
193    * @throws IOException e
194    */
195   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
196     Method m = null;
197     Class<? extends FileSystem> cls = fs.getClass();
198     try {
199       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
200     } catch (NoSuchMethodException e) {
201       LOG.info("FileSystem doesn't support getDefaultBlockSize");
202     } catch (SecurityException e) {
203       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
204       m = null; // could happen on setAccessible()
205     }
206     if (m == null) {
207       return fs.getDefaultBlockSize();
208     } else {
209       try {
210         Object ret = m.invoke(fs, path);
211         return ((Long)ret).longValue();
212       } catch (Exception e) {
213         throw new IOException(e);
214       }
215     }
216   }
217 
218   /**
219    * Get the default replication.
220    *
221    * Uses reflection to search for getDefaultReplication(Path f);
222    * if the method doesn't exist, falls back to getDefaultReplication().
223    *
224    * @param fs filesystem object
225    * @param f path of file
226    * @return default replication for the path's filesystem
227    * @throws IOException e
228    */
229   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
230     Method m = null;
231     Class<? extends FileSystem> cls = fs.getClass();
232     try {
233       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
234     } catch (NoSuchMethodException e) {
235       LOG.info("FileSystem doesn't support getDefaultReplication");
236     } catch (SecurityException e) {
237       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
238       m = null; // could happen on setAccessible()
239     }
240     if (m == null) {
241       return fs.getDefaultReplication();
242     } else {
243       try {
244         Object ret = m.invoke(fs, path);
245         return ((Number)ret).shortValue();
246       } catch (Exception e) {
247         throw new IOException(e);
248       }
249     }
250   }
251 
252   /**
253    * Returns the default buffer size to use during writes.
254    *
255    * The size of the buffer should probably be a multiple of hardware
256    * page size (4096 on Intel x86), and it determines how much data is
257    * buffered during read and write operations.
258    *
259    * @param fs filesystem object
260    * @return default buffer size to use during writes
261    */
262   public static int getDefaultBufferSize(final FileSystem fs) {
263     return fs.getConf().getInt("io.file.buffer.size", 4096);
264   }
265 
266   /**
267    * Create the specified file on the filesystem. By default, this will:
268    * <ol>
269    * <li>overwrite the file if it exists</li>
270    * <li>apply the umask in the configuration (if it is enabled)</li>
271    * <li>use the fs configured buffer size (or 4096 if not set)</li>
272    * <li>use the default replication</li>
273    * <li>use the default block size</li>
274    * <li>not track progress</li>
275    * </ol>
276    *
277    * @param fs {@link FileSystem} on which to write the file
278    * @param path {@link Path} to the file to write
279    * @param perm permissions
280    * @param favoredNodes preferred DataNodes for the new file's blocks; may be null
281    * @return output stream to the created file
282    * @throws IOException if the file cannot be created
283    */
284   public static FSDataOutputStream create(FileSystem fs, Path path,
285       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
286     if (fs instanceof HFileSystem) {
287       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
288       if (backingFs instanceof DistributedFileSystem) {
289         // Try to use the favoredNodes version via reflection to allow backwards-
290         // compatibility.
291         try {
292           return (FSDataOutputStream) (DistributedFileSystem.class
293               .getDeclaredMethod("create", Path.class, FsPermission.class,
294                   boolean.class, int.class, short.class, long.class,
295                   Progressable.class, InetSocketAddress[].class)
296                   .invoke(backingFs, path, perm, true,
297                       getDefaultBufferSize(backingFs),
298                       getDefaultReplication(backingFs, path),
299                       getDefaultBlockSize(backingFs, path),
300                       null, favoredNodes));
301         } catch (InvocationTargetException ite) {
302           // Function was properly called, but threw its own exception.
303           throw new IOException(ite.getCause());
304         } catch (NoSuchMethodException e) {
305           LOG.debug("DFS Client does not support most favored nodes create; using default create");
306           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
307         } catch (IllegalArgumentException e) {
308           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
309         } catch (SecurityException e) {
310           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
311         } catch (IllegalAccessException e) {
312           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
313         }
314       }
315     }
316     return create(fs, path, perm, true);
317   }
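
  // Illustrative sketch (host/port and path are hypothetical): request favored DataNodes;
  // if the DFS client lacks the favored-nodes create, the method above silently falls back
  // to the plain create.
  //   InetSocketAddress[] favored = { new InetSocketAddress("datanode1.example.com", 50010) };
  //   FSDataOutputStream out =
  //       FSUtils.create(fs, new Path("/hbase/example-file"), FSUtils.getFileDefault(), favored);
  //   try { out.writeUTF("hello"); } finally { out.close(); }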
318 
319   /**
320    * Create the specified file on the filesystem. By default, this will:
321    * <ol>
322    * <li>apply the umask in the configuration (if it is enabled)</li>
323    * <li>use the fs configured buffer size (or 4096 if not set)</li>
324    * <li>use the default replication</li>
325    * <li>use the default block size</li>
326    * <li>not track progress</li>
327    * </ol>
328    *
329    * @param fs {@link FileSystem} on which to write the file
330    * @param path {@link Path} to the file to write
331    * @param perm permissions to apply to the created file
332    * @param overwrite Whether or not the created file should be overwritten.
333    * @return output stream to the created file
334    * @throws IOException if the file cannot be created
335    */
336   public static FSDataOutputStream create(FileSystem fs, Path path,
337       FsPermission perm, boolean overwrite) throws IOException {
338     if (LOG.isTraceEnabled()) {
339       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
340     }
341     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
342         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
343   }
344 
345   /**
346    * Get the file permissions specified in the configuration, if they are
347    * enabled.
348    *
349    * @param fs filesystem that the file will be created on.
350    * @param conf configuration to read for determining if permissions are
351    *          enabled and which to use
352    * @param permssionConfKey property key in the configuration to use when
353    *          finding the permission
354    * @return the permission to use when creating a new file on the fs. If
355    *         special permissions are not specified in the configuration, then
356    *         the default permissions on the fs will be returned.
357    */
358   public static FsPermission getFilePermissions(final FileSystem fs,
359       final Configuration conf, final String permssionConfKey) {
360     boolean enablePermissions = conf.getBoolean(
361         HConstants.ENABLE_DATA_FILE_UMASK, false);
362 
363     if (enablePermissions) {
364       try {
365         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
366         // make sure that we have a mask, if not, go default.
367         String mask = conf.get(permssionConfKey);
368         if (mask == null)
369           return getFileDefault();
370         // apply the umask
371         FsPermission umask = new FsPermission(mask);
372         return perm.applyUMask(umask);
373       } catch (IllegalArgumentException e) {
374         LOG.warn(
375             "Incorrect umask attempted to be created: "
376                 + conf.get(permssionConfKey)
377                 + ", using default file permissions.", e);
378         return getFileDefault();
379       }
380     }
381     return getFileDefault();
382   }
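
  // Illustrative sketch: permissions are derived by applying a configured umask to 777
  // when hbase.data.umask.enable is on. The "hbase.data.umask" key below is only an
  // example of a permission conf key, not a value this method mandates.
  //   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
  //   conf.set("hbase.data.umask", "077");   // 777 with umask 077 -> rwx------
  //   FsPermission perm = FSUtils.getFilePermissions(fs, conf, "hbase.data.umask");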
383 
384   /**
385    * Get the default permission for file.
386    * This is the same method as FsPermission.getFileDefault() in Hadoop 2.
387    * We provide the method here to support compatibility with Hadoop 1.
388    * See HBASE-11061.  Would be better to do this as Interface in hadoop-compat
389    * w/ hadoop1 and hadoop2 implementations but punting on this since small
390    * risk this will change in 0.96/0.98 timeframe (only committed to these
391    * branches).
392    */
393   public static FsPermission getFileDefault() {
394     return new FsPermission((short)00666);
395   }
396 
397   /**
398    * Checks to see if the specified file system is available
399    *
400    * @param fs filesystem
401    * @throws IOException e
402    */
403   public static void checkFileSystemAvailable(final FileSystem fs)
404   throws IOException {
405     if (!(fs instanceof DistributedFileSystem)) {
406       return;
407     }
408     IOException exception = null;
409     DistributedFileSystem dfs = (DistributedFileSystem) fs;
410     try {
411       if (dfs.exists(new Path("/"))) {
412         return;
413       }
414     } catch (IOException e) {
415       exception = RemoteExceptionHandler.checkIOException(e);
416     }
417     try {
418       fs.close();
419     } catch (Exception e) {
420       LOG.error("file system close failed: ", e);
421     }
422     IOException io = new IOException("File system is not available");
423     io.initCause(exception);
424     throw io;
425   }
426 
427   /**
428    * We use reflection because
429    * {@link DistributedFileSystem#setSafeMode(FSConstants.SafeModeAction, boolean)} is not in hadoop 1.1.
430    *
431    * @param dfs
432    * @return whether we're in safe mode
433    * @throws IOException
434    */
435   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
436     boolean inSafeMode = false;
437     try {
438       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
439           org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.class, boolean.class});
440       inSafeMode = (Boolean) m.invoke(dfs,
441         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET, true);
442     } catch (Exception e) {
443       if (e instanceof IOException) throw (IOException) e;
444 
445       // Check whether dfs is in safe mode.
446       inSafeMode = dfs.setSafeMode(
447         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET);
448     }
449     return inSafeMode;
450   }
451 
452   /**
453    * Check whether dfs is in safemode.
454    * @param conf
455    * @throws IOException
456    */
457   public static void checkDfsSafeMode(final Configuration conf)
458   throws IOException {
459     boolean isInSafeMode = false;
460     FileSystem fs = FileSystem.get(conf);
461     if (fs instanceof DistributedFileSystem) {
462       DistributedFileSystem dfs = (DistributedFileSystem)fs;
463       isInSafeMode = isInSafeMode(dfs);
464     }
465     if (isInSafeMode) {
466       throw new IOException("File system is in safemode, it can't be written now");
467     }
468   }
469 
470   /**
471    * Verifies current version of file system
472    *
473    * @param fs filesystem object
474    * @param rootdir root hbase directory
475    * @return null if no version file exists, version string otherwise.
476    * @throws IOException e
477    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
478    */
479   public static String getVersion(FileSystem fs, Path rootdir)
480   throws IOException, DeserializationException {
481     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
482     FileStatus[] status = null;
483     try {
484       // hadoop 2.0 throws FNFE if directory does not exist.
485       // hadoop 1.0 returns null if directory does not exist.
486       status = fs.listStatus(versionFile);
487     } catch (FileNotFoundException fnfe) {
488       return null;
489     }
490     if (status == null || status.length == 0) return null;
491     String version = null;
492     byte [] content = new byte [(int)status[0].getLen()];
493     FSDataInputStream s = fs.open(versionFile);
494     try {
495       IOUtils.readFully(s, content, 0, content.length);
496       if (ProtobufUtil.isPBMagicPrefix(content)) {
497         version = parseVersionFrom(content);
498       } else {
499         // Presume it is in pre-pb format.
500         InputStream is = new ByteArrayInputStream(content);
501         DataInputStream dis = new DataInputStream(is);
502         try {
503           version = dis.readUTF();
504         } finally {
505           dis.close();
506         }
507         // Update the format
508         LOG.info("Updating the hbase.version file format with version=" + version);
509         setVersion(fs, rootdir, version, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
510       }
511     } catch (EOFException eof) {
512       LOG.warn("Version file was empty, odd, will try to set it.");
513     } finally {
514       s.close();
515     }
516     return version;
517   }
518 
519   /**
520    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
521    * @param bytes The byte content of the hbase.version file.
522    * @return The version found in the file as a String.
523    * @throws DeserializationException
524    */
525   static String parseVersionFrom(final byte [] bytes)
526   throws DeserializationException {
527     ProtobufUtil.expectPBMagicPrefix(bytes);
528     int pblen = ProtobufUtil.lengthOfPBMagic();
529     FSProtos.HBaseVersionFileContent.Builder builder =
530       FSProtos.HBaseVersionFileContent.newBuilder();
531     FSProtos.HBaseVersionFileContent fileContent;
532     try {
533       fileContent = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
534       return fileContent.getVersion();
535     } catch (InvalidProtocolBufferException e) {
536       // Convert
537       throw new DeserializationException(e);
538     }
539   }
540 
541   /**
542    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
543    * @param version Version to persist
544    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
545    */
546   static byte [] toVersionByteArray(final String version) {
547     FSProtos.HBaseVersionFileContent.Builder builder =
548       FSProtos.HBaseVersionFileContent.newBuilder();
549     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
550   }
551 
552   /**
553    * Verifies current version of file system
554    *
555    * @param fs file system
556    * @param rootdir root directory of HBase installation
557    * @param message if true, issues a message on System.out
558    *
559    * @throws IOException e
560    * @throws DeserializationException
561    */
562   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
563   throws IOException, DeserializationException {
564     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
565   }
566 
567   /**
568    * Verifies current version of file system
569    *
570    * @param fs file system
571    * @param rootdir root directory of HBase installation
572    * @param message if true, issues a message on System.out
573    * @param wait wait interval
574    * @param retries number of times to retry
575    *
576    * @throws IOException e
577    * @throws DeserializationException
578    */
579   public static void checkVersion(FileSystem fs, Path rootdir,
580       boolean message, int wait, int retries)
581   throws IOException, DeserializationException {
582     String version = getVersion(fs, rootdir);
583     if (version == null) {
584       if (!metaRegionExists(fs, rootdir)) {
585         // rootDir is empty (no version file and no root region)
586         // just create new version file (HBASE-1195)
587         setVersion(fs, rootdir, wait, retries);
588         return;
589       }
590     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
591 
592     // version is deprecated, require migration
593     // Output on stdout so user sees it in terminal.
594     String msg = "HBase file layout needs to be upgraded."
595       + "  You have version " + version
596       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
597       + ".  Is your hbase.rootdir valid?  If so, you may need to run "
598       + "'hbase hbck -fixVersionFile'.";
599     if (message) {
600       System.out.println("WARNING! " + msg);
601     }
602     throw new FileSystemVersionException(msg);
603   }
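
  // Illustrative sketch: a bootstrap-style check of the hbase.version file under the root dir.
  // If the root dir is empty (no version file and no meta region), a fresh version file is written.
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   FSUtils.checkVersion(fs, rootDir, true);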
604 
605   /**
606    * Sets version of file system
607    *
608    * @param fs filesystem object
609    * @param rootdir hbase root
610    * @throws IOException e
611    */
612   public static void setVersion(FileSystem fs, Path rootdir)
613   throws IOException {
614     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
615       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
616   }
617 
618   /**
619    * Sets version of file system
620    *
621    * @param fs filesystem object
622    * @param rootdir hbase root
623    * @param wait time to wait for retry
624    * @param retries number of times to retry before failing
625    * @throws IOException e
626    */
627   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
628   throws IOException {
629     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
630   }
631 
632 
633   /**
634    * Sets version of file system
635    *
636    * @param fs filesystem object
637    * @param rootdir hbase root directory
638    * @param version version to set
639    * @param wait time to wait for retry
640    * @param retries number of times to retry before throwing an IOException
641    * @throws IOException e
642    */
643   public static void setVersion(FileSystem fs, Path rootdir, String version,
644       int wait, int retries) throws IOException {
645     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
646     while (true) {
647       try {
648         FSDataOutputStream s = fs.create(versionFile);
649         s.write(toVersionByteArray(version));
650         s.close();
651         LOG.debug("Created version file at " + rootdir.toString() + " with version=" + version);
652         return;
653       } catch (IOException e) {
654         if (retries > 0) {
655           LOG.warn("Unable to create version file at " + rootdir.toString() + ", retrying", e);
656           fs.delete(versionFile, false);
657           try {
658             if (wait > 0) {
659               Thread.sleep(wait);
660             }
661           } catch (InterruptedException ex) {
662             // ignore
663           }
664           retries--;
665         } else {
666           throw e;
667         }
668       }
669     }
670   }
671 
672   /**
673    * Checks that a cluster ID file exists in the HBase root directory
674    * @param fs the root directory FileSystem
675    * @param rootdir the HBase root directory in HDFS
676    * @param wait how long to wait between retries
677    * @return <code>true</code> if the file exists, otherwise <code>false</code>
678    * @throws IOException if checking the FileSystem fails
679    */
680   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
681       int wait) throws IOException {
682     while (true) {
683       try {
684         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
685         return fs.exists(filePath);
686       } catch (IOException ioe) {
687         if (wait > 0) {
688           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
689               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
690           try {
691             Thread.sleep(wait);
692           } catch (InterruptedException ie) {
693             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
694           }
695         } else {
696           throw ioe;
697         }
698       }
699     }
700   }
701 
702   /**
703    * Returns the value of the unique cluster ID stored for this HBase instance.
704    * @param fs the root directory FileSystem
705    * @param rootdir the path to the HBase root directory
706    * @return the unique cluster identifier
707    * @throws IOException if reading the cluster ID file fails
708    */
709   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
710   throws IOException {
711     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
712     ClusterId clusterId = null;
713     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
714     if (status != null) {
715       int len = Ints.checkedCast(status.getLen());
716       byte [] content = new byte[len];
717       FSDataInputStream in = fs.open(idPath);
718       try {
719         in.readFully(content);
720       } catch (EOFException eof) {
721         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
722       } finally {
723         in.close();
724       }
725       try {
726         clusterId = ClusterId.parseFrom(content);
727       } catch (DeserializationException e) {
728         throw new IOException("content=" + Bytes.toString(content), e);
729       }
730       // If not pb'd, make it so.
731       if (!ProtobufUtil.isPBMagicPrefix(content)) {
732         String cid = null;
733         in = fs.open(idPath);
734         try {
735           cid = in.readUTF();
736           clusterId = new ClusterId(cid);
737         } catch (EOFException eof) {
738           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
739         } finally {
740           in.close();
741         }
742         rewriteAsPb(fs, rootdir, idPath, clusterId);
743       }
744       return clusterId;
745     } else {
746       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
747     }
748     return clusterId;
749   }
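
  // Illustrative sketch: reading the cluster ID stored in <rootdir>/hbase.id; a null return
  // means the file is missing (e.g. the cluster has not been initialized yet).
  //   ClusterId clusterId = FSUtils.getClusterId(fs, FSUtils.getRootDir(conf));
  //   if (clusterId == null) { LOG.warn("hbase.id not found; cluster not initialized?"); }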
750 
751   /**
752    * @param cid
753    * @throws IOException
754    */
755   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
756       final ClusterId cid)
757   throws IOException {
758     // Rewrite the file as pb.  Move aside the old one first, write new
759     // then delete the moved-aside file.
760     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
761     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
762     setClusterId(fs, rootdir, cid, 100);
763     if (!fs.delete(movedAsideName, false)) {
764       throw new IOException("Failed delete of " + movedAsideName);
765     }
766     LOG.debug("Rewrote the hbase.id file as pb");
767   }
768 
769   /**
770    * Writes a new unique identifier for this cluster to the "hbase.id" file
771    * in the HBase root directory
772    * @param fs the root directory FileSystem
773    * @param rootdir the path to the HBase root directory
774    * @param clusterId the unique identifier to store
775    * @param wait how long (in milliseconds) to wait between retries
776    * @throws IOException if writing to the FileSystem fails and no wait value is configured
777    */
778   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
779       int wait) throws IOException {
780     while (true) {
781       try {
782         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
783         FSDataOutputStream s = fs.create(filePath);
784         try {
785           s.write(clusterId.toByteArray());
786         } finally {
787           s.close();
788         }
789         if (LOG.isDebugEnabled()) {
790           LOG.debug("Created cluster ID file at " + filePath.toString() + " with ID: " + clusterId);
791         }
792         return;
793       } catch (IOException ioe) {
794         if (wait > 0) {
795           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
796               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
797           try {
798             Thread.sleep(wait);
799           } catch (InterruptedException ie) {
800             Thread.currentThread().interrupt();
801             break;
802           }
803         } else {
804           throw ioe;
805         }
806       }
807     }
808   }
809 
810   /**
811    * Verifies root directory path is a valid URI with a scheme
812    *
813    * @param root root directory path
814    * @return Passed <code>root</code> argument.
815    * @throws IOException if not a valid URI with a scheme
816    */
817   public static Path validateRootPath(Path root) throws IOException {
818     try {
819       URI rootURI = new URI(root.toString());
820       String scheme = rootURI.getScheme();
821       if (scheme == null) {
822         throw new IOException("Root directory does not have a scheme");
823       }
824       return root;
825     } catch (URISyntaxException e) {
826       IOException io = new IOException("Root directory path is not a valid " +
827         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
828       io.initCause(e);
829       throw io;
830     }
831   }
832 
833   /**
834    * Checks for the presence of the root path (using the provided conf object) in the given path. If
835    * it exists, this method removes it and returns the String representation of remaining relative path.
836    * @param path
837    * @param conf
838    * @return String representation of the remaining relative path
839    * @throws IOException
840    */
841   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
842     Path root = FSUtils.getRootDir(conf);
843     String pathStr = path.toString();
844     // If the path does not start with the root path, return it as-is.
845     if (!pathStr.startsWith(root.toString())) return pathStr;
846     // Otherwise strip the root path prefix and the "/" that follows it.
847     return pathStr.substring(root.toString().length() + 1);
848   }
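
  // Illustrative sketch (hypothetical table path): strips the configured root dir prefix
  // and the "/" that follows it, yielding a root-relative path string.
  //   Path p = new Path(FSUtils.getRootDir(conf), "data/default/t1");
  //   String relative = FSUtils.removeRootPath(p, conf);   // "data/default/t1"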
849 
850   /**
851    * If DFS, check safe mode and if so, wait until we clear it.
852    * @param conf configuration
853    * @param wait Sleep between retries
854    * @throws IOException e
855    */
856   public static void waitOnSafeMode(final Configuration conf,
857     final long wait)
858   throws IOException {
859     FileSystem fs = FileSystem.get(conf);
860     if (!(fs instanceof DistributedFileSystem)) return;
861     DistributedFileSystem dfs = (DistributedFileSystem)fs;
862     // Make sure dfs is not in safe mode
863     while (isInSafeMode(dfs)) {
864       LOG.info("Waiting for dfs to exit safe mode...");
865       try {
866         Thread.sleep(wait);
867       } catch (InterruptedException e) {
868         //continue
869       }
870     }
871   }
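
  // Illustrative sketch: block startup work until HDFS has left safe mode, re-checking
  // every 10 seconds (the interval is arbitrary here).
  //   FSUtils.waitOnSafeMode(conf, 10 * 1000L);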
872 
873   /**
874    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
875    * method returns the 'path' component of a Path's URI: e.g. If a Path is
876    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
877    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
878    * This method is useful if you want to print out a Path without the qualifying
879    * Filesystem instance.
880    * @param p Filesystem Path whose 'path' component we are to return.
881    * @return Path portion of the Filesystem
882    */
883   public static String getPath(Path p) {
884     return p.toUri().getPath();
885   }
886 
887   /**
888    * @param c configuration
889    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
890    * configuration as a qualified Path.
891    * @throws IOException e
892    */
893   public static Path getRootDir(final Configuration c) throws IOException {
894     Path p = new Path(c.get(HConstants.HBASE_DIR));
895     FileSystem fs = p.getFileSystem(c);
896     return p.makeQualified(fs);
897   }
898 
899   public static void setRootDir(final Configuration c, final Path root) throws IOException {
900     c.set(HConstants.HBASE_DIR, root.toString());
901   }
902 
903   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
904     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
905     c.set("fs.default.name", root.toString()); // for hadoop 0.20
906   }
907 
908   /**
909    * Checks if meta region exists
910    *
911    * @param fs file system
912    * @param rootdir root directory of HBase installation
913    * @return true if exists
914    * @throws IOException e
915    */
916   @SuppressWarnings("deprecation")
917   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
918   throws IOException {
919     Path metaRegionDir =
920       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
921     return fs.exists(metaRegionDir);
922   }
923 
924   /**
925    * Compute HDFS blocks distribution of a given file, or a portion of the file
926    * @param fs file system
927    * @param status file status of the file
928    * @param start start position of the portion
929    * @param length length of the portion
930    * @return The HDFS blocks distribution
931    */
932   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
933     final FileSystem fs, FileStatus status, long start, long length)
934     throws IOException {
935     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
936     BlockLocation [] blockLocations =
937       fs.getFileBlockLocations(status, start, length);
938     for(BlockLocation bl : blockLocations) {
939       String [] hosts = bl.getHosts();
940       long len = bl.getLength();
941       blocksDistribution.addHostsAndBlockWeight(hosts, len);
942     }
943 
944     return blocksDistribution;
945   }
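
  // Illustrative sketch (hypothetical store file and host): compute how a file's blocks are
  // spread across DataNodes; HDFSBlocksDistribution.getBlockLocalityIndex is the usual way
  // to read back per-host locality.
  //   FileStatus status = fs.getFileStatus(storeFilePath);
  //   HDFSBlocksDistribution dist =
  //       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
  //   float locality = dist.getBlockLocalityIndex("regionserver1.example.com");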
946 
947 
948 
949   /**
950    * Runs through the hbase rootdir and checks all stores have only
951    * one file in them -- that is, they've been major compacted.  Looks
952    * at root and meta tables too.
953    * @param fs filesystem
954    * @param hbaseRootDir hbase root directory
955    * @return True if this hbase install is major compacted.
956    * @throws IOException e
957    */
958   public static boolean isMajorCompacted(final FileSystem fs,
959       final Path hbaseRootDir)
960   throws IOException {
961     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
962     for (Path d : tableDirs) {
963       FileStatus[] regionDirs = fs.listStatus(d, new DirFilter(fs));
964       for (FileStatus regionDir : regionDirs) {
965         Path dd = regionDir.getPath();
966         if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
967           continue;
968         }
969         // Else it's a region name.  Now look in region for families.
970         FileStatus[] familyDirs = fs.listStatus(dd, new DirFilter(fs));
971         for (FileStatus familyDir : familyDirs) {
972           Path family = familyDir.getPath();
973           // Now in family make sure only one file.
974           FileStatus[] familyStatus = fs.listStatus(family);
975           if (familyStatus.length > 1) {
976             LOG.debug(family.toString() + " has " + familyStatus.length +
977                 " files.");
978             return false;
979           }
980         }
981       }
982     }
983     return true;
984   }
985 
986   // TODO move this method OUT of FSUtils. No dependencies on HMaster
987   /**
988    * Returns the total overall fragmentation percentage. Includes hbase:meta and
989    * -ROOT- as well.
990    *
991    * @param master  The master defining the HBase root and file system.
992    * @return The overall fragmentation percentage across all tables, or -1 if it cannot be determined.
993    * @throws IOException When scanning the directory fails.
994    */
995   public static int getTotalTableFragmentation(final HMaster master)
996   throws IOException {
997     Map<String, Integer> map = getTableFragmentation(master);
998     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
999   }
1000 
1001   /**
1002    * Runs through the HBase rootdir and checks how many stores for each table
1003    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1004    * percentage across all tables is stored under the special key "-TOTAL-".
1005    *
1006    * @param master  The master defining the HBase root and file system.
1007    * @return A map for each table and its percentage.
1008    *
1009    * @throws IOException When scanning the directory fails.
1010    */
1011   public static Map<String, Integer> getTableFragmentation(
1012     final HMaster master)
1013   throws IOException {
1014     Path path = getRootDir(master.getConfiguration());
1015     // since HMaster.getFileSystem() is package private
1016     FileSystem fs = path.getFileSystem(master.getConfiguration());
1017     return getTableFragmentation(fs, path);
1018   }
1019 
1020   /**
1021    * Runs through the HBase rootdir and checks how many stores for each table
1022    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1023    * percentage across all tables is stored under the special key "-TOTAL-".
1024    *
1025    * @param fs  The file system to use.
1026    * @param hbaseRootDir  The root directory to scan.
1027    * @return A map for each table and its percentage.
1028    * @throws IOException When scanning the directory fails.
1029    */
1030   public static Map<String, Integer> getTableFragmentation(
1031     final FileSystem fs, final Path hbaseRootDir)
1032   throws IOException {
1033     Map<String, Integer> frags = new HashMap<String, Integer>();
1034     int cfCountTotal = 0;
1035     int cfFragTotal = 0;
1036     DirFilter df = new DirFilter(fs);
1037     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1038     for (Path d : tableDirs) {
1039       int cfCount = 0;
1040       int cfFrag = 0;
1041       FileStatus[] regionDirs = fs.listStatus(d, df);
1042       for (FileStatus regionDir : regionDirs) {
1043         Path dd = regionDir.getPath();
1044         if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
1045           continue;
1046         }
1047         // else it's a region name, now look in region for families
1048         FileStatus[] familyDirs = fs.listStatus(dd, df);
1049         for (FileStatus familyDir : familyDirs) {
1050           cfCount++;
1051           cfCountTotal++;
1052           Path family = familyDir.getPath();
1053           // now in family make sure only one file
1054           FileStatus[] familyStatus = fs.listStatus(family);
1055           if (familyStatus.length > 1) {
1056             cfFrag++;
1057             cfFragTotal++;
1058           }
1059         }
1060       }
1061       // compute percentage per table and store in result list
1062       frags.put(FSUtils.getTableName(d).getNameAsString(),
1063           Math.round((float) cfFrag / cfCount * 100));
1064     }
1065     // set overall percentage for all tables
1066     frags.put("-TOTAL-", Math.round((float) cfFragTotal / cfCountTotal * 100));
1067     return frags;
1068   }
1069 
1070   /**
1071    * Expects to find -ROOT- directory.
1072    * @param fs filesystem
1073    * @param hbaseRootDir hbase root directory
1074    * @return True if this is a pre-0.20 layout.
1075    * @throws IOException e
1076    */
1077   public static boolean isPre020FileLayout(final FileSystem fs,
1078     final Path hbaseRootDir)
1079   throws IOException {
1080     Path mapfiles = new Path(new Path(new Path(new Path(hbaseRootDir, "-ROOT-"),
1081       "70236052"), "info"), "mapfiles");
1082     return fs.exists(mapfiles);
1083   }
1084 
1085   /**
1086    * Runs through the hbase rootdir and checks all stores have only
1087    * one file in them -- that is, they've been major compacted.  Looks
1088    * at root and meta tables too.  This version differs from
1089    * {@link #isMajorCompacted(FileSystem, Path)} in that it expects a
1090    * pre-0.20.0 hbase layout on the filesystem.  Used when migrating.
1091    * @param fs filesystem
1092    * @param hbaseRootDir hbase root directory
1093    * @return True if this hbase install is major compacted.
1094    * @throws IOException e
1095    */
1096   public static boolean isMajorCompactedPre020(final FileSystem fs,
1097       final Path hbaseRootDir)
1098   throws IOException {
1099     // Presumes any directory under hbase.rootdir is a table.
1100     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1101     for (Path d: tableDirs) {
1102       // Inside a table, there are compaction.dir directories to skip.
1103       // Otherwise, all else should be regions.  Then in each region, should
1104       // only be family directories.  Under each of these, should be a mapfile
1105       // and info directory and in these only one file.
1106       if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
1107         continue;
1108       }
1109       FileStatus[] regionDirs = fs.listStatus(d, new DirFilter(fs));
1110       for (FileStatus regionDir : regionDirs) {
1111         Path dd = regionDir.getPath();
1112         if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
1113           continue;
1114         }
1115         // Else it's a region name.  Now look in region for families.
1116         FileStatus[] familyDirs = fs.listStatus(dd, new DirFilter(fs));
1117         for (FileStatus familyDir : familyDirs) {
1118           Path family = familyDir.getPath();
1119           FileStatus[] infoAndMapfile = fs.listStatus(family);
1120           // Assert that only info and mapfile in family dir.
1121           if (infoAndMapfile.length != 0 && infoAndMapfile.length != 2) {
1122             LOG.debug(family.toString() +
1123                 " has more than just info and mapfile: " + infoAndMapfile.length);
1124             return false;
1125           }
1126           // Make sure directory named info or mapfile.
1127           for (int ll = 0; ll < 2; ll++) {
1128             if (infoAndMapfile[ll].getPath().getName().equals("info") ||
1129                 infoAndMapfile[ll].getPath().getName().equals("mapfiles"))
1130               continue;
1131             LOG.debug("Unexpected directory name: " +
1132                 infoAndMapfile[ll].getPath());
1133             return false;
1134           }
1135           // Now in family, there are 'mapfile' and 'info' subdirs.  Just
1136           // look in the 'mapfile' subdir.
1137           FileStatus[] familyStatus =
1138               fs.listStatus(new Path(family, "mapfiles"));
1139           if (familyStatus.length > 1) {
1140             LOG.debug(family.toString() + " has " + familyStatus.length +
1141                 " files.");
1142             return false;
1143           }
1144         }
1145       }
1146     }
1147     return true;
1148   }
1149 
1150   /**
1151    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1152    * path rootdir
1153    *
1154    * @param rootdir qualified path of HBase root directory
1155    * @param tableName name of table
1156    * @return {@link org.apache.hadoop.fs.Path} for table
1157    */
1158   public static Path getTableDir(Path rootdir, final TableName tableName) {
1159     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1160         tableName.getQualifierAsString());
1161   }
1162 
1163   /**
1164    * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
1165    * the table that the given table directory
1166    * path belongs to
1167    *
1168    * @param tablePath path of table
1169    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1170    */
1171   public static TableName getTableName(Path tablePath) {
1172     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1173   }
1174 
1175   /**
1176    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1177    * the namespace directory under path rootdir
1178    *
1179    * @param rootdir qualified path of HBase root directory
1180    * @param namespace namespace name
1181    * @return {@link org.apache.hadoop.fs.Path} for namespace
1182    */
1183   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1184     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1185         new Path(namespace)));
1186   }
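
  // Illustrative sketch: table directories live under their namespace directory, which in
  // turn lives under HConstants.BASE_NAMESPACE_DIR in the root dir. Names are hypothetical.
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("ns1", "t1"));
  //   TableName tn = FSUtils.getTableName(tableDir);   // back to "ns1:t1"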
1187 
1188   /**
1189    * A {@link PathFilter} that returns only regular files.
1190    */
1191   static class FileFilter implements PathFilter {
1192     private final FileSystem fs;
1193 
1194     public FileFilter(final FileSystem fs) {
1195       this.fs = fs;
1196     }
1197 
1198     @Override
1199     public boolean accept(Path p) {
1200       try {
1201         return fs.isFile(p);
1202       } catch (IOException e) {
1203         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1204         return false;
1205       }
1206     }
1207   }
1208 
1209   /**
1210    * Directory filter that doesn't include any of the directories in the specified blacklist
1211    */
1212   public static class BlackListDirFilter implements PathFilter {
1213     private final FileSystem fs;
1214     private List<String> blacklist;
1215 
1216     /**
1217      * Create a filter on the given filesystem with the specified blacklist
1218      * @param fs filesystem to filter
1219      * @param directoryNameBlackList list of the names of the directories to filter. If
1220      *          <tt>null</tt>, all directories are returned
1221      */
1222     @SuppressWarnings("unchecked")
1223     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1224       this.fs = fs;
1225       blacklist =
1226         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1227           : directoryNameBlackList);
1228     }
1229 
1230     @Override
1231     public boolean accept(Path p) {
1232       boolean isValid = false;
1233       try {
1234         if (blacklist.contains(p.getName().toString())) {
1235           isValid = false;
1236         } else {
1237           isValid = fs.getFileStatus(p).isDir();
1238         }
1239       } catch (IOException e) {
1240         LOG.warn("An error occurred while verifying if [" + p.toString()
1241             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1242       }
1243       return isValid;
1244     }
1245   }
1246 
1247   /**
1248    * A {@link PathFilter} that only allows directories.
1249    */
1250   public static class DirFilter extends BlackListDirFilter {
1251 
1252     public DirFilter(FileSystem fs) {
1253       super(fs, null);
1254     }
1255   }
1256 
1257   /**
1258    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1259    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1260    */
1261   public static class UserTableDirFilter extends BlackListDirFilter {
1262 
1263     public UserTableDirFilter(FileSystem fs) {
1264       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1265     }
1266   }
1267 
1268   /**
1269    * Heuristic to determine whether it is safe or not to open a file for append.
1270    * Looks both for dfs.support.append and uses reflection to search
1271    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush()
1272    * @param conf
1273    * @return True if append is supported
1274    */
1275   public static boolean isAppendSupported(final Configuration conf) {
1276     boolean append = conf.getBoolean("dfs.support.append", false);
1277     if (append) {
1278       try {
1279         // TODO: The implementation that comes back when we do a createWriter
1280         // may not be using SequenceFile so the below is not a definitive test.
1281         // Will do for now (hdfs-200).
1282         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1283         append = true;
1284       } catch (SecurityException e) {
1285       } catch (NoSuchMethodException e) {
1286         append = false;
1287       }
1288     }
1289     if (!append) {
1290       // Look for the 0.21, 0.22, new-style append evidence.
1291       try {
1292         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1293         append = true;
1294       } catch (NoSuchMethodException e) {
1295         append = false;
1296       }
1297     }
1298     return append;
1299   }
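
  // Illustrative sketch: callers typically only warn when append/sync support is absent,
  // since WAL durability cannot be guaranteed without it.
  //   if (!FSUtils.isAppendSupported(conf)) {
  //     LOG.warn("Append/hflush support not detected; WAL edits may be lost on crash");
  //   }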
1300 
1301   /**
1302    * @param conf
1303    * @return True if this filesystem's scheme is 'hdfs'.
1304    * @throws IOException
1305    */
1306   public static boolean isHDFS(final Configuration conf) throws IOException {
1307     FileSystem fs = FileSystem.get(conf);
1308     String scheme = fs.getUri().getScheme();
1309     return scheme.equalsIgnoreCase("hdfs");
1310   }
1311 
1312   /**
1313    * Recover file lease. Used when a file is suspected
1314    * to have been left open by another process.
1315    * @param fs FileSystem handle
1316    * @param p Path of file to recover lease
1317    * @param conf Configuration handle
1318    * @throws IOException
1319    */
1320   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1321       Configuration conf, CancelableProgressable reporter) throws IOException;
1322 
1323   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1324       throws IOException {
1325     List<Path> tableDirs = new LinkedList<Path>();
1326 
1327     for(FileStatus status :
1328         fs.globStatus(new Path(rootdir,
1329             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1330       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1331     }
1332     return tableDirs;
1333   }
1334 
1335   /**
1336    * @param fs
1337    * @param rootdir
1338    * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders such as
1339    * .logs, .oldlogs and .corrupt.
1340    * @throws IOException
1341    */
1342   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1343       throws IOException {
1344     // presumes any directory under hbase.rootdir is a table
1345     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1346     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1347     for (FileStatus dir: dirs) {
1348       tabledirs.add(dir.getPath());
1349     }
1350     return tabledirs;
1351   }
1352 
1353   /**
1354    * Checks if the given path is the one with 'recovered.edits' dir.
1355    * @param path
1356    * @return True if the path is under a 'recovered.edits' directory
1357    */
1358   public static boolean isRecoveredEdits(Path path) {
1359     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1360   }
1361 
1362   /**
1363    * Filter for region directories; accepts only directory names that look like region encoded names.
1364    */
1365   public static class RegionDirFilter implements PathFilter {
1366     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1367     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1368     final FileSystem fs;
1369 
1370     public RegionDirFilter(FileSystem fs) {
1371       this.fs = fs;
1372     }
1373 
1374     @Override
1375     public boolean accept(Path rd) {
1376       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1377         return false;
1378       }
1379 
1380       try {
1381         return fs.getFileStatus(rd).isDir();
1382       } catch (IOException ioe) {
1383         // Maybe the file was moved or the fs was disconnected.
1384         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1385         return false;
1386       }
1387     }
1388   }
1389 
1390   /**
1391    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1392    * .tableinfo
1393    * @param fs A file system for the Path
1394    * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
1395    * @return List of paths to valid region directories in table dir.
1396    * @throws IOException
1397    */
1398   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1399     // assumes we are in a table dir.
1400     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1401     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1402     for (FileStatus rdfs: rds) {
1403       Path rdPath = rdfs.getPath();
1404       regionDirs.add(rdPath);
1405     }
1406     return regionDirs;
1407   }
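  // Hedged usage sketch (not part of the original source): walking every region
  // directory of a table. The table name is illustrative.
  //
  //   Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("usertable"));
  //   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
  //     // inspect the region directory, e.g. with getFamilyDirs(fs, regionDir)
  //   }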
1408 
1409   /**
1410    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1411    * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
1412    */
1413   public static class FamilyDirFilter implements PathFilter {
1414     final FileSystem fs;
1415 
1416     public FamilyDirFilter(FileSystem fs) {
1417       this.fs = fs;
1418     }
1419 
1420     @Override
1421     public boolean accept(Path rd) {
1422       try {
1423         // throws IAE if invalid
1424         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1425       } catch (IllegalArgumentException iae) {
1426         // path name is an invalid family name and thus is excluded.
1427         return false;
1428       }
1429 
1430       try {
1431         return fs.getFileStatus(rd).isDir();
1432       } catch (IOException ioe) {
1433         // Maybe the file was moved or the fs was disconnected.
1434         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1435         return false;
1436       }
1437     }
1438   }
1439 
1440   /**
1441    * Given a particular region dir, return all the familydirs inside it
1442    *
1443    * @param fs A file system for the Path
1444    * @param regionDir Path to a specific region directory
1445    * @return List of paths to valid family directories in region dir.
1446    * @throws IOException
1447    */
1448   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1449     // assumes we are in a region dir.
1450     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1451     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1452     for (FileStatus fdfs: fds) {
1453       Path fdPath = fdfs.getPath();
1454       familyDirs.add(fdPath);
1455     }
1456     return familyDirs;
1457   }
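  // Hedged usage sketch (not part of the original source): descending from a region
  // directory into its column family directories; regionDir is assumed to come from
  // getRegionDirs(fs, tableDir) above.
  //
  //   for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
  //     LOG.debug("Family dir: " + familyDir.getName());
  //   }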
1458 
1459   /**
1460    * Filter for HFiles that excludes reference files.
1461    */
1462   public static class HFileFilter implements PathFilter {
1463     // This pattern will accept 0.90+ style hex hfile names but reject reference files
1464     final public static Pattern hfilePattern = Pattern.compile("^([0-9a-f]+)$");
1465 
1466     final FileSystem fs;
1467 
1468     public HFileFilter(FileSystem fs) {
1469       this.fs = fs;
1470     }
1471 
1472     @Override
1473     public boolean accept(Path rd) {
1474       if (!hfilePattern.matcher(rd.getName()).matches()) {
1475         return false;
1476       }
1477 
1478       try {
1479         // only files
1480         return !fs.getFileStatus(rd).isDir();
1481       } catch (IOException ioe) {
1482         // Maybe the file was moved or the fs was disconnected.
1483         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1484         return false;
1485       }
1486     }
1487   }
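  // Hedged usage sketch (not part of the original source): listing only real store files
  // (skipping reference files) inside a column family directory. familyDir is assumed to
  // point at a <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir> path.
  //
  //   FileStatus[] hfiles = FSUtils.listStatus(fs, familyDir, new HFileFilter(fs));
  //   if (hfiles != null) {
  //     for (FileStatus hfile : hfiles) {
  //       LOG.debug("Store file: " + hfile.getPath());
  //     }
  //   }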
1488 
1489   /**
1490    * @param conf
1491    * @return The filesystem of the hbase rootdir.
1492    * @throws IOException
1493    */
1494   public static FileSystem getCurrentFileSystem(Configuration conf)
1495   throws IOException {
1496     return getRootDir(conf).getFileSystem(conf);
1497   }
1498 
1499 
1500   /**
1501    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1502    * table StoreFile names to the full Path.
1503    * <br>
1504    * Example...<br>
1505    * Key = 3944417774205889744  <br>
1506    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1507    *
1508    * @param map map to add values.  If null, this method will create and populate one to return
1509    * @param fs  The file system to use.
1510    * @param hbaseRootDir  The root directory to scan.
1511    * @param tableName name of the table to scan.
1512    * @return Map keyed by StoreFile name with a value of the full Path.
1513    * @throws IOException When scanning the directory fails.
1514    */
1515   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1516   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1517   throws IOException {
1518     if (map == null) {
1519       map = new HashMap<String, Path>();
1520     }
1521 
1522     // only include the directory paths to tables
1523     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1524     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1525     // should be regions.
1526     PathFilter df = new BlackListDirFilter(fs, HConstants.HBASE_NON_TABLE_DIRS);
1527     FileStatus[] regionDirs = fs.listStatus(tableDir);
1528     for (FileStatus regionDir : regionDirs) {
1529       Path dd = regionDir.getPath();
1530       if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
1531         continue;
1532       }
1533       // else its a region name, now look in region for families
1534       FileStatus[] familyDirs = fs.listStatus(dd, df);
1535       for (FileStatus familyDir : familyDirs) {
1536         Path family = familyDir.getPath();
1537         // now in family, iterate over the StoreFiles and
1538         // put in map
1539         FileStatus[] familyStatus = fs.listStatus(family);
1540         for (FileStatus sfStatus : familyStatus) {
1541           Path sf = sfStatus.getPath();
1542           map.put(sf.getName(), sf);
1543         }
1544       }
1545     }
1546     return map;
1547   }
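  // Hedged usage sketch (not part of the original source): building a store-file-name to
  // full-path lookup for one table; passing null lets the method allocate the map. The
  // table name is illustrative.
  //
  //   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(
  //       null, fs, FSUtils.getRootDir(conf), TableName.valueOf("usertable"));
  //   Path fullPath = storeFiles.get("3944417774205889744");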
1548 
1549 
1550   /**
1551    * Runs through the HBase rootdir and creates a reverse lookup map for
1552    * table StoreFile names to the full Path.
1553    * <br>
1554    * Example...<br>
1555    * Key = 3944417774205889744  <br>
1556    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1557    *
1558    * @param fs  The file system to use.
1559    * @param hbaseRootDir  The root directory to scan.
1560    * @return Map keyed by StoreFile name with a value of the full Path.
1561    * @throws IOException When scanning the directory fails.
1562    */
1563   public static Map<String, Path> getTableStoreFilePathMap(
1564     final FileSystem fs, final Path hbaseRootDir)
1565   throws IOException {
1566     Map<String, Path> map = new HashMap<String, Path>();
1567 
1568     // if this method looks similar to 'getTableFragmentation' that is because
1569     // it was borrowed from it.
1570 
1571     // only include the directory paths to tables
1572     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1573       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1574           FSUtils.getTableName(tableDir));
1575     }
1576     return map;
1577   }
1578 
1579   /**
1580    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1581    * This accommodates differences between hadoop versions, where hadoop 1
1582    * does not throw a FileNotFoundException but returns an empty FileStatus[],
1583    * while Hadoop 2 throws a FileNotFoundException.
1584    *
1585    * @param fs file system
1586    * @param dir directory
1587    * @param filter path filter
1588    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1589    */
1590   public static FileStatus [] listStatus(final FileSystem fs,
1591       final Path dir, final PathFilter filter) throws IOException {
1592     FileStatus [] status = null;
1593     try {
1594       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1595     } catch (FileNotFoundException fnfe) {
1596       // if directory doesn't exist, return null
1597       if (LOG.isTraceEnabled()) {
1598         LOG.trace(dir + " doesn't exist");
1599       }
1600     }
1601     if (status == null || status.length < 1) return null;
1602     return status;
1603   }
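  // Hedged usage sketch (not part of the original source): this helper returns null
  // (never an empty array) for a missing or empty directory, so callers should check
  // for null before iterating. The directory variable is illustrative.
  //
  //   FileStatus[] families = FSUtils.listStatus(fs, regionDir, new FamilyDirFilter(fs));
  //   if (families != null) {
  //     for (FileStatus family : families) {
  //       // process family.getPath()
  //     }
  //   }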
1604 
1605   /**
1606    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1607    * This accommodates differences between hadoop versions.
1608    *
1609    * @param fs file system
1610    * @param dir directory
1611    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1612    */
1613   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1614     return listStatus(fs, dir, null);
1615   }
1616 
1617   /**
1618    * Calls fs.delete() and returns the value returned by the fs.delete()
1619    *
1620    * @param fs
1621    * @param path
1622    * @param recursive
1623    * @return the value returned by the fs.delete()
1624    * @throws IOException
1625    */
1626   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1627       throws IOException {
1628     return fs.delete(path, recursive);
1629   }
1630 
1631   /**
1632    * Calls fs.exists(). Checks if the specified path exists
1633    *
1634    * @param fs
1635    * @param path
1636    * @return the value returned by fs.exists()
1637    * @throws IOException
1638    */
1639   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1640     return fs.exists(path);
1641   }
1642 
1643   /**
1644    * Throw an exception if an action is not permitted by a user on a file.
1645    *
1646    * @param ugi
1647    *          the user
1648    * @param file
1649    *          the file
1650    * @param action
1651    *          the action
1652    */
1653   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1654       FsAction action) throws AccessControlException {
1655     if (ugi.getShortUserName().equals(file.getOwner())) {
1656       if (file.getPermission().getUserAction().implies(action)) {
1657         return;
1658       }
1659     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1660       if (file.getPermission().getGroupAction().implies(action)) {
1661         return;
1662       }
1663     } else if (file.getPermission().getOtherAction().implies(action)) {
1664       return;
1665     }
1666     throw new AccessControlException("Permission denied:" + " action=" + action
1667         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1668   }
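  // Hedged usage sketch (not part of the original source): verifying that the current
  // user may write to a file before attempting a modification. The path variable is
  // an illustrative placeholder.
  //
  //   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  //   FileStatus status = fs.getFileStatus(somePath);
  //   FSUtils.checkAccess(ugi, status, FsAction.WRITE); // throws AccessControlException if denied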
1669 
1670   private static boolean contains(String[] groups, String user) {
1671     for (String group : groups) {
1672       if (group.equals(user)) {
1673         return true;
1674       }
1675     }
1676     return false;
1677   }
1678 
1679   /**
1680    * Log the current state of the filesystem from a certain root directory
1681    * @param fs filesystem to investigate
1682    * @param root root file/directory to start logging from
1683    * @param LOG log to output information
1684    * @throws IOException if an unexpected exception occurs
1685    */
1686   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1687       throws IOException {
1688     LOG.debug("Current file system:");
1689     logFSTree(LOG, fs, root, "|-");
1690   }
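  // Hedged usage sketch (not part of the original source): dumping the layout under the
  // HBase root dir at DEBUG level, e.g. while investigating a test failure.
  //
  //   FSUtils.logFileSystemState(fs, FSUtils.getRootDir(conf), LOG);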
1691 
1692   /**
1693    * Recursive helper to log the state of the FS
1694    *
1695    * @see #logFileSystemState(FileSystem, Path, Log)
1696    */
1697   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1698       throws IOException {
1699     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1700     if (files == null) return;
1701 
1702     for (FileStatus file : files) {
1703       if (file.isDir()) {
1704         LOG.debug(prefix + file.getPath().getName() + "/");
1705         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1706       } else {
1707         LOG.debug(prefix + file.getPath().getName());
1708       }
1709     }
1710   }
1711 
1712   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1713       throws IOException {
1714     // set the modify time for TimeToLive Cleaner
1715     fs.setTimes(src, EnvironmentEdgeManager.currentTimeMillis(), -1);
1716     return fs.rename(src, dest);
1717   }
1718 
1719   /**
1720    * This function is to scan the root path of the file system to get the
1721    * degree of locality for each region on each of the servers having at least
1722    * one block of that region.
1723    * This is used by the tool {@link RegionPlacementMaintainer}
1724    *
1725    * @param conf
1726    *          the configuration to use
1727    * @return the mapping from region encoded name to a map of server names to
1728    *           locality fraction
1729    * @throws IOException
1730    *           in case of file system errors or interrupts
1731    */
1732   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1733       final Configuration conf) throws IOException {
1734     return getRegionDegreeLocalityMappingFromFS(
1735         conf, null,
1736         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1737 
1738   }
1739 
1740   /**
1741    * This function is to scan the root path of the file system to get the
1742    * degree of locality for each region on each of the servers having at least
1743    * one block of that region.
1744    *
1745    * @param conf
1746    *          the configuration to use
1747    * @param desiredTable
1748    *          the table you wish to scan locality for
1749    * @param threadPoolSize
1750    *          the thread pool size to use
1751    * @return the mapping from region encoded name to a map of server names to
1752    *           locality fraction
1753    * @throws IOException
1754    *           in case of file system errors or interrupts
1755    */
1756   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1757       final Configuration conf, final String desiredTable, int threadPoolSize)
1758       throws IOException {
1759     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1760         new ConcurrentHashMap<String, Map<String, Float>>();
1761     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1762         regionDegreeLocalityMapping);
1763     return regionDegreeLocalityMapping;
1764   }
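  // Hedged usage sketch (not part of the original source): computing per-region HDFS
  // locality for a single table with a small thread pool. The table name and pool size
  // are illustrative.
  //
  //   Map<String, Map<String, Float>> locality =
  //       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "usertable", 4);
  //   for (Map.Entry<String, Map<String, Float>> region : locality.entrySet()) {
  //     LOG.info("Region " + region.getKey() + " locality by server: " + region.getValue());
  //   }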
1765 
1766   /**
1767    * This function is to scan the root path of the file system to get either the
1768    * mapping between the region name and its best locality region server or the
1769    * degree of locality of each region on each of the servers having at least
1770    * one block of that region. The output map parameters are both optional.
1771    *
1772    * @param conf
1773    *          the configuration to use
1774    * @param desiredTable
1775    *          the table you wish to scan locality for
1776    * @param threadPoolSize
1777    *          the thread pool size to use
1778    * @param regionToBestLocalityRSMapping
1779    *          the map into which to put the best locality mapping or null
1780    * @param regionDegreeLocalityMapping
1781    *          the map into which to put the locality degree mapping or null,
1782    *          must be a thread-safe implementation
1783    * @throws IOException
1784    *           in case of file system errors or interrupts
1785    */
1786   private static void getRegionLocalityMappingFromFS(
1787       final Configuration conf, final String desiredTable,
1788       int threadPoolSize,
1789       Map<String, String> regionToBestLocalityRSMapping,
1790       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1791       throws IOException {
1792     FileSystem fs =  FileSystem.get(conf);
1793     Path rootPath = FSUtils.getRootDir(conf);
1794     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1795     Path queryPath;
1796     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1797     if (null == desiredTable) {
1798       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1799     } else {
1800       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1801     }
1802 
1803     // reject all paths that are not appropriate
1804     PathFilter pathFilter = new PathFilter() {
1805       @Override
1806       public boolean accept(Path path) {
1807         // this is the region name; it may get some noise data
1808         if (null == path) {
1809           return false;
1810         }
1811 
1812         // no parent?
1813         Path parent = path.getParent();
1814         if (null == parent) {
1815           return false;
1816         }
1817 
1818         String regionName = path.getName();
1819         if (null == regionName) {
1820           return false;
1821         }
1822 
1823         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1824           return false;
1825         }
1826         return true;
1827       }
1828     };
1829 
1830     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1831 
1832     if (null == statusList) {
1833       return;
1834     } else {
1835       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1836           statusList.length);
1837     }
1838 
1839     // lower the number of threads in case we have very few expected regions
1840     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1841 
1842     // run in multiple threads
1843     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1844         threadPoolSize, 60, TimeUnit.SECONDS,
1845         new ArrayBlockingQueue<Runnable>(statusList.length));
1846     try {
1847       // ignore all file status items that are not of interest
1848       for (FileStatus regionStatus : statusList) {
1849         if (null == regionStatus) {
1850           continue;
1851         }
1852 
1853         if (!regionStatus.isDir()) {
1854           continue;
1855         }
1856 
1857         Path regionPath = regionStatus.getPath();
1858         if (null == regionPath) {
1859           continue;
1860         }
1861 
1862         tpe.execute(new FSRegionScanner(fs, regionPath,
1863             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1864       }
1865     } finally {
1866       tpe.shutdown();
1867       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1868           60 * 1000);
1869       try {
1870         // here we wait until TPE terminates, which is either naturally or by
1871         // exceptions in the execution of the threads
1872         while (!tpe.awaitTermination(threadWakeFrequency,
1873             TimeUnit.MILLISECONDS)) {
1874           // printing out rough estimate, so as to not introduce
1875           // AtomicInteger
1876           LOG.info("Locality checking is underway: { Scanned Regions : "
1877               + tpe.getCompletedTaskCount() + "/"
1878               + tpe.getTaskCount() + " }");
1879         }
1880       } catch (InterruptedException e) {
1881         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1882       }
1883     }
1884 
1885     long overhead = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1886     String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
1887 
1888     LOG.info(overheadMsg);
1889   }
1890 
1891   /**
1892    * Do our short circuit read setup.
1893    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
1894    * @param conf
1895    */
1896   public static void setupShortCircuitRead(final Configuration conf) {
1897     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1898     boolean shortCircuitSkipChecksum =
1899       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1900     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1901     if (shortCircuitSkipChecksum) {
1902       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1903         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1904         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1905       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1906     }
1907     checkShortCircuitReadBufferSize(conf);
1908   }
1909 
1910   /**
1911    * Check if short circuit read buffer size is set and if not, set it to hbase value.
1912    * @param conf
1913    */
1914   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1915     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1916     final int notSet = -1;
1917     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1918     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1919     int size = conf.getInt(dfsKey, notSet);
1920     // If a size is set, return -- we will use it.
1921     if (size != notSet) return;
1922     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1923     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1924     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1925   }
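  // Hedged configuration sketch (not part of the original source): an operator can steer
  // the value this method applies by setting the hbase-prefixed key before startup; the
  // 131072 byte size below is illustrative, not a recommendation.
  //
  //   conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
  //   FSUtils.checkShortCircuitReadBufferSize(conf); // copies it to the dfs key if that key is unset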
1926 }