1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.CopyOnWriteArrayList;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Abortable;
29 import org.apache.hadoop.hbase.Stoppable;
30 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
32 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
33 import org.apache.zookeeper.KeeperException;
34
35
36
37
38
39
40 public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
41
42 private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class);
43
44 private final Stoppable stopper;
45
46 private final List<ReplicationListener> listeners =
47 new CopyOnWriteArrayList<ReplicationListener>();
48
49 private final ArrayList<String> otherRegionServers = new ArrayList<String>();
50 private final ReplicationPeers replicationPeers;
51
52 public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper,
53 final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
54 Stoppable stopper) {
55 super(zookeeper, conf, abortable);
56 this.replicationPeers = replicationPeers;
57 this.stopper = stopper;
58 this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
59 this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
60 }
61
62 @Override
63 public void registerListener(ReplicationListener listener) {
64 listeners.add(listener);
65 }
66
67 @Override
68 public void removeListener(ReplicationListener listener) {
69 listeners.remove(listener);
70 }
71
72
73
74
75 @Override
76 public List<String> getListOfRegionServers() {
77 refreshOtherRegionServersList();
78
79 List<String> list = null;
80 synchronized (otherRegionServers) {
81 list = new ArrayList<String>(otherRegionServers);
82 }
83 return list;
84 }
85
86
87
88
89
90 public class OtherRegionServerWatcher extends ZooKeeperListener {
91
92
93
94
95 public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
96 super(watcher);
97 }
98
99
100
101
102
103 public void nodeCreated(String path) {
104 refreshListIfRightPath(path);
105 }
106
107
108
109
110
111 public void nodeDeleted(String path) {
112 if (stopper.isStopped()) {
113 return;
114 }
115 boolean cont = refreshListIfRightPath(path);
116 if (!cont) {
117 return;
118 }
119 LOG.info(path + " znode expired, triggering replicatorRemoved event");
120 for (ReplicationListener rl : listeners) {
121 rl.regionServerRemoved(getZNodeName(path));
122 }
123 }
124
125
126
127
128
129 public void nodeChildrenChanged(String path) {
130 if (stopper.isStopped()) {
131 return;
132 }
133 refreshListIfRightPath(path);
134 }
135
136 private boolean refreshListIfRightPath(String path) {
137 if (!path.startsWith(this.watcher.rsZNode)) {
138 return false;
139 }
140 return refreshOtherRegionServersList();
141 }
142 }
143
144
145
146
147 public class PeersWatcher extends ZooKeeperListener {
148
149
150
151
152 public PeersWatcher(ZooKeeperWatcher watcher) {
153 super(watcher);
154 }
155
156
157
158
159
160 public void nodeDeleted(String path) {
161 List<String> peers = refreshPeersList(path);
162 if (peers == null) {
163 return;
164 }
165 if (isPeerPath(path)) {
166 String id = getZNodeName(path);
167 LOG.info(path + " znode expired, triggering peerRemoved event");
168 for (ReplicationListener rl : listeners) {
169 rl.peerRemoved(id);
170 }
171 }
172 }
173
174
175
176
177
178 public void nodeChildrenChanged(String path) {
179 List<String> peers = refreshPeersList(path);
180 if (peers == null) {
181 return;
182 }
183 LOG.info(path + " znode expired, triggering peerListChanged event");
184 for (ReplicationListener rl : listeners) {
185 rl.peerListChanged(peers);
186 }
187 }
188 }
189
190
191
192
193
194
195
196 private List<String> refreshPeersList(String path) {
197 if (!path.startsWith(getPeersZNode())) {
198 return null;
199 }
200 return this.replicationPeers.getAllPeerIds();
201 }
202
203 private String getPeersZNode() {
204 return this.peersZNode;
205 }
206
207
208
209
210
211
212 private String getZNodeName(String fullPath) {
213 String[] parts = fullPath.split("/");
214 return parts.length > 0 ? parts[parts.length - 1] : "";
215 }
216
217
218
219
220
221
222
223 private boolean refreshOtherRegionServersList() {
224 List<String> newRsList = getRegisteredRegionServers();
225 if (newRsList == null) {
226 return false;
227 } else {
228 synchronized (otherRegionServers) {
229 otherRegionServers.clear();
230 otherRegionServers.addAll(newRsList);
231 }
232 }
233 return true;
234 }
235
236
237
238
239
240 private List<String> getRegisteredRegionServers() {
241 List<String> result = null;
242 try {
243 result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.rsZNode);
244 } catch (KeeperException e) {
245 this.abortable.abort("Get list of registered region servers", e);
246 }
247 return result;
248 }
249 }