1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableMap;
33 import java.util.Random;
34 import java.util.Set;
35 import java.util.TreeMap;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.CopyOnWriteArrayList;
39 import java.util.concurrent.ThreadFactory;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.locks.Lock;
44 import java.util.concurrent.locks.ReentrantLock;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.CoordinatedStateException;
52 import org.apache.hadoop.hbase.HBaseIOException;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.HRegionLocation;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.MetaTableAccessor;
58 import org.apache.hadoop.hbase.NotServingRegionException;
59 import org.apache.hadoop.hbase.RegionLocations;
60 import org.apache.hadoop.hbase.RegionStateListener;
61 import org.apache.hadoop.hbase.RegionTransition;
62 import org.apache.hadoop.hbase.ServerName;
63 import org.apache.hadoop.hbase.TableName;
64 import org.apache.hadoop.hbase.TableNotFoundException;
65 import org.apache.hadoop.hbase.TableStateManager;
66 import org.apache.hadoop.hbase.classification.InterfaceAudience;
67 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68 import org.apache.hadoop.hbase.client.Result;
69 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
70 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71 import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
72 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
73 import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
74 import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
75 import org.apache.hadoop.hbase.exceptions.DeserializationException;
76 import org.apache.hadoop.hbase.executor.EventHandler;
77 import org.apache.hadoop.hbase.executor.EventType;
78 import org.apache.hadoop.hbase.executor.ExecutorService;
79 import org.apache.hadoop.hbase.ipc.FailedServerException;
80 import org.apache.hadoop.hbase.ipc.RpcClient;
81 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
82 import org.apache.hadoop.hbase.master.RegionState.State;
83 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
84 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
85 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
86 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
87 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
88 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
89 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
90 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
91 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
92 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
93 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
94 import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
95 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
96 import org.apache.hadoop.hbase.util.ConfigUtil;
97 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
98 import org.apache.hadoop.hbase.util.FSUtils;
99 import org.apache.hadoop.hbase.util.KeyLocker;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.hbase.util.PairOfSameType;
102 import org.apache.hadoop.hbase.util.Threads;
103 import org.apache.hadoop.hbase.util.Triple;
104 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
105 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
106 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
107 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
108 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
109 import org.apache.hadoop.ipc.RemoteException;
110 import org.apache.hadoop.util.StringUtils;
111 import org.apache.zookeeper.AsyncCallback;
112 import org.apache.zookeeper.KeeperException;
113 import org.apache.zookeeper.KeeperException.NoNodeException;
114 import org.apache.zookeeper.KeeperException.NodeExistsException;
115 import org.apache.zookeeper.data.Stat;
116
117 import com.google.common.annotations.VisibleForTesting;
118 import com.google.common.collect.LinkedHashMultimap;
119
120
121
122
123
124
125
126
127 @InterfaceAudience.Private
128 public class AssignmentManager extends ZooKeeperListener {
  private static final Log LOG = LogFactory.getLog(AssignmentManager.class);

  /** Sentinel server name used by hbck-originated transitions; see handleHBCK(). */
  public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
      -1, -1L);

  /** Config key: how long to wait before retrying when a region is reported
   *  already-in-transition by the region server. */
  static final String ALREADY_IN_TRANSITION_WAITTIME
    = "hbase.assignment.already.intransition.waittime";
  static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute

  protected final MasterServices server;

  private ServerManager serverManager;

  // True when the configured load balancer class is FavoredNodeLoadBalancer.
  private boolean shouldAssignRegionsWithFavoredNodes;

  private LoadBalancer balancer;

  private final MetricsAssignmentManager metricsAssignmentManager;

  private final TableLockManager tableLockManager;

  private AtomicInteger numRegionsOpened = new AtomicInteger(0);

  // Per-region locks serializing concurrent transitions; keyed by encoded region name.
  final private KeyLocker<String> locker = new KeyLocker<String>();

  // Replica regions queued to be closed/unassigned once failover cleanup completes.
  Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());

  /**
   * Regions to reopen (e.g. after a schema change). Key is the encoded region
   * name; entries are consumed by removeClosedRegion() when the region closes.
   */
  private final Map <String, HRegionInfo> regionsToReopen;

  /** Maximum number of assignment attempts before giving up (>= 1). */
  private final int maximumAttempts;

  // Regions currently being merged: encoded name -> the two merging parents.
  private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
    = new HashMap<String, PairOfSameType<HRegionInfo>>();

  // Regions currently being split: parent region -> the two daughters.
  private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
  = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();

  /**
   * Sleep time (ms) before retrying a failed hbase:meta assignment.
   */
  private final long sleepTimeBeforeRetryingMetaAssignment;

  /** Plans for region movement. Key is the encoded version of a region name. */
  final NavigableMap<String, RegionPlan> regionPlans =
    new TreeMap<String, RegionPlan>();

  private final TableStateManager tableStateManager;

  private final ExecutorService executorService;

  // Test-only tracker: set via initializeHandlerTrackers(); records that a
  // ClosedRegionHandler ran for a region. Null in production.
  private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;

  // Test-only tracker: records that an OpenedRegionHandler ran for a region.
  private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;

  // Bounded cached pool used for assignment work (invokeAssign etc.).
  private java.util.concurrent.ExecutorService threadPoolExecutorService;

  // Worker pool that processes ZK region-transition events.
  private final java.util.concurrent.ExecutorService zkEventWorkers;

  // Transition event types that are still handled even when the reporting
  // region server is no longer online.
  private List<EventType> ignoreStatesRSOffline = Arrays.asList(
      EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);

  private final RegionStates regionStates;

  // Thresholds controlling when bulk assignment is used instead of
  // one-by-one assignment: need at least this many regions/servers.
  private final int bulkAssignThresholdRegions;
  private final int bulkAssignThresholdServers;
  private final int bulkPerRegionOpenTimeGuesstimate;

  // Whether bulk assignment blocks until every region is assigned, or may
  // time out.
  private final boolean bulkAssignWaitTillAllAssigned;

  /**
   * Flipped to true once failover cleanup is complete (see
   * failoverCleanupDone()), after which queued dead servers are processed.
   */
  protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);

  // Count of consecutive failed open attempts per region (keyed by encoded
  // name). Removed on successful open or when maximumAttempts is reached.
  private final ConcurrentHashMap<String, AtomicInteger>
    failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();

  // True when assignment is coordinated through ZooKeeper znodes.
  private final boolean useZKForAssignment;

  // Persists region state transitions (to hbase:meta / ZK).
  private final RegionStateStore regionStateStore;

  /**
   * For testing only! Set to true to skip handling of split.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_SPLIT_HANDLING = false;

  /** Listeners notified of assignment events; copy-on-write for lock-free iteration. */
  private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();

  private RegionStateListener regionStateListener;

  /** Tri-state answer to "does this server host that region?". */
  public enum ServerHostRegion {
    NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
  }
265
266
267
268
269
270
271
272
273
274
275
276
277
278 public AssignmentManager(MasterServices server, ServerManager serverManager,
279 final LoadBalancer balancer,
280 final ExecutorService service, MetricsMaster metricsMaster,
281 final TableLockManager tableLockManager) throws KeeperException,
282 IOException, CoordinatedStateException {
283 super(server.getZooKeeper());
284 this.server = server;
285 this.serverManager = serverManager;
286 this.executorService = service;
287 this.regionStateStore = new RegionStateStore(server);
288 this.regionsToReopen = Collections.synchronizedMap
289 (new HashMap<String, HRegionInfo> ());
290 Configuration conf = server.getConfiguration();
291
292 this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
293 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
294 FavoredNodeLoadBalancer.class);
295 try {
296 if (server.getCoordinatedStateManager() != null) {
297 this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
298 } else {
299 this.tableStateManager = null;
300 }
301 } catch (InterruptedException e) {
302 throw new InterruptedIOException();
303 }
304
305 this.maximumAttempts = Math.max(1,
306 this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
307 this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
308 "hbase.meta.assignment.retry.sleeptime", 1000l);
309 this.balancer = balancer;
310 int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
311 this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
312 maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
313 this.regionStates = new RegionStates(
314 server, tableStateManager, serverManager, regionStateStore);
315
316 this.bulkAssignWaitTillAllAssigned =
317 conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
318 this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
319 this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
320 this.bulkPerRegionOpenTimeGuesstimate =
321 conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
322
323 int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
324 ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
325 zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
326 TimeUnit.SECONDS, threadFactory);
327 this.tableLockManager = tableLockManager;
328
329 this.metricsAssignmentManager = new MetricsAssignmentManager();
330 useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
331 }
332
333
334
335
336
337 public void registerListener(final AssignmentListener listener) {
338 this.listeners.add(listener);
339 }
340
341
342
343
344
345 public boolean unregisterListener(final AssignmentListener listener) {
346 return this.listeners.remove(listener);
347 }
348
349
350
351
352 public TableStateManager getTableStateManager() {
353
354
355 return this.tableStateManager;
356 }
357
358
359
360
361
362
363
364 public RegionStates getRegionStates() {
365 return regionStates;
366 }
367
368
369
370
371 @VisibleForTesting
372 RegionStateStore getRegionStateStore() {
373 return regionStateStore;
374 }
375
376 public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
377 return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
378 }
379
380
381
382
383
384
385 public void addPlan(String encodedName, RegionPlan plan) {
386 synchronized (regionPlans) {
387 regionPlans.put(encodedName, plan);
388 }
389 }
390
391
392
393
394 public void addPlans(Map<String, RegionPlan> plans) {
395 synchronized (regionPlans) {
396 regionPlans.putAll(plans);
397 }
398 }
399
400
401
402
403
404
405
406
407 public void setRegionsToReopen(List <HRegionInfo> regions) {
408 for(HRegionInfo hri : regions) {
409 regionsToReopen.put(hri.getEncodedName(), hri);
410 }
411 }
412
413
414
415
416
417
418
419
420 public Pair<Integer, Integer> getReopenStatus(TableName tableName)
421 throws IOException {
422 List<HRegionInfo> hris;
423 if (TableName.META_TABLE_NAME.equals(tableName)) {
424 hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
425 } else {
426 hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
427 server.getConnection(), tableName, true);
428 }
429
430 Integer pending = 0;
431 for (HRegionInfo hri : hris) {
432 String name = hri.getEncodedName();
433
434 if (regionsToReopen.containsKey(name)
435 || regionStates.isRegionInTransition(name)) {
436 pending++;
437 }
438 }
439 return new Pair<Integer, Integer>(pending, hris.size());
440 }
441
442
443
444
445
446
447 public boolean isFailoverCleanupDone() {
448 return failoverCleanupDone.get();
449 }
450
451
452
453
454
455 public Lock acquireRegionLock(final String encodedName) {
456 return locker.acquireLock(encodedName);
457 }
458
459
460
461
462
  /**
   * Now that failover cleanup is completed, flip the flag and then let the
   * ServerManager drain any dead servers that were queued while cleanup was
   * in progress. The flag must be set first so isFailoverCleanupDone()
   * observes completion before queued processing starts.
   */
  void failoverCleanupDone() {
    failoverCleanupDone.set(true);
    serverManager.processQueuedDeadServers();
  }
467
468
469
470
471
472
473
474
475
  /**
   * Called on startup to bring this master's view of region assignments in
   * sync with the cluster: rebuilds user-region state from hbase:meta,
   * decides failover vs. clean startup, and recovers tables stuck in
   * DISABLING/ENABLING state.
   *
   * @throws IOException on meta/FS access errors
   * @throws KeeperException on ZooKeeper errors
   * @throws InterruptedException if interrupted while recovering
   * @throws CoordinatedStateException on coordination errors
   */
  void joinCluster() throws IOException,
      KeeperException, InterruptedException, CoordinatedStateException {
    long startTime = System.currentTimeMillis();
    // Scan hbase:meta to build the set of regions and the (possibly dead)
    // servers that were carrying them.
    Set<ServerName> deadServers = rebuildUserRegions();

    // Detect failover vs. clean startup; recovers RITs/dead servers or
    // assigns all user regions accordingly.
    boolean failover = processDeadServersAndRegionsInTransition(deadServers);

    if (!useZKForAssignment) {
      // Not using ZK-coordinated assignment: stale assignment znodes are
      // meaningless, remove them.
      ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
    }
    recoverTableInDisablingState();
    recoverTableInEnablingState();
    LOG.info("Joined the cluster in " + (System.currentTimeMillis()
      - startTime) + "ms, failover=" + failover);
  }
505
506
507
508
509
510
511
512
513
514
515
516
  /**
   * Decides whether this master start is a failover or a clean cluster
   * startup and recovers accordingly. Used by a master joining the cluster.
   * On clean startup all user regions are assigned; on failover, dead
   * servers and regions-in-transition are recovered instead.
   *
   * @param deadServers servers (from the meta scan) that were carrying
   *   regions but are offline; may be null
   * @return true if this is a failover, false for a clean startup
   */
  boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
      throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
    List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);

    if (useZKForAssignment && nodes == null) {
      // Cannot read the assignment znodes we depend on; abort the master.
      String errorMessage = "Failed to get the children from ZK";
      server.abort(errorMessage, new IOException(errorMessage));
      return true;
    }

    // Any known dead server implies a failover.
    boolean failover = !serverManager.getDeadServers().isEmpty();
    if (failover) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
      }
    } else {
      // No dead servers: it is still a failover if any non-meta region is
      // already assigned to a live server.
      Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
      for (Map.Entry<HRegionInfo, ServerName> en:
          regionStates.getRegionAssignments().entrySet()) {
        HRegionInfo hri = en.getKey();
        if (!hri.isMetaTable()
            && onlineServers.contains(en.getValue())) {
          LOG.debug("Found " + hri + " out on cluster");
          failover = true;
          break;
        }
      }
      if (!failover && nodes != null) {
        // Also a failover if any non-meta region has an assignment znode
        // (i.e. is in transition).
        for (String encodedName: nodes) {
          RegionState regionState = regionStates.getRegionState(encodedName);
          if (regionState != null && !regionState.getRegion().isMetaRegion()) {
            LOG.debug("Found " + regionState + " in RITs");
            failover = true;
            break;
          }
        }
      }
    }
    if (!failover && !useZKForAssignment) {
      // Without ZK assignment, check the in-memory RIT map for non-meta
      // regions in transition on a live server.
      Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
      if (!regionsInTransition.isEmpty()) {
        Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
        for (RegionState regionState: regionsInTransition.values()) {
          ServerName serverName = regionState.getServerName();
          if (!regionState.getRegion().isMetaRegion()
              && serverName != null && onlineServers.contains(serverName)) {
            LOG.debug("Found " + regionState + " in RITs");
            failover = true;
            break;
          }
        }
      }
    }
    if (!failover) {
      // A dead server queued for processing that still has WAL files (a log
      // dir or a splitting dir) on the filesystem also means failover.
      Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
      if (!queuedDeadServers.isEmpty()) {
        Configuration conf = server.getConfiguration();
        Path rootdir = FSUtils.getRootDir(conf);
        FileSystem fs = rootdir.getFileSystem(conf);
        for (ServerName serverName: queuedDeadServers) {
          // In the case of a clean exit, the shutdown handler would have
          // deleted these dirs already.
          Path logDir = new Path(rootdir,
              DefaultWALProvider.getWALDirectoryName(serverName.toString()));
          Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
          if (fs.exists(logDir) || fs.exists(splitDir)) {
            LOG.debug("Found queued dead server " + serverName);
            failover = true;
            break;
          }
        }
        if (!failover) {
          // Confirmed clean startup: drop the queued dead servers so they
          // are not processed later.
          LOG.info("AM figured that it's not a failover and cleaned up "
            + queuedDeadServers.size() + " queued dead servers");
          serverManager.removeRequeuedDeadServers();
        }
      }
    }

    Set<TableName> disabledOrDisablingOrEnabling = null;
    Map<HRegionInfo, ServerName> allRegions = null;

    if (!failover) {
      // Clean startup: mark all user regions closed except those of tables
      // in DISABLED/DISABLING/ENABLING state, which get special handling.
      disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
        ZooKeeperProtos.Table.State.ENABLING);

      allRegions = regionStates.closeAllUserRegions(
        disabledOrDisablingOrEnabling);
    }

    // Start the state store only after region states are fully rebuilt.
    regionStateStore.start();

    if (failover) {
      LOG.info("Found regions out on cluster or in RIT; presuming failover");
      // Process the list of dead servers and regions in transition.
      processDeadServersAndRecoverLostRegions(deadServers);
    }

    if (!failover && useZKForAssignment) {
      // Clean startup with ZK assignment: clear leftover znodes and re-arm
      // the child watch on the assignment node.
      ZKAssign.deleteAllNodes(watcher);
      ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
        this.watcher.assignmentZNode);
    }

    // Signal cleanup complete so ServerShutdownHandler can proceed; must
    // happen before assigning so SSH sees consistent region states.
    failoverCleanupDone();
    if (!failover) {
      // Fresh cluster startup.
      LOG.info("Clean cluster startup. Assigning user regions");
      assignAllUserRegions(allRegions);
    }

    // Unassign replica regions queued for close during recovery; safe to do
    // now that failover cleanup is done.
    for (HRegionInfo h : replicasToClose) {
      unassign(h);
    }
    replicasToClose.clear();
    return failover;
  }
