View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.concurrent.CopyOnWriteArrayList;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Abortable;
29  import org.apache.hadoop.hbase.Stoppable;
30  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
32  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
33  import org.apache.zookeeper.KeeperException;
34  
35  /**
36   * This class is a Zookeeper implementation of the ReplicationTracker interface. This class is
37   * responsible for handling replication events that are defined in the ReplicationListener
38   * interface.
39   */
40  public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
41  
42    private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class);
43    // All about stopping
44    private final Stoppable stopper;
45    // listeners to be notified
46    private final List<ReplicationListener> listeners =
47        new CopyOnWriteArrayList<ReplicationListener>();
48    // List of all the other region servers in this cluster
49    private final ArrayList<String> otherRegionServers = new ArrayList<String>();
50    private final ReplicationPeers replicationPeers;
51  
52    public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper,
53        final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
54        Stoppable stopper) {
55      super(zookeeper, conf, abortable);
56      this.replicationPeers = replicationPeers;
57      this.stopper = stopper;
58      this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
59      this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
60    }
61  
62    @Override
63    public void registerListener(ReplicationListener listener) {
64      listeners.add(listener);
65    }
66  
67    @Override
68    public void removeListener(ReplicationListener listener) {
69      listeners.remove(listener);
70    }
71  
72    /**
73     * Return a snapshot of the current region servers.
74     */
75    @Override
76    public List<String> getListOfRegionServers() {
77      refreshOtherRegionServersList();
78  
79      List<String> list = null;
80      synchronized (otherRegionServers) {
81        list = new ArrayList<String>(otherRegionServers);
82      }
83      return list;
84    }
85  
86    /**
87     * Watcher used to be notified of the other region server's death in the local cluster. It
88     * initiates the process to transfer the queues if it is able to grab the lock.
89     */
90    public class OtherRegionServerWatcher extends ZooKeeperListener {
91  
92      /**
93       * Construct a ZooKeeper event listener.
94       */
95      public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
96        super(watcher);
97      }
98  
99      /**
100      * Called when a new node has been created.
101      * @param path full path of the new node
102      */
103     public void nodeCreated(String path) {
104       refreshListIfRightPath(path);
105     }
106 
107     /**
108      * Called when a node has been deleted
109      * @param path full path of the deleted node
110      */
111     public void nodeDeleted(String path) {
112       if (stopper.isStopped()) {
113         return;
114       }
115       boolean cont = refreshListIfRightPath(path);
116       if (!cont) {
117         return;
118       }
119       LOG.info(path + " znode expired, triggering replicatorRemoved event");
120       for (ReplicationListener rl : listeners) {
121         rl.regionServerRemoved(getZNodeName(path));
122       }
123     }
124 
125     /**
126      * Called when an existing node has a child node added or removed.
127      * @param path full path of the node whose children have changed
128      */
129     public void nodeChildrenChanged(String path) {
130       if (stopper.isStopped()) {
131         return;
132       }
133       refreshListIfRightPath(path);
134     }
135 
136     private boolean refreshListIfRightPath(String path) {
137       if (!path.startsWith(this.watcher.rsZNode)) {
138         return false;
139       }
140       return refreshOtherRegionServersList();
141     }
142   }
143 
144   /**
145    * Watcher used to follow the creation and deletion of peer clusters.
146    */
147   public class PeersWatcher extends ZooKeeperListener {
148 
149     /**
150      * Construct a ZooKeeper event listener.
151      */
152     public PeersWatcher(ZooKeeperWatcher watcher) {
153       super(watcher);
154     }
155 
156     /**
157      * Called when a node has been deleted
158      * @param path full path of the deleted node
159      */
160     public void nodeDeleted(String path) {
161       List<String> peers = refreshPeersList(path);
162       if (peers == null) {
163         return;
164       }
165       if (isPeerPath(path)) {
166         String id = getZNodeName(path);
167         LOG.info(path + " znode expired, triggering peerRemoved event");
168         for (ReplicationListener rl : listeners) {
169           rl.peerRemoved(id);
170         }
171       }
172     }
173 
174     /**
175      * Called when an existing node has a child node added or removed.
176      * @param path full path of the node whose children have changed
177      */
178     public void nodeChildrenChanged(String path) {
179       List<String> peers = refreshPeersList(path);
180       if (peers == null) {
181         return;
182       }
183       LOG.info(path + " znode expired, triggering peerListChanged event");
184       for (ReplicationListener rl : listeners) {
185         rl.peerListChanged(peers);
186       }
187     }
188   }
189 
190   /**
191    * Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also
192    * reset the watches.
193    * @param path path to check against
194    * @return A list of peers' identifiers if the event concerns this watcher, else null.
195    */
196   private List<String> refreshPeersList(String path) {
197     if (!path.startsWith(getPeersZNode())) {
198       return null;
199     }
200     return this.replicationPeers.getAllPeerIds();
201   }
202 
203   private String getPeersZNode() {
204     return this.peersZNode;
205   }
206 
207   /**
208    * Extracts the znode name of a peer cluster from a ZK path
209    * @param fullPath Path to extract the id from
210    * @return the id or an empty string if path is invalid
211    */
212   private String getZNodeName(String fullPath) {
213     String[] parts = fullPath.split("/");
214     return parts.length > 0 ? parts[parts.length - 1] : "";
215   }
216 
217   /**
218    * Reads the list of region servers from ZK and atomically clears our local view of it and
219    * replaces it with the updated list.
220    * @return true if the local list of the other region servers was updated with the ZK data (even
221    *         if it was empty), false if the data was missing in ZK
222    */
223   private boolean refreshOtherRegionServersList() {
224     List<String> newRsList = getRegisteredRegionServers();
225     if (newRsList == null) {
226       return false;
227     } else {
228       synchronized (otherRegionServers) {
229         otherRegionServers.clear();
230         otherRegionServers.addAll(newRsList);
231       }
232     }
233     return true;
234   }
235 
236   /**
237    * Get a list of all the other region servers in this cluster and set a watch
238    * @return a list of server nanes
239    */
240   private List<String> getRegisteredRegionServers() {
241     List<String> result = null;
242     try {
243       result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.rsZNode);
244     } catch (KeeperException e) {
245       this.abortable.abort("Get list of registered region servers", e);
246     }
247     return result;
248   }
249 }