1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Comparator;
23 import java.util.Deque;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.NavigableMap;
29 import java.util.Random;
30 import java.util.Set;
31 import java.util.TreeMap;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.ClusterStatus;
37 import org.apache.hadoop.hbase.HBaseIOException;
38 import org.apache.hadoop.hbase.HRegionInfo;
39 import org.apache.hadoop.hbase.RegionLoad;
40 import org.apache.hadoop.hbase.ServerName;
41 import org.apache.hadoop.hbase.master.AssignmentManager;
42 import org.apache.hadoop.hbase.master.LoadBalancer;
43 import org.apache.hadoop.hbase.master.MasterServices;
44
45 import com.google.common.base.Joiner;
46 import com.google.common.collect.ArrayListMultimap;
47 import com.google.common.collect.Sets;
48
49
50
51
52
53
54
55 public abstract class BaseLoadBalancer implements LoadBalancer {
56 private static final int MIN_SERVER_BALANCE = 2;
57 private volatile boolean stopped = false;
58
59
60
61
62
63
64 protected static class Cluster {
65 ServerName[] servers;
66 ArrayList<String> tables;
67 HRegionInfo[] regions;
68 Deque<RegionLoad>[] regionLoads;
69 int[][] regionLocations;
70
71 int[][] regionsPerServer;
72 int[] regionIndexToServerIndex;
73 int[] initialRegionIndexToServerIndex;
74 int[] regionIndexToTableIndex;
75 int[][] numRegionsPerServerPerTable;
76 int[] numMaxRegionsPerTable;
77
78 Integer[] serverIndicesSortedByRegionCount;
79
80 Map<String, Integer> serversToIndex;
81 Map<String, Integer> tablesToIndex;
82
83 int numRegions;
84 int numServers;
85 int numTables;
86
87 int numMovedRegions = 0;
88 int numMovedMetaRegions = 0;
89
90 protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState, Map<String, Deque<RegionLoad>> loads,
91 RegionLocationFinder regionFinder) {
92
93 serversToIndex = new HashMap<String, Integer>();
94 tablesToIndex = new HashMap<String, Integer>();
95
96
97
98 tables = new ArrayList<String>();
99
100
101 numRegions = 0;
102
103 int serverIndex = 0;
104
105
106
107 for (ServerName sn:clusterState.keySet()) {
108 if (serversToIndex.get(sn.getHostAndPort()) == null) {
109 serversToIndex.put(sn.getHostAndPort(), serverIndex++);
110 }
111 }
112
113
114 for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
115 numRegions += entry.getValue().size();
116 }
117
118 numServers = serversToIndex.size();
119 regionsPerServer = new int[serversToIndex.size()][];
120
121 servers = new ServerName[numServers];
122 regions = new HRegionInfo[numRegions];
123 regionIndexToServerIndex = new int[numRegions];
124 initialRegionIndexToServerIndex = new int[numRegions];
125 regionIndexToTableIndex = new int[numRegions];
126 regionLoads = new Deque[numRegions];
127 regionLocations = new int[numRegions][];
128 serverIndicesSortedByRegionCount = new Integer[numServers];
129
130 int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
131
132 for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
133 serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
134
135
136
137 if (servers[serverIndex] == null ||
138 servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
139 servers[serverIndex] = entry.getKey();
140 }
141
142 if (regionsPerServer[serverIndex] != null) {
143
144
145 regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
146 } else {
147 regionsPerServer[serverIndex] = new int[entry.getValue().size()];
148 }
149 serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
150 }
151
152 for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
153 serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
154 regionPerServerIndex = 0;
155
156 for (HRegionInfo region : entry.getValue()) {
157 String tableName = region.getTable().getNameAsString();
158 Integer idx = tablesToIndex.get(tableName);
159 if (idx == null) {
160 tables.add(tableName);
161 idx = tableIndex;
162 tablesToIndex.put(tableName, tableIndex++);
163 }
164
165 regions[regionIndex] = region;
166 regionIndexToServerIndex[regionIndex] = serverIndex;
167 initialRegionIndexToServerIndex[regionIndex] = serverIndex;
168 regionIndexToTableIndex[regionIndex] = idx;
169 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
170
171
172 if (loads != null) {
173 Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
174
175 if (rl == null) {
176
177 rl = loads.get(region.getEncodedName());
178 }
179 regionLoads[regionIndex] = rl;
180 }
181
182 if (regionFinder != null) {
183
184 List<ServerName> loc = regionFinder.getTopBlockLocations(region);
185 regionLocations[regionIndex] = new int[loc.size()];
186 for (int i=0; i < loc.size(); i++) {
187 regionLocations[regionIndex][i] =
188 loc.get(i) == null ? -1 :
189 (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 : serversToIndex.get(loc.get(i).getHostAndPort()));
190 }
191 }
192
193 regionIndex++;
194 }
195 }
196
197 numTables = tables.size();
198 numRegionsPerServerPerTable = new int[numServers][numTables];
199
200 for (int i = 0; i < numServers; i++) {
201 for (int j = 0; j < numTables; j++) {
202 numRegionsPerServerPerTable[i][j] = 0;
203 }
204 }
205
206 for (int i=0; i < regionIndexToServerIndex.length; i++) {
207 numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
208 }
209
210 numMaxRegionsPerTable = new int[numTables];
211 for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
212 for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
213 if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
214 numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
215 }
216 }
217 }
218 }
219
220 public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) {
221
222 if (rRegion >= 0 && lRegion >= 0) {
223 regionMoved(rRegion, rServer, lServer);
224 regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
225 regionMoved(lRegion, lServer, rServer);
226 regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
227 } else if (rRegion >= 0) {
228 regionMoved(rRegion, rServer, lServer);
229 regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
230 regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
231 } else if (lRegion >= 0) {
232 regionMoved(lRegion, lServer, rServer);
233 regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
234 regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
235 }
236 }
237
238
239 void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
240 regionIndexToServerIndex[regionIndex] = newServerIndex;
241 if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) {
242 numMovedRegions--;
243 if (regions[regionIndex].isMetaRegion()) {
244 numMovedMetaRegions--;
245 }
246 } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) {
247 numMovedRegions++;
248 if (regions[regionIndex].isMetaRegion()) {
249 numMovedMetaRegions++;
250 }
251 }
252 int tableIndex = regionIndexToTableIndex[regionIndex];
253 numRegionsPerServerPerTable[oldServerIndex][tableIndex]--;
254 numRegionsPerServerPerTable[newServerIndex][tableIndex]++;
255
256
257 if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
258 numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex];
259 } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1)
260 == numMaxRegionsPerTable[tableIndex]) {
261
262 for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
263 if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
264 numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
265 }
266 }
267 }
268 }
269
270 int[] removeRegion(int[] regions, int regionIndex) {
271
272 int[] newRegions = new int[regions.length - 1];
273 int i = 0;
274 for (i = 0; i < regions.length; i++) {
275 if (regions[i] == regionIndex) {
276 break;
277 }
278 newRegions[i] = regions[i];
279 }
280 System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
281 return newRegions;
282 }
283
284 int[] addRegion(int[] regions, int regionIndex) {
285 int[] newRegions = new int[regions.length + 1];
286 System.arraycopy(regions, 0, newRegions, 0, regions.length);
287 newRegions[newRegions.length - 1] = regionIndex;
288 return newRegions;
289 }
290
291 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
292 int i = 0;
293 for (i = 0; i < regions.length; i++) {
294 if (regions[i] == regionIndex) {
295 regions[i] = newRegionIndex;
296 break;
297 }
298 }
299 return regions;
300 }
301
302 void sortServersByRegionCount() {
303 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
304 }
305
306 int getNumRegions(int server) {
307 return regionsPerServer[server].length;
308 }
309
310 private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
311 @Override
312 public int compare(Integer integer, Integer integer2) {
313 return Integer.valueOf(getNumRegions(integer)).compareTo(getNumRegions(integer2));
314 }
315 };
316
317 @Override
318 public String toString() {
319 String desc = "Cluster{" +
320 "servers=[";
321 for(ServerName sn:servers) {
322 desc += sn.getHostAndPort() + ", ";
323 }
324 desc +=
325 ", serverIndicesSortedByRegionCount="+
326 Arrays.toString(serverIndicesSortedByRegionCount) +
327 ", regionsPerServer=[";
328
329 for (int[]r:regionsPerServer) {
330 desc += Arrays.toString(r);
331 }
332 desc += "]" +
333 ", numMaxRegionsPerTable=" +
334 Arrays.toString(numMaxRegionsPerTable) +
335 ", numRegions=" +
336 numRegions +
337 ", numServers=" +
338 numServers +
339 ", numTables=" +
340 numTables +
341 ", numMovedRegions=" +
342 numMovedRegions +
343 ", numMovedMetaRegions=" +
344 numMovedMetaRegions +
345 '}';
346 return desc;
347 }
348 }
349
350
351 protected float slop;
352 private Configuration config;
353 private static final Random RANDOM = new Random(System.currentTimeMillis());
354 private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
355
356 protected final MetricsBalancer metricsBalancer = new MetricsBalancer();
357 protected MasterServices services;
358
359 @Override
360 public void setConf(Configuration conf) {
361 setSlop(conf);
362 if (slop < 0) slop = 0;
363 else if (slop > 1) slop = 1;
364
365 this.config = conf;
366 }
367
368 protected void setSlop(Configuration conf) {
369 this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
370 }
371
372 @Override
373 public Configuration getConf() {
374 return this.config;
375 }
376
377 @Override
378 public void setClusterStatus(ClusterStatus st) {
379
380 }
381
382 @Override
383 public void setMasterServices(MasterServices masterServices) {
384 this.services = masterServices;
385 }
386
387 protected boolean needsBalance(ClusterLoadState cs) {
388 if (cs.getNumServers() < MIN_SERVER_BALANCE) {
389 if (LOG.isDebugEnabled()) {
390 LOG.debug("Not running balancer because only " + cs.getNumServers()
391 + " active regionserver(s)");
392 }
393 return false;
394 }
395
396
397 float average = cs.getLoadAverage();
398 int floor = (int) Math.floor(average * (1 - slop));
399 int ceiling = (int) Math.ceil(average * (1 + slop));
400 if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
401 NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
402 if (LOG.isTraceEnabled()) {
403
404 LOG.trace("Skipping load balancing because balanced cluster; " +
405 "servers=" + cs.getNumServers() + " " +
406 "regions=" + cs.getNumRegions() + " average=" + average + " " +
407 "mostloaded=" + serversByLoad.lastKey().getLoad() +
408 " leastloaded=" + serversByLoad.firstKey().getLoad());
409 }
410 return false;
411 }
412 return true;
413 }
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432 @Override
433 public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
434 List<ServerName> servers) {
435 metricsBalancer.incrMiscInvocations();
436
437 if (regions.isEmpty() || servers.isEmpty()) {
438 return null;
439 }
440 Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
441 int numRegions = regions.size();
442 int numServers = servers.size();
443 int max = (int) Math.ceil((float) numRegions / numServers);
444 int serverIdx = 0;
445 if (numServers > 1) {
446 serverIdx = RANDOM.nextInt(numServers);
447 }
448 int regionIdx = 0;
449 for (int j = 0; j < numServers; j++) {
450 ServerName server = servers.get((j + serverIdx) % numServers);
451 List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
452 for (int i = regionIdx; i < numRegions; i += numServers) {
453 serverRegions.add(regions.get(i % numRegions));
454 }
455 assignments.put(server, serverRegions);
456 regionIdx++;
457 }
458 return assignments;
459 }
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478 @Override
479 public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
480 List<ServerName> servers) {
481 metricsBalancer.incrMiscInvocations();
482
483 Map<HRegionInfo, ServerName> assignments = new TreeMap<HRegionInfo, ServerName>();
484 for (HRegionInfo region : regions) {
485 assignments.put(region, randomAssignment(region, servers));
486 }
487 return assignments;
488 }
489
490
491
492
493 @Override
494 public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
495 metricsBalancer.incrMiscInvocations();
496
497 if (servers == null || servers.isEmpty()) {
498 LOG.warn("Wanted to do random assignment but no servers to assign to");
499 return null;
500 }
501 return servers.get(RANDOM.nextInt(servers.size()));
502 }
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521 @Override
522 public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
523 List<ServerName> servers) {
524
525 metricsBalancer.incrMiscInvocations();
526
527
528
529
530
531
532
533 ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
534 for (ServerName server : servers) {
535 serversByHostname.put(server.getHostname(), server);
536 }
537
538
539 Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
540
541 for (ServerName server : servers) {
542 assignments.put(server, new ArrayList<HRegionInfo>());
543 }
544
545
546
547
548 Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
549
550 int numRandomAssignments = 0;
551 int numRetainedAssigments = 0;
552 for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
553 HRegionInfo region = entry.getKey();
554 ServerName oldServerName = entry.getValue();
555 List<ServerName> localServers = new ArrayList<ServerName>();
556 if (oldServerName != null) {
557 localServers = serversByHostname.get(oldServerName.getHostname());
558 }
559 if (localServers.isEmpty()) {
560
561
562 ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
563 assignments.get(randomServer).add(region);
564 numRandomAssignments++;
565 if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
566 } else if (localServers.size() == 1) {
567
568 assignments.get(localServers.get(0)).add(region);
569 numRetainedAssigments++;
570 } else {
571
572 int size = localServers.size();
573 ServerName target =
574 localServers.contains(oldServerName) ? oldServerName : localServers.get(RANDOM
575 .nextInt(size));
576 assignments.get(target).add(region);
577 numRetainedAssigments++;
578 }
579 }
580
581 String randomAssignMsg = "";
582 if (numRandomAssignments > 0) {
583 randomAssignMsg =
584 numRandomAssignments + " regions were assigned "
585 + "to random hosts, since the old hosts for these regions are no "
586 + "longer present in the cluster. These hosts were:\n "
587 + Joiner.on("\n ").join(oldHostsNoLongerPresent);
588 }
589
590 LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
591 + " retained the pre-restart assignment. " + randomAssignMsg);
592 return assignments;
593 }
594
595 @Override
596 public void initialize() throws HBaseIOException{
597 }
598
599 @Override
600 public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
601 }
602
603 @Override
604 public void regionOffline(HRegionInfo regionInfo) {
605 }
606
607 @Override
608 public boolean isStopped() {
609 return stopped;
610 }
611
612 @Override
613 public void stop(String why) {
614 LOG.info("Load Balancer stop requested: "+why);
615 stopped = true;
616 }
617 }