656
657
658
659
660
661
662
663
664
665
666
667
  /**
   * If the region is in transition, process it and then block until the
   * transition completes (or until the master stops / the hosting server
   * dies, in which case we stop waiting and let recovery take over).
   *
   * @param hri region to process
   * @return true if the region was in transition and was processed
   * @throws InterruptedException if interrupted while waiting
   * @throws KeeperException on ZooKeeper errors
   * @throws IOException on meta access errors
   */
  boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
      throws InterruptedException, KeeperException, IOException {
    String encodedRegionName = hri.getEncodedName();
    if (!processRegionInTransition(encodedRegionName, hri)) {
      return false; // The region is not in transition
    }
    LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
    while (!this.server.isStopped() &&
        this.regionStates.isRegionInTransition(encodedRegionName)) {
      RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
      if (state == null || !serverManager.isServerOnline(state.getServerName())) {
        // The region is gone from RIT, or its host died: no point waiting;
        // server-shutdown handling will re-assign as needed.
        break;
      }
      // Poll the RIT state every 100ms (regionStates notifies on updates).
      this.regionStates.waitForUpdate(100);
    }
    return true;
  }
688
689
690
691
692
693
694
695
696
697
  /**
   * Process a possible region-in-transition found on master failover: reads
   * the region's assignment znode and, if present, dispatches it to
   * {@link #processRegionsInTransition}.
   *
   * @param encodedRegionName region to process failover for
   * @param regionInfo region info; if null we look it up from the transition
   *   payload / region states
   * @return true if the region was processed as a RIT
   * @throws KeeperException on ZooKeeper errors
   * @throws IOException on meta access errors
   */
  boolean processRegionInTransition(final String encodedRegionName,
      final HRegionInfo regionInfo) throws KeeperException, IOException {
    // Lock on the region only (not globally): we just need to ensure no two
    // threads work on the same region concurrently.
    Lock lock = locker.acquireLock(encodedRegionName);
    try {
      Stat stat = new Stat();
      byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
      if (data == null) return false; // no znode => not in transition
      RegionTransition rt;
      try {
        rt = RegionTransition.parseFrom(data);
      } catch (DeserializationException e) {
        LOG.warn("Failed parse znode data", e);
        return false;
      }
      HRegionInfo hri = regionInfo;
      if (hri == null) {
        // No region info supplied: resolve it from region states using the
        // region name carried in the transition payload. Merge transitions
        // legitimately reference a region that does not exist yet.
        hri = regionStates.getRegionInfo(rt.getRegionName());
        EventType et = rt.getEventType();
        if (hri == null && et != EventType.RS_ZK_REGION_MERGING
            && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
          LOG.warn("Couldn't find the region in recovering " + rt);
          return false;
        }
      }

      // Build ZK open-region coordination details (znode version + our
      // server name) for the downstream handlers.
      BaseCoordinatedStateManager cp =
        (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
      OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();

      ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
        new ZkOpenRegionCoordination.ZkOpenRegionDetails();
      zkOrd.setVersion(stat.getVersion());
      zkOrd.setServerName(cp.getServer().getServerName());

      return processRegionsInTransition(
        rt, hri, openRegionCoordination, zkOrd);
    } finally {
      lock.unlock();
    }
  }
749
750
751
752
753
754
755
756
757
  /**
   * Handle one region-in-transition event found during failover processing,
   * dispatching on the transition's event type. The per-region lock is held
   * by the caller.
   *
   * @param rt the transition read from ZK
   * @param regionInfo region the transition concerns
   * @param coordination open-region coordination implementation
   * @param ord open-region details (ZK znode version etc.)
   * @return true if the region is in transition for sure, false otherwise
   * @throws KeeperException on ZooKeeper errors
   */
  boolean processRegionsInTransition(
      final RegionTransition rt, final HRegionInfo regionInfo,
      OpenRegionCoordination coordination,
      final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
    EventType et = rt.getEventType();
    // Server name from the transition payload.
    final ServerName sn = rt.getServerName();
    final byte[] regionName = rt.getRegionName();
    final String encodedName = HRegionInfo.encodeRegionName(regionName);
    final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
    LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);

    if (regionStates.isRegionInTransition(encodedName)
        && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
      // Already tracked as in-transition; nothing more to do here.
      LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
        + et + ", does nothing since the region is already in transition "
        + regionStates.getRegionTransitionState(encodedName));
      return true;
    }
    if (!serverManager.isServerOnline(sn)) {
      // The server named in the transition is dead: force the region
      // offline and let dead-server processing recover it.
      LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
        " was on deadserver; forcing offline");
      if (regionStates.isRegionOnline(regionInfo)) {
        // Was online when the server died; mark it offline and notify
        // listeners of the close.
        regionStates.regionOffline(regionInfo);
        sendRegionClosedNotification(regionInfo);
      }
      // Track it as OFFLINE, still remembering the (dead) server.
      regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);

      if (regionInfo.isMetaRegion()) {
        // Record meta's last known location (on the dead server) so the
        // meta SSH knows where to recover from.
        MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
      } else {
        // Remember the last RS of the region for SSH log-splitting, and
        // expire the server if it is not already being processed as dead.
        regionStates.setLastRegionServerOfRegion(sn, encodedName);

        if (!serverManager.isServerDead(sn)) {
          serverManager.expireServer(sn);
        }
      }
      return false;
    }
    switch (et) {
      case M_ZK_REGION_CLOSING:
        // A close was in flight; resume the unassign asynchronously, then
        // re-assign once the region is confirmed offline.
        final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
        this.executorService.submit(
          new EventHandler(server, EventType.M_MASTER_RECOVERY) {
            @Override
            public void process() throws IOException {
              ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
              try {
                final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
                  .getVersion();
                unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
                if (regionStates.isRegionOffline(regionInfo)) {
                  assign(regionInfo, true);
                }
              } finally {
                lock.unlock();
              }
            }
          });
        break;

      case RS_ZK_REGION_CLOSED:
      case RS_ZK_REGION_FAILED_OPEN:
        // Region is closed; record last RS, then re-assign unless it is a
        // replica queued for close (then just offline it).
        regionStates.setLastRegionServerOfRegion(sn, encodedName);
        regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
        if (!replicasToClose.contains(regionInfo)) {
          invokeAssign(regionInfo);
        } else {
          offlineDisabledRegion(regionInfo);
        }
        break;

      case M_ZK_REGION_OFFLINE:
        // An assign was in flight; re-drive it asynchronously to the same
        // target server recorded in the transition.
        regionStates.updateRegionState(rt, State.PENDING_OPEN);
        final RegionState rsOffline = regionStates.getRegionState(regionInfo);
        this.executorService.submit(
          new EventHandler(server, EventType.M_MASTER_RECOVERY) {
            @Override
            public void process() throws IOException {
              ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
              try {
                RegionPlan plan = new RegionPlan(regionInfo, null, sn);
                addPlan(encodedName, plan);
                assign(rsOffline, false, false);
              } finally {
                lock.unlock();
              }
            }
          });
        break;

      case RS_ZK_REGION_OPENING:
        regionStates.updateRegionState(rt, State.OPENING);
        break;

      case RS_ZK_REGION_OPENED:
        // The region server reported the open already completed; finish
        // bookkeeping via the OpenedRegionHandler (synchronously).
        regionStates.updateRegionState(rt, State.OPEN);
        new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
        break;
      case RS_ZK_REQUEST_REGION_SPLIT:
      case RS_ZK_REGION_SPLITTING:
      case RS_ZK_REGION_SPLIT:
        // A split was in flight; mark the parent online/splitting and resume
        // split handling. On failure, clean up the splitting znode.
        regionStates.regionOnline(regionInfo, sn);
        regionStates.updateRegionState(rt, State.SPLITTING);
        if (!handleRegionSplitting(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteSplittingNode(encodedName, sn);
        }
        break;
      case RS_ZK_REQUEST_REGION_MERGE:
      case RS_ZK_REGION_MERGING:
      case RS_ZK_REGION_MERGED:
        // A merge was in flight; resume it, cleaning the merging znode on
        // failure.
        if (!handleRegionMerging(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteMergingNode(encodedName, sn);
        }
        break;
      default:
        throw new IllegalStateException("Received region in state:" + et + " is not valid.");
    }
    LOG.info("Processed region " + prettyPrintedRegionName + " in state "
      + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
      + "server: " + sn);
    return true;
  }
905
906
907
908
909
910 public void removeClosedRegion(HRegionInfo hri) {
911 if (regionsToReopen.remove(hri.getEncodedName()) != null) {
912 LOG.debug("Removed region from reopening regions because it was closed");
913 }
914 }
915
916
917
918
919
920
921
922
923
924
925
926
  /**
   * Handle a ZK region-transition event reported by a region server (or by
   * hbck). Validates the reporting server and the region's current state,
   * then dispatches on the event type under the per-region lock.
   *
   * @param rt the parsed region transition; ignored if null
   * @param coordination open-region coordination implementation
   * @param ord open-region coordination details
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
      justification="Needs work; says access to ConcurrentHashMaps not ATOMIC!!!")
  void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
      OpenRegionCoordination.OpenRegionDetails ord) {
    if (rt == null) {
      LOG.warn("Unexpected NULL input for RegionTransition rt");
      return;
    }
    final ServerName sn = rt.getServerName();
    // Transitions carrying the hbck sentinel server name are hbck-driven.
    if (sn.equals(HBCK_CODE_SERVERNAME)) {
      handleHBCK(rt);
      return;
    }
    final long createTime = rt.getCreateTime();
    final byte[] regionName = rt.getRegionName();
    String encodedName = HRegionInfo.encodeRegionName(regionName);
    String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
    // Ignore transitions from offline servers, except event types listed in
    // ignoreStatesRSOffline (FAILED_OPEN / CLOSED) which remain meaningful.
    if (!serverManager.isServerOnline(sn)
        && !ignoreStatesRSOffline.contains(rt.getEventType())) {
      LOG.warn("Attempted to handle region transition for server but " +
        "it is not online: " + prettyPrintedRegionName + ", " + rt);
      return;
    }

    RegionState regionState =
      regionStates.getRegionState(encodedName);
    long startTime = System.currentTimeMillis();
    if (LOG.isDebugEnabled()) {
      // Flag events delivered more than 15s after creation as late.
      boolean lateEvent = createTime < (startTime - 15000);
      LOG.debug("Handling " + rt.getEventType() +
        ", server=" + sn + ", region=" +
        (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
        (lateEvent ? ", which is more than 15 seconds late" : "") +
        ", current_state=" + regionState);
    }

    // We don't do anything for this event type; the master itself creates
    // the OFFLINE node.
    if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
      return;
    }

    // All further processing happens under the per-region lock.
    Lock lock = locker.acquireLock(encodedName);
    try {
      // Re-read the state: it may have changed while we waited for the lock.
      RegionState latestState =
        regionStates.getRegionState(encodedName);
      if ((regionState == null && latestState != null)
          || (regionState != null && latestState == null)
          || (regionState != null && latestState != null
            && latestState.getState() != regionState.getState())) {
        LOG.warn("Region state changed from " + regionState + " to "
          + latestState + ", while acquiring lock");
      }
      long waitedTime = System.currentTimeMillis() - startTime;
      if (waitedTime > 5000) {
        LOG.warn("Took " + waitedTime + "ms to acquire the lock");
      }
      regionState = latestState;
      switch (rt.getEventType()) {
      case RS_ZK_REQUEST_REGION_SPLIT:
      case RS_ZK_REGION_SPLITTING:
      case RS_ZK_REGION_SPLIT:
        // Split in progress; clean up the splitting znode if handling fails.
        if (!handleRegionSplitting(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteSplittingNode(encodedName, sn);
        }
        break;

      case RS_ZK_REQUEST_REGION_MERGE:
      case RS_ZK_REGION_MERGING:
      case RS_ZK_REGION_MERGED:
        // Merge in progress; clean up the merging znode if handling fails.
        if (!handleRegionMerging(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteMergingNode(encodedName, sn);
        }
        break;

      case M_ZK_REGION_CLOSING:
        // Only valid if we previously sent this server a close request,
        // i.e. the region is PENDING_CLOSE or CLOSING on that server.
        if (regionState == null
            || !regionState.isPendingCloseOrClosingOnServer(sn)) {
          LOG.warn("Received CLOSING for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
            + regionStates.getRegionState(encodedName));
          return;
        }
        // Transition to CLOSING.
        regionStates.updateRegionState(rt, State.CLOSING);
        break;

      case RS_ZK_REGION_CLOSED:
        // Same precondition as CLOSING: must have been asked to close.
        if (regionState == null
            || !regionState.isPendingCloseOrClosingOnServer(sn)) {
          LOG.warn("Received CLOSED for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
            + regionStates.getRegionState(encodedName));
          return;
        }
        // Handle CLOSED by assigning elsewhere or stopping if a disable.
        // If we got here all is good. Need to update RegionState. Run the
        // handler synchronously, then tell the test trackers.
        new ClosedRegionHandler(server, this, regionState.getRegion()).process();
        updateClosedRegionHandlerTracker(regionState.getRegion());
        break;

      case RS_ZK_REGION_FAILED_OPEN:
        // Only valid if we asked this server to open the region.
        if (regionState == null
            || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
          LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
            + regionStates.getRegionState(encodedName));
          return;
        }
        // Track consecutive failed opens; give up after maximumAttempts.
        AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
        if (failedOpenCount == null) {
          failedOpenCount = new AtomicInteger();
          // No need to use putIfAbsent, or extra synchronization since
          // this whole handleRegion block is locked on the encoded region
          // name, and failedOpenTracker is updated only in this block
          failedOpenTracker.put(encodedName, failedOpenCount);
        }
        if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
          // Exhausted retries: park the region in FAILED_OPEN and reset
          // the counter so a later manual assign starts fresh.
          regionStates.updateRegionState(rt, State.FAILED_OPEN);
          failedOpenTracker.remove(encodedName);
        } else {
          // Handle this the same as if it were opened and then closed:
          // mark CLOSED, force a new plan (excluding the failing server),
          // and re-run the closed-region handler to trigger re-assign.
          regionState = regionStates.updateRegionState(rt, State.CLOSED);
          if (regionState != null) {
            try {
              getRegionPlan(regionState.getRegion(), sn, true);
              new ClosedRegionHandler(server, this, regionState.getRegion()).process();
            } catch (HBaseIOException e) {
              LOG.warn("Failed to get region plan", e);
            }
          }
        }
        break;

      case RS_ZK_REGION_OPENING:
        // Should see OPENING after we have asked the server to open;
        // anything else is a stale or rogue event.
        if (regionState == null
            || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
          LOG.warn("Received OPENING for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
            + regionStates.getRegionState(encodedName));
          return;
        }
        // Transition to OPENING (or update stamp if already OPENING).
        regionStates.updateRegionState(rt, State.OPENING);
        break;

      case RS_ZK_REGION_OPENED:
        // Should see OPENED after OPENING but possible after PENDING_OPEN.
        if (regionState == null
            || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
          LOG.warn("Received OPENED for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
            + regionStates.getRegionState(encodedName));

          if (regionState != null) {
            // Close the region, which the reporting server opened even
            // though we did not expect it to: otherwise we'd have a
            // double assignment.
            unassign(regionState.getRegion(), null, -1, null, false, sn);
          }
          return;
        }
        // Handle OPENED by removing from transition and deleting the znode.
        regionState =
            regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
        if (regionState != null) {
          // Open succeeded: clear the failed-open counter and finish via
          // the OpenedRegionHandler, then notify the test trackers.
          failedOpenTracker.remove(encodedName);
          new OpenedRegionHandler(
            server, this, regionState.getRegion(), coordination, ord).process();
          updateOpenedRegionHandlerTracker(regionState.getRegion());
        }
        break;

      default:
        throw new IllegalStateException("Received event is not valid.");
      }
    } finally {
      lock.unlock();
    }
  }
1126
1127
1128 boolean wasClosedHandlerCalled(HRegionInfo hri) {
1129 AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1130
1131
1132
1133 return b == null ? false : b.compareAndSet(true, false);
1134 }
1135
1136
1137 boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1138 AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1139
1140
1141
1142 return b == null ? false : b.compareAndSet(true, false);
1143 }
1144
1145
/**
 * Enables handler-call tracking by creating the closed/opened tracker maps.
 * Until this is called both maps are null and the update*Tracker methods are
 * no-ops (they null-check before writing).
 */
void initializeHandlerTrackers() {
  closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
  openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
}
1150
1151 void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1152 if (closedRegionHandlerCalled != null) {
1153 closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1154 }
1155 }
1156
1157 void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1158 if (openedRegionHandlerCalled != null) {
1159 openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1160 }
1161 }
1162
1163
1164
1165
1166
1167
1168 void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1169 if (!shouldAssignRegionsWithFavoredNodes) return;
1170
1171
1172 Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1173 new HashMap<HRegionInfo, List<ServerName>>();
1174 for (HRegionInfo region : regions) {
1175 regionToFavoredNodes.put(region,
1176 ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1177 }
1178 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1179 this.server.getConnection());
1180 }
1181
1182
1183
1184
1185
1186
1187
1188 @SuppressWarnings("deprecation")
1189 private void handleHBCK(RegionTransition rt) {
1190 String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1191 LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1192 ", server=" + rt.getServerName() + ", region=" +
1193 HRegionInfo.prettyPrint(encodedName));
1194 RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1195 switch (rt.getEventType()) {
1196 case M_ZK_REGION_OFFLINE:
1197 HRegionInfo regionInfo;
1198 if (regionState != null) {
1199 regionInfo = regionState.getRegion();
1200 } else {
1201 try {
1202 byte [] name = rt.getRegionName();
1203 Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1204 this.server.getConnection(), name);
1205 regionInfo = p.getFirst();
1206 } catch (IOException e) {
1207 LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1208 return;
1209 }
1210 }
1211 LOG.info("HBCK repair is triggering assignment of region=" +
1212 regionInfo.getRegionNameAsString());
1213
1214 assign(regionInfo, false);
1215 break;
1216
1217 default:
1218 LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1219 break;
1220 }
1221
1222 }
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
/**
 * ZooKeeper watcher callback: a znode was created.  If it is under the
 * assignment znode this is a new region transition; delegate to
 * handleAssignmentEvent which reads the data and processes it.
 */
