/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.NavigableMap;
31 import java.util.Set;
32 import java.util.TreeMap;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentSkipListSet;
35 import java.util.concurrent.CopyOnWriteArrayList;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.locks.Lock;
41 import java.util.concurrent.locks.ReentrantLock;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.classification.InterfaceAudience;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.hbase.Chore;
48 import org.apache.hadoop.hbase.HBaseIOException;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.HRegionInfo;
51 import org.apache.hadoop.hbase.NotServingRegionException;
52 import org.apache.hadoop.hbase.RegionTransition;
53 import org.apache.hadoop.hbase.Server;
54 import org.apache.hadoop.hbase.ServerName;
55 import org.apache.hadoop.hbase.Stoppable;
56 import org.apache.hadoop.hbase.TableName;
57 import org.apache.hadoop.hbase.TableNotFoundException;
58 import org.apache.hadoop.hbase.catalog.CatalogTracker;
59 import org.apache.hadoop.hbase.catalog.MetaReader;
60 import org.apache.hadoop.hbase.client.Result;
61 import org.apache.hadoop.hbase.exceptions.DeserializationException;
62 import org.apache.hadoop.hbase.executor.EventHandler;
63 import org.apache.hadoop.hbase.executor.EventType;
64 import org.apache.hadoop.hbase.executor.ExecutorService;
65 import org.apache.hadoop.hbase.ipc.RpcClient;
66 import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
68 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
69 import org.apache.hadoop.hbase.master.RegionState.State;
70 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
71 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
72 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
73 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
74 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
75 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
76 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
77 import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
78 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
79 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
80 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
81 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
82 import org.apache.hadoop.hbase.util.KeyLocker;
83 import org.apache.hadoop.hbase.util.Pair;
84 import org.apache.hadoop.hbase.util.PairOfSameType;
85 import org.apache.hadoop.hbase.util.Threads;
86 import org.apache.hadoop.hbase.util.Triple;
87 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
88 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
89 import org.apache.hadoop.hbase.zookeeper.ZKTable;
90 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
91 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
92 import org.apache.hadoop.ipc.RemoteException;
93 import org.apache.zookeeper.AsyncCallback;
94 import org.apache.zookeeper.KeeperException;
95 import org.apache.zookeeper.KeeperException.NoNodeException;
96 import org.apache.zookeeper.KeeperException.NodeExistsException;
97 import org.apache.zookeeper.data.Stat;
98
99 import com.google.common.base.Preconditions;
100 import com.google.common.collect.LinkedHashMultimap;
101
102
103
104
105
106
107
108
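/**
 * Manages and performs region assignment.
 * <p>
 * Monitors ZooKeeper for events related to regions in transition.
 * <p>
 * Handles existing regions in transition during master failover.
 */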
109 @InterfaceAudience.Private
110 public class AssignmentManager extends ZooKeeperListener {
111 private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
112
113 public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
114 -1, -1L);
115
116 public static final String ASSIGNMENT_TIMEOUT = "hbase.master.assignment.timeoutmonitor.timeout";
117 public static final int DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT = 600000;
118 public static final String ASSIGNMENT_TIMEOUT_MANAGEMENT = "hbase.assignment.timeout.management";
119 public static final boolean DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT = false;
120
121 public static final String ALREADY_IN_TRANSITION_WAITTIME
122 = "hbase.assignment.already.intransition.waittime";
123 public static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000;
124
125 protected final Server server;
126
127 private ServerManager serverManager;
128
129 private boolean shouldAssignRegionsWithFavoredNodes;
130
131 private CatalogTracker catalogTracker;
132
133 protected final TimeoutMonitor timeoutMonitor;
134
135 private final TimerUpdater timerUpdater;
136
137 private LoadBalancer balancer;
138
139 private final MetricsAssignmentManager metricsAssignmentManager;
140
141 private final TableLockManager tableLockManager;
142
143 private AtomicInteger numRegionsOpened = new AtomicInteger(0);
144
145 final private KeyLocker<String> locker = new KeyLocker<String>();
146
147
148
149
150
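  /**
   * Regions to reopen after the schema of a table is changed, keyed by
   * encoded region name.
   */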
151 private final Map <String, HRegionInfo> regionsToReopen;
152
153
154
155
156
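  /**
   * Maximum number of attempts to assign or unassign a region before giving up.
   */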
157 private final int maximumAttempts;
158
159
160
161
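  /**
   * Pairs of regions being merged, keyed by the encoded name of the new
   * (merged) region.
   */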
162 private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
163 = new HashMap<String, PairOfSameType<HRegionInfo>>();
164
165
166
167
168
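  /**
   * How long to sleep, in milliseconds, before retrying a failed
   * hbase:meta assignment.
   */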
169 private final long sleepTimeBeforeRetryingMetaAssignment;
170
171
172
173
174
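  /**
   * Plans for region unassignment/assignment, keyed by encoded region name.
   * Access must be synchronized on this map.
   */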
175 final NavigableMap<String, RegionPlan> regionPlans =
176 new TreeMap<String, RegionPlan>();
177
178 private final ZKTable zkTable;
179
180
181
182
183
184 private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer;
185
186 private final ExecutorService executorService;
187
188
189 private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
190
191
192 private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
193
194
195 private java.util.concurrent.ExecutorService threadPoolExecutorService;
196
197
198 private final java.util.concurrent.ExecutorService zkEventWorkers;
199
200 private List<EventType> ignoreStatesRSOffline = Arrays.asList(
201 EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
202
203 private final RegionStates regionStates;
204
205
206
207
208
209 private final int bulkAssignThresholdRegions;
210 private final int bulkAssignThresholdServers;
211
212
213
214
215 private final boolean bulkAssignWaitTillAllAssigned;
216
217
218
219
220
221
222
223
224
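  /**
   * Set to true once failover cleanup is completed, i.e. dead servers and
   * regions in transition found at master startup have been processed.
   */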
225 protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
226
227
228 private final boolean tomActivated;
229
230
231
232
233
234
235
236
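  /**
   * Number of failed open attempts per region, keyed by encoded region name.
   */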
237 private final ConcurrentHashMap<String, AtomicInteger>
238 failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
239
240
241
242
243 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
244 public static boolean TEST_SKIP_SPLIT_HANDLING = false;
245
246
247 private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
248
249
250
251
252
253
254
255
256
257
258
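  /**
   * Constructs a new assignment manager.
   *
   * @param server the server this assignment manager runs within
   * @param serverManager manager of online region servers
   * @param catalogTracker tracker of the hbase:meta location
   * @param balancer load balancer used to produce assignment plans
   * @param service executor service used to run event handlers
   * @param metricsMaster master metrics
   * @param tableLockManager table lock manager
   * @throws KeeperException on a ZooKeeper failure
   * @throws IOException on an I/O failure
   */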
259 public AssignmentManager(Server server, ServerManager serverManager,
260 CatalogTracker catalogTracker, final LoadBalancer balancer,
261 final ExecutorService service, MetricsMaster metricsMaster,
262 final TableLockManager tableLockManager) throws KeeperException, IOException {
263 super(server.getZooKeeper());
264 this.server = server;
265 this.serverManager = serverManager;
266 this.catalogTracker = catalogTracker;
267 this.executorService = service;
268 this.regionsToReopen = Collections.synchronizedMap
269 (new HashMap<String, HRegionInfo> ());
270 Configuration conf = server.getConfiguration();
271
272 this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
273 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
274 FavoredNodeLoadBalancer.class);
275 this.tomActivated = conf.getBoolean(
276 ASSIGNMENT_TIMEOUT_MANAGEMENT, DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
277 if (tomActivated){
278 this.serversInUpdatingTimer = new ConcurrentSkipListSet<ServerName>();
279 this.timeoutMonitor = new TimeoutMonitor(
280 conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
281 server, serverManager,
282 conf.getInt(ASSIGNMENT_TIMEOUT, DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT));
283 this.timerUpdater = new TimerUpdater(conf.getInt(
284 "hbase.master.assignment.timerupdater.period", 10000), server);
285 Threads.setDaemonThreadRunning(timerUpdater.getThread(),
286 server.getServerName() + ".timerUpdater");
287 } else {
288 this.serversInUpdatingTimer = null;
289 this.timeoutMonitor = null;
290 this.timerUpdater = null;
291 }
292 this.zkTable = new ZKTable(this.watcher);
293
294 this.maximumAttempts = Math.max(1,
295 this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
    this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
        "hbase.meta.assignment.retry.sleeptime", 1000L);
298 this.balancer = balancer;
299 int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
300 this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
301 maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
302 this.regionStates = new RegionStates(server, serverManager);
303
304 this.bulkAssignWaitTillAllAssigned =
305 conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
306 this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
307 this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
308
309 int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
310 ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
311 zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
312 TimeUnit.SECONDS, threadFactory);
313 this.tableLockManager = tableLockManager;
314
315 this.metricsAssignmentManager = new MetricsAssignmentManager();
316 }
317
318 void startTimeOutMonitor() {
319 if (tomActivated) {
320 Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
321 + ".timeoutMonitor");
322 }
323 }
324
325
326
327
328
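  /**
   * Add the specified listener to the assignment notification list.
   * @param listener the listener to register
   */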
329 public void registerListener(final AssignmentListener listener) {
330 this.listeners.add(listener);
331 }
332
333
334
335
336
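  /**
   * Remove the specified listener from the assignment notification list.
   * @param listener the listener to unregister
   * @return true if the listener was registered and has now been removed
   */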
337 public boolean unregisterListener(final AssignmentListener listener) {
338 return this.listeners.remove(listener);
339 }
340
341
342
343
344 public ZKTable getZKTable() {
345
346
347 return this.zkTable;
348 }
349
350
351
352
353
354
355
356 public RegionStates getRegionStates() {
357 return regionStates;
358 }
359
360 public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
361 return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
362 }
363
364
365
366
367
368
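  /**
   * Add a region plan for the specified region.
   * @param encodedName encoded region name
   * @param plan the plan to add
   */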
369 public void addPlan(String encodedName, RegionPlan plan) {
370 synchronized (regionPlans) {
371 regionPlans.put(encodedName, plan);
372 }
373 }
374
375
376
377
378 public void addPlans(Map<String, RegionPlan> plans) {
379 synchronized (regionPlans) {
380 regionPlans.putAll(plans);
381 }
382 }
383
384
385
386
387
388
389
390
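  /**
   * Set the list of regions that will be reopened because of an update to
   * the table schema.
   * @param regions regions that should be tracked for reopen
   */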
391 public void setRegionsToReopen(List <HRegionInfo> regions) {
392 for(HRegionInfo hri : regions) {
393 regionsToReopen.put(hri.getEncodedName(), hri);
394 }
395 }
396
397
398
399
400
401
402
403
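  /**
   * Used by the client to identify if all regions of a table have been
   * reopened after a schema change.
   * @return a pair of the number of regions still pending reopen and the
   *   total number of regions of the table
   * @throws IOException if hbase:meta cannot be read
   */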
404 public Pair<Integer, Integer> getReopenStatus(TableName tableName)
405 throws IOException {
406 List <HRegionInfo> hris =
407 MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
408 Integer pending = 0;
409 for (HRegionInfo hri : hris) {
410 String name = hri.getEncodedName();
411
412 if (regionsToReopen.containsKey(name)
413 || regionStates.isRegionInTransition(name)) {
414 pending++;
415 }
416 }
417 return new Pair<Integer, Integer>(pending, hris.size());
418 }
419
420
421
422
423
424
425 public boolean isFailoverCleanupDone() {
426 return failoverCleanupDone.get();
427 }
428
429
430
431
432
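  /**
   * Acquire the per-region lock for the given encoded region name. The
   * caller must release the returned lock.
   */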
433 public Lock acquireRegionLock(final String encodedName) {
434 return locker.acquireLock(encodedName);
435 }
436
437
438
439
440
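  /**
   * Flag that failover cleanup is done and let the server manager process
   * any dead servers queued while cleanup was in progress.
   */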
441 void failoverCleanupDone() {
442 failoverCleanupDone.set(true);
443 serverManager.processQueuedDeadServers();
444 }
445
446
447
448
449
450
451
452
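  /**
   * Called on master startup. Rebuilds the in-memory region assignments from
   * hbase:meta, then processes dead servers and any regions found in
   * transition, and finally recovers tables stuck in DISABLING or ENABLING
   * state.
   */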
453 void joinCluster() throws IOException,
454 KeeperException, InterruptedException {
455
456
457
458
459
460
461
462
463
464
465 Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
466
467
468
469
470 processDeadServersAndRegionsInTransition(deadServers);
471
472 recoverTableInDisablingState();
473 recoverTableInEnablingState();
474 }
475
476
477
478
479
480
481
482
483
484
485
486
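  /**
   * Decides whether this is a clean cluster startup or a failover, based on
   * dead servers, user regions already assigned and znodes found under the
   * assignment node. On failover, recovers regions on dead servers and
   * regions in transition; on a clean startup, assigns all user regions.
   * @param deadServers map of dead servers to the regions they were carrying
   */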
487 void processDeadServersAndRegionsInTransition(
488 final Map<ServerName, List<HRegionInfo>> deadServers)
489 throws KeeperException, IOException, InterruptedException {
490 List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
491 watcher.assignmentZNode);
492
493 if (nodes == null) {
494 String errorMessage = "Failed to get the children from ZK";
495 server.abort(errorMessage, new IOException(errorMessage));
496 return;
497 }
498
499 boolean failover = (!serverManager.getDeadServers().isEmpty() || !serverManager
500 .getRequeuedDeadServers().isEmpty());
501
502 if (!failover) {
503
504 Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
505 for (HRegionInfo hri: regions.keySet()) {
506 if (!hri.isMetaTable()) {
507 LOG.debug("Found " + hri + " out on cluster");
508 failover = true;
509 break;
510 }
511 }
512 if (!failover) {
513
514 for (String encodedName: nodes) {
515 RegionState state = regionStates.getRegionState(encodedName);
516 if (state != null && !state.getRegion().isMetaRegion()) {
517 LOG.debug("Found " + state.getRegion().getRegionNameAsString() + " in RITs");
518 failover = true;
519 break;
520 }
521 }
522 }
523 }
524
525
526 if (failover) {
527 LOG.info("Found regions out on cluster or in RIT; presuming failover");
528
529
530 processDeadServersAndRecoverLostRegions(deadServers);
531 } else {
532
      LOG.info("Clean cluster startup. Assigning user regions");
534 assignAllUserRegions();
535 }
536 }
537
538
539
540
541
542
543
544
545
546
547
548
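  /**
   * If the region is in transition in ZooKeeper, processes it and then waits
   * until the region is no longer in transition or its hosting server is no
   * longer online.
   * @return false if there was no transition znode to process, true otherwise
   */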
549 boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
550 throws InterruptedException, KeeperException, IOException {
551 String encodedRegionName = hri.getEncodedName();
552 if (!processRegionInTransition(encodedRegionName, hri)) {
553 return false;
554 }
555 LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
556 while (!this.server.isStopped() &&
557 this.regionStates.isRegionInTransition(encodedRegionName)) {
558 RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
559 if (state == null || !serverManager.isServerOnline(state.getServerName())) {
560
561
562
563 break;
564 }
565 this.regionStates.waitForUpdate(100);
566 }
567 return true;
568 }
569
570
571
572
573
574
575
576
577
578
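  /**
   * Process a single region-in-transition znode found during failover.
   * @param encodedRegionName encoded name of the region
   * @param regionInfo known region info, or null to look it up from the znode
   * @return true if the znode existed and was handled
   */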
579 boolean processRegionInTransition(final String encodedRegionName,
580 final HRegionInfo regionInfo) throws KeeperException, IOException {
581
582
583
584
585 Lock lock = locker.acquireLock(encodedRegionName);
586 try {
587 Stat stat = new Stat();
588 byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
589 if (data == null) return false;
590 RegionTransition rt;
591 try {
592 rt = RegionTransition.parseFrom(data);
593 } catch (DeserializationException e) {
        LOG.warn("Failed to parse znode data", e);
595 return false;
596 }
597 HRegionInfo hri = regionInfo;
598 if (hri == null) {
599
600
601
602
603
604 hri = regionStates.getRegionInfo(rt.getRegionName());
605 EventType et = rt.getEventType();
606 if (hri == null && et != EventType.RS_ZK_REGION_MERGING
607 && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
608 LOG.warn("Couldn't find the region in recovering " + rt);
609 return false;
610 }
611 }
612 return processRegionsInTransition(
613 rt, hri, stat.getVersion());
614 } finally {
615 lock.unlock();
616 }
617 }
618
619
620
621
622
623
624
625
626
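  /**
   * Handles one region-in-transition entry found during failover, based on
   * its event type and on whether the hosting server is still online.
   * @param expectedVersion znode version to use for subsequent transitions
   * @return false if the hosting server was dead and the region was forced
   *   offline instead, true otherwise
   */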
627 boolean processRegionsInTransition(
628 final RegionTransition rt, final HRegionInfo regionInfo,
629 final int expectedVersion) throws KeeperException {
630 EventType et = rt.getEventType();
631
632 final ServerName sn = rt.getServerName();
633 final byte[] regionName = rt.getRegionName();
634 final String encodedName = HRegionInfo.encodeRegionName(regionName);
635 final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
636 LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
637
638 if (regionStates.isRegionInTransition(encodedName)) {
639 LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
640 + et + ", does nothing since the region is already in transition "
641 + regionStates.getRegionTransitionState(encodedName));
642
643 return true;
644 }
645 if (!serverManager.isServerOnline(sn)) {
646
647
648
      LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
        " was on a dead server; forcing offline");
651 if (regionStates.isRegionOnline(regionInfo)) {
652
653
654
655 regionStates.regionOffline(regionInfo);
656 sendRegionClosedNotification(regionInfo);
657 }
658
659 regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
660
661 if (regionInfo.isMetaRegion()) {
662
663
664 MetaRegionTracker.setMetaLocation(watcher, sn);
665 } else {
666
667
668 regionStates.setLastRegionServerOfRegion(sn, encodedName);
669
670 if (!serverManager.isServerDead(sn)) {
671 serverManager.expireServer(sn);
672 }
673 }
674 return false;
675 }
676 switch (et) {
677 case M_ZK_REGION_CLOSING:
678
679
680 final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
681 this.executorService.submit(
682 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
683 @Override
684 public void process() throws IOException {
685 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
686 try {
687 unassign(regionInfo, rsClosing, expectedVersion, null, true, null);
688 if (regionStates.isRegionOffline(regionInfo)) {
689 assign(regionInfo, true);
690 }
691 } finally {
692 lock.unlock();
693 }
694 }
695 });
696 break;
697
698 case RS_ZK_REGION_CLOSED:
699 case RS_ZK_REGION_FAILED_OPEN:
700
701 regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
702 invokeAssign(regionInfo);
703 break;
704
705 case M_ZK_REGION_OFFLINE:
706
707 regionStates.updateRegionState(rt, State.PENDING_OPEN);
708 final RegionState rsOffline = regionStates.getRegionState(regionInfo);
709 this.executorService.submit(
710 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
711 @Override
712 public void process() throws IOException {
713 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
714 try {
715 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
716 addPlan(encodedName, plan);
717 assign(rsOffline, false, false);
718 } finally {
719 lock.unlock();
720 }
721 }
722 });
723 break;
724
725 case RS_ZK_REGION_OPENING:
726 regionStates.updateRegionState(rt, State.OPENING);
727 break;
728
729 case RS_ZK_REGION_OPENED:
730
731
732
733 regionStates.updateRegionState(rt, State.OPEN);
734 new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
735 break;
736 case RS_ZK_REQUEST_REGION_SPLIT:
737 case RS_ZK_REGION_SPLITTING:
738 case RS_ZK_REGION_SPLIT:
739
740
741
742 regionStates.regionOnline(regionInfo, sn);
743 regionStates.updateRegionState(rt, State.SPLITTING);
744 if (!handleRegionSplitting(
745 rt, encodedName, prettyPrintedRegionName, sn)) {
746 deleteSplittingNode(encodedName, sn);
747 }
748 break;
749 case RS_ZK_REQUEST_REGION_MERGE:
750 case RS_ZK_REGION_MERGING:
751 case RS_ZK_REGION_MERGED:
752 if (!handleRegionMerging(
753 rt, encodedName, prettyPrintedRegionName, sn)) {
754 deleteMergingNode(encodedName, sn);
755 }
756 break;
757 default:
      throw new IllegalStateException("Received region in state: " + et + " is not valid.");
759 }
760 LOG.info("Processed region " + prettyPrintedRegionName + " in state "
761 + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
762 + "server: " + sn);
763 return true;
764 }
765
766
767
768
769
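  /**
   * When a region is closed, remove it from the reopen tracking map so that
   * it is not reopened again.
   */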
770 public void removeClosedRegion(HRegionInfo hri) {
771 if (regionsToReopen.remove(hri.getEncodedName()) != null) {
772 LOG.debug("Removed region from reopening regions because it was closed");
773 }
774 }
775
776
777
778
779
780
781
782
783
784
785
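  /**
   * Handles a region transition read from an unassigned znode: splitting,
   * merging, closing, closed, failed-open, opening and opened events.
   * @param rt region transition read from the znode; may be null
   * @param expectedVersion version of the znode the data was read at
   */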
786 void handleRegion(final RegionTransition rt, int expectedVersion) {
787 if (rt == null) {
788 LOG.warn("Unexpected NULL input for RegionTransition rt");
789 return;
790 }
791 final ServerName sn = rt.getServerName();
792
793 if (sn.equals(HBCK_CODE_SERVERNAME)) {
794 handleHBCK(rt);
795 return;
796 }
797 final long createTime = rt.getCreateTime();
798 final byte[] regionName = rt.getRegionName();
799 String encodedName = HRegionInfo.encodeRegionName(regionName);
800 String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
801
802 if (!serverManager.isServerOnline(sn)
803 && !ignoreStatesRSOffline.contains(rt.getEventType())) {
804 LOG.warn("Attempted to handle region transition for server but " +
805 "it is not online: " + prettyPrintedRegionName + ", " + rt);
806 return;
807 }
808
809 RegionState regionState =
810 regionStates.getRegionState(encodedName);
811 long startTime = System.currentTimeMillis();
812 if (LOG.isDebugEnabled()) {
813 boolean lateEvent = createTime < (startTime - 15000);
814 LOG.debug("Handling " + rt.getEventType() +
815 ", server=" + sn + ", region=" +
816 (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
817 (lateEvent ? ", which is more than 15 seconds late" : "") +
818 ", current_state=" + regionState);
819 }
820
821
822 if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
823 return;
824 }
825
826
827 Lock lock = locker.acquireLock(encodedName);
828 try {
829 RegionState latestState =
830 regionStates.getRegionState(encodedName);
831 if ((regionState == null && latestState != null)
832 || (regionState != null && latestState == null)
833 || (regionState != null && latestState != null
834 && latestState.getState() != regionState.getState())) {
835 LOG.warn("Region state changed from " + regionState + " to "
836 + latestState + ", while acquiring lock");
837 }
838 long waitedTime = System.currentTimeMillis() - startTime;
839 if (waitedTime > 5000) {
840 LOG.warn("Took " + waitedTime + "ms to acquire the lock");
841 }
842 regionState = latestState;
843 switch (rt.getEventType()) {
844 case RS_ZK_REQUEST_REGION_SPLIT:
845 case RS_ZK_REGION_SPLITTING:
846 case RS_ZK_REGION_SPLIT:
847 if (!handleRegionSplitting(
848 rt, encodedName, prettyPrintedRegionName, sn)) {
849 deleteSplittingNode(encodedName, sn);
850 }
851 break;
852
853 case RS_ZK_REQUEST_REGION_MERGE:
854 case RS_ZK_REGION_MERGING:
855 case RS_ZK_REGION_MERGED:
856
857
858 if (!handleRegionMerging(
859 rt, encodedName, prettyPrintedRegionName, sn)) {
860 deleteMergingNode(encodedName, sn);
861 }
862 break;
863
864 case M_ZK_REGION_CLOSING:
865
866
867 if (regionState == null
868 || !regionState.isPendingCloseOrClosingOnServer(sn)) {
869 LOG.warn("Received CLOSING for " + prettyPrintedRegionName
870 + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
871 + regionStates.getRegionState(encodedName));
872 return;
873 }
874
875 regionStates.updateRegionState(rt, State.CLOSING);
876 break;
877
878 case RS_ZK_REGION_CLOSED:
879
880 if (regionState == null
881 || !regionState.isPendingCloseOrClosingOnServer(sn)) {
882 LOG.warn("Received CLOSED for " + prettyPrintedRegionName
883 + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
884 + regionStates.getRegionState(encodedName));
885 return;
886 }
887
888
889
890 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
891 updateClosedRegionHandlerTracker(regionState.getRegion());
892 break;
893
894 case RS_ZK_REGION_FAILED_OPEN:
895 if (regionState == null
896 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
897 LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
898 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
899 + regionStates.getRegionState(encodedName));
900 return;
901 }
902 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
903 if (failedOpenCount == null) {
904 failedOpenCount = new AtomicInteger();
905
906
907
908 failedOpenTracker.put(encodedName, failedOpenCount);
909 }
910 if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
911 regionStates.updateRegionState(rt, State.FAILED_OPEN);
912
913
914 failedOpenTracker.remove(encodedName);
915 } else {
916
917 regionState = regionStates.updateRegionState(rt, State.CLOSED);
918 if (regionState != null) {
919
920
921 try {
922 getRegionPlan(regionState.getRegion(), sn, true);
923 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
924 } catch (HBaseIOException e) {
925 LOG.warn("Failed to get region plan", e);
926 }
927 }
928 }
929 break;
930
931 case RS_ZK_REGION_OPENING:
932
933
934 if (regionState == null
935 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
936 LOG.warn("Received OPENING for " + prettyPrintedRegionName
937 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
938 + regionStates.getRegionState(encodedName));
939 return;
940 }
941
942 regionStates.updateRegionState(rt, State.OPENING);
943 break;
944
945 case RS_ZK_REGION_OPENED:
946
947 if (regionState == null
948 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
949 LOG.warn("Received OPENED for " + prettyPrintedRegionName
950 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
951 + regionStates.getRegionState(encodedName));
952
953 if (regionState != null) {
954
955
956
957 unassign(regionState.getRegion(), null, -1, null, false, sn);
958 }
959 return;
960 }
961
962 regionState = regionStates.updateRegionState(rt, State.OPEN);
963 if (regionState != null) {
964 failedOpenTracker.remove(encodedName);
965 new OpenedRegionHandler(
966 server, this, regionState.getRegion(), sn, expectedVersion).process();
967 updateOpenedRegionHandlerTracker(regionState.getRegion());
968 }
969 break;
970
971 default:
972 throw new IllegalStateException("Received event is not valid.");
973 }
974 } finally {
975 lock.unlock();
976 }
977 }
978
979
980 boolean wasClosedHandlerCalled(HRegionInfo hri) {
981 AtomicBoolean b = closedRegionHandlerCalled.get(hri);
982
983
984
985 return b == null ? false : b.compareAndSet(true, false);
986 }
987
988
989 boolean wasOpenedHandlerCalled(HRegionInfo hri) {
990 AtomicBoolean b = openedRegionHandlerCalled.get(hri);
991
992
993
994 return b == null ? false : b.compareAndSet(true, false);
995 }
996
997
998 void initializeHandlerTrackers() {
999 closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1000 openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1001 }
1002
1003 void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1004 if (closedRegionHandlerCalled != null) {
1005 closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1006 }
1007 }
1008
1009 void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1010 if (openedRegionHandlerCalled != null) {
1011 openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1012 }
1013 }
1014
1015
1016
1017
1018
1019
1020 void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1021 if (!shouldAssignRegionsWithFavoredNodes) return;
1022
1023
1024 Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1025 new HashMap<HRegionInfo, List<ServerName>>();
1026 for (HRegionInfo region : regions) {
1027 regionToFavoredNodes.put(region,
1028 ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1029 }
1030 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker);
1031 }
1032
1033
1034
1035
1036
1037
1038
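  /**
   * Handle a ZK unassigned node transition triggered by the HBCK repair
   * tool. Only the OFFLINE transition (forcing assignment of a region) is
   * handled here; any other event type is logged and ignored.
   * @param rt region transition created by HBCK
   */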
1039 private void handleHBCK(RegionTransition rt) {
1040 String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1041 LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1042 ", server=" + rt.getServerName() + ", region=" +
1043 HRegionInfo.prettyPrint(encodedName));
1044 RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1045 switch (rt.getEventType()) {
1046 case M_ZK_REGION_OFFLINE:
1047 HRegionInfo regionInfo;
1048 if (regionState != null) {
1049 regionInfo = regionState.getRegion();
1050 } else {
1051 try {
1052 byte [] name = rt.getRegionName();
1053 Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
1054 regionInfo = p.getFirst();
1055 } catch (IOException e) {
1056 LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1057 return;
1058 }
1059 }
1060 LOG.info("HBCK repair is triggering assignment of region=" +
1061 regionInfo.getRegionNameAsString());
1062
1063 assign(regionInfo, false);
1064 break;
1065
1066 default:
1067 LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1068 break;
1069 }
1070
1071 }
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
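  /**
   * ZooKeeper watcher callback: a znode was created. If it is under the
   * assignment znode it is handled as a new region transition.
   */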
1087 @Override
1088 public void nodeCreated(String path) {
1089 handleAssignmentEvent(path);
1090 }
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104 @Override
1105 public void nodeDataChanged(String path) {
1106 handleAssignmentEvent(path);
1107 }
1108
1109
1110
1111
1112
1113 private final Set<String> regionsInProgress = new HashSet<String>();
1114
1115
1116 private final LinkedHashMultimap <String, RegionRunnable>
1117 zkEventWorkerWaitingList = LinkedHashMultimap.create();
1118
1119
1120
1121
1122 private interface RegionRunnable extends Runnable{
1123
1124
1125
1126 String getRegionName();
1127 }
1128
1129
1130
1131
1132
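  /**
   * Submit a task to the ZK event worker pool, making sure that at most one
   * task runs at a time for any given region; other tasks for the same
   * region are queued and submitted once the running one completes.
   */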
1133 protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1134
1135 synchronized (regionsInProgress) {
1136
1137
1138 if (regionsInProgress.contains(regRunnable.getRegionName())) {
1139 synchronized (zkEventWorkerWaitingList){
1140 zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1141 }
1142 return;
1143 }
1144
1145
1146 regionsInProgress.add(regRunnable.getRegionName());
1147 zkEventWorkers.submit(new Runnable() {
1148 @Override
1149 public void run() {
1150 try {
1151 regRunnable.run();
1152 } finally {
1153
1154
1155 synchronized (regionsInProgress) {
1156 regionsInProgress.remove(regRunnable.getRegionName());
1157 synchronized (zkEventWorkerWaitingList) {
1158 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1159 regRunnable.getRegionName());
1160 if (!waiting.isEmpty()) {
1161
1162 RegionRunnable toSubmit = waiting.iterator().next();
1163 zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1164 zkEventWorkersSubmit(toSubmit);
1165 }
1166 }
1167 }
1168 }
1169 }
1170 });
1171 }
1172 }
1173
1174 @Override
1175 public void nodeDeleted(final String path) {
1176 if (path.startsWith(watcher.assignmentZNode)) {
1177 final String regionName = ZKAssign.getRegionName(watcher, path);
1178 zkEventWorkersSubmit(new RegionRunnable() {
1179 @Override
1180 public String getRegionName() {
1181 return regionName;
1182 }
1183
1184 @Override
1185 public void run() {
1186 Lock lock = locker.acquireLock(regionName);
1187 try {
1188 RegionState rs = regionStates.getRegionTransitionState(regionName);
1189 if (rs == null) {
1190 rs = regionStates.getRegionState(regionName);
1191 if (rs == null || !rs.isMergingNew()) {
1192
1193 return;
1194 }
1195 }
1196
1197 HRegionInfo regionInfo = rs.getRegion();
1198 String regionNameStr = regionInfo.getRegionNameAsString();
1199 LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1200 boolean disabled = getZKTable().isDisablingOrDisabledTable(regionInfo.getTable());
1201 ServerName serverName = rs.getServerName();
1202 if (serverManager.isServerOnline(serverName)) {
1203 if (rs.isOnServer(serverName)
1204 && (rs.isOpened() || rs.isSplitting())) {
1205 regionOnline(regionInfo, serverName);
1206 if (disabled) {
1207
                  LOG.info("Opened " + regionNameStr
                    + " but this table is disabled, triggering close of region");
1210 unassign(regionInfo);
1211 }
1212 } else if (rs.isMergingNew()) {
1213 synchronized (regionStates) {
1214 String p = regionInfo.getEncodedName();
1215 PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1216 if (regions != null) {
1217 onlineMergingRegion(disabled, regions.getFirst(), serverName);
1218 onlineMergingRegion(disabled, regions.getSecond(), serverName);
1219 }
1220 }
1221 }
1222 }
1223 } finally {
1224 lock.unlock();
1225 }
1226 }
1227
1228 private void onlineMergingRegion(boolean disabled,
1229 final HRegionInfo hri, final ServerName serverName) {
1230 RegionState regionState = regionStates.getRegionState(hri);
1231 if (regionState != null && regionState.isMerging()
1232 && regionState.isOnServer(serverName)) {
1233 regionOnline(regionState.getRegion(), serverName);
1234 if (disabled) {
1235 unassign(hri);
1236 }
1237 }
1238 }
1239 });
1240 }
1241 }
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
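  /**
   * ZooKeeper watcher callback: the children of the assignment znode
   * changed. Sets watches on any unassigned znodes that are not already
   * tracked as regions in transition.
   */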
1255 @Override
1256 public void nodeChildrenChanged(String path) {
1257 if (path.equals(watcher.assignmentZNode)) {
1258 zkEventWorkers.submit(new Runnable() {
1259 @Override
1260 public void run() {
1261 try {
1262
1263 List<String> children =
1264 ZKUtil.listChildrenAndWatchForNewChildren(
1265 watcher, watcher.assignmentZNode);
1266 if (children != null) {
1267 Stat stat = new Stat();
1268 for (String child : children) {
1269
1270
1271
1272 if (!regionStates.isRegionInTransition(child)) {
1273 ZKAssign.getDataAndWatch(watcher, child, stat);
1274 }
1275 }
1276 }
1277 } catch (KeeperException e) {
1278 server.abort("Unexpected ZK exception reading unassigned children", e);
1279 }
1280 }
1281 });
1282 }
1283 }
1284
1285
1286
1287
1288
1289
1290
1291
1292
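  /**
   * Marks the region as online on the given server: updates region states,
   * clears any region plan, and notifies the balancer and registered
   * listeners.
   */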
1293 void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1294 numRegionsOpened.incrementAndGet();
1295 regionStates.regionOnline(regionInfo, sn);
1296
1297
1298 clearRegionPlan(regionInfo);
1299
1300 addToServersInUpdatingTimer(sn);
1301 balancer.regionOnline(regionInfo, sn);
1302
1303
1304 sendRegionOpenedNotification(regionInfo, sn);
1305 }
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315 private void handleAssignmentEvent(final String path) {
1316 if (path.startsWith(watcher.assignmentZNode)) {
1317 final String regionName = ZKAssign.getRegionName(watcher, path);
1318
1319 zkEventWorkersSubmit(new RegionRunnable() {
1320 @Override
1321 public String getRegionName() {
1322 return regionName;
1323 }
1324
1325 @Override
1326 public void run() {
1327 try {
1328 Stat stat = new Stat();
1329 byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1330 if (data == null) return;
1331
1332 RegionTransition rt = RegionTransition.parseFrom(data);
1333 handleRegion(rt, stat.getVersion());
1334 } catch (KeeperException e) {
1335 server.abort("Unexpected ZK exception reading unassigned node data", e);
1336 } catch (DeserializationException e) {
1337 server.abort("Unexpected exception deserializing node data", e);
1338 }
1339 }
1340 });
1341 }
1342 }
1343
1344
1345
1346
1347
1348
1349 private void addToServersInUpdatingTimer(final ServerName sn) {
1350 if (tomActivated){
1351 this.serversInUpdatingTimer.add(sn);
1352 }
1353 }
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
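  /**
   * Touch timers for all regions in transition whose plan targets the given
   * server, so they do not time out while that server catches up. Only used
   * when the timeout monitor is active.
   */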
1368 private void updateTimers(final ServerName sn) {
1369 Preconditions.checkState(tomActivated);
1370 if (sn == null) return;
1371
1372
1373
1374
1375
1376 List<Map.Entry<String, RegionPlan>> rps;
1377 synchronized(this.regionPlans) {
1378 rps = new ArrayList<Map.Entry<String, RegionPlan>>(regionPlans.entrySet());
1379 }
1380
1381 for (Map.Entry<String, RegionPlan> e : rps) {
1382 if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
1383 RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
1384 if (regionState != null) {
1385 regionState.updateTimestampToNow();
1386 }
1387 }
1388 }
1389 }
1390
1391
1392
1393
1394
1395
1396
1397
1398 public void regionOffline(final HRegionInfo regionInfo) {
1399 regionOffline(regionInfo, null);
1400 }
1401
1402 public void offlineDisabledRegion(HRegionInfo regionInfo) {
1403
1404 LOG.debug("Table being disabled so deleting ZK node and removing from " +
1405 "regions in transition, skipping assignment of region " +
1406 regionInfo.getRegionNameAsString());
1407 String encodedName = regionInfo.getEncodedName();
1408 deleteNodeInStates(encodedName, "closed", null,
1409 EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1410 regionOffline(regionInfo);
1411 }
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
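  /**
   * Assigns the specified region, picking a destination from the current
   * region plan (or from the balancer when no plan exists), updating the
   * in-memory region state and sending the OPEN RPC to the chosen server.
   * @param region the region to assign
   * @param setOfflineInZK whether to force the znode to OFFLINE before assigning
   */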
1433 public void assign(HRegionInfo region, boolean setOfflineInZK) {
1434 assign(region, setOfflineInZK, false);
1435 }
1436
1437
1438
1439
1440 public void assign(HRegionInfo region,
1441 boolean setOfflineInZK, boolean forceNewPlan) {
1442 if (isDisabledorDisablingRegionInRIT(region)) {
1443 return;
1444 }
1445 if (this.serverManager.isClusterShutdown()) {
1446 LOG.info("Cluster shutdown is set; skipping assign of " +
1447 region.getRegionNameAsString());
1448 return;
1449 }
1450 String encodedName = region.getEncodedName();
1451 Lock lock = locker.acquireLock(encodedName);
1452 try {
1453 RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1454 if (state != null) {
1455 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1456 LOG.info("Skip assigning " + region.getRegionNameAsString()
1457 + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1458 + " is dead but not processed yet");
1459 return;
1460 }
1461 assign(state, setOfflineInZK, forceNewPlan);
1462 }
1463 } finally {
1464 lock.unlock();
1465 }
1466 }
1467
1468
1469
1470
1471
1472
1473
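  /**
   * Bulk-assign the given regions to the given destination: znodes are set
   * offline asynchronously and a single OPEN RPC is sent for all regions.
   * Regions that fail here are queued for individual reassignment.
   * @return false if the whole batch could not be dispatched (for example
   *   the destination stopped or could not be reached), true otherwise
   */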
1474 boolean assign(final ServerName destination, final List<HRegionInfo> regions) {
1475 long startTime = EnvironmentEdgeManager.currentTimeMillis();
1476 try {
1477 int regionCount = regions.size();
1478 if (regionCount == 0) {
1479 return true;
1480 }
1481 LOG.debug("Assigning " + regionCount + " region(s) to " + destination.toString());
1482 Set<String> encodedNames = new HashSet<String>(regionCount);
1483 for (HRegionInfo region : regions) {
1484 encodedNames.add(region.getEncodedName());
1485 }
1486
1487 List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1488 Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1489 try {
1490 AtomicInteger counter = new AtomicInteger(0);
1491 Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1492 OfflineCallback cb = new OfflineCallback(
1493 watcher, destination, counter, offlineNodesVersions);
1494 Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1495 List<RegionState> states = new ArrayList<RegionState>(regions.size());
1496 for (HRegionInfo region : regions) {
1497 String encodedName = region.getEncodedName();
1498 if (!isDisabledorDisablingRegionInRIT(region)) {
1499 RegionState state = forceRegionStateToOffline(region, false);
1500 boolean onDeadServer = false;
1501 if (state != null) {
1502 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1503 LOG.info("Skip assigning " + region.getRegionNameAsString()
1504 + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1505 + " is dead but not processed yet");
1506 onDeadServer = true;
1507 } else if (asyncSetOfflineInZooKeeper(state, cb, destination)) {
1508 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1509 plans.put(encodedName, plan);
1510 states.add(state);
1511 continue;
1512 }
1513 }
1514
1515 if (!onDeadServer) {
1516 LOG.info("failed to force region state to offline or "
1517 + "failed to set it offline in ZK, will reassign later: " + region);
1518 failedToOpenRegions.add(region);
1519 }
1520 }
1521
1522
1523 Lock lock = locks.remove(encodedName);
1524 lock.unlock();
1525 }
1526
1527
1528 int total = states.size();
1529 for (int oldCounter = 0; !server.isStopped();) {
1530 int count = counter.get();
1531 if (oldCounter != count) {
1532 LOG.info(destination.toString() + " unassigned znodes=" + count +
1533 " of total=" + total);
1534 oldCounter = count;
1535 }
1536 if (count >= total) break;
1537 Threads.sleep(5);
1538 }
1539
1540 if (server.isStopped()) {
1541 return false;
1542 }
1543
1544
1545
1546 this.addPlans(plans);
1547
1548 List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1549 new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1550 for (RegionState state: states) {
1551 HRegionInfo region = state.getRegion();
1552 String encodedRegionName = region.getEncodedName();
1553 Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1554 if (nodeVersion == null || nodeVersion == -1) {
1555 LOG.warn("failed to offline in zookeeper: " + region);
1556 failedToOpenRegions.add(region);
1557 Lock lock = locks.remove(encodedRegionName);
1558 lock.unlock();
1559 } else {
1560 regionStates.updateRegionState(
1561 region, State.PENDING_OPEN, destination);
1562 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1563 if (this.shouldAssignRegionsWithFavoredNodes) {
1564 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1565 }
1566 regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>(
1567 region, nodeVersion, favoredNodes));
1568 }
1569 }
1570
1571
1572 try {
1573
1574
1575 long maxWaitTime = System.currentTimeMillis() +
1576 this.server.getConfiguration().
1577 getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1578 for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1579 try {
1580 List<RegionOpeningState> regionOpeningStateList = serverManager
1581 .sendRegionOpen(destination, regionOpenInfos);
1582 if (regionOpeningStateList == null) {
1583
1584 return false;
1585 }
1586 for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1587 RegionOpeningState openingState = regionOpeningStateList.get(k);
1588 if (openingState != RegionOpeningState.OPENED) {
1589 HRegionInfo region = regionOpenInfos.get(k).getFirst();
1590 if (openingState == RegionOpeningState.ALREADY_OPENED) {
1591 processAlreadyOpenedRegion(region, destination);
1592 } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1593
1594 failedToOpenRegions.add(region);
1595 } else {
1596 LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1597 + openingState + " in assigning region " + region);
1598 }
1599 }
1600 }
1601 break;
1602 } catch (IOException e) {
1603 if (e instanceof RemoteException) {
1604 e = ((RemoteException)e).unwrapRemoteException();
1605 }
1606 if (e instanceof RegionServerStoppedException) {
1607 LOG.warn("The region server was shut down, ", e);
1608
1609 return false;
1610 } else if (e instanceof ServerNotRunningYetException) {
1611 long now = System.currentTimeMillis();
1612 if (now < maxWaitTime) {
1613 LOG.debug("Server is not yet up; waiting up to " +
1614 (maxWaitTime - now) + "ms", e);
1615 Thread.sleep(100);
1616 i--;
1617 continue;
1618 }
1619 } else if (e instanceof java.net.SocketTimeoutException
1620 && this.serverManager.isServerOnline(destination)) {
1621
1622
1623
1624
1625 if (LOG.isDebugEnabled()) {
1626 LOG.debug("Bulk assigner openRegion() to " + destination
1627 + " has timed out, but the regions might"
1628 + " already be opened on it.", e);
1629 }
1630
1631 Thread.sleep(100);
1632 i--;
1633 continue;
1634 }
1635 throw e;
1636 }
1637 }
1638 } catch (IOException e) {
1639
1640 LOG.info("Unable to communicate with " + destination
1641 + " in order to assign regions, ", e);
1642 return false;
1643 } catch (InterruptedException e) {
1644 throw new RuntimeException(e);
1645 }
1646 } finally {
1647 for (Lock lock : locks.values()) {
1648 lock.unlock();
1649 }
1650 }
1651
1652 if (!failedToOpenRegions.isEmpty()) {
1653 for (HRegionInfo region : failedToOpenRegions) {
1654 if (!regionStates.isRegionOnline(region)) {
1655 invokeAssign(region);
1656 }
1657 }
1658 }
1659 LOG.debug("Bulk assigning done for " + destination);
1660 return true;
1661 } finally {
1662 metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
1663 }
1664 }
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
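  /**
   * Send a CLOSE RPC to the hosting server for the specified region,
   * retrying up to the configured maximum attempts and handling servers
   * that are dead, stopped, or report the region already in transition.
   */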
1676 private void unassign(final HRegionInfo region,
1677 final RegionState state, final int versionOfClosingNode,
1678 final ServerName dest, final boolean transitionInZK,
1679 final ServerName src) {
1680 ServerName server = src;
1681 if (state != null) {
1682 server = state.getServerName();
1683 }
1684 long maxWaitTime = -1;
1685 for (int i = 1; i <= this.maximumAttempts; i++) {
1686 if (this.server.isStopped() || this.server.isAborted()) {
1687 LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1688 return;
1689 }
1690
1691 if (!serverManager.isServerOnline(server)) {
1692 LOG.debug("Offline " + region.getRegionNameAsString()
1693 + ", no need to unassign since it's on a dead server: " + server);
1694 if (transitionInZK) {
1695
1696 deleteClosingOrClosedNode(region, server);
1697 }
1698 if (state != null) {
1699 regionOffline(region);
1700 }
1701 return;
1702 }
1703 try {
1704
1705 if (serverManager.sendRegionClose(server, region,
1706 versionOfClosingNode, dest, transitionInZK)) {
1707 LOG.debug("Sent CLOSE to " + server + " for region " +
1708 region.getRegionNameAsString());
1709 if (!transitionInZK && state != null) {
1710
1711
1712 unassign(region, state, versionOfClosingNode,
1713 dest, transitionInZK,src);
1714 }
1715 return;
1716 }
1717
1718
1719 LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1720 region.getRegionNameAsString());
1721 } catch (Throwable t) {
1722 if (t instanceof RemoteException) {
1723 t = ((RemoteException)t).unwrapRemoteException();
1724 }
1725 boolean logRetries = true;
1726 if (t instanceof NotServingRegionException
1727 || t instanceof RegionServerStoppedException
1728 || t instanceof ServerNotRunningYetException) {
1729 LOG.debug("Offline " + region.getRegionNameAsString()
1730 + ", it's not any more on " + server, t);
1731 if (transitionInZK) {
1732 deleteClosingOrClosedNode(region, server);
1733 }
1734 if (state != null) {
1735 regionOffline(region);
1736 }
1737 return;
1738 } else if ((t instanceof FailedServerException) || (state != null &&
1739 t instanceof RegionAlreadyInTransitionException)) {
1740 long sleepTime = 0;
1741 Configuration conf = this.server.getConfiguration();
1742 if(t instanceof FailedServerException) {
1743 sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1744 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1745 } else {
1746
            LOG.debug("Updating the timestamp of " + state);
1748 state.updateTimestampToNow();
1749 if (maxWaitTime < 0) {
1750 maxWaitTime =
1751 EnvironmentEdgeManager.currentTimeMillis()
1752 + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1753 DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1754 }
1755 long now = EnvironmentEdgeManager.currentTimeMillis();
1756 if (now < maxWaitTime) {
1757 LOG.debug("Region is already in transition; "
1758 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1759 sleepTime = 100;
1760 i--;
1761 logRetries = false;
1762 }
1763 }
1764 try {
1765 if (sleepTime > 0) {
1766 Thread.sleep(sleepTime);
1767 }
1768 } catch (InterruptedException ie) {
1769 LOG.warn("Failed to unassign "
1770 + region.getRegionNameAsString() + " since interrupted", ie);
1771 Thread.currentThread().interrupt();
1772 if (!tomActivated && state != null) {
1773 regionStates.updateRegionState(region, State.FAILED_CLOSE);
1774 }
1775 return;
1776 }
1777 }
1778
1779 if (logRetries) {
1780 LOG.info("Server " + server + " returned " + t + " for "
1781 + region.getRegionNameAsString() + ", try=" + i
1782 + " of " + this.maximumAttempts, t);
1783
1784 }
1785 }
1786 }
1787
1788 if (!tomActivated && state != null) {
1789 regionStates.updateRegionState(region, State.FAILED_CLOSE);
1790 }
1791 }
1792
1793
1794
1795
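  /**
   * Set the region state to OFFLINE in memory unless the region is in a
   * state it should not be assigned from (for example already in transition
   * and no new plan is being forced).
   * @return the state to assign from, or null if assignment should be skipped
   */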
1796 private RegionState forceRegionStateToOffline(
1797 final HRegionInfo region, final boolean forceNewPlan) {
1798 RegionState state = regionStates.getRegionState(region);
1799 if (state == null) {
1800 LOG.warn("Assigning a region not in region states: " + region);
1801 state = regionStates.createRegionState(region);
1802 }
1803
1804 ServerName sn = state.getServerName();
1805 if (forceNewPlan && LOG.isDebugEnabled()) {
1806 LOG.debug("Force region state offline " + state);
1807 }
1808
1809 switch (state.getState()) {
1810 case OPEN:
1811 case OPENING:
1812 case PENDING_OPEN:
1813 case CLOSING:
1814 case PENDING_CLOSE:
1815 if (!forceNewPlan) {
1816 LOG.debug("Skip assigning " +
1817 region + ", it is already " + state);
1818 return null;
1819 }
1820 case FAILED_CLOSE:
1821 case FAILED_OPEN:
1822 unassign(region, state, -1, null, false, null);
1823 state = regionStates.getRegionState(region);
1824 if (state.isFailedClose()) {
1825
1826
1827 LOG.info("Skip assigning " +
1828 region + ", we couldn't close it: " + state);
1829 return null;
1830 }
1831 case OFFLINE:
1832
1833
1834
1835
1836
1837 if (regionStates.isServerDeadAndNotProcessed(sn)
1838 && wasRegionOnDeadServerByMeta(region, sn)) {
1839 LOG.info("Skip assigning " + region.getRegionNameAsString()
1840 + ", it is on a dead but not processed yet server");
1841 return null;
1842 }
1843 case CLOSED:
1844 break;
1845 default:
1846 LOG.error("Trying to assign region " + region
1847 + ", which is " + state);
1848 return null;
1849 }
1850 return state;
1851 }
1852
1853 private boolean wasRegionOnDeadServerByMeta(
1854 final HRegionInfo region, final ServerName sn) {
1855 try {
1856 if (region.isMetaRegion()) {
1857 ServerName server = catalogTracker.getMetaLocation();
1858 return regionStates.isServerDeadAndNotProcessed(server);
1859 }
1860 while (!server.isStopped()) {
1861 try {
1862 catalogTracker.waitForMeta();
1863 Pair<HRegionInfo, ServerName> r =
1864 MetaReader.getRegion(catalogTracker, region.getRegionName());
1865 ServerName server = r == null ? null : r.getSecond();
1866 return regionStates.isServerDeadAndNotProcessed(server);
1867 } catch (IOException ioe) {
1868 LOG.info("Received exception accessing hbase:meta during force assign "
1869 + region.getRegionNameAsString() + ", retrying", ioe);
1870 }
1871 }
1872 } catch (InterruptedException e) {
1873 Thread.currentThread().interrupt();
1874 LOG.info("Interrupted accessing hbase:meta", e);
1875 }
1876
1877 return regionStates.isServerDeadAndNotProcessed(sn);
1878 }
1879
1880
1881
1882
1883
1884
1885
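  /**
   * Drives the assignment of a single region: picks a plan, optionally sets
   * the znode OFFLINE, and sends the OPEN RPC, retrying up to the maximum
   * number of attempts. Callers are expected to hold the per-region lock.
   */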
1886 private void assign(RegionState state,
1887 final boolean setOfflineInZK, final boolean forceNewPlan) {
1888 long startTime = EnvironmentEdgeManager.currentTimeMillis();
1889 try {
1890 Configuration conf = server.getConfiguration();
1891 RegionState currentState = state;
1892 int versionOfOfflineNode = -1;
1893 RegionPlan plan = null;
1894 long maxWaitTime = -1;
1895 HRegionInfo region = state.getRegion();
1896 RegionOpeningState regionOpenState;
1897 Throwable previousException = null;
1898 for (int i = 1; i <= maximumAttempts; i++) {
1899 if (server.isStopped() || server.isAborted()) {
1900 LOG.info("Skip assigning " + region.getRegionNameAsString()
1901 + ", the server is stopped/aborted");
1902 return;
1903 }
1904 if (plan == null) {
1905 try {
1906 plan = getRegionPlan(region, forceNewPlan);
1907 } catch (HBaseIOException e) {
1908 LOG.warn("Failed to get region plan", e);
1909 }
1910 }
1911 if (plan == null) {
1912 LOG.warn("Unable to determine a plan to assign " + region);
1913 if (tomActivated){
1914 this.timeoutMonitor.setAllRegionServersOffline(true);
1915 } else {
1916 if (region.isMetaRegion()) {
1917 try {
1918 Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1919 if (i == maximumAttempts) i = 1;
1920 continue;
1921 } catch (InterruptedException e) {
1922 LOG.error("Got exception while waiting for hbase:meta assignment");
1923 Thread.currentThread().interrupt();
1924 }
1925 }
1926 regionStates.updateRegionState(region, State.FAILED_OPEN);
1927 }
1928 return;
1929 }
1930 if (setOfflineInZK && versionOfOfflineNode == -1) {
1931
1932
1933 versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
1934 if (versionOfOfflineNode != -1) {
1935 if (isDisabledorDisablingRegionInRIT(region)) {
1936 return;
1937 }
1938
1939
1940
1941
1942
1943
1944 TableName tableName = region.getTable();
1945 if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
1946 LOG.debug("Setting table " + tableName + " to ENABLED state.");
1947 setEnabledTable(tableName);
1948 }
1949 }
1950 }
1951 if (setOfflineInZK && versionOfOfflineNode == -1) {
1952 LOG.info("Unable to set offline in ZooKeeper to assign " + region);
1953
1954
1955
1956
1957 if (!server.isAborted()) {
1958 continue;
1959 }
1960 }
1961 LOG.info("Assigning " + region.getRegionNameAsString() +
1962 " to " + plan.getDestination().toString());
1963
1964 currentState = regionStates.updateRegionState(region,
1965 State.PENDING_OPEN, plan.getDestination());
1966
1967 boolean needNewPlan;
1968 final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1969 " to " + plan.getDestination();
1970 try {
1971 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1972 if (this.shouldAssignRegionsWithFavoredNodes) {
1973 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1974 }
1975 regionOpenState = serverManager.sendRegionOpen(
1976 plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
1977
1978 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
1979
1980 needNewPlan = true;
1981 LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
1982 " trying to assign elsewhere instead; " +
1983 "try=" + i + " of " + this.maximumAttempts);
1984 } else {
1985
1986 if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
1987 processAlreadyOpenedRegion(region, plan.getDestination());
1988 }
1989 return;
1990 }
1991
1992 } catch (Throwable t) {
1993 if (t instanceof RemoteException) {
1994 t = ((RemoteException) t).unwrapRemoteException();
1995 }
1996 previousException = t;
1997
1998
1999
2000
2001 boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2002 t instanceof ServerNotRunningYetException);
2003
2004
2005
2006
2007
2008
2009 boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2010 && this.serverManager.isServerOnline(plan.getDestination()));
2011
2012
2013 if (hold) {
2014 LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2015 "try=" + i + " of " + this.maximumAttempts, t);
2016
2017 if (maxWaitTime < 0) {
2018 if (t instanceof RegionAlreadyInTransitionException) {
2019 maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
2020 + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2021 DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2022 } else {
2023 maxWaitTime = this.server.getConfiguration().
2024 getLong("hbase.regionserver.rpc.startup.waittime", 60000);
2025 }
2026 }
2027 try {
2028 needNewPlan = false;
2029 long now = EnvironmentEdgeManager.currentTimeMillis();
2030 if (now < maxWaitTime) {
2031 LOG.debug("Server is not yet up or region is already in transition; "
2032 + "waiting up to " + (maxWaitTime - now) + "ms", t);
2033 Thread.sleep(100);
2034 i--;
2035 } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2036 LOG.debug("Server is not up for a while; try a new one", t);
2037 needNewPlan = true;
2038 }
2039 } catch (InterruptedException ie) {
2040 LOG.warn("Failed to assign "
2041 + region.getRegionNameAsString() + " since interrupted", ie);
2042 Thread.currentThread().interrupt();
2043 if (!tomActivated) {
2044 regionStates.updateRegionState(region, State.FAILED_OPEN);
2045 }
2046 return;
2047 }
2048 } else if (retry) {
2049 needNewPlan = false;
2050 i--;
          LOG.warn(assignMsg + ", trying to assign to the same region server due to a retriable error", t);
2052 } else {
2053 needNewPlan = true;
2054 LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2055 " try=" + i + " of " + this.maximumAttempts, t);
2056 }
2057 }
2058
2059 if (i == this.maximumAttempts) {
2060
2061
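// This was the last attempt; skip computing a new plan and let the
// loop exit so the failure handling below takes over.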
2062 continue;
2063 }
2064
2065
2066
2067
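// Only look for a new plan when the previous attempt indicated the
// region should be assigned elsewhere; otherwise retry the same plan.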
2068 if (needNewPlan) {
2069
2070
2071
2072
2073 RegionPlan newPlan = null;
2074 try {
2075 newPlan = getRegionPlan(region, true);
2076 } catch (HBaseIOException e) {
2077 LOG.warn("Failed to get region plan", e);
2078 }
2079 if (newPlan == null) {
2080 if (tomActivated) {
2081 this.timeoutMonitor.setAllRegionServersOffline(true);
2082 } else {
2083 regionStates.updateRegionState(region, State.FAILED_OPEN);
2084 }
2085 LOG.warn("Unable to find a viable location to assign region " +
2086 region.getRegionNameAsString());
2087 return;
2088 }
2089
2090 if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2091
2092
2093
2094 currentState = regionStates.updateRegionState(region, State.OFFLINE);
2095 versionOfOfflineNode = -1;
2096 plan = newPlan;
2097 } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2098 previousException instanceof FailedServerException) {
2099 try {
2100 LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2101 " to the same failed server.");
2102 Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2103 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2104 } catch (InterruptedException ie) {
2105 LOG.warn("Failed to assign "
2106 + region.getRegionNameAsString() + " since interrupted", ie);
2107 Thread.currentThread().interrupt();
2108 if (!tomActivated) {
2109 regionStates.updateRegionState(region, State.FAILED_OPEN);
2110 }
2111 return;
2112 }
2113 }
2114 }
2115 }
2116
2117 if (!tomActivated) {
2118 regionStates.updateRegionState(region, State.FAILED_OPEN);
2119 }
2120 } finally {
2121 metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
2122 }
2123 }
2124
2125 private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2126
2127
2128
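// The region server reported the region as already open: clean up the
// offline znode and record the region as online on that server.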
2129 LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2130 + " to " + sn);
2131 String encodedName = region.getEncodedName();
2132 deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2133 regionStates.regionOnline(region, sn);
2134 }
2135
2136 private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2137 TableName tableName = region.getTable();
2138 boolean disabled = this.zkTable.isDisabledTable(tableName);
2139 if (disabled || this.zkTable.isDisablingTable(tableName)) {
2140 LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
2141 " skipping assign of " + region.getRegionNameAsString());
2142 offlineDisabledRegion(region);
2143 return true;
2144 }
2145 return false;
2146 }
2147
2148
2149
2150
2151
2152
2153
2154
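/**
 * Sets the region as OFFLINE in ZooKeeper by creating (or forcing) its
 * unassigned znode for the given destination.
 * @return the version of the offline znode if it was set successfully,
 *         -1 otherwise (the master is aborted on unexpected failures).
 */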
2155 private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2156 if (!state.isClosed() && !state.isOffline()) {
2157 String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2158 this.server.abort(msg, new IllegalStateException(msg));
2159 return -1;
2160 }
2161 regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2162 int versionOfOfflineNode;
2163 try {
2164
2165 versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2166 state.getRegion(), destination);
2167 if (versionOfOfflineNode == -1) {
2168 LOG.warn("Attempted to create/force node into OFFLINE state before "
2169 + "completing assignment but failed to do so for " + state);
2170 return -1;
2171 }
2172 } catch (KeeperException e) {
2173 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2174 return -1;
2175 }
2176 return versionOfOfflineNode;
2177 }
2178
2179
2180
2181
2182
2183
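/**
 * @return an existing plan for the region, or a newly computed one if
 *         forceNewPlan is true or no usable plan exists.
 */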
2184 private RegionPlan getRegionPlan(final HRegionInfo region,
2185 final boolean forceNewPlan) throws HBaseIOException {
2186 return getRegionPlan(region, null, forceNewPlan);
2187 }
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
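/**
 * Gets the assignment plan for the region, excluding the given server
 * from the candidate destinations.
 * @return the plan to use, or null if no destination server is available.
 */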
2198 private RegionPlan getRegionPlan(final HRegionInfo region,
2199 final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2200
2201 final String encodedName = region.getEncodedName();
2202 final List<ServerName> destServers =
2203 serverManager.createDestinationServersList(serverToExclude);
2204
2205 if (destServers.isEmpty()){
2206 LOG.warn("Can't move " + encodedName +
2207 ", there is no destination server available.");
2208 return null;
2209 }
2210
2211 RegionPlan randomPlan = null;
2212 boolean newPlan = false;
2213 RegionPlan existingPlan;
2214
2215 synchronized (this.regionPlans) {
2216 existingPlan = this.regionPlans.get(encodedName);
2217
2218 if (existingPlan != null && existingPlan.getDestination() != null) {
2219 LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2220 + " destination server is " + existingPlan.getDestination() +
2221 " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2222 }
2223
2224 if (forceNewPlan
2225 || existingPlan == null
2226 || existingPlan.getDestination() == null
2227 || !destServers.contains(existingPlan.getDestination())) {
2228 newPlan = true;
2229 randomPlan = new RegionPlan(region, null,
2230 balancer.randomAssignment(region, destServers));
2231 if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2232 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2233 regions.add(region);
2234 try {
2235 processFavoredNodes(regions);
2236 } catch (IOException ie) {
2237 LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2238 }
2239 }
2240 this.regionPlans.put(encodedName, randomPlan);
2241 }
2242 }
2243
2244 if (newPlan) {
2245 if (randomPlan.getDestination() == null) {
2246 LOG.warn("Can't find a destination for " + encodedName);
2247 return null;
2248 }
2249 LOG.debug("No previous transition plan found (or ignoring " +
2250 "an existing plan) for " + region.getRegionNameAsString() +
2251 "; generated random plan=" + randomPlan + "; " +
2252 serverManager.countOfRegionServers() +
2253 " (online=" + serverManager.getOnlineServers().size() +
2254 ", available=" + destServers.size() + ") available servers" +
2255 ", forceNewPlan=" + forceNewPlan);
2256 return randomPlan;
2257 }
2258 LOG.debug("Using pre-existing plan for " +
2259 region.getRegionNameAsString() + "; plan=" + existingPlan);
2260 return existingPlan;
2261 }
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
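/**
 * Unassigns the specified region without forcing, reassigning it
 * afterwards if it ends up offline.
 */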
2276 public void unassign(HRegionInfo region) {
2277 unassign(region, false);
2278 }
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
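/**
 * Unassigns the specified region: creates a CLOSING znode for it and
 * sends a CLOSE RPC to the hosting server. The unassign is skipped if
 * the region is already being split or merged.
 * @param force if true, send the CLOSE RPC even when the region is
 *        already pending close or closing
 * @param dest server to reassign the region to, or null for no preference
 */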
2295 public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2296
2297 LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2298 + " (offlining), current state: " + regionStates.getRegionState(region));
2299
2300 String encodedName = region.getEncodedName();
2301
2302 int versionOfClosingNode = -1;
2303
2304
2305 ReentrantLock lock = locker.acquireLock(encodedName);
2306 RegionState state = regionStates.getRegionTransitionState(encodedName);
2307 boolean reassign = true;
2308 try {
2309 if (state == null) {
2310
2311
2312 state = regionStates.getRegionState(encodedName);
2313 if (state != null && state.isUnassignable()) {
2314 LOG.info("Attempting to unassign " + state + ", ignored");
2315
2316 return;
2317 }
2318
2319 try {
2320 if (state == null || state.getServerName() == null) {
2321
2322
2323 LOG.warn("Attempting to unassign a region not in RegionStates"
2324 + region.getRegionNameAsString() + ", offlined");
2325 regionOffline(region);
2326 return;
2327 }
2328 versionOfClosingNode = ZKAssign.createNodeClosing(
2329 watcher, region, state.getServerName());
2330 if (versionOfClosingNode == -1) {
2331 LOG.info("Attempting to unassign " +
2332 region.getRegionNameAsString() + " but ZK closing node "
2333 + "can't be created.");
2334 reassign = false;
2335 return;
2336 }
2337 } catch (KeeperException e) {
2338 if (e instanceof NodeExistsException) {
2339
2340
2341
2342
2343 NodeExistsException nee = (NodeExistsException)e;
2344 String path = nee.getPath();
2345 try {
2346 if (isSplitOrSplittingOrMergedOrMerging(path)) {
2347 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2348 "skipping unassign because region no longer exists -- its split or merge");
2349 reassign = false;
2350 return;
2351 }
2352 } catch (KeeperException.NoNodeException ke) {
2353 LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2354 "; presuming split and that the region to unassign, " +
2355 encodedName + ", no longer exists -- confirm", ke);
2356 return;
2357 } catch (KeeperException ke) {
2358 LOG.error("Unexpected zk state", ke);
2359 } catch (DeserializationException de) {
2360 LOG.error("Failed parse", de);
2361 }
2362 }
2363
2364 server.abort("Unexpected ZK exception creating node CLOSING", e);
2365 reassign = false;
2366 return;
2367 }
2368 state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2369 } else if (state.isFailedOpen()) {
2370
2371 regionOffline(region);
2372 return;
2373 } else if (force && state.isPendingCloseOrClosing()) {
2374 LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2375 " which is already " + state.getState() +
2376 " but forcing to send a CLOSE RPC again ");
2377 if (state.isFailedClose()) {
2378 state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2379 }
2380 state.updateTimestampToNow();
2381 } else {
2382 LOG.debug("Attempting to unassign " +
2383 region.getRegionNameAsString() + " but it is " +
2384 "already in transition (" + state.getState() + ", force=" + force + ")");
2385 return;
2386 }
2387
2388 unassign(region, state, versionOfClosingNode, dest, true, null);
2389 } finally {
2390 lock.unlock();
2391
2392
2393 if (reassign && regionStates.isRegionOffline(region)) {
2394 assign(region, true);
2395 }
2396 }
2397 }
2398
2399 public void unassign(HRegionInfo region, boolean force){
2400 unassign(region, force, null);
2401 }
2402
2403
2404
2405
2406 public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2407 String encodedName = region.getEncodedName();
2408 deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2409 EventType.RS_ZK_REGION_CLOSED);
2410 }
2411
2412
2413
2414
2415
2416
2417
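/**
 * @return true if the znode at the given path is in a SPLIT, SPLITTING,
 *         MERGED or MERGING state.
 * @throws KeeperException if the znode disappears in the meantime or ZK fails
 * @throws DeserializationException if the znode data cannot be parsed
 */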
2418 private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2419 throws KeeperException, DeserializationException {
2420 boolean result = false;
2421
2422
2423 byte [] data = ZKAssign.getData(watcher, path);
2424 if (data == null) {
2425 LOG.info("Node " + path + " is gone");
2426 return false;
2427 }
2428 RegionTransition rt = RegionTransition.parseFrom(data);
2429 switch (rt.getEventType()) {
2430 case RS_ZK_REQUEST_REGION_SPLIT:
2431 case RS_ZK_REGION_SPLIT:
2432 case RS_ZK_REGION_SPLITTING:
2433 case RS_ZK_REQUEST_REGION_MERGE:
2434 case RS_ZK_REGION_MERGED:
2435 case RS_ZK_REGION_MERGING:
2436 result = true;
2437 break;
2438 default:
2439 LOG.info("Node " + path + " is in " + rt.getEventType());
2440 break;
2441 }
2442 return result;
2443 }
2444
2445
2446
2447
2448
2449
2450 public int getNumRegionsOpened() {
2451 return numRegionsOpened.get();
2452 }
2453
2454
2455
2456
2457
2458
2459
2460
2461
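/**
 * Waits until the specified region has been assigned (is online).
 * @return true once the region is online; false if the region goes to
 *         FAILED_OPEN or the server is stopped while waiting
 * @throws InterruptedException
 */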
2462 public boolean waitForAssignment(HRegionInfo regionInfo)
2463 throws InterruptedException {
2464 while (!regionStates.isRegionOnline(regionInfo)) {
2465 if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
2466 || this.server.isStopped()) {
2467 return false;
2468 }
2469
2470
2471
2472
2473 regionStates.waitForUpdate(100);
2474 }
2475 return true;
2476 }
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
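/**
 * Assigns the meta region: deletes the current meta location znode and
 * then assigns hbase:meta.
 * @throws KeeperException
 */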
2488 public void assignMeta() throws KeeperException {
2489 MetaRegionTracker.deleteMetaLocation(this.watcher);
2490 assign(HRegionInfo.FIRST_META_REGIONINFO, true);
2491 }
2492
2493
2494
2495
2496
2497
2498
2499
2500
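/**
 * Assigns the given regions, trying to retain their current assignments
 * (the map values are the servers they were last on).
 * @throws IOException if no destination server is available
 * @throws InterruptedException
 */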
2501 public void assign(Map<HRegionInfo, ServerName> regions)
2502 throws IOException, InterruptedException {
2503 if (regions == null || regions.isEmpty()) {
2504 return;
2505 }
2506 List<ServerName> servers = serverManager.createDestinationServersList();
2507 if (servers == null || servers.isEmpty()) {
2508 throw new IOException("Found no destination server to assign region(s)");
2509 }
2510
2511
2512 Map<ServerName, List<HRegionInfo>> bulkPlan =
2513 balancer.retainAssignment(regions, servers);
2514
2515 assign(regions.size(), servers.size(),
2516 "retainAssignment=true", bulkPlan);
2517 }
2518
2519
2520
2521
2522
2523
2524
2525
2526
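/**
 * Assigns the given regions round-robin across the currently available
 * destination servers.
 * @throws IOException if no destination server is available
 * @throws InterruptedException
 */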
2527 public void assign(List<HRegionInfo> regions)
2528 throws IOException, InterruptedException {
2529 if (regions == null || regions.isEmpty()) {
2530 return;
2531 }
2532
2533 List<ServerName> servers = serverManager.createDestinationServersList();
2534 if (servers == null || servers.isEmpty()) {
2535 throw new IOException("Found no destination server to assign region(s)");
2536 }
2537
2538
2539 Map<ServerName, List<HRegionInfo>> bulkPlan
2540 = balancer.roundRobinAssignment(regions, servers);
2541 processFavoredNodes(regions);
2542
2543 assign(regions.size(), servers.size(),
2544 "round-robin=true", bulkPlan);
2545 }
2546
2547 private void assign(int regions, int totalServers,
2548 String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2549 throws InterruptedException, IOException {
2550
2551 int servers = bulkPlan.size();
2552 if (servers == 1 || (regions < bulkAssignThresholdRegions
2553 && servers < bulkAssignThresholdServers)) {
2554
2555
2556
2557 if (LOG.isTraceEnabled()) {
2558 LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2559 " region(s) to " + servers + " server(s)");
2560 }
2561 for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2562 if (!assign(plan.getKey(), plan.getValue())) {
2563 for (HRegionInfo region: plan.getValue()) {
2564 if (!regionStates.isRegionOnline(region)) {
2565 invokeAssign(region);
2566 }
2567 }
2568 }
2569 }
2570 } else {
2571 LOG.info("Bulk assigning " + regions + " region(s) across "
2572 + totalServers + " server(s), " + message);
2573
2574
2575 BulkAssigner ba = new GeneralBulkAssigner(
2576 this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2577 ba.bulkAssign();
2578 LOG.info("Bulk assigning done");
2579 }
2580 }
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
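/**
 * Assigns all user regions found in hbase:meta, either retaining the
 * previous assignments or round-robin assigning them, depending on the
 * hbase.master.startup.retainassign setting. All region transition
 * znodes are deleted first, so this is only suitable when there is no
 * failover state to preserve.
 */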
2592 private void assignAllUserRegions()
2593 throws IOException, InterruptedException, KeeperException {
2594
2595 ZKAssign.deleteAllNodes(watcher);
2596 ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
2597 this.watcher.assignmentZNode);
2598 failoverCleanupDone();
2599
2600
2601
2602
2603 Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
2604 disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
2605
2606 Map<HRegionInfo, ServerName> allRegions;
2607 SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
2608 new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
2609 snapshotOfRegionAssignment.initialize();
2610 allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
2611 if (allRegions == null || allRegions.isEmpty()) return;
2612
2613
2614 boolean retainAssignment = server.getConfiguration().
2615 getBoolean("hbase.master.startup.retainassign", true);
2616
2617 if (retainAssignment) {
2618 assign(allRegions);
2619 } else {
2620 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
2621 assign(regions);
2622 }
2623
2624 for (HRegionInfo hri : allRegions.keySet()) {
2625 TableName tableName = hri.getTable();
2626 if (!zkTable.isEnabledTable(tableName)) {
2627 setEnabledTable(tableName);
2628 }
2629 }
2630 }
2631
2632
2633
2634
2635
2636
2637
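/**
 * Waits until no regions are in transition, or the timeout elapses, or
 * the server is stopped.
 * @return true if no regions are in transition when this returns
 */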
2638 boolean waitUntilNoRegionsInTransition(final long timeout)
2639 throws InterruptedException {
2640
2641
2642
2643
2644
2645
2646 final long endTime = System.currentTimeMillis() + timeout;
2647
2648 while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2649 && endTime > System.currentTimeMillis()) {
2650 regionStates.waitForUpdate(100);
2651 }
2652
2653 return !regionStates.isRegionsInTransition();
2654 }
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
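/**
 * Rebuilds the in-memory region states from a full scan of hbase:meta.
 * @return a map of servers that are not online to the regions they were
 *         hosting, so the caller can process them as dead servers.
 */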
2665 Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
2666 Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
2667 Set<TableName> disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);
2668 disabledOrEnablingTables.addAll(enablingTables);
2669 Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
2670 disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);
2671
2672
2673 List<Result> results = MetaReader.fullScan(this.catalogTracker);
2674
2675 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2676
2677 Map<ServerName, List<HRegionInfo>> offlineServers =
2678 new TreeMap<ServerName, List<HRegionInfo>>();
2679
2680 for (Result result : results) {
2681 Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(result);
2682 if (region == null) continue;
2683 HRegionInfo regionInfo = region.getFirst();
2684 ServerName regionLocation = region.getSecond();
2685 if (regionInfo == null) continue;
2686 regionStates.createRegionState(regionInfo);
2687 if (regionStates.isRegionInState(regionInfo, State.SPLIT)) {
2688
2689
2690 LOG.debug("Region " + regionInfo.getRegionNameAsString()
2691 + " split is completed. Hence need not add to regions list");
2692 continue;
2693 }
2694 TableName tableName = regionInfo.getTable();
2695 if (regionLocation == null) {
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706 if (!enablingTables.contains(tableName)) {
2707 LOG.warn("Region " + regionInfo.getEncodedName() +
2708 " has null regionLocation." + " But its table " + tableName +
2709 " isn't in ENABLING state.");
2710 }
2711 } else if (!onlineServers.contains(regionLocation)) {
2712
2713 List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
2714 if (offlineRegions == null) {
2715 offlineRegions = new ArrayList<HRegionInfo>(1);
2716 offlineServers.put(regionLocation, offlineRegions);
2717 }
2718 offlineRegions.add(regionInfo);
2719
2720
2721 if (!disabledOrDisablingOrEnabling.contains(tableName)
2722 && !getZKTable().isEnabledTable(tableName)) {
2723 setEnabledTable(tableName);
2724 }
2725 } else {
2726
2727
2728 if (!disabledOrEnablingTables.contains(tableName)) {
2729 regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation);
2730 regionStates.regionOnline(regionInfo, regionLocation);
2731 balancer.regionOnline(regionInfo, regionLocation);
2732 }
2733
2734
2735 if (!disabledOrDisablingOrEnabling.contains(tableName)
2736 && !getZKTable().isEnabledTable(tableName)) {
2737 setEnabledTable(tableName);
2738 }
2739 }
2740 }
2741 return offlineServers;
2742 }
2743
2744
2745
2746
2747
2748
2749
2750
2751
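/**
 * Recovers tables left in DISABLING state by re-running the disable
 * handler so they end up DISABLED.
 */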
2752 private void recoverTableInDisablingState()
2753 throws KeeperException, TableNotFoundException, IOException {
2754 Set<TableName> disablingTables = ZKTable.getDisablingTables(watcher);
2755 if (disablingTables.size() != 0) {
2756 for (TableName tableName : disablingTables) {
2757
2758 LOG.info("The table " + tableName
2759 + " is in DISABLING state. Hence recovering by moving the table"
2760 + " to DISABLED state.");
2761 new DisableTableHandler(this.server, tableName, catalogTracker,
2762 this, tableLockManager, true).prepare().process();
2763 }
2764 }
2765 }
2766
2767
2768
2769
2770
2771
2772
2773
2774
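/**
 * Recovers tables left in ENABLING state by re-running the enable
 * handler so they end up ENABLED.
 */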
2775 private void recoverTableInEnablingState()
2776 throws KeeperException, TableNotFoundException, IOException {
2777 Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
2778 if (enablingTables.size() != 0) {
2779 for (TableName tableName : enablingTables) {
2780
2781 LOG.info("The table " + tableName
2782 + " is in ENABLING state. Hence recovering by moving the table"
2783 + " to ENABLED state.");
2784
2785
2786 EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
2787 catalogTracker, this, tableLockManager, true);
2788 try {
2789 eth.prepare();
2790 } catch (TableNotFoundException e) {
2791 LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
2792 continue;
2793 }
2794 eth.process();
2795 }
2796 }
2797 }
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
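/**
 * Expires the given dead servers and processes any regions still in
 * transition found in ZooKeeper, then marks failover cleanup as done.
 * @param deadServers map of dead servers to the regions they were hosting
 */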
2814 private void processDeadServersAndRecoverLostRegions(
2815 Map<ServerName, List<HRegionInfo>> deadServers)
2816 throws IOException, KeeperException {
2817 if (deadServers != null) {
2818 for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
2819 ServerName serverName = server.getKey();
2820
2821 regionStates.setLastRegionServerOfRegions(serverName, server.getValue());
2822 if (!serverManager.isServerDead(serverName)) {
2823 serverManager.expireServer(serverName);
2824 }
2825 }
2826 }
2827 List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(
2828 this.watcher, this.watcher.assignmentZNode);
2829 if (!nodes.isEmpty()) {
2830 for (String encodedRegionName : nodes) {
2831 processRegionInTransition(encodedRegionName, null);
2832 }
2833 }
2834
2835
2836
2837
2838
2839 failoverCleanupDone();
2840 }
2841
2842
2843
2844
2845
2846
2847
2848
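/**
 * Recomputes the regions-in-transition metrics: total count, count over
 * the configured "stuck" threshold, and the age of the oldest RIT.
 */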
2849 public void updateRegionsInTransitionMetrics() {
2850 long currentTime = System.currentTimeMillis();
2851 int totalRITs = 0;
2852 int totalRITsOverThreshold = 0;
2853 long oldestRITTime = 0;
2854 int ritThreshold = this.server.getConfiguration().
2855 getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
2856 for (RegionState state: regionStates.getRegionsInTransition().values()) {
2857 totalRITs++;
2858 long ritTime = currentTime - state.getStamp();
2859 if (ritTime > ritThreshold) {
2860 totalRITsOverThreshold++;
2861 }
2862 if (oldestRITTime < ritTime) {
2863 oldestRITTime = ritTime;
2864 }
2865 }
2866 if (this.metricsAssignmentManager != null) {
2867 this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
2868 this.metricsAssignmentManager.updateRITCount(totalRITs);
2869 this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
2870 }
2871 }
2872
2873
2874
2875
2876 void clearRegionPlan(final HRegionInfo region) {
2877 synchronized (this.regionPlans) {
2878 this.regionPlans.remove(region.getEncodedName());
2879 }
2880 }
2881
2882
2883
2884
2885
2886
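/**
 * Waits (without a timeout) for the given region to leave the
 * regions-in-transition set.
 */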
2887 public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
2888 throws IOException, InterruptedException {
2889 waitOnRegionToClearRegionsInTransition(hri, -1L);
2890 }
2891
2892
2893
2894
2895
2896
2897
2898
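/**
 * Waits for the given region to leave the regions-in-transition set.
 * @param timeOut maximum time to wait in ms; non-positive means no limit
 * @return true if the region cleared transition, false on timeout or
 *         if the server is stopped
 */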
2899 public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
2900 throws InterruptedException {
2901 if (!regionStates.isRegionInTransition(hri)) return true;
2902 long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
2903 + timeOut;
2904
2905
2906 LOG.info("Waiting for " + hri.getEncodedName() +
2907 " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
2908 while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
2909 regionStates.waitForUpdate(100);
2910 if (EnvironmentEdgeManager.currentTimeMillis() > end) {
2911 LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
2912 return false;
2913 }
2914 }
2915 if (this.server.isStopped()) {
2916 LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
2917 return false;
2918 }
2919 return true;
2920 }
2921
2922
2923
2924
2925
2926 public class TimerUpdater extends Chore {
2927
2928 public TimerUpdater(final int period, final Stoppable stopper) {
2929 super("AssignmentTimerUpdater", period, stopper);
2930 }
2931
2932 @Override
2933 protected void chore() {
2934 Preconditions.checkState(tomActivated);
2935 ServerName serverToUpdateTimer = null;
2936 while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
2937 if (serverToUpdateTimer == null) {
2938 serverToUpdateTimer = serversInUpdatingTimer.first();
2939 } else {
2940 serverToUpdateTimer = serversInUpdatingTimer
2941 .higher(serverToUpdateTimer);
2942 }
2943 if (serverToUpdateTimer == null) {
2944 break;
2945 }
2946 updateTimers(serverToUpdateTimer);
2947 serversInUpdatingTimer.remove(serverToUpdateTimer);
2948 }
2949 }
2950 }
2951
2952
2953
2954
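/**
 * Chore that periodically checks for regions stuck in transition longer
 * than the configured timeout and acts on them. Only runs when the
 * timeout monitor is activated.
 */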
2955 public class TimeoutMonitor extends Chore {
2956 private boolean allRegionServersOffline = false;
2957 private ServerManager serverManager;
2958 private final int timeout;
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969 public TimeoutMonitor(final int period, final Stoppable stopper,
2970 ServerManager serverManager,
2971 final int timeout) {
2972 super("AssignmentTimeoutMonitor", period, stopper);
2973 this.timeout = timeout;
2974 this.serverManager = serverManager;
2975 }
2976
2977 private synchronized void setAllRegionServersOffline(
2978 boolean allRegionServersOffline) {
2979 this.allRegionServersOffline = allRegionServersOffline;
2980 }
2981
2982 @Override
2983 protected void chore() {
2984 Preconditions.checkState(tomActivated);
2985 boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
2986
2987
2988 long now = System.currentTimeMillis();
2989
2990
2991 for (String regionName : regionStates.getRegionsInTransition().keySet()) {
2992 RegionState regionState = regionStates.getRegionTransitionState(regionName);
2993 if (regionState == null) continue;
2994
2995 if (regionState.getStamp() + timeout <= now) {
2996
2997 actOnTimeOut(regionState);
2998 } else if (this.allRegionServersOffline && !noRSAvailable) {
2999 RegionPlan existingPlan = regionPlans.get(regionName);
3000 if (existingPlan == null
3001 || !this.serverManager.isServerOnline(existingPlan
3002 .getDestination())) {
3003
3004
3005 actOnTimeOut(regionState);
3006 }
3007 }
3008 }
3009 setAllRegionServersOffline(noRSAvailable);
3010 }
3011
3012 private void actOnTimeOut(RegionState regionState) {
3013 HRegionInfo regionInfo = regionState.getRegion();
3014 LOG.info("Regions in transition timed out: " + regionState);
3015
3016 switch (regionState.getState()) {
3017 case CLOSED:
3018 LOG.info("Region " + regionInfo.getEncodedName()
3019 + " has been CLOSED for too long, waiting on queued "
3020 + "ClosedRegionHandler to run or server shutdown");
3021
3022 regionState.updateTimestampToNow();
3023 break;
3024 case OFFLINE:
3025 LOG.info("Region has been OFFLINE for too long, " + "reassigning "
3026 + regionInfo.getRegionNameAsString() + " to a random server");
3027 invokeAssign(regionInfo);
3028 break;
3029 case PENDING_OPEN:
3030 LOG.info("Region has been PENDING_OPEN for too "
3031 + "long, reassigning region=" + regionInfo.getRegionNameAsString());
3032 invokeAssign(regionInfo);
3033 break;
3034 case OPENING:
3035 processOpeningState(regionInfo);
3036 break;
3037 case OPEN:
3038 LOG.error("Region has been OPEN for too long, " +
3039 "we don't know where region was opened so can't do anything");
3040 regionState.updateTimestampToNow();
3041 break;
3042
3043 case PENDING_CLOSE:
3044 LOG.info("Region has been PENDING_CLOSE for too "
3045 + "long, running forced unassign again on region="
3046 + regionInfo.getRegionNameAsString());
3047 invokeUnassign(regionInfo);
3048 break;
3049 case CLOSING:
3050 LOG.info("Region has been CLOSING for too " +
3051 "long, this should eventually complete or the server will " +
3052 "expire, send RPC again");
3053 invokeUnassign(regionInfo);
3054 break;
3055
3056 case SPLIT:
3057 case SPLITTING:
3058 case FAILED_OPEN:
3059 case FAILED_CLOSE:
3060 case MERGING:
3061 break;
3062
3063 default:
3064 throw new IllegalStateException("Received region state is not valid: " + regionState.getState());
3065 }
3066 }
3067 }
3068
3069 private void processOpeningState(HRegionInfo regionInfo) {
3070 LOG.info("Region has been OPENING for too long, reassigning region="
3071 + regionInfo.getRegionNameAsString());
3072
3073 try {
3074 String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
3075 Stat stat = new Stat();
3076 byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat);
3077 if (data == null) {
3078 LOG.warn("Data is null, node " + node + " no longer exists");
3079 return;
3080 }
3081 RegionTransition rt = RegionTransition.parseFrom(data);
3082 EventType et = rt.getEventType();
3083 if (et == EventType.RS_ZK_REGION_OPENED) {
3084 LOG.debug("Region has transitioned to OPENED, allowing "
3085 + "watched event handlers to process");
3086 return;
3087 } else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN ) {
3088 LOG.warn("While timing out a region, found ZK node in unexpected state: " + et);
3089 return;
3090 }
3091 invokeAssign(regionInfo);
3092 } catch (KeeperException ke) {
3093 LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
3094 } catch (DeserializationException e) {
3095 LOG.error("Unexpected exception parsing CLOSING region", e);
3096 }
3097 }
3098
3099 void invokeAssign(HRegionInfo regionInfo) {
3100 threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
3101 }
3102
3103 private void invokeUnassign(HRegionInfo regionInfo) {
3104 threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3105 }
3106
3107 public boolean isCarryingMeta(ServerName serverName) {
3108 return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3109 }
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
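/**
 * Checks whether the given server is carrying the given region, first
 * consulting the region's unassigned znode and, failing that, the
 * in-memory region states.
 */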
3121 private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3122 RegionTransition rt = null;
3123 try {
3124 byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3125
3126 rt = data == null? null: RegionTransition.parseFrom(data);
3127 } catch (KeeperException e) {
3128 server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3129 } catch (DeserializationException e) {
3130 server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3131 }
3132
3133 ServerName addressFromZK = rt != null? rt.getServerName(): null;
3134 if (addressFromZK != null) {
3135
3136 boolean matchZK = addressFromZK.equals(serverName);
3137 LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3138 " current=" + serverName + ", matches=" + matchZK);
3139 return matchZK;
3140 }
3141
3142 ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3143 boolean matchAM = (addressFromAM != null &&
3144 addressFromAM.equals(serverName));
3145 LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3146 " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3147 " server being checked: " + serverName);
3148
3149 return matchAM;
3150 }
3151
3152
3153
3154
3155
3156
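/**
 * Processes the shutdown of a region server: drops region plans targeting
 * it, offlines the regions it was carrying, and returns the regions that
 * still need to be assigned elsewhere.
 */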
3157 public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3158
3159 synchronized (this.regionPlans) {
3160 for (Iterator <Map.Entry<String, RegionPlan>> i =
3161 this.regionPlans.entrySet().iterator(); i.hasNext();) {
3162 Map.Entry<String, RegionPlan> e = i.next();
3163 ServerName otherSn = e.getValue().getDestination();
3164
3165 if (otherSn != null && otherSn.equals(sn)) {
3166
3167 i.remove();
3168 }
3169 }
3170 }
3171 List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3172 for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3173 HRegionInfo hri = it.next();
3174 String encodedName = hri.getEncodedName();
3175
3176
3177 Lock lock = locker.acquireLock(encodedName);
3178 try {
3179 RegionState regionState =
3180 regionStates.getRegionTransitionState(encodedName);
3181 if (regionState == null
3182 || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3183 || !(regionState.isFailedClose() || regionState.isOffline()
3184 || regionState.isPendingOpenOrOpening())) {
3185 LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3186 + " on the dead server any more: " + sn);
3187 it.remove();
3188 } else {
3189 try {
3190
3191 ZKAssign.deleteNodeFailSilent(watcher, hri);
3192 } catch (KeeperException ke) {
3193 server.abort("Unexpected ZK exception deleting node " + hri, ke);
3194 }
3195 if (zkTable.isDisablingOrDisabledTable(hri.getTable())) {
3196 regionStates.regionOffline(hri);
3197 it.remove();
3198 continue;
3199 }
3200
3201 regionStates.updateRegionState(hri, State.OFFLINE);
3202 }
3203 } finally {
3204 lock.unlock();
3205 }
3206 }
3207 return regions;
3208 }
3209
3210
3211
3212
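/**
 * Moves the region described by the plan to the plan's destination,
 * unless the table is disabled/disabling or the region is not online.
 */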
3213 public void balance(final RegionPlan plan) {
3214 HRegionInfo hri = plan.getRegionInfo();
3215 TableName tableName = hri.getTable();
3216 if (zkTable.isDisablingOrDisabledTable(tableName)) {
3217 LOG.info("Ignored moving region of disabling/disabled table "
3218 + tableName);
3219 return;
3220 }
3221
3222
3223 String encodedName = hri.getEncodedName();
3224 ReentrantLock lock = locker.acquireLock(encodedName);
3225 try {
3226 if (!regionStates.isRegionOnline(hri)) {
3227 RegionState state = regionStates.getRegionState(encodedName);
3228 LOG.info("Ignored moving region not assigned: " + hri + ", "
3229 + (state == null ? "not in region states" : state));
3230 return;
3231 }
3232 synchronized (this.regionPlans) {
3233 this.regionPlans.put(plan.getRegionName(), plan);
3234 }
3235 unassign(hri, false, plan.getDestination());
3236 } finally {
3237 lock.unlock();
3238 }
3239 }
3240
3241 public void stop() {
3242 shutdown();
3243 if (tomActivated){
3244 this.timeoutMonitor.interrupt();
3245 this.timerUpdater.interrupt();
3246 }
3247 }
3248
3249
3250
3251
3252 public void shutdown() {
3253
3254 synchronized (zkEventWorkerWaitingList){
3255 zkEventWorkerWaitingList.clear();
3256 }
3257 threadPoolExecutorService.shutdownNow();
3258 zkEventWorkers.shutdownNow();
3259 }
3260
3261 protected void setEnabledTable(TableName tableName) {
3262 try {
3263 this.zkTable.setEnabledTable(tableName);
3264 } catch (KeeperException e) {
3265
3266 String errorMsg = "Unable to ensure that the table " + tableName
3267 + " will be" + " enabled because of a ZooKeeper issue";
3268 LOG.error(errorMsg);
3269 this.server.abort(errorMsg, e);
3270 }
3271 }
3272
3273
3274
3275
3276
3277
3278
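/**
 * Asynchronously sets the region as OFFLINE in ZooKeeper, invoking the
 * given callback when the znode has been created.
 * @return true if the asynchronous request was issued, false otherwise
 */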
3279 private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3280 final AsyncCallback.StringCallback cb, final ServerName destination) {
3281 if (!state.isClosed() && !state.isOffline()) {
3282 this.server.abort("Unexpected state trying to OFFLINE; " + state,
3283 new IllegalStateException());
3284 return false;
3285 }
3286 regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3287 try {
3288 ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3289 destination, cb, state);
3290 } catch (KeeperException e) {
3291 if (e instanceof NodeExistsException) {
3292 LOG.warn("Node for " + state.getRegion() + " already exists");
3293 } else {
3294 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3295 }
3296 return false;
3297 }
3298 return true;
3299 }
3300
3301 private boolean deleteNodeInStates(String encodedName,
3302 String desc, ServerName sn, EventType... types) {
3303 try {
3304 for (EventType et: types) {
3305 if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3306 return true;
3307 }
3308 }
3309 LOG.info("Failed to delete the " + desc + " node for "
3310 + encodedName + ". The node type may not match");
3311 } catch (NoNodeException e) {
3312 if (LOG.isDebugEnabled()) {
3313 LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3314 }
3315 } catch (KeeperException ke) {
3316 server.abort("Unexpected ZK exception deleting " + desc
3317 + " node for the region " + encodedName, ke);
3318 }
3319 return false;
3320 }
3321
3322 private void deleteMergingNode(String encodedName, ServerName sn) {
3323 deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3324 EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3325 }
3326
3327 private void deleteSplittingNode(String encodedName, ServerName sn) {
3328 deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3329 EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3330 }
3331
3332
3333
3334
3335
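/**
 * Handles a merge-related znode transition reported by a region server,
 * updating the states of the merging regions and the merged region.
 * @return true if the event was handled, false if it was dropped
 */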
3336 private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3337 final String prettyPrintedRegionName, final ServerName sn) {
3338 if (!serverManager.isServerOnline(sn)) {
3339 LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3340 return false;
3341 }
3342 byte [] payloadOfMerging = rt.getPayload();
3343 List<HRegionInfo> mergingRegions;
3344 try {
3345 mergingRegions = HRegionInfo.parseDelimitedFrom(
3346 payloadOfMerging, 0, payloadOfMerging.length);
3347 } catch (IOException e) {
3348 LOG.error("Dropped merging! Failed reading " + rt.getEventType()
3349 + " payload for " + prettyPrintedRegionName);
3350 return false;
3351 }
3352 assert mergingRegions.size() == 3;
3353 HRegionInfo p = mergingRegions.get(0);
3354 HRegionInfo hri_a = mergingRegions.get(1);
3355 HRegionInfo hri_b = mergingRegions.get(2);
3356
3357 RegionState rs_p = regionStates.getRegionState(p);
3358 RegionState rs_a = regionStates.getRegionState(hri_a);
3359 RegionState rs_b = regionStates.getRegionState(hri_b);
3360
3361 if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3362 && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3363 && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3364 LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3365 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3366 return false;
3367 }
3368
3369 EventType et = rt.getEventType();
3370 if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3371 try {
3372 if (RegionMergeTransaction.transitionMergingNode(watcher, p,
3373 hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE,
3374 EventType.RS_ZK_REGION_MERGING) == -1) {
3375 byte[] data = ZKAssign.getData(watcher, encodedName);
3376 EventType currentType = null;
3377 if (data != null) {
3378 RegionTransition newRt = RegionTransition.parseFrom(data);
3379 currentType = newRt.getEventType();
3380 }
3381 if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3382 && currentType != EventType.RS_ZK_REGION_MERGING)) {
3383 LOG.warn("Failed to transition pending_merge node "
3384 + encodedName + " to merging, it's now " + currentType);
3385 return false;
3386 }
3387 }
3388 } catch (Exception e) {
3389 LOG.warn("Failed to transition pending_merge node "
3390 + encodedName + " to merging", e);
3391 return false;
3392 }
3393 }
3394
3395 synchronized (regionStates) {
3396 regionStates.updateRegionState(hri_a, State.MERGING);
3397 regionStates.updateRegionState(hri_b, State.MERGING);
3398 regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3399
3400 if (et != EventType.RS_ZK_REGION_MERGED) {
3401 regionStates.regionOffline(p, State.MERGING_NEW);
3402 this.mergingRegions.put(encodedName,
3403 new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3404 } else {
3405 this.mergingRegions.remove(encodedName);
3406 regionOffline(hri_a, State.MERGED);
3407 regionOffline(hri_b, State.MERGED);
3408 regionOnline(p, sn);
3409 }
3410 }
3411
3412 if (et == EventType.RS_ZK_REGION_MERGED) {
3413 LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3414
3415 try {
3416 boolean successful = false;
3417 while (!successful) {
3418
3419
3420 successful = ZKAssign.deleteNode(watcher, encodedName,
3421 EventType.RS_ZK_REGION_MERGED, sn);
3422 }
3423 } catch (KeeperException e) {
3424 if (e instanceof NoNodeException) {
3425 String znodePath = ZKUtil.joinZNode(watcher.assignmentZNode, encodedName);
3426 LOG.debug("The znode " + znodePath + " does not exist. It may have been deleted already.");
3427 } else {
3428 server.abort("Error deleting MERGED node " + encodedName, e);
3429 }
3430 }
3431 LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3432 + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3433 + hri_b.getRegionNameAsString() + ", on " + sn);
3434
3435
3436 if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
3437 unassign(p);
3438 }
3439 }
3440 return true;
3441 }
3442
3443
3444
3445
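/**
 * Handles a split-related znode transition reported by a region server,
 * updating the states of the parent and daughter regions.
 * @return true if the event was handled, false if it was dropped
 */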
3446 private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3447 final String prettyPrintedRegionName, final ServerName sn) {
3448 if (!serverManager.isServerOnline(sn)) {
3449 LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3450 return false;
3451 }
3452 byte [] payloadOfSplitting = rt.getPayload();
3453 List<HRegionInfo> splittingRegions;
3454 try {
3455 splittingRegions = HRegionInfo.parseDelimitedFrom(
3456 payloadOfSplitting, 0, payloadOfSplitting.length);
3457 } catch (IOException e) {
3458 LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
3459 + " payload for " + prettyPrintedRegionName);
3460 return false;
3461 }
3462 assert splittingRegions.size() == 2;
3463 HRegionInfo hri_a = splittingRegions.get(0);
3464 HRegionInfo hri_b = splittingRegions.get(1);
3465
3466 RegionState rs_p = regionStates.getRegionState(encodedName);
3467 RegionState rs_a = regionStates.getRegionState(hri_a);
3468 RegionState rs_b = regionStates.getRegionState(hri_b);
3469
3470 if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
3471 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3472 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3473 LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
3474 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3475 return false;
3476 }
3477
3478 if (rs_p == null) {
3479
3480 rs_p = regionStates.updateRegionState(rt, State.OPEN);
3481 if (rs_p == null) {
3482 LOG.warn("Received splitting for region " + prettyPrintedRegionName
3483 + " from server " + sn + " but it doesn't exist anymore,"
3484 + " probably already processed its split");
3485 return false;
3486 }
3487 regionStates.regionOnline(rs_p.getRegion(), sn);
3488 }
3489
3490 HRegionInfo p = rs_p.getRegion();
3491 EventType et = rt.getEventType();
3492 if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
3493 try {
3494 if (SplitTransaction.transitionSplittingNode(watcher, p,
3495 hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_SPLIT,
3496 EventType.RS_ZK_REGION_SPLITTING) == -1) {
3497 byte[] data = ZKAssign.getData(watcher, encodedName);
3498 EventType currentType = null;
3499 if (data != null) {
3500 RegionTransition newRt = RegionTransition.parseFrom(data);
3501 currentType = newRt.getEventType();
3502 }
3503 if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT
3504 && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
3505 LOG.warn("Failed to transition pending_split node "
3506 + encodedName + " to splitting, it's now " + currentType);
3507 return false;
3508 }
3509 }
3510 } catch (Exception e) {
3511 LOG.warn("Failed to transition pending_split node "
3512 + encodedName + " to splitting", e);
3513 return false;
3514 }
3515 }
3516
3517 synchronized (regionStates) {
3518 regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
3519 regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
3520 regionStates.regionOffline(hri_a, State.SPLITTING_NEW);
3521 regionStates.regionOffline(hri_b, State.SPLITTING_NEW);
3522 regionStates.updateRegionState(rt, State.SPLITTING);
3523
3524
3525
3526 if (TEST_SKIP_SPLIT_HANDLING) {
3527 LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
3528 return true;
3529 }
3530
3531 if (et == EventType.RS_ZK_REGION_SPLIT) {
3532 regionOffline(p, State.SPLIT);
3533 regionOnline(hri_a, sn);
3534 regionOnline(hri_b, sn);
3535 }
3536 }
3537
3538 if (et == EventType.RS_ZK_REGION_SPLIT) {
3539 LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
3540
3541 try {
3542 boolean successful = false;
3543 while (!successful) {
3544
3545
3546 successful = ZKAssign.deleteNode(watcher, encodedName,
3547 EventType.RS_ZK_REGION_SPLIT, sn);
3548 }
3549 } catch (KeeperException e) {
3550 if (e instanceof NoNodeException) {
3551 String znodePath = ZKUtil.joinZNode(watcher.assignmentZNode, encodedName);
3552 LOG.debug("The znode " + znodePath + " does not exist. It may have been deleted already.");
3553 } else {
3554 server.abort("Error deleting SPLIT node " + encodedName, e);
3555 }
3556 }
3557 LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
3558 + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
3559 + hri_b.getRegionNameAsString() + ", on " + sn);
3560
3561
3562 if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
3563 unassign(hri_a);
3564 unassign(hri_b);
3565 }
3566 }
3567 return true;
3568 }
3569
3570
3571
3572
3573
3574
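/**
 * Marks the region offline in the given state, clears its plan and any
 * closed-region bookkeeping, and notifies the balancer and listeners.
 */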
3575 private void regionOffline(final HRegionInfo regionInfo, final State state) {
3576 regionStates.regionOffline(regionInfo, state);
3577 removeClosedRegion(regionInfo);
3578
3579 clearRegionPlan(regionInfo);
3580 balancer.regionOffline(regionInfo);
3581
3582
3583 sendRegionClosedNotification(regionInfo);
3584 }
3585
3586 private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
3587 final ServerName serverName) {
3588 if (!this.listeners.isEmpty()) {
3589 for (AssignmentListener listener : this.listeners) {
3590 listener.regionOpened(regionInfo, serverName);
3591 }
3592 }
3593 }
3594
3595 private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
3596 if (!this.listeners.isEmpty()) {
3597 for (AssignmentListener listener : this.listeners) {
3598 listener.regionClosed(regionInfo);
3599 }
3600 }
3601 }
3602
3603
3604
3605
3606 public LoadBalancer getBalancer() {
3607 return this.balancer;
3608 }
3609 }