1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.IOException;
22 import java.lang.management.ManagementFactory;
23 import java.security.SecureRandom;
24 import java.util.ArrayList;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Random;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.apache.hadoop.hbase.util.RetryCounter;
34 import org.apache.hadoop.hbase.util.RetryCounterFactory;
35 import org.apache.zookeeper.AsyncCallback;
36 import org.apache.zookeeper.CreateMode;
37 import org.apache.zookeeper.KeeperException;
38 import org.apache.zookeeper.Op;
39 import org.apache.zookeeper.OpResult;
40 import org.apache.zookeeper.Watcher;
41 import org.apache.zookeeper.ZooDefs;
42 import org.apache.zookeeper.ZooKeeper;
43 import org.apache.zookeeper.ZooKeeper.States;
44 import org.apache.zookeeper.data.ACL;
45 import org.apache.zookeeper.data.Stat;
46 import org.apache.zookeeper.proto.CreateRequest;
47 import org.apache.zookeeper.proto.SetDataRequest;
48 import org.cloudera.htrace.Trace;
49 import org.cloudera.htrace.TraceScope;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 @InterfaceAudience.Private
75 public class RecoverableZooKeeper {
76 private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
77
78 volatile private ZooKeeper zk;
79 private final RetryCounterFactory retryCounterFactory;
80
81 private final String identifier;
82 private final byte[] id;
83 private Watcher watcher;
84 private int sessionTimeout;
85 private String quorumServers;
86 private final Random salter;
87
88
89
90
91
92
93
94
95
96 private static final byte MAGIC =(byte) 0XFF;
97 private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
98 private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
99 private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
100
/**
 * Creates a client without an explicit identifier by delegating to the six-argument
 * constructor with a {@code null} identifier.
 *
 * @param quorumServers connect string handed to the ZooKeeper client
 * @param sessionTimeout session timeout handed to the ZooKeeper client
 * @param watcher watcher handed to the ZooKeeper client
 * @param maxRetries number of retries for retried operations
 * @param retryIntervalMillis interval handed to the retry counter factory
 * @throws IOException if the delegated constructor throws it
 */
public RecoverableZooKeeper(
    String quorumServers,
    int sessionTimeout,
    Watcher watcher,
    int maxRetries,
    int retryIntervalMillis) throws IOException {
  this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, null);
}
107
108 public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
109 Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
110 throws IOException {
111
112 this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
113 this.retryCounterFactory =
114 new RetryCounterFactory(maxRetries+1, retryIntervalMillis);
115
116 if (identifier == null || identifier.length() == 0) {
117
118 identifier = ManagementFactory.getRuntimeMXBean().getName();
119 }
120 LOG.info("Process identifier=" + identifier +
121 " connecting to ZooKeeper ensemble=" + quorumServers);
122 this.identifier = identifier;
123 this.id = Bytes.toBytes(identifier);
124
125 this.watcher = watcher;
126 this.sessionTimeout = sessionTimeout;
127 this.quorumServers = quorumServers;
128 salter = new SecureRandom();
129 }
130
131 public void reconnectAfterExpiration()
132 throws IOException, InterruptedException {
133 LOG.info("Closing dead ZooKeeper connection, session" +
134 " was: 0x"+Long.toHexString(zk.getSessionId()));
135 zk.close();
136 this.zk = new ZooKeeper(this.quorumServers,
137 this.sessionTimeout, this.watcher);
138 LOG.info("Recreated a ZooKeeper, session" +
139 " is: 0x"+Long.toHexString(zk.getSessionId()));
140 }
141
142
143
144
145
146
147 public void delete(String path, int version)
148 throws InterruptedException, KeeperException {
149 TraceScope traceScope = null;
150 try {
151 traceScope = Trace.startSpan("RecoverableZookeeper.delete");
152 RetryCounter retryCounter = retryCounterFactory.create();
153 boolean isRetry = false;
154 while (true) {
155 try {
156 zk.delete(path, version);
157 return;
158 } catch (KeeperException e) {
159 switch (e.code()) {
160 case NONODE:
161 if (isRetry) {
162 LOG.info("Node " + path + " already deleted. Assuming a " +
163 "previous attempt succeeded.");
164 return;
165 }
166 LOG.warn("Node " + path + " already deleted, retry=" + isRetry);
167 throw e;
168
169 case CONNECTIONLOSS:
170 case SESSIONEXPIRED:
171 case OPERATIONTIMEOUT:
172 retryOrThrow(retryCounter, e, "delete");
173 break;
174
175 default:
176 throw e;
177 }
178 }
179 retryCounter.sleepUntilNextRetry();
180 isRetry = true;
181 }
182 } finally {
183 if (traceScope != null) traceScope.close();
184 }
185 }
186
187
188
189
190
191 public Stat exists(String path, Watcher watcher)
192 throws KeeperException, InterruptedException {
193 TraceScope traceScope = null;
194 try {
195 traceScope = Trace.startSpan("RecoverableZookeeper.exists");
196 RetryCounter retryCounter = retryCounterFactory.create();
197 while (true) {
198 try {
199 return zk.exists(path, watcher);
200 } catch (KeeperException e) {
201 switch (e.code()) {
202 case CONNECTIONLOSS:
203 case SESSIONEXPIRED:
204 case OPERATIONTIMEOUT:
205 retryOrThrow(retryCounter, e, "exists");
206 break;
207
208 default:
209 throw e;
210 }
211 }
212 retryCounter.sleepUntilNextRetry();
213 }
214 } finally {
215 if (traceScope != null) traceScope.close();
216 }
217 }
218
219
220
221
222
223 public Stat exists(String path, boolean watch)
224 throws KeeperException, InterruptedException {
225 TraceScope traceScope = null;
226 try {
227 traceScope = Trace.startSpan("RecoverableZookeeper.exists");
228 RetryCounter retryCounter = retryCounterFactory.create();
229 while (true) {
230 try {
231 return zk.exists(path, watch);
232 } catch (KeeperException e) {
233 switch (e.code()) {
234 case CONNECTIONLOSS:
235 case SESSIONEXPIRED:
236 case OPERATIONTIMEOUT:
237 retryOrThrow(retryCounter, e, "exists");
238 break;
239
240 default:
241 throw e;
242 }
243 }
244 retryCounter.sleepUntilNextRetry();
245 }
246 } finally {
247 if (traceScope != null) traceScope.close();
248 }
249 }
250
251 private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
252 String opName) throws KeeperException {
253 LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
254 if (!retryCounter.shouldRetry()) {
255 LOG.error("ZooKeeper " + opName + " failed after "
256 + retryCounter.getMaxAttempts() + " attempts");
257 throw e;
258 }
259 }
260
261
262
263
264
265 public List<String> getChildren(String path, Watcher watcher)
266 throws KeeperException, InterruptedException {
267 TraceScope traceScope = null;
268 try {
269 traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
270 RetryCounter retryCounter = retryCounterFactory.create();
271 while (true) {
272 try {
273 return zk.getChildren(path, watcher);
274 } catch (KeeperException e) {
275 switch (e.code()) {
276 case CONNECTIONLOSS:
277 case SESSIONEXPIRED:
278 case OPERATIONTIMEOUT:
279 retryOrThrow(retryCounter, e, "getChildren");
280 break;
281
282 default:
283 throw e;
284 }
285 }
286 retryCounter.sleepUntilNextRetry();
287 }
288 } finally {
289 if (traceScope != null) traceScope.close();
290 }
291 }
292
293
294
295
296
297 public List<String> getChildren(String path, boolean watch)
298 throws KeeperException, InterruptedException {
299 TraceScope traceScope = null;
300 try {
301 traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
302 RetryCounter retryCounter = retryCounterFactory.create();
303 while (true) {
304 try {
305 return zk.getChildren(path, watch);
306 } catch (KeeperException e) {
307 switch (e.code()) {
308 case CONNECTIONLOSS:
309 case SESSIONEXPIRED:
310 case OPERATIONTIMEOUT:
311 retryOrThrow(retryCounter, e, "getChildren");
312 break;
313
314 default:
315 throw e;
316 }
317 }
318 retryCounter.sleepUntilNextRetry();
319 }
320 } finally {
321 if (traceScope != null) traceScope.close();
322 }
323 }
324
325
326
327
328
329 public byte[] getData(String path, Watcher watcher, Stat stat)
330 throws KeeperException, InterruptedException {
331 TraceScope traceScope = null;
332 try {
333 traceScope = Trace.startSpan("RecoverableZookeeper.getData");
334 RetryCounter retryCounter = retryCounterFactory.create();
335 while (true) {
336 try {
337 byte[] revData = zk.getData(path, watcher, stat);
338 return this.removeMetaData(revData);
339 } catch (KeeperException e) {
340 switch (e.code()) {
341 case CONNECTIONLOSS:
342 case SESSIONEXPIRED:
343 case OPERATIONTIMEOUT:
344 retryOrThrow(retryCounter, e, "getData");
345 break;
346
347 default:
348 throw e;
349 }
350 }
351 retryCounter.sleepUntilNextRetry();
352 }
353 } finally {
354 if (traceScope != null) traceScope.close();
355 }
356 }
357
358
359
360
361
362 public byte[] getData(String path, boolean watch, Stat stat)
363 throws KeeperException, InterruptedException {
364 TraceScope traceScope = null;
365 try {
366 traceScope = Trace.startSpan("RecoverableZookeeper.getData");
367 RetryCounter retryCounter = retryCounterFactory.create();
368 while (true) {
369 try {
370 byte[] revData = zk.getData(path, watch, stat);
371 return this.removeMetaData(revData);
372 } catch (KeeperException e) {
373 switch (e.code()) {
374 case CONNECTIONLOSS:
375 case SESSIONEXPIRED:
376 case OPERATIONTIMEOUT:
377 retryOrThrow(retryCounter, e, "getData");
378 break;
379
380 default:
381 throw e;
382 }
383 }
384 retryCounter.sleepUntilNextRetry();
385 }
386 } finally {
387 if (traceScope != null) traceScope.close();
388 }
389 }
390
391
392
393
394
395
396
397 public Stat setData(String path, byte[] data, int version)
398 throws KeeperException, InterruptedException {
399 TraceScope traceScope = null;
400 try {
401 traceScope = Trace.startSpan("RecoverableZookeeper.setData");
402 RetryCounter retryCounter = retryCounterFactory.create();
403 byte[] newData = appendMetaData(data);
404 boolean isRetry = false;
405 while (true) {
406 try {
407 return zk.setData(path, newData, version);
408 } catch (KeeperException e) {
409 switch (e.code()) {
410 case CONNECTIONLOSS:
411 case SESSIONEXPIRED:
412 case OPERATIONTIMEOUT:
413 retryOrThrow(retryCounter, e, "setData");
414 break;
415 case BADVERSION:
416 if (isRetry) {
417
418 try{
419 Stat stat = new Stat();
420 byte[] revData = zk.getData(path, false, stat);
421 if(Bytes.compareTo(revData, newData) == 0) {
422
423 return stat;
424 }
425 } catch(KeeperException keeperException){
426
427 throw keeperException;
428 }
429 }
430
431 default:
432 throw e;
433 }
434 }
435 retryCounter.sleepUntilNextRetry();
436 isRetry = true;
437 }
438 } finally {
439 if (traceScope != null) traceScope.close();
440 }
441 }
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458 public String create(String path, byte[] data, List<ACL> acl,
459 CreateMode createMode)
460 throws KeeperException, InterruptedException {
461 TraceScope traceScope = null;
462 try {
463 traceScope = Trace.startSpan("RecoverableZookeeper.create");
464 byte[] newData = appendMetaData(data);
465 switch (createMode) {
466 case EPHEMERAL:
467 case PERSISTENT:
468 return createNonSequential(path, newData, acl, createMode);
469
470 case EPHEMERAL_SEQUENTIAL:
471 case PERSISTENT_SEQUENTIAL:
472 return createSequential(path, newData, acl, createMode);
473
474 default:
475 throw new IllegalArgumentException("Unrecognized CreateMode: " +
476 createMode);
477 }
478 } finally {
479 if (traceScope != null) traceScope.close();
480 }
481 }
482
483 private String createNonSequential(String path, byte[] data, List<ACL> acl,
484 CreateMode createMode) throws KeeperException, InterruptedException {
485 RetryCounter retryCounter = retryCounterFactory.create();
486 boolean isRetry = false;
487 while (true) {
488 try {
489 return zk.create(path, data, acl, createMode);
490 } catch (KeeperException e) {
491 switch (e.code()) {
492 case NODEEXISTS:
493 if (isRetry) {
494
495
496
497 byte[] currentData = zk.getData(path, false, null);
498 if (currentData != null &&
499 Bytes.compareTo(currentData, data) == 0) {
500
501 return path;
502 }
503 LOG.error("Node " + path + " already exists with " +
504 Bytes.toStringBinary(currentData) + ", could not write " +
505 Bytes.toStringBinary(data));
506 throw e;
507 }
508 LOG.info("Node " + path + " already exists and this is not a " +
509 "retry");
510 throw e;
511
512 case CONNECTIONLOSS:
513 case SESSIONEXPIRED:
514 case OPERATIONTIMEOUT:
515 retryOrThrow(retryCounter, e, "create");
516 break;
517
518 default:
519 throw e;
520 }
521 }
522 retryCounter.sleepUntilNextRetry();
523 isRetry = true;
524 }
525 }
526
527 private String createSequential(String path, byte[] data,
528 List<ACL> acl, CreateMode createMode)
529 throws KeeperException, InterruptedException {
530 RetryCounter retryCounter = retryCounterFactory.create();
531 boolean first = true;
532 String newPath = path+this.identifier;
533 while (true) {
534 try {
535 if (!first) {
536
537 String previousResult = findPreviousSequentialNode(newPath);
538 if (previousResult != null) {
539 return previousResult;
540 }
541 }
542 first = false;
543 return zk.create(newPath, data, acl, createMode);
544 } catch (KeeperException e) {
545 switch (e.code()) {
546 case CONNECTIONLOSS:
547 case SESSIONEXPIRED:
548 case OPERATIONTIMEOUT:
549 retryOrThrow(retryCounter, e, "create");
550 break;
551
552 default:
553 throw e;
554 }
555 }
556 retryCounter.sleepUntilNextRetry();
557 }
558 }
559
560
561
562
563 private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
564 throws UnsupportedOperationException {
565 if(ops == null) return null;
566
567 List<Op> preparedOps = new LinkedList<Op>();
568 for (Op op : ops) {
569 if (op.getType() == ZooDefs.OpCode.create) {
570 CreateRequest create = (CreateRequest)op.toRequestRecord();
571 preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
572 create.getAcl(), create.getFlags()));
573 } else if (op.getType() == ZooDefs.OpCode.delete) {
574
575 preparedOps.add(op);
576 } else if (op.getType() == ZooDefs.OpCode.setData) {
577 SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
578 preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
579 setData.getVersion()));
580 } else {
581 throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
582 }
583 }
584 return preparedOps;
585 }
586
587
588
589
590 public List<OpResult> multi(Iterable<Op> ops)
591 throws KeeperException, InterruptedException {
592 TraceScope traceScope = null;
593 try {
594 traceScope = Trace.startSpan("RecoverableZookeeper.multi");
595 RetryCounter retryCounter = retryCounterFactory.create();
596 Iterable<Op> multiOps = prepareZKMulti(ops);
597 while (true) {
598 try {
599 return zk.multi(multiOps);
600 } catch (KeeperException e) {
601 switch (e.code()) {
602 case CONNECTIONLOSS:
603 case SESSIONEXPIRED:
604 case OPERATIONTIMEOUT:
605 retryOrThrow(retryCounter, e, "multi");
606 break;
607
608 default:
609 throw e;
610 }
611 }
612 retryCounter.sleepUntilNextRetry();
613 }
614 } finally {
615 if (traceScope != null) traceScope.close();
616 }
617 }
618
619 private String findPreviousSequentialNode(String path)
620 throws KeeperException, InterruptedException {
621 int lastSlashIdx = path.lastIndexOf('/');
622 assert(lastSlashIdx != -1);
623 String parent = path.substring(0, lastSlashIdx);
624 String nodePrefix = path.substring(lastSlashIdx+1);
625
626 List<String> nodes = zk.getChildren(parent, false);
627 List<String> matching = filterByPrefix(nodes, nodePrefix);
628 for (String node : matching) {
629 String nodePath = parent + "/" + node;
630 Stat stat = zk.exists(nodePath, false);
631 if (stat != null) {
632 return nodePath;
633 }
634 }
635 return null;
636 }
637
638 public byte[] removeMetaData(byte[] data) {
639 if(data == null || data.length == 0) {
640 return data;
641 }
642
643 byte magic = data[0];
644 if(magic != MAGIC) {
645 return data;
646 }
647
648 int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
649 int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
650 int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
651
652 byte[] newData = new byte[dataLength];
653 System.arraycopy(data, dataOffset, newData, 0, dataLength);
654 return newData;
655 }
656
657 private byte[] appendMetaData(byte[] data) {
658 if(data == null || data.length == 0){
659 return data;
660 }
661 byte[] salt = Bytes.toBytes(salter.nextLong());
662 int idLength = id.length + salt.length;
663 byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
664 int pos = 0;
665 pos = Bytes.putByte(newData, pos, MAGIC);
666 pos = Bytes.putInt(newData, pos, idLength);
667 pos = Bytes.putBytes(newData, pos, id, 0, id.length);
668 pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
669 pos = Bytes.putBytes(newData, pos, data, 0, data.length);
670 return newData;
671 }
672
673 public long getSessionId() {
674 return zk.getSessionId();
675 }
676
677 public void close() throws InterruptedException {
678 zk.close();
679 }
680
681 public States getState() {
682 return zk.getState();
683 }
684
685 public ZooKeeper getZooKeeper() {
686 return zk;
687 }
688
689 public byte[] getSessionPasswd() {
690 return zk.getSessionPasswd();
691 }
692
693 public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
694 this.zk.sync(path, null, null);
695 }
696
697
698
699
700
701
702
703
704
705
706 private static List<String> filterByPrefix(List<String> nodes,
707 String... prefixes) {
708 List<String> lockChildren = new ArrayList<String>();
709 for (String child : nodes){
710 for (String prefix : prefixes){
711 if (child.startsWith(prefix)){
712 lockChildren.add(child);
713 break;
714 }
715 }
716 }
717 return lockChildren;
718 }
719
720 public String getIdentifier() {
721 return identifier;
722 }
723 }