1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.TreeMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.RegionTransition;
36 import org.apache.hadoop.hbase.Server;
37 import org.apache.hadoop.hbase.ServerLoad;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.catalog.MetaReader;
41 import org.apache.hadoop.hbase.master.RegionState.State;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.hbase.util.Pair;
44 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
45 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
46 import org.apache.zookeeper.KeeperException;
47
48 import com.google.common.base.Preconditions;
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 public class RegionStates {
private static final Log LOG = LogFactory.getLog(RegionStates.class);

/**
 * Regions currently in transition, keyed by encoded region name.
 */
final HashMap<String, RegionState> regionsInTransition;

/**
 * Most recent state recorded for each tracked region, keyed by encoded
 * region name.
 */
private final Map<String, RegionState> regionStates;

/**
 * For each server, the set of regions recorded as online on it.
 */
private final Map<ServerName, Set<HRegionInfo>> serverHoldings;

/**
 * Region to hosting-server assignments. Kept sorted so that the regions of
 * a single table can be walked as one contiguous range.
 */
private final TreeMap<HRegionInfo, ServerName> regionAssignments;

/**
 * Encoded region name to the server on which the region was last opened.
 * An entry is added when a region goes to OPEN and dropped when it reaches
 * CLOSED/MERGED/SPLIT on that same server, when the server's log has been
 * split, or when the region is deleted.
 */
private final HashMap<String, ServerName> lastAssignments;

/**
 * Host-and-port to start code of servers that were still listed as online
 * but could not be reached.
 */
private final HashMap<String, Long> deadServers;

/**
 * Servers whose log split has been reported, mapped to the time of that
 * report. Entries older than the configured keep time are purged.
 */
private final HashMap<ServerName, Long> processedServers;

/** Time of the last purge of {@link #processedServers}. */
private long lastProcessedServerCleanTime;

private final ServerManager serverManager;
private final Server server;

/** Configuration key: how long a processed-server record is kept. */
static final String LOG_SPLIT_TIME = "hbase.master.maximum.logsplit.keeptime";
/** Default keep time, in milliseconds (two hours). */
static final long DEFAULT_LOG_SPLIT_TIME = 7200000L;

/**
 * Creates an empty region-state tracker.
 *
 * @param master the hosting server, used for configuration, abort and stop checks
 * @param serverManager used to query whether servers are online/reachable
 */
RegionStates(final Server master, final ServerManager serverManager) {
  this.server = master;
  this.serverManager = serverManager;
  this.regionsInTransition = new HashMap<String, RegionState>();
  this.regionStates = new HashMap<String, RegionState>();
  this.serverHoldings = new HashMap<ServerName, Set<HRegionInfo>>();
  this.regionAssignments = new TreeMap<HRegionInfo, ServerName>();
  this.lastAssignments = new HashMap<String, ServerName>();
  this.deadServers = new HashMap<String, Long>();
  this.processedServers = new HashMap<ServerName, Long>();
}
130
131
132
133
134 @SuppressWarnings("unchecked")
135 public synchronized Map<HRegionInfo, ServerName> getRegionAssignments() {
136 return (Map<HRegionInfo, ServerName>)regionAssignments.clone();
137 }
138
139 public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) {
140 return regionAssignments.get(hri);
141 }
142
143
144
145
146 @SuppressWarnings("unchecked")
147 public synchronized Map<String, RegionState> getRegionsInTransition() {
148 return (Map<String, RegionState>)regionsInTransition.clone();
149 }
150
151
152
153
154 public synchronized boolean isRegionInTransition(final HRegionInfo hri) {
155 return regionsInTransition.containsKey(hri.getEncodedName());
156 }
157
158
159
160
161 public synchronized boolean isRegionInTransition(final String encodedName) {
162 return regionsInTransition.containsKey(encodedName);
163 }
164
165
166
167
168 public synchronized boolean isRegionsInTransition() {
169 return !regionsInTransition.isEmpty();
170 }
171
172
173
174
175 public synchronized boolean isRegionOnline(final HRegionInfo hri) {
176 return !isRegionInTransition(hri) && regionAssignments.containsKey(hri);
177 }
178
179
180
181
182
183 public synchronized boolean isRegionOffline(final HRegionInfo hri) {
184 return getRegionState(hri) == null || (!isRegionInTransition(hri)
185 && isRegionInState(hri, State.OFFLINE, State.CLOSED));
186 }
187
188
189
190
191 public synchronized boolean isRegionInState(
192 final HRegionInfo hri, final State... states) {
193 return isRegionInState(hri.getEncodedName(), states);
194 }
195
196
197
198
199 public synchronized boolean isRegionInState(
200 final String encodedName, final State... states) {
201 RegionState regionState = getRegionState(encodedName);
202 State s = regionState != null ? regionState.getState() : null;
203 for (State state: states) {
204 if (s == state) return true;
205 }
206 return false;
207 }
208
209
210
211
212 public synchronized void waitForUpdate(
213 final long timeout) throws InterruptedException {
214 this.wait(timeout);
215 }
216
217
218
219
220 public synchronized RegionState
221 getRegionTransitionState(final HRegionInfo hri) {
222 return regionsInTransition.get(hri.getEncodedName());
223 }
224
225
226
227
228 public synchronized RegionState
229 getRegionTransitionState(final String encodedName) {
230 return regionsInTransition.get(encodedName);
231 }
232
233
234
235
236
237
238 public synchronized void createRegionStates(
239 final List<HRegionInfo> hris) {
240 for (HRegionInfo hri: hris) {
241 createRegionState(hri);
242 }
243 }
244
245
246
247
248
249
250
251 public synchronized RegionState createRegionState(final HRegionInfo hri) {
252 State newState = (hri.isOffline() && hri.isSplit()) ? State.SPLIT : State.OFFLINE;
253 String encodedName = hri.getEncodedName();
254 RegionState regionState = regionStates.get(encodedName);
255 if (regionState != null) {
256 LOG.warn("Tried to create a state for a region already in RegionStates, "
257 + "used existing: " + regionState + ", ignored new: " + newState);
258 } else {
259 regionState = new RegionState(hri, newState);
260 regionStates.put(encodedName, regionState);
261 }
262 return regionState;
263 }
264
265
266
267
268 public synchronized RegionState updateRegionState(
269 final HRegionInfo hri, final State state) {
270 RegionState regionState = regionStates.get(hri.getEncodedName());
271 return updateRegionState(hri, state,
272 regionState == null ? null : regionState.getServerName());
273 }
274
275
276
277
278
279
280
281 public synchronized RegionState updateRegionState(
282 final RegionTransition transition, final State state) {
283 byte [] regionName = transition.getRegionName();
284 HRegionInfo regionInfo = getRegionInfo(regionName);
285 if (regionInfo == null) {
286 String prettyRegionName = HRegionInfo.prettyPrint(
287 HRegionInfo.encodeRegionName(regionName));
288 LOG.warn("Failed to find region " + prettyRegionName
289 + " in updating its state to " + state
290 + " based on region transition " + transition);
291 return null;
292 }
293 return updateRegionState(regionInfo, state,
294 transition.getServerName());
295 }
296
297
298
299
300 public synchronized RegionState updateRegionState(
301 final HRegionInfo hri, final State state, final ServerName serverName) {
302 if (state == State.FAILED_CLOSE || state == State.FAILED_OPEN) {
303 LOG.warn("Failed to open/close " + hri.getShortNameToLog()
304 + " on " + serverName + ", set to " + state);
305 }
306
307 String encodedName = hri.getEncodedName();
308 RegionState regionState = new RegionState(
309 hri, state, System.currentTimeMillis(), serverName);
310 regionsInTransition.put(encodedName, regionState);
311 RegionState oldState = regionStates.put(encodedName, regionState);
312 ServerName oldServerName = oldState == null ? null : oldState.getServerName();
313 if (oldState == null || oldState.getState() != regionState.getState()
314 || (oldServerName == null && serverName != null)
315 || (oldServerName != null && !oldServerName.equals(serverName))) {
316 LOG.info("Transitioned " + oldState + " to " + regionState);
317 }
318
319
320
321 if ((state == State.CLOSED || state == State.MERGED
322 || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) {
323 ServerName last = lastAssignments.get(encodedName);
324 if (last.equals(serverName)) {
325 lastAssignments.remove(encodedName);
326 } else {
327 LOG.warn(encodedName + " moved to " + state + " on "
328 + serverName + ", expected " + last);
329 }
330 }
331
332
333 if (serverName != null && state == State.OPEN) {
334 ServerName last = lastAssignments.get(encodedName);
335 if (!serverName.equals(last)) {
336 lastAssignments.put(encodedName, serverName);
337 if (last != null && isServerDeadAndNotProcessed(last)) {
338 LOG.warn(encodedName + " moved to " + serverName
339 + ", while it's previous host " + last
340 + " is dead but not processed yet");
341 }
342 }
343 }
344
345
346 this.notifyAll();
347 return regionState;
348 }
349
350
351
352
353
354
355 public synchronized void regionOnline(
356 final HRegionInfo hri, final ServerName serverName) {
357 if (!serverManager.isServerOnline(serverName)) {
358
359
360
361
362 LOG.warn("Ignored, " + hri.getEncodedName()
363 + " was opened on a dead server: " + serverName);
364 return;
365 }
366
367 String encodedName = hri.getEncodedName();
368 RegionState oldState = regionStates.get(encodedName);
369 if (oldState == null) {
370 LOG.warn("Online region not in RegionStates: " + hri.getShortNameToLog());
371 }
372 updateRegionState(hri, State.OPEN, serverName);
373 regionsInTransition.remove(encodedName);
374
375 ServerName oldServerName = regionAssignments.put(hri, serverName);
376 if (!serverName.equals(oldServerName)) {
377 LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName);
378 Set<HRegionInfo> regions = serverHoldings.get(serverName);
379 if (regions == null) {
380 regions = new HashSet<HRegionInfo>();
381 serverHoldings.put(serverName, regions);
382 }
383 regions.add(hri);
384 if (oldServerName != null) {
385 LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
386 Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
387 oldRegions.remove(hri);
388 if (oldRegions.isEmpty()) {
389 serverHoldings.remove(oldServerName);
390 }
391 }
392 }
393 }
394
395
396
397
398
399 public synchronized void logSplit(final ServerName serverName) {
400 for (Iterator<Map.Entry<String, ServerName>> it
401 = lastAssignments.entrySet().iterator(); it.hasNext();) {
402 Map.Entry<String, ServerName> e = it.next();
403 if (e.getValue().equals(serverName)) {
404 it.remove();
405 }
406 }
407 long now = System.currentTimeMillis();
408 processedServers.put(serverName, Long.valueOf(now));
409 Configuration conf = server.getConfiguration();
410 long obsoleteTime = conf.getLong(LOG_SPLIT_TIME, DEFAULT_LOG_SPLIT_TIME);
411
412 if (now > lastProcessedServerCleanTime + obsoleteTime) {
413 lastProcessedServerCleanTime = now;
414 long cutoff = now - obsoleteTime;
415 for (Iterator<Map.Entry<ServerName, Long>> it
416 = processedServers.entrySet().iterator(); it.hasNext();) {
417 Map.Entry<ServerName, Long> e = it.next();
418 if (e.getValue().longValue() < cutoff) {
419 it.remove();
420 }
421 }
422 }
423 }
424
425
426
427
428 public synchronized void logSplit(final HRegionInfo region) {
429 clearLastAssignment(region);
430 }
431
432 public synchronized void clearLastAssignment(final HRegionInfo region) {
433 lastAssignments.remove(region.getEncodedName());
434 }
435
436
437
438
439 public void regionOffline(final HRegionInfo hri) {
440 regionOffline(hri, null);
441 }
442
443
444
445
446
447
448 public synchronized void regionOffline(
449 final HRegionInfo hri, final State expectedState) {
450 Preconditions.checkArgument(expectedState == null
451 || RegionState.isUnassignable(expectedState),
452 "Offlined region should not be " + expectedState);
453 String encodedName = hri.getEncodedName();
454 State newState =
455 expectedState == null ? State.OFFLINE : expectedState;
456 updateRegionState(hri, newState);
457 regionsInTransition.remove(encodedName);
458
459 ServerName oldServerName = regionAssignments.remove(hri);
460 if (oldServerName != null) {
461 LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
462 Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
463 oldRegions.remove(hri);
464 if (oldRegions.isEmpty()) {
465 serverHoldings.remove(oldServerName);
466 }
467 }
468 }
469
470
471
472
473 public synchronized List<HRegionInfo> serverOffline(
474 final ZooKeeperWatcher watcher, final ServerName sn) {
475
476 List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
477 Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
478 if (assignedRegions == null) {
479 assignedRegions = new HashSet<HRegionInfo>();
480 }
481
482
483 Set<HRegionInfo> regionsToOffline = new HashSet<HRegionInfo>();
484 for (HRegionInfo region : assignedRegions) {
485
486 if (isRegionOnline(region)) {
487 regionsToOffline.add(region);
488 } else {
489 if (isRegionInState(region, State.SPLITTING, State.MERGING)) {
490 LOG.debug("Offline splitting/merging region " + getRegionState(region));
491 try {
492
493 ZKAssign.deleteNodeFailSilent(watcher, region);
494 regionsToOffline.add(region);
495 } catch (KeeperException ke) {
496 server.abort("Unexpected ZK exception deleting node " + region, ke);
497 }
498 }
499 }
500 }
501
502 for (HRegionInfo hri : regionsToOffline) {
503 regionOffline(hri);
504 }
505
506 for (RegionState state : regionsInTransition.values()) {
507 HRegionInfo hri = state.getRegion();
508 if (assignedRegions.contains(hri)) {
509
510
511
512 LOG.info("Transitioning " + state + " will be handled by SSH for " + sn);
513 } else if (sn.equals(state.getServerName())) {
514
515
516
517
518
519
520
521 if (state.isPendingOpenOrOpening() || state.isFailedClose() || state.isOffline()) {
522 LOG.info("Found region in " + state + " to be reassigned by SSH for " + sn);
523 rits.add(hri);
524 } else {
525 LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state);
526 }
527 }
528 }
529
530 this.notifyAll();
531 return rits;
532 }
533
534
535
536
537
538
539
540
541
542
543
544 public synchronized List<HRegionInfo> getRegionsOfTable(TableName tableName) {
545 List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
546
547
548 HRegionInfo boundary = new HRegionInfo(tableName, null, null, false, 0L);
549 for (HRegionInfo hri: regionAssignments.tailMap(boundary).keySet()) {
550 if(!hri.getTable().equals(tableName)) break;
551 tableRegions.add(hri);
552 }
553 return tableRegions;
554 }
555
556
557
558
559
560
561
562
563 public synchronized void waitOnRegionToClearRegionsInTransition(
564 final HRegionInfo hri) throws InterruptedException {
565 if (!isRegionInTransition(hri)) return;
566
567 while(!server.isStopped() && isRegionInTransition(hri)) {
568 RegionState rs = getRegionState(hri);
569 LOG.info("Waiting on " + rs + " to clear regions-in-transition");
570 waitForUpdate(100);
571 }
572
573 if (server.isStopped()) {
574 LOG.info("Giving up wait on region in " +
575 "transition because stoppable.isStopped is set");
576 }
577 }
578
579
580
581
582
583 public synchronized void tableDeleted(final TableName tableName) {
584 Set<HRegionInfo> regionsToDelete = new HashSet<HRegionInfo>();
585 for (RegionState state: regionStates.values()) {
586 HRegionInfo region = state.getRegion();
587 if (region.getTable().equals(tableName)) {
588 regionsToDelete.add(region);
589 }
590 }
591 for (HRegionInfo region: regionsToDelete) {
592 deleteRegion(region);
593 }
594 }
595
596
597
598
599
600
601
602
603
604
605
606 synchronized boolean wasRegionOnDeadServer(final String encodedName) {
607 ServerName server = lastAssignments.get(encodedName);
608 return isServerDeadAndNotProcessed(server);
609 }
610
611 synchronized boolean isServerDeadAndNotProcessed(ServerName server) {
612 if (server == null) return false;
613 if (serverManager.isServerOnline(server)) {
614 String hostAndPort = server.getHostAndPort();
615 long startCode = server.getStartcode();
616 Long deadCode = deadServers.get(hostAndPort);
617 if (deadCode == null || startCode > deadCode.longValue()) {
618 if (serverManager.isServerReachable(server)) {
619 return false;
620 }
621
622 deadServers.put(hostAndPort, Long.valueOf(startCode));
623 }
624
625
626
627
628
629
630
631 LOG.warn("Couldn't reach online server " + server);
632 }
633
634 return !processedServers.containsKey(server);
635 }
636
637
638
639
640
641 synchronized ServerName getLastRegionServerOfRegion(final String encodedName) {
642 return lastAssignments.get(encodedName);
643 }
644
645 synchronized void setLastRegionServerOfRegions(
646 final ServerName serverName, final List<HRegionInfo> regionInfos) {
647 for (HRegionInfo hri: regionInfos) {
648 setLastRegionServerOfRegion(serverName, hri.getEncodedName());
649 }
650 }
651
652 synchronized void setLastRegionServerOfRegion(
653 final ServerName serverName, final String encodedName) {
654 lastAssignments.put(encodedName, serverName);
655 }
656
657
658
659
660
661
662
663 protected synchronized double getAverageLoad() {
664 int numServers = 0, totalLoad = 0;
665 for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
666 Set<HRegionInfo> regions = e.getValue();
667 ServerName serverName = e.getKey();
668 int regionCount = regions.size();
669 if (regionCount > 0 || serverManager.isServerOnline(serverName)) {
670 totalLoad += regionCount;
671 numServers++;
672 }
673 }
674 return numServers == 0 ? 0.0 :
675 (double)totalLoad / (double)numServers;
676 }
677
678
679
680
681
682
683
684
685
686 protected Map<TableName, Map<ServerName, List<HRegionInfo>>>
687 getAssignmentsByTable() {
688 Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
689 new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
690 synchronized (this) {
691 if (!server.getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false)) {
692 Map<ServerName, List<HRegionInfo>> svrToRegions =
693 new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
694 for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
695 svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
696 }
697 result.put(TableName.valueOf("ensemble"), svrToRegions);
698 } else {
699 for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
700 for (HRegionInfo hri: e.getValue()) {
701 if (hri.isMetaRegion()) continue;
702 TableName tablename = hri.getTable();
703 Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
704 if (svrToRegions == null) {
705 svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
706 result.put(tablename, svrToRegions);
707 }
708 List<HRegionInfo> regions = svrToRegions.get(e.getKey());
709 if (regions == null) {
710 regions = new ArrayList<HRegionInfo>();
711 svrToRegions.put(e.getKey(), regions);
712 }
713 regions.add(hri);
714 }
715 }
716 }
717 }
718
719 Map<ServerName, ServerLoad>
720 onlineSvrs = serverManager.getOnlineServers();
721
722 for (Map<ServerName, List<HRegionInfo>> map: result.values()) {
723 for (ServerName svr: onlineSvrs.keySet()) {
724 if (!map.containsKey(svr)) {
725 map.put(svr, new ArrayList<HRegionInfo>());
726 }
727 }
728 }
729 return result;
730 }
731
732 protected synchronized RegionState getRegionState(final HRegionInfo hri) {
733 return regionStates.get(hri.getEncodedName());
734 }
735
736 protected synchronized RegionState getRegionState(final String encodedName) {
737 return regionStates.get(encodedName);
738 }
739
740
741
742
743
744
745 protected HRegionInfo getRegionInfo(final byte [] regionName) {
746 String encodedName = HRegionInfo.encodeRegionName(regionName);
747 RegionState regionState = regionStates.get(encodedName);
748 if (regionState != null) {
749 return regionState.getRegion();
750 }
751
752 try {
753 Pair<HRegionInfo, ServerName> p =
754 MetaReader.getRegion(server.getCatalogTracker(), regionName);
755 HRegionInfo hri = p == null ? null : p.getFirst();
756 if (hri != null) {
757 createRegionState(hri);
758 }
759 return hri;
760 } catch (IOException e) {
761 server.abort("Aborting because error occoured while reading "
762 + Bytes.toStringBinary(regionName) + " from hbase:meta", e);
763 return null;
764 }
765 }
766
767
768
769
770 private void deleteRegion(final HRegionInfo hri) {
771 String encodedName = hri.getEncodedName();
772 regionsInTransition.remove(encodedName);
773 regionStates.remove(encodedName);
774 lastAssignments.remove(encodedName);
775 ServerName sn = regionAssignments.remove(hri);
776 if (sn != null) {
777 Set<HRegionInfo> regions = serverHoldings.get(sn);
778 regions.remove(hri);
779 }
780 }
781 }