1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CountDownLatch;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Abortable;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
35 import org.apache.hadoop.hbase.util.Threads;
36 import org.apache.zookeeper.KeeperException;
37 import org.apache.zookeeper.WatchedEvent;
38 import org.apache.zookeeper.Watcher;
39 import org.apache.zookeeper.ZooDefs;
40 import org.apache.zookeeper.data.ACL;
41
42
43
44
45
46
47
48
49
50
51
52
53 @InterfaceAudience.Private
54 public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
55 private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
56
57
58
59 private String identifier;
60
61
62 private String quorum;
63
64
65 private RecoverableZooKeeper recoverableZooKeeper;
66
67
68 protected Abortable abortable;
69
70 private boolean aborted = false;
71
72
73 private final List<ZooKeeperListener> listeners =
74 new CopyOnWriteArrayList<ZooKeeperListener>();
75
76
77
78 public CountDownLatch saslLatch = new CountDownLatch(1);
79
80
81
82
83 public String baseZNode;
84
85 public String metaServerZNode;
86
87 public String rsZNode;
88
89 public String drainingZNode;
90
91 private String masterAddressZNode;
92
93 public String backupMasterAddressesZNode;
94
95 public String clusterStateZNode;
96
97 public String assignmentZNode;
98
99 public String tableZNode;
100
101 public String clusterIdZNode;
102
103 public String splitLogZNode;
104
105 public String balancerZNode;
106
107 public String tableLockZNode;
108
109 public String recoveringRegionsZNode;
110
111 public static String namespaceZNode = "namespace";
112
113
114
115 public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
116 new ArrayList<ACL>() { {
117 add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
118 add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
119 }};
120
121 private final Configuration conf;
122
123 private final Exception constructorCaller;
124
125
126
127
128
129
130
131
132 public ZooKeeperWatcher(Configuration conf, String identifier,
133 Abortable abortable) throws ZooKeeperConnectionException, IOException {
134 this(conf, identifier, abortable, false);
135 }
136
137
138
139
140
141
142
143
144
145
146
147
148 public ZooKeeperWatcher(Configuration conf, String identifier,
149 Abortable abortable, boolean canCreateBaseZNode)
150 throws IOException, ZooKeeperConnectionException {
151 this.conf = conf;
152
153
154 try {
155 throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
156 } catch (Exception e) {
157 this.constructorCaller = e;
158 }
159 this.quorum = ZKConfig.getZKQuorumServersString(conf);
160
161
162 this.identifier = identifier;
163 this.abortable = abortable;
164 setNodeNames(conf);
165 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
166 if (canCreateBaseZNode) {
167 createBaseZNodes();
168 }
169 }
170
171 private void createBaseZNodes() throws ZooKeeperConnectionException {
172 try {
173
174 ZKUtil.createWithParents(this, baseZNode);
175 ZKUtil.createAndFailSilent(this, assignmentZNode);
176 ZKUtil.createAndFailSilent(this, rsZNode);
177 ZKUtil.createAndFailSilent(this, drainingZNode);
178 ZKUtil.createAndFailSilent(this, tableZNode);
179 ZKUtil.createAndFailSilent(this, splitLogZNode);
180 ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
181 ZKUtil.createAndFailSilent(this, tableLockZNode);
182 ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
183 } catch (KeeperException e) {
184 throw new ZooKeeperConnectionException(
185 prefix("Unexpected KeeperException creating base node"), e);
186 }
187 }
188
189 @Override
190 public String toString() {
191 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
192 }
193
194
195
196
197
198
199
200 public String prefix(final String str) {
201 return this.toString() + " " + str;
202 }
203
204
205
206
207 private void setNodeNames(Configuration conf) {
208 baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
209 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
210 metaServerZNode = ZKUtil.joinZNode(baseZNode,
211 conf.get("zookeeper.znode.metaserver", "meta-region-server"));
212 rsZNode = ZKUtil.joinZNode(baseZNode,
213 conf.get("zookeeper.znode.rs", "rs"));
214 drainingZNode = ZKUtil.joinZNode(baseZNode,
215 conf.get("zookeeper.znode.draining.rs", "draining"));
216 masterAddressZNode = ZKUtil.joinZNode(baseZNode,
217 conf.get("zookeeper.znode.master", "master"));
218 backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
219 conf.get("zookeeper.znode.backup.masters", "backup-masters"));
220 clusterStateZNode = ZKUtil.joinZNode(baseZNode,
221 conf.get("zookeeper.znode.state", "running"));
222 assignmentZNode = ZKUtil.joinZNode(baseZNode,
223 conf.get("zookeeper.znode.unassigned", "region-in-transition"));
224 tableZNode = ZKUtil.joinZNode(baseZNode,
225 conf.get("zookeeper.znode.tableEnableDisable", "table"));
226 clusterIdZNode = ZKUtil.joinZNode(baseZNode,
227 conf.get("zookeeper.znode.clusterId", "hbaseid"));
228 splitLogZNode = ZKUtil.joinZNode(baseZNode,
229 conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
230 balancerZNode = ZKUtil.joinZNode(baseZNode,
231 conf.get("zookeeper.znode.balancer", "balancer"));
232 tableLockZNode = ZKUtil.joinZNode(baseZNode,
233 conf.get("zookeeper.znode.tableLock", "table-lock"));
234 recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
235 conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
236 namespaceZNode = ZKUtil.joinZNode(baseZNode,
237 conf.get("zookeeper.znode.namespace", "namespace"));
238 }
239
240
241
242
243
244 public void registerListener(ZooKeeperListener listener) {
245 listeners.add(listener);
246 }
247
248
249
250
251
252
253 public void registerListenerFirst(ZooKeeperListener listener) {
254 listeners.add(0, listener);
255 }
256
257 public void unregisterListener(ZooKeeperListener listener) {
258 listeners.remove(listener);
259 }
260
261
262
263
264 public void unregisterAllListeners() {
265 listeners.clear();
266 }
267
268
269
270
271 public List<ZooKeeperListener> getListeners() {
272 return new ArrayList<ZooKeeperListener>(listeners);
273 }
274
275
276
277
278 public int getNumberOfListeners() {
279 return listeners.size();
280 }
281
282
283
284
285
286 public RecoverableZooKeeper getRecoverableZooKeeper() {
287 return recoverableZooKeeper;
288 }
289
290 public void reconnectAfterExpiration() throws IOException, InterruptedException {
291 recoverableZooKeeper.reconnectAfterExpiration();
292 }
293
294
295
296
297
298 public String getQuorum() {
299 return quorum;
300 }
301
302
303
304
305
306
307
308 @Override
309 public void process(WatchedEvent event) {
310 LOG.debug(prefix("Received ZooKeeper Event, " +
311 "type=" + event.getType() + ", " +
312 "state=" + event.getState() + ", " +
313 "path=" + event.getPath()));
314
315 switch(event.getType()) {
316
317
318 case None: {
319 connectionEvent(event);
320 break;
321 }
322
323
324
325 case NodeCreated: {
326 for(ZooKeeperListener listener : listeners) {
327 listener.nodeCreated(event.getPath());
328 }
329 break;
330 }
331
332 case NodeDeleted: {
333 for(ZooKeeperListener listener : listeners) {
334 listener.nodeDeleted(event.getPath());
335 }
336 break;
337 }
338
339 case NodeDataChanged: {
340 for(ZooKeeperListener listener : listeners) {
341 listener.nodeDataChanged(event.getPath());
342 }
343 break;
344 }
345
346 case NodeChildrenChanged: {
347 for(ZooKeeperListener listener : listeners) {
348 listener.nodeChildrenChanged(event.getPath());
349 }
350 break;
351 }
352 }
353 }
354
355
356
357
358
359
360
361
362
363
364
365
366
367 private void connectionEvent(WatchedEvent event) {
368 switch(event.getState()) {
369 case SyncConnected:
370
371
372 long finished = System.currentTimeMillis() +
373 this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
374 while (System.currentTimeMillis() < finished) {
375 Threads.sleep(1);
376 if (this.recoverableZooKeeper != null) break;
377 }
378 if (this.recoverableZooKeeper == null) {
379 LOG.error("ZK is null on connection event -- see stack trace " +
380 "for the stack trace when constructor was called on this zkw",
381 this.constructorCaller);
382 throw new NullPointerException("ZK is null");
383 }
384 this.identifier = this.identifier + "-0x" +
385 Long.toHexString(this.recoverableZooKeeper.getSessionId());
386
387 LOG.debug(this.identifier + " connected");
388 break;
389
390
391 case Disconnected:
392 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
393 break;
394
395 case Expired:
396 String msg = prefix(this.identifier + " received expired from " +
397 "ZooKeeper, aborting");
398
399
400 if (this.abortable != null) {
401 this.abortable.abort(msg, new KeeperException.SessionExpiredException());
402 }
403 break;
404
405 case ConnectedReadOnly:
406 case SaslAuthenticated:
407 case AuthFailed:
408 break;
409
410 default:
411 throw new IllegalStateException("Received event is not valid: " + event.getState());
412 }
413 }
414
415
416
417
418
419
420
421
422
423
424
425
426
427 public void sync(String path) {
428 this.recoverableZooKeeper.sync(path, null, null);
429 }
430
431
432
433
434
435
436
437
438
439
440
441 public void keeperException(KeeperException ke)
442 throws KeeperException {
443 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
444 throw ke;
445 }
446
447
448
449
450
451
452
453
454
455
456
457
458 public void interruptedException(InterruptedException ie) {
459 LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
460
461 Thread.currentThread().interrupt();
462
463 }
464
465
466
467
468
469
470 public void close() {
471 try {
472 if (recoverableZooKeeper != null) {
473 recoverableZooKeeper.close();
474 }
475 } catch (InterruptedException e) {
476 Thread.currentThread().interrupt();
477 }
478 }
479
480 public Configuration getConfiguration() {
481 return conf;
482 }
483
484 @Override
485 public void abort(String why, Throwable e) {
486 if (this.abortable != null) this.abortable.abort(why, e);
487 else this.aborted = true;
488 }
489
490 @Override
491 public boolean isAborted() {
492 return this.abortable == null? this.aborted: this.abortable.isAborted();
493 }
494
495
496
497
498 public String getMasterAddressZNode() {
499 return this.masterAddressZNode;
500 }
501
502 }