View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NavigableMap;
27  import java.util.Random;
28  import java.util.TreeMap;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.master.AssignmentManager;
36  import org.apache.hadoop.hbase.master.RegionPlan;
37  
38  import com.google.common.collect.MinMaxPriorityQueue;
39  
40  /**
41   * Makes decisions about the placement and movement of Regions across
42   * RegionServers.
43   *
44   * <p>Cluster-wide load balancing will occur only when there are no regions in
45   * transition and according to a fixed period of time using {@link #balanceCluster(Map)}.
46   *
47   * <p>Inline region placement with {@link #immediateAssignment} can be used when
48   * the Master needs to handle closed regions that it currently does not have
49   * a destination set for.  This can happen during master failover.
50   *
51   * <p>On cluster startup, bulk assignment can be used to determine
52   * locations for all Regions in a cluster.
53   *
54   * <p>This class produces plans for the {@link AssignmentManager} to execute.
55   */
56  @InterfaceAudience.Private
57  public class SimpleLoadBalancer extends BaseLoadBalancer {
58    private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class);
59    private static final Random RANDOM = new Random(System.currentTimeMillis());
60  
61    private RegionInfoComparator riComparator = new RegionInfoComparator();
62    private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
63  
64  
65    /**
66     * Stores additional per-server information about the regions added/removed
67     * during the run of the balancing algorithm.
68     *
69     * For servers that shed regions, we need to track which regions we have already
70     * shed. <b>nextRegionForUnload</b> contains the index in the list of regions on
71     * the server that is the next to be shed.
72     */
73    static class BalanceInfo {
74  
75      private final int nextRegionForUnload;
76      private int numRegionsAdded;
77  
78      public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
79        this.nextRegionForUnload = nextRegionForUnload;
80        this.numRegionsAdded = numRegionsAdded;
81      }
82  
83      int getNextRegionForUnload() {
84        return nextRegionForUnload;
85      }
86  
87      int getNumRegionsAdded() {
88        return numRegionsAdded;
89      }
90  
91      void setNumRegionsAdded(int numAdded) {
92        this.numRegionsAdded = numAdded;
93      }
94    }
95  
96    /**
97     * Generate a global load balancing plan according to the specified map of
98     * server information to the most loaded regions of each server.
99     *
100    * The load balancing invariant is that all servers are within 1 region of the
101    * average number of regions per server.  If the average is an integer number,
102    * all servers will be balanced to the average.  Otherwise, all servers will
103    * have either floor(average) or ceiling(average) regions.
104    *
105    * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
106    *   we can fetch from both ends of the queue. 
107    * At the beginning, we check whether there was empty region server 
108    *   just discovered by Master. If so, we alternately choose new / old
109    *   regions from head / tail of regionsToMove, respectively. This alternation
110    *   avoids clustering young regions on the newly discovered region server.
111    *   Otherwise, we choose new regions from head of regionsToMove.
112    *   
113    * Another improvement from HBASE-3609 is that we assign regions from
114    *   regionsToMove to underloaded servers in round-robin fashion.
115    *   Previously one underloaded server would be filled before we move onto
116    *   the next underloaded server, leading to clustering of young regions.
117    *   
118    * Finally, we randomly shuffle underloaded servers so that they receive
119    *   offloaded regions relatively evenly across calls to balanceCluster().
120    *         
121    * The algorithm is currently implemented as such:
122    *
123    * <ol>
124    * <li>Determine the two valid numbers of regions each server should have,
125    *     <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
126    *
127    * <li>Iterate down the most loaded servers, shedding regions from each so
128    *     each server hosts exactly <b>MAX</b> regions.  Stop once you reach a
129    *     server that already has &lt;= <b>MAX</b> regions.
130    *     <p>
131    *     Order the regions to move from most recent to least.
132    *
133    * <li>Iterate down the least loaded servers, assigning regions so each server
134    *     has exactly </b>MIN</b> regions.  Stop once you reach a server that
135    *     already has &gt;= <b>MIN</b> regions.
136    *
137    *     Regions being assigned to underloaded servers are those that were shed
138    *     in the previous step.  It is possible that there were not enough
139    *     regions shed to fill each underloaded server to <b>MIN</b>.  If so we
140    *     end up with a number of regions required to do so, <b>neededRegions</b>.
141    *
142    *     It is also possible that we were able to fill each underloaded but ended
143    *     up with regions that were unassigned from overloaded servers but that
144    *     still do not have assignment.
145    *
146    *     If neither of these conditions hold (no regions needed to fill the
147    *     underloaded servers, no regions leftover from overloaded servers),
148    *     we are done and return.  Otherwise we handle these cases below.
149    *
150    * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers),
151    *     we iterate the most loaded servers again, shedding a single server from
152    *     each (this brings them from having <b>MAX</b> regions to having
153    *     <b>MIN</b> regions).
154    *
155    * <li>We now definitely have more regions that need assignment, either from
156    *     the previous step or from the original shedding from overloaded servers.
157    *     Iterate the least loaded servers filling each to <b>MIN</b>.
158    *
159    * <li>If we still have more regions that need assignment, again iterate the
160    *     least loaded servers, this time giving each one (filling them to
161    *     </b>MAX</b>) until we run out.
162    *
163    * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions.
164    *
165    *     In addition, any server hosting &gt;= <b>MAX</b> regions is guaranteed
166    *     to end up with <b>MAX</b> regions at the end of the balancing.  This
167    *     ensures the minimal number of regions possible are moved.
168    * </ol>
169    *
170    * TODO: We can at-most reassign the number of regions away from a particular
171    *       server to be how many they report as most loaded.
172    *       Should we just keep all assignment in memory?  Any objections?
173    *       Does this mean we need HeapSize on HMaster?  Or just careful monitor?
174    *       (current thinking is we will hold all assignments in memory)
175    *
176    * @param clusterMap Map of regionservers and their load/region information to
177    *                   a list of their most loaded regions
178    * @return a list of regions to be moved, including source and destination,
179    *         or null if cluster is already balanced
180    */
181   public List<RegionPlan> balanceCluster(
182       Map<ServerName, List<HRegionInfo>> clusterMap) {
183     boolean emptyRegionServerPresent = false;
184     long startTime = System.currentTimeMillis();
185 
186     ClusterLoadState cs = new ClusterLoadState(clusterMap);
187 
188     if (!this.needsBalance(cs)) return null;
189     
190     int numServers = cs.getNumServers();
191     NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
192     int numRegions = cs.getNumRegions();
193     int min = numRegions / numServers;
194     int max = numRegions % numServers == 0 ? min : min + 1;
195 
196     // Using to check balance result.
197     StringBuilder strBalanceParam = new StringBuilder();
198     strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
199         .append(", numServers=").append(numServers).append(", max=").append(max)
200         .append(", min=").append(min);
201     LOG.debug(strBalanceParam.toString());
202 
203     // Balance the cluster
204     // TODO: Look at data block locality or a more complex load to do this
205     MinMaxPriorityQueue<RegionPlan> regionsToMove =
206       MinMaxPriorityQueue.orderedBy(rpComparator).create();
207     List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
208 
209     // Walk down most loaded, pruning each to the max
210     int serversOverloaded = 0;
211     // flag used to fetch regions from head and tail of list, alternately
212     boolean fetchFromTail = false;
213     Map<ServerName, BalanceInfo> serverBalanceInfo =
214       new TreeMap<ServerName, BalanceInfo>();
215     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
216         serversByLoad.descendingMap().entrySet()) {
217       ServerAndLoad sal = server.getKey();
218       int regionCount = sal.getLoad();
219       if (regionCount <= max) {
220         serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
221         break;
222       }
223       serversOverloaded++;
224       List<HRegionInfo> regions = server.getValue();
225       int numToOffload = Math.min(regionCount - max, regions.size());
226       // account for the out-of-band regions which were assigned to this server
227       // after some other region server crashed 
228       Collections.sort(regions, riComparator);
229       int numTaken = 0;
230       for (int i = 0; i <= numToOffload; ) {
231         HRegionInfo hri = regions.get(i); // fetch from head
232         if (fetchFromTail) {
233           hri = regions.get(regions.size() - 1 - i);
234         }
235         i++;
236         // Don't rebalance meta regions.
237         if (hri.isMetaRegion()) continue;
238         regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
239         numTaken++;
240         if (numTaken >= numToOffload) break;
241         // fetch in alternate order if there is new region server
242         if (emptyRegionServerPresent) {
243           fetchFromTail = !fetchFromTail;
244         }
245       }
246       serverBalanceInfo.put(sal.getServerName(),
247         new BalanceInfo(numToOffload, (-1)*numTaken));
248     }
249     int totalNumMoved = regionsToMove.size();
250 
251     // Walk down least loaded, filling each to the min
252     int neededRegions = 0; // number of regions needed to bring all up to min
253     fetchFromTail = false;
254 
255     Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
256     float average = (float)numRegions / numServers; // for logging
257     int maxToTake = numRegions - (int)average;
258     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
259         serversByLoad.entrySet()) {
260       if (maxToTake == 0) break; // no more to take
261       int regionCount = server.getKey().getLoad();
262       if (regionCount >= min && regionCount > 0) {
263         continue; // look for other servers which haven't reached min
264       }
265       int regionsToPut = min - regionCount;
266       if (regionsToPut == 0)
267       {
268         regionsToPut = 1;
269       }
270       maxToTake -= regionsToPut;
271       underloadedServers.put(server.getKey().getServerName(), regionsToPut);
272     }
273     // number of servers that get new regions
274     int serversUnderloaded = underloadedServers.size();
275     int incr = 1;
276     List<ServerName> sns =
277       Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
278     Collections.shuffle(sns, RANDOM);
279     while (regionsToMove.size() > 0) {
280       int cnt = 0;
281       int i = incr > 0 ? 0 : underloadedServers.size()-1;
282       for (; i >= 0 && i < underloadedServers.size(); i += incr) {
283         if (regionsToMove.isEmpty()) break;
284         ServerName si = sns.get(i);
285         int numToTake = underloadedServers.get(si);
286         if (numToTake == 0) continue;
287 
288         addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
289         if (emptyRegionServerPresent) {
290           fetchFromTail = !fetchFromTail;
291         }
292 
293         underloadedServers.put(si, numToTake-1);
294         cnt++;
295         BalanceInfo bi = serverBalanceInfo.get(si);
296         if (bi == null) {
297           bi = new BalanceInfo(0, 0);
298           serverBalanceInfo.put(si, bi);
299         }
300         bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
301       }
302       if (cnt == 0) break;
303       // iterates underloadedServers in the other direction
304       incr = -incr;
305     }
306     for (Integer i : underloadedServers.values()) {
307       // If we still want to take some, increment needed
308       neededRegions += i;
309     }
310 
311     // If none needed to fill all to min and none left to drain all to max,
312     // we are done
313     if (neededRegions == 0 && regionsToMove.isEmpty()) {
314       long endTime = System.currentTimeMillis();
315       LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
316           "Moving " + totalNumMoved + " regions off of " +
317           serversOverloaded + " overloaded servers onto " +
318           serversUnderloaded + " less loaded servers");
319       return regionsToReturn;
320     }
321 
322     // Need to do a second pass.
323     // Either more regions to assign out or servers that are still underloaded
324 
325     // If we need more to fill min, grab one from each most loaded until enough
326     if (neededRegions != 0) {
327       // Walk down most loaded, grabbing one from each until we get enough
328       for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
329         serversByLoad.descendingMap().entrySet()) {
330         BalanceInfo balanceInfo =
331           serverBalanceInfo.get(server.getKey().getServerName());
332         int idx =
333           balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
334         if (idx >= server.getValue().size()) break;
335         HRegionInfo region = server.getValue().get(idx);
336         if (region.isMetaRegion()) continue; // Don't move meta regions.
337         regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
338         totalNumMoved++;
339         if (--neededRegions == 0) {
340           // No more regions needed, done shedding
341           break;
342         }
343       }
344     }
345 
346     // Now we have a set of regions that must be all assigned out
347     // Assign each underloaded up to the min, then if leftovers, assign to max
348 
349     // Walk down least loaded, assigning to each to fill up to min
350     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
351         serversByLoad.entrySet()) {
352       int regionCount = server.getKey().getLoad();
353       if (regionCount >= min) break;
354       BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
355       if(balanceInfo != null) {
356         regionCount += balanceInfo.getNumRegionsAdded();
357       }
358       if(regionCount >= min) {
359         continue;
360       }
361       int numToTake = min - regionCount;
362       int numTaken = 0;
363       while(numTaken < numToTake && 0 < regionsToMove.size()) {
364         addRegionPlan(regionsToMove, fetchFromTail,
365           server.getKey().getServerName(), regionsToReturn);
366         numTaken++;
367         if (emptyRegionServerPresent) {
368           fetchFromTail = !fetchFromTail;
369         }
370       }
371     }
372 
373     // If we still have regions to dish out, assign underloaded to max
374     if (0 < regionsToMove.size()) {
375       for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
376         serversByLoad.entrySet()) {
377         int regionCount = server.getKey().getLoad();
378         BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
379         if(balanceInfo != null) {
380           regionCount += balanceInfo.getNumRegionsAdded();
381         }
382         if(regionCount >= max) {
383           break;
384         }
385         addRegionPlan(regionsToMove, fetchFromTail,
386           server.getKey().getServerName(), regionsToReturn);
387         if (emptyRegionServerPresent) {
388           fetchFromTail = !fetchFromTail;
389         }
390         if (regionsToMove.isEmpty()) {
391           break;
392         }
393       }
394     }
395 
396     long endTime = System.currentTimeMillis();
397 
398     if (!regionsToMove.isEmpty() || neededRegions != 0) {
399       // Emit data so can diagnose how balancer went astray.
400       LOG.warn("regionsToMove=" + totalNumMoved +
401         ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
402         ", serversUnderloaded=" + serversUnderloaded);
403       StringBuilder sb = new StringBuilder();
404       for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
405         if (sb.length() > 0) sb.append(", ");
406         sb.append(e.getKey().toString());
407         sb.append(" ");
408         sb.append(e.getValue().size());
409       }
410       LOG.warn("Input " + sb.toString());
411     }
412 
413     // All done!
414     LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
415         "Moving " + totalNumMoved + " regions off of " +
416         serversOverloaded + " overloaded servers onto " +
417         serversUnderloaded + " less loaded servers");
418 
419     return regionsToReturn;
420   }
421 
422   /**
423    * Add a region from the head or tail to the List of regions to return.
424    */
425   private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
426       final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
427     RegionPlan rp = null;
428     if (!fetchFromTail) rp = regionsToMove.remove();
429     else rp = regionsToMove.removeLast();
430     rp.setDestination(sn);
431     regionsToReturn.add(rp);
432   }
433 }