@Override
public void nodeCreated(String path) {
  handleAssignmentEvent(path);
}
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
/**
 * ZooKeeper watcher callback: an existing znode's data changed.  If it is
 * under the assignment znode this is a region-transition state change;
 * delegate to handleAssignmentEvent which re-reads the data and processes it.
 */
@Override
public void nodeDataChanged(String path) {
  handleAssignmentEvent(path);
}
1259
1260
1261
1262
1263
// Encoded region names for which a ZK event task is currently running on the
// zkEventWorkers pool.  Guarded by synchronizing on itself; see
// zkEventWorkersSubmit for the locking protocol.
private final Set<String> regionsInProgress = new HashSet<String>();

// ZK event tasks that arrived for a region while another task for the same
// region was still running; keyed by encoded region name.  LinkedHashMultimap
// preserves insertion order so queued tasks are resubmitted FIFO.  Guarded by
// synchronizing on itself (always acquired after regionsInProgress).
private final LinkedHashMultimap <String, RegionRunnable>
  zkEventWorkerWaitingList = LinkedHashMultimap.create();
1269
1270
1271
1272
/**
 * A task tied to a specific region, so that zkEventWorkersSubmit can ensure
 * at most one task per region runs at a time.
 */
private interface RegionRunnable extends Runnable{
  /**
   * @return the encoded region name this task operates on; used as the
   *   serialization key in regionsInProgress/zkEventWorkerWaitingList
   */
  String getRegionName();
}
1279
1280
1281
1282
1283
/**
 * Submits a task to the zkEventWorkers pool while serializing tasks per
 * region: at most one RegionRunnable for a given region name runs at a time.
 * A task submitted while another task for the same region is in flight is
 * parked in zkEventWorkerWaitingList and resubmitted when the running one
 * finishes.
 */
protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
  synchronized (regionsInProgress) {
    // If a task for this region is already running, queue this one instead of
    // submitting; the completion path below will resubmit it.
    if (regionsInProgress.contains(regRunnable.getRegionName())) {
      synchronized (zkEventWorkerWaitingList){
        zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
      }
      return;
    }

    // No task in flight for this region: mark it in-progress and submit.
    regionsInProgress.add(regRunnable.getRegionName());
    zkEventWorkers.submit(new Runnable() {
      @Override
      public void run() {
        try {
          regRunnable.run();
        } finally {
          // Whether the task completed or threw, release the region and, if
          // any task was queued for it meanwhile, resubmit the oldest one.
          // Lock order (regionsInProgress then zkEventWorkerWaitingList)
          // matches the submission path above, avoiding deadlock.
          synchronized (regionsInProgress) {
            regionsInProgress.remove(regRunnable.getRegionName());
            synchronized (zkEventWorkerWaitingList) {
              java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
                regRunnable.getRegionName());
              if (!waiting.isEmpty()) {
                // LinkedHashMultimap iterates in insertion order, so this
                // picks the oldest queued task (FIFO).
                RegionRunnable toSubmit = waiting.iterator().next();
                zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
                // Recursive submit is safe: we hold regionsInProgress and the
                // region was just removed, so it takes the submit branch.
                zkEventWorkersSubmit(toSubmit);
              }
            }
          }
        }
      }
    });
  }
}
1324
1325 @Override
1326 public void nodeDeleted(final String path) {
1327 if (path.startsWith(watcher.assignmentZNode)) {
1328 final String regionName = ZKAssign.getRegionName(watcher, path);
1329 zkEventWorkersSubmit(new RegionRunnable() {
1330 @Override
1331 public String getRegionName() {
1332 return regionName;
1333 }
1334
1335 @Override
1336 public void run() {
1337 Lock lock = locker.acquireLock(regionName);
1338 try {
1339 RegionState rs = regionStates.getRegionTransitionState(regionName);
1340 if (rs == null) {
1341 rs = regionStates.getRegionState(regionName);
1342 if (rs == null || !rs.isMergingNew()) {
1343
1344 return;
1345 }
1346 }
1347
1348 HRegionInfo regionInfo = rs.getRegion();
1349 String regionNameStr = regionInfo.getRegionNameAsString();
1350 LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1351
1352 boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1353 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1354
1355 ServerName serverName = rs.getServerName();
1356 if (serverManager.isServerOnline(serverName)) {
1357 if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1358 synchronized (regionStates) {
1359 regionOnline(regionInfo, serverName);
1360 if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1361
1362
1363 HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1364 HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1365 if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1366 LOG.warn("Split daughter region not in transition " + hri_a);
1367 }
1368 if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1369 LOG.warn("Split daughter region not in transition" + hri_b);
1370 }
1371 regionOffline(hri_a);
1372 regionOffline(hri_b);
1373 splitRegions.remove(regionInfo);
1374 }
1375 if (disabled) {
1376
1377 LOG.info("Opened " + regionNameStr
1378 + "but this table is disabled, triggering close of region");
1379 unassign(regionInfo);
1380 }
1381 }
1382 } else if (rs.isMergingNew()) {
1383 synchronized (regionStates) {
1384 String p = regionInfo.getEncodedName();
1385 PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1386 if (regions != null) {
1387 onlineMergingRegion(disabled, regions.getFirst(), serverName);
1388 onlineMergingRegion(disabled, regions.getSecond(), serverName);
1389 }
1390 }
1391 }
1392 }
1393 } finally {
1394 lock.unlock();
1395 }
1396 }
1397
1398 private void onlineMergingRegion(boolean disabled,
1399 final HRegionInfo hri, final ServerName serverName) {
1400 RegionState regionState = regionStates.getRegionState(hri);
1401 if (regionState != null && regionState.isMerging()
1402 && regionState.isOnServer(serverName)) {
1403 regionOnline(regionState.getRegion(), serverName);
1404 if (disabled) {
1405 unassign(hri);
1406 }
1407 }
1408 }
1409 });
1410 }
1411 }
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425 @Override
1426 public void nodeChildrenChanged(String path) {
1427 if (path.equals(watcher.assignmentZNode)) {
1428 zkEventWorkers.submit(new Runnable() {
1429 @Override
1430 public void run() {
1431 try {
1432
1433 List<String> children =
1434 ZKUtil.listChildrenAndWatchForNewChildren(
1435 watcher, watcher.assignmentZNode);
1436 if (children != null) {
1437 Stat stat = new Stat();
1438 for (String child : children) {
1439
1440
1441
1442 if (!regionStates.isRegionInTransition(child)) {
1443 ZKAssign.getDataAndWatch(watcher, child, stat);
1444 }
1445 }
1446 }
1447 } catch (KeeperException e) {
1448 server.abort("Unexpected ZK exception reading unassigned children", e);
1449 }
1450 }
1451 });
1452 }
1453 }
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
/**
 * Marks the region as online on the given server, with no known open
 * sequence number.  Delegates to the three-argument overload with
 * HConstants.NO_SEQNUM.
 */
void regionOnline(HRegionInfo regionInfo, ServerName sn) {
  regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
}
1467
/**
 * Marks the region as online on the given server: bumps the opened-regions
 * counter, updates in-memory region state, clears any pending region plan,
 * informs the balancer, and notifies registered listeners.
 *
 * @param openSeqNum the sequence id the region opened with, or
 *   HConstants.NO_SEQNUM when unknown
 */
void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
  numRegionsOpened.incrementAndGet();
  regionStates.regionOnline(regionInfo, sn, openSeqNum);

  // The region is where it is going to be; any outstanding plan is stale.
  clearRegionPlan(regionInfo);
  balancer.regionOnline(regionInfo, sn);

  // Tell our listeners that a region was opened.
  sendRegionOpenedNotification(regionInfo, sn);
}
1479
1480
1481
1482
1483
1484
1485
1486
1487
/**
 * Reads an assignment znode (re-registering the data watch) and feeds the
 * parsed RegionTransition into handleRegion.  Work is dispatched through
 * zkEventWorkersSubmit so events for the same region run serially.  Aborts
 * the master on unexpected ZK or deserialization failures.
 */
private void handleAssignmentEvent(final String path) {
  if (path.startsWith(watcher.assignmentZNode)) {
    final String regionName = ZKAssign.getRegionName(watcher, path);

    zkEventWorkersSubmit(new RegionRunnable() {
      @Override
      public String getRegionName() {
        return regionName;
      }

      @Override
      public void run() {
        try {
          Stat stat = new Stat();
          // getDataAndWatch re-registers the one-shot data watch on this node.
          byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
          if (data == null) return;

          RegionTransition rt = RegionTransition.parseFrom(data);

          // Build the ZK-specific open-region coordination details (znode
          // version + this master's server name) that handleRegion needs to
          // act on the transition.
          BaseCoordinatedStateManager csm =
            (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
          OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();

          ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
            new ZkOpenRegionCoordination.ZkOpenRegionDetails();
          zkOrd.setVersion(stat.getVersion());
          zkOrd.setServerName(csm.getServer().getServerName());

          handleRegion(rt, openRegionCoordination, zkOrd);
        } catch (KeeperException e) {
          server.abort("Unexpected ZK exception reading unassigned node data", e);
        } catch (DeserializationException e) {
          server.abort("Unexpected exception deserializing node data", e);
        }
      }
    });
  }
}
1528
1529
1530
1531
1532
1533
1534
1535
/**
 * Marks the region offline with no particular target state.  Delegates to
 * the two-argument overload with a null state.
 */
public void regionOffline(final HRegionInfo regionInfo) {
  regionOffline(regionInfo, null);
}
1539
/**
 * Takes a region of a disabled/disabling table offline: when ZK-based
 * assignment is in use, deletes its CLOSED/OFFLINE znode (so it is not
 * re-assigned); then drops any pending replica-close bookkeeping and marks
 * the region offline in memory.
 */
public void offlineDisabledRegion(HRegionInfo regionInfo) {
  if (useZKForAssignment) {
    // Disabling is special: instead of re-assigning after close we just
    // remove the transition znode and leave the region unassigned.
    LOG.debug("Table being disabled so deleting ZK node and removing from " +
      "regions in transition, skipping assignment of region " +
      regionInfo.getRegionNameAsString());
    String encodedName = regionInfo.getEncodedName();
    deleteNodeInStates(encodedName, "closed", null,
      EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
  }
  replicasToClose.remove(regionInfo);
  regionOffline(regionInfo);
}
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
/**
 * Assigns the region, optionally creating its OFFLINE znode first, without
 * forcing a new region plan.  Delegates to the three-argument overload with
 * forceNewPlan=false.
 */
public void assign(HRegionInfo region, boolean setOfflineInZK) {
  assign(region, setOfflineInZK, false);
}
1577
1578
1579
1580
1581 public void assign(HRegionInfo region,
1582 boolean setOfflineInZK, boolean forceNewPlan) {
1583 if (isDisabledorDisablingRegionInRIT(region)) {
1584 return;
1585 }
1586 String encodedName = region.getEncodedName();
1587 Lock lock = locker.acquireLock(encodedName);
1588 try {
1589 RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1590 if (state != null) {
1591 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1592 LOG.info("Skip assigning " + region.getRegionNameAsString()
1593 + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1594 + " is dead but not processed yet");
1595 return;
1596 }
1597 assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1598 }
1599 } finally {
1600 lock.unlock();
1601 }
1602 }
1603
1604
1605
1606
1607
1608
1609
/**
 * Bulk-assigns the given regions to a single destination server.
 *
 * Per region: force it OFFLINE, (with ZK assignment) asynchronously create
 * its OFFLINE znode, record a plan, then send one batched openRegion RPC to
 * the destination, retrying on transient failures.  Regions that fail any
 * step are re-queued via invokeAssign.  Finally waits for user-table regions
 * to leave transition.
 *
 * @param destination server to open the regions on
 * @param regions regions to assign; may be empty
 * @return true if the RPC path completed (individual regions may still have
 *   been re-queued); false if the master is stopping or the destination is
 *   unreachable/stopped
 * @throws InterruptedException if interrupted while waiting on ZK callbacks
 *   or between RPC retries
 */
boolean assign(final ServerName destination, final List<HRegionInfo> regions)
    throws InterruptedException {
  long startTime = EnvironmentEdgeManager.currentTime();
  try {
    int regionCount = regions.size();
    if (regionCount == 0) {
      return true;
    }
    LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
    Set<String> encodedNames = new HashSet<String>(regionCount);
    for (HRegionInfo region : regions) {
      encodedNames.add(region.getEncodedName());
    }

    List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
    // Hold every region's lock for the duration of the bulk operation.
    Map<String, Lock> locks = locker.acquireLocks(encodedNames);
    try {
      // counter/offlineNodesVersions are filled in asynchronously by the ZK
      // OfflineCallback as each OFFLINE znode is created.
      AtomicInteger counter = new AtomicInteger(0);
      Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
      OfflineCallback cb = new OfflineCallback(
        watcher, destination, counter, offlineNodesVersions);
      Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
      List<RegionState> states = new ArrayList<RegionState>(regions.size());
      for (HRegionInfo region : regions) {
        String encodedName = region.getEncodedName();
        if (!isDisabledorDisablingRegionInRIT(region)) {
          RegionState state = forceRegionStateToOffline(region, false);
          boolean onDeadServer = false;
          if (state != null) {
            if (regionStates.wasRegionOnDeadServer(encodedName)) {
              // Leave it to server-shutdown handling; do not re-queue.
              LOG.info("Skip assigning " + region.getRegionNameAsString()
                + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
                + " is dead but not processed yet");
              onDeadServer = true;
            } else if (!useZKForAssignment
                || asyncSetOfflineInZooKeeper(state, cb, destination)) {
              // Offlined (or ZK not needed): record the plan and keep the lock
              // held — the `continue` skips the unlock below.
              RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
              plans.put(encodedName, plan);
              states.add(state);
              continue;
            }
          }
          // Could not prepare this region; re-queue it later unless its host
          // is a dead-but-unprocessed server.
          if (!onDeadServer) {
            LOG.info("failed to force region state to offline or "
              + "failed to set it offline in ZK, will reassign later: " + region);
            failedToOpenRegions.add(region);
          }
        }

        // Region is not going through this bulk path: release its lock now.
        Lock lock = locks.remove(encodedName);
        lock.unlock();
      }

      if (useZKForAssignment) {
        // Wait until all pending OFFLINE-znode creations have called back.
        int total = states.size();
        for (int oldCounter = 0; !server.isStopped();) {
          int count = counter.get();
          if (oldCounter != count) {
            LOG.debug(destination.toString() + " unassigned znodes=" + count +
              " of total=" + total + "; oldCounter=" + oldCounter);
            oldCounter = count;
          }
          if (count >= total) break;
          Thread.sleep(5);
        }
      }

      if (server.isStopped()) {
        return false;
      }

      // Publish the plans so concurrent observers see where regions are going.
      this.addPlans(plans);

      // Build (region, znode version, favored nodes) triples for the RPC,
      // dropping regions whose znode creation failed.
      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
        new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
      for (RegionState state: states) {
        HRegionInfo region = state.getRegion();
        String encodedRegionName = region.getEncodedName();
        Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
        if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
          LOG.warn("failed to offline in zookeeper: " + region);
          failedToOpenRegions.add(region);
          Lock lock = locks.remove(encodedRegionName);
          lock.unlock();
        } else {
          regionStates.updateRegionState(
            region, State.PENDING_OPEN, destination);
          List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
          if (this.shouldAssignRegionsWithFavoredNodes) {
            favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
          }
          regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>(
            region, nodeVersion, favoredNodes));
        }
      }

      // Send the batched open request, retrying transient failures up to
      // maximumAttempts times (ServerNotRunningYet and socket timeouts do not
      // consume an attempt — note the i-- before continue).
      try {
        long maxWaitTime = System.currentTimeMillis() +
          this.server.getConfiguration().
            getLong("hbase.regionserver.rpc.startup.waittime", 60000);
        for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
          try {
            if (regionOpenInfos.isEmpty()) {
              break;
            }
            List<RegionOpeningState> regionOpeningStateList = serverManager
              .sendRegionOpen(destination, regionOpenInfos);
            if (regionOpeningStateList == null) {
              // Destination is stopped or stopping.
              return false;
            }
            // Responses are positional: index k matches regionOpenInfos[k].
            for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
              RegionOpeningState openingState = regionOpeningStateList.get(k);
              if (openingState != RegionOpeningState.OPENED) {
                HRegionInfo region = regionOpenInfos.get(k).getFirst();
                if (openingState == RegionOpeningState.ALREADY_OPENED) {
                  processAlreadyOpenedRegion(region, destination);
                } else if (openingState == RegionOpeningState.FAILED_OPENING) {
                  // Re-queue for assignment elsewhere.
                  failedToOpenRegions.add(region);
                } else {
                  LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
                    + openingState + " in assigning region " + region);
                }
              }
            }
            break;
          } catch (IOException e) {
            if (e instanceof RemoteException) {
              e = ((RemoteException)e).unwrapRemoteException();
            }
            if (e instanceof RegionServerStoppedException) {
              LOG.warn("The region server was shut down, ", e);
              // The caller will react; nothing more to do here.
              return false;
            } else if (e instanceof ServerNotRunningYetException) {
              long now = System.currentTimeMillis();
              if (now < maxWaitTime) {
                LOG.debug("Server is not yet up; waiting up to " +
                  (maxWaitTime - now) + "ms", e);
                Thread.sleep(100);
                i--; // free retry: server startup should not burn an attempt
                continue;
              }
            } else if (e instanceof java.net.SocketTimeoutException
                && this.serverManager.isServerOnline(destination)) {
              // The RPC timed out but the server is alive; the opens may in
              // fact be in progress, so retry without consuming an attempt.
              if (LOG.isDebugEnabled()) {
                LOG.debug("Bulk assigner openRegion() to " + destination
                  + " has timed out, but the regions might"
                  + " already be opened on it.", e);
              }

              Thread.sleep(100);
              i--;
              continue;
            }
            throw e;
          }
        }
      } catch (IOException e) {
        // Cannot talk to the destination at all; give up on this bulk call.
        LOG.info("Unable to communicate with " + destination
          + " in order to assign regions, ", e);
        return false;
      }
    } finally {
      // Release any locks still held (regions that stayed in the bulk path).
      for (Lock lock : locks.values()) {
        lock.unlock();
      }
    }

    // Re-queue every failed region that did not come online anyway.
    if (!failedToOpenRegions.isEmpty()) {
      for (HRegionInfo region : failedToOpenRegions) {
        if (!regionStates.isRegionOnline(region)) {
          invokeAssign(region);
        }
      }
    }

    // Wait for user-table regions to leave transition; system tables are
    // excluded from the wait.
    ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
    for (HRegionInfo region: regions) {
      if (!region.getTable().isSystemTable()) {
        userRegionSet.add(region);
      }
    }
    if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
        System.currentTimeMillis())) {
      LOG.debug("some user regions are still in transition: " + userRegionSet);
    }
    LOG.debug("Bulk assigning done for " + destination);
    return true;
  } finally {
    metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
  }
}
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
/**
 * Sends a close request for the region to its hosting server, retrying up to
 * maximumAttempts times.  On success with ZK assignment but transitionInZK
 * false, recurses once more to complete the close bookkeeping (see inline
 * note).  If the host is dead/stopped, or reports NotServingRegion, the
 * region is simply marked offline.  After exhausting attempts (or on
 * interrupt) the region is left in FAILED_CLOSE.
 *
 * @param state current region state, or null when only the znode/server-side
 *   close matters; a null state skips regionOffline/FAILED_CLOSE updates
 * @param versionOfClosingNode expected znode version for the CLOSING node,
 *   or -1 when not transitioning in ZK
 * @param dest intended next server (passed through to the close RPC), may be
 *   null
 * @param transitionInZK whether the region server should drive the close
 *   through ZK transitions
 * @param src fallback server to contact when state is null
 */
