View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.IOException;
22  import java.util.HashMap;
23  import java.util.Map;
24  
25  import java.util.concurrent.atomic.AtomicInteger;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.classification.InterfaceAudience;
29  import org.apache.hadoop.fs.BlockLocation;
30  import org.apache.hadoop.fs.FileStatus;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.HConstants;
34  
35  /**
36   * Thread that walks over the filesystem, and computes the mappings
37   * <Region -> BestHost> and <Region -> Map<HostName, fractional-locality-of-region>>
38   *
39   */
40  @InterfaceAudience.Private
41  class FSRegionScanner implements Runnable {
42    static private final Log LOG = LogFactory.getLog(FSRegionScanner.class);
43  
44    private Path regionPath;
45  
46    /**
47     * The file system used
48     */
49    private FileSystem fs;
50  
51    /**
52     * Maps each region to the RS with highest locality for that region.
53     */
54    private Map<String,String> regionToBestLocalityRSMapping;
55  
56    /**
57     * Maps region encoded names to maps of hostnames to fractional locality of
58     * that region on that host.
59     */
60    private Map<String, Map<String, Float>> regionDegreeLocalityMapping;
61  
62    FSRegionScanner(FileSystem fs, Path regionPath,
63                    Map<String, String> regionToBestLocalityRSMapping,
64                    Map<String, Map<String, Float>> regionDegreeLocalityMapping) {
65      this.fs = fs;
66      this.regionPath = regionPath;
67      this.regionToBestLocalityRSMapping = regionToBestLocalityRSMapping;
68      this.regionDegreeLocalityMapping = regionDegreeLocalityMapping;
69    }
70  
71    @Override
72    public void run() {
73      try {
74        // empty the map for each region
75        Map<String, AtomicInteger> blockCountMap = new HashMap<String, AtomicInteger>();
76  
77        //get table name
78        String tableName = regionPath.getParent().getName();
79        int totalBlkCount = 0;
80  
81        // ignore null
82        FileStatus[] cfList = fs.listStatus(regionPath);
83        if (null == cfList) {
84          return;
85        }
86  
87        // for each cf, get all the blocks information
88        for (FileStatus cfStatus : cfList) {
89          if (!cfStatus.isDir()) {
90            // skip because this is not a CF directory
91            continue;
92          }
93          if (cfStatus.getPath().getName().startsWith(".") ||
94              HConstants.HBASE_NON_USER_TABLE_DIRS.contains(cfStatus.getPath().getName())) {
95            continue;
96          }
97          FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
98          if (null == storeFileLists) {
99            continue;
100         }
101 
102         for (FileStatus storeFile : storeFileLists) {
103           BlockLocation[] blkLocations =
104             fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
105           if (null == blkLocations) {
106             continue;
107           }
108 
109           totalBlkCount += blkLocations.length;
110           for(BlockLocation blk: blkLocations) {
111             for (String host: blk.getHosts()) {
112               AtomicInteger count = blockCountMap.get(host);
113               if (count == null) {
114                 count = new AtomicInteger(0);
115                 blockCountMap.put(host, count);
116               }
117               count.incrementAndGet();
118             }
119           }
120         }
121       }
122 
123       if (regionToBestLocalityRSMapping != null) {
124         int largestBlkCount = 0;
125         String hostToRun = null;
126         for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
127           String host = entry.getKey();
128 
129           int tmp = entry.getValue().get();
130           if (tmp > largestBlkCount) {
131             largestBlkCount = tmp;
132             hostToRun = host;
133           }
134         }
135 
136         // empty regions could make this null
137         if (null == hostToRun) {
138           return;
139         }
140 
141         if (hostToRun.endsWith(".")) {
142           hostToRun = hostToRun.substring(0, hostToRun.length()-1);
143         }
144         String name = tableName + ":" + regionPath.getName();
145         synchronized (regionToBestLocalityRSMapping) {
146           regionToBestLocalityRSMapping.put(name,  hostToRun);
147         }
148       }
149 
150       if (regionDegreeLocalityMapping != null && totalBlkCount > 0) {
151         Map<String, Float> hostLocalityMap = new HashMap<String, Float>();
152         for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
153           String host = entry.getKey();
154           if (host.endsWith(".")) {
155             host = host.substring(0, host.length() - 1);
156           }
157           // Locality is fraction of blocks local to this host.
158           float locality = ((float)entry.getValue().get()) / totalBlkCount;
159           hostLocalityMap.put(host, locality);
160         }
161         // Put the locality map into the result map, keyed by the encoded name
162         // of the region.
163         regionDegreeLocalityMapping.put(regionPath.getName(), hostLocalityMap);
164       }
165     } catch (IOException e) {
166       LOG.warn("Problem scanning file system", e);
167     } catch (RuntimeException e) {
168       LOG.warn("Problem scanning file system", e);
169     }
170   }
171 }