1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.Set;
32  import java.util.TreeMap;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentSkipListSet;
35  import java.util.concurrent.CopyOnWriteArrayList;
36  import java.util.concurrent.ThreadFactory;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.Chore;
48  import org.apache.hadoop.hbase.HBaseIOException;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.NotServingRegionException;
52  import org.apache.hadoop.hbase.RegionTransition;
53  import org.apache.hadoop.hbase.Server;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.Stoppable;
56  import org.apache.hadoop.hbase.TableName;
57  import org.apache.hadoop.hbase.TableNotFoundException;
58  import org.apache.hadoop.hbase.catalog.CatalogTracker;
59  import org.apache.hadoop.hbase.catalog.MetaReader;
60  import org.apache.hadoop.hbase.client.Result;
61  import org.apache.hadoop.hbase.exceptions.DeserializationException;
62  import org.apache.hadoop.hbase.executor.EventHandler;
63  import org.apache.hadoop.hbase.executor.EventType;
64  import org.apache.hadoop.hbase.executor.ExecutorService;
65  import org.apache.hadoop.hbase.ipc.RpcClient;
66  import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
68  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
69  import org.apache.hadoop.hbase.master.RegionState.State;
70  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
71  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
72  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
73  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
74  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
75  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
76  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
77  import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
78  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
79  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
80  import org.apache.hadoop.hbase.regionserver.SplitTransaction;
81  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
82  import org.apache.hadoop.hbase.util.KeyLocker;
83  import org.apache.hadoop.hbase.util.Pair;
84  import org.apache.hadoop.hbase.util.PairOfSameType;
85  import org.apache.hadoop.hbase.util.Threads;
86  import org.apache.hadoop.hbase.util.Triple;
87  import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
88  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
89  import org.apache.hadoop.hbase.zookeeper.ZKTable;
90  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
91  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
92  import org.apache.hadoop.ipc.RemoteException;
93  import org.apache.zookeeper.AsyncCallback;
94  import org.apache.zookeeper.KeeperException;
95  import org.apache.zookeeper.KeeperException.NoNodeException;
96  import org.apache.zookeeper.KeeperException.NodeExistsException;
97  import org.apache.zookeeper.data.Stat;
98  
99  import com.google.common.base.Preconditions;
100 import com.google.common.collect.LinkedHashMultimap;
101 
102 /**
103  * Manages and performs region assignment.
104  * <p>
105  * Monitors ZooKeeper for events related to regions in transition.
106  * <p>
107  * Handles existing regions in transition during master failover.
108  */
109 @InterfaceAudience.Private
110 public class AssignmentManager extends ZooKeeperListener {
111   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
112 
113   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
114       -1, -1L);
115 
116   public static final String ASSIGNMENT_TIMEOUT = "hbase.master.assignment.timeoutmonitor.timeout";
117   public static final int DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT = 600000;
118   public static final String ASSIGNMENT_TIMEOUT_MANAGEMENT = "hbase.assignment.timeout.management";
119   public static final boolean DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT = false;
120 
121   public static final String ALREADY_IN_TRANSITION_WAITTIME
122     = "hbase.assignment.already.intransition.waittime";
123   public static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
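      // Illustrative sketch (not part of the original source): the constants above are
      // plain Configuration keys, so a deployment could tune them in the master
      // configuration, for example:
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.setBoolean(ASSIGNMENT_TIMEOUT_MANAGEMENT, true); // enable the TimeoutMonitor
      //   conf.setInt(ASSIGNMENT_TIMEOUT, 300000);              // 5 minutes instead of the default 10
      //   conf.setInt(ALREADY_IN_TRANSITION_WAITTIME, 30000);   // 30s instead of the default 1 minute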
124 
125   protected final Server server;
126 
127   private ServerManager serverManager;
128 
129   private boolean shouldAssignRegionsWithFavoredNodes;
130 
131   private CatalogTracker catalogTracker;
132 
133   protected final TimeoutMonitor timeoutMonitor;
134 
135   private final TimerUpdater timerUpdater;
136 
137   private LoadBalancer balancer;
138 
139   private final MetricsAssignmentManager metricsAssignmentManager;
140 
141   private final TableLockManager tableLockManager;
142 
143   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
144 
145   final private KeyLocker<String> locker = new KeyLocker<String>();
146 
147   /**
148    * Map of regions to reopen after the schema of a table is changed. Key -
149    * encoded region name, value - HRegionInfo
150    */
151   private final Map <String, HRegionInfo> regionsToReopen;
152 
153   /*
154    * Maximum times we recurse an assignment/unassignment.
155    * See below in {@link #assign()} and {@link #unassign()}.
156    */
157   private final int maximumAttempts;
158 
159   /**
160    * Map from the merged region to be created (keyed by its encoded name) to the pair of regions being merged.
161    */
162   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
163     = new HashMap<String, PairOfSameType<HRegionInfo>>();
164 
165   /**
166    * How long the assignment manager sleeps before retrying an hbase:meta assignment that
167    * failed because no region plan was available.
168    */
169   private final long sleepTimeBeforeRetryingMetaAssignment;
170 
171   /** Plans for region movement. Key is the encoded version of a region name*/
172   // TODO: When do plans get cleaned out?  Ever? In server open and in server
173   // shutdown processing -- St.Ack
174   // All access to this Map must be synchronized.
175   final NavigableMap<String, RegionPlan> regionPlans =
176     new TreeMap<String, RegionPlan>();
177 
178   private final ZKTable zkTable;
179 
180   /**
181    * Contains the servers whose timers need to be updated; these servers will be
182    * handled by {@link TimerUpdater}.
183    */
184   private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer;
185 
186   private final ExecutorService executorService;
187 
188   // For unit tests, keep track of calls to ClosedRegionHandler
189   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
190 
191   // For unit tests, keep track of calls to OpenedRegionHandler
192   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
193 
194   // Thread pool executor service for the timeout monitor
195   private java.util.concurrent.ExecutorService threadPoolExecutorService;
196 
197   // A bunch of ZK events workers. Each is a single thread executor service
198   private final java.util.concurrent.ExecutorService zkEventWorkers;
199 
200   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
201       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
202 
203   private final RegionStates regionStates;
204 
205   // The thresholds for using bulk assignment. Bulk assignment is used
206   // only if assigning at least this many regions to at least this
207   // many servers. If assigning fewer regions to fewer servers,
208   // bulk assigning may not be as efficient.
209   private final int bulkAssignThresholdRegions;
210   private final int bulkAssignThresholdServers;
211 
212   // Should bulk assignment wait till all regions are assigned,
213   // or until it times out?  This is useful for measuring bulk assignment
214   // performance, but not needed in most use cases.
215   private final boolean bulkAssignWaitTillAllAssigned;
216 
217   /**
218    * Indicator that AssignmentManager has recovered the region states so
219    * that ServerShutdownHandler can be fully enabled and re-assign regions
220    * of dead servers, so that when re-assignment happens, AssignmentManager
221    * has proper region states.
222    *
223    * Protected to ease testing.
224    */
225   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
226 
227   /** Is timeout management activated? */
228   private final boolean tomActivated;
229 
230   /**
231    * A map tracking how many times in a row a region has failed to open,
232    * so that we don't try to open a region forever if the failure is
233    * unrecoverable.  We don't put this information in region states
234    * because we don't expect this to happen frequently; we don't
235    * want to copy this information over during each state transition either.
236    */
237   private final ConcurrentHashMap<String, AtomicInteger>
238     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
239 
240   /**
241    * For testing only!  Set to true to skip handling of split.
242    */
243   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
244   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
245 
246   /** Listeners that are called on assignment events. */
247   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
248 
249   /**
250    * Constructs a new assignment manager.
251    *
252    * @param server
253    * @param serverManager
254    * @param catalogTracker
255    * @param service
256    * @throws KeeperException
257    * @throws IOException
258    */
259   public AssignmentManager(Server server, ServerManager serverManager,
260       CatalogTracker catalogTracker, final LoadBalancer balancer,
261       final ExecutorService service, MetricsMaster metricsMaster,
262       final TableLockManager tableLockManager) throws KeeperException, IOException {
263     super(server.getZooKeeper());
264     this.server = server;
265     this.serverManager = serverManager;
266     this.catalogTracker = catalogTracker;
267     this.executorService = service;
268     this.regionsToReopen = Collections.synchronizedMap
269                            (new HashMap<String, HRegionInfo> ());
270     Configuration conf = server.getConfiguration();
271     // Only read favored nodes if using the favored nodes load balancer.
272     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
273            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
274            FavoredNodeLoadBalancer.class);
275     this.tomActivated = conf.getBoolean(
276       ASSIGNMENT_TIMEOUT_MANAGEMENT, DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
277     if (tomActivated){
278       this.serversInUpdatingTimer =  new ConcurrentSkipListSet<ServerName>();
279       this.timeoutMonitor = new TimeoutMonitor(
280         conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
281         server, serverManager,
282         conf.getInt(ASSIGNMENT_TIMEOUT, DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT));
283       this.timerUpdater = new TimerUpdater(conf.getInt(
284         "hbase.master.assignment.timerupdater.period", 10000), server);
285       Threads.setDaemonThreadRunning(timerUpdater.getThread(),
286         server.getServerName() + ".timerUpdater");
287     } else {
288       this.serversInUpdatingTimer =  null;
289       this.timeoutMonitor = null;
290       this.timerUpdater = null;
291     }
292     this.zkTable = new ZKTable(this.watcher);
293     // This is the max attempts, not retries, so it should be at least 1.
294     this.maximumAttempts = Math.max(1,
295       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
296     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
297         "hbase.meta.assignment.retry.sleeptime", 1000l);
298     this.balancer = balancer;
299     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
300     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
301       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
302     this.regionStates = new RegionStates(server, serverManager);
303 
304     this.bulkAssignWaitTillAllAssigned =
305       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
306     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
307     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
308 
309     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
310     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
311     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
312             TimeUnit.SECONDS, threadFactory);
313     this.tableLockManager = tableLockManager;
314 
315     this.metricsAssignmentManager = new MetricsAssignmentManager();
316   }
317 
318   void startTimeOutMonitor() {
319     if (tomActivated) {
320       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
321           + ".timeoutMonitor");
322     }
323   }
324 
325   /**
326    * Add the listener to the notification list.
327    * @param listener The AssignmentListener to register
328    */
329   public void registerListener(final AssignmentListener listener) {
330     this.listeners.add(listener);
331   }
332 
333   /**
334    * Remove the listener from the notification list.
335    * @param listener The AssignmentListener to unregister
336    */
337   public boolean unregisterListener(final AssignmentListener listener) {
338     return this.listeners.remove(listener);
339   }
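      // Illustrative usage sketch (not part of the original source, and assuming the
      // AssignmentListener interface exposes regionOpened/regionClosed callbacks, as
      // suggested by the open/close notifications sent elsewhere in this class):
      //
      //   assignmentManager.registerListener(new AssignmentListener() {
      //     @Override
      //     public void regionOpened(HRegionInfo regionInfo, ServerName serverName) {
      //       // react to an opened region
      //     }
      //     @Override
      //     public void regionClosed(HRegionInfo regionInfo) {
      //       // react to a closed region
      //     }
      //   });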
340 
341   /**
342    * @return Instance of ZKTable.
343    */
344   public ZKTable getZKTable() {
345     // These are 'expensive' to make since creating one involves a trip to the
346     // zk ensemble, so allow sharing.
347     return this.zkTable;
348   }
349 
350   /**
351    * This SHOULD not be public. It is public now
352    * because of some unit tests.
353    *
354    * TODO: make it package private and keep RegionStates in the master package
355    */
356   public RegionStates getRegionStates() {
357     return regionStates;
358   }
359 
360   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
361     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
362   }
363 
364   /**
365    * Add a regionPlan for the specified region.
366    * @param encodedName
367    * @param plan
368    */
369   public void addPlan(String encodedName, RegionPlan plan) {
370     synchronized (regionPlans) {
371       regionPlans.put(encodedName, plan);
372     }
373   }
374 
375   /**
376    * Add a map of region plans.
377    */
378   public void addPlans(Map<String, RegionPlan> plans) {
379     synchronized (regionPlans) {
380       regionPlans.putAll(plans);
381     }
382   }
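      // Illustrative sketch (not part of the original source): a RegionPlan pairs a
      // region with its source and destination servers, keyed here by the encoded
      // region name, e.g.:
      //
      //   RegionPlan plan = new RegionPlan(hri, currentServer, destinationServer);
      //   assignmentManager.addPlan(hri.getEncodedName(), plan);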
383 
384   /**
385    * Set the list of regions that will be reopened
386    * because of an update in table schema
387    *
388    * @param regions
389    *          list of regions that should be tracked for reopen
390    */
391   public void setRegionsToReopen(List <HRegionInfo> regions) {
392     for(HRegionInfo hri : regions) {
393       regionsToReopen.put(hri.getEncodedName(), hri);
394     }
395   }
396 
397   /**
398    * Used by the client to check whether all regions have the schema updates applied.
399    *
400    * @param tableName
401    * @return Pair indicating the status of the alter command
402    * @throws IOException
403    */
404   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
405       throws IOException {
406     List <HRegionInfo> hris =
407       MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
408     Integer pending = 0;
409     for (HRegionInfo hri : hris) {
410       String name = hri.getEncodedName();
411       // No lock; concurrent access is ok: sequential consistency is respected.
412       if (regionsToReopen.containsKey(name)
413           || regionStates.isRegionInTransition(name)) {
414         pending++;
415       }
416     }
417     return new Pair<Integer, Integer>(pending, hris.size());
418   }
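      // Illustrative note (not part of the original source): the returned Pair is
      // (regions still pending reopen, total regions of the table), so a caller can
      // treat the alter as finished once the first value drops to zero, e.g.:
      //
      //   Pair<Integer, Integer> status = assignmentManager.getReopenStatus(tableName);
      //   boolean done = status.getFirst() == 0;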
419 
420   /**
421    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
422    * the failover cleanup before re-assigning regions of dead servers, so that
423    * when re-assignment happens, AssignmentManager has proper region states.
424    */
425   public boolean isFailoverCleanupDone() {
426     return failoverCleanupDone.get();
427   }
428 
429   /**
430    * To avoid racing with AM, external entities may need to lock a region,
431    * for example, when SSH checks what regions to skip re-assigning.
432    */
433   public Lock acquireRegionLock(final String encodedName) {
434     return locker.acquireLock(encodedName);
435   }
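      // Illustrative usage sketch (not part of the original source): callers should
      // release the lock in a finally block, mirroring the pattern used throughout
      // this class:
      //
      //   Lock lock = assignmentManager.acquireRegionLock(encodedName);
      //   try {
      //     // inspect or update state for this region
      //   } finally {
      //     lock.unlock();
      //   }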
436 
437   /**
438    * Now that failover cleanup is completed, notify the server manager to
439    * process queued-up dead servers, if any.
440    */
441   void failoverCleanupDone() {
442     failoverCleanupDone.set(true);
443     serverManager.processQueuedDeadServers();
444   }
445 
446   /**
447    * Called on startup.
448    * Figures whether this is a fresh cluster start or we are joining an extant running cluster.
449    * @throws IOException
450    * @throws KeeperException
451    * @throws InterruptedException
452    */
453   void joinCluster() throws IOException,
454       KeeperException, InterruptedException {
455     // Concurrency note: In the below the accesses on regionsInTransition are
456     // outside of a synchronization block where usually all accesses to RIT are
457     // synchronized.  The presumption is that in this case it is safe since this
458     // method is being played by a single thread on startup.
459 
460     // TODO: Regions that have a null location and are not in regionsInTransitions
461     // need to be handled.
462 
463     // Scan hbase:meta to build list of existing regions, servers, and assignment
464     // Returns servers who have not checked in (assumed dead) and their regions
465     Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
466 
467     // This method will assign all user regions if a clean server startup or
468     // it will reconstruct master state and cleanup any leftovers from
469     // previous master process.
470     processDeadServersAndRegionsInTransition(deadServers);
471 
472     recoverTableInDisablingState();
473     recoverTableInEnablingState();
474   }
475 
476   /**
477    * Processes all regions that are in transition in zookeeper and also
478    * processes the list of dead servers by scanning the META.
479    * Used by a master joining a cluster.  If we figure this is a clean cluster
480    * startup, will assign all user regions.
481    * @param deadServers
482    *          Map of dead servers and their regions. Can be null.
483    * @throws KeeperException
484    * @throws IOException
485    * @throws InterruptedException
486    */
487   void processDeadServersAndRegionsInTransition(
488       final Map<ServerName, List<HRegionInfo>> deadServers)
489           throws KeeperException, IOException, InterruptedException {
490     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
491       watcher.assignmentZNode);
492 
493     if (nodes == null) {
494       String errorMessage = "Failed to get the children from ZK";
495       server.abort(errorMessage, new IOException(errorMessage));
496       return;
497     }
498 
499     boolean failover = (!serverManager.getDeadServers().isEmpty() || !serverManager
500         .getRequeuedDeadServers().isEmpty());
501 
502     if (!failover) {
503       // If any one region except meta is assigned, it's a failover.
504       Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
505       for (HRegionInfo hri: regions.keySet()) {
506         if (!hri.isMetaTable()) {
507           LOG.debug("Found " + hri + " out on cluster");
508           failover = true;
509           break;
510         }
511       }
512       if (!failover) {
513         // If any one region except meta is in transition, it's a failover.
514         for (String encodedName: nodes) {
515           RegionState state = regionStates.getRegionState(encodedName);
516           if (state != null && !state.getRegion().isMetaRegion()) {
517             LOG.debug("Found " + state.getRegion().getRegionNameAsString() + " in RITs");
518             failover = true;
519             break;
520           }
521         }
522       }
523     }
524 
525     // If we found user regions out on the cluster, it's a failover.
526     if (failover) {
527       LOG.info("Found regions out on cluster or in RIT; presuming failover");
528       // Process list of dead servers and regions in RIT.
529       // See HBASE-4580 for more information.
530       processDeadServersAndRecoverLostRegions(deadServers);
531     } else {
532       // Fresh cluster startup.
533       LOG.info("Clean cluster startup. Assigning userregions");
534       assignAllUserRegions();
535     }
536   }
537 
538   /**
539    * If region is up in zk in transition, then do fixup and block and wait until
540    * the region is assigned and out of transition.  Used on startup for
541    * catalog regions.
542    * @param hri Region to look for.
543    * @return True if we processed a region in transition else false if region
544    * was not up in zk in transition.
545    * @throws InterruptedException
546    * @throws KeeperException
547    * @throws IOException
548    */
549   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
550       throws InterruptedException, KeeperException, IOException {
551     String encodedRegionName = hri.getEncodedName();
552     if (!processRegionInTransition(encodedRegionName, hri)) {
553       return false; // The region is not in transition
554     }
555     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
556     while (!this.server.isStopped() &&
557         this.regionStates.isRegionInTransition(encodedRegionName)) {
558       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
559       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
560         // The region is not in transition, or not in transition on an online
561         // server. It doesn't help to block here any more. Caller needs to
562         // verify the region is actually assigned.
563         break;
564       }
565       this.regionStates.waitForUpdate(100);
566     }
567     return true;
568   }
569 
570   /**
571    * Process failover of new master for region <code>encodedRegionName</code>
572    * up in zookeeper.
573    * @param encodedRegionName Region to process failover for.
574    * @param regionInfo If null we'll go get it from meta table.
575    * @return True if we processed <code>regionInfo</code> as a RIT.
576    * @throws KeeperException
577    * @throws IOException
578    */
579   boolean processRegionInTransition(final String encodedRegionName,
580       final HRegionInfo regionInfo) throws KeeperException, IOException {
581     // We need a lock here to ensure that we will not put the same region twice
582     // It has no reason to be a lock shared with the other operations.
583     // We can do the lock on the region only, instead of a global lock: what we want to ensure
584     // is that we don't have two threads working on the same region.
585     Lock lock = locker.acquireLock(encodedRegionName);
586     try {
587       Stat stat = new Stat();
588       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
589       if (data == null) return false;
590       RegionTransition rt;
591       try {
592         rt = RegionTransition.parseFrom(data);
593       } catch (DeserializationException e) {
594         LOG.warn("Failed parse znode data", e);
595         return false;
596       }
597       HRegionInfo hri = regionInfo;
598       if (hri == null) {
599         // The region info is not passed in. We will try to find the region
600         // from region states map/meta based on the encoded region name. But we
601         // may not be able to find it. This is valid for an online merge: the
602         // region may not have been created yet if the merge is not completed,
603         // and therefore it is not in meta at master recovery time.
604         hri = regionStates.getRegionInfo(rt.getRegionName());
605         EventType et = rt.getEventType();
606         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
607             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
608           LOG.warn("Couldn't find the region in recovering " + rt);
609           return false;
610         }
611       }
612       return processRegionsInTransition(
613         rt, hri, stat.getVersion());
614     } finally {
615       lock.unlock();
616     }
617   }
618 
619   /**
620    * This call is invoked only (1) when the master assigns meta, and
621    * (2) during failover-mode startup, while processing zk assignment nodes.
622    * The lock is acquired by the caller. It returns true if the region
623    * is in transition for sure, false otherwise.
624    *
625    * It should be private but it is used by some tests too.
626    */
627   boolean processRegionsInTransition(
628       final RegionTransition rt, final HRegionInfo regionInfo,
629       final int expectedVersion) throws KeeperException {
630     EventType et = rt.getEventType();
631     // Get ServerName.  Cannot be null.
632     final ServerName sn = rt.getServerName();
633     final byte[] regionName = rt.getRegionName();
634     final String encodedName = HRegionInfo.encodeRegionName(regionName);
635     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
636     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
637 
638     if (regionStates.isRegionInTransition(encodedName)) {
639       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
640         + et + ", does nothing since the region is already in transition "
641         + regionStates.getRegionTransitionState(encodedName));
642       // Just return
643       return true;
644     }
645     if (!serverManager.isServerOnline(sn)) {
646       // It was transitioning on a dead server, so it's closed now.
647       // Force to OFFLINE and put it in transition, but don't assign it
648       // since log splitting for the dead server is not done yet.
649       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
650         " was on deadserver; forcing offline");
651       if (regionStates.isRegionOnline(regionInfo)) {
652         // Meta could still show the region is assigned to the previous
653         // server. If that server is online, when we reload the meta, the
654         // region is put back to online, we need to offline it.
655         regionStates.regionOffline(regionInfo);
656         sendRegionClosedNotification(regionInfo);
657       }
658       // Put it back in transition so that SSH can re-assign it
659       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
660 
661       if (regionInfo.isMetaRegion()) {
662         // If it's meta region, reset the meta location.
663         // So that master knows the right meta region server.
664         MetaRegionTracker.setMetaLocation(watcher, sn);
665       } else {
666         // Whether the previous server is online or offline,
667         // we need to reset the last region server of the region.
668         regionStates.setLastRegionServerOfRegion(sn, encodedName);
669         // Make sure we know the server is dead.
670         if (!serverManager.isServerDead(sn)) {
671           serverManager.expireServer(sn);
672         }
673       }
674       return false;
675     }
676     switch (et) {
677       case M_ZK_REGION_CLOSING:
678         // Insert into RIT & resend the query to the region server: may be the previous master
679         // died before sending the query the first time.
680         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
681         this.executorService.submit(
682           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
683             @Override
684             public void process() throws IOException {
685               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
686               try {
687                 unassign(regionInfo, rsClosing, expectedVersion, null, true, null);
688                 if (regionStates.isRegionOffline(regionInfo)) {
689                   assign(regionInfo, true);
690                 }
691               } finally {
692                 lock.unlock();
693               }
694             }
695           });
696         break;
697 
698       case RS_ZK_REGION_CLOSED:
699       case RS_ZK_REGION_FAILED_OPEN:
700         // Region is closed, insert into RIT and handle it
701         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
702         invokeAssign(regionInfo);
703         break;
704 
705       case M_ZK_REGION_OFFLINE:
706         // Insert in RIT and resend to the regionserver
707         regionStates.updateRegionState(rt, State.PENDING_OPEN);
708         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
709         this.executorService.submit(
710           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
711             @Override
712             public void process() throws IOException {
713               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
714               try {
715                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
716                 addPlan(encodedName, plan);
717                 assign(rsOffline, false, false);
718               } finally {
719                 lock.unlock();
720               }
721             }
722           });
723         break;
724 
725       case RS_ZK_REGION_OPENING:
726         regionStates.updateRegionState(rt, State.OPENING);
727         break;
728 
729       case RS_ZK_REGION_OPENED:
730         // Region is opened, insert into RIT and handle it
731         // This could be done asynchronously; we would then need to acquire the lock in the
732         //  handler.
733         regionStates.updateRegionState(rt, State.OPEN);
734         new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
735         break;
736       case RS_ZK_REQUEST_REGION_SPLIT:
737       case RS_ZK_REGION_SPLITTING:
738       case RS_ZK_REGION_SPLIT:
739         // Splitting region should be online. We could have skipped it during
740         // user region rebuilding since we may have considered the split completed.
741         // Put it in SPLITTING state to avoid complications.
742         regionStates.regionOnline(regionInfo, sn);
743         regionStates.updateRegionState(rt, State.SPLITTING);
744         if (!handleRegionSplitting(
745             rt, encodedName, prettyPrintedRegionName, sn)) {
746           deleteSplittingNode(encodedName, sn);
747         }
748         break;
749       case RS_ZK_REQUEST_REGION_MERGE:
750       case RS_ZK_REGION_MERGING:
751       case RS_ZK_REGION_MERGED:
752         if (!handleRegionMerging(
753             rt, encodedName, prettyPrintedRegionName, sn)) {
754           deleteMergingNode(encodedName, sn);
755         }
756         break;
757       default:
758         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
759     }
760     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
761       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
762       + "server: " + sn);
763     return true;
764   }
765 
766   /**
767    * When a region is closed, it should be removed from the regionsToReopen
768    * @param hri HRegionInfo of the region which was closed
769    */
770   public void removeClosedRegion(HRegionInfo hri) {
771     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
772       LOG.debug("Removed region from reopening regions because it was closed");
773     }
774   }
775 
776   /**
777    * Handles various states an unassigned node can be in.
778    * <p>
779    * Method is called when a state change is suspected for an unassigned node.
780    * <p>
781    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
782    * yet).
783    * @param rt
784    * @param expectedVersion
785    */
786   void handleRegion(final RegionTransition rt, int expectedVersion) {
787     if (rt == null) {
788       LOG.warn("Unexpected NULL input for RegionTransition rt");
789       return;
790     }
791     final ServerName sn = rt.getServerName();
792     // Check if this is a special HBCK transition
793     if (sn.equals(HBCK_CODE_SERVERNAME)) {
794       handleHBCK(rt);
795       return;
796     }
797     final long createTime = rt.getCreateTime();
798     final byte[] regionName = rt.getRegionName();
799     String encodedName = HRegionInfo.encodeRegionName(regionName);
800     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
801     // Verify this is a known server
802     if (!serverManager.isServerOnline(sn)
803       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
804       LOG.warn("Attempted to handle region transition for server but " +
805         "it is not online: " + prettyPrintedRegionName + ", " + rt);
806       return;
807     }
808 
809     RegionState regionState =
810       regionStates.getRegionState(encodedName);
811     long startTime = System.currentTimeMillis();
812     if (LOG.isDebugEnabled()) {
813       boolean lateEvent = createTime < (startTime - 15000);
814       LOG.debug("Handling " + rt.getEventType() +
815         ", server=" + sn + ", region=" +
816         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
817         (lateEvent ? ", which is more than 15 seconds late" : "") +
818         ", current_state=" + regionState);
819     }
820     // We don't do anything for this event,
821     // so separate it out, no need to lock/unlock anything
822     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
823       return;
824     }
825 
826     // We need a lock on the region as we could update it
827     Lock lock = locker.acquireLock(encodedName);
828     try {
829       RegionState latestState =
830         regionStates.getRegionState(encodedName);
831       if ((regionState == null && latestState != null)
832           || (regionState != null && latestState == null)
833           || (regionState != null && latestState != null
834             && latestState.getState() != regionState.getState())) {
835         LOG.warn("Region state changed from " + regionState + " to "
836           + latestState + ", while acquiring lock");
837       }
838       long waitedTime = System.currentTimeMillis() - startTime;
839       if (waitedTime > 5000) {
840         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
841       }
842       regionState = latestState;
843       switch (rt.getEventType()) {
844       case RS_ZK_REQUEST_REGION_SPLIT:
845       case RS_ZK_REGION_SPLITTING:
846       case RS_ZK_REGION_SPLIT:
847         if (!handleRegionSplitting(
848             rt, encodedName, prettyPrintedRegionName, sn)) {
849           deleteSplittingNode(encodedName, sn);
850         }
851         break;
852 
853       case RS_ZK_REQUEST_REGION_MERGE:
854       case RS_ZK_REGION_MERGING:
855       case RS_ZK_REGION_MERGED:
856         // Merged region is a new region, we can't find it in the region states now.
857         // However, the two merging regions are not new. They should be in state for merging.
858         if (!handleRegionMerging(
859             rt, encodedName, prettyPrintedRegionName, sn)) {
860           deleteMergingNode(encodedName, sn);
861         }
862         break;
863 
864       case M_ZK_REGION_CLOSING:
865         // Should see CLOSING after we have asked it to CLOSE or additional
866         // times after already being in state of CLOSING
867         if (regionState == null
868             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
869           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
870             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
871             + regionStates.getRegionState(encodedName));
872           return;
873         }
874         // Transition to CLOSING (or update stamp if already CLOSING)
875         regionStates.updateRegionState(rt, State.CLOSING);
876         break;
877 
878       case RS_ZK_REGION_CLOSED:
879         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
880         if (regionState == null
881             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
882           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
883             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
884             + regionStates.getRegionState(encodedName));
885           return;
886         }
887         // Handle CLOSED by assigning elsewhere, or stopping if this is a disable.
888         // If we got here all is good.  Need to update RegionState -- else
889         // what follows will fail because not in expected state.
890         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
891         updateClosedRegionHandlerTracker(regionState.getRegion());
892         break;
893 
894         case RS_ZK_REGION_FAILED_OPEN:
895           if (regionState == null
896               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
897             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
898               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
899               + regionStates.getRegionState(encodedName));
900             return;
901           }
902           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
903           if (failedOpenCount == null) {
904             failedOpenCount = new AtomicInteger();
905             // No need to use putIfAbsent, or extra synchronization since
906             // this whole handleRegion block is locked on the encoded region
907             // name, and failedOpenTracker is updated only in this block
908             failedOpenTracker.put(encodedName, failedOpenCount);
909           }
910           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
911             regionStates.updateRegionState(rt, State.FAILED_OPEN);
912             // remove the tracking info to save memory, also reset
913             // the count for next open initiative
914             failedOpenTracker.remove(encodedName);
915           } else {
916             // Handle this the same as if it were opened and then closed.
917             regionState = regionStates.updateRegionState(rt, State.CLOSED);
918             if (regionState != null) {
919               // When there is more than one region server, a new RS is selected as the
920               // destination and the same is updated in the regionplan. (HBASE-5546)
921               try {
922                 getRegionPlan(regionState.getRegion(), sn, true);
923                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
924               } catch (HBaseIOException e) {
925                 LOG.warn("Failed to get region plan", e);
926               }
927             }
928           }
929           break;
930 
931         case RS_ZK_REGION_OPENING:
932           // Should see OPENING after we have asked it to OPEN or additional
933           // times after already being in state of OPENING
934           if (regionState == null
935               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
936             LOG.warn("Received OPENING for " + prettyPrintedRegionName
937               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
938               + regionStates.getRegionState(encodedName));
939             return;
940           }
941           // Transition to OPENING (or update stamp if already OPENING)
942           regionStates.updateRegionState(rt, State.OPENING);
943           break;
944 
945         case RS_ZK_REGION_OPENED:
946           // Should see OPENED after OPENING but possible after PENDING_OPEN.
947           if (regionState == null
948               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
949             LOG.warn("Received OPENED for " + prettyPrintedRegionName
950               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
951               + regionStates.getRegionState(encodedName));
952 
953             if (regionState != null) {
954               // Close it without updating the internal region states,
955               // so as not to create double assignments in unlucky scenarios
956               // mentioned in OpenRegionHandler#process
957               unassign(regionState.getRegion(), null, -1, null, false, sn);
958             }
959             return;
960           }
961           // Handle OPENED by removing from transition and deleting the zk node
962           regionState = regionStates.updateRegionState(rt, State.OPEN);
963           if (regionState != null) {
964             failedOpenTracker.remove(encodedName); // reset the count, if any
965             new OpenedRegionHandler(
966               server, this, regionState.getRegion(), sn, expectedVersion).process();
967             updateOpenedRegionHandlerTracker(regionState.getRegion());
968           }
969           break;
970 
971         default:
972           throw new IllegalStateException("Received event is not valid.");
973       }
974     } finally {
975       lock.unlock();
976     }
977   }
978 
979   //For unit tests only
980   boolean wasClosedHandlerCalled(HRegionInfo hri) {
981     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
982     // compareAndSet to be sure that unit tests don't see stale values. This means
983     // we will return true exactly once unless the handler code resets this value
984     // to true.
985     return b == null ? false : b.compareAndSet(true, false);
986   }
987 
988   //For unit tests only
989   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
990     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
991     // compareAndSet to be sure that unit tests don't see stale values. This means
992     // we will return true exactly once unless the handler code resets this value
993     // to true.
994     return b == null ? false : b.compareAndSet(true, false);
995   }
996 
997   //For unit tests only
998   void initializeHandlerTrackers() {
999     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1000     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1001   }
1002 
1003   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1004     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1005       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1006     }
1007   }
1008 
1009   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1010     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1011       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1012     }
1013   }
1014 
1015   // TODO: processFavoredNodes might throw an exception, e.g., if the
1016   // meta could not be contacted/updated. We need to decide how seriously to treat
1017   // this problem. Should we fail the current assignment? We should be able
1018   // to recover from this problem eventually (if the meta couldn't be updated,
1019   // things should work normally and eventually get fixed up).
1020   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1021     if (!shouldAssignRegionsWithFavoredNodes) return;
1022     // The AM gets the favored nodes info for each region and updates the meta
1023     // table with that info
1024     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1025         new HashMap<HRegionInfo, List<ServerName>>();
1026     for (HRegionInfo region : regions) {
1027       regionToFavoredNodes.put(region,
1028           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1029     }
1030     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker);
1031   }
1032 
1033   /**
1034    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1035    * <p>
1036    * This is handled in a separate code path because it breaks the normal rules.
1037    * @param rt
1038    */
1039   private void handleHBCK(RegionTransition rt) {
1040     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1041     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1042       ", server=" + rt.getServerName() + ", region=" +
1043       HRegionInfo.prettyPrint(encodedName));
1044     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1045     switch (rt.getEventType()) {
1046       case M_ZK_REGION_OFFLINE:
1047         HRegionInfo regionInfo;
1048         if (regionState != null) {
1049           regionInfo = regionState.getRegion();
1050         } else {
1051           try {
1052             byte [] name = rt.getRegionName();
1053             Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
1054             regionInfo = p.getFirst();
1055           } catch (IOException e) {
1056             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1057             return;
1058           }
1059         }
1060         LOG.info("HBCK repair is triggering assignment of region=" +
1061             regionInfo.getRegionNameAsString());
1062         // trigger assign, node is already in OFFLINE so don't need to update ZK
1063         assign(regionInfo, false);
1064         break;
1065 
1066       default:
1067         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1068         break;
1069     }
1070 
1071   }
1072 
1073   // ZooKeeper events
1074 
1075   /**
1076    * New unassigned node has been created.
1077    *
1078    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1079    * creating an unassigned node.
1080    *
1081    * <p>When this happens we must:
1082    * <ol>
1083    *   <li>Watch the node for further events</li>
1084    *   <li>Read and handle the state in the node</li>
1085    * </ol>
1086    */
1087   @Override
1088   public void nodeCreated(String path) {
1089     handleAssignmentEvent(path);
1090   }
1091 
1092   /**
1093    * Existing unassigned node has had data changed.
1094    *
1095    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1096    * OPENING/OPENED and CLOSING/CLOSED.
1097    *
1098    * <p>When this happens we must:
1099    * <ol>
1100    *   <li>Watch the node for further events</li>
1101    *   <li>Read and handle the state in the node</li>
1102    * </ol>
1103    */
1104   @Override
1105   public void nodeDataChanged(String path) {
1106     handleAssignmentEvent(path);
1107   }
1108 
1109 
1110   // We don't want to have two events on the same region managed simultaneously.
1111   // For this reason, we need to wait if an event on the same region is currently in progress.
1112   // So we track the region names of the events in progress, and we keep a waiting list.
1113   private final Set<String> regionsInProgress = new HashSet<String>();
1114   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1115   //  this as we want the events to be managed in the same order as we received them.
1116   private final LinkedHashMultimap <String, RegionRunnable>
1117       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1118 
1119   /**
1120    * A specific runnable that works only on a region.
1121    */
1122   private interface RegionRunnable extends Runnable{
1123     /**
1124      * @return - the name of the region it works on.
1125      */
1126     String getRegionName();
1127   }
1128 
1129   /**
1130    * Submit a task, ensuring that there is only one task at a time working on a given region.
1131    * Order is respected.
1132    */
1133   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1134 
1135     synchronized (regionsInProgress) {
1136       // If there is already a task for this region, we add it to the
1137       //  waiting list and return.
1138       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1139         synchronized (zkEventWorkerWaitingList){
1140           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1141         }
1142         return;
1143       }
1144 
1145       // No event in progress on this region => we can submit a new task immediately.
1146       regionsInProgress.add(regRunnable.getRegionName());
1147       zkEventWorkers.submit(new Runnable() {
1148         @Override
1149         public void run() {
1150           try {
1151             regRunnable.run();
1152           } finally {
1153             // now that we have finished, let's see if there is an event for the same region in the
1154             //  waiting list. If it's the case, we can now submit it to the pool.
1155             synchronized (regionsInProgress) {
1156               regionsInProgress.remove(regRunnable.getRegionName());
1157               synchronized (zkEventWorkerWaitingList) {
1158                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1159                     regRunnable.getRegionName());
1160                 if (!waiting.isEmpty()) {
1161                   // We want the first object only. The only way to get it is through an iterator.
1162                   RegionRunnable toSubmit = waiting.iterator().next();
1163                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1164                   zkEventWorkersSubmit(toSubmit);
1165                 }
1166               }
1167             }
1168           }
1169         }
1170       });
1171     }
1172   }
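       // Illustrative sketch (not part of the original source): nodeDeleted() and
       // handleAssignmentEvent() below use this method by wrapping their work in a
       // RegionRunnable so that events for a given region are serialized, e.g.:
       //
       //   zkEventWorkersSubmit(new RegionRunnable() {
       //     @Override
       //     public String getRegionName() { return regionName; }
       //     @Override
       //     public void run() { /* handle the event for this region */ }
       //   });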
1173 
1174   @Override
1175   public void nodeDeleted(final String path) {
1176     if (path.startsWith(watcher.assignmentZNode)) {
1177       final String regionName = ZKAssign.getRegionName(watcher, path);
1178       zkEventWorkersSubmit(new RegionRunnable() {
1179         @Override
1180         public String getRegionName() {
1181           return regionName;
1182         }
1183 
1184         @Override
1185         public void run() {
1186           Lock lock = locker.acquireLock(regionName);
1187           try {
1188             RegionState rs = regionStates.getRegionTransitionState(regionName);
1189             if (rs == null) {
1190               rs = regionStates.getRegionState(regionName);
1191               if (rs == null || !rs.isMergingNew()) {
1192                 // MergingNew is an offline state
1193                 return;
1194               }
1195             }
1196 
1197             HRegionInfo regionInfo = rs.getRegion();
1198             String regionNameStr = regionInfo.getRegionNameAsString();
1199             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1200             boolean disabled = getZKTable().isDisablingOrDisabledTable(regionInfo.getTable());
1201             ServerName serverName = rs.getServerName();
1202             if (serverManager.isServerOnline(serverName)) {
1203               if (rs.isOnServer(serverName)
1204                   && (rs.isOpened() || rs.isSplitting())) {
1205                 regionOnline(regionInfo, serverName);
1206                 if (disabled) {
1207                   // if server is offline, no hurt to unassign again
1208                   LOG.info("Opened " + regionNameStr
1209                     + "but this table is disabled, triggering close of region");
1210                   unassign(regionInfo);
1211                 }
1212               } else if (rs.isMergingNew()) {
1213                 synchronized (regionStates) {
1214                   String p = regionInfo.getEncodedName();
1215                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1216                   if (regions != null) {
1217                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1218                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1219                   }
1220                 }
1221               }
1222             }
1223           } finally {
1224             lock.unlock();
1225           }
1226         }
1227 
1228         private void onlineMergingRegion(boolean disabled,
1229             final HRegionInfo hri, final ServerName serverName) {
1230           RegionState regionState = regionStates.getRegionState(hri);
1231           if (regionState != null && regionState.isMerging()
1232               && regionState.isOnServer(serverName)) {
1233             regionOnline(regionState.getRegion(), serverName);
1234             if (disabled) {
1235               unassign(hri);
1236             }
1237           }
1238         }
1239       });
1240     }
1241   }
1242 
1243   /**
1244    * New unassigned node has been created.
1245    *
1246    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1247    * region by creating a znode.
1248    *
1249    * <p>When this happens we must:
1250    * <ol>
1251    *   <li>Watch the node for further children changed events</li>
1252    *   <li>Watch all new children for changed events</li>
1253    * </ol>
1254    */
1255   @Override
1256   public void nodeChildrenChanged(String path) {
1257     if (path.equals(watcher.assignmentZNode)) {
1258       zkEventWorkers.submit(new Runnable() {
1259         @Override
1260         public void run() {
1261           try {
1262             // Just make sure we see the changes for the new znodes
1263             List<String> children =
1264               ZKUtil.listChildrenAndWatchForNewChildren(
1265                 watcher, watcher.assignmentZNode);
1266             if (children != null) {
1267               Stat stat = new Stat();
1268               for (String child : children) {
1269                 // if region is in transition, we already have a watch
1270                 // on it, so no need to watch it again. So, as far as I know for now,
1271                 // this is needed to watch splitting nodes only.
1272                 if (!regionStates.isRegionInTransition(child)) {
1273                   ZKAssign.getDataAndWatch(watcher, child, stat);
1274                 }
1275               }
1276             }
1277           } catch (KeeperException e) {
1278             server.abort("Unexpected ZK exception reading unassigned children", e);
1279           }
1280         }
1281       });
1282     }
1283   }
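  // Note on the watch pattern above: ZooKeeper watches are one-shot, so every
  // children-changed event re-lists the assignment znode's children and re-sets a
  // data watch on any child that is not already tracked as a region in transition
  // (in practice, mostly splitting nodes, as noted in the run() above).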
1284 
1285   /**
1286    * Marks the region as online.  Removes it from regions in transition and
1287    * updates the in-memory assignment information.
1288    * <p>
1289    * Used when a region has been successfully opened on a region server.
1290    * @param regionInfo
1291    * @param sn
1292    */
1293   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1294     numRegionsOpened.incrementAndGet();
1295     regionStates.regionOnline(regionInfo, sn);
1296 
1297     // Remove plan if one.
1298     clearRegionPlan(regionInfo);
1299     // Add the server to serversInUpdatingTimer
1300     addToServersInUpdatingTimer(sn);
1301     balancer.regionOnline(regionInfo, sn);
1302 
1303     // Tell our listeners that a region was opened
1304     sendRegionOpenedNotification(regionInfo, sn);
1305   }
1306 
1307   /**
1308    * Pass the assignment event to a worker for processing.
1309    * Each worker is a single thread executor service.  The reason
1310    * for just one thread is to make sure all events for a given
1311    * region are processed in order.
1312    *
1313    * @param path
1314    */
1315   private void handleAssignmentEvent(final String path) {
1316     if (path.startsWith(watcher.assignmentZNode)) {
1317       final String regionName = ZKAssign.getRegionName(watcher, path);
1318 
1319       zkEventWorkersSubmit(new RegionRunnable() {
1320         @Override
1321         public String getRegionName() {
1322           return regionName;
1323         }
1324 
1325         @Override
1326         public void run() {
1327           try {
1328             Stat stat = new Stat();
1329             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1330             if (data == null) return;
1331 
1332             RegionTransition rt = RegionTransition.parseFrom(data);
1333             handleRegion(rt, stat.getVersion());
1334           } catch (KeeperException e) {
1335             server.abort("Unexpected ZK exception reading unassigned node data", e);
1336           } catch (DeserializationException e) {
1337             server.abort("Unexpected exception deserializing node data", e);
1338           }
1339         }
1340       });
1341     }
1342   }
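  // Ordering note: because zkEventWorkersSubmit routes each RegionRunnable to a
  // single-threaded worker keyed by the runnable's region name, successive znode
  // events for the same region (e.g. RS_ZK_REGION_OPENING followed by
  // RS_ZK_REGION_OPENED) are handled in the order they were observed, while events
  // for different regions can still be processed concurrently.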
1343 
1344   /**
1345    * Adds the server to the set serversInUpdatingTimer; {@link TimerUpdater}
1346    * will then update timers for this server in the background.
1347    * @param sn
1348    */
1349   private void addToServersInUpdatingTimer(final ServerName sn) {
1350     if (tomActivated){
1351       this.serversInUpdatingTimer.add(sn);
1352     }
1353   }
1354 
1355   /**
1356    * Touch timers for all regions in transition that have the passed
1357    * <code>sn</code> in common.
1358    * Call this method whenever a server checks in.  Doing so helps the case where
1359    * a new regionserver has joined the cluster and it has been given 1k regions to
1360    * open.  If this method is tickled every time a region reports a
1361    * successful open, then the 1k-th region won't be timed out just because it is
1362    * sitting behind the open of 999 other regions.  This method is NOT used
1363    * as part of bulk assign -- there we have a different mechanism for extending
1364    * the regions in transition timer (we turn it off temporarily, because
1365    * there is no region plan involved when bulk assigning).
1366    * @param sn
1367    */
1368   private void updateTimers(final ServerName sn) {
1369     Preconditions.checkState(tomActivated);
1370     if (sn == null) return;
1371 
1372     // This loop could be expensive.
1373     // First make a copy of current regionPlan rather than hold sync while
1374     // looping because holding sync can cause deadlock.  It's OK in this loop
1375     // if the Map we're going against is a little stale
1376     List<Map.Entry<String, RegionPlan>> rps;
1377     synchronized(this.regionPlans) {
1378       rps = new ArrayList<Map.Entry<String, RegionPlan>>(regionPlans.entrySet());
1379     }
1380 
1381     for (Map.Entry<String, RegionPlan> e : rps) {
1382       if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
1383         RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
1384         if (regionState != null) {
1385           regionState.updateTimestampToNow();
1386         }
1387       }
1388     }
1389   }
1390 
1391   /**
1392    * Marks the region as offline.  Removes it from regions in transition and
1393    * removes in-memory assignment information.
1394    * <p>
1395    * Used when a region has been closed and should remain closed.
1396    * @param regionInfo
1397    */
1398   public void regionOffline(final HRegionInfo regionInfo) {
1399     regionOffline(regionInfo, null);
1400   }
1401 
1402   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1403     // Disabling so should not be reassigned, just delete the CLOSED node
1404     LOG.debug("Table being disabled so deleting ZK node and removing from " +
1405       "regions in transition, skipping assignment of region " +
1406         regionInfo.getRegionNameAsString());
1407     String encodedName = regionInfo.getEncodedName();
1408     deleteNodeInStates(encodedName, "closed", null,
1409       EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1410     regionOffline(regionInfo);
1411   }
1412 
1413   // Assignment methods
1414 
1415   /**
1416    * Assigns the specified region.
1417    * <p>
1418    * If a RegionPlan is available with a valid destination then it will be used
1419    * to determine what server the region is assigned to.  If no RegionPlan is
1420    * available, the region will be assigned to a random available server.
1421    * <p>
1422    * Updates the RegionState and sends the OPEN RPC.
1423    * <p>
1424    * This will only succeed if the region is in transition and in a CLOSED or
1425    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1426    * chosen server is up and running (It may have just crashed!).  If the
1427    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1428    *
1429    * @param region region to be assigned
1430    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1431    *                       OFFLINE state before assigning the region
1432    */
1433   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1434     assign(region, setOfflineInZK, false);
1435   }
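  // Illustrative usage (hypothetical caller, not part of this file): a handler
  // that wants the unassigned znode forced to OFFLINE before the OPEN RPC would
  // typically call
  //   assignmentManager.assign(hri, true);
  // whereas a caller that already created and manages the znode itself passes false.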
1436 
1437   /**
1438    * Use care with forceNewPlan. It could cause double assignment.
1439    */
1440   public void assign(HRegionInfo region,
1441       boolean setOfflineInZK, boolean forceNewPlan) {
1442     if (isDisabledorDisablingRegionInRIT(region)) {
1443       return;
1444     }
1445     if (this.serverManager.isClusterShutdown()) {
1446       LOG.info("Cluster shutdown is set; skipping assign of " +
1447         region.getRegionNameAsString());
1448       return;
1449     }
1450     String encodedName = region.getEncodedName();
1451     Lock lock = locker.acquireLock(encodedName);
1452     try {
1453       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1454       if (state != null) {
1455         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1456           LOG.info("Skip assigning " + region.getRegionNameAsString()
1457             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1458             + " is dead but not processed yet");
1459           return;
1460         }
1461         assign(state, setOfflineInZK, forceNewPlan);
1462       }
1463     } finally {
1464       lock.unlock();
1465     }
1466   }
1467 
1468   /**
1469    * Bulk assign regions to <code>destination</code>.
1470    * @param destination
1471    * @param regions Regions to assign.
1472    * @return true if successful
1473    */
1474   boolean assign(final ServerName destination, final List<HRegionInfo> regions) {
1475     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1476     try {
1477       int regionCount = regions.size();
1478       if (regionCount == 0) {
1479         return true;
1480       }
1481       LOG.debug("Assigning " + regionCount + " region(s) to " + destination.toString());
1482       Set<String> encodedNames = new HashSet<String>(regionCount);
1483       for (HRegionInfo region : regions) {
1484         encodedNames.add(region.getEncodedName());
1485       }
1486 
1487       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1488       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1489       try {
1490         AtomicInteger counter = new AtomicInteger(0);
1491         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1492         OfflineCallback cb = new OfflineCallback(
1493           watcher, destination, counter, offlineNodesVersions);
1494         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1495         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1496         for (HRegionInfo region : regions) {
1497           String encodedName = region.getEncodedName();
1498           if (!isDisabledorDisablingRegionInRIT(region)) {
1499             RegionState state = forceRegionStateToOffline(region, false);
1500             boolean onDeadServer = false;
1501             if (state != null) {
1502               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1503                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1504                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1505                   + " is dead but not processed yet");
1506                 onDeadServer = true;
1507               } else if (asyncSetOfflineInZooKeeper(state, cb, destination)) {
1508                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1509                 plans.put(encodedName, plan);
1510                 states.add(state);
1511                 continue;
1512               }
1513             }
1514             // Reassign if the region wasn't on a dead server
1515             if (!onDeadServer) {
1516               LOG.info("failed to force region state to offline or "
1517                 + "failed to set it offline in ZK, will reassign later: " + region);
1518               failedToOpenRegions.add(region); // assign individually later
1519             }
1520           }
1521           // Release the lock, this region is excluded from bulk assign because
1522           // we can't update its state, or set its znode to offline.
1523           Lock lock = locks.remove(encodedName);
1524           lock.unlock();
1525         }
1526 
1527         // Wait until all unassigned nodes have been put up and watchers set.
1528         int total = states.size();
1529         for (int oldCounter = 0; !server.isStopped();) {
1530           int count = counter.get();
1531           if (oldCounter != count) {
1532             LOG.info(destination.toString() + " unassigned znodes=" + count +
1533               " of total=" + total);
1534             oldCounter = count;
1535           }
1536           if (count >= total) break;
1537           Threads.sleep(5);
1538         }
1539 
1540         if (server.isStopped()) {
1541           return false;
1542         }
1543 
1544         // Add region plans so we can update timers when a region is opened,
1545         // reducing unnecessary timeouts on regions in transition.
1546         this.addPlans(plans);
1547 
1548         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1549           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1550         for (RegionState state: states) {
1551           HRegionInfo region = state.getRegion();
1552           String encodedRegionName = region.getEncodedName();
1553           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1554           if (nodeVersion == null || nodeVersion == -1) {
1555             LOG.warn("Failed to set region offline in ZooKeeper: " + region);
1556             failedToOpenRegions.add(region); // assign individually later
1557             Lock lock = locks.remove(encodedRegionName);
1558             lock.unlock();
1559           } else {
1560             regionStates.updateRegionState(
1561               region, State.PENDING_OPEN, destination);
1562             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1563             if (this.shouldAssignRegionsWithFavoredNodes) {
1564               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1565             }
1566             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1567               region, nodeVersion, favoredNodes));
1568           }
1569         }
1570 
1571         // Move on to open regions.
1572         try {
1573           // Send OPEN RPC. If it fails on an IOE or RemoteException,
1574           // regions will be assigned individually.
1575           long maxWaitTime = System.currentTimeMillis() +
1576             this.server.getConfiguration().
1577               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1578           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1579             try {
1580               List<RegionOpeningState> regionOpeningStateList = serverManager
1581                 .sendRegionOpen(destination, regionOpenInfos);
1582               if (regionOpeningStateList == null) {
1583                 // Failed getting RPC connection to this server
1584                 return false;
1585               }
1586               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1587                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1588                 if (openingState != RegionOpeningState.OPENED) {
1589                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1590                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1591                     processAlreadyOpenedRegion(region, destination);
1592                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1593                     // Failed opening this region, reassign it later
1594                     failedToOpenRegions.add(region);
1595                   } else {
1596                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1597                       + openingState + " in assigning region " + region);
1598                   }
1599                 }
1600               }
1601               break;
1602             } catch (IOException e) {
1603               if (e instanceof RemoteException) {
1604                 e = ((RemoteException)e).unwrapRemoteException();
1605               }
1606               if (e instanceof RegionServerStoppedException) {
1607                 LOG.warn("The region server was shut down", e);
1608                 // No need to retry, the region server is a goner.
1609                 return false;
1610               } else if (e instanceof ServerNotRunningYetException) {
1611                 long now = System.currentTimeMillis();
1612                 if (now < maxWaitTime) {
1613                   LOG.debug("Server is not yet up; waiting up to " +
1614                     (maxWaitTime - now) + "ms", e);
1615                   Thread.sleep(100);
1616                   i--; // reset the try count
1617                   continue;
1618                 }
1619               } else if (e instanceof java.net.SocketTimeoutException
1620                   && this.serverManager.isServerOnline(destination)) {
1621                 // In case socket is timed out and the region server is still online,
1622                 // the openRegion RPC could have been accepted by the server and
1623                 // just the response didn't go through.  So we will retry to
1624                 // open the region on the same server.
1625                 if (LOG.isDebugEnabled()) {
1626                   LOG.debug("Bulk assigner openRegion() to " + destination
1627                     + " has timed out, but the regions might"
1628                     + " already be opened on it.", e);
1629                 }
1630                 // wait and reset the retry count; the server might just be busy.
1631                 Thread.sleep(100);
1632                 i--;
1633                 continue;
1634               }
1635               throw e;
1636             }
1637           }
1638         } catch (IOException e) {
1639           // Can be a socket timeout, EOF, NoRouteToHost, etc
1640           LOG.info("Unable to communicate with " + destination
1641             + " in order to assign regions, ", e);
1642           return false;
1643         } catch (InterruptedException e) {
1644           throw new RuntimeException(e);
1645         }
1646       } finally {
1647         for (Lock lock : locks.values()) {
1648           lock.unlock();
1649         }
1650       }
1651 
1652       if (!failedToOpenRegions.isEmpty()) {
1653         for (HRegionInfo region : failedToOpenRegions) {
1654           if (!regionStates.isRegionOnline(region)) {
1655             invokeAssign(region);
1656           }
1657         }
1658       }
1659       LOG.debug("Bulk assigning done for " + destination);
1660       return true;
1661     } finally {
1662       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
1663     }
1664   }
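  // Summary of the bulk-assign flow above: per-region locks are taken, OFFLINE
  // znodes are created asynchronously via OfflineCallback, the method waits for the
  // callback counter to reach the number of queued states, then a single bulk OPEN
  // RPC is sent to the destination. Any region whose state cannot be forced OFFLINE,
  // whose znode cannot be set OFFLINE, or whose open fails (FAILED_OPENING) falls
  // back to invokeAssign() for individual retry.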
1665 
1666   /**
1667    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1668    *
1669    * The RPC will be sent only to the region server found in the region state
1670    * if it is passed in; otherwise, to the src server specified. If the region
1671    * state is not specified, we don't update the region state at all; instead
1672    * we just send the RPC call. This is useful for some cleanup without
1673    * messing around with the region states (see handleRegion, on the region-opened-
1674    * on-an-unexpected-server scenario, for an example).
1675    */
1676   private void unassign(final HRegionInfo region,
1677       final RegionState state, final int versionOfClosingNode,
1678       final ServerName dest, final boolean transitionInZK,
1679       final ServerName src) {
1680     ServerName server = src;
1681     if (state != null) {
1682       server = state.getServerName();
1683     }
1684     long maxWaitTime = -1;
1685     for (int i = 1; i <= this.maximumAttempts; i++) {
1686       if (this.server.isStopped() || this.server.isAborted()) {
1687         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1688         return;
1689       }
1690       // ClosedRegionHandler can remove the server from this.regions
1691       if (!serverManager.isServerOnline(server)) {
1692         LOG.debug("Offline " + region.getRegionNameAsString()
1693           + ", no need to unassign since it's on a dead server: " + server);
1694         if (transitionInZK) {
1695           // delete the node. if no node exists need not bother.
1696           deleteClosingOrClosedNode(region, server);
1697         }
1698         if (state != null) {
1699           regionOffline(region);
1700         }
1701         return;
1702       }
1703       try {
1704         // Send CLOSE RPC
1705         if (serverManager.sendRegionClose(server, region,
1706           versionOfClosingNode, dest, transitionInZK)) {
1707           LOG.debug("Sent CLOSE to " + server + " for region " +
1708             region.getRegionNameAsString());
1709           if (!transitionInZK && state != null) {
1710             // Retry to make sure the region is
1711             // closed so as to avoid double assignment.
1712             unassign(region, state, versionOfClosingNode,
1713               dest, transitionInZK, src);
1714           }
1715           return;
1716         }
1717         // This never happens. Currently the regionserver close always returns true.
1718         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
1719         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1720           region.getRegionNameAsString());
1721       } catch (Throwable t) {
1722         if (t instanceof RemoteException) {
1723           t = ((RemoteException)t).unwrapRemoteException();
1724         }
1725         boolean logRetries = true;
1726         if (t instanceof NotServingRegionException
1727             || t instanceof RegionServerStoppedException
1728             || t instanceof ServerNotRunningYetException) {
1729           LOG.debug("Offline " + region.getRegionNameAsString()
1730             + ", it is no longer on " + server, t);
1731           if (transitionInZK) {
1732             deleteClosingOrClosedNode(region, server);
1733           }
1734           if (state != null) {
1735             regionOffline(region);
1736           }
1737           return;
1738         } else if ((t instanceof FailedServerException) || (state != null &&
1739             t instanceof RegionAlreadyInTransitionException)) {
1740           long sleepTime = 0;
1741           Configuration conf = this.server.getConfiguration();
1742           if(t instanceof FailedServerException) {
1743             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1744                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1745           } else {
1746             // RS is already processing this region, only need to update the timestamp
1747             LOG.debug("Updating the timestamp of " + state);
1748             state.updateTimestampToNow();
1749             if (maxWaitTime < 0) {
1750               maxWaitTime =
1751                   EnvironmentEdgeManager.currentTimeMillis()
1752                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1753                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1754             }
1755             long now = EnvironmentEdgeManager.currentTimeMillis();
1756             if (now < maxWaitTime) {
1757               LOG.debug("Region is already in transition; "
1758                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1759               sleepTime = 100;
1760               i--; // reset the try count
1761               logRetries = false;
1762             }
1763           }
1764           try {
1765             if (sleepTime > 0) {
1766               Thread.sleep(sleepTime);
1767             }
1768           } catch (InterruptedException ie) {
1769             LOG.warn("Failed to unassign "
1770               + region.getRegionNameAsString() + " since interrupted", ie);
1771             Thread.currentThread().interrupt();
1772             if (!tomActivated && state != null) {
1773               regionStates.updateRegionState(region, State.FAILED_CLOSE);
1774             }
1775             return;
1776           }
1777         }
1778 
1779         if (logRetries) {
1780           LOG.info("Server " + server + " returned " + t + " for "
1781             + region.getRegionNameAsString() + ", try=" + i
1782             + " of " + this.maximumAttempts, t);
1783           // Presume retry or server will expire.
1784         }
1785       }
1786     }
1787     // Run out of attempts
1788     if (!tomActivated && state != null) {
1789       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1790     }
1791   }
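  // Retry semantics for the unassign above: a FailedServerException sleeps past the
  // failed-server expiry before retrying, a RegionAlreadyInTransitionException only
  // refreshes the state's timestamp and keeps retrying (without consuming an attempt)
  // until ALREADY_IN_TRANSITION_WAITTIME elapses, and once maximumAttempts is
  // exhausted the region is marked FAILED_CLOSE (when the timeout monitor is off).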
1792 
1793   /**
1794    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1795    */
1796   private RegionState forceRegionStateToOffline(
1797       final HRegionInfo region, final boolean forceNewPlan) {
1798     RegionState state = regionStates.getRegionState(region);
1799     if (state == null) {
1800       LOG.warn("Assigning a region not in region states: " + region);
1801       state = regionStates.createRegionState(region);
1802     }
1803 
1804     ServerName sn = state.getServerName();
1805     if (forceNewPlan && LOG.isDebugEnabled()) {
1806       LOG.debug("Force region state offline " + state);
1807     }
1808 
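    // The switch below relies on deliberate fall-through: an in-transition state
    // (OPEN/OPENING/PENDING_OPEN/CLOSING/PENDING_CLOSE) with forceNewPlan falls into
    // the FAILED_CLOSE/FAILED_OPEN unassign attempt, then into the OFFLINE
    // dead-server check, and finally into CLOSED, which breaks out and allows the assign.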
1809     switch (state.getState()) {
1810     case OPEN:
1811     case OPENING:
1812     case PENDING_OPEN:
1813     case CLOSING:
1814     case PENDING_CLOSE:
1815       if (!forceNewPlan) {
1816         LOG.debug("Skip assigning " +
1817           region + ", it is already " + state);
1818         return null;
1819       }
1820     case FAILED_CLOSE:
1821     case FAILED_OPEN:
1822       unassign(region, state, -1, null, false, null);
1823       state = regionStates.getRegionState(region);
1824       if (state.isFailedClose()) {
1825         // If we can't close the region, we can't re-assign
1826         // it so as to avoid possible double assignment/data loss.
1827         LOG.info("Skip assigning " +
1828           region + ", we couldn't close it: " + state);
1829         return null;
1830       }
1831     case OFFLINE:
1832       // This region could have been open on this server
1833       // for a while. If the server is dead and not processed
1834       // yet, we can move on only if the meta shows the
1835       // region is not on this server actually, or on a server
1836       // not dead, or dead and processed already.
1837       if (regionStates.isServerDeadAndNotProcessed(sn)
1838           && wasRegionOnDeadServerByMeta(region, sn)) {
1839         LOG.info("Skip assigning " + region.getRegionNameAsString()
1840           + ", it is on a dead but not processed yet server");
1841         return null;
1842       }
1843     case CLOSED:
1844       break;
1845     default:
1846       LOG.error("Trying to assign region " + region
1847         + ", which is " + state);
1848       return null;
1849     }
1850     return state;
1851   }
1852 
1853   private boolean wasRegionOnDeadServerByMeta(
1854       final HRegionInfo region, final ServerName sn) {
1855     try {
1856       if (region.isMetaRegion()) {
1857         ServerName server = catalogTracker.getMetaLocation();
1858         return regionStates.isServerDeadAndNotProcessed(server);
1859       }
1860       while (!server.isStopped()) {
1861         try {
1862           catalogTracker.waitForMeta();
1863           Pair<HRegionInfo, ServerName> r =
1864             MetaReader.getRegion(catalogTracker, region.getRegionName());
1865           ServerName server = r == null ? null : r.getSecond();
1866           return regionStates.isServerDeadAndNotProcessed(server);
1867         } catch (IOException ioe) {
1868           LOG.info("Received exception accessing hbase:meta during force assign "
1869             + region.getRegionNameAsString() + ", retrying", ioe);
1870         }
1871       }
1872     } catch (InterruptedException e) {
1873       Thread.currentThread().interrupt();
1874       LOG.info("Interrupted accessing hbase:meta", e);
1875     }
1876     // Call is interrupted or server is stopped.
1877     return regionStates.isServerDeadAndNotProcessed(sn);
1878   }
1879 
1880   /**
1881    * Caller must hold lock on the passed <code>state</code> object.
1882    * @param state
1883    * @param setOfflineInZK
1884    * @param forceNewPlan
1885    */
1886   private void assign(RegionState state,
1887       final boolean setOfflineInZK, final boolean forceNewPlan) {
1888     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1889     try {
1890       Configuration conf = server.getConfiguration();
1891       RegionState currentState = state;
1892       int versionOfOfflineNode = -1;
1893       RegionPlan plan = null;
1894       long maxWaitTime = -1;
1895       HRegionInfo region = state.getRegion();
1896       RegionOpeningState regionOpenState;
1897       Throwable previousException = null;
1898       for (int i = 1; i <= maximumAttempts; i++) {
1899         if (server.isStopped() || server.isAborted()) {
1900           LOG.info("Skip assigning " + region.getRegionNameAsString()
1901             + ", the server is stopped/aborted");
1902           return;
1903         }
1904         if (plan == null) { // Get a server for the region at first
1905           try {
1906             plan = getRegionPlan(region, forceNewPlan);
1907           } catch (HBaseIOException e) {
1908             LOG.warn("Failed to get region plan", e);
1909           }
1910         }
1911         if (plan == null) {
1912           LOG.warn("Unable to determine a plan to assign " + region);
1913           if (tomActivated){
1914             this.timeoutMonitor.setAllRegionServersOffline(true);
1915           } else {
1916             if (region.isMetaRegion()) {
1917               try {
1918                 Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1919                 if (i == maximumAttempts) i = 1;
1920                 continue;
1921               } catch (InterruptedException e) {
1922                 LOG.error("Got exception while waiting for hbase:meta assignment");
1923                 Thread.currentThread().interrupt();
1924               }
1925             }
1926             regionStates.updateRegionState(region, State.FAILED_OPEN);
1927           }
1928           return;
1929         }
1930         if (setOfflineInZK && versionOfOfflineNode == -1) {
1931           // get the version of the znode after setting it to OFFLINE.
1932           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
1933           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
1934           if (versionOfOfflineNode != -1) {
1935             if (isDisabledorDisablingRegionInRIT(region)) {
1936               return;
1937             }
1938             // In case of assignment from EnableTableHandler, the table state is ENABLING. Anyhow,
1939             // EnableTableHandler will set ENABLED after assigning all the table regions. If we
1940             // try to set it to ENABLED directly, then the client API may think the table is enabled.
1941             // When all the regions are added directly into hbase:meta and we call
1942             // assignRegion, then we need to make the table ENABLED. In such a case the table
1943             // will not be in the ENABLING or ENABLED state.
1944             TableName tableName = region.getTable();
1945             if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
1946               LOG.debug("Setting table " + tableName + " to ENABLED state.");
1947               setEnabledTable(tableName);
1948             }
1949           }
1950         }
1951         if (setOfflineInZK && versionOfOfflineNode == -1) {
1952           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
1953           // Setting offline in ZK must have failed due to ZK racing or some
1954           // exception which may make the server abort. If it is ZK racing,
1955           // we should retry; since we already reset the region state,
1956           // any existing (re)assignment will fail anyway.
1957           if (!server.isAborted()) {
1958             continue;
1959           }
1960         }
1961         LOG.info("Assigning " + region.getRegionNameAsString() +
1962             " to " + plan.getDestination().toString());
1963         // Transition RegionState to PENDING_OPEN
1964         currentState = regionStates.updateRegionState(region,
1965           State.PENDING_OPEN, plan.getDestination());
1966 
1967         boolean needNewPlan;
1968         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1969             " to " + plan.getDestination();
1970         try {
1971           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1972           if (this.shouldAssignRegionsWithFavoredNodes) {
1973             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1974           }
1975           regionOpenState = serverManager.sendRegionOpen(
1976               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
1977 
1978           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
1979             // Failed opening this region, looping again on a new server.
1980             needNewPlan = true;
1981             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
1982                 " trying to assign elsewhere instead; " +
1983                 "try=" + i + " of " + this.maximumAttempts);
1984           } else {
1985             // we're done
1986             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
1987               processAlreadyOpenedRegion(region, plan.getDestination());
1988             }
1989             return;
1990           }
1991 
1992         } catch (Throwable t) {
1993           if (t instanceof RemoteException) {
1994             t = ((RemoteException) t).unwrapRemoteException();
1995           }
1996           previousException = t;
1997 
1998           // Should we wait a little before retrying? If the server is starting, yes.
1999           // If the region is already in transition, yes as well: we want to be sure that
2000           // the region will get opened but we don't want a double assignment.
2001           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2002               t instanceof ServerNotRunningYetException);
2003 
2004           // In case socket is timed out and the region server is still online,
2005           // the openRegion RPC could have been accepted by the server and
2006           // just the response didn't go through.  So we will retry to
2007           // open the region on the same server to avoid possible
2008           // double assignment.
2009           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2010               && this.serverManager.isServerOnline(plan.getDestination()));
2011 
2012 
2013           if (hold) {
2014             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2015               "try=" + i + " of " + this.maximumAttempts, t);
2016 
2017             if (maxWaitTime < 0) {
2018               if (t instanceof RegionAlreadyInTransitionException) {
2019                 maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
2020                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2021                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2022               } else {
2023                 maxWaitTime = this.server.getConfiguration().
2024                   getLong("hbase.regionserver.rpc.startup.waittime", 60000);
2025               }
2026             }
2027             try {
2028               needNewPlan = false;
2029               long now = EnvironmentEdgeManager.currentTimeMillis();
2030               if (now < maxWaitTime) {
2031                 LOG.debug("Server is not yet up or region is already in transition; "
2032                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2033                 Thread.sleep(100);
2034                 i--; // reset the try count
2035               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2036                 LOG.debug("Server is not up for a while; try a new one", t);
2037                 needNewPlan = true;
2038               }
2039             } catch (InterruptedException ie) {
2040               LOG.warn("Failed to assign "
2041                   + region.getRegionNameAsString() + " since interrupted", ie);
2042               Thread.currentThread().interrupt();
2043               if (!tomActivated) {
2044                 regionStates.updateRegionState(region, State.FAILED_OPEN);
2045               }
2046               return;
2047             }
2048           } else if (retry) {
2049             needNewPlan = false;
2050             i--; // we want to retry as many times as needed as long as the RS is not dead.
2051             LOG.warn(assignMsg + ", trying to assign to the same region server after a socket timeout", t);
2052           } else {
2053             needNewPlan = true;
2054             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2055                 " try=" + i + " of " + this.maximumAttempts, t);
2056           }
2057         }
2058 
2059         if (i == this.maximumAttempts) {
2060           // Don't reset the region state or get a new plan any more.
2061           // This is the last try.
2062           continue;
2063         }
2064 
2065         // If the region opened on the destination of the present plan, reassigning to a new
2066         // RS may cause double assignments. In case of RegionAlreadyInTransitionException,
2067         // reassign to the same RS.
2068         if (needNewPlan) {
2069           // Force a new plan and reassign. Will return null if no servers.
2070           // The new plan could be the same as the existing plan since we don't
2071           // exclude the server of the original plan, which should not be
2072           // excluded since it could be the only server up now.
2073           RegionPlan newPlan = null;
2074           try {
2075             newPlan = getRegionPlan(region, true);
2076           } catch (HBaseIOException e) {
2077             LOG.warn("Failed to get region plan", e);
2078           }
2079           if (newPlan == null) {
2080             if (tomActivated) {
2081               this.timeoutMonitor.setAllRegionServersOffline(true);
2082             } else {
2083               regionStates.updateRegionState(region, State.FAILED_OPEN);
2084             }
2085             LOG.warn("Unable to find a viable location to assign region " +
2086                 region.getRegionNameAsString());
2087             return;
2088           }
2089 
2090           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2091             // Clean out the plan we failed to execute and one that doesn't look like it'll
2092             // succeed anyway; we need a new plan!
2093             // Transition back to OFFLINE
2094             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2095             versionOfOfflineNode = -1;
2096             plan = newPlan;
2097           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2098               previousException instanceof FailedServerException) {
2099             try {
2100               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2101                 " to the same failed server.");
2102               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2103                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2104             } catch (InterruptedException ie) {
2105               LOG.warn("Failed to assign "
2106                   + region.getRegionNameAsString() + " since interrupted", ie);
2107               Thread.currentThread().interrupt();
2108               if (!tomActivated) {
2109                 regionStates.updateRegionState(region, State.FAILED_OPEN);
2110               }
2111               return;
2112             }
2113           }
2114         }
2115       }
2116       // Run out of attempts
2117       if (!tomActivated) {
2118         regionStates.updateRegionState(region, State.FAILED_OPEN);
2119       }
2120     } finally {
2121       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
2122     }
2123   }
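  // Retry strategy for the assign above, as implemented in the catch block: "hold"
  // errors (RegionAlreadyInTransitionException, ServerNotRunningYetException) wait
  // on the same server without consuming an attempt until maxWaitTime; a socket
  // timeout against a still-online server also retries the same server; anything
  // else requests a new plan, transitioning the region back to OFFLINE when the new
  // plan points at a different destination.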
2124 
2125   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2126     // Remove region from in-memory transition and unassigned node from ZK
2127     // This can happen, for example, while enabling a table whose regions
2128     // are already open.
2129     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2130       + " to " + sn);
2131     String encodedName = region.getEncodedName();
2132     deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2133     regionStates.regionOnline(region, sn);
2134   }
2135 
2136   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2137     TableName tableName = region.getTable();
2138     boolean disabled = this.zkTable.isDisabledTable(tableName);
2139     if (disabled || this.zkTable.isDisablingTable(tableName)) {
2140       LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
2141         " skipping assign of " + region.getRegionNameAsString());
2142       offlineDisabledRegion(region);
2143       return true;
2144     }
2145     return false;
2146   }
2147 
2148   /**
2149    * Set the region as OFFLINE up in ZooKeeper.
2150    *
2151    * @param state
2152    * @return the version of the offline node if setting of the OFFLINE node was
2153    *         successful, -1 otherwise.
2154    */
2155   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2156     if (!state.isClosed() && !state.isOffline()) {
2157       String msg = "Unexpected state: " + state + ". Cannot transition it to OFFLINE.";
2158       this.server.abort(msg, new IllegalStateException(msg));
2159       return -1;
2160     }
2161     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2162     int versionOfOfflineNode;
2163     try {
2164       // get the version after setting the znode to OFFLINE
2165       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2166         state.getRegion(), destination);
2167       if (versionOfOfflineNode == -1) {
2168         LOG.warn("Attempted to create/force node into OFFLINE state before "
2169             + "completing assignment but failed to do so for " + state);
2170         return -1;
2171       }
2172     } catch (KeeperException e) {
2173       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2174       return -1;
2175     }
2176     return versionOfOfflineNode;
2177   }
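  // The version returned above is later handed to ServerManager.sendRegionOpen()
  // (see the assign retry loop), so the opening region server can check-and-transition
  // that exact OFFLINE znode; a stale version makes the region-server-side transition
  // fail instead of racing another assignment.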
2178 
2179   /**
2180    * @param region the region to assign
2181    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2182    * if no servers to assign, it returns null).
2183    */
2184   private RegionPlan getRegionPlan(final HRegionInfo region,
2185       final boolean forceNewPlan)  throws HBaseIOException {
2186     return getRegionPlan(region, null, forceNewPlan);
2187   }
2188 
2189   /**
2190    * @param region the region to assign
2191    * @param serverToExclude Server to exclude (we know it's bad). Pass null if
2192    * all servers are thought to be assignable.
2193    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2194    * will be generated.
2195    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2196    * if no servers to assign, it returns null).
2197    */
2198   private RegionPlan getRegionPlan(final HRegionInfo region,
2199       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2200     // Pick up an existing plan or make a new one
2201     final String encodedName = region.getEncodedName();
2202     final List<ServerName> destServers =
2203       serverManager.createDestinationServersList(serverToExclude);
2204 
2205     if (destServers.isEmpty()){
2206       LOG.warn("Can't move " + encodedName +
2207         ", there is no destination server available.");
2208       return null;
2209     }
2210 
2211     RegionPlan randomPlan = null;
2212     boolean newPlan = false;
2213     RegionPlan existingPlan;
2214 
2215     synchronized (this.regionPlans) {
2216       existingPlan = this.regionPlans.get(encodedName);
2217 
2218       if (existingPlan != null && existingPlan.getDestination() != null) {
2219         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2220           + " destination server is " + existingPlan.getDestination() +
2221             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2222       }
2223 
2224       if (forceNewPlan
2225           || existingPlan == null
2226           || existingPlan.getDestination() == null
2227           || !destServers.contains(existingPlan.getDestination())) {
2228         newPlan = true;
2229         randomPlan = new RegionPlan(region, null,
2230             balancer.randomAssignment(region, destServers));
2231         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2232           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2233           regions.add(region);
2234           try {
2235             processFavoredNodes(regions);
2236           } catch (IOException ie) {
2237             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2238           }
2239         }
2240         this.regionPlans.put(encodedName, randomPlan);
2241       }
2242     }
2243 
2244     if (newPlan) {
2245       if (randomPlan.getDestination() == null) {
2246         LOG.warn("Can't find a destination for " + encodedName);
2247         return null;
2248       }
2249       LOG.debug("No previous transition plan found (or ignoring " +
2250         "an existing plan) for " + region.getRegionNameAsString() +
2251         "; generated random plan=" + randomPlan + "; " +
2252         serverManager.countOfRegionServers() +
2253         " (online=" + serverManager.getOnlineServers().size() +
2254         ", available=" + destServers.size() + ") available servers" +
2255         ", forceNewPlan=" + forceNewPlan);
2256       return randomPlan;
2257     }
2258     LOG.debug("Using pre-existing plan for " +
2259       region.getRegionNameAsString() + "; plan=" + existingPlan);
2260     return existingPlan;
2261   }
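  // Plan reuse rule implemented above: an existing plan is kept only if its
  // destination is non-null and still present in the freshly computed destination
  // server list; otherwise (or when forceNewPlan is set) the balancer picks a
  // random destination and the new plan replaces the old one in regionPlans.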
2262 
2263   /**
2264    * Unassigns the specified region.
2265    * <p>
2266    * Updates the RegionState and sends the CLOSE RPC unless region is being
2267    * split by regionserver; then the unassign fails (silently) because we
2268    * presume the region being unassigned no longer exists (it has been split out
2269    * of existence). TODO: What to do if split fails and is rolled back and
2270    * parent is revivified?
2271    * <p>
2272    * If a RegionPlan is already set, it will remain.
2273    *
2274    * @param region region to be unassigned
2275    */
2276   public void unassign(HRegionInfo region) {
2277     unassign(region, false);
2278   }
2279 
2280 
2281   /**
2282    * Unassigns the specified region.
2283    * <p>
2284    * Updates the RegionState and sends the CLOSE RPC unless region is being
2285    * split by regionserver; then the unassign fails (silently) because we
2286    * presume the region being unassigned no longer exists (it has been split out
2287    * of existence). TODO: What to do if split fails and is rolled back and
2288    * parent is revivified?
2289    * <p>
2290    * If a RegionPlan is already set, it will remain.
2291    *
2292    * @param region region to be unassigned
2293    * @param force if region should be closed even if already closing
2294    */
2295   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2296     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2297     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2298       + " (offlining), current state: " + regionStates.getRegionState(region));
2299 
2300     String encodedName = region.getEncodedName();
2301     // Grab the state of this region and synchronize on it
2302     int versionOfClosingNode = -1;
2303     // We need a lock here as we're going to do a put later and we don't want
2304     // multiple states to be created concurrently
2305     ReentrantLock lock = locker.acquireLock(encodedName);
2306     RegionState state = regionStates.getRegionTransitionState(encodedName);
2307     boolean reassign = true;
2308     try {
2309       if (state == null) {
2310         // Region is not in transition.
2311         // We can unassign it only if it's not SPLIT/MERGED.
2312         state = regionStates.getRegionState(encodedName);
2313         if (state != null && state.isUnassignable()) {
2314           LOG.info("Attempting to unassign " + state + ", ignored");
2315           // Offline region will be reassigned below
2316           return;
2317         }
2318         // Create the znode in CLOSING state
2319         try {
2320           if (state == null || state.getServerName() == null) {
2321             // We don't know where the region is, offline it.
2322             // No need to send CLOSE RPC
2323             LOG.warn("Attempting to unassign a region not in RegionStates: "
2324               + region.getRegionNameAsString() + ", offlined");
2325             regionOffline(region);
2326             return;
2327           }
2328           versionOfClosingNode = ZKAssign.createNodeClosing(
2329             watcher, region, state.getServerName());
2330           if (versionOfClosingNode == -1) {
2331             LOG.info("Attempting to unassign " +
2332               region.getRegionNameAsString() + " but ZK closing node "
2333               + "can't be created.");
2334             reassign = false; // not unassigned at all
2335             return;
2336           }
2337         } catch (KeeperException e) {
2338           if (e instanceof NodeExistsException) {
2339             // Handle race between master initiated close and regionserver
2340             // orchestrated splitting. See if existing node is in a
2341             // SPLITTING or SPLIT state.  If so, the regionserver started
2342             // an op on node before we could get our CLOSING in.  Deal.
2343             NodeExistsException nee = (NodeExistsException)e;
2344             String path = nee.getPath();
2345             try {
2346               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2347                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2348                   "skipping unassign because the region no longer exists -- it's split or merged");
2349                 reassign = false; // no need to reassign for split/merged region
2350                 return;
2351               }
2352             } catch (KeeperException.NoNodeException ke) {
2353               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2354                 "; presuming split and that the region to unassign, " +
2355                 encodedName + ", no longer exists -- confirm", ke);
2356               return;
2357             } catch (KeeperException ke) {
2358               LOG.error("Unexpected zk state", ke);
2359             } catch (DeserializationException de) {
2360               LOG.error("Failed parse", de);
2361             }
2362           }
2363           // If we get here, we don't understand what's going on -- abort.
2364           server.abort("Unexpected ZK exception creating node CLOSING", e);
2365           reassign = false; // heading out already
2366           return;
2367         }
2368         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2369       } else if (state.isFailedOpen()) {
2370         // The region is not open yet
2371         regionOffline(region);
2372         return;
2373       } else if (force && state.isPendingCloseOrClosing()) {
2374         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2375           " which is already " + state.getState()  +
2376           " but forcing a CLOSE RPC to be sent again");
2377         if (state.isFailedClose()) {
2378           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2379         }
2380         state.updateTimestampToNow();
2381       } else {
2382         LOG.debug("Attempting to unassign " +
2383           region.getRegionNameAsString() + " but it is " +
2384           "already in transition (" + state.getState() + ", force=" + force + ")");
2385         return;
2386       }
2387 
2388       unassign(region, state, versionOfClosingNode, dest, true, null);
2389     } finally {
2390       lock.unlock();
2391 
2392       // Region is expected to be reassigned afterwards
2393       if (reassign && regionStates.isRegionOffline(region)) {
2394         assign(region, true);
2395       }
2396     }
2397   }
2398 
2399   public void unassign(HRegionInfo region, boolean force) {
2400     unassign(region, force, null);
2401   }
2402 
2403   /**
2404    * @param region regioninfo of znode to be deleted.
2405    */
2406   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2407     String encodedName = region.getEncodedName();
2408     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2409       EventType.RS_ZK_REGION_CLOSED);
2410   }
2411 
2412   /**
2413    * @param path
2414    * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
2415    * @throws KeeperException Can happen if the znode went away in meantime.
2416    * @throws DeserializationException
2417    */
2418   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2419       throws KeeperException, DeserializationException {
2420     boolean result = false;
2421     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2422     // cleaned up before we can get data from it.
2423     byte [] data = ZKAssign.getData(watcher, path);
2424     if (data == null) {
2425       LOG.info("Node " + path + " is gone");
2426       return false;
2427     }
2428     RegionTransition rt = RegionTransition.parseFrom(data);
2429     switch (rt.getEventType()) {
2430     case RS_ZK_REQUEST_REGION_SPLIT:
2431     case RS_ZK_REGION_SPLIT:
2432     case RS_ZK_REGION_SPLITTING:
2433     case RS_ZK_REQUEST_REGION_MERGE:
2434     case RS_ZK_REGION_MERGED:
2435     case RS_ZK_REGION_MERGING:
2436       result = true;
2437       break;
2438     default:
2439       LOG.info("Node " + path + " is in " + rt.getEventType());
2440       break;
2441     }
2442     return result;
2443   }
2444 
2445   /**
2446    * Used by unit tests. Return the number of regions opened so far in the life
2447    * of the master. Increases by one every time the master opens a region
2448    * @return the counter value of the number of regions opened so far
2449    */
2450   public int getNumRegionsOpened() {
2451     return numRegionsOpened.get();
2452   }
2453 
2454   /**
2455    * Waits until the specified region has completed assignment.
2456    * <p>
2457    * If the region is already assigned, returns immediately.  Otherwise, method
2458    * blocks until the region is assigned.
2459    * @param regionInfo region to wait on assignment for
2460    * @throws InterruptedException
2461    */
2462   public boolean waitForAssignment(HRegionInfo regionInfo)
2463       throws InterruptedException {
2464     while (!regionStates.isRegionOnline(regionInfo)) {
2465       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
2466           || this.server.isStopped()) {
2467         return false;
2468       }
2469 
2470       // We should receive a notification, but it's
2471       //  better to have a timeout to recheck the condition here:
2472       //  it lowers the impact of a race condition if any
2473       regionStates.waitForUpdate(100);
2474     }
2475     return true;
2476   }
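  // Illustrative usage (hypothetical caller, not part of this file): a table
  // creation or enable handler might block on each region it just asked to assign:
  //   for (HRegionInfo hri : newRegions) {
  //     if (!assignmentManager.waitForAssignment(hri)) {
  //       // region ended up FAILED_OPEN or the master is stopping
  //     }
  //   }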
2477 
2478   /**
2479    * Assigns the hbase:meta region.
2480    * <p>
2481    * Assumes that hbase:meta is currently closed and is not being actively served by
2482    * any RegionServer.
2483    * <p>
2484    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2485    * hbase:meta to a random RegionServer.
2486    * @throws KeeperException
2487    */
2488   public void assignMeta() throws KeeperException {
2489     MetaRegionTracker.deleteMetaLocation(this.watcher);
2490     assign(HRegionInfo.FIRST_META_REGIONINFO, true);
2491   }
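  // Deleting the meta location znode before assigning ensures that readers of that
  // znode (e.g. the catalog tracker) pick up the new hbase:meta location written by
  // the open, rather than a stale pre-failure location.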
2492 
2493   /**
2494    * Assigns specified regions retaining assignments, if any.
2495    * <p>
2496    * This is a synchronous call and will return once every region has been
2497    * assigned.  If anything fails, an exception is thrown
2498    * @throws InterruptedException
2499    * @throws IOException
2500    */
2501   public void assign(Map<HRegionInfo, ServerName> regions)
2502         throws IOException, InterruptedException {
2503     if (regions == null || regions.isEmpty()) {
2504       return;
2505     }
2506     List<ServerName> servers = serverManager.createDestinationServersList();
2507     if (servers == null || servers.isEmpty()) {
2508       throw new IOException("Found no destination server to assign region(s)");
2509     }
2510 
2511     // Reuse existing assignment info
2512     Map<ServerName, List<HRegionInfo>> bulkPlan =
2513       balancer.retainAssignment(regions, servers);
2514 
2515     assign(regions.size(), servers.size(),
2516       "retainAssignment=true", bulkPlan);
2517   }
2518 
2519   /**
2520    * Assigns specified regions round robin, if any.
2521    * <p>
2522    * This is a synchronous call and will return once every region has been
2523    * assigned.  If anything fails, an exception is thrown
2524    * @throws InterruptedException
2525    * @throws IOException
2526    */
2527   public void assign(List<HRegionInfo> regions)
2528         throws IOException, InterruptedException {
2529     if (regions == null || regions.isEmpty()) {
2530       return;
2531     }
2532 
2533     List<ServerName> servers = serverManager.createDestinationServersList();
2534     if (servers == null || servers.isEmpty()) {
2535       throw new IOException("Found no destination server to assign region(s)");
2536     }
2537 
2538     // Generate a round-robin bulk assignment plan
2539     Map<ServerName, List<HRegionInfo>> bulkPlan
2540       = balancer.roundRobinAssignment(regions, servers);
2541     processFavoredNodes(regions);
2542 
2543     assign(regions.size(), servers.size(),
2544       "round-robin=true", bulkPlan);
2545   }
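
  // A hedged sketch of how the two public bulk entry points above differ; "am",
  // "snapshot" and "regionsToServers" are illustrative names only. Passing a map keeps
  // the old locations (retainAssignment), passing a list spreads the regions round-robin.
  //
  //   Map<HRegionInfo, ServerName> regionsToServers = snapshot.getRegionToRegionServerMap();
  //   am.assign(regionsToServers);                                        // retain old locations
  //   am.assign(new ArrayList<HRegionInfo>(regionsToServers.keySet()));   // round-robin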
2546 
2547   private void assign(int regions, int totalServers,
2548       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2549           throws InterruptedException, IOException {
2550 
2551     int servers = bulkPlan.size();
2552     if (servers == 1 || (regions < bulkAssignThresholdRegions
2553         && servers < bulkAssignThresholdServers)) {
2554 
2555       // Not using bulk assignment.  This can be more efficient in a small
2556       // cluster, especially a mini cluster for testing, so that tests won't time out
2557       if (LOG.isTraceEnabled()) {
2558         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2559           " region(s) to " + servers + " server(s)");
2560       }
2561       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2562         if (!assign(plan.getKey(), plan.getValue())) {
2563           for (HRegionInfo region: plan.getValue()) {
2564             if (!regionStates.isRegionOnline(region)) {
2565               invokeAssign(region);
2566             }
2567           }
2568         }
2569       }
2570     } else {
2571       LOG.info("Bulk assigning " + regions + " region(s) across "
2572         + totalServers + " server(s), " + message);
2573 
2574       // Use fixed count thread pool assigning.
2575       BulkAssigner ba = new GeneralBulkAssigner(
2576         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2577       ba.bulkAssign();
2578       LOG.info("Bulk assigning done");
2579     }
2580   }
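
  // Worked example of the bulk-assignment decision above; the threshold values are
  // illustrative, the real ones come from bulkAssignThresholdRegions/bulkAssignThresholdServers.
  //
  //   servers == 1 || (regions < thresholdRegions && servers < thresholdServers)
  //   e.g. regions=5,   servers=2,  thresholds 7/3  -> true  -> plain per-server assign() loop
  //        regions=100, servers=10, thresholds 7/3  -> false -> GeneralBulkAssigner thread pool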
2581 
2582   /**
2583    * Assigns all user regions, if any exist.  Used during cluster startup.
2584    * <p>
2585    * This is a synchronous call and will return once every region has been
2586    * assigned.  If anything fails, an exception is thrown and the cluster
2587    * should be shutdown.
2588    * @throws InterruptedException
2589    * @throws IOException
2590    * @throws KeeperException
2591    */
2592   private void assignAllUserRegions()
2593       throws IOException, InterruptedException, KeeperException {
2594     // Cleanup any existing ZK nodes and start watching
2595     ZKAssign.deleteAllNodes(watcher);
2596     ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
2597       this.watcher.assignmentZNode);
2598     failoverCleanupDone();
2599 
2600     // Skip assignment for regions of tables in DISABLED, DISABLING or ENABLING state, because during a
2601     // clean cluster startup no RS is alive and the regions map doesn't have any information about them.
2602     // See HBASE-6281.
2603     Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
2604     disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
2605     // Scan hbase:meta for all user regions, skipping any disabled tables
2606     Map<HRegionInfo, ServerName> allRegions;
2607     SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
2608        new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
2609     snapshotOfRegionAssignment.initialize();
2610     allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
2611     if (allRegions == null || allRegions.isEmpty()) return;
2612 
2613     // Determine what type of assignment to do on startup
2614     boolean retainAssignment = server.getConfiguration().
2615       getBoolean("hbase.master.startup.retainassign", true);
2616 
2617     if (retainAssignment) {
2618       assign(allRegions);
2619     } else {
2620       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
2621       assign(regions);
2622     }
2623 
2624     for (HRegionInfo hri : allRegions.keySet()) {
2625       TableName tableName = hri.getTable();
2626       if (!zkTable.isEnabledTable(tableName)) {
2627         setEnabledTable(tableName);
2628       }
2629     }
2630   }
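
  // Configuration sketch for the startup choice above: "hbase.master.startup.retainassign"
  // (read a few lines up) defaults to true; setting it to false makes a clean startup use the
  // round-robin plan instead of the locations recorded in hbase:meta. Normally this would be
  // set in hbase-site.xml rather than in code; the snippet below is only illustrative.
  //
  //   Configuration conf = server.getConfiguration();
  //   conf.setBoolean("hbase.master.startup.retainassign", false);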
2631 
2632   /**
2633    * Wait until no regions in transition.
2634    * @param timeout How long to wait.
2635    * @return True if nothing in regions in transition.
2636    * @throws InterruptedException
2637    */
2638   boolean waitUntilNoRegionsInTransition(final long timeout)
2639       throws InterruptedException {
2640     // Blocks until there are no regions in transition. It is possible that there
2641     // are regions in transition immediately after this returns, but this method
2642     // guarantees that if it returns without an exception, there was a period of
2643     // time with no regions in transition from the point-of-view of the in-memory
2644     // state of the Master.
2645
2646     final long endTime = System.currentTimeMillis() + timeout;
2647 
2648     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2649         && endTime > System.currentTimeMillis()) {
2650       regionStates.waitForUpdate(100);
2651     }
2652 
2653     return !regionStates.isRegionsInTransition();
2654   }
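
  // A minimal usage sketch (hypothetical caller in this package, since the method is
  // package-private; "am" is an illustrative name): bound the wait to one minute.
  //
  //   if (!am.waitUntilNoRegionsInTransition(60 * 1000)) {
  //     LOG.warn("Regions still in transition after 60s, or the master is stopping");
  //   }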
2655 
2656   /**
2657    * Rebuild the list of user regions and assignment information.
2658    * <p>
2659    * Returns a map of servers that are not found to be online and the regions
2660    * they were hosting.
2661    * @return map of servers not online to their assigned regions, as stored
2662    *         in META
2663    * @throws IOException
2664    */
2665   Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
2666     Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
2667     Set<TableName> disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);
2668     disabledOrEnablingTables.addAll(enablingTables);
2669     Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
2670     disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);
2671 
2672     // Region assignment from META
2673     List<Result> results = MetaReader.fullScan(this.catalogTracker);
2674     // Get any new but slow-to-check-in region servers that joined the cluster
2675     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2676     // Map of offline servers and their regions to be returned
2677     Map<ServerName, List<HRegionInfo>> offlineServers =
2678       new TreeMap<ServerName, List<HRegionInfo>>();
2679     // Iterate regions in META
2680     for (Result result : results) {
2681       Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(result);
2682       if (region == null) continue;
2683       HRegionInfo regionInfo = region.getFirst();
2684       ServerName regionLocation = region.getSecond();
2685       if (regionInfo == null) continue;
2686       regionStates.createRegionState(regionInfo);
2687       if (regionStates.isRegionInState(regionInfo, State.SPLIT)) {
2688         // Split is considered to be completed. If the split znode still
2689         // exists, the region will be put back to SPLITTING state later
2690         LOG.debug("Region " + regionInfo.getRegionNameAsString()
2691            + " split is completed. Hence need not add to regions list");
2692         continue;
2693       }
2694       TableName tableName = regionInfo.getTable();
2695       if (regionLocation == null) {
2696         // regionLocation could be null if createTable didn't finish properly,
2697         // e.g. if the HMaster restarted while createTable was in progress.
2698         // Some regions have been added to hbase:meta, but have not been assigned.
2699         // When this happens, the region's table must be in ENABLING state.
2700         // It can't be in ENABLED state as that is set when all regions are
2701         // assigned.
2702         // It can't be in DISABLING state, because DISABLING state transitions
2703         // from ENABLED state when application calls disableTable.
2704         // It can't be in DISABLED state, because DISABLED states transitions
2705         // from DISABLING state.
2706         if (!enablingTables.contains(tableName)) {
2707           LOG.warn("Region " + regionInfo.getEncodedName() +
2708             " has null regionLocation." + " But its table " + tableName +
2709             " isn't in ENABLING state.");
2710         }
2711       } else if (!onlineServers.contains(regionLocation)) {
2712         // Region is located on a server that isn't online
2713         List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
2714         if (offlineRegions == null) {
2715           offlineRegions = new ArrayList<HRegionInfo>(1);
2716           offlineServers.put(regionLocation, offlineRegions);
2717         }
2718         offlineRegions.add(regionInfo);
2719         // need to enable the table if not disabled or disabling or enabling
2720         // this will be used in rolling restarts
2721         if (!disabledOrDisablingOrEnabling.contains(tableName)
2722             && !getZKTable().isEnabledTable(tableName)) {
2723           setEnabledTable(tableName);
2724         }
2725       } else {
2726         // Region is being served and on an active server
2727         // add only if region not in disabled or enabling table
2728         if (!disabledOrEnablingTables.contains(tableName)) {
2729           regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation);
2730           regionStates.regionOnline(regionInfo, regionLocation);
2731           balancer.regionOnline(regionInfo, regionLocation);
2732         }
2733         // need to enable the table if not disabled or disabling or enabling
2734         // this will be used in rolling restarts
2735         if (!disabledOrDisablingOrEnabling.contains(tableName)
2736             && !getZKTable().isEnabledTable(tableName)) {
2737           setEnabledTable(tableName);
2738         }
2739       }
2740     }
2741     return offlineServers;
2742   }
2743 
2744   /**
2745    * Recover the tables that were not fully moved to DISABLED state. These
2746    * tables were in DISABLING state when the master restarted/switched over.
2747    *
2748    * @throws KeeperException
2749    * @throws TableNotFoundException
2750    * @throws IOException
2751    */
2752   private void recoverTableInDisablingState()
2753       throws KeeperException, TableNotFoundException, IOException {
2754     Set<TableName> disablingTables = ZKTable.getDisablingTables(watcher);
2755     if (disablingTables.size() != 0) {
2756       for (TableName tableName : disablingTables) {
2757         // Recover by calling DisableTableHandler
2758         LOG.info("The table " + tableName
2759             + " is in DISABLING state.  Hence recovering by moving the table"
2760             + " to DISABLED state.");
2761         new DisableTableHandler(this.server, tableName, catalogTracker,
2762             this, tableLockManager, true).prepare().process();
2763       }
2764     }
2765   }
2766 
2767   /**
2768    * Recover the tables that were not fully moved to ENABLED state. These tables
2769    * were in ENABLING state when the master restarted/switched over.
2770    *
2771    * @throws KeeperException
2772    * @throws org.apache.hadoop.hbase.TableNotFoundException
2773    * @throws IOException
2774    */
2775   private void recoverTableInEnablingState()
2776       throws KeeperException, TableNotFoundException, IOException {
2777     Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
2778     if (enablingTables.size() != 0) {
2779       for (TableName tableName : enablingTables) {
2780         // Recover by calling EnableTableHandler
2781         LOG.info("The table " + tableName
2782             + " is in ENABLING state.  Hence recovering by moving the table"
2783             + " to ENABLED state.");
2784         // enableTable in sync way during master startup,
2785         // no need to invoke coprocessor
2786         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
2787           catalogTracker, this, tableLockManager, true);
2788         try {
2789           eth.prepare();
2790         } catch (TableNotFoundException e) {
2791           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
2792           continue;
2793         }
2794         eth.process();
2795       }
2796     }
2797   }
2798 
2799   /**
2800    * Processes the list of dead servers from the hbase:meta scan, and the regions in RIT.
2801    * <p>
2802    * This is used for failover to recover the lost regions that belonged to
2803    * RegionServers which failed while there was no active master or regions
2804    * that were in RIT.
2805    * <p>
2806    *
2807    *
2808    * @param deadServers
2809    *          The list of dead servers which failed while there was no active
2810    *          master. Can be null.
2811    * @throws IOException
2812    * @throws KeeperException
2813    */
2814   private void processDeadServersAndRecoverLostRegions(
2815       Map<ServerName, List<HRegionInfo>> deadServers)
2816           throws IOException, KeeperException {
2817     if (deadServers != null) {
2818       for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
2819         ServerName serverName = server.getKey();
2820         // We need to keep such info even if the server is known dead
2821         regionStates.setLastRegionServerOfRegions(serverName, server.getValue());
2822         if (!serverManager.isServerDead(serverName)) {
2823           serverManager.expireServer(serverName); // Let SSH do region re-assign
2824         }
2825       }
2826     }
2827     List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(
2828       this.watcher, this.watcher.assignmentZNode);
2829     if (!nodes.isEmpty()) {
2830       for (String encodedRegionName : nodes) {
2831         processRegionInTransition(encodedRegionName, null);
2832       }
2833     }
2834 
2835     // Now we can safely claim failover cleanup completed and enable
2836     // ServerShutdownHandler for further processing. The nodes (above)
2837     // in transition, if any, are for regions not related to those
2838     // dead servers at all, and can be done in parallel to SSH.
2839     failoverCleanupDone();
2840   }
2841 
2842   /**
2843    * Set Regions in transitions metrics.
2844    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
2845    * This iterator is not fail-fast, which may lead to stale reads; but that's better than
2846    * creating a copy of the map for metrics computation, as this method will be invoked
2847    * at a frequent interval.
2848    */
2849   public void updateRegionsInTransitionMetrics() {
2850     long currentTime = System.currentTimeMillis();
2851     int totalRITs = 0;
2852     int totalRITsOverThreshold = 0;
2853     long oldestRITTime = 0;
2854     int ritThreshold = this.server.getConfiguration().
2855       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
2856     for (RegionState state: regionStates.getRegionsInTransition().values()) {
2857       totalRITs++;
2858       long ritTime = currentTime - state.getStamp();
2859       if (ritTime > ritThreshold) { // more than the threshold
2860         totalRITsOverThreshold++;
2861       }
2862       if (oldestRITTime < ritTime) {
2863         oldestRITTime = ritTime;
2864       }
2865     }
2866     if (this.metricsAssignmentManager != null) {
2867       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
2868       this.metricsAssignmentManager.updateRITCount(totalRITs);
2869       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
2870     }
2871   }
2872 
2873   /**
2874    * @param region Region whose plan we are to clear.
2875    */
2876   void clearRegionPlan(final HRegionInfo region) {
2877     synchronized (this.regionPlans) {
2878       this.regionPlans.remove(region.getEncodedName());
2879     }
2880   }
2881 
2882   /**
2883    * Wait on region to clear regions-in-transition.
2884    * @param hri Region to wait on.
2885    * @throws IOException
2886    */
2887   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
2888       throws IOException, InterruptedException {
2889     waitOnRegionToClearRegionsInTransition(hri, -1L);
2890   }
2891 
2892   /**
2893    * Wait on region to clear regions-in-transition or time out
2894    * @param hri Region to wait on.
2895    * @param timeOut Milliseconds to wait for current region to be out of transition state.
2896    * @return True when a region clears regions-in-transition before timeout otherwise false
2897    * @throws InterruptedException
2898    */
2899   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
2900       throws InterruptedException {
2901     if (!regionStates.isRegionInTransition(hri)) return true;
2902     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
2903         + timeOut;
2904     // There is already a timeout monitor on regions in transition so I
2905     // should not have to have one here too?
2906     LOG.info("Waiting for " + hri.getEncodedName() +
2907         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
2908     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
2909       regionStates.waitForUpdate(100);
2910       if (EnvironmentEdgeManager.currentTimeMillis() > end) {
2911         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
2912         return false;
2913       }
2914     }
2915     if (this.server.isStopped()) {
2916       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
2917       return false;
2918     }
2919     return true;
2920   }
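
  // Usage sketch for the timed wait above ("am" and "hri" are illustrative names):
  // a timeOut <= 0 means wait indefinitely, a positive value bounds the wait in milliseconds.
  //
  //   boolean cleared = am.waitOnRegionToClearRegionsInTransition(hri, 30 * 1000L);
  //   if (!cleared) {
  //     // still in transition after 30s, or the master is stopping
  //   }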
2921 
2922   /**
2923    * Update timers for all regions in transition going against the server in the
2924    * serversInUpdatingTimer.
2925    */
2926   public class TimerUpdater extends Chore {
2927 
2928     public TimerUpdater(final int period, final Stoppable stopper) {
2929       super("AssignmentTimerUpdater", period, stopper);
2930     }
2931 
2932     @Override
2933     protected void chore() {
2934       Preconditions.checkState(tomActivated);
2935       ServerName serverToUpdateTimer = null;
2936       while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
2937         if (serverToUpdateTimer == null) {
2938           serverToUpdateTimer = serversInUpdatingTimer.first();
2939         } else {
2940           serverToUpdateTimer = serversInUpdatingTimer
2941               .higher(serverToUpdateTimer);
2942         }
2943         if (serverToUpdateTimer == null) {
2944           break;
2945         }
2946         updateTimers(serverToUpdateTimer);
2947         serversInUpdatingTimer.remove(serverToUpdateTimer);
2948       }
2949     }
2950   }
2951 
2952   /**
2953    * Monitor to check for time outs on region transition operations
2954    */
2955   public class TimeoutMonitor extends Chore {
2956     private boolean allRegionServersOffline = false;
2957     private ServerManager serverManager;
2958     private final int timeout;
2959 
2960     /**
2961      * Creates a periodic monitor to check for time outs on region transition
2962      * operations.  This will deal with retries if for some reason something
2963      * doesn't happen within the specified timeout.
2964      * @param period
2965      * @param stopper When {@link Stoppable#isStopped()} is true, this thread will
2966      *   clean up and exit cleanly.
2967      * @param timeout
2968      */
2969     public TimeoutMonitor(final int period, final Stoppable stopper,
2970         ServerManager serverManager,
2971         final int timeout) {
2972       super("AssignmentTimeoutMonitor", period, stopper);
2973       this.timeout = timeout;
2974       this.serverManager = serverManager;
2975     }
2976 
2977     private synchronized void setAllRegionServersOffline(
2978       boolean allRegionServersOffline) {
2979       this.allRegionServersOffline = allRegionServersOffline;
2980     }
2981 
2982     @Override
2983     protected void chore() {
2984       Preconditions.checkState(tomActivated);
2985       boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
2986 
2987       // Iterate all regions in transition checking for time outs
2988       long now = System.currentTimeMillis();
2989       // No lock; concurrent access is ok: we will be working on a copy, and it's valid in Java
2990       // to take a copy while another thread is adding/removing items
2991       for (String regionName : regionStates.getRegionsInTransition().keySet()) {
2992         RegionState regionState = regionStates.getRegionTransitionState(regionName);
2993         if (regionState == null) continue;
2994 
2995         if (regionState.getStamp() + timeout <= now) {
2996           // decide on action upon timeout
2997           actOnTimeOut(regionState);
2998         } else if (this.allRegionServersOffline && !noRSAvailable) {
2999           RegionPlan existingPlan = regionPlans.get(regionName);
3000           if (existingPlan == null
3001               || !this.serverManager.isServerOnline(existingPlan
3002                   .getDestination())) {
3003             // if some RSs just came back online, we can start the assignment
3004             // right away
3005             actOnTimeOut(regionState);
3006           }
3007         }
3008       }
3009       setAllRegionServersOffline(noRSAvailable);
3010     }
3011 
3012     private void actOnTimeOut(RegionState regionState) {
3013       HRegionInfo regionInfo = regionState.getRegion();
3014       LOG.info("Regions in transition timed out:  " + regionState);
3015       // Expired! Do a retry.
3016       switch (regionState.getState()) {
3017       case CLOSED:
3018         LOG.info("Region " + regionInfo.getEncodedName()
3019             + " has been CLOSED for too long, waiting on queued "
3020             + "ClosedRegionHandler to run or server shutdown");
3021         // Update our timestamp.
3022         regionState.updateTimestampToNow();
3023         break;
3024       case OFFLINE:
3025         LOG.info("Region has been OFFLINE for too long, " + "reassigning "
3026             + regionInfo.getRegionNameAsString() + " to a random server");
3027         invokeAssign(regionInfo);
3028         break;
3029       case PENDING_OPEN:
3030         LOG.info("Region has been PENDING_OPEN for too "
3031             + "long, reassigning region=" + regionInfo.getRegionNameAsString());
3032         invokeAssign(regionInfo);
3033         break;
3034       case OPENING:
3035         processOpeningState(regionInfo);
3036         break;
3037       case OPEN:
3038         LOG.error("Region has been OPEN for too long, " +
3039             "we don't know where region was opened so can't do anything");
3040         regionState.updateTimestampToNow();
3041         break;
3042 
3043       case PENDING_CLOSE:
3044         LOG.info("Region has been PENDING_CLOSE for too "
3045             + "long, running forced unassign again on region="
3046             + regionInfo.getRegionNameAsString());
3047         invokeUnassign(regionInfo);
3048         break;
3049       case CLOSING:
3050         LOG.info("Region has been CLOSING for too " +
3051           "long, this should eventually complete or the server will " +
3052           "expire, send RPC again");
3053         invokeUnassign(regionInfo);
3054         break;
3055 
3056       case SPLIT:
3057       case SPLITTING:
3058       case FAILED_OPEN:
3059       case FAILED_CLOSE:
3060       case MERGING:
3061         break;
3062 
3063       default:
3064         throw new IllegalStateException("Received event is not valid.");
3065       }
3066     }
3067   }
3068 
3069   private void processOpeningState(HRegionInfo regionInfo) {
3070     LOG.info("Region has been OPENING for too long, reassigning region="
3071         + regionInfo.getRegionNameAsString());
3072     // Should have a ZK node in OPENING state
3073     try {
3074       String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
3075       Stat stat = new Stat();
3076       byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat);
3077       if (data == null) {
3078         LOG.warn("Data is null, node " + node + " no longer exists");
3079         return;
3080       }
3081       RegionTransition rt = RegionTransition.parseFrom(data);
3082       EventType et = rt.getEventType();
3083       if (et == EventType.RS_ZK_REGION_OPENED) {
3084         LOG.debug("Region has transitioned to OPENED, allowing "
3085             + "watched event handlers to process");
3086         return;
3087       } else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN) {
3088         LOG.warn("While timing out a region, found ZK node in unexpected state: " + et);
3089         return;
3090       }
3091       invokeAssign(regionInfo);
3092     } catch (KeeperException ke) {
3093       LOG.error("Unexpected ZK exception timing out OPENING region", ke);
3094     } catch (DeserializationException e) {
3095       LOG.error("Unexpected exception parsing OPENING region", e);
3096     }
3097   }
3098 
3099   void invokeAssign(HRegionInfo regionInfo) {
3100     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
3101   }
3102 
3103   private void invokeUnassign(HRegionInfo regionInfo) {
3104     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3105   }
3106 
3107   public boolean isCarryingMeta(ServerName serverName) {
3108     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3109   }
3110 
3111   /**
3112    * Check if the shutdown server carries the specific region.
3113    * We have a bunch of places that store the region location, and
3114    * those values aren't always consistent because notifications are delayed.
3115    * The location from the zookeeper unassigned node has the most recent data,
3116    * but the node could be deleted after the region is opened by the AM.
3117    * The AM's info could be stale if OpenedRegionHandler
3118    * processing hasn't finished yet when the server shutdown occurs.
3119    * @return whether the serverName currently hosts the region
3120    */
3121   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3122     RegionTransition rt = null;
3123     try {
3124       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3125       // This call can legitimately return null
3126       rt = data == null? null: RegionTransition.parseFrom(data);
3127     } catch (KeeperException e) {
3128       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3129     } catch (DeserializationException e) {
3130       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3131     }
3132 
3133     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3134     if (addressFromZK != null) {
3135       // if we get something from ZK, we will use the data
3136       boolean matchZK = addressFromZK.equals(serverName);
3137       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3138         " current=" + serverName + ", matches=" + matchZK);
3139       return matchZK;
3140     }
3141 
3142     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3143     boolean matchAM = (addressFromAM != null &&
3144       addressFromAM.equals(serverName));
3145     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3146       " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3147       " server being checked: " + serverName);
3148 
3149     return matchAM;
3150   }
3151 
3152   /**
3153    * Process shutdown server removing any assignments.
3154    * @param sn Server that went down.
3155    * @return list of regions in transition on this server
3156    */
3157   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3158     // Clean out any existing assignment plans for this server
3159     synchronized (this.regionPlans) {
3160       for (Iterator <Map.Entry<String, RegionPlan>> i =
3161           this.regionPlans.entrySet().iterator(); i.hasNext();) {
3162         Map.Entry<String, RegionPlan> e = i.next();
3163         ServerName otherSn = e.getValue().getDestination();
3164         // The name will be null if the region is planned for a random assign.
3165         if (otherSn != null && otherSn.equals(sn)) {
3166           // Use iterator's remove else we'll get CME
3167           i.remove();
3168         }
3169       }
3170     }
3171     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3172     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3173       HRegionInfo hri = it.next();
3174       String encodedName = hri.getEncodedName();
3175 
3176       // We need a lock on the region as we could update it
3177       Lock lock = locker.acquireLock(encodedName);
3178       try {
3179         RegionState regionState =
3180           regionStates.getRegionTransitionState(encodedName);
3181         if (regionState == null
3182             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3183             || !(regionState.isFailedClose() || regionState.isOffline()
3184               || regionState.isPendingOpenOrOpening())) {
3185           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3186             + " on the dead server any more: " + sn);
3187           it.remove();
3188         } else {
3189           try {
3190             // Delete the ZNode if exists
3191             ZKAssign.deleteNodeFailSilent(watcher, hri);
3192           } catch (KeeperException ke) {
3193             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3194           }
3195           if (zkTable.isDisablingOrDisabledTable(hri.getTable())) {
3196             regionStates.regionOffline(hri);
3197             it.remove();
3198             continue;
3199           }
3200           // Mark the region offline and assign it again by SSH
3201           regionStates.updateRegionState(hri, State.OFFLINE);
3202         }
3203       } finally {
3204         lock.unlock();
3205       }
3206     }
3207     return regions;
3208   }
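
  // Hedged sketch of the expected caller pattern (a server-shutdown handler is assumed;
  // names here are illustrative): the returned list holds the regions that were in
  // transition on the dead server and still need to be assigned elsewhere.
  //
  //   List<HRegionInfo> ritOnDeadServer = am.processServerShutdown(deadServerName);
  //   for (HRegionInfo hri : ritOnDeadServer) {
  //     // re-assign these once log splitting for the dead server has completed
  //   }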
3209 
3210   /**
3211    * @param plan Plan to execute.
3212    */
3213   public void balance(final RegionPlan plan) {
3214     HRegionInfo hri = plan.getRegionInfo();
3215     TableName tableName = hri.getTable();
3216     if (zkTable.isDisablingOrDisabledTable(tableName)) {
3217       LOG.info("Ignored moving region of disabling/disabled table "
3218         + tableName);
3219       return;
3220     }
3221 
3222     // Move the region only if it's assigned
3223     String encodedName = hri.getEncodedName();
3224     ReentrantLock lock = locker.acquireLock(encodedName);
3225     try {
3226       if (!regionStates.isRegionOnline(hri)) {
3227         RegionState state = regionStates.getRegionState(encodedName);
3228         LOG.info("Ignored moving region not assigned: " + hri + ", "
3229           + (state == null ? "not in region states" : state));
3230         return;
3231       }
3232       synchronized (this.regionPlans) {
3233         this.regionPlans.put(plan.getRegionName(), plan);
3234       }
3235       unassign(hri, false, plan.getDestination());
3236     } finally {
3237       lock.unlock();
3238     }
3239   }
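
  // A minimal move sketch, assuming a RegionPlan built from (region, source, destination);
  // the caller and variable names are hypothetical. balance() records the plan and unassigns
  // the region, and the follow-up assignment then honors the recorded destination.
  //
  //   RegionPlan plan = new RegionPlan(hri, currentServer, targetServer);
  //   am.balance(plan);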
3240 
3241   public void stop() {
3242     shutdown(); // Stop executor service, etc
3243     if (tomActivated){
3244       this.timeoutMonitor.interrupt();
3245       this.timerUpdater.interrupt();
3246     }
3247   }
3248 
3249   /**
3250    * Shutdown the threadpool executor service
3251    */
3252   public void shutdown() {
3253     // It's an immediate shutdown, so we're clearing the remaining tasks.
3254     synchronized (zkEventWorkerWaitingList){
3255       zkEventWorkerWaitingList.clear();
3256     }
3257     threadPoolExecutorService.shutdownNow();
3258     zkEventWorkers.shutdownNow();
3259   }
3260 
3261   protected void setEnabledTable(TableName tableName) {
3262     try {
3263       this.zkTable.setEnabledTable(tableName);
3264     } catch (KeeperException e) {
3265       // here we can abort as it is the start up flow
3266       String errorMsg = "Unable to ensure that the table " + tableName
3267           + " will be" + " enabled because of a ZooKeeper issue";
3268       LOG.error(errorMsg);
3269       this.server.abort(errorMsg, e);
3270     }
3271   }
3272 
3273   /**
3274    * Set region as OFFLINED up in zookeeper asynchronously.
3275    * @param state
3276    * @return True if we succeeded, false otherwise (State was incorrect or failed
3277    * updating zk).
3278    */
3279   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3280       final AsyncCallback.StringCallback cb, final ServerName destination) {
3281     if (!state.isClosed() && !state.isOffline()) {
3282       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3283         new IllegalStateException());
3284       return false;
3285     }
3286     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3287     try {
3288       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3289         destination, cb, state);
3290     } catch (KeeperException e) {
3291       if (e instanceof NodeExistsException) {
3292         LOG.warn("Node for " + state.getRegion() + " already exists");
3293       } else {
3294         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3295       }
3296       return false;
3297     }
3298     return true;
3299   }
3300 
3301   private boolean deleteNodeInStates(String encodedName,
3302       String desc, ServerName sn, EventType... types) {
3303     try {
3304       for (EventType et: types) {
3305         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3306           return true;
3307         }
3308       }
3309       LOG.info("Failed to delete the " + desc + " node for "
3310         + encodedName + ". The node type may not match");
3311     } catch (NoNodeException e) {
3312       if (LOG.isDebugEnabled()) {
3313         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3314       }
3315     } catch (KeeperException ke) {
3316       server.abort("Unexpected ZK exception deleting " + desc
3317         + " node for the region " + encodedName, ke);
3318     }
3319     return false;
3320   }
3321 
3322   private void deleteMergingNode(String encodedName, ServerName sn) {
3323     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3324       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3325   }
3326 
3327   private void deleteSplittingNode(String encodedName, ServerName sn) {
3328     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3329       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3330   }
3331 
3332   /**
3333    * A helper to handle region merging transition event.
3334    * It transitions merging regions to MERGING state.
3335    */
3336   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3337       final String prettyPrintedRegionName, final ServerName sn) {
3338     if (!serverManager.isServerOnline(sn)) {
3339       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3340       return false;
3341     }
3342     byte [] payloadOfMerging = rt.getPayload();
3343     List<HRegionInfo> mergingRegions;
3344     try {
3345       mergingRegions = HRegionInfo.parseDelimitedFrom(
3346         payloadOfMerging, 0, payloadOfMerging.length);
3347     } catch (IOException e) {
3348       LOG.error("Dropped merging! Failed reading "  + rt.getEventType()
3349         + " payload for " + prettyPrintedRegionName);
3350       return false;
3351     }
3352     assert mergingRegions.size() == 3;
3353     HRegionInfo p = mergingRegions.get(0);
3354     HRegionInfo hri_a = mergingRegions.get(1);
3355     HRegionInfo hri_b = mergingRegions.get(2);
3356 
3357     RegionState rs_p = regionStates.getRegionState(p);
3358     RegionState rs_a = regionStates.getRegionState(hri_a);
3359     RegionState rs_b = regionStates.getRegionState(hri_b);
3360 
3361     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3362         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3363         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3364       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3365         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3366       return false;
3367     }
3368 
3369     EventType et = rt.getEventType();
3370     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3371       try {
3372         if (RegionMergeTransaction.transitionMergingNode(watcher, p,
3373             hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE,
3374             EventType.RS_ZK_REGION_MERGING) == -1) {
3375           byte[] data = ZKAssign.getData(watcher, encodedName);
3376           EventType currentType = null;
3377           if (data != null) {
3378             RegionTransition newRt = RegionTransition.parseFrom(data);
3379             currentType = newRt.getEventType();
3380           }
3381           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3382               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3383             LOG.warn("Failed to transition pending_merge node "
3384               + encodedName + " to merging, it's now " + currentType);
3385             return false;
3386           }
3387         }
3388       } catch (Exception e) {
3389         LOG.warn("Failed to transition pending_merge node "
3390           + encodedName + " to merging", e);
3391         return false;
3392       }
3393     }
3394 
3395     synchronized (regionStates) {
3396       regionStates.updateRegionState(hri_a, State.MERGING);
3397       regionStates.updateRegionState(hri_b, State.MERGING);
3398       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3399 
3400       if (et != EventType.RS_ZK_REGION_MERGED) {
3401         regionStates.regionOffline(p, State.MERGING_NEW);
3402         this.mergingRegions.put(encodedName,
3403           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3404       } else {
3405         this.mergingRegions.remove(encodedName);
3406         regionOffline(hri_a, State.MERGED);
3407         regionOffline(hri_b, State.MERGED);
3408         regionOnline(p, sn);
3409       }
3410     }
3411 
3412     if (et == EventType.RS_ZK_REGION_MERGED) {
3413       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3414       // Remove region from ZK
3415       try {
3416         boolean successful = false;
3417         while (!successful) {
3418           // It's possible that the RS touches the znode in between our read of the
3419           // znode and the delete, so it's safe to retry.
3420           successful = ZKAssign.deleteNode(watcher, encodedName,
3421             EventType.RS_ZK_REGION_MERGED, sn);
3422         }
3423       } catch (KeeperException e) {
3424         if (e instanceof NoNodeException) {
3425           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3426           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3427         } else {
3428           server.abort("Error deleting MERGED node " + encodedName, e);
3429         }
3430       }
3431       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3432         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3433         + hri_b.getRegionNameAsString() + ", on " + sn);
3434 
3435       // User could disable the table before master knows the new region.
3436       if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
3437         unassign(p);
3438       }
3439     }
3440     return true;
3441   }
3442 
3443   /**
3444    * A helper to handle region splitting transition event.
3445    */
3446   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3447       final String prettyPrintedRegionName, final ServerName sn) {
3448     if (!serverManager.isServerOnline(sn)) {
3449       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3450       return false;
3451     }
3452     byte [] payloadOfSplitting = rt.getPayload();
3453     List<HRegionInfo> splittingRegions;
3454     try {
3455       splittingRegions = HRegionInfo.parseDelimitedFrom(
3456         payloadOfSplitting, 0, payloadOfSplitting.length);
3457     } catch (IOException e) {
3458       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
3459         + " payload for " + prettyPrintedRegionName);
3460       return false;
3461     }
3462     assert splittingRegions.size() == 2;
3463     HRegionInfo hri_a = splittingRegions.get(0);
3464     HRegionInfo hri_b = splittingRegions.get(1);
3465 
3466     RegionState rs_p = regionStates.getRegionState(encodedName);
3467     RegionState rs_a = regionStates.getRegionState(hri_a);
3468     RegionState rs_b = regionStates.getRegionState(hri_b);
3469 
3470     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
3471         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3472         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3473       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
3474         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3475       return false;
3476     }
3477 
3478     if (rs_p == null) {
3479       // Splitting region should be online
3480       rs_p = regionStates.updateRegionState(rt, State.OPEN);
3481       if (rs_p == null) {
3482         LOG.warn("Received splitting for region " + prettyPrintedRegionName
3483           + " from server " + sn + " but it doesn't exist anymore,"
3484           + " probably already processed its split");
3485         return false;
3486       }
3487       regionStates.regionOnline(rs_p.getRegion(), sn);
3488     }
3489 
3490     HRegionInfo p = rs_p.getRegion();
3491     EventType et = rt.getEventType();
3492     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
3493       try {
3494         if (SplitTransaction.transitionSplittingNode(watcher, p,
3495             hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_SPLIT,
3496             EventType.RS_ZK_REGION_SPLITTING) == -1) {
3497           byte[] data = ZKAssign.getData(watcher, encodedName);
3498           EventType currentType = null;
3499           if (data != null) {
3500             RegionTransition newRt = RegionTransition.parseFrom(data);
3501             currentType = newRt.getEventType();
3502           }
3503           if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT
3504               && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
3505             LOG.warn("Failed to transition pending_split node "
3506               + encodedName + " to splitting, it's now " + currentType);
3507             return false;
3508           }
3509         }
3510       } catch (Exception e) {
3511         LOG.warn("Failed to transition pending_split node "
3512           + encodedName + " to splitting", e);
3513         return false;
3514       }
3515     }
3516 
3517     synchronized (regionStates) {
3518       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
3519       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
3520       regionStates.regionOffline(hri_a, State.SPLITTING_NEW);
3521       regionStates.regionOffline(hri_b, State.SPLITTING_NEW);
3522       regionStates.updateRegionState(rt, State.SPLITTING);
3523 
3524       // The below is for testing ONLY!  We can't do fault injection easily, so
3525       // resort to this kinda ugliness -- St.Ack 02/25/2011.
3526       if (TEST_SKIP_SPLIT_HANDLING) {
3527         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
3528         return true; // return true so that the splitting node stays
3529       }
3530 
3531       if (et == EventType.RS_ZK_REGION_SPLIT) {
3532         regionOffline(p, State.SPLIT);
3533         regionOnline(hri_a, sn);
3534         regionOnline(hri_b, sn);
3535       }
3536     }
3537 
3538     if (et == EventType.RS_ZK_REGION_SPLIT) {
3539       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
3540       // Remove region from ZK
3541       try {
3542         boolean successful = false;
3543         while (!successful) {
3544           // It's possible that the RS touches the znode in between our read of the
3545           // znode and the delete, so it's safe to retry.
3546           successful = ZKAssign.deleteNode(watcher, encodedName,
3547             EventType.RS_ZK_REGION_SPLIT, sn);
3548         }
3549       } catch (KeeperException e) {
3550         if (e instanceof NoNodeException) {
3551           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3552           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3553         } else {
3554           server.abort("Error deleting SPLIT node " + encodedName, e);
3555         }
3556       }
3557       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
3558         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
3559         + hri_b.getRegionNameAsString() + ", on " + sn);
3560 
3561       // User could disable the table before master knows the new region.
3562       if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
3563         unassign(hri_a);
3564         unassign(hri_b);
3565       }
3566     }
3567     return true;
3568   }
3569 
3570   /**
3571    * A region is offline.  The new state should be the specified one,
3572    * if not null.  If the specified state is null, the new state is Offline.
3573    * The specified state can be Split/Merged/Offline/null only.
3574    */
3575   private void regionOffline(final HRegionInfo regionInfo, final State state) {
3576     regionStates.regionOffline(regionInfo, state);
3577     removeClosedRegion(regionInfo);
3578     // remove the region plan as well just in case.
3579     clearRegionPlan(regionInfo);
3580     balancer.regionOffline(regionInfo);
3581 
3582     // Tell our listeners that a region was closed
3583     sendRegionClosedNotification(regionInfo);
3584   }
3585 
3586   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
3587       final ServerName serverName) {
3588     if (!this.listeners.isEmpty()) {
3589       for (AssignmentListener listener : this.listeners) {
3590         listener.regionOpened(regionInfo, serverName);
3591       }
3592     }
3593   }
3594 
3595   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
3596     if (!this.listeners.isEmpty()) {
3597       for (AssignmentListener listener : this.listeners) {
3598         listener.regionClosed(regionInfo);
3599       }
3600     }
3601   }
3602 
3603   /**
3604    * @return Instance of load balancer
3605    */
3606   public LoadBalancer getBalancer() {
3607     return this.balancer;
3608   }
3609 }