1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.regex.Matcher;
30 import java.util.regex.Pattern;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Abortable;
37 import org.apache.hadoop.hbase.AuthUtil;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
41 import org.apache.hadoop.hbase.classification.InterfaceAudience;
42 import org.apache.hadoop.hbase.security.Superusers;
43 import org.apache.hadoop.security.UserGroupInformation;
44 import org.apache.zookeeper.KeeperException;
45 import org.apache.zookeeper.WatchedEvent;
46 import org.apache.zookeeper.Watcher;
47 import org.apache.zookeeper.ZooDefs;
48 import org.apache.zookeeper.ZooDefs.Ids;
49 import org.apache.zookeeper.ZooDefs.Perms;
50 import org.apache.zookeeper.data.ACL;
51 import org.apache.zookeeper.data.Id;
52 import org.apache.zookeeper.data.Stat;
53
54
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// Class-wide logger.
private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);

// Identifier of this watcher, used in log output (see toString()/prefix()).
// 'prefix' keeps the caller-supplied identifier unchanged; 'identifier' starts as that value
// plus "0x0" and is rewritten with the real session id when SyncConnected is processed.
private String prefix;
private String identifier;

// ZooKeeper quorum string obtained from ZKConfig.getZKQuorumServersString(conf).
private String quorum;

// ZooKeeper client wrapper returned by ZKUtil.connect(); assigned in the constructor.
private RecoverableZooKeeper recoverableZooKeeper;

// Optional delegate that abort()/isAborted() forward to; may be null.
protected Abortable abortable;
// Local abort flag; only consulted when no abortable delegate is set.
private boolean aborted = false;

// Listeners notified from process() for node events.
private final List<ZooKeeperListener> listeners =
    new CopyOnWriteArrayList<ZooKeeperListener>();

// NOTE(review): nothing in this class counts this latch down; presumably kept for external
// callers waiting on SASL authentication -- confirm before relying on it.
public CountDownLatch saslLatch = new CountDownLatch(1);

// All znode paths below are computed in setNodeNames() by joining a configurable child name
// onto baseZNode.

// Root znode, read from HConstants.ZOOKEEPER_ZNODE_PARENT.
public String baseZNode;
// Meta replica id -> znode path ("zookeeper.znode.metaserver", default "meta-region-server";
// replicas other than 0 get a "-<id>" suffix).
private Map<Integer,String> metaReplicaZnodes = new HashMap<Integer, String>();
// "zookeeper.znode.rs", default "rs".
public String rsZNode;
// "zookeeper.znode.draining.rs", default "draining".
public String drainingZNode;
// "zookeeper.znode.master", default "master".
private String masterAddressZNode;
// "zookeeper.znode.backup.masters", default "backup-masters".
public String backupMasterAddressesZNode;
// "zookeeper.znode.state", default "running".
public String clusterStateZNode;
// "zookeeper.znode.unassigned", default "region-in-transition".
public String assignmentZNode;
// "zookeeper.znode.tableEnableDisable", default "table".
public String tableZNode;
// "zookeeper.znode.clusterId", default "hbaseid".
public String clusterIdZNode;
// "zookeeper.znode.splitlog", default HConstants.SPLIT_LOGDIR_NAME.
public String splitLogZNode;
// "zookeeper.znode.balancer", default "balancer".
public String balancerZNode;
// "zookeeper.znode.regionNormalizer", default "normalizer".
private String regionNormalizerZNode;
// "zookeeper.znode.tableLock", default "table-lock".
public String tableLockZNode;
// "zookeeper.znode.recovering.regions", default "recovering-regions".
public String recoveringRegionsZNode;
// "zookeeper.znode.namespace", default "namespace".
// NOTE(review): this field is static but is reassigned by setNodeNames() each time an
// instance is constructed, so the most recently constructed watcher's value wins.
public static String namespaceZNode = "namespace";

// ACL list granting READ to ANYONE_ID_UNSAFE and ALL to AUTH_IDS.
public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
    new ArrayList<ACL>() { {
        add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
        add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
    }};

// Same value as the default child name used for "zookeeper.znode.metaserver" in this class.
public final static String META_ZNODE_PREFIX = "meta-region-server";

// Configuration this watcher was created with.
private final Configuration conf;

// Exception created in the constructor only to capture the creator's stack trace; it is logged
// by connectionEvent() if the ZooKeeper handle is still null when SyncConnected arrives.
private final Exception constructorCaller;

// Splits a SASL id of the form primary[/instance]@realm; group 1 is the primary (short) name.
private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
143
144
145
146
147
148
149
150
/**
 * Creates a watcher that does not create the base znodes; equivalent to calling the
 * four-argument constructor with {@code canCreateBaseZNode == false}.
 *
 * @param conf configuration used to locate the quorum and name the znodes
 * @param identifier string used to identify this watcher in log output
 * @param abortable delegate to notify on abort; may be null
 * @throws IOException propagated from the four-argument constructor
 * @throws ZooKeeperConnectionException propagated from the four-argument constructor
 */
public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
    throws IOException, ZooKeeperConnectionException {
  this(conf, identifier, abortable, false);
}
155
156
157
158
159
160
161
162
163
164
165
166
/**
 * Creates a watcher, computes the znode paths from {@code conf} and connects to ZooKeeper.
 *
 * @param conf configuration used to locate the quorum and name the znodes
 * @param identifier string used to identify this watcher in log output; until a session id
 *          is known the identifier is this value followed by "0x0"
 * @param abortable delegate to notify on abort; may be null
 * @param canCreateBaseZNode when true, the base znode and its standard children are created
 * @throws IOException propagated from {@code ZKUtil.connect}
 * @throws ZooKeeperConnectionException if creating the base znodes fails; the connection
 *           opened by this constructor is closed before the exception is rethrown
 */
public ZooKeeperWatcher(Configuration conf, String identifier,
    Abortable abortable, boolean canCreateBaseZNode)
    throws IOException, ZooKeeperConnectionException {
  this.conf = conf;
  // Only the stack trace matters here: it is logged by connectionEvent() when the ZooKeeper
  // handle is unexpectedly null. The trace is captured when the exception is constructed, so
  // there is no need to throw and catch it.
  this.constructorCaller = new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
  this.quorum = ZKConfig.getZKQuorumServersString(conf);
  this.prefix = identifier;
  // The real session id is filled in by connectionEvent() once SyncConnected is received.
  this.identifier = identifier + "0x0";
  this.abortable = abortable;
  setNodeNames(conf);
  this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
  if (canCreateBaseZNode) {
    try {
      createBaseZNodes();
    } catch (ZooKeeperConnectionException zce) {
      // The caller never receives this instance, so nobody else can close the connection
      // that was just opened; release it here to avoid leaking it.
      try {
        this.recoverableZooKeeper.close();
      } catch (InterruptedException ie) {
        LOG.debug("Encountered InterruptedException when closing " + this.recoverableZooKeeper);
        Thread.currentThread().interrupt();
      }
      throw zce;
    }
  }
}
190
191 private void createBaseZNodes() throws ZooKeeperConnectionException {
192 try {
193
194 ZKUtil.createWithParents(this, baseZNode);
195 if (conf.getBoolean("hbase.assignment.usezk", true)) {
196 ZKUtil.createAndFailSilent(this, assignmentZNode);
197 }
198 ZKUtil.createAndFailSilent(this, rsZNode);
199 ZKUtil.createAndFailSilent(this, drainingZNode);
200 ZKUtil.createAndFailSilent(this, tableZNode);
201 ZKUtil.createAndFailSilent(this, splitLogZNode);
202 ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
203 ZKUtil.createAndFailSilent(this, tableLockZNode);
204 ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
205 } catch (KeeperException e) {
206 throw new ZooKeeperConnectionException(
207 prefix("Unexpected KeeperException creating base node"), e);
208 }
209 }
210
211
212
213 public boolean isClientReadable(String node) {
214
215
216
217 return
218 node.equals(baseZNode) ||
219 isAnyMetaReplicaZnode(node) ||
220 node.equals(getMasterAddressZNode()) ||
221 node.equals(clusterIdZNode)||
222 node.equals(rsZNode) ||
223
224 node.equals(tableZNode) ||
225 node.startsWith(tableZNode + "/");
226 }
227
228
229
230
231
232
233
234 public void checkAndSetZNodeAcls() {
235 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
236 LOG.info("not a secure deployment, proceeding");
237 return;
238 }
239
240
241
242 try {
243 List<ACL> actualAcls = recoverableZooKeeper.getAcl(baseZNode, new Stat());
244
245 if (!isBaseZnodeAclSetup(actualAcls)) {
246 LOG.info("setting znode ACLs");
247 setZnodeAclsRecursive(baseZNode);
248 }
249 } catch(KeeperException.NoNodeException nne) {
250 return;
251 } catch(InterruptedException ie) {
252 interruptedException(ie);
253 } catch (IOException|KeeperException e) {
254 LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
255 }
256 }
257
258
259
260
261
262
263 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
264 List<String> children = recoverableZooKeeper.getChildren(znode, false);
265
266 for (String child : children) {
267 setZnodeAclsRecursive(ZKUtil.joinZNode(znode, child));
268 }
269 List<ACL> acls = ZKUtil.createACL(this, znode, true);
270 LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls);
271 recoverableZooKeeper.setAcl(znode, acls, -1);
272 }
273
274
275
276
277
278
279
280 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
281 if (LOG.isDebugEnabled()) {
282 LOG.debug("Checking znode ACLs");
283 }
284 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
285
286 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
287 return false;
288 }
289
290
291
292 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
293
294 if (acls.isEmpty()) {
295 if (LOG.isDebugEnabled()) {
296 LOG.debug("ACL is empty");
297 }
298 return false;
299 }
300
301 for (ACL acl : acls) {
302 int perms = acl.getPerms();
303 Id id = acl.getId();
304
305
306 if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
307 if (perms != Perms.READ) {
308 if (LOG.isDebugEnabled()) {
309 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
310 id, perms, Perms.READ));
311 }
312 return false;
313 }
314 } else if (superUsers != null && isSuperUserId(superUsers, id)) {
315 if (perms != Perms.ALL) {
316 if (LOG.isDebugEnabled()) {
317 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
318 id, perms, Perms.ALL));
319 }
320 return false;
321 }
322 } else if ("sasl".equals(id.getScheme())) {
323 String name = id.getId();
324
325 Matcher match = NAME_PATTERN.matcher(name);
326 if (match.matches()) {
327 name = match.group(1);
328 }
329 if (name.equals(hbaseUser)) {
330 if (perms != Perms.ALL) {
331 if (LOG.isDebugEnabled()) {
332 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
333 id, perms, Perms.ALL));
334 }
335 return false;
336 }
337 } else {
338 if (LOG.isDebugEnabled()) {
339 LOG.debug("Unexpected shortname in SASL ACL: " + id);
340 }
341 return false;
342 }
343 } else {
344 if (LOG.isDebugEnabled()) {
345 LOG.debug("unexpected ACL id '" + id + "'");
346 }
347 return false;
348 }
349 }
350 return true;
351 }
352
353
354
355
356 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
357 for (String user : superUsers) {
358 boolean hasAccess = false;
359
360 if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
361 for (ACL acl : acls) {
362 if (user.equals(acl.getId().getId())) {
363 if (acl.getPerms() == Perms.ALL) {
364 hasAccess = true;
365 } else {
366 if (LOG.isDebugEnabled()) {
367 LOG.debug(String.format(
368 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
369 acl.getId().getId(), acl.getPerms(), Perms.ALL));
370 }
371 }
372 break;
373 }
374 }
375 if (!hasAccess) {
376 return false;
377 }
378 }
379 }
380 return true;
381 }
382
383
384
385
386 public static boolean isSuperUserId(String[] superUsers, Id id) {
387 for (String user : superUsers) {
388
389 if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) {
390 return true;
391 }
392 }
393 return false;
394 }
395
396 @Override
397 public String toString() {
398 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
399 }
400
401
402
403
404
405
406
407 public String prefix(final String str) {
408 return this.toString() + " " + str;
409 }
410
411
412
413
414 private void setNodeNames(Configuration conf) {
415 baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
416 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
417 metaReplicaZnodes.put(0, ZKUtil.joinZNode(baseZNode,
418 conf.get("zookeeper.znode.metaserver", "meta-region-server")));
419 int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
420 HConstants.DEFAULT_META_REPLICA_NUM);
421 for (int i = 1; i < numMetaReplicas; i++) {
422 String str = ZKUtil.joinZNode(baseZNode,
423 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + i);
424 metaReplicaZnodes.put(i, str);
425 }
426 rsZNode = ZKUtil.joinZNode(baseZNode,
427 conf.get("zookeeper.znode.rs", "rs"));
428 drainingZNode = ZKUtil.joinZNode(baseZNode,
429 conf.get("zookeeper.znode.draining.rs", "draining"));
430 masterAddressZNode = ZKUtil.joinZNode(baseZNode,
431 conf.get("zookeeper.znode.master", "master"));
432 backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
433 conf.get("zookeeper.znode.backup.masters", "backup-masters"));
434 clusterStateZNode = ZKUtil.joinZNode(baseZNode,
435 conf.get("zookeeper.znode.state", "running"));
436 assignmentZNode = ZKUtil.joinZNode(baseZNode,
437 conf.get("zookeeper.znode.unassigned", "region-in-transition"));
438 tableZNode = ZKUtil.joinZNode(baseZNode,
439 conf.get("zookeeper.znode.tableEnableDisable", "table"));
440 clusterIdZNode = ZKUtil.joinZNode(baseZNode,
441 conf.get("zookeeper.znode.clusterId", "hbaseid"));
442 splitLogZNode = ZKUtil.joinZNode(baseZNode,
443 conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
444 balancerZNode = ZKUtil.joinZNode(baseZNode,
445 conf.get("zookeeper.znode.balancer", "balancer"));
446 regionNormalizerZNode = ZKUtil.joinZNode(baseZNode,
447 conf.get("zookeeper.znode.regionNormalizer", "normalizer"));
448 tableLockZNode = ZKUtil.joinZNode(baseZNode,
449 conf.get("zookeeper.znode.tableLock", "table-lock"));
450 recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
451 conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
452 namespaceZNode = ZKUtil.joinZNode(baseZNode,
453 conf.get("zookeeper.znode.namespace", "namespace"));
454 }
455
456
457
458
459
460
461 public boolean isAnyMetaReplicaZnode(String node) {
462 if (metaReplicaZnodes.values().contains(node)) {
463 return true;
464 }
465 return false;
466 }
467
468
469
470
471
472
473 public boolean isDefaultMetaReplicaZnode(String node) {
474 if (getZNodeForReplica(HRegionInfo.DEFAULT_REPLICA_ID).equals(node)) {
475 return true;
476 }
477 return false;
478 }
479
480
481
482
483
484
485 public List<String> getMetaReplicaNodes() throws KeeperException {
486 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, baseZNode);
487 List<String> metaReplicaNodes = new ArrayList<String>(2);
488 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
489 for (String child : childrenOfBaseNode) {
490 if (child.startsWith(pattern)) metaReplicaNodes.add(child);
491 }
492 return metaReplicaNodes;
493 }
494
495
496
497
498
499
500 public String getZNodeForReplica(int replicaId) {
501 String str = metaReplicaZnodes.get(replicaId);
502
503
504
505 if (str == null) {
506 str = ZKUtil.joinZNode(baseZNode,
507 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + replicaId);
508 }
509 return str;
510 }
511
512
513
514
515
516
517 public int getMetaReplicaIdFromZnode(String znode) {
518 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
519 if (znode.equals(pattern)) return HRegionInfo.DEFAULT_REPLICA_ID;
520
521 String nonDefaultPattern = pattern + "-";
522 return Integer.parseInt(znode.substring(nonDefaultPattern.length()));
523 }
524
525
526
527
528
529 public void registerListener(ZooKeeperListener listener) {
530 listeners.add(listener);
531 }
532
533
534
535
536
537
538 public void registerListenerFirst(ZooKeeperListener listener) {
539 listeners.add(0, listener);
540 }
541
542 public void unregisterListener(ZooKeeperListener listener) {
543 listeners.remove(listener);
544 }
545
546
547
548
549 public void unregisterAllListeners() {
550 listeners.clear();
551 }
552
553
554
555
556 public List<ZooKeeperListener> getListeners() {
557 return new ArrayList<ZooKeeperListener>(listeners);
558 }
559
560
561
562
563 public int getNumberOfListeners() {
564 return listeners.size();
565 }
566
567
568
569
570
571 public RecoverableZooKeeper getRecoverableZooKeeper() {
572 return recoverableZooKeeper;
573 }
574
575 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
576 recoverableZooKeeper.reconnectAfterExpiration();
577 }
578
579
580
581
582
583 public String getQuorum() {
584 return quorum;
585 }
586
587
588
589
590 public String getBaseZNode() {
591 return baseZNode;
592 }
593
594
595
596
597
598
599
600 @Override
601 public void process(WatchedEvent event) {
602 LOG.debug(prefix("Received ZooKeeper Event, " +
603 "type=" + event.getType() + ", " +
604 "state=" + event.getState() + ", " +
605 "path=" + event.getPath()));
606
607 switch(event.getType()) {
608
609
610 case None: {
611 connectionEvent(event);
612 break;
613 }
614
615
616
617 case NodeCreated: {
618 for(ZooKeeperListener listener : listeners) {
619 listener.nodeCreated(event.getPath());
620 }
621 break;
622 }
623
624 case NodeDeleted: {
625 for(ZooKeeperListener listener : listeners) {
626 listener.nodeDeleted(event.getPath());
627 }
628 break;
629 }
630
631 case NodeDataChanged: {
632 for(ZooKeeperListener listener : listeners) {
633 listener.nodeDataChanged(event.getPath());
634 }
635 break;
636 }
637
638 case NodeChildrenChanged: {
639 for(ZooKeeperListener listener : listeners) {
640 listener.nodeChildrenChanged(event.getPath());
641 }
642 break;
643 }
644 }
645 }
646
647
648
649
650
651
652
653
654
655
656
657
658
659 private void connectionEvent(WatchedEvent event) {
660 switch(event.getState()) {
661 case SyncConnected:
662
663
664 long finished = System.currentTimeMillis() +
665 this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
666 while (System.currentTimeMillis() < finished) {
667 try {
668 Thread.sleep(1);
669 } catch (InterruptedException e) {
670 LOG.warn("Interrupted while sleeping");
671 throw new RuntimeException("Interrupted while waiting for" +
672 " recoverableZooKeeper is set");
673 }
674 if (this.recoverableZooKeeper != null) break;
675 }
676
677 if (this.recoverableZooKeeper == null) {
678 LOG.error("ZK is null on connection event -- see stack trace " +
679 "for the stack trace when constructor was called on this zkw",
680 this.constructorCaller);
681 throw new NullPointerException("ZK is null");
682 }
683 this.identifier = this.prefix + "-0x" +
684 Long.toHexString(this.recoverableZooKeeper.getSessionId());
685
686 LOG.debug(this.identifier + " connected");
687 break;
688
689
690 case Disconnected:
691 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
692 break;
693
694 case Expired:
695 String msg = prefix(this.identifier + " received expired from " +
696 "ZooKeeper, aborting");
697
698
699 if (this.abortable != null) {
700 this.abortable.abort(msg, new KeeperException.SessionExpiredException());
701 }
702 break;
703
704 case ConnectedReadOnly:
705 case SaslAuthenticated:
706 case AuthFailed:
707 break;
708
709 default:
710 throw new IllegalStateException("Received event is not valid: " + event.getState());
711 }
712 }
713
714
715
716
717
718
719
720
721
722
723
724
725
726 public void sync(String path) throws KeeperException {
727 this.recoverableZooKeeper.sync(path, null, null);
728 }
729
730
731
732
733
734
735
736
737
738
739
740 public void keeperException(KeeperException ke)
741 throws KeeperException {
742 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
743 throw ke;
744 }
745
746
747
748
749
750
751
752
753
754
755
756
757 public void interruptedException(InterruptedException ie) {
758 LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
759
760 Thread.currentThread().interrupt();
761
762 }
763
764
765
766
767
768 @Override
769 public void close() {
770 try {
771 if (recoverableZooKeeper != null) {
772 recoverableZooKeeper.close();
773 }
774 } catch (InterruptedException e) {
775 Thread.currentThread().interrupt();
776 }
777 }
778
779 public Configuration getConfiguration() {
780 return conf;
781 }
782
783 @Override
784 public void abort(String why, Throwable e) {
785 if (this.abortable != null) this.abortable.abort(why, e);
786 else this.aborted = true;
787 }
788
789 @Override
790 public boolean isAborted() {
791 return this.abortable == null? this.aborted: this.abortable.isAborted();
792 }
793
794
795
796
797 public String getMasterAddressZNode() {
798 return this.masterAddressZNode;
799 }
800
801
802
803
804 public String getRegionNormalizerZNode() {
805 return regionNormalizerZNode;
806 }
807 }