View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.TreeMap;
29  import java.util.UUID;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.Abortable;
35  import org.apache.hadoop.hbase.ServerName;
36  import org.apache.hadoop.hbase.exceptions.DeserializationException;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
41  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
42  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
43  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
44  import org.apache.zookeeper.KeeperException;
45  import org.apache.zookeeper.KeeperException.AuthFailedException;
46  import org.apache.zookeeper.KeeperException.ConnectionLossException;
47  import org.apache.zookeeper.KeeperException.SessionExpiredException;
48  
49  import com.google.protobuf.InvalidProtocolBufferException;
50  
51  /**
52   * This class provides an implementation of the ReplicationPeers interface using Zookeeper. The
53   * peers znode contains a list of all peer replication clusters and the current replication state of
54   * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
55   * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
56   * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
57   * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
58   * For example:
59   *
60   *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
61   *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
62   *
63   * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
64   * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
65   * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
66   * ReplicationPeer.PeerStateTracker class. For example:
67   *
68   * /hbase/replication/peers/1/peer-state [Value: ENABLED]
69   *
70   * Each of these peer znodes has a child znode that indicates which data will be replicated
71   * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
72   * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
73   * class. For example:
74   *
75   * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
76   */
77  public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
78  
79    // Map of peer clusters keyed by their id
80    private Map<String, ReplicationPeer> peerClusters;
81    private final String tableCFsNodeName;
82  
83    private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
84  
85    public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
86        Abortable abortable) {
87      super(zk, conf, abortable);
88      this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
89      this.peerClusters = new HashMap<String, ReplicationPeer>();
90    }
91  
92    @Override
93    public void init() throws ReplicationException {
94      try {
95        if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
96          ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
97        }
98      } catch (KeeperException e) {
99        throw new ReplicationException("Could not initialize replication peers", e);
100     }
101     connectExistingPeers();
102   }
103 
104   @Override
105   public void addPeer(String id, String clusterKey) throws ReplicationException {
106     addPeer(id, clusterKey, null);
107   }
108 
109   @Override
110   public void addPeer(String id, String clusterKey, String tableCFs) throws ReplicationException {
111     try {
112       if (peerExists(id)) {
113         throw new IllegalArgumentException("Cannot add a peer with id=" + id
114             + " because that id already exists.");
115       }
116       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
117       ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
118         toByteArray(clusterKey));
119       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
120       // peer-state znode. This happens while adding a peer.
121       // The peer state data is set as "ENABLED" by default.
122       ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
123         ENABLED_ZNODE_BYTES);
124       // A peer is enabled by default
125 
126       String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
127       ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getTableCFsNode(id),
128                     Bytes.toBytes(tableCFsStr));
129     } catch (KeeperException e) {
130       throw new ReplicationException("Could not add peer with id=" + id
131           + ", clusterKey=" + clusterKey, e);
132     }
133   }
134 
135   @Override
136   public void removePeer(String id) throws ReplicationException {
137     try {
138       if (!peerExists(id)) {
139         throw new IllegalArgumentException("Cannot remove peer with id=" + id
140             + " because that id does not exist.");
141       }
142       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
143     } catch (KeeperException e) {
144       throw new ReplicationException("Could not remove peer with id=" + id, e);
145     }
146   }
147 
148   @Override
149   public void enablePeer(String id) throws ReplicationException {
150     changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
151     LOG.info("peer " + id + " is enabled");
152   }
153 
154   @Override
155   public void disablePeer(String id) throws ReplicationException {
156     changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
157     LOG.info("peer " + id + " is disabled");
158   }
159 
160   @Override
161   public String getPeerTableCFsConfig(String id) throws ReplicationException {
162     try {
163       if (!peerExists(id)) {
164         throw new IllegalArgumentException("peer " + id + " doesn't exist");
165       }
166       try {
167         return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
168       } catch (Exception e) {
169         throw new ReplicationException(e);
170       }
171     } catch (KeeperException e) {
172       throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
173     }
174   }
175 
176   @Override
177   public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
178     try {
179       if (!peerExists(id)) {
180         throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
181             + " does not exist.");
182       }
183       String tableCFsZKNode = getTableCFsNode(id);
184       byte[] tableCFs = Bytes.toBytes(tableCFsStr);
185       if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
186         ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
187       } else {
188         ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
189       }
190       LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
191     } catch (KeeperException e) {
192       throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
193     }
194   }
195 
196   @Override
197   public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException {
198     if (!this.peerClusters.containsKey(id)) {
199       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
200     }
201     return this.peerClusters.get(id).getTableCFs();
202   }
203 
204   @Override
205   public boolean getStatusOfConnectedPeer(String id) {
206     if (!this.peerClusters.containsKey(id)) {
207       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
208     }
209     return this.peerClusters.get(id).getPeerEnabled().get();
210   }
211 
212   @Override
213   public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
214     try {
215       if (!peerExists(id)) {
216         throw new IllegalArgumentException("peer " + id + " doesn't exist");
217       }
218       String peerStateZNode = getPeerStateNode(id);
219       try {
220         return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
221       } catch (KeeperException e) {
222         throw new ReplicationException(e);
223       } catch (DeserializationException e) {
224         throw new ReplicationException(e);
225       }
226     } catch (KeeperException e) {
227       throw new ReplicationException("Unable to get status of the peer with id=" + id +
228           " from backing store", e);
229     }
230   }
231 
232   @Override
233   public boolean connectToPeer(String peerId) throws ReplicationException {
234     if (peerClusters == null) {
235       return false;
236     }
237     if (this.peerClusters.containsKey(peerId)) {
238       return false;
239     }
240     ReplicationPeer peer = null;
241     try {
242       peer = getPeer(peerId);
243     } catch (Exception e) {
244       throw new ReplicationException("Error connecting to peer with id=" + peerId, e);
245     }
246     if (peer == null) {
247       return false;
248     }
249     this.peerClusters.put(peerId, peer);
250     LOG.info("Added new peer cluster " + peer.getClusterKey());
251     return true;
252   }
253 
254   @Override
255   public void disconnectFromPeer(String peerId) {
256     ReplicationPeer rp = this.peerClusters.get(peerId);
257     if (rp != null) {
258       rp.getZkw().close();
259       this.peerClusters.remove(peerId);
260     }
261   }
262 
263   @Override
264   public Map<String, String> getAllPeerClusterKeys() {
265     Map<String, String> peers = new TreeMap<String, String>();
266     List<String> ids = null;
267     try {
268       ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
269       for (String id : ids) {
270         byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
271         String clusterKey = null;
272         try {
273           clusterKey = parsePeerFrom(bytes);
274         } catch (DeserializationException de) {
275           LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
276           continue;
277         }
278         peers.put(id, clusterKey);
279       }
280     } catch (KeeperException e) {
281       this.abortable.abort("Cannot get the list of peers ", e);
282     }
283     return peers;
284   }
285 
286   @Override
287   public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
288     if (this.peerClusters.size() == 0) {
289       return Collections.emptyList();
290     }
291     ReplicationPeer peer = this.peerClusters.get(peerId);
292     if (peer == null) {
293       return Collections.emptyList();
294     }
295     List<ServerName> addresses;
296     try {
297       addresses = fetchSlavesAddresses(peer.getZkw());
298     } catch (KeeperException ke) {
299       if (LOG.isDebugEnabled()) {
300         LOG.debug("Fetch salves addresses failed.", ke);
301       }
302       reconnectPeer(ke, peer);
303       addresses = Collections.emptyList();
304     }
305     peer.setRegionServers(addresses);
306     return peer.getRegionServers();
307   }
308 
309   @Override
310   public UUID getPeerUUID(String peerId) {
311     ReplicationPeer peer = this.peerClusters.get(peerId);
312     if (peer == null) {
313       return null;
314     }
315     UUID peerUUID = null;
316     try {
317       peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
318     } catch (KeeperException ke) {
319       reconnectPeer(ke, peer);
320     }
321     return peerUUID;
322   }
323 
324   @Override
325   public Set<String> getConnectedPeers() {
326     return this.peerClusters.keySet();
327   }
328 
329   @Override
330   public Configuration getPeerConf(String peerId) throws ReplicationException {
331     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
332     byte[] data = null;
333     try {
334       data = ZKUtil.getData(this.zookeeper, znode);
335     } catch (KeeperException e) {
336       throw new ReplicationException("Error getting configuration for peer with id="
337           + peerId, e);
338     }
339     if (data == null) {
340       LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
341       return null;
342     }
343     String otherClusterKey = "";
344     try {
345       otherClusterKey = parsePeerFrom(data);
346     } catch (DeserializationException e) {
347       LOG.warn("Failed to parse cluster key from peerId=" + peerId
348           + ", specifically the content from the following znode: " + znode);
349       return null;
350     }
351 
352     Configuration otherConf = new Configuration(this.conf);
353     try {
354       ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
355     } catch (IOException e) {
356       LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
357       return null;
358     }
359     return otherConf;
360   }
361 
362   /**
363    * List all registered peer clusters and set a watch on their znodes.
364    */
365   @Override
366   public List<String> getAllPeerIds() {
367     List<String> ids = null;
368     try {
369       ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
370     } catch (KeeperException e) {
371       this.abortable.abort("Cannot get the list of peers ", e);
372     }
373     return ids;
374   }
375 
376   @Override
377   public long getTimestampOfLastChangeToPeer(String peerId) {
378     if (!peerClusters.containsKey(peerId)) {
379       throw new IllegalArgumentException("Unknown peer id: " + peerId);
380     }
381     return peerClusters.get(peerId).getLastRegionserverUpdate();
382   }
383 
384   /**
385    * A private method used during initialization. This method attempts to connect to all registered
386    * peer clusters. This method does not set a watch on the peer cluster znodes.
387    */
388   private void connectExistingPeers() throws ReplicationException {
389     List<String> znodes = null;
390     try {
391       znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
392     } catch (KeeperException e) {
393       throw new ReplicationException("Error getting the list of peer clusters.", e);
394     }
395     if (znodes != null) {
396       for (String z : znodes) {
397         connectToPeer(z);
398       }
399     }
400   }
401 
402   /**
403    * A private method used to re-establish a zookeeper session with a peer cluster.
404    * @param ke
405    * @param peer
406    */
407   private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
408     if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
409         || ke instanceof AuthFailedException) {
410       LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
411       try {
412         peer.reloadZkWatcher();
413         peer.getZkw().registerListener(new PeerRegionServerListener(peer));
414       } catch (IOException io) {
415         LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
416       }
417     }
418   }
419 
420   /**
421    * Get the list of all the region servers from the specified peer
422    * @param zkw zk connection to use
423    * @return list of region server addresses or an empty list if the slave is unavailable
424    */
425   private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
426       throws KeeperException {
427     List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
428     if (children == null) {
429       return Collections.emptyList();
430     }
431     List<ServerName> addresses = new ArrayList<ServerName>(children.size());
432     for (String child : children) {
433       addresses.add(ServerName.parseServerName(child));
434     }
435     return addresses;
436   }
437 
438   private String getTableCFsNode(String id) {
439     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
440   }
441 
442   private String getPeerStateNode(String id) {
443     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
444   }
445 
446   /**
447    * Update the state znode of a peer cluster.
448    * @param id
449    * @param state
450    */
451   private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
452       throws ReplicationException {
453     try {
454       if (!peerExists(id)) {
455         throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
456             + " does not exist.");
457       }
458       String peerStateZNode = getPeerStateNode(id);
459       byte[] stateBytes =
460           (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
461               : DISABLED_ZNODE_BYTES;
462       if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
463         ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
464       } else {
465         ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
466       }
467       LOG.info("Peer with id= " + id + " is now " + state.name());
468     } catch (KeeperException e) {
469       throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
470     }
471   }
472 
473   /**
474    * Helper method to connect to a peer
475    * @param peerId peer's identifier
476    * @return object representing the peer
477    * @throws ReplicationException
478    */
479   private ReplicationPeer getPeer(String peerId) throws ReplicationException {
480     Configuration peerConf = getPeerConf(peerId);
481     if (peerConf == null) {
482       return null;
483     }
484     if (this.ourClusterKey.equals(ZKUtil.getZooKeeperClusterKey(peerConf))) {
485       LOG.debug("Not connecting to " + peerId + " because it's us");
486       return null;
487     }
488 
489     ReplicationPeer peer =
490         new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
491     try {
492       peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
493     } catch (KeeperException e) {
494       throw new ReplicationException("Error starting the peer state tracker for peerId=" +
495           peerId, e);
496     }
497 
498     try {
499       peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
500     } catch (KeeperException e) {
501       throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
502           peerId, e);
503     }
504 
505     peer.getZkw().registerListener(new PeerRegionServerListener(peer));
506     return peer;
507   }
508 
509   /**
510    * @param bytes Content of a peer znode.
511    * @return ClusterKey parsed from the passed bytes.
512    * @throws DeserializationException
513    */
514   private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
515     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
516       int pblen = ProtobufUtil.lengthOfPBMagic();
517       ZooKeeperProtos.ReplicationPeer.Builder builder =
518           ZooKeeperProtos.ReplicationPeer.newBuilder();
519       ZooKeeperProtos.ReplicationPeer peer;
520       try {
521         peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
522       } catch (InvalidProtocolBufferException e) {
523         throw new DeserializationException(e);
524       }
525       return peer.getClusterkey();
526     } else {
527       if (bytes.length > 0) {
528         return Bytes.toString(bytes);
529       }
530       return "";
531     }
532   }
533 
534   /**
535    * @param clusterKey
536    * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix prepended suitable
537    *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
538    *         /hbase/replication/peers/PEER_ID
539    */
540   private static byte[] toByteArray(final String clusterKey) {
541     byte[] bytes =
542         ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
543             .toByteArray();
544     return ProtobufUtil.prependPBMagic(bytes);
545   }
546 
547   /**
548    * Tracks changes to the list of region servers in a peer's cluster.
549    */
550   public static class PeerRegionServerListener extends ZooKeeperListener {
551 
552     private ReplicationPeer peer;
553     private String regionServerListNode;
554 
555     public PeerRegionServerListener(ReplicationPeer replicationPeer) {
556       super(replicationPeer.getZkw());
557       this.peer = replicationPeer;
558       this.regionServerListNode = peer.getZkw().rsZNode;
559     }
560 
561     public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
562       super(zkw);
563       this.regionServerListNode = regionServerListNode;
564     }
565 
566     @Override
567     public synchronized void nodeChildrenChanged(String path) {
568       if (path.equals(regionServerListNode)) {
569         try {
570           LOG.info("Detected change to peer regionservers, fetching updated list");
571           peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
572         } catch (KeeperException e) {
573           LOG.fatal("Error reading slave addresses", e);
574         }
575       }
576     }
577 
578   }
579 }