1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.Map;
24
25 import java.util.concurrent.atomic.AtomicInteger;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.fs.BlockLocation;
30 import org.apache.hadoop.fs.FileStatus;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.HConstants;
34
35
36
37
38
39
40 @InterfaceAudience.Private
41 class FSRegionScanner implements Runnable {
42 static private final Log LOG = LogFactory.getLog(FSRegionScanner.class);
43
44 private Path regionPath;
45
46
47
48
49 private FileSystem fs;
50
51
52
53
54 private Map<String,String> regionToBestLocalityRSMapping;
55
56
57
58
59
60 private Map<String, Map<String, Float>> regionDegreeLocalityMapping;
61
62 FSRegionScanner(FileSystem fs, Path regionPath,
63 Map<String, String> regionToBestLocalityRSMapping,
64 Map<String, Map<String, Float>> regionDegreeLocalityMapping) {
65 this.fs = fs;
66 this.regionPath = regionPath;
67 this.regionToBestLocalityRSMapping = regionToBestLocalityRSMapping;
68 this.regionDegreeLocalityMapping = regionDegreeLocalityMapping;
69 }
70
71 @Override
72 public void run() {
73 try {
74
75 Map<String, AtomicInteger> blockCountMap = new HashMap<String, AtomicInteger>();
76
77
78 String tableName = regionPath.getParent().getName();
79 int totalBlkCount = 0;
80
81
82 FileStatus[] cfList = fs.listStatus(regionPath);
83 if (null == cfList) {
84 return;
85 }
86
87
88 for (FileStatus cfStatus : cfList) {
89 if (!cfStatus.isDir()) {
90
91 continue;
92 }
93 if (cfStatus.getPath().getName().startsWith(".") ||
94 HConstants.HBASE_NON_USER_TABLE_DIRS.contains(cfStatus.getPath().getName())) {
95 continue;
96 }
97 FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
98 if (null == storeFileLists) {
99 continue;
100 }
101
102 for (FileStatus storeFile : storeFileLists) {
103 BlockLocation[] blkLocations =
104 fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
105 if (null == blkLocations) {
106 continue;
107 }
108
109 totalBlkCount += blkLocations.length;
110 for(BlockLocation blk: blkLocations) {
111 for (String host: blk.getHosts()) {
112 AtomicInteger count = blockCountMap.get(host);
113 if (count == null) {
114 count = new AtomicInteger(0);
115 blockCountMap.put(host, count);
116 }
117 count.incrementAndGet();
118 }
119 }
120 }
121 }
122
123 if (regionToBestLocalityRSMapping != null) {
124 int largestBlkCount = 0;
125 String hostToRun = null;
126 for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
127 String host = entry.getKey();
128
129 int tmp = entry.getValue().get();
130 if (tmp > largestBlkCount) {
131 largestBlkCount = tmp;
132 hostToRun = host;
133 }
134 }
135
136
137 if (null == hostToRun) {
138 return;
139 }
140
141 if (hostToRun.endsWith(".")) {
142 hostToRun = hostToRun.substring(0, hostToRun.length()-1);
143 }
144 String name = tableName + ":" + regionPath.getName();
145 synchronized (regionToBestLocalityRSMapping) {
146 regionToBestLocalityRSMapping.put(name, hostToRun);
147 }
148 }
149
150 if (regionDegreeLocalityMapping != null && totalBlkCount > 0) {
151 Map<String, Float> hostLocalityMap = new HashMap<String, Float>();
152 for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
153 String host = entry.getKey();
154 if (host.endsWith(".")) {
155 host = host.substring(0, host.length() - 1);
156 }
157
158 float locality = ((float)entry.getValue().get()) / totalBlkCount;
159 hostLocalityMap.put(host, locality);
160 }
161
162
163 regionDegreeLocalityMapping.put(regionPath.getName(), hostLocalityMap);
164 }
165 } catch (IOException e) {
166 LOG.warn("Problem scanning file system", e);
167 } catch (RuntimeException e) {
168 LOG.warn("Problem scanning file system", e);
169 }
170 }
171 }