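/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */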
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Triple;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
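
/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, and of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the startcode (timestamp from when the server
 * was started).  This is used to differentiate a restarted instance of a given
 * server from the original instance.
 */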
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Log LOG = LogFactory.getLog(ServerManager.class);

  private volatile boolean clusterShutdown = false;
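
  /** The last flushed sequence id for a region. */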
  private final SortedMap<byte[], Long> flushedSequenceIdByRegion =
      new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
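
  /** Map of registered servers to their current load. */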
  private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
      new ConcurrentHashMap<ServerName, ServerLoad>();
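
  /**
   * Map of admin interfaces per registered regionserver; these interfaces we use to control
   * regionservers out on the cluster.
   */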
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
      new HashMap<ServerName, AdminService.BlockingInterface>();
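
  /** List of region servers that should not get any more new regions. */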
  private final ArrayList<ServerName> drainingServers =
      new ArrayList<ServerName>();

  private final Server master;
  private final MasterServices services;
  private final HConnection connection;

  private final DeadServer deadservers = new DeadServer();
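
  // Allowed clock skew thresholds (in ms) between master and region servers;
  // configured via "hbase.master.maxclockskew" and "hbase.master.warningclockskew".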
  private final long maxSkew;
  private final long warningSkew;
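
  /**
   * Set of region servers which are dead but not processed immediately. If one
   * server died before master enables ServerShutdownHandler, the server will be
   * added to this set and will be processed through calling
   * {@link ServerManager#processQueuedDeadServers()} by master.
   */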
  private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
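
  /**
   * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
   * fully processed immediately. If one server died before assignment manager finished the failover
   * cleanup, the server will be added to this set and will be processed through calling
   * {@link ServerManager#processQueuedDeadServers()} by assignment manager. The Boolean value
   * indicates whether log split is needed inside ServerShutdownHandler.
   */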
  private Map<ServerName, Boolean> requeuedDeadServers = new HashMap<ServerName, Boolean>();
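
  /** Listeners that are called on server events. */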
  private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
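
  /**
   * Constructor.
   * @param master the master hosting this ServerManager
   * @param services services provided by the master
   * @throws ZooKeeperConnectionException
   */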
  public ServerManager(final Server master, final MasterServices services)
      throws IOException {
    this(master, services, true);
  }

  @SuppressWarnings("deprecation")
  ServerManager(final Server master, final MasterServices services,
      final boolean connect) throws IOException {
    this.master = master;
    this.services = services;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = connect ? HConnectionManager.getConnection(c) : null;
  }
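
  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */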
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }
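
  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   * @return true if the listener was registered and has been removed
   */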
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }
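
  /**
   * Let the server manager know a new regionserver has come online.
   * @param ia the InetAddress from which the region server is reporting
   * @param port the region server port
   * @param serverStartcode the startcode of the region server instance
   * @param serverCurrentTime the current time of the region server in ms
   * @return The ServerName we know this server as.
   * @throws IOException
   */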
  ServerName regionServerStartup(final InetAddress ia, final int port,
      final long serverStartcode, long serverCurrentTime)
      throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Check the server is not known to be dead, and that no other instance is
    // registered for the same host and port, before recording it.
    ServerName sn = ServerName.valueOf(ia.getHostName(), port, serverStartcode);
    checkClockSkew(sn, serverCurrentTime);
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
        + " could not record the server: " + sn);
    }
    return sn;
  }
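
  /**
   * Updates last flushed sequence ids of regions on the given server, based on
   * the region loads it reported.
   * @param sn the server that reported
   * @param hsl the load reported by the server
   */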
  private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
    Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
    for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
      Long existingValue = flushedSequenceIdByRegion.get(entry.getKey());
      long l = entry.getValue().getCompleteSequenceId();
      if (existingValue != null) {
        if (l != -1 && l < existingValue) {
          LOG.warn("RegionServer " + sn +
              " indicates a last flushed sequence id (" + l +
              ") that is less than the previous last flushed sequence id (" +
              existingValue + ") for region " +
              Bytes.toString(entry.getKey()) + ". Ignoring.");
          // Don't let a smaller last flushed sequence id overwrite a greater one.
          continue;
        }
      }
      flushedSequenceIdByRegion.put(entry.getKey(), l);
    }
  }
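
  /**
   * Called on a region server's regular heartbeat to refresh its recorded load.
   * @param sn the reporting server
   * @param sl the load on the reporting server
   * @throws YouAreDeadException if the server is currently being processed as dead
   */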
  void regionServerReport(ServerName sn,
      ServerLoad sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and its just different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done that
      // so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to update the flushed sequence ids.
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }
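
  /**
   * Checks if a server with the same host name and port is already registered. If not, or if
   * the registered instance has a smaller start code, records the new server.
   * @param serverName the server to check and record
   * @param sl the load on the server
   * @return true if the server was recorded, otherwise false
   */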
  boolean checkAndRecordNewServer(
      final ServerName serverName, final ServerLoad sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same start code means same server, and don't
    // expire in that case.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }
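
  /**
   * Checks the clock skew between the server and the master. If the clock skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold,
   * it will log a warning but start normally.
   * @param serverName Incoming server's name
   * @param serverCurrentTime the current time of the region server in ms
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */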
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master. " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }
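
  /**
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param serverName the server to check
   * @param what START or REPORT
   * @throws YouAreDeadException
   */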
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Host name, port and start code all match an entry on the dead server
      // list; this server must be dead, tell it to kill itself.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }

    // Remove any dead server with the same hostname and port of a newly
    // checking-in region server, once the master is initialized.
    if ((this.services == null || ((HMaster) this.services).isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now come back up after we marked it as dead;
      // its previous entry was removed from the dead servers list.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
          " removed it from the dead servers list");
    }
  }
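
  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port, or null if none found.
   */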
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    for (ServerName sn: this.onlineServers.keySet()) {
      if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
    }
    return null;
  }
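
  /**
   * Adds the server to the online servers list. Assumes onlineServers is locked.
   * @param serverName The remote server's name.
   * @param sl the load on the server
   */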
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
    LOG.info("Registering server=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }
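
  /** @return the last flushed sequence id for the given region, or -1 if none was recorded */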
  public long getLastFlushedSequenceId(byte[] regionName) {
    long seqId = -1;
    if (flushedSequenceIdByRegion.containsKey(regionName)) {
      seqId = flushedSequenceIdByRegion.get(regionName);
    }
    return seqId;
  }
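
  /**
   * @param serverName the server to look up
   * @return ServerLoad if serverName is known else null
   */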
  public ServerLoad getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }
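
  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
   * @return the average load
   */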
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerLoad sl: this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getNumberOfRegions();
    }
    // Guard against division by zero when no region servers have checked in yet.
    return numServers == 0 ? 0 : (double)totalLoad / (double)numServers;
  }
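
  /** @return the count of active regionservers */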
  int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }
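
  /** @return Read-only map of online servers to their current load. */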
  public Map<ServerName, ServerLoad> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }
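
  /** @return the tracker of servers known to be dead */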
  public DeadServer getDeadServers() {
    return this.deadservers;
  }
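
  /**
   * Checks if any dead servers are currently in progress.
   * @return true if any RS are being processed as dead, false if not
   */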
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }
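
  /**
   * Waits for all currently online region servers to go down as part of
   * cluster shutdown.
   */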
  void letRegionServersShutdown() {
    long previousLogTime = 0;
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {
      // Log the set of still-online servers at most once per second.
      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : this.onlineServers.keySet()) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      synchronized (onlineServers) {
        try {
          // Only wait if the count hasn't changed since we last checked.
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }
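
  /**
   * Expire the passed server. Add it to list of dead servers and queue a
   * shutdown processing.
   * @param serverName the server to expire
   */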
  public synchronized void expireServer(final ServerName serverName) {
    if (!services.isServerShutdownHandlerEnabled()) {
      LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
          + "delay expiring server " + serverName);
      this.queuedDeadServers.add(serverName);
      return;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      // TODO: Can this happen? It shouldn't be online in this case?
      LOG.warn("Expiration of " + serverName +
          " but server shutdown already in progress");
      return;
    }
    synchronized (onlineServers) {
      if (!this.onlineServers.containsKey(serverName)) {
        LOG.warn("Expiration of " + serverName + " but server not online");
      }
      // Remove the server from the known servers lists and update load info BUT
      // add to deadservers first; do this so it'll show in dead servers list if
      // not in online servers list.
      this.deadservers.add(serverName);
      this.onlineServers.remove(serverName);
      onlineServers.notifyAll();
    }
    this.rsAdmins.remove(serverName);

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (this.clusterShutdown) {
      LOG.info("Cluster shutdown set; " + serverName +
          " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return;
    }

    boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
    if (carryingMeta) {
      this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName));
    } else {
      this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName, true));
    }
    LOG.debug("Added=" + serverName +
      " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);

    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverRemoved(serverName);
      }
    }
  }
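
  /** Submit the given dead server for processing, without splitting its HLog. */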
  public synchronized void processDeadServer(final ServerName serverName) {
    this.processDeadServer(serverName, false);
  }
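
  /**
   * Submit the given dead server for shutdown processing, or park it until the
   * assignment manager has finished failover cleanup.
   * @param serverName the dead server
   * @param shouldSplitHlog whether the server's HLog should be split during processing
   */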
  public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitHlog) {
    // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
    // in-memory region states, region servers could be down. Meta table can and
    // should be re-assigned, log splitting can be done too. However, it is better to
    // wait till the cleanup is done before re-assigning user regions.
    //
    // We should not wait in the server shutdown handler thread since it can clog
    // up handler threads and the master cannot function well.
    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      requeuedDeadServers.put(serverName, shouldSplitHlog);
      return;
    }

    this.deadservers.add(serverName);
    this.services.getExecutorService().submit(
      new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
          shouldSplitHlog));
  }
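
  /**
   * Process the servers which died during master's initialization. It will be
   * called after HMaster#assignMeta and AssignmentManager#joinCluster.
   */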
  synchronized void processQueuedDeadServers() {
    if (!services.isServerShutdownHandlerEnabled()) {
      LOG.info("Master hasn't enabled ServerShutdownHandler");
    }
    Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
    while (serverIterator.hasNext()) {
      ServerName tmpServerName = serverIterator.next();
      expireServer(tmpServerName);
      serverIterator.remove();
      requeuedDeadServers.remove(tmpServerName);
    }

    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
    }

    for (ServerName tmpServerName : requeuedDeadServers.keySet()) {
      processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
    }
    requeuedDeadServers.clear();
  }
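
  /**
   * Remove the server from the drain list.
   * @return true if the server was in the drain list and has been removed
   */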
  public boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online. ServerName is of the form:
    // <hostname> , <port> , <startcode>
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }
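
  /**
   * Add the server to the drain list.
   * @return true if the server was added; false if it is not online or already draining
   */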
  public boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online. ServerName is of the form:
    // <hostname> , <port> , <startcode>
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers lists, if it's not already in it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
               "Ignoring request to add it again.");
      return false;
    }
    return this.drainingServers.add(sn);
  }
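
  /**
   * Sends an OPEN RPC to the specified server to open the specified region.
   * <p>
   * Open should not fail but can if server just crashed.
   * @param server server to open a region
   * @param region region to open
   * @param versionOfOfflineNode the version of the znode that needs to be present in the offline
   *          node when the RS tries to change the state from OFFLINE to other states
   * @param favoredNodes the favored nodes for the region, may be null
   * @return the region opening state reported by the region server
   */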
  public RegionOpeningState sendRegionOpen(final ServerName server,
      HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
      throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return RegionOpeningState.FAILED_OPENING;
    }
    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
      region, versionOfOfflineNode, favoredNodes,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningState(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
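
  /**
   * Sends an OPEN RPC to the specified server to open the specified regions.
   * <p>
   * Open should not fail but can if server just crashed.
   * @param server server to open the regions
   * @param regionOpenInfos info of a list of regions to open
   * @return a list of region opening states, or null if no RPC connection was found
   */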
  public List<RegionOpeningState> sendRegionOpen(ServerName server,
      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
      throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return null;
    }

    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(regionOpenInfos,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningStateList(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
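
  /**
   * Sends a CLOSE RPC to the specified server to close the specified region.
   * <p>
   * A region server could reject the close request because it either does not
   * have the specified region or the region is being split.
   * @param server server to close the region
   * @param region region to close
   * @param versionOfClosingNode the version of the znode to compare when the RS transitions the
   *          znode from CLOSING state
   * @param dest if the region is moved to another server, the destination server. null otherwise.
   * @param transitionInZK true to transition the znode in ZK, false otherwise
   * @return true if the server acknowledged the close, false if not
   * @throws IOException if no RPC connection could be found for the server
   */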
  public boolean sendRegionClose(ServerName server, HRegionInfo region,
      int versionOfClosingNode, ServerName dest, boolean transitionInZK) throws IOException {
    if (server == null) throw new NullPointerException("Passed server is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send CLOSE RPC to server " +
        server.toString() + " for region " +
        region.getRegionNameAsString() +
        " failed because no RPC connection found to this server");
    }
    return ProtobufUtil.closeRegion(admin, server, region.getRegionName(),
      versionOfClosingNode, dest, transitionInZK);
  }
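
  /**
   * As {@link #sendRegionClose(ServerName, HRegionInfo, int, ServerName, boolean)} but with no
   * destination server and with the znode transitioned in ZK.
   */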
  public boolean sendRegionClose(ServerName server,
      HRegionInfo region, int versionOfClosingNode) throws IOException {
    return sendRegionClose(server, region, versionOfClosingNode, null, true);
  }
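
  /**
   * Sends a MERGE REGIONS RPC to the specified server to merge the specified regions.
   * @param server server to merge the regions
   * @param region_a region to merge
   * @param region_b region to merge
   * @param forcible true for a compulsory merge; otherwise only two adjacent regions are merged
   * @throws IOException if no RPC connection could be found for the server
   */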
  public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
      HRegionInfo region_b, boolean forcible) throws IOException {
    if (server == null)
      throw new NullPointerException("Passed server is null");
    if (region_a == null || region_b == null)
      throw new NullPointerException("Passed region is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send MERGE REGIONS RPC to server "
          + server.toString() + " for region "
          + region_a.getRegionNameAsString() + ","
          + region_b.getRegionNameAsString()
          + " failed because no RPC connection found to this server");
    }
    ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible);
  }
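
  /**
   * Check if a region server is reachable and has the expected start code.
   */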
  public boolean isServerReachable(ServerName server) {
    if (server == null) throw new NullPointerException("Passed server is null");
    int maximumAttempts = Math.max(1, master.getConfiguration().getInt(
      "hbase.master.maximum.ping.server.attempts", 10));
    for (int i = 0; i < maximumAttempts; i++) {
      try {
        AdminService.BlockingInterface admin = getRsAdmin(server);
        if (admin != null) {
          ServerInfo info = ProtobufUtil.getServerInfo(admin);
          return info != null && info.hasServerName()
            && server.getStartcode() == info.getServerName().getStartCode();
        }
      } catch (IOException ioe) {
        LOG.debug("Couldn't reach " + server + ", try=" + i
          + " of " + maximumAttempts, ioe);
      }
    }
    return false;
  }
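
  /**
   * @param sn the server to connect to
   * @return Admin interface for the remote regionserver named <code>sn</code>
   * @throws IOException
   * @throws RetriesExhaustedException wrapping a ConnectException if failed
   */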
  private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
      throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      admin = this.connection.getAdmin(sn);
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }
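
  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    there have been no new region servers in for
   *    'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
   *
   * @throws InterruptedException
   */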
  public void waitForRegionServers(MonitoredTask status)
      throws InterruptedException {
    final long interval = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    int minToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
    if (minToStart < 1) {
      LOG.warn(String.format(
          "The value of '%s' (%d) cannot be less than 1, ignoring.",
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      minToStart = 1;
    }
    int maxToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format(
          "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
          WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    while (!this.master.isStopped() &&
        count < maxToStart &&
        (lastCountChange + interval > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
            "Waiting for region servers count to settle; currently" +
                " checked in " + count + ", slept for " + slept + " ms," +
                " expecting minimum of " + minToStart + ", maximum of " + maxToStart +
                ", timeout of " + timeout + " ms, interval of " + interval + " ms.";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }

    LOG.info("Finished waiting for region servers count to settle;" +
        " checked in " + count + ", slept for " + slept + " ms," +
        " expecting minimum of " + minToStart + ", maximum of " + maxToStart + "," +
        " master is " + (this.master.isStopped() ? "stopped." : "running.")
    );
  }
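
  /** @return A copy of the internal list of online servers. */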
  public List<ServerName> getOnlineServersList() {
    // Return a copy so the caller can freely modify the returned list.
    return new ArrayList<ServerName>(this.onlineServers.keySet());
  }
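
  /** @return A copy of the internal list of draining servers. */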
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<ServerName>(this.drainingServers);
  }
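
  /** @return A copy of the internal set of deadNotExpired servers. */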
  Set<ServerName> getDeadNotExpiredServers() {
    return new HashSet<ServerName>(this.queuedDeadServers);
  }
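
  /**
   * @return An unmodifiable view of the internal map of requeued dead servers and their
   *         corresponding log-split flags.
   */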
  Map<ServerName, Boolean> getRequeuedDeadServers() {
    return Collections.unmodifiableMap(this.requeuedDeadServers);
  }
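
  /** @return true if the given server is currently registered as online */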
  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }
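
  /**
   * Check if a server is known to be dead. A server can be online, or known to
   * be dead, or unknown to this manager (i.e. not online and not known to be
   * dead either; it is simply not tracked by the master any more, for example,
   * a very old previous instance).
   */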
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName)
      || queuedDeadServers.contains(serverName)
      || requeuedDeadServers.containsKey(serverName);
  }
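
  /** Flag the cluster as shutting down and stop the master. */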
  public void shutdownCluster() {
    this.clusterShutdown = true;
    this.master.stop("Cluster shutdown requested");
  }
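
  /** @return true if cluster shutdown has been requested */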
  public boolean isClusterShutdown() {
    return this.clusterShutdown;
  }
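
  /**
   * Stop the ServerManager. Currently closes the connection to the master.
   */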
  public void stop() {
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException e) {
        LOG.error("Attempt to close connection to master failed", e);
      }
    }
  }
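
  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
   * @param serverToExclude can be null if there is no server to exclude
   */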
  public List<ServerName> createDestinationServersList(final ServerName serverToExclude) {
    final List<ServerName> destServers = getOnlineServersList();

    if (serverToExclude != null) {
      destServers.remove(serverToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    if (!drainingServersCopy.isEmpty()) {
      for (final ServerName server: drainingServersCopy) {
        destServers.remove(server);
      }
    }

    // Remove the deadNotExpired servers from the server list.
    removeDeadNotExpiredServers(destServers);

    return destServers;
  }
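
  /**
   * Calls {@link #createDestinationServersList(ServerName)} without a server to exclude.
   */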
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }
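
  /**
   * Loop through the deadNotExpired server list and remove them from the servers list.
   * This function should be used carefully outside of this class. You should use a high level
   * method such as {@link #createDestinationServersList()} instead of managing your own list.
   */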
  void removeDeadNotExpiredServers(List<ServerName> servers) {
    Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
    if (!deadNotExpiredServersCopy.isEmpty()) {
      for (ServerName server : deadNotExpiredServersCopy) {
        LOG.debug("Removing dead but not expired server: " + server
          + " from eligible server pool.");
        servers.remove(server);
      }
    }
  }
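
  /**
   * Clear any dead server with the same host name and port as an online server.
   */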
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }
}