private void unassign(final HRegionInfo region,
    final RegionState state, final int versionOfClosingNode,
    final ServerName dest, final boolean transitionInZK,
    final ServerName src) {
  ServerName server = src;
  if (state != null) {
    // Prefer the server recorded in the state over the caller-supplied one.
    server = state.getServerName();
  }
  long maxWaitTime = -1;
  for (int i = 1; i <= this.maximumAttempts; i++) {
    if (this.server.isStopped() || this.server.isAborted()) {
      LOG.debug("Server stopped/aborted; skipping unassign of " + region);
      return;
    }

    if (!serverManager.isServerOnline(server)) {
      // Host is gone; just clean up the znode and in-memory state.
      LOG.debug("Offline " + region.getRegionNameAsString()
        + ", no need to unassign since it's on a dead server: " + server);
      if (transitionInZK) {
        // delete the znode just in case it still exists
        deleteClosingOrClosedNode(region, server);
      }
      if (state != null) {
        regionOffline(region);
      }
      return;
    }
    try {
      // Send CLOSE RPC
      if (serverManager.sendRegionClose(server, region,
          versionOfClosingNode, dest, transitionInZK)) {
        LOG.debug("Sent CLOSE to " + server + " for region " +
          region.getRegionNameAsString());
        if (useZKForAssignment && !transitionInZK && state != null) {
          // Retry to make sure the region is really closed when ZK is used
          // for assignment but this close bypassed ZK transitions.
          unassign(region, state, versionOfClosingNode,
            dest, transitionInZK, src);
        }
        return;
      }

      // This never happens. Currently regionserver close always return true.
      LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
        region.getRegionNameAsString());
    } catch (Throwable t) {
      long sleepTime = 0;
      Configuration conf = this.server.getConfiguration();
      if (t instanceof RemoteException) {
        t = ((RemoteException)t).unwrapRemoteException();
      }
      boolean logRetries = true;
      if (t instanceof RegionServerAbortedException
          || t instanceof RegionServerStoppedException
          || t instanceof ServerNotRunningYetException) {
        // RS is aborting/stopping or not yet up: wait out the failed-server
        // expiry window, then retry (attempt is consumed).
        sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
          RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);

      } else if (t instanceof NotServingRegionException) {
        // Region already gone from the server: treat as closed.
        LOG.debug("Offline " + region.getRegionNameAsString()
          + ", it's not any more on " + server, t);
        if (transitionInZK) {
          deleteClosingOrClosedNode(region, server);
        }
        if (state != null) {
          regionOffline(region);
        }
        return;
      } else if ((t instanceof FailedServerException) || (state != null &&
          t instanceof RegionAlreadyInTransitionException)) {
        if(t instanceof FailedServerException) {
          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
        } else {
          // RS is already processing this region; only need to update the
          // timestamp and wait a little for it to finish (free retries via
          // i-- until maxWaitTime elapses).
          LOG.debug("update " + state + " the timestamp.");
          state.updateTimestampToNow();
          if (maxWaitTime < 0) {
            maxWaitTime =
              EnvironmentEdgeManager.currentTime()
                + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
                  DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
          }
          long now = EnvironmentEdgeManager.currentTime();
          if (now < maxWaitTime) {
            LOG.debug("Region is already in transition; "
              + "waiting up to " + (maxWaitTime - now) + "ms", t);
            sleepTime = 100;
            i--; // reset the try count
            logRetries = false;
          }
        }
      }

      try {
        if (sleepTime > 0) {
          Thread.sleep(sleepTime);
        }
      } catch (InterruptedException ie) {
        // Interrupted: restore the flag and give up, leaving FAILED_CLOSE.
        LOG.warn("Failed to unassign "
          + region.getRegionNameAsString() + " since interrupted", ie);
        Thread.currentThread().interrupt();
        if (state != null) {
          regionStates.updateRegionState(region, State.FAILED_CLOSE);
        }
        return;
      }

      if (logRetries) {
        LOG.info("Server " + server + " returned " + t + " for "
          + region.getRegionNameAsString() + ", try=" + i
          + " of " + this.maximumAttempts, t);
        // Presume retry. Next time around we will use same RS.
      }
    }
  }
  // Run out of attempts.
  if (state != null) {
    regionStates.updateRegionState(region, State.FAILED_CLOSE);
  }
}
1953
1954
1955
1956
/**
 * Moves the region's in-memory state toward OFFLINE so it can be assigned.
 * Returns the (possibly updated) state, or null when assignment should be
 * skipped (already in transition without forceNewPlan, close failed, or the
 * region's last host is a dead-but-unprocessed server).
 *
 * NOTE: the switch below uses deliberate fall-through between case groups —
 * see the inline comments.
 */
private RegionState forceRegionStateToOffline(
    final HRegionInfo region, final boolean forceNewPlan) {
  RegionState state = regionStates.getRegionState(region);
  if (state == null) {
    LOG.warn("Assigning but not in region states: " + region);
    state = regionStates.createRegionState(region);
  }

  ServerName sn = state.getServerName();
  if (forceNewPlan && LOG.isDebugEnabled()) {
    LOG.debug("Force region state offline " + state);
  }

  switch (state.getState()) {
    case OPEN:
    case OPENING:
    case PENDING_OPEN:
    case CLOSING:
    case PENDING_CLOSE:
      if (!forceNewPlan) {
        LOG.debug("Skip assigning " +
          region + ", it is already " + state);
        return null;
      }
      // Intentional fall-through: forceNewPlan means unassign first.
    case FAILED_CLOSE:
    case FAILED_OPEN:
      unassign(region, state, -1, null, false, null);
      state = regionStates.getRegionState(region);
      if (state.isFailedClose()) {
        // If we can't close the region, we can't re-assign it so we rely on
        // retries in the unassign path to eventually succeed.
        LOG.info("Skip assigning " +
          region + ", we couldn't close it: " + state);
        return null;
      }
      // Intentional fall-through: region is now offline-able.
    case OFFLINE:
      // With ZK assignment, when the last host is dead but not yet handled
      // by ServerShutdownHandler, leave the region for SSH to re-assign;
      // otherwise assigning here could race with SSH's log-splitting and
      // assignment of the same region.  (ZK-less assignment handles this
      // case elsewhere.)
      if (useZKForAssignment
          && regionStates.isServerDeadAndNotProcessed(sn)
          && wasRegionOnDeadServerByMeta(region, sn)) {
        if (!regionStates.isRegionInTransition(region)) {
          LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
          regionStates.updateRegionState(region, State.OFFLINE);
        }
        LOG.info("Skip assigning " + region.getRegionNameAsString()
          + ", it is on a dead but not processed yet server: " + sn);
        return null;
      }
      // Intentional fall-through: OFFLINE/CLOSED are both assignable.
    case CLOSED:
      break;
    default:
      LOG.error("Trying to assign region " + region
        + ", which is " + state);
      return null;
  }
  return state;
}
2021
/**
 * Determines, by consulting hbase:meta (or the meta location in ZK for the
 * meta region itself), whether the region's recorded host is a dead server
 * that has not yet been processed by shutdown handling.  Retries meta reads
 * until the master stops; on interrupt, falls back to checking the
 * caller-supplied server name.
 *
 * @param sn fallback server name checked when meta cannot be read
 */
@SuppressWarnings("deprecation")
protected boolean wasRegionOnDeadServerByMeta(
    final HRegionInfo region, final ServerName sn) {
  try {
    if (region.isMetaRegion()) {
      // Meta's location lives in ZK, not in meta itself.
      ServerName server = this.server.getMetaTableLocator().
        getMetaRegionLocation(this.server.getZooKeeper());
      return regionStates.isServerDeadAndNotProcessed(server);
    }
    while (!server.isStopped()) {
      try {
        this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
        Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
          region.getRegionName());
        if (r == null || r.isEmpty()) return false;
        // NOTE(review): this local deliberately shadows the `server` field;
        // it holds the region's host as recorded in meta.
        ServerName server = HRegionInfo.getServerName(r);
        return regionStates.isServerDeadAndNotProcessed(server);
      } catch (IOException ioe) {
        LOG.info("Received exception accessing hbase:meta during force assign "
          + region.getRegionNameAsString() + ", retrying", ioe);
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.info("Interrupted accessing hbase:meta", e);
  }
  // Call is interrupted or server is stopped: fall back to the caller's sn.
  return regionStates.isServerDeadAndNotProcessed(sn);
}
2051
2052
2053
2054
2055
2056
2057
/**
 * Core single-region assignment loop.  Picks (or reuses) a region plan,
 * optionally creates the OFFLINE znode, sends the openRegion RPC, and retries
 * up to maximumAttempts times — holding on the same server for transient
 * conditions (server starting up, region already in transition), switching
 * to a new plan otherwise.  Meta regions never give up: their attempt
 * counter is reset so retries continue indefinitely.  On terminal failure
 * the region is left in FAILED_OPEN.
 *
 * @param state current region state (must be assignable; see
 *   forceRegionStateToOffline)
 * @param setOfflineInZK whether to create the region's OFFLINE znode before
 *   sending the open RPC
 * @param forceNewPlan whether the first plan lookup should ignore any
 *   existing plan
 */
private void assign(RegionState state,
    boolean setOfflineInZK, final boolean forceNewPlan) {
  long startTime = EnvironmentEdgeManager.currentTime();
  try {
    Configuration conf = server.getConfiguration();
    RegionState currentState = state;
    int versionOfOfflineNode = -1;
    RegionPlan plan = null;
    long maxWaitTime = -1;
    HRegionInfo region = state.getRegion();
    RegionOpeningState regionOpenState;
    Throwable previousException = null;
    for (int i = 1; i <= maximumAttempts; i++) {
      if (server.isStopped() || server.isAborted()) {
        LOG.info("Skip assigning " + region.getRegionNameAsString()
          + ", the server is stopped/aborted");
        return;
      }

      if (plan == null) {
        try {
          plan = getRegionPlan(region, forceNewPlan);
        } catch (HBaseIOException e) {
          LOG.warn("Failed to get region plan", e);
        }
      }

      if (plan == null) {
        LOG.warn("Unable to determine a plan to assign " + region);

        // For meta regions, keep retrying forever — reset the attempt
        // counter when it runs out, then wait before the next try.
        if (region.isMetaRegion()) {
          if (i == maximumAttempts) {
            i = 0; // re-set attempt count to 0 for at least 1 retry

            LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
              " after maximumAttempts (" + this.maximumAttempts +
              "). Reset attempts count and continue retrying.");
          }
          waitForRetryingMetaAssignment();
          continue;
        }

        regionStates.updateRegionState(region, State.FAILED_OPEN);
        return;
      }
      if (setOfflineInZK && versionOfOfflineNode == -1) {
        LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
        // get the version of the znode after setting it to OFFLINE.
        // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
        versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
        if (versionOfOfflineNode != -1) {
          if (isDisabledorDisablingRegionInRIT(region)) {
            return;
          }
          // In case of assignment from EnableTableHandler table state is
          // ENABLING. Any how EnableTableHandler sets the table to ENABLED.
          // Assign when table is in ENABLING/ENABLED state; otherwise flip
          // it to ENABLED here so the region is not orphaned.
          // NOTE(review): presumably covers master-failover edge cases where
          // the table znode was left mid-transition — confirm.
          TableName tableName = region.getTable();
          if (!tableStateManager.isTableState(tableName,
              ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
            LOG.debug("Setting table " + tableName + " to ENABLED state.");
            setEnabledTable(tableName);
          }
        }
      }
      if (setOfflineInZK && versionOfOfflineNode == -1) {
        LOG.info("Unable to set offline in ZooKeeper to assign " + region);
        // Setting the OFFLINE znode failed; unless the master is aborting,
        // burn this attempt and try again.
        if (!server.isAborted()) {
          continue;
        }
      }
      LOG.info("Assigning " + region.getRegionNameAsString() +
        " to " + plan.getDestination().toString());

      // Transition RegionState to PENDING_OPEN before sending the RPC.
      currentState = regionStates.updateRegionState(region,
        State.PENDING_OPEN, plan.getDestination());

      boolean needNewPlan;
      final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
        " to " + plan.getDestination();
      try {
        List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
        if (this.shouldAssignRegionsWithFavoredNodes) {
          favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
        }
        regionOpenState = serverManager.sendRegionOpen(
          plan.getDestination(), region, versionOfOfflineNode, favoredNodes);

        if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
          // Server rejected the open; try another server.
          needNewPlan = true;
          LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
            " trying to assign elsewhere instead; " +
            "try=" + i + " of " + this.maximumAttempts);
        } else {
          // Open accepted (or region was already open there); done.
          if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
            processAlreadyOpenedRegion(region, plan.getDestination());
          }
          return;
        }

      } catch (Throwable t) {
        if (t instanceof RemoteException) {
          t = ((RemoteException) t).unwrapRemoteException();
        }
        previousException = t;

        // "hold" = stay on the same server and wait (region already in
        // transition there, or the server is still starting up).
        boolean hold = (t instanceof RegionAlreadyInTransitionException ||
          t instanceof ServerNotRunningYetException);

        // "retry" = retry the same server immediately without consuming an
        // attempt: the RPC timed out but the server is alive, so the open
        // may already be in progress.
        boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
          && this.serverManager.isServerOnline(plan.getDestination()));

        if (hold) {
          LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
            "try=" + i + " of " + this.maximumAttempts, t);

          if (maxWaitTime < 0) {
            if (t instanceof RegionAlreadyInTransitionException) {
              maxWaitTime = EnvironmentEdgeManager.currentTime()
                + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
                  DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
            } else {
              maxWaitTime = EnvironmentEdgeManager.currentTime()
                + this.server.getConfiguration().getLong(
                  "hbase.regionserver.rpc.startup.waittime", 60000);
            }
          }
          try {
            needNewPlan = false;
            long now = EnvironmentEdgeManager.currentTime();
            if (now < maxWaitTime) {
              // Free retry (i--) until the hold window expires.
              LOG.debug("Server is not yet up or region is already in transition; "
                + "waiting up to " + (maxWaitTime - now) + "ms", t);
              Thread.sleep(100);
              i--;
            } else if (!(t instanceof RegionAlreadyInTransitionException)) {
              // Server never came up; pick a different one.
              LOG.debug("Server is not up for a while; try a new one", t);
              needNewPlan = true;
            }
          } catch (InterruptedException ie) {
            LOG.warn("Failed to assign "
              + region.getRegionNameAsString() + " since interrupted", ie);
            regionStates.updateRegionState(region, State.FAILED_OPEN);
            Thread.currentThread().interrupt();
            return;
          }
        } else if (retry) {
          needNewPlan = false;
          i--; // we want to retry as many times as needed as long as the RS is not dead.
          LOG.warn(assignMsg + ", trying to assign to the same region server due ", t);
        } else {
          needNewPlan = true;
          LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
            " try=" + i + " of " + this.maximumAttempts, t);
        }
      }

      if (i == this.maximumAttempts) {
        // For meta, never give up: reset the counter and keep retrying.
        if (region.isMetaRegion()) {
          i = 0; // re-set attempt count to 0 for at least 1 retry
          LOG.warn(assignMsg +
            ", trying to assign a hbase:meta region reached to maximumAttempts (" +
            this.maximumAttempts + "). Reset attempt counts and continue retrying.");
          waitForRetryingMetaAssignment();
        }
        else {
          // Non-meta: this was the last attempt; fall out of the loop to
          // mark FAILED_OPEN (the continue skips the new-plan logic below).
          continue;
        }
      }

      // If a new plan is needed, fetch one; if it targets a different server
      // the region state is reset to OFFLINE (and the znode re-created under
      // ZK assignment).  If it targets the SAME server that just threw a
      // FailedServerException, sleep out the failed-server expiry first.
      if (needNewPlan) {
        RegionPlan newPlan = null;
        try {
          newPlan = getRegionPlan(region, true);
        } catch (HBaseIOException e) {
          LOG.warn("Failed to get region plan", e);
        }
        if (newPlan == null) {
          regionStates.updateRegionState(region, State.FAILED_OPEN);
          LOG.warn("Unable to find a viable location to assign region " +
            region.getRegionNameAsString());
          return;
        }

        if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
          // Destination changed: restart the ZK offline dance for the new
          // server.
          LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
            + newPlan.getDestination() + " server.");
          currentState = regionStates.updateRegionState(region, State.OFFLINE);
          versionOfOfflineNode = -1;
          if (useZKForAssignment) {
            setOfflineInZK = true;
          }
          plan = newPlan;
        } else if(plan.getDestination().equals(newPlan.getDestination()) &&
            previousException instanceof FailedServerException) {
          try {
            LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
              " to the same failed server.");
            Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
              RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
          } catch (InterruptedException ie) {
            LOG.warn("Failed to assign "
              + region.getRegionNameAsString() + " since interrupted", ie);
            regionStates.updateRegionState(region, State.FAILED_OPEN);
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }
    // Run out of attempts.
    regionStates.updateRegionState(region, State.FAILED_OPEN);
  } finally {
    metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
  }
}
2306
/**
 * Handles the case where a region we tried to open is reported as already opened on
 * server {@code sn}: clean up any stale ZK assignment node (ZK mode only) and mark
 * the region online in {@link RegionStates}.
 *
 * @param region the region reported as already opened
 * @param sn the server reported to be hosting the region
 */
private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
  LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
    + " to " + sn);
  String encodedName = region.getEncodedName();

  if(useZKForAssignment){
    String node = ZKAssign.getNodeName(watcher, encodedName);
    Stat stat = new Stat();
    try {
      // Peek at the existing assignment znode without setting a watch.
      byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
      if(existingBytes!=null){
        RegionTransition rt= RegionTransition.parseFrom(existingBytes);
        EventType et = rt.getEventType();
        if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
          // Node is already in OPENED state; the normal OPENED handling will
          // take care of it, so don't touch the znode or region state here.
          LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
            + " and node in "+et+" state");
          return;
        }
      }
    } catch (KeeperException ke) {
      // Best effort: log and fall through to delete the stale OFFLINE node anyway.
      LOG.warn("Unexpected ZK exception getData " + node
        + " node for the region " + encodedName, ke);
    } catch (DeserializationException e) {
      LOG.warn("Get RegionTransition from zk deserialization failed! ", e);
    }

    // Remove the stale M_ZK_REGION_OFFLINE node left from our assign attempt.
    deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
  }

  // Record the region as online on the reporting server.
  regionStates.regionOnline(region, sn);
}
2343
2344 private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2345 if (this.tableStateManager.isTableState(region.getTable(),
2346 ZooKeeperProtos.Table.State.DISABLED,
2347 ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2348 LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2349 + " skipping assign of " + region.getRegionNameAsString());
2350 offlineDisabledRegion(region);
2351 return true;
2352 }
2353 return false;
2354 }
2355
2356
2357
2358
2359
2360
2361
2362
/**
 * Marks the region OFFLINE in {@link RegionStates} and creates (or force-sets) the
 * OFFLINE znode for it ahead of an assignment to {@code destination}.
 * Aborts the master if the region is in an unexpected state or on a ZK failure.
 *
 * @param state current state of the region; must be CLOSED or OFFLINE
 * @param destination server the region is about to be assigned to
 * @return the version of the OFFLINE znode, or -1 on failure
 */
private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
  if (!state.isClosed() && !state.isOffline()) {
    // Only CLOSED/OFFLINE regions may transit to OFFLINE; anything else is a
    // bookkeeping error serious enough to abort on.
    String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
    this.server.abort(msg, new IllegalStateException(msg));
    return -1;
  }
  regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
  int versionOfOfflineNode;
  try {
    // Create the OFFLINE znode, or force an existing node into OFFLINE state.
    versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
      state.getRegion(), destination);
    if (versionOfOfflineNode == -1) {
      LOG.warn("Attempted to create/force node into OFFLINE state before "
        + "completing assignment but failed to do so for " + state);
      return -1;
    }
  } catch (KeeperException e) {
    server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    return -1;
  }
  return versionOfOfflineNode;
}
2386
2387
2388
2389
2390
2391
/**
 * Gets a plan for the region without excluding any server.
 *
 * @param region region to plan for
 * @param forceNewPlan if true, ignore any cached plan and compute a new one
 * @return the plan, or null if no destination server is available
 * @throws HBaseIOException if the balancer fails
 */
private RegionPlan getRegionPlan(final HRegionInfo region,
    final boolean forceNewPlan) throws HBaseIOException {
  return getRegionPlan(region, null, forceNewPlan);
}
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
/**
 * Gets (or creates) an assignment plan for the region. Reuses a cached plan from
 * {@code regionPlans} when it exists and its destination is still a valid candidate;
 * otherwise asks the balancer for a random destination and caches the new plan.
 *
 * @param region region to plan for
 * @param serverToExclude server to leave out of the candidate list (may be null)
 * @param forceNewPlan if true, always compute a new plan even if one is cached
 * @return the plan, or null if no viable destination exists
 * @throws HBaseIOException if the balancer fails
 */
private RegionPlan getRegionPlan(final HRegionInfo region,
    final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {

  final String encodedName = region.getEncodedName();
  final List<ServerName> destServers =
    serverManager.createDestinationServersList(serverToExclude);

  if (destServers.isEmpty()){
    LOG.warn("Can't move " + encodedName +
      ", there is no destination server available.");
    return null;
  }

  RegionPlan randomPlan = null;
  boolean newPlan = false;
  RegionPlan existingPlan;

  // First critical section: decide under the regionPlans lock whether the
  // cached plan is still usable.
  synchronized (this.regionPlans) {
    existingPlan = this.regionPlans.get(encodedName);

    if (existingPlan != null && existingPlan.getDestination() != null) {
      LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
        + " destination server is " + existingPlan.getDestination() +
        " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
    }

    // A new plan is needed if forced, or if there is no usable cached plan
    // (missing, no destination, or destination no longer a candidate).
    if (forceNewPlan
        || existingPlan == null
        || existingPlan.getDestination() == null
        || !destServers.contains(existingPlan.getDestination())) {
      newPlan = true;
    }
  }

  if (newPlan) {
    // The balancer call happens outside the lock; the plan is cached under it.
    ServerName destination = balancer.randomAssignment(region, destServers);
    if (destination == null) {
      LOG.warn("Can't find a destination for " + encodedName);
      return null;
    }
    synchronized (this.regionPlans) {
      randomPlan = new RegionPlan(region, null, destination);
      if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
        List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
        regions.add(region);
        try {
          processFavoredNodes(regions);
        } catch (IOException ie) {
          // Favored nodes are an optimization; failure must not block assignment.
          LOG.warn("Ignoring exception in processFavoredNodes " + ie);
        }
      }
      this.regionPlans.put(encodedName, randomPlan);
    }
    LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
        + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
        + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
        + ") available servers, forceNewPlan=" + forceNewPlan);
    return randomPlan;
  }
  LOG.debug("Using pre-existing plan for " +
    region.getRegionNameAsString() + "; plan=" + existingPlan);
  return existingPlan;
}
2469
2470
2471
2472
2473 private void waitForRetryingMetaAssignment() {
2474 try {
2475 Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2476 } catch (InterruptedException e) {
2477 LOG.error("Got exception while waiting for hbase:meta assignment");
2478 Thread.currentThread().interrupt();
2479 }
2480 }
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
/**
 * Unassigns the specified region without forcing (equivalent to
 * {@code unassign(region, false)}).
 *
 * @param region region to unassign
 */
public void unassign(HRegionInfo region) {
  unassign(region, false);
}
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
/**
 * Unassigns (offlines) the specified region. Takes the per-region lock, moves the
 * region to PENDING_CLOSE (creating the ZK CLOSING node in ZK mode), and then sends
 * the close. In the finally block, if the region ended up offline and was not
 * deliberately taken out of service, it is re-assigned (possibly to {@code dest}).
 *
 * @param region region to unassign
 * @param force if true, re-send CLOSE even when the region is already closing
 * @param dest preferred destination for the follow-up assignment (may be null)
 */
public void unassign(HRegionInfo region, boolean force, ServerName dest) {
  LOG.debug("Starting unassign of " + region.getRegionNameAsString()
    + " (offlining), current state: " + regionStates.getRegionState(region));

  String encodedName = region.getEncodedName();
  // Version of the ZK CLOSING node; -1 means "not created" (ZK-less mode).
  int versionOfClosingNode = -1;
  // Serialize with other operations on this region.
  ReentrantLock lock = locker.acquireLock(encodedName);
  RegionState state = regionStates.getRegionTransitionState(encodedName);
  // Whether the finally block may re-assign the region after it goes offline.
  boolean reassign = true;
  try {
    if (state == null) {
      // Region is not in transition; look at its steady state instead.
      state = regionStates.getRegionState(encodedName);
      if (state != null && state.isUnassignable()) {
        LOG.info("Attempting to unassign " + state + ", ignored");
        // Unassignable (e.g. merged/split) -- nothing to do.
        return;
      }

      try {
        if (state == null || state.getServerName() == null) {
          // Unknown to RegionStates or with no hosting server: just mark offline.
          LOG.warn("Attempting to unassign a region not in RegionStates "
            + region.getRegionNameAsString() + ", offlined");
          regionOffline(region);
          return;
        }
        if (useZKForAssignment) {
          versionOfClosingNode = ZKAssign.createNodeClosing(
            watcher, region, state.getServerName());
          if (versionOfClosingNode == -1) {
            LOG.info("Attempting to unassign " +
              region.getRegionNameAsString() + " but ZK closing node "
              + "can't be created.");
            reassign = false;
            return;
          }
        }
      } catch (KeeperException e) {
        if (e instanceof NodeExistsException) {
          // A znode already exists for the region; if it records a split or a
          // merge, the region no longer exists and unassign must be skipped.
          NodeExistsException nee = (NodeExistsException)e;
          String path = nee.getPath();
          try {
            if (isSplitOrSplittingOrMergedOrMerging(path)) {
              LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
                "skipping unassign because region no longer exists -- its split or merge");
              reassign = false;
              return;
            }
          } catch (KeeperException.NoNodeException ke) {
            // Node vanished between the NodeExists and the read: presume split.
            LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
              "; presuming split and that the region to unassign, " +
              encodedName + ", no longer exists -- confirm", ke);
            return;
          } catch (KeeperException ke) {
            LOG.error("Unexpected zk state", ke);
          } catch (DeserializationException de) {
            LOG.error("Failed parse", de);
          }
        }

        // Any other ZK failure while creating CLOSING is unrecoverable here.
        server.abort("Unexpected ZK exception creating node CLOSING", e);
        reassign = false;
        return;
      }
      state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
    } else if (state.isFailedOpen()) {
      // Never opened on any server; simply take it offline.
      regionOffline(region);
      return;
    } else if (force && state.isPendingCloseOrClosing()) {
      LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
        " which is already " + state.getState() +
        " but forcing to send a CLOSE RPC again ");
      if (state.isFailedClose()) {
        state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
      }
      // Refresh the timestamp so RIT monitoring does not treat this as stale.
      state.updateTimestampToNow();
    } else {
      // Already in transition and not forced: let the in-flight transition finish.
      LOG.debug("Attempting to unassign " +
        region.getRegionNameAsString() + " but it is " +
        "already in transition (" + state.getState() + ", force=" + force + ")");
      return;
    }

    unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
  } finally {
    lock.unlock();

    // If the region ended up offline (and isn't a replica slated for close),
    // bring it back online somewhere.
    if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
      assign(region, true);
    }
  }
}
2619
/**
 * Unassigns the region with no destination preference.
 *
 * @param region region to unassign
 * @param force if true, re-send CLOSE even when the region is already closing
 */
public void unassign(HRegionInfo region, boolean force){
   unassign(region, force, null);
}
2623
2624
2625
2626
2627 public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2628 String encodedName = region.getEncodedName();
2629 deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2630 EventType.RS_ZK_REGION_CLOSED);
2631 }
2632
2633
2634
2635
2636
2637
2638
2639 private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2640 throws KeeperException, DeserializationException {
2641 boolean result = false;
2642
2643
2644 byte [] data = ZKAssign.getData(watcher, path);
2645 if (data == null) {
2646 LOG.info("Node " + path + " is gone");
2647 return false;
2648 }
2649 RegionTransition rt = RegionTransition.parseFrom(data);
2650 switch (rt.getEventType()) {
2651 case RS_ZK_REQUEST_REGION_SPLIT:
2652 case RS_ZK_REGION_SPLIT:
2653 case RS_ZK_REGION_SPLITTING:
2654 case RS_ZK_REQUEST_REGION_MERGE:
2655 case RS_ZK_REGION_MERGED:
2656 case RS_ZK_REGION_MERGING:
2657 result = true;
2658 break;
2659 default:
2660 LOG.info("Node " + path + " is in " + rt.getEventType());
2661 break;
2662 }
2663 return result;
2664 }
2665
2666
2667
2668
2669
2670
/**
 * @return the number of regions opened so far, as tracked by the
 *   {@code numRegionsOpened} counter
 */
public int getNumRegionsOpened() {
  return numRegionsOpened.get();
}
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684 public boolean waitForAssignment(HRegionInfo regionInfo)
2685 throws InterruptedException {
2686 ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2687 regionSet.add(regionInfo);
2688 return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2689 }
2690
2691
2692
2693
/**
 * Waits for the regions to be assigned, with a deadline derived from the number
 * of regions being reassigned: {@code minEndTime} plus a per-region guesstimate.
 *
 * @param regionSet regions to wait on; assigned regions are removed from it
 * @param waitTillAllAssigned if true, wait until every region is assigned
 * @param reassigningRegions count used to scale the deadline
 * @param minEndTime earliest point the deadline may fall on
 * @return true if all regions in the set were assigned before the deadline
 * @throws InterruptedException if the wait is interrupted
 */
protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
    final boolean waitTillAllAssigned, final int reassigningRegions,
    final long minEndTime) throws InterruptedException {
  long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
  return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
}
2700
2701
2702
2703
2704
2705
2706
2707
2708
/**
 * Waits until the given regions are assigned (online, or in a terminal
 * split/merge state) or the deadline passes. Regions that reach such a state are
 * removed from {@code regionSet}, so the caller's collection is mutated.
 *
 * @param regionSet regions to wait on; drained as regions become assigned
 * @param waitTillAllAssigned if false, scan once and return without blocking
 * @param deadline absolute time (ms since epoch) to stop waiting
 * @return true if the set was fully drained (all regions assigned)
 * @throws InterruptedException if the wait is interrupted
 */
protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
    final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
  while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
    int failedOpenCount = 0;
    Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
    while (regionInfoIterator.hasNext()) {
      HRegionInfo hri = regionInfoIterator.next();
      // Online, or terminally split/merged: this region is done.
      if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
          State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
        regionInfoIterator.remove();
      } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
        failedOpenCount++;
      }
    }
    if (!waitTillAllAssigned) {
      // Single pass requested; don't block.
      break;
    }
    if (!regionSet.isEmpty()) {
      if (failedOpenCount == regionSet.size()) {
        // Every remaining region failed to open; waiting longer won't help.
        break;
      }
      // Block briefly for the next region-state change notification.
      regionStates.waitForUpdate(100);
    }
  }
  return regionSet.isEmpty();
}
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
/**
 * Assigns the hbase:meta region (or one of its replicas). The cached meta location
 * for the replica is deleted first so clients re-discover it after the assign.
 *
 * @param hri the meta region (replica) to assign
 * @throws KeeperException on ZK failure while deleting the meta location
 */
public void assignMeta(HRegionInfo hri) throws KeeperException {
  this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
  assign(hri, true);
}
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763 public void assign(Map<HRegionInfo, ServerName> regions)
2764 throws IOException, InterruptedException {
2765 if (regions == null || regions.isEmpty()) {
2766 return;
2767 }
2768 List<ServerName> servers = serverManager.createDestinationServersList();
2769 if (servers == null || servers.isEmpty()) {
2770 throw new IOException("Found no destination server to assign region(s)");
2771 }
2772
2773
2774 Map<ServerName, List<HRegionInfo>> bulkPlan =
2775 balancer.retainAssignment(regions, servers);
2776 if (bulkPlan == null) {
2777 throw new IOException("Unable to determine a plan to assign region(s)");
2778 }
2779
2780 assign(regions.size(), servers.size(),
2781 "retainAssignment=true", bulkPlan);
2782 }
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792 public void assign(List<HRegionInfo> regions)
2793 throws IOException, InterruptedException {
2794 if (regions == null || regions.isEmpty()) {
2795 return;
2796 }
2797
2798 List<ServerName> servers = serverManager.createDestinationServersList();
2799 if (servers == null || servers.isEmpty()) {
2800 throw new IOException("Found no destination server to assign region(s)");
2801 }
2802
2803
2804 Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
2805 if (bulkPlan == null) {
2806 throw new IOException("Unable to determine a plan to assign region(s)");
2807 }
2808
2809 processFavoredNodes(regions);
2810 assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
2811 }
2812
/**
 * Executes a bulk assignment plan. Small plans (single server, or below the
 * configured region/server thresholds) are assigned serially with per-region
 * retries; larger plans go through {@link GeneralBulkAssigner}.
 *
 * @param regions total number of regions in the plan
 * @param totalServers total number of candidate servers (for logging)
 * @param message description of how the plan was produced (for logging)
 * @param bulkPlan destination server to list-of-regions mapping
 * @throws InterruptedException if bulk assignment is interrupted
 * @throws IOException if bulk assignment fails
 */
private void assign(int regions, int totalServers,
    String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
        throws InterruptedException, IOException {

  int servers = bulkPlan.size();
  if (servers == 1 || (regions < bulkAssignThresholdRegions
      && servers < bulkAssignThresholdServers)) {

    // Small plan: assign serially, retrying failed regions individually.
    if (LOG.isTraceEnabled()) {
      LOG.trace("Not using bulk assignment since we are assigning only " + regions +
        " region(s) to " + servers + " server(s)");
    }

    // Collect user-table regions that needed a retry so we can wait on them below.
    ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
    for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
      if (!assign(plan.getKey(), plan.getValue())) {
        // Group assign failed; re-queue each region that isn't online yet.
        for (HRegionInfo region: plan.getValue()) {
          if (!regionStates.isRegionOnline(region)) {
            invokeAssign(region);
            if (!region.getTable().isSystemTable()) {
              userRegionSet.add(region);
            }
          }
        }
      }
    }

    // Best-effort wait for the retried user regions; log if some are still in RIT.
    if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
        System.currentTimeMillis())) {
      LOG.debug("some user regions are still in transition: " + userRegionSet);
    }
  } else {
    LOG.info("Bulk assigning " + regions + " region(s) across "
      + totalServers + " server(s), " + message);

    // Large plan: hand off to the bulk assigner.
    BulkAssigner ba = new GeneralBulkAssigner(
      this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
    ba.bulkAssign();
    LOG.info("Bulk assigning done");
  }
}
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
/**
 * Assigns all user regions found in meta at master startup. Depending on
 * {@code hbase.master.startup.retainassign}, either retains previous locations or
 * assigns round-robin. Afterwards marks each region's table ENABLED (if not already)
 * and assigns replica regions that have no meta record yet.
 *
 * @param allRegions map of region to its last known hosting server, from meta scan
 * @throws IOException on assignment failure
 * @throws InterruptedException if assignment waiting is interrupted
 */
private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
    throws IOException, InterruptedException {
  if (allRegions == null || allRegions.isEmpty()) return;

  // Whether to keep regions on the servers that last hosted them.
  boolean retainAssignment = server.getConfiguration().
    getBoolean("hbase.master.startup.retainassign", true);

  Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
  if (retainAssignment) {
    assign(allRegions);
  } else {
    List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
    assign(regions);
  }

  // Ensure every involved table is flagged ENABLED.
  for (HRegionInfo hri : regionsFromMetaScan) {
    TableName tableName = hri.getTable();
    if (!tableStateManager.isTableState(tableName,
        ZooKeeperProtos.Table.State.ENABLED)) {
      setEnabledTable(tableName);
    }
  }

  // Replicas can exist per table descriptor without a meta row yet; assign those too.
  assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
}
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907 public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
2908 Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
2909 List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
2910 for (HRegionInfo hri : regionsRecordedInMeta) {
2911 TableName table = hri.getTable();
2912 HTableDescriptor htd = master.getTableDescriptors().get(table);
2913
2914 int desiredRegionReplication = htd.getRegionReplication();
2915 for (int i = 0; i < desiredRegionReplication; i++) {
2916 HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
2917 if (regionsRecordedInMeta.contains(replica)) continue;
2918 regionsNotRecordedInMeta.add(replica);
2919 }
2920 }
2921 return regionsNotRecordedInMeta;
2922 }
2923
2924
2925
2926
2927
2928
2929
2930 boolean waitUntilNoRegionsInTransition(final long timeout)
2931 throws InterruptedException {
2932
2933
2934
2935
2936
2937
2938 final long endTime = System.currentTimeMillis() + timeout;
2939
2940 while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2941 && endTime > System.currentTimeMillis()) {
2942 regionStates.waitForUpdate(100);
2943 }
2944
2945 return !regionStates.isRegionsInTransition();
2946 }
2947
2948
2949
2950
2951
2952
2953
2954 Set<ServerName> rebuildUserRegions() throws
2955 IOException, KeeperException, CoordinatedStateException {
2956 Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2957 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2958
2959 Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2960 ZooKeeperProtos.Table.State.DISABLED,
2961 ZooKeeperProtos.Table.State.DISABLING,
2962 ZooKeeperProtos.Table.State.ENABLING);
2963
2964
2965 List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
2966
2967 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2968
2969 Set<ServerName> offlineServers = new HashSet<ServerName>();
2970
2971 for (Result result : results) {
2972 if (result == null && LOG.isDebugEnabled()){
2973 LOG.debug("null result from meta - ignoring but this is strange.");
2974 continue;
2975 }
2976
2977
2978
2979 PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
2980 if (p.getFirst() != null && p.getSecond() != null) {
2981 int numReplicas = server.getTableDescriptors().get(p.getFirst().
2982 getTable()).getRegionReplication();
2983 for (HRegionInfo merge : p) {
2984 for (int i = 1; i < numReplicas; i++) {
2985 replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
2986 }
2987 }
2988 }
2989 RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
2990 if (rl == null) continue;
2991 HRegionLocation[] locations = rl.getRegionLocations();
2992 if (locations == null) continue;
2993 for (HRegionLocation hrl : locations) {
2994 if (hrl == null) continue;
2995 HRegionInfo regionInfo = hrl.getRegionInfo();
2996 if (regionInfo == null) continue;
2997 int replicaId = regionInfo.getReplicaId();
2998 State state = RegionStateStore.getRegionState(result, replicaId);
2999
3000
3001
3002 if (replicaId == 0 && state.equals(State.SPLIT)) {
3003 for (HRegionLocation h : locations) {
3004 replicasToClose.add(h.getRegionInfo());
3005 }
3006 }
3007 ServerName lastHost = hrl.getServerName();
3008 ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3009 if (tableStateManager.isTableState(regionInfo.getTable(),
3010 ZooKeeperProtos.Table.State.DISABLED)) {
3011
3012
3013 lastHost = null;
3014 regionLocation = null;
3015 }
3016 regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3017 if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3018
3019 continue;
3020 }
3021 TableName tableName = regionInfo.getTable();
3022 if (!onlineServers.contains(regionLocation)) {
3023
3024 offlineServers.add(regionLocation);
3025 if (useZKForAssignment) {
3026 regionStates.regionOffline(regionInfo);
3027 }
3028 } else if (!disabledOrEnablingTables.contains(tableName)) {
3029
3030
3031 regionStates.regionOnline(regionInfo, regionLocation);
3032 balancer.regionOnline(regionInfo, regionLocation);
3033 } else if (useZKForAssignment) {
3034 regionStates.regionOffline(regionInfo);
3035 }
3036
3037
3038 if (!disabledOrDisablingOrEnabling.contains(tableName)
3039 && !getTableStateManager().isTableState(tableName,
3040 ZooKeeperProtos.Table.State.ENABLED)) {
3041 setEnabledTable(tableName);
3042 }
3043 }
3044 }
3045 return offlineServers;
3046 }
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056 private void recoverTableInDisablingState()
3057 throws KeeperException, IOException, CoordinatedStateException {
3058 Set<TableName> disablingTables =
3059 tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3060 if (disablingTables.size() != 0) {
3061 for (TableName tableName : disablingTables) {
3062
3063 LOG.info("The table " + tableName
3064 + " is in DISABLING state. Hence recovering by moving the table"
3065 + " to DISABLED state.");
3066 new DisableTableHandler(this.server, tableName,
3067 this, tableLockManager, true).prepare().process();
3068 }
3069 }
3070 }
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080 private void recoverTableInEnablingState()
3081 throws KeeperException, IOException, CoordinatedStateException {
3082 Set<TableName> enablingTables = tableStateManager.
3083 getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3084 if (enablingTables.size() != 0) {
3085 for (TableName tableName : enablingTables) {
3086
3087 LOG.info("The table " + tableName
3088 + " is in ENABLING state. Hence recovering by moving the table"
3089 + " to ENABLED state.");
3090
3091
3092 EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3093 this, tableLockManager, true);
3094 try {
3095 eth.prepare();
3096 } catch (TableNotFoundException e) {
3097 LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3098 continue;
3099 }
3100 eth.process();
3101 }
3102 }
3103 }
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116 private void processDeadServersAndRecoverLostRegions(Set<ServerName> deadServers)
3117 throws IOException, KeeperException {
3118 if (deadServers != null && !deadServers.isEmpty()) {
3119 for (ServerName serverName: deadServers) {
3120 if (!serverManager.isServerDead(serverName)) {
3121 serverManager.expireServer(serverName);
3122 }
3123 }
3124 }
3125
3126 List<String> nodes = useZKForAssignment ?
3127 ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3128 : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3129 if (nodes != null && !nodes.isEmpty()) {
3130 for (String encodedRegionName : nodes) {
3131 processRegionInTransition(encodedRegionName, null);
3132 }
3133 } else if (!useZKForAssignment) {
3134 processRegionInTransitionZkLess();
3135 }
3136 }
3137
/**
 * Recovers regions in transition from in-memory state (ZK-less assignment mode).
 * For each RIT region whose server is still online (or unset), resumes the
 * appropriate action based on its current state; regions on offline servers are
 * left for server-shutdown handling (SSH).
 */
void processRegionInTransitionZkLess() {
  Map<String, RegionState> rits = regionStates.getRegionsInTransition();
  for (RegionState regionState : rits.values()) {
    LOG.info("Processing " + regionState);
    ServerName serverName = regionState.getServerName();
    // Skip regions whose server is gone; SSH will recover those.
    if (serverName != null
        && !serverManager.getOnlineServers().containsKey(serverName)) {
      LOG.info("Server " + serverName + " isn't online. SSH will handle this");
      continue;
    }
    HRegionInfo regionInfo = regionState.getRegion();
    State state = regionState.getState();

    switch (state) {
    case CLOSED:
      // Closed but not yet re-assigned: kick off a new assign.
      invokeAssign(regionInfo);
      break;
    case PENDING_OPEN:
      // Open RPC may have been lost; resend it.
      retrySendRegionOpen(regionState);
      break;
    case PENDING_CLOSE:
      // Close RPC may have been lost; resend it.
      retrySendRegionClose(regionState);
      break;
    case FAILED_CLOSE:
    case FAILED_OPEN:
      // Failed either way: unassign to get back to a clean state.
      invokeUnAssign(regionInfo);
      break;
    default:
      // All other states are handled by their normal transition paths.
    }
  }
}
3177
3178
3179
3180
3181
/**
 * Asynchronously re-sends the open-region RPC for a region stuck in PENDING_OPEN
 * (e.g. after master recovery). Retries up to {@code maximumAttempts} on transient
 * failures (socket timeout / failed server); on other failures, or a FAILED_OPENING
 * response, falls back to a fresh assign. Gives up silently if the server goes
 * offline, the master stops, or the region's state changes underneath us.
 *
 * @param regionState the PENDING_OPEN state captured for the region
 */
private void retrySendRegionOpen(final RegionState regionState) {
  this.executorService.submit(
    new EventHandler(server, EventType.M_MASTER_RECOVERY) {
      @Override
      public void process() throws IOException {
        HRegionInfo hri = regionState.getRegion();
        ServerName serverName = regionState.getServerName();
        // Serialize with other operations on this region.
        ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
        try {
          for (int i = 1; i <= maximumAttempts; i++) {
            if (!serverManager.isServerOnline(serverName)
                || server.isStopped() || server.isAborted()) {
              return; // Server is gone or master is going down; nothing to do.
            }
            try {
              if (!regionState.equals(regionStates.getRegionState(hri))) {
                return; // Region state changed since we were scheduled; stale task.
              }
              List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
              if (shouldAssignRegionsWithFavoredNodes) {
                favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
              }
              RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
                serverName, hri, -1, favoredNodes);

              if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
                // Server refused the open; go through a full re-assign instead.
                LOG.debug("Got failed_opening in retry sendRegionOpen for "
                  + regionState + ", re-assign it");
                invokeAssign(hri, true);
              }
              return; // Either opened or handed off to assign; done.
            } catch (Throwable t) {
              if (t instanceof RemoteException) {
                t = ((RemoteException) t).unwrapRemoteException();
              }
              // Transient connectivity problems: back off briefly and retry.
              if (t instanceof java.net.SocketTimeoutException
                  || t instanceof FailedServerException) {
                Threads.sleep(100);
                continue;
              }
              // Anything else: give up on the resend and re-assign.
              LOG.debug("Got exception in retry sendRegionOpen for "
                + regionState + ", re-assign it", t);
              invokeAssign(hri);
              return;
            }
          }
        } finally {
          lock.unlock();
        }
      }
    });
}
3238
3239
3240
3241
3242
/**
 * Asynchronously re-sends the close-region RPC for a region stuck in PENDING_CLOSE
 * (e.g. after master recovery). Retries up to {@code maximumAttempts} on transient
 * failures (socket timeout / failed server); on other failures (except "region not
 * there" / "already in transition"), falls back to a fresh unassign. Gives up
 * silently if the server goes offline, the master stops, or the region's state
 * changes underneath us.
 *
 * @param regionState the PENDING_CLOSE state captured for the region
 */
private void retrySendRegionClose(final RegionState regionState) {
  this.executorService.submit(
    new EventHandler(server, EventType.M_MASTER_RECOVERY) {
      @Override
      public void process() throws IOException {
        HRegionInfo hri = regionState.getRegion();
        ServerName serverName = regionState.getServerName();
        // Serialize with other operations on this region.
        ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
        try {
          for (int i = 1; i <= maximumAttempts; i++) {
            if (!serverManager.isServerOnline(serverName)
                || server.isStopped() || server.isAborted()) {
              return; // Server is gone or master is going down; nothing to do.
            }
            try {
              if (!regionState.equals(regionStates.getRegionState(hri))) {
                return; // Region state changed since we were scheduled; stale task.
              }
              if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
                // Server declined the close; go through a full unassign instead.
                LOG.debug("Got false in retry sendRegionClose for "
                  + regionState + ", re-close it");
                invokeUnAssign(hri);
              }
              return; // Either closed or handed off to unassign; done.
            } catch (Throwable t) {
              if (t instanceof RemoteException) {
                t = ((RemoteException) t).unwrapRemoteException();
              }
              // Transient connectivity problems: back off briefly and retry.
              if (t instanceof java.net.SocketTimeoutException
                  || t instanceof FailedServerException) {
                Threads.sleep(100);
                continue;
              }
              if (!(t instanceof NotServingRegionException
                  || t instanceof RegionAlreadyInTransitionException)) {
                // NotServing/AlreadyInTransition mean the close is effectively
                // handled elsewhere; anything else warrants a fresh unassign.
                LOG.debug("Got exception in retry sendRegionClose for "
                  + regionState + ", re-close it", t);
                invokeUnAssign(hri);
              }
              return;
            }
          }
        } finally {
          lock.unlock();
        }
      }
    });
}
3296
3297
3298
3299
3300
3301
3302
3303
3304 public void updateRegionsInTransitionMetrics() {
3305 long currentTime = System.currentTimeMillis();
3306 int totalRITs = 0;
3307 int totalRITsOverThreshold = 0;
3308 long oldestRITTime = 0;
3309 int ritThreshold = this.server.getConfiguration().
3310 getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3311 for (RegionState state: regionStates.getRegionsInTransition().values()) {
3312 totalRITs++;
3313 long ritTime = currentTime - state.getStamp();
3314 if (ritTime > ritThreshold) {
3315 totalRITsOverThreshold++;
3316 }
3317 if (oldestRITTime < ritTime) {
3318 oldestRITTime = ritTime;
3319 }
3320 }
3321 if (this.metricsAssignmentManager != null) {
3322 this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3323 this.metricsAssignmentManager.updateRITCount(totalRITs);
3324 this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3325 }
3326 }
3327
3328
3329
3330
/**
 * Removes any cached assignment plan for the region, under the regionPlans lock.
 *
 * @param region region whose plan should be dropped
 */
void clearRegionPlan(final HRegionInfo region) {
  synchronized (this.regionPlans) {
    this.regionPlans.remove(region.getEncodedName());
  }
}
3336
3337
3338
3339
3340
3341
/**
 * Waits without a timeout for the region to leave regions-in-transition.
 *
 * @param hri region to wait on
 * @throws IOException declared for callers; the delegate does not throw it here
 * @throws InterruptedException if the wait is interrupted
 */
public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
    throws IOException, InterruptedException {
  waitOnRegionToClearRegionsInTransition(hri, -1L);
}
3346
3347
3348
3349
3350
3351
3352
3353
/**
 * Waits for the region to leave regions-in-transition, up to {@code timeOut} ms
 * (non-positive means wait forever), or until the master stops.
 *
 * @param hri region to wait on
 * @param timeOut maximum wait in milliseconds; &lt;= 0 means no timeout
 * @return true if the region left RIT; false on timeout or master stop
 * @throws InterruptedException if the wait is interrupted
 */
public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
    throws InterruptedException {
  if (!regionStates.isRegionInTransition(hri)) return true;
  long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
      + timeOut;
  LOG.info("Waiting for " + hri.getEncodedName() +
    " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
  while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
    // Park briefly until the next region-state change notification.
    regionStates.waitForUpdate(100);
    if (EnvironmentEdgeManager.currentTime() > end) {
      LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
      return false;
    }
  }
  if (this.server.isStopped()) {
    LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
    return false;
  }
  return true;
}
3376
  /**
   * Schedules an asynchronous assign of the region, computing a new plan.
   */
  void invokeAssign(HRegionInfo regionInfo) {
    invokeAssign(regionInfo, true);
  }
3380
3381 void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3382 threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3383 }
3384
3385 void invokeUnAssign(HRegionInfo regionInfo) {
3386 threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3387 }
3388
  /**
   * @return whether the given server carries the default hbase:meta region
   */
  public ServerHostRegion isCarryingMeta(ServerName serverName) {
    return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
  }
3392
3393 public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
3394 return isCarryingRegion(serverName,
3395 RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
3396 }
3397
  /**
   * @return whether the given server carries the given meta replica region
   */
  public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
    return isCarryingRegion(serverName, metaHri);
  }
3401
3402
3403
3404
3405
3406
3407
3408
3409
3410
3411
  /**
   * Determines whether the given server hosts the given region, consulting in
   * order: the region's unassigned znode, the AM's in-memory region states,
   * and — for the default hbase:meta replica only — the meta location znode.
   *
   * @param serverName server being checked
   * @param hri region being checked
   * @return HOSTING_REGION / NOT_HOSTING_REGION when some source gives an
   *   answer, UNKNOWN when none of them knows the region's server
   */
  private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
    RegionTransition rt = null;
    try {
      byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
      // The znode may legitimately be absent; null data means no transition.
      rt = data == null? null: RegionTransition.parseFrom(data);
    } catch (KeeperException e) {
      server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
    } catch (DeserializationException e) {
      server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
    }

    ServerName addressFromZK = rt != null? rt.getServerName(): null;
    if (addressFromZK != null) {
      // ZK data takes precedence: compare against the reported server.
      boolean matchZK = addressFromZK.equals(serverName);
      LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
        " current=" + serverName + ", matches=" + matchZK);
      return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
    }

    // Fall back to the in-memory assignment recorded by the AM.
    ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
    if (LOG.isDebugEnabled()) {
      LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
        " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
        " server being checked: " + serverName);
    }
    if (addressFromAM != null) {
      return addressFromAM.equals(serverName) ?
          ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
    }

    // Last resort, for the default meta replica only: the meta location znode.
    if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
      final ServerName serverNameInZK =
          server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Based on MetaTableLocator, the META region is on server=" +
          (serverNameInZK == null ? "null" : serverNameInZK) +
          " server being checked: " + serverName);
      }
      if (serverNameInZK != null) {
        return serverNameInZK.equals(serverName) ?
            ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
      }
    }

    // No source could answer.
    return ServerHostRegion.UNKNOWN;
  }
3462
3463
3464
3465
3466
3467
  /**
   * Cleans up state referencing a crashed server: drops region plans targeting
   * it, marks its regions offline in region states, and filters the returned
   * list down to regions that actually need re-assignment.
   *
   * @param sn the dead server
   * @return regions from the dead server that should be re-assigned
   */
  public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
    // Drop any plan whose destination was the dead server.
    synchronized (this.regionPlans) {
      for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
          i.hasNext();) {
        Map.Entry<String, RegionPlan> e = i.next();
        ServerName otherSn = e.getValue().getDestination();
        // Destination is null for plans that pick a random server.
        if (otherSn != null && otherSn.equals(sn)) {
          // Use the iterator's remove to avoid ConcurrentModificationException.
          i.remove();
        }
      }
    }
    List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
    for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
      HRegionInfo hri = it.next();
      String encodedName = hri.getEncodedName();

      // Take the per-region lock: we may update its transition state below.
      Lock lock = locker.acquireLock(encodedName);
      try {
        RegionState regionState = regionStates.getRegionTransitionState(encodedName);
        // Keep only regions still tied to the dead server in a recoverable
        // state (failed_close, offline, or pending open/opening).
        if (regionState == null
            || (regionState.getServerName() != null && !regionState.isOnServer(sn))
            || !(regionState.isFailedClose() || regionState.isOffline()
              || regionState.isPendingOpenOrOpening())) {
          LOG.info("Skip " + regionState + " since it is not opening/failed_close"
            + " on the dead server any more: " + sn);
          it.remove();
        } else {
          try {
            // Remove the region's unassigned znode if it exists.
            ZKAssign.deleteNodeFailSilent(watcher, hri);
          } catch (KeeperException ke) {
            server.abort("Unexpected ZK exception deleting node " + hri, ke);
          }
          // Disabled/disabling tables: just offline, never re-assign.
          if (tableStateManager.isTableState(hri.getTable(),
              ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
            regionStates.regionOffline(hri);
            it.remove();
            continue;
          }
          // Mark OFFLINE so the caller can re-assign it.
          regionStates.updateRegionState(hri, State.OFFLINE);
        }
      } finally {
        lock.unlock();
      }
    }
    return regions;
  }
3520
3521
3522
3523
  /**
   * Moves a region per the given plan: records the plan and unassigns the
   * region so it re-opens on the plan's destination. No-op when the table is
   * disabled/disabling or the region is not currently online.
   *
   * @param plan the move plan (region plus destination server)
   */
  public void balance(final RegionPlan plan) {
    // Never move regions of a disabled/disabling table.
    HRegionInfo hri = plan.getRegionInfo();
    TableName tableName = hri.getTable();
    if (tableStateManager.isTableState(tableName,
        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
      LOG.info("Ignored moving region of disabling/disabled table "
        + tableName);
      return;
    }

    // Hold the per-region lock so its state cannot change while we record the
    // plan and kick off the unassign.
    String encodedName = hri.getEncodedName();
    ReentrantLock lock = locker.acquireLock(encodedName);
    try {
      if (!regionStates.isRegionOnline(hri)) {
        RegionState state = regionStates.getRegionState(encodedName);
        LOG.info("Ignored moving region not assigned: " + hri + ", "
          + (state == null ? "not in region states" : state));
        return;
      }
      synchronized (this.regionPlans) {
        this.regionPlans.put(plan.getRegionName(), plan);
      }
      unassign(hri, false, plan.getDestination());
    } finally {
      lock.unlock();
    }
  }
3553
  /**
   * Stops the assignment manager; delegates to {@link #shutdown()}.
   */
  public void stop() {
    shutdown();
  }
3557
3558
3559
3560
  /**
   * Shuts down the assignment manager immediately: drops queued ZK event work,
   * stops both executor pools, and stops the region state store.
   */
  public void shutdown() {
    // Immediate shutdown: discard any ZK event work not yet started.
    synchronized (zkEventWorkerWaitingList){
      zkEventWorkerWaitingList.clear();
    }

    // Stop the executor pools without waiting for in-flight tasks.
    threadPoolExecutorService.shutdownNow();
    zkEventWorkers.shutdownNow();
    regionStateStore.stop();
  }
3572
3573 protected void setEnabledTable(TableName tableName) {
3574 try {
3575 this.tableStateManager.setTableState(tableName,
3576 ZooKeeperProtos.Table.State.ENABLED);
3577 } catch (CoordinatedStateException e) {
3578
3579 String errorMsg = "Unable to ensure that the table " + tableName
3580 + " will be" + " enabled because of a ZooKeeper issue";
3581 LOG.error(errorMsg);
3582 this.server.abort(errorMsg, e);
3583 }
3584 }
3585
3586
3587
3588
3589
3590
3591
  /**
   * Marks the region OFFLINE in region states and asynchronously creates its
   * OFFLINE znode; {@code cb} fires when the ZK create completes. Aborts the
   * master if the region is in a state other than CLOSED or OFFLINE.
   *
   * @param state current region state (must be CLOSED or OFFLINE)
   * @param cb async callback invoked when the znode create finishes
   * @param destination intended destination server, recorded in the znode
   * @return true if the async create was issued, false otherwise
   */
  private boolean asyncSetOfflineInZooKeeper(final RegionState state,
      final AsyncCallback.StringCallback cb, final ServerName destination) {
    if (!state.isClosed() && !state.isOffline()) {
      this.server.abort("Unexpected state trying to OFFLINE; " + state,
        new IllegalStateException());
      return false;
    }
    regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
    try {
      ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
        destination, cb, state);
    } catch (KeeperException e) {
      // An existing node is tolerable (warn); anything else aborts the master.
      if (e instanceof NodeExistsException) {
        LOG.warn("Node for " + state.getRegion() + " already exists");
      } else {
        server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
      }
      return false;
    }
    return true;
  }
3613
  /**
   * Tries to delete the region's ZK unassigned node under each of the given
   * event types in order, stopping at the first successful delete.
   *
   * @param encodedName encoded region name whose node should be deleted
   * @param desc human-readable node description, used only in log messages
   * @param sn server expected to own the node
   * @param types event types to try, in order
   * @return true if a node was deleted; false otherwise (including when the
   *   node was already gone)
   */
  private boolean deleteNodeInStates(String encodedName,
      String desc, ServerName sn, EventType... types) {
    try {
      for (EventType et: types) {
        if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
          return true;
        }
      }
      LOG.info("Failed to delete the " + desc + " node for "
        + encodedName + ". The node type may not match");
    } catch (NoNodeException e) {
      // Already deleted: fine, just note it at debug level.
      if (LOG.isDebugEnabled()) {
        LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
      }
    } catch (KeeperException ke) {
      server.abort("Unexpected ZK exception deleting " + desc
        + " node for the region " + encodedName, ke);
    }
    return false;
  }
3634
  /**
   * Deletes the region's merge-related znode, whichever merge event type it
   * currently holds (MERGING, REQUEST_MERGE, or MERGED).
   */
  private void deleteMergingNode(String encodedName, ServerName sn) {
    deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
      EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
  }
3639
  /**
   * Deletes the region's split-related znode, whichever split event type it
   * currently holds (SPLITTING, REQUEST_SPLIT, or SPLIT).
   */
  private void deleteSplittingNode(String encodedName, ServerName sn) {
    deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
      EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
  }
3644
3645 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
3646 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
3647 justification="Modification of Maps not ATOMIC!!!! FIX!!!")
3648 private void onRegionFailedOpen(
3649 final HRegionInfo hri, final ServerName sn) {
3650 String encodedName = hri.getEncodedName();
3651
3652 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3653 if (failedOpenCount == null) {
3654 failedOpenCount = new AtomicInteger();
3655
3656
3657
3658
3659 failedOpenTracker.put(encodedName, failedOpenCount);
3660 }
3661 if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
3662
3663 regionStates.updateRegionState(hri, State.FAILED_OPEN);
3664
3665
3666 failedOpenTracker.remove(encodedName);
3667 } else {
3668 if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3669
3670
3671 LOG.warn("Failed to open the hbase:meta region " +
3672 hri.getRegionNameAsString() + " after" +
3673 failedOpenCount.get() + " retries. Continue retrying.");
3674 }
3675
3676
3677 RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3678 if (regionState != null) {
3679
3680
3681 if (getTableStateManager().isTableState(hri.getTable(),
3682 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3683 replicasToClose.contains(hri)) {
3684 offlineDisabledRegion(hri);
3685 return;
3686 }
3687
3688 regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3689
3690 removeClosedRegion(hri);
3691 try {
3692 getRegionPlan(hri, sn, true);
3693 } catch (HBaseIOException e) {
3694 LOG.warn("Failed to get region plan", e);
3695 }
3696 invokeAssign(hri, false);
3697 }
3698 }
3699 }
3700
  /**
   * Handles an OPENED report: marks the region online, removes its ZK node
   * when ZK-based assignment is in use, resets the failed-open counter, and
   * queues an unassign if the table is meanwhile disabled/disabling.
   *
   * @param hri region that opened
   * @param sn server it opened on
   * @param openSeqNum sequence id the region opened with
   */
  private void onRegionOpen(final HRegionInfo hri, final ServerName sn, long openSeqNum) {
    regionOnline(hri, sn, openSeqNum);
    if (useZKForAssignment) {
      try {
        // Remove the region's unassigned znode if it exists.
        ZKAssign.deleteNodeFailSilent(watcher, hri);
      } catch (KeeperException ke) {
        server.abort("Unexpected ZK exception deleting node " + hri, ke);
      }
    }

    // Reset the failed-open count, if any.
    failedOpenTracker.remove(hri.getEncodedName());
    if (getTableStateManager().isTableState(hri.getTable(),
        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
      invokeUnAssign(hri);
    }
  }
3719
3720 private void onRegionClosed(final HRegionInfo hri) {
3721 if (getTableStateManager().isTableState(hri.getTable(),
3722 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3723 replicasToClose.contains(hri)) {
3724 offlineDisabledRegion(hri);
3725 return;
3726 }
3727 regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3728 sendRegionClosedNotification(hri);
3729
3730 removeClosedRegion(hri);
3731 invokeAssign(hri, false);
3732 }
3733
3734 private String checkInStateForSplit(ServerName sn,
3735 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3736 final RegionState rs_p = regionStates.getRegionState(p);
3737 RegionState rs_a = regionStates.getRegionState(a);
3738 RegionState rs_b = regionStates.getRegionState(b);
3739 if (!(rs_p.isOpenOrSplittingOnServer(sn)
3740 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3741 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3742 return "Not in state good for split";
3743 }
3744 return "";
3745 }
3746
  /**
   * Handles a SPLIT_REVERTED transition: brings the parent back online,
   * offlines both daughters, and queues an unassign of the parent if its
   * table is disabled/disabling.
   *
   * @return null on success, otherwise an error message for the reporter
   */
  private String onRegionSplitReverted(ServerName sn,
      final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
    String s = checkInStateForSplit(sn, p, a, b);
    if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
      return s;
    }
    regionOnline(p, sn);
    regionOffline(a);
    regionOffline(b);

    // The table could have been disabled while the split was in flight.
    if (getTableStateManager().isTableState(p.getTable(),
        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
      invokeUnAssign(p);
    }
    return null;
  }
3763
  /**
   * Handles split-family transition codes (non-ZK path). Marks daughters
   * SPLITTING_NEW and the parent SPLITTING, then: for SPLIT, offlines the
   * parent and brings the daughters online (or unassigns them when the table
   * is disabled), scheduling replica handling asynchronously; for SPLIT_PONR,
   * records the split in meta.
   *
   * @return null on success, otherwise an error message for the reporter
   */
  private String onRegionSplit(ServerName sn, TransitionCode code,
      final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
    String s = checkInStateForSplit(sn, p, a, b);
    if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
      return s;
    }

    regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
    regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
    regionStates.updateRegionState(p, State.SPLITTING);

    if (code == TransitionCode.SPLIT) {
      // Test hook: deliberately drop the split message.
      if (TEST_SKIP_SPLIT_HANDLING) {
        return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
      }
      regionOffline(p, State.SPLIT);
      regionOnline(a, sn, 1);
      regionOnline(b, sn, 1);

      // The table could have been disabled before the master learned of the
      // new daughters; if so, close them again.
      if (getTableStateManager().isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        invokeUnAssign(a);
        invokeUnAssign(b);
      } else {
        // Handle daughter replicas off the RPC thread.
        Callable<Object> splitReplicasCallable = new Callable<Object>() {
          @Override
          public Object call() {
            doSplittingOfReplicas(p, a, b);
            return null;
          }
        };
        threadPoolExecutorService.submit(splitReplicasCallable);
      }
    } else if (code == TransitionCode.SPLIT_PONR) {
      try {
        // Point of no return: persist the split in meta.
        regionStates.splitRegion(p, a, b, sn);
      } catch (IOException ioe) {
        LOG.info("Failed to record split region " + p.getShortNameToLog());
        return "Failed to record the splitting in meta";
      }
    }
    return null;
  }
3808
  /**
   * Handles merge-family transition codes (non-ZK path): READY_TO_MERGE,
   * MERGED, MERGE_PONR, and (fall-through else) MERGE_REVERTED.
   *
   * @param p the would-be merged region
   * @param a first merging region
   * @param b second merging region
   * @return null on success, otherwise an error message for the reporter
   */
  private String onRegionMerge(ServerName sn, TransitionCode code,
      final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
    RegionState rs_p = regionStates.getRegionState(p);
    RegionState rs_a = regionStates.getRegionState(a);
    RegionState rs_b = regionStates.getRegionState(b);
    // NOTE(review): rs_a/rs_b are dereferenced without null checks; this
    // assumes both merging regions are known to region states — confirm.
    if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
        && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
      return "Not in state good for merge";
    }

    regionStates.updateRegionState(a, State.MERGING);
    regionStates.updateRegionState(b, State.MERGING);
    regionStates.updateRegionState(p, State.MERGING_NEW, sn);

    String encodedName = p.getEncodedName();
    if (code == TransitionCode.READY_TO_MERGE) {
      // Track the in-flight merge by the merged region's encoded name.
      mergingRegions.put(encodedName,
        new PairOfSameType<HRegionInfo>(a, b));
    } else if (code == TransitionCode.MERGED) {
      mergingRegions.remove(encodedName);
      regionOffline(a, State.MERGED);
      regionOffline(b, State.MERGED);
      regionOnline(p, sn, 1);

      // The table could have been disabled before the master learned of the
      // merged region; if so, close it again.
      if (getTableStateManager().isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        invokeUnAssign(p);
      } else {
        // Handle merged-region replicas off the RPC thread.
        Callable<Object> mergeReplicasCallable = new Callable<Object>() {
          @Override
          public Object call() {
            doMergingOfReplicas(p, a, b);
            return null;
          }
        };
        threadPoolExecutorService.submit(mergeReplicasCallable);
      }
    } else if (code == TransitionCode.MERGE_PONR) {
      try {
        // Point of no return: persist the merge in meta.
        regionStates.mergeRegions(p, a, b, sn);
      } catch (IOException ioe) {
        LOG.info("Failed to record merged region " + p.getShortNameToLog());
        return "Failed to record the merging in meta";
      }
    } else {
      // MERGE_REVERTED: restore a and b, discard the would-be merged region.
      mergingRegions.remove(encodedName);
      regionOnline(a, sn);
      regionOnline(b, sn);
      regionOffline(p);

      if (getTableStateManager().isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        invokeUnAssign(a);
        invokeUnAssign(b);
      }
    }
    return null;
  }
3868
3869
3870
3871
3872
  /**
   * Handles a merge-related ZK region transition (REQUEST_REGION_MERGE,
   * REGION_MERGING, or REGION_MERGED) reported by server {@code sn}. The
   * transition payload is expected to carry three regions: the merged region
   * p followed by the two merging regions a and b.
   *
   * @param rt the region transition read from ZK
   * @param encodedName encoded name of the merged region's znode
   * @param prettyPrintedRegionName region name for log messages
   * @param sn server that created the transition
   * @return true if the event was handled, false if it was dropped
   */
  private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
      final String prettyPrintedRegionName, final ServerName sn) {
    if (!serverManager.isServerOnline(sn)) {
      LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
      return false;
    }
    byte [] payloadOfMerging = rt.getPayload();
    List<HRegionInfo> mergingRegions;
    try {
      mergingRegions = HRegionInfo.parseDelimitedFrom(
        payloadOfMerging, 0, payloadOfMerging.length);
    } catch (IOException e) {
      LOG.error("Dropped merging! Failed reading " + rt.getEventType()
        + " payload for " + prettyPrintedRegionName);
      return false;
    }
    assert mergingRegions.size() == 3;
    HRegionInfo p = mergingRegions.get(0);
    HRegionInfo hri_a = mergingRegions.get(1);
    HRegionInfo hri_b = mergingRegions.get(2);

    RegionState rs_p = regionStates.getRegionState(p);
    RegionState rs_a = regionStates.getRegionState(hri_a);
    RegionState rs_b = regionStates.getRegionState(hri_b);

    // Each region must be unknown, or open/merging on the reporting server.
    if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
        && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
        && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
      LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
        + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
      return false;
    }

    EventType et = rt.getEventType();
    if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
      try {
        // Ask the coordination layer to move the pending_merge node to MERGING.
        RegionMergeCoordination.RegionMergeDetails std =
            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                .getRegionMergeCoordination().getDefaultDetails();
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
        if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
          // Version -1 means the transition did not take; re-read the node and
          // accept only if it has already moved to MERGED or MERGING.
          byte[] data = ZKAssign.getData(watcher, encodedName);
          EventType currentType = null;
          if (data != null) {
            RegionTransition newRt = RegionTransition.parseFrom(data);
            currentType = newRt.getEventType();
          }
          if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
              && currentType != EventType.RS_ZK_REGION_MERGING)) {
            LOG.warn("Failed to transition pending_merge node "
              + encodedName + " to merging, it's now " + currentType);
            return false;
          }
        }
      } catch (Exception e) {
        LOG.warn("Failed to transition pending_merge node "
          + encodedName + " to merging", e);
        return false;
      }
    }

    synchronized (regionStates) {
      regionStates.updateRegionState(hri_a, State.MERGING);
      regionStates.updateRegionState(hri_b, State.MERGING);
      regionStates.updateRegionState(p, State.MERGING_NEW, sn);

      if (et != EventType.RS_ZK_REGION_MERGED) {
        // Merge still in progress: track it.
        this.mergingRegions.put(encodedName,
          new PairOfSameType<HRegionInfo>(hri_a, hri_b));
      } else {
        // Merge done: retire a and b, bring the merged region online.
        this.mergingRegions.remove(encodedName);
        regionOffline(hri_a, State.MERGED);
        regionOffline(hri_b, State.MERGED);
        regionOnline(p, sn);
      }
    }

    if (et == EventType.RS_ZK_REGION_MERGED) {
      doMergingOfReplicas(p, hri_a, hri_b);
      LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
      // Remove the MERGED znode; retry until the delete succeeds.
      try {
        boolean successful = false;
        while (!successful) {
          // The node may be touched between read and delete; retrying the
          // delete is safe.
          successful = ZKAssign.deleteNode(watcher, encodedName,
            EventType.RS_ZK_REGION_MERGED, sn);
        }
      } catch (KeeperException e) {
        if (e instanceof NoNodeException) {
          // NOTE(review): path is built under splitLogZNode, which looks
          // copy-pasted from split handling — used only in this debug message.
          String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
          LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
        } else {
          server.abort("Error deleting MERGED node " + encodedName, e);
        }
      }
      LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
        + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
        + hri_b.getRegionNameAsString() + ", on " + sn);

      // The table could have been disabled before the master learned of the
      // merged region; if so, close it again.
      if (tableStateManager.isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        unassign(p);
      }
    }
    return true;
  }
3983
3984
3985
3986
  /**
   * Handles a split-related ZK region transition (REQUEST_REGION_SPLIT,
   * REGION_SPLITTING, or REGION_SPLIT) reported by server {@code sn}. The
   * transition payload is expected to carry the two daughter regions.
   *
   * @param rt the region transition read from ZK
   * @param encodedName encoded name of the parent region's znode
   * @param prettyPrintedRegionName region name for log messages
   * @param sn server that created the transition
   * @return true if the event was handled, false if it was dropped
   */
  private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
      final String prettyPrintedRegionName, final ServerName sn) {
    if (!serverManager.isServerOnline(sn)) {
      LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
      return false;
    }
    byte [] payloadOfSplitting = rt.getPayload();
    List<HRegionInfo> splittingRegions;
    try {
      splittingRegions = HRegionInfo.parseDelimitedFrom(
        payloadOfSplitting, 0, payloadOfSplitting.length);
    } catch (IOException e) {
      LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
        + " payload for " + prettyPrintedRegionName);
      return false;
    }
    assert splittingRegions.size() == 2;
    HRegionInfo hri_a = splittingRegions.get(0);
    HRegionInfo hri_b = splittingRegions.get(1);

    RegionState rs_p = regionStates.getRegionState(encodedName);
    RegionState rs_a = regionStates.getRegionState(hri_a);
    RegionState rs_b = regionStates.getRegionState(hri_b);

    // Each region must be unknown, or open/splitting on the reporting server.
    if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
        && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
        && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
      LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
        + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
      return false;
    }

    if (rs_p == null) {
      // The splitting parent should be online; force it OPEN on the reporter.
      rs_p = regionStates.updateRegionState(rt, State.OPEN);
      if (rs_p == null) {
        LOG.warn("Received splitting for region " + prettyPrintedRegionName
          + " from server " + sn + " but it doesn't exist anymore,"
          + " probably already processed its split");
        return false;
      }
      regionStates.regionOnline(rs_p.getRegion(), sn);
    }

    HRegionInfo p = rs_p.getRegion();
    EventType et = rt.getEventType();
    if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
      try {
        // Ask the coordination layer to move the pending_split node to
        // SPLITTING.
        SplitTransactionDetails std =
            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                .getSplitTransactionCoordination().getDefaultDetails();
        if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
          // Transition did not take; re-read the node and accept only if it
          // has already moved to SPLIT or SPLITTING.
          byte[] data = ZKAssign.getData(watcher, encodedName);
          EventType currentType = null;
          if (data != null) {
            RegionTransition newRt = RegionTransition.parseFrom(data);
            currentType = newRt.getEventType();
          }
          if (currentType == null
              || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
            LOG.warn("Failed to transition pending_split node " + encodedName
              + " to splitting, it's now " + currentType);
            return false;
          }
        }
      } catch (Exception e) {
        LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
        return false;
      }
    }

    synchronized (regionStates) {
      splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
      regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
      regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
      regionStates.updateRegionState(rt, State.SPLITTING);

      // Test hook: deliberately stop processing the split here.
      if (TEST_SKIP_SPLIT_HANDLING) {
        LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
        return true;
      }

      if (et == EventType.RS_ZK_REGION_SPLIT) {
        // Split is done: retire the parent, bring daughters online.
        regionOffline(p, State.SPLIT);
        regionOnline(hri_a, sn);
        regionOnline(hri_b, sn);
        splitRegions.remove(p);
      }
    }

    if (et == EventType.RS_ZK_REGION_SPLIT) {
      // Handle daughter replicas, then remove the SPLIT znode.
      doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
      LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
      // Retry the delete until it succeeds.
      try {
        boolean successful = false;
        while (!successful) {
          // The node may be touched between read and delete; retrying the
          // delete is safe.
          successful = ZKAssign.deleteNode(watcher, encodedName,
            EventType.RS_ZK_REGION_SPLIT, sn);
        }
      } catch (KeeperException e) {
        if (e instanceof NoNodeException) {
          String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
          LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
        } else {
          server.abort("Error deleting SPLIT node " + encodedName, e);
        }
      }
      LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
        + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
        + hri_b.getRegionNameAsString() + ", on " + sn);

      // The table could have been disabled before the master learned of the
      // daughters; if so, close them again.
      if (tableStateManager.isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        unassign(hri_a);
        unassign(hri_b);
      }
    }
    return true;
  }
4114
  /**
   * Post-merge replica handling: unassigns leftover non-default replicas of
   * the two merged-away regions, then assigns replicas of the merged region
   * per the table's region replication setting.
   *
   * @param mergedHri the new merged region
   * @param hri_a first merged-away region
   * @param hri_b second merged-away region
   */
  private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
      final HRegionInfo hri_b) {
    // Close replicas of the original unmerged regions wherever they are.
    List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
    unmergedRegions.add(hri_a);
    unmergedRegions.add(hri_b);
    Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
    Collection<List<HRegionInfo>> c = map.values();
    for (List<HRegionInfo> l : c) {
      for (HRegionInfo h : l) {
        if (!RegionReplicaUtil.isDefaultReplica(h)) {
          LOG.debug("Unassigning un-merged replica " + h);
          unassign(h);
        }
      }
    }
    // Default to 1 (no replicas) if the table descriptor cannot be read.
    int numReplicas = 1;
    try {
      numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
          getRegionReplication();
    } catch (IOException e) {
      LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
          " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
          "will not be done");
    }
    List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
    for (int i = 1; i < numReplicas; i++) {
      regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
    }
    try {
      assign(regions);
    } catch (IOException ioe) {
      LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
          ioe.getMessage());
    } catch (InterruptedException ie) {
      // NOTE(review): interrupt status is not restored here — confirm intent.
      LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
          ie.getMessage());
    }
  }
4155
4156 private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4157 final HRegionInfo hri_b) {
4158
4159
4160
4161 int numReplicas = 1;
4162 try {
4163 numReplicas = server.getTableDescriptors().get(parentHri.getTable()).
4164 getRegionReplication();
4165 } catch (IOException e) {
4166 LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4167 " due to " + e.getMessage() + ". The assignment of daughter replicas " +
4168 "replicas will not be done");
4169 }
4170
4171 List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4172 parentRegion.add(parentHri);
4173 Map<ServerName, List<HRegionInfo>> currentAssign =
4174 regionStates.getRegionAssignments(parentRegion);
4175 Collection<List<HRegionInfo>> c = currentAssign.values();
4176 for (List<HRegionInfo> l : c) {
4177 for (HRegionInfo h : l) {
4178 if (!RegionReplicaUtil.isDefaultReplica(h)) {
4179 LOG.debug("Unassigning parent's replica " + h);
4180 unassign(h);
4181 }
4182 }
4183 }
4184
4185 Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4186 for (int i = 1; i < numReplicas; i++) {
4187 prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4188 prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4189 }
4190 try {
4191 assign(map);
4192 } catch (IOException e) {
4193 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4194 } catch (InterruptedException e) {
4195 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4196 }
4197 }
4198
  /**
   * Computes the replica HRegionInfo for a daughter region and records a
   * target server for it in {@code map}: the server hosting the corresponding
   * parent replica if known, otherwise a randomly chosen online server.
   *
   * @param daughterHri daughter region (default replica)
   * @param parentHri parent region (default replica)
   * @param replicaId replica id to prepare
   * @param map output map of daughter replica to chosen target server
   */
  private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
      int replicaId, Map<HRegionInfo, ServerName> map) {
    HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
    HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
        replicaId);
    LOG.debug("Created replica region for daughter " + daughterReplica);
    ServerName sn;
    if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
      // Keep the daughter replica co-located with the parent replica.
      map.put(daughterReplica, sn);
    } else {
      List<ServerName> servers = serverManager.getOnlineServersList();
      // NOTE(review): a new time-seeded Random per call is wasteful; a shared
      // Random (or ThreadLocalRandom) would be preferable.
      sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
      map.put(daughterReplica, sn);
    }
  }
4214
  /**
   * @return the set of replica regions queued to be closed (replicas of
   *   regions that were split or merged away)
   */
  public Set<HRegionInfo> getReplicasToClose() {
    return replicasToClose;
  }
4218
4219
4220
4221
4222
4223
4224 private void regionOffline(final HRegionInfo regionInfo, final State state) {
4225 regionStates.regionOffline(regionInfo, state);
4226 removeClosedRegion(regionInfo);
4227
4228 clearRegionPlan(regionInfo);
4229 balancer.regionOffline(regionInfo);
4230
4231
4232 sendRegionClosedNotification(regionInfo);
4233
4234 if (state != null && state.equals(State.SPLIT)) {
4235 Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4236 c.add(regionInfo);
4237 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4238 Collection<List<HRegionInfo>> allReplicas = map.values();
4239 for (List<HRegionInfo> list : allReplicas) {
4240 replicasToClose.addAll(list);
4241 }
4242 }
4243 else if (state != null && state.equals(State.MERGED)) {
4244 Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4245 c.add(regionInfo);
4246 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4247 Collection<List<HRegionInfo>> allReplicas = map.values();
4248 for (List<HRegionInfo> list : allReplicas) {
4249 replicasToClose.addAll(list);
4250 }
4251 }
4252 }
4253
4254 private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4255 final ServerName serverName) {
4256 if (!this.listeners.isEmpty()) {
4257 for (AssignmentListener listener : this.listeners) {
4258 listener.regionOpened(regionInfo, serverName);
4259 }
4260 }
4261 }
4262
4263 private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4264 if (!this.listeners.isEmpty()) {
4265 for (AssignmentListener listener : this.listeners) {
4266 listener.regionClosed(regionInfo);
4267 }
4268 }
4269 }
4270
4271
4272
4273
4274
4275
4276
4277
4278
4279
4280
4281
4282
4283
4284
4285
4286
4287
4288
4289
4290
4291
4292
4293
4294
4295
4296
4297
4298
4299
4300
4301
4302
4303
4304
4305
4306
4307
4308
4309
4310
4311
4312
4313
4314
4315 protected String onRegionTransition(final ServerName serverName,
4316 final RegionStateTransition transition) {
4317 TransitionCode code = transition.getTransitionCode();
4318 HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4319 RegionState current = regionStates.getRegionState(hri);
4320 if (LOG.isDebugEnabled()) {
4321 LOG.debug("Got transition " + code + " for "
4322 + (current != null ? current.toString() : hri.getShortNameToLog())
4323 + " from " + serverName);
4324 }
4325 String errorMsg = null;
4326 switch (code) {
4327 case OPENED:
4328 if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4329 LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4330 + serverName);
4331 break;
4332 }
4333 case FAILED_OPEN:
4334 if (current == null
4335 || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4336 errorMsg = hri.getShortNameToLog()
4337 + " is not pending open on " + serverName;
4338 } else if (code == TransitionCode.FAILED_OPEN) {
4339 onRegionFailedOpen(hri, serverName);
4340 } else {
4341 long openSeqNum = HConstants.NO_SEQNUM;
4342 if (transition.hasOpenSeqNum()) {
4343 openSeqNum = transition.getOpenSeqNum();
4344 }
4345 if (openSeqNum < 0) {
4346 errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4347 } else {
4348 onRegionOpen(hri, serverName, openSeqNum);
4349 }
4350 }
4351 break;
4352
4353 case CLOSED:
4354 if (current == null
4355 || !current.isPendingCloseOrClosingOnServer(serverName)) {
4356 errorMsg = hri.getShortNameToLog()
4357 + " is not pending close on " + serverName;
4358 } else {
4359 onRegionClosed(hri);
4360 }
4361 break;
4362
4363 case READY_TO_SPLIT:
4364 try {
4365 regionStateListener.onRegionSplit(hri);
4366 } catch (IOException exp) {
4367 errorMsg = StringUtils.stringifyException(exp);
4368 }
4369 break;
4370 case SPLIT_PONR:
4371 case SPLIT:
4372 errorMsg =
4373 onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
4374 HRegionInfo.convert(transition.getRegionInfo(2)));
4375 break;
4376
4377 case SPLIT_REVERTED:
4378 errorMsg =
4379 onRegionSplitReverted(serverName, hri,
4380 HRegionInfo.convert(transition.getRegionInfo(1)),
4381 HRegionInfo.convert(transition.getRegionInfo(2)));
4382 if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4383 try {
4384 regionStateListener.onRegionSplitReverted(hri);
4385 } catch (IOException exp) {
4386 LOG.warn(StringUtils.stringifyException(exp));
4387 }
4388 }
4389 break;
4390 case READY_TO_MERGE:
4391 case MERGE_PONR:
4392 case MERGED:
4393 case MERGE_REVERTED:
4394 errorMsg = onRegionMerge(serverName, code, hri,
4395 HRegionInfo.convert(transition.getRegionInfo(1)),
4396 HRegionInfo.convert(transition.getRegionInfo(2)));
4397 if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4398 try {
4399 regionStateListener.onRegionMerged(hri);
4400 } catch (IOException exp) {
4401 errorMsg = StringUtils.stringifyException(exp);
4402 }
4403 }
4404 break;
4405
4406 default:
4407 errorMsg = "Unexpected transition code " + code;
4408 }
4409 if (errorMsg != null) {
4410 LOG.error("Failed to transtion region from " + current + " to "
4411 + code + " by " + serverName + ": " + errorMsg);
4412 }
4413 return errorMsg;
4414 }
4415
4416
4417
4418
4419 public LoadBalancer getBalancer() {
4420 return this.balancer;
4421 }
4422
4423 public Map<ServerName, List<HRegionInfo>>
4424 getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4425 return getRegionStates().getRegionAssignments(infos);
4426 }
4427
4428 void setRegionStateListener(RegionStateListener listener) {
4429 this.regionStateListener = listener;
4430 }
4431 }