1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.IOException;
22 import java.lang.management.ManagementFactory;
23 import java.util.ArrayList;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Random;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.hadoop.hbase.util.RetryCounter;
33 import org.apache.hadoop.hbase.util.RetryCounterFactory;
34 import org.apache.zookeeper.AsyncCallback;
35 import org.apache.zookeeper.CreateMode;
36 import org.apache.zookeeper.KeeperException;
37 import org.apache.zookeeper.Op;
38 import org.apache.zookeeper.OpResult;
39 import org.apache.zookeeper.Watcher;
40 import org.apache.zookeeper.ZooDefs;
41 import org.apache.zookeeper.ZooKeeper;
42 import org.apache.zookeeper.ZooKeeper.States;
43 import org.apache.zookeeper.data.ACL;
44 import org.apache.zookeeper.data.Stat;
45 import org.apache.zookeeper.proto.CreateRequest;
46 import org.apache.zookeeper.proto.SetDataRequest;
47 import org.apache.htrace.Trace;
48 import org.apache.htrace.TraceScope;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
 * Wrapper around a {@link ZooKeeper} handle that retries operations which fail with
 * CONNECTIONLOSS, SESSIONEXPIRED or OPERATIONTIMEOUT, using retry counters produced by a
 * {@link RetryCounterFactory}.
 *
 * <p>Non-empty payloads written through {@code create}, {@code setData} and {@code multi}
 * are prefixed with a metadata header; {@code getData} strips that header again. The header
 * layout is: one {@link #MAGIC} byte, a 4-byte id length, then the id bytes (this
 * instance's identifier followed by a random salt), then the caller's data.
 */
@InterfaceAudience.Private
public class RecoverableZooKeeper {
  private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);

  // The wrapped ZooKeeper handle; created by checkZk() and reset to null by
  // reconnectAfterExpiration() before a new one is created.
  private ZooKeeper zk;
  // Source of a fresh RetryCounter for every retried operation.
  private final RetryCounterFactory retryCounterFactory;

  // Identifier of this process; also appended to the path of sequential nodes.
  private final String identifier;
  // Byte form of the identifier, embedded in the metadata header.
  private final byte[] id;
  // Connection parameters kept so that checkZk() can (re)create the handle.
  private Watcher watcher;
  private int sessionTimeout;
  private String quorumServers;
  // Produces the random long appended to the id in every metadata header.
  private final Random salter;

  // Metadata header constants. removeMetaData() only strips a header when the first
  // byte of the payload equals MAGIC.
  private static final byte MAGIC =(byte) 0XFF;
  // Size of the magic marker, in bytes.
  private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
  // Offset of the id-length field: directly after the magic byte.
  private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
  // Size of the id-length field, in bytes.
  private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
99
/**
 * Creates an instance without an explicit identifier by delegating to the six-argument
 * constructor with a {@code null} identifier.
 *
 * @param quorumServers ZooKeeper connect string
 * @param sessionTimeout session timeout handed to the ZooKeeper client
 * @param watcher default watcher handed to the ZooKeeper client
 * @param maxRetries forwarded to the six-argument constructor
 * @param retryIntervalMillis forwarded to the six-argument constructor
 * @throws IOException declared for callers; see the delegated constructor
 */
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
    Watcher watcher, int maxRetries, int retryIntervalMillis)
    throws IOException {
  this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
      null);
}
106
/**
 * Creates an instance and makes a best-effort first connection attempt.
 *
 * <p>If {@code identifier} is {@code null} or empty, the JVM runtime name
 * ({@code ManagementFactory.getRuntimeMXBean().getName()}) is used instead. A failure of
 * the initial connection attempt does not fail construction: {@link #checkZk()} tries to
 * create the handle again whenever it is still {@code null}.
 *
 * @param quorumServers ZooKeeper connect string
 * @param sessionTimeout session timeout handed to the ZooKeeper client
 * @param watcher default watcher handed to the ZooKeeper client
 * @param maxRetries the retry counter factory is created with {@code maxRetries + 1}
 * @param retryIntervalMillis interval handed to the retry counter factory
 * @param identifier identifier of this process; may be {@code null} or empty
 * @throws IOException declared for callers; not thrown by the visible body
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
    justification="None. Its always been this way.")
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
    Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
    throws IOException {

  this.retryCounterFactory =
      new RetryCounterFactory(maxRetries+1, retryIntervalMillis);

  if (identifier == null || identifier.length() == 0) {
    // No identifier supplied: fall back to the JVM runtime name.
    identifier = ManagementFactory.getRuntimeMXBean().getName();
  }
  LOG.info("Process identifier=" + identifier +
      " connecting to ZooKeeper ensemble=" + quorumServers);
  this.identifier = identifier;
  this.id = Bytes.toBytes(identifier);

  this.watcher = watcher;
  this.sessionTimeout = sessionTimeout;
  this.quorumServers = quorumServers;
  try {
    checkZk();
  } catch (Exception x) {
    // Deliberately best-effort: the handle is created lazily by checkZk() on first use.
    LOG.debug("Initial ZooKeeper connection attempt failed; will retry on first use", x);
  }
  // Must be assigned on every path: salter is final and used by appendMetaData().
  this.salter = new Random();
}
131
132
133
134
135
136
137
138 protected synchronized ZooKeeper checkZk() throws KeeperException {
139 if (this.zk == null) {
140 try {
141 this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
142 } catch (IOException ex) {
143 LOG.warn("Unable to create ZooKeeper Connection", ex);
144 throw new KeeperException.OperationTimeoutException();
145 }
146 }
147 return zk;
148 }
149
150 public synchronized void reconnectAfterExpiration()
151 throws IOException, KeeperException, InterruptedException {
152 if (zk != null) {
153 LOG.info("Closing dead ZooKeeper connection, session" +
154 " was: 0x"+Long.toHexString(zk.getSessionId()));
155 zk.close();
156
157 zk = null;
158 }
159 checkZk();
160 LOG.info("Recreated a ZooKeeper, session" +
161 " is: 0x"+Long.toHexString(zk.getSessionId()));
162 }
163
164
165
166
167
168
169 public void delete(String path, int version)
170 throws InterruptedException, KeeperException {
171 TraceScope traceScope = null;
172 try {
173 traceScope = Trace.startSpan("RecoverableZookeeper.delete");
174 RetryCounter retryCounter = retryCounterFactory.create();
175 boolean isRetry = false;
176 while (true) {
177 try {
178 checkZk().delete(path, version);
179 return;
180 } catch (KeeperException e) {
181 switch (e.code()) {
182 case NONODE:
183 if (isRetry) {
184 LOG.debug("Node " + path + " already deleted. Assuming a " +
185 "previous attempt succeeded.");
186 return;
187 }
188 LOG.debug("Node " + path + " already deleted, retry=" + isRetry);
189 throw e;
190
191 case CONNECTIONLOSS:
192 case SESSIONEXPIRED:
193 case OPERATIONTIMEOUT:
194 retryOrThrow(retryCounter, e, "delete");
195 break;
196
197 default:
198 throw e;
199 }
200 }
201 retryCounter.sleepUntilNextRetry();
202 isRetry = true;
203 }
204 } finally {
205 if (traceScope != null) traceScope.close();
206 }
207 }
208
209
210
211
212
213 public Stat exists(String path, Watcher watcher)
214 throws KeeperException, InterruptedException {
215 TraceScope traceScope = null;
216 try {
217 traceScope = Trace.startSpan("RecoverableZookeeper.exists");
218 RetryCounter retryCounter = retryCounterFactory.create();
219 while (true) {
220 try {
221 return checkZk().exists(path, watcher);
222 } catch (KeeperException e) {
223 switch (e.code()) {
224 case CONNECTIONLOSS:
225 case SESSIONEXPIRED:
226 case OPERATIONTIMEOUT:
227 retryOrThrow(retryCounter, e, "exists");
228 break;
229
230 default:
231 throw e;
232 }
233 }
234 retryCounter.sleepUntilNextRetry();
235 }
236 } finally {
237 if (traceScope != null) traceScope.close();
238 }
239 }
240
241
242
243
244
245 public Stat exists(String path, boolean watch)
246 throws KeeperException, InterruptedException {
247 TraceScope traceScope = null;
248 try {
249 traceScope = Trace.startSpan("RecoverableZookeeper.exists");
250 RetryCounter retryCounter = retryCounterFactory.create();
251 while (true) {
252 try {
253 return checkZk().exists(path, watch);
254 } catch (KeeperException e) {
255 switch (e.code()) {
256 case CONNECTIONLOSS:
257 case SESSIONEXPIRED:
258 case OPERATIONTIMEOUT:
259 retryOrThrow(retryCounter, e, "exists");
260 break;
261
262 default:
263 throw e;
264 }
265 }
266 retryCounter.sleepUntilNextRetry();
267 }
268 } finally {
269 if (traceScope != null) traceScope.close();
270 }
271 }
272
273 private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
274 String opName) throws KeeperException {
275 LOG.debug("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
276 if (!retryCounter.shouldRetry()) {
277 LOG.error("ZooKeeper " + opName + " failed after "
278 + retryCounter.getMaxAttempts() + " attempts");
279 throw e;
280 }
281 }
282
283
284
285
286
287 public List<String> getChildren(String path, Watcher watcher)
288 throws KeeperException, InterruptedException {
289 TraceScope traceScope = null;
290 try {
291 traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
292 RetryCounter retryCounter = retryCounterFactory.create();
293 while (true) {
294 try {
295 return checkZk().getChildren(path, watcher);
296 } catch (KeeperException e) {
297 switch (e.code()) {
298 case CONNECTIONLOSS:
299 case SESSIONEXPIRED:
300 case OPERATIONTIMEOUT:
301 retryOrThrow(retryCounter, e, "getChildren");
302 break;
303
304 default:
305 throw e;
306 }
307 }
308 retryCounter.sleepUntilNextRetry();
309 }
310 } finally {
311 if (traceScope != null) traceScope.close();
312 }
313 }
314
315
316
317
318
319 public List<String> getChildren(String path, boolean watch)
320 throws KeeperException, InterruptedException {
321 TraceScope traceScope = null;
322 try {
323 traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
324 RetryCounter retryCounter = retryCounterFactory.create();
325 while (true) {
326 try {
327 return checkZk().getChildren(path, watch);
328 } catch (KeeperException e) {
329 switch (e.code()) {
330 case CONNECTIONLOSS:
331 case SESSIONEXPIRED:
332 case OPERATIONTIMEOUT:
333 retryOrThrow(retryCounter, e, "getChildren");
334 break;
335
336 default:
337 throw e;
338 }
339 }
340 retryCounter.sleepUntilNextRetry();
341 }
342 } finally {
343 if (traceScope != null) traceScope.close();
344 }
345 }
346
347
348
349
350
351 public byte[] getData(String path, Watcher watcher, Stat stat)
352 throws KeeperException, InterruptedException {
353 TraceScope traceScope = null;
354 try {
355 traceScope = Trace.startSpan("RecoverableZookeeper.getData");
356 RetryCounter retryCounter = retryCounterFactory.create();
357 while (true) {
358 try {
359 byte[] revData = checkZk().getData(path, watcher, stat);
360 return this.removeMetaData(revData);
361 } catch (KeeperException e) {
362 switch (e.code()) {
363 case CONNECTIONLOSS:
364 case SESSIONEXPIRED:
365 case OPERATIONTIMEOUT:
366 retryOrThrow(retryCounter, e, "getData");
367 break;
368
369 default:
370 throw e;
371 }
372 }
373 retryCounter.sleepUntilNextRetry();
374 }
375 } finally {
376 if (traceScope != null) traceScope.close();
377 }
378 }
379
380
381
382
383
384 public byte[] getData(String path, boolean watch, Stat stat)
385 throws KeeperException, InterruptedException {
386 TraceScope traceScope = null;
387 try {
388 traceScope = Trace.startSpan("RecoverableZookeeper.getData");
389 RetryCounter retryCounter = retryCounterFactory.create();
390 while (true) {
391 try {
392 byte[] revData = checkZk().getData(path, watch, stat);
393 return this.removeMetaData(revData);
394 } catch (KeeperException e) {
395 switch (e.code()) {
396 case CONNECTIONLOSS:
397 case SESSIONEXPIRED:
398 case OPERATIONTIMEOUT:
399 retryOrThrow(retryCounter, e, "getData");
400 break;
401
402 default:
403 throw e;
404 }
405 }
406 retryCounter.sleepUntilNextRetry();
407 }
408 } finally {
409 if (traceScope != null) traceScope.close();
410 }
411 }
412
413
414
415
416
417
418
419 public Stat setData(String path, byte[] data, int version)
420 throws KeeperException, InterruptedException {
421 TraceScope traceScope = null;
422 try {
423 traceScope = Trace.startSpan("RecoverableZookeeper.setData");
424 RetryCounter retryCounter = retryCounterFactory.create();
425 byte[] newData = appendMetaData(data);
426 boolean isRetry = false;
427 while (true) {
428 try {
429 return checkZk().setData(path, newData, version);
430 } catch (KeeperException e) {
431 switch (e.code()) {
432 case CONNECTIONLOSS:
433 case SESSIONEXPIRED:
434 case OPERATIONTIMEOUT:
435 retryOrThrow(retryCounter, e, "setData");
436 break;
437 case BADVERSION:
438 if (isRetry) {
439
440 try{
441 Stat stat = new Stat();
442 byte[] revData = checkZk().getData(path, false, stat);
443 if(Bytes.compareTo(revData, newData) == 0) {
444
445 return stat;
446 }
447 } catch(KeeperException keeperException){
448
449 throw keeperException;
450 }
451 }
452
453 default:
454 throw e;
455 }
456 }
457 retryCounter.sleepUntilNextRetry();
458 isRetry = true;
459 }
460 } finally {
461 if (traceScope != null) traceScope.close();
462 }
463 }
464
465
466
467
468
469 public List<ACL> getAcl(String path, Stat stat)
470 throws KeeperException, InterruptedException {
471 TraceScope traceScope = null;
472 try {
473 traceScope = Trace.startSpan("RecoverableZookeeper.getAcl");
474 RetryCounter retryCounter = retryCounterFactory.create();
475 while (true) {
476 try {
477 return checkZk().getACL(path, stat);
478 } catch (KeeperException e) {
479 switch (e.code()) {
480 case CONNECTIONLOSS:
481 case SESSIONEXPIRED:
482 case OPERATIONTIMEOUT:
483 retryOrThrow(retryCounter, e, "getAcl");
484 break;
485
486 default:
487 throw e;
488 }
489 }
490 retryCounter.sleepUntilNextRetry();
491 }
492 } finally {
493 if (traceScope != null) traceScope.close();
494 }
495 }
496
497
498
499
500
501 public Stat setAcl(String path, List<ACL> acls, int version)
502 throws KeeperException, InterruptedException {
503 TraceScope traceScope = null;
504 try {
505 traceScope = Trace.startSpan("RecoverableZookeeper.setAcl");
506 RetryCounter retryCounter = retryCounterFactory.create();
507 while (true) {
508 try {
509 return checkZk().setACL(path, acls, version);
510 } catch (KeeperException e) {
511 switch (e.code()) {
512 case CONNECTIONLOSS:
513 case SESSIONEXPIRED:
514 case OPERATIONTIMEOUT:
515 retryOrThrow(retryCounter, e, "setAcl");
516 break;
517
518 default:
519 throw e;
520 }
521 }
522 retryCounter.sleepUntilNextRetry();
523 }
524 } finally {
525 if (traceScope != null) traceScope.close();
526 }
527 }
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544 public String create(String path, byte[] data, List<ACL> acl,
545 CreateMode createMode)
546 throws KeeperException, InterruptedException {
547 TraceScope traceScope = null;
548 try {
549 traceScope = Trace.startSpan("RecoverableZookeeper.create");
550 byte[] newData = appendMetaData(data);
551 switch (createMode) {
552 case EPHEMERAL:
553 case PERSISTENT:
554 return createNonSequential(path, newData, acl, createMode);
555
556 case EPHEMERAL_SEQUENTIAL:
557 case PERSISTENT_SEQUENTIAL:
558 return createSequential(path, newData, acl, createMode);
559
560 default:
561 throw new IllegalArgumentException("Unrecognized CreateMode: " +
562 createMode);
563 }
564 } finally {
565 if (traceScope != null) traceScope.close();
566 }
567 }
568
569 private String createNonSequential(String path, byte[] data, List<ACL> acl,
570 CreateMode createMode) throws KeeperException, InterruptedException {
571 RetryCounter retryCounter = retryCounterFactory.create();
572 boolean isRetry = false;
573 while (true) {
574 try {
575 return checkZk().create(path, data, acl, createMode);
576 } catch (KeeperException e) {
577 switch (e.code()) {
578 case NODEEXISTS:
579 if (isRetry) {
580
581
582
583 byte[] currentData = checkZk().getData(path, false, null);
584 if (currentData != null &&
585 Bytes.compareTo(currentData, data) == 0) {
586
587 return path;
588 }
589 LOG.error("Node " + path + " already exists with " +
590 Bytes.toStringBinary(currentData) + ", could not write " +
591 Bytes.toStringBinary(data));
592 throw e;
593 }
594 LOG.debug("Node " + path + " already exists");
595 throw e;
596
597 case CONNECTIONLOSS:
598 case SESSIONEXPIRED:
599 case OPERATIONTIMEOUT:
600 retryOrThrow(retryCounter, e, "create");
601 break;
602
603 default:
604 throw e;
605 }
606 }
607 retryCounter.sleepUntilNextRetry();
608 isRetry = true;
609 }
610 }
611
612 private String createSequential(String path, byte[] data,
613 List<ACL> acl, CreateMode createMode)
614 throws KeeperException, InterruptedException {
615 RetryCounter retryCounter = retryCounterFactory.create();
616 boolean first = true;
617 String newPath = path+this.identifier;
618 while (true) {
619 try {
620 if (!first) {
621
622 String previousResult = findPreviousSequentialNode(newPath);
623 if (previousResult != null) {
624 return previousResult;
625 }
626 }
627 first = false;
628 return checkZk().create(newPath, data, acl, createMode);
629 } catch (KeeperException e) {
630 switch (e.code()) {
631 case CONNECTIONLOSS:
632 case SESSIONEXPIRED:
633 case OPERATIONTIMEOUT:
634 retryOrThrow(retryCounter, e, "create");
635 break;
636
637 default:
638 throw e;
639 }
640 }
641 retryCounter.sleepUntilNextRetry();
642 }
643 }
644
645
646
647
648 private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
649 throws UnsupportedOperationException {
650 if(ops == null) return null;
651
652 List<Op> preparedOps = new LinkedList<Op>();
653 for (Op op : ops) {
654 if (op.getType() == ZooDefs.OpCode.create) {
655 CreateRequest create = (CreateRequest)op.toRequestRecord();
656 preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
657 create.getAcl(), create.getFlags()));
658 } else if (op.getType() == ZooDefs.OpCode.delete) {
659
660 preparedOps.add(op);
661 } else if (op.getType() == ZooDefs.OpCode.setData) {
662 SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
663 preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
664 setData.getVersion()));
665 } else {
666 throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
667 }
668 }
669 return preparedOps;
670 }
671
672
673
674
675 public List<OpResult> multi(Iterable<Op> ops)
676 throws KeeperException, InterruptedException {
677 TraceScope traceScope = null;
678 try {
679 traceScope = Trace.startSpan("RecoverableZookeeper.multi");
680 RetryCounter retryCounter = retryCounterFactory.create();
681 Iterable<Op> multiOps = prepareZKMulti(ops);
682 while (true) {
683 try {
684 return checkZk().multi(multiOps);
685 } catch (KeeperException e) {
686 switch (e.code()) {
687 case CONNECTIONLOSS:
688 case SESSIONEXPIRED:
689 case OPERATIONTIMEOUT:
690 retryOrThrow(retryCounter, e, "multi");
691 break;
692
693 default:
694 throw e;
695 }
696 }
697 retryCounter.sleepUntilNextRetry();
698 }
699 } finally {
700 if (traceScope != null) traceScope.close();
701 }
702 }
703
704 private String findPreviousSequentialNode(String path)
705 throws KeeperException, InterruptedException {
706 int lastSlashIdx = path.lastIndexOf('/');
707 assert(lastSlashIdx != -1);
708 String parent = path.substring(0, lastSlashIdx);
709 String nodePrefix = path.substring(lastSlashIdx+1);
710
711 List<String> nodes = checkZk().getChildren(parent, false);
712 List<String> matching = filterByPrefix(nodes, nodePrefix);
713 for (String node : matching) {
714 String nodePath = parent + "/" + node;
715 Stat stat = checkZk().exists(nodePath, false);
716 if (stat != null) {
717 return nodePath;
718 }
719 }
720 return null;
721 }
722
723 public byte[] removeMetaData(byte[] data) {
724 if(data == null || data.length == 0) {
725 return data;
726 }
727
728 byte magic = data[0];
729 if(magic != MAGIC) {
730 return data;
731 }
732
733 int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
734 int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
735 int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
736
737 byte[] newData = new byte[dataLength];
738 System.arraycopy(data, dataOffset, newData, 0, dataLength);
739 return newData;
740 }
741
742 private byte[] appendMetaData(byte[] data) {
743 if(data == null || data.length == 0){
744 return data;
745 }
746 byte[] salt = Bytes.toBytes(salter.nextLong());
747 int idLength = id.length + salt.length;
748 byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
749 int pos = 0;
750 pos = Bytes.putByte(newData, pos, MAGIC);
751 pos = Bytes.putInt(newData, pos, idLength);
752 pos = Bytes.putBytes(newData, pos, id, 0, id.length);
753 pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
754 pos = Bytes.putBytes(newData, pos, data, 0, data.length);
755 return newData;
756 }
757
758 public synchronized long getSessionId() {
759 return zk == null ? -1 : zk.getSessionId();
760 }
761
762 public synchronized void close() throws InterruptedException {
763 if (zk != null) zk.close();
764 }
765
766 public synchronized States getState() {
767 return zk == null ? null : zk.getState();
768 }
769
/**
 * Returns the wrapped handle as it currently is, without creating one.
 *
 * @return the current ZooKeeper handle; {@code null} if none has been created
 */
public synchronized ZooKeeper getZooKeeper() {
  return zk;
}
773
774 public synchronized byte[] getSessionPasswd() {
775 return zk == null ? null : zk.getSessionPasswd();
776 }
777
778 public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
779 checkZk().sync(path, null, null);
780 }
781
782
783
784
785
786
787
788
789
790
791 private static List<String> filterByPrefix(List<String> nodes,
792 String... prefixes) {
793 List<String> lockChildren = new ArrayList<String>();
794 for (String child : nodes){
795 for (String prefix : prefixes){
796 if (child.startsWith(prefix)){
797 lockChildren.add(child);
798 break;
799 }
800 }
801 }
802 return lockChildren;
803 }
804
/**
 * Returns the identifier chosen at construction time: the one supplied by the caller, or
 * the JVM runtime name when none was given.
 *
 * @return this instance's identifier
 */
public String getIdentifier() {
  return identifier;
}
808 }