1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master;
19
20 import java.lang.Thread.UncaughtExceptionHandler;
21 import java.util.ArrayList;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.TimeUnit;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.classification.InterfaceAudience;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.HRegionInfo;
36 import org.apache.hadoop.hbase.Server;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.master.RegionState.State;
39
40
41
42
43
44 @InterfaceAudience.Private
45 public class GeneralBulkAssigner extends BulkAssigner {
46 private static final Log LOG = LogFactory.getLog(GeneralBulkAssigner.class);
47
48 private Map<ServerName, List<HRegionInfo>> failedPlans
49 = new ConcurrentHashMap<ServerName, List<HRegionInfo>>();
50 private ExecutorService pool;
51
52 final Map<ServerName, List<HRegionInfo>> bulkPlan;
53 final AssignmentManager assignmentManager;
54 final boolean waitTillAllAssigned;
55
56 public GeneralBulkAssigner(final Server server,
57 final Map<ServerName, List<HRegionInfo>> bulkPlan,
58 final AssignmentManager am, final boolean waitTillAllAssigned) {
59 super(server);
60 this.bulkPlan = bulkPlan;
61 this.assignmentManager = am;
62 this.waitTillAllAssigned = waitTillAllAssigned;
63 }
64
65 @Override
66 protected String getThreadNamePrefix() {
67 return this.server.getServerName() + "-GeneralBulkAssigner";
68 }
69
70 @Override
71 protected void populatePool(ExecutorService pool) {
72 this.pool = pool;
73 for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
74 pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
75 this.assignmentManager, this.failedPlans));
76 }
77 }
78
79
80
81
82
83
84 @Override
85 protected boolean waitUntilDone(final long timeout)
86 throws InterruptedException {
87 Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
88 for (List<HRegionInfo> regionList : bulkPlan.values()) {
89 regionSet.addAll(regionList);
90 }
91
92 pool.shutdown();
93 int serverCount = bulkPlan.size();
94 int regionCount = regionSet.size();
95 long startTime = System.currentTimeMillis();
96 long rpcWaitTime = startTime + timeout;
97 while (!server.isStopped() && !pool.isTerminated()
98 && rpcWaitTime > System.currentTimeMillis()) {
99 if (failedPlans.isEmpty()) {
100 pool.awaitTermination(100, TimeUnit.MILLISECONDS);
101 } else {
102 reassignFailedPlans();
103 }
104 }
105 if (!pool.isTerminated()) {
106 LOG.warn("bulk assigner is still running after "
107 + (System.currentTimeMillis() - startTime) + "ms, shut it down now");
108
109 List<Runnable> notStarted = pool.shutdownNow();
110 if (notStarted != null && !notStarted.isEmpty()) {
111 server.abort("some single server assigner hasn't started yet"
112 + " when the bulk assigner timed out", null);
113 return false;
114 }
115 }
116
117 int reassigningRegions = 0;
118 if (!failedPlans.isEmpty() && !server.isStopped()) {
119 reassigningRegions = reassignFailedPlans();
120 }
121
122 Configuration conf = server.getConfiguration();
123 long perRegionOpenTimeGuesstimate =
124 conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
125 long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
126 + perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
127 RegionStates regionStates = assignmentManager.getRegionStates();
128
129 while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
130 Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
131 while (regionInfoIterator.hasNext()) {
132 HRegionInfo hri = regionInfoIterator.next();
133 if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
134 State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
135 regionInfoIterator.remove();
136 }
137 }
138 if (!waitTillAllAssigned) {
139
140 break;
141 }
142 if (!regionSet.isEmpty()) {
143 regionStates.waitForUpdate(100);
144 }
145 }
146
147 if (LOG.isDebugEnabled()) {
148 long elapsedTime = System.currentTimeMillis() - startTime;
149 String status = "successfully";
150 if (!regionSet.isEmpty()) {
151 status = "with " + regionSet.size() + " regions still in transition";
152 }
153 LOG.debug("bulk assigning total " + regionCount + " regions to "
154 + serverCount + " servers, took " + elapsedTime + "ms, " + status);
155 }
156 return regionSet.isEmpty();
157 }
158
159 @Override
160 protected long getTimeoutOnRIT() {
161
162
163 Configuration conf = server.getConfiguration();
164 long perRegionOpenTimeGuesstimate =
165 conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
166 int maxRegionsPerServer = 1;
167 for (List<HRegionInfo> regionList : bulkPlan.values()) {
168 int size = regionList.size();
169 if (size > maxRegionsPerServer) {
170 maxRegionsPerServer = size;
171 }
172 }
173 long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer
174 + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000)
175 + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime",
176 30000) * bulkPlan.size();
177 LOG.debug("Timeout-on-RIT=" + timeout);
178 return timeout;
179 }
180
181 @Override
182 protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
183 return new UncaughtExceptionHandler() {
184 @Override
185 public void uncaughtException(Thread t, Throwable e) {
186 LOG.warn("Assigning regions in " + t.getName(), e);
187 }
188 };
189 }
190
191 private int reassignFailedPlans() {
192 List<HRegionInfo> reassigningRegions = new ArrayList<HRegionInfo>();
193 for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
194 LOG.info("Failed assigning " + e.getValue().size()
195 + " regions to server " + e.getKey() + ", reassigning them");
196 reassigningRegions.addAll(failedPlans.remove(e.getKey()));
197 }
198 RegionStates regionStates = assignmentManager.getRegionStates();
199 for (HRegionInfo region : reassigningRegions) {
200 if (!regionStates.isRegionOnline(region)) {
201 assignmentManager.invokeAssign(region);
202 }
203 }
204 return reassigningRegions.size();
205 }
206
207
208
209
210 static class SingleServerBulkAssigner implements Runnable {
211 private final ServerName regionserver;
212 private final List<HRegionInfo> regions;
213 private final AssignmentManager assignmentManager;
214 private final Map<ServerName, List<HRegionInfo>> failedPlans;
215
216 SingleServerBulkAssigner(final ServerName regionserver,
217 final List<HRegionInfo> regions, final AssignmentManager am,
218 final Map<ServerName, List<HRegionInfo>> failedPlans) {
219 this.regionserver = regionserver;
220 this.regions = regions;
221 this.assignmentManager = am;
222 this.failedPlans = failedPlans;
223 }
224
225 @Override
226 public void run() {
227 try {
228 if (!assignmentManager.assign(regionserver, regions)) {
229 failedPlans.put(regionserver, regions);
230 }
231 } catch (Throwable t) {
232 LOG.warn("Failed bulking assigning " + regions.size()
233 + " region(s) to " + regionserver.getServerName()
234 + ", and continue to bulk assign others", t);
235 failedPlans.put(regionserver, regions);
236 }
237 }
238 }
239 }