1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableMap;
27 import java.util.Random;
28 import java.util.TreeMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.HRegionInfo;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.master.AssignmentManager;
36 import org.apache.hadoop.hbase.master.RegionPlan;
37
38 import com.google.common.collect.MinMaxPriorityQueue;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 public class SimpleLoadBalancer extends BaseLoadBalancer {
58 private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class);
59 private static final Random RANDOM = new Random(System.currentTimeMillis());
60
61 private RegionInfoComparator riComparator = new RegionInfoComparator();
62 private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
63
64
65
66
67
68
69
70
71
72
73 static class BalanceInfo {
74
75 private final int nextRegionForUnload;
76 private int numRegionsAdded;
77
78 public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
79 this.nextRegionForUnload = nextRegionForUnload;
80 this.numRegionsAdded = numRegionsAdded;
81 }
82
83 int getNextRegionForUnload() {
84 return nextRegionForUnload;
85 }
86
87 int getNumRegionsAdded() {
88 return numRegionsAdded;
89 }
90
91 void setNumRegionsAdded(int numAdded) {
92 this.numRegionsAdded = numAdded;
93 }
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181 public List<RegionPlan> balanceCluster(
182 Map<ServerName, List<HRegionInfo>> clusterMap) {
183 boolean emptyRegionServerPresent = false;
184 long startTime = System.currentTimeMillis();
185
186 ClusterLoadState cs = new ClusterLoadState(clusterMap);
187
188 if (!this.needsBalance(cs)) return null;
189
190 int numServers = cs.getNumServers();
191 NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
192 int numRegions = cs.getNumRegions();
193 int min = numRegions / numServers;
194 int max = numRegions % numServers == 0 ? min : min + 1;
195
196
197 StringBuilder strBalanceParam = new StringBuilder();
198 strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
199 .append(", numServers=").append(numServers).append(", max=").append(max)
200 .append(", min=").append(min);
201 LOG.debug(strBalanceParam.toString());
202
203
204
205 MinMaxPriorityQueue<RegionPlan> regionsToMove =
206 MinMaxPriorityQueue.orderedBy(rpComparator).create();
207 List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
208
209
210 int serversOverloaded = 0;
211
212 boolean fetchFromTail = false;
213 Map<ServerName, BalanceInfo> serverBalanceInfo =
214 new TreeMap<ServerName, BalanceInfo>();
215 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
216 serversByLoad.descendingMap().entrySet()) {
217 ServerAndLoad sal = server.getKey();
218 int regionCount = sal.getLoad();
219 if (regionCount <= max) {
220 serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
221 break;
222 }
223 serversOverloaded++;
224 List<HRegionInfo> regions = server.getValue();
225 int numToOffload = Math.min(regionCount - max, regions.size());
226
227
228 Collections.sort(regions, riComparator);
229 int numTaken = 0;
230 for (int i = 0; i <= numToOffload; ) {
231 HRegionInfo hri = regions.get(i);
232 if (fetchFromTail) {
233 hri = regions.get(regions.size() - 1 - i);
234 }
235 i++;
236
237 if (hri.isMetaRegion()) continue;
238 regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
239 numTaken++;
240 if (numTaken >= numToOffload) break;
241
242 if (emptyRegionServerPresent) {
243 fetchFromTail = !fetchFromTail;
244 }
245 }
246 serverBalanceInfo.put(sal.getServerName(),
247 new BalanceInfo(numToOffload, (-1)*numTaken));
248 }
249 int totalNumMoved = regionsToMove.size();
250
251
252 int neededRegions = 0;
253 fetchFromTail = false;
254
255 Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
256 float average = (float)numRegions / numServers;
257 int maxToTake = numRegions - (int)average;
258 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
259 serversByLoad.entrySet()) {
260 if (maxToTake == 0) break;
261 int regionCount = server.getKey().getLoad();
262 if (regionCount >= min && regionCount > 0) {
263 continue;
264 }
265 int regionsToPut = min - regionCount;
266 if (regionsToPut == 0)
267 {
268 regionsToPut = 1;
269 }
270 maxToTake -= regionsToPut;
271 underloadedServers.put(server.getKey().getServerName(), regionsToPut);
272 }
273
274 int serversUnderloaded = underloadedServers.size();
275 int incr = 1;
276 List<ServerName> sns =
277 Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
278 Collections.shuffle(sns, RANDOM);
279 while (regionsToMove.size() > 0) {
280 int cnt = 0;
281 int i = incr > 0 ? 0 : underloadedServers.size()-1;
282 for (; i >= 0 && i < underloadedServers.size(); i += incr) {
283 if (regionsToMove.isEmpty()) break;
284 ServerName si = sns.get(i);
285 int numToTake = underloadedServers.get(si);
286 if (numToTake == 0) continue;
287
288 addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
289 if (emptyRegionServerPresent) {
290 fetchFromTail = !fetchFromTail;
291 }
292
293 underloadedServers.put(si, numToTake-1);
294 cnt++;
295 BalanceInfo bi = serverBalanceInfo.get(si);
296 if (bi == null) {
297 bi = new BalanceInfo(0, 0);
298 serverBalanceInfo.put(si, bi);
299 }
300 bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
301 }
302 if (cnt == 0) break;
303
304 incr = -incr;
305 }
306 for (Integer i : underloadedServers.values()) {
307
308 neededRegions += i;
309 }
310
311
312
313 if (neededRegions == 0 && regionsToMove.isEmpty()) {
314 long endTime = System.currentTimeMillis();
315 LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
316 "Moving " + totalNumMoved + " regions off of " +
317 serversOverloaded + " overloaded servers onto " +
318 serversUnderloaded + " less loaded servers");
319 return regionsToReturn;
320 }
321
322
323
324
325
326 if (neededRegions != 0) {
327
328 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
329 serversByLoad.descendingMap().entrySet()) {
330 BalanceInfo balanceInfo =
331 serverBalanceInfo.get(server.getKey().getServerName());
332 int idx =
333 balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
334 if (idx >= server.getValue().size()) break;
335 HRegionInfo region = server.getValue().get(idx);
336 if (region.isMetaRegion()) continue;
337 regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
338 totalNumMoved++;
339 if (--neededRegions == 0) {
340
341 break;
342 }
343 }
344 }
345
346
347
348
349
350 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
351 serversByLoad.entrySet()) {
352 int regionCount = server.getKey().getLoad();
353 if (regionCount >= min) break;
354 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
355 if(balanceInfo != null) {
356 regionCount += balanceInfo.getNumRegionsAdded();
357 }
358 if(regionCount >= min) {
359 continue;
360 }
361 int numToTake = min - regionCount;
362 int numTaken = 0;
363 while(numTaken < numToTake && 0 < regionsToMove.size()) {
364 addRegionPlan(regionsToMove, fetchFromTail,
365 server.getKey().getServerName(), regionsToReturn);
366 numTaken++;
367 if (emptyRegionServerPresent) {
368 fetchFromTail = !fetchFromTail;
369 }
370 }
371 }
372
373
374 if (0 < regionsToMove.size()) {
375 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
376 serversByLoad.entrySet()) {
377 int regionCount = server.getKey().getLoad();
378 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
379 if(balanceInfo != null) {
380 regionCount += balanceInfo.getNumRegionsAdded();
381 }
382 if(regionCount >= max) {
383 break;
384 }
385 addRegionPlan(regionsToMove, fetchFromTail,
386 server.getKey().getServerName(), regionsToReturn);
387 if (emptyRegionServerPresent) {
388 fetchFromTail = !fetchFromTail;
389 }
390 if (regionsToMove.isEmpty()) {
391 break;
392 }
393 }
394 }
395
396 long endTime = System.currentTimeMillis();
397
398 if (!regionsToMove.isEmpty() || neededRegions != 0) {
399
400 LOG.warn("regionsToMove=" + totalNumMoved +
401 ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
402 ", serversUnderloaded=" + serversUnderloaded);
403 StringBuilder sb = new StringBuilder();
404 for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
405 if (sb.length() > 0) sb.append(", ");
406 sb.append(e.getKey().toString());
407 sb.append(" ");
408 sb.append(e.getValue().size());
409 }
410 LOG.warn("Input " + sb.toString());
411 }
412
413
414 LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
415 "Moving " + totalNumMoved + " regions off of " +
416 serversOverloaded + " overloaded servers onto " +
417 serversUnderloaded + " less loaded servers");
418
419 return regionsToReturn;
420 }
421
422
423
424
425 private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
426 final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
427 RegionPlan rp = null;
428 if (!fetchFromTail) rp = regionsToMove.remove();
429 else rp = regionsToMove.removeLast();
430 rp.setDestination(sn);
431 regionsToReturn.add(rp);
432 }
433 }