1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.TreeMap;
29 import java.util.UUID;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.Abortable;
35 import org.apache.hadoop.hbase.ServerName;
36 import org.apache.hadoop.hbase.exceptions.DeserializationException;
37 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
41 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
42 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
43 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
44 import org.apache.zookeeper.KeeperException;
45 import org.apache.zookeeper.KeeperException.AuthFailedException;
46 import org.apache.zookeeper.KeeperException.ConnectionLossException;
47 import org.apache.zookeeper.KeeperException.SessionExpiredException;
48
49 import com.google.protobuf.InvalidProtocolBufferException;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
78
79
80 private Map<String, ReplicationPeer> peerClusters;
81 private final String tableCFsNodeName;
82
83 private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
84
85 public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
86 Abortable abortable) {
87 super(zk, conf, abortable);
88 this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
89 this.peerClusters = new HashMap<String, ReplicationPeer>();
90 }
91
92 @Override
93 public void init() throws ReplicationException {
94 try {
95 if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
96 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
97 }
98 } catch (KeeperException e) {
99 throw new ReplicationException("Could not initialize replication peers", e);
100 }
101 connectExistingPeers();
102 }
103
104 @Override
105 public void addPeer(String id, String clusterKey) throws ReplicationException {
106 addPeer(id, clusterKey, null);
107 }
108
109 @Override
110 public void addPeer(String id, String clusterKey, String tableCFs) throws ReplicationException {
111 try {
112 if (peerExists(id)) {
113 throw new IllegalArgumentException("Cannot add a peer with id=" + id
114 + " because that id already exists.");
115 }
116 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
117 ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
118 toByteArray(clusterKey));
119
120
121
122 ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
123 ENABLED_ZNODE_BYTES);
124
125
126 String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
127 ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getTableCFsNode(id),
128 Bytes.toBytes(tableCFsStr));
129 } catch (KeeperException e) {
130 throw new ReplicationException("Could not add peer with id=" + id
131 + ", clusterKey=" + clusterKey, e);
132 }
133 }
134
135 @Override
136 public void removePeer(String id) throws ReplicationException {
137 try {
138 if (!peerExists(id)) {
139 throw new IllegalArgumentException("Cannot remove peer with id=" + id
140 + " because that id does not exist.");
141 }
142 ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
143 } catch (KeeperException e) {
144 throw new ReplicationException("Could not remove peer with id=" + id, e);
145 }
146 }
147
148 @Override
149 public void enablePeer(String id) throws ReplicationException {
150 changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
151 LOG.info("peer " + id + " is enabled");
152 }
153
154 @Override
155 public void disablePeer(String id) throws ReplicationException {
156 changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
157 LOG.info("peer " + id + " is disabled");
158 }
159
160 @Override
161 public String getPeerTableCFsConfig(String id) throws ReplicationException {
162 try {
163 if (!peerExists(id)) {
164 throw new IllegalArgumentException("peer " + id + " doesn't exist");
165 }
166 try {
167 return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
168 } catch (Exception e) {
169 throw new ReplicationException(e);
170 }
171 } catch (KeeperException e) {
172 throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
173 }
174 }
175
176 @Override
177 public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
178 try {
179 if (!peerExists(id)) {
180 throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
181 + " does not exist.");
182 }
183 String tableCFsZKNode = getTableCFsNode(id);
184 byte[] tableCFs = Bytes.toBytes(tableCFsStr);
185 if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
186 ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
187 } else {
188 ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
189 }
190 LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
191 } catch (KeeperException e) {
192 throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
193 }
194 }
195
196 @Override
197 public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException {
198 if (!this.peerClusters.containsKey(id)) {
199 throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
200 }
201 return this.peerClusters.get(id).getTableCFs();
202 }
203
204 @Override
205 public boolean getStatusOfConnectedPeer(String id) {
206 if (!this.peerClusters.containsKey(id)) {
207 throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
208 }
209 return this.peerClusters.get(id).getPeerEnabled().get();
210 }
211
212 @Override
213 public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
214 try {
215 if (!peerExists(id)) {
216 throw new IllegalArgumentException("peer " + id + " doesn't exist");
217 }
218 String peerStateZNode = getPeerStateNode(id);
219 try {
220 return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
221 } catch (KeeperException e) {
222 throw new ReplicationException(e);
223 } catch (DeserializationException e) {
224 throw new ReplicationException(e);
225 }
226 } catch (KeeperException e) {
227 throw new ReplicationException("Unable to get status of the peer with id=" + id +
228 " from backing store", e);
229 }
230 }
231
232 @Override
233 public boolean connectToPeer(String peerId) throws ReplicationException {
234 if (peerClusters == null) {
235 return false;
236 }
237 if (this.peerClusters.containsKey(peerId)) {
238 return false;
239 }
240 ReplicationPeer peer = null;
241 try {
242 peer = getPeer(peerId);
243 } catch (Exception e) {
244 throw new ReplicationException("Error connecting to peer with id=" + peerId, e);
245 }
246 if (peer == null) {
247 return false;
248 }
249 this.peerClusters.put(peerId, peer);
250 LOG.info("Added new peer cluster " + peer.getClusterKey());
251 return true;
252 }
253
254 @Override
255 public void disconnectFromPeer(String peerId) {
256 ReplicationPeer rp = this.peerClusters.get(peerId);
257 if (rp != null) {
258 rp.getZkw().close();
259 this.peerClusters.remove(peerId);
260 }
261 }
262
263 @Override
264 public Map<String, String> getAllPeerClusterKeys() {
265 Map<String, String> peers = new TreeMap<String, String>();
266 List<String> ids = null;
267 try {
268 ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
269 for (String id : ids) {
270 byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
271 String clusterKey = null;
272 try {
273 clusterKey = parsePeerFrom(bytes);
274 } catch (DeserializationException de) {
275 LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
276 continue;
277 }
278 peers.put(id, clusterKey);
279 }
280 } catch (KeeperException e) {
281 this.abortable.abort("Cannot get the list of peers ", e);
282 }
283 return peers;
284 }
285
286 @Override
287 public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
288 if (this.peerClusters.size() == 0) {
289 return Collections.emptyList();
290 }
291 ReplicationPeer peer = this.peerClusters.get(peerId);
292 if (peer == null) {
293 return Collections.emptyList();
294 }
295 List<ServerName> addresses;
296 try {
297 addresses = fetchSlavesAddresses(peer.getZkw());
298 } catch (KeeperException ke) {
299 if (LOG.isDebugEnabled()) {
300 LOG.debug("Fetch salves addresses failed.", ke);
301 }
302 reconnectPeer(ke, peer);
303 addresses = Collections.emptyList();
304 }
305 peer.setRegionServers(addresses);
306 return peer.getRegionServers();
307 }
308
309 @Override
310 public UUID getPeerUUID(String peerId) {
311 ReplicationPeer peer = this.peerClusters.get(peerId);
312 if (peer == null) {
313 return null;
314 }
315 UUID peerUUID = null;
316 try {
317 peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
318 } catch (KeeperException ke) {
319 reconnectPeer(ke, peer);
320 }
321 return peerUUID;
322 }
323
324 @Override
325 public Set<String> getConnectedPeers() {
326 return this.peerClusters.keySet();
327 }
328
329 @Override
330 public Configuration getPeerConf(String peerId) throws ReplicationException {
331 String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
332 byte[] data = null;
333 try {
334 data = ZKUtil.getData(this.zookeeper, znode);
335 } catch (KeeperException e) {
336 throw new ReplicationException("Error getting configuration for peer with id="
337 + peerId, e);
338 }
339 if (data == null) {
340 LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
341 return null;
342 }
343 String otherClusterKey = "";
344 try {
345 otherClusterKey = parsePeerFrom(data);
346 } catch (DeserializationException e) {
347 LOG.warn("Failed to parse cluster key from peerId=" + peerId
348 + ", specifically the content from the following znode: " + znode);
349 return null;
350 }
351
352 Configuration otherConf = new Configuration(this.conf);
353 try {
354 ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
355 } catch (IOException e) {
356 LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
357 return null;
358 }
359 return otherConf;
360 }
361
362
363
364
365 @Override
366 public List<String> getAllPeerIds() {
367 List<String> ids = null;
368 try {
369 ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
370 } catch (KeeperException e) {
371 this.abortable.abort("Cannot get the list of peers ", e);
372 }
373 return ids;
374 }
375
376 @Override
377 public long getTimestampOfLastChangeToPeer(String peerId) {
378 if (!peerClusters.containsKey(peerId)) {
379 throw new IllegalArgumentException("Unknown peer id: " + peerId);
380 }
381 return peerClusters.get(peerId).getLastRegionserverUpdate();
382 }
383
384
385
386
387
388 private void connectExistingPeers() throws ReplicationException {
389 List<String> znodes = null;
390 try {
391 znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
392 } catch (KeeperException e) {
393 throw new ReplicationException("Error getting the list of peer clusters.", e);
394 }
395 if (znodes != null) {
396 for (String z : znodes) {
397 connectToPeer(z);
398 }
399 }
400 }
401
402
403
404
405
406
407 private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
408 if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
409 || ke instanceof AuthFailedException) {
410 LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
411 try {
412 peer.reloadZkWatcher();
413 peer.getZkw().registerListener(new PeerRegionServerListener(peer));
414 } catch (IOException io) {
415 LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
416 }
417 }
418 }
419
420
421
422
423
424
425 private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
426 throws KeeperException {
427 List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
428 if (children == null) {
429 return Collections.emptyList();
430 }
431 List<ServerName> addresses = new ArrayList<ServerName>(children.size());
432 for (String child : children) {
433 addresses.add(ServerName.parseServerName(child));
434 }
435 return addresses;
436 }
437
438 private String getTableCFsNode(String id) {
439 return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
440 }
441
442 private String getPeerStateNode(String id) {
443 return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
444 }
445
446
447
448
449
450
451 private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
452 throws ReplicationException {
453 try {
454 if (!peerExists(id)) {
455 throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
456 + " does not exist.");
457 }
458 String peerStateZNode = getPeerStateNode(id);
459 byte[] stateBytes =
460 (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
461 : DISABLED_ZNODE_BYTES;
462 if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
463 ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
464 } else {
465 ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
466 }
467 LOG.info("Peer with id= " + id + " is now " + state.name());
468 } catch (KeeperException e) {
469 throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
470 }
471 }
472
473
474
475
476
477
478
479 private ReplicationPeer getPeer(String peerId) throws ReplicationException {
480 Configuration peerConf = getPeerConf(peerId);
481 if (peerConf == null) {
482 return null;
483 }
484 if (this.ourClusterKey.equals(ZKUtil.getZooKeeperClusterKey(peerConf))) {
485 LOG.debug("Not connecting to " + peerId + " because it's us");
486 return null;
487 }
488
489 ReplicationPeer peer =
490 new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
491 try {
492 peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
493 } catch (KeeperException e) {
494 throw new ReplicationException("Error starting the peer state tracker for peerId=" +
495 peerId, e);
496 }
497
498 try {
499 peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
500 } catch (KeeperException e) {
501 throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
502 peerId, e);
503 }
504
505 peer.getZkw().registerListener(new PeerRegionServerListener(peer));
506 return peer;
507 }
508
509
510
511
512
513
514 private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
515 if (ProtobufUtil.isPBMagicPrefix(bytes)) {
516 int pblen = ProtobufUtil.lengthOfPBMagic();
517 ZooKeeperProtos.ReplicationPeer.Builder builder =
518 ZooKeeperProtos.ReplicationPeer.newBuilder();
519 ZooKeeperProtos.ReplicationPeer peer;
520 try {
521 peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
522 } catch (InvalidProtocolBufferException e) {
523 throw new DeserializationException(e);
524 }
525 return peer.getClusterkey();
526 } else {
527 if (bytes.length > 0) {
528 return Bytes.toString(bytes);
529 }
530 return "";
531 }
532 }
533
534
535
536
537
538
539
540 private static byte[] toByteArray(final String clusterKey) {
541 byte[] bytes =
542 ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
543 .toByteArray();
544 return ProtobufUtil.prependPBMagic(bytes);
545 }
546
547
548
549
550 public static class PeerRegionServerListener extends ZooKeeperListener {
551
552 private ReplicationPeer peer;
553 private String regionServerListNode;
554
555 public PeerRegionServerListener(ReplicationPeer replicationPeer) {
556 super(replicationPeer.getZkw());
557 this.peer = replicationPeer;
558 this.regionServerListNode = peer.getZkw().rsZNode;
559 }
560
561 public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
562 super(zkw);
563 this.regionServerListNode = regionServerListNode;
564 }
565
566 @Override
567 public synchronized void nodeChildrenChanged(String path) {
568 if (path.equals(regionServerListNode)) {
569 try {
570 LOG.info("Detected change to peer regionservers, fetching updated list");
571 peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
572 } catch (KeeperException e) {
573 LOG.fatal("Error reading slave addresses", e);
574 }
575 }
576 }
577
578 }
579 }