1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.atomic.AtomicBoolean;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Abortable;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.exceptions.DeserializationException;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
41 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42 import org.apache.zookeeper.KeeperException;
43 import org.apache.zookeeper.KeeperException.NodeExistsException;
44
45 import com.google.protobuf.InvalidProtocolBufferException;
46
47
48
49
50
51
52 @InterfaceAudience.Private
53 public class ReplicationPeer implements Abortable, Closeable {
54 private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
55
56 private final String clusterKey;
57 private final String id;
58 private List<ServerName> regionServers = new ArrayList<ServerName>(0);
59 private final AtomicBoolean peerEnabled = new AtomicBoolean();
60 private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
61
62 private ZooKeeperWatcher zkw;
63 private final Configuration conf;
64 private long lastRegionserverUpdate;
65
66 private PeerStateTracker peerStateTracker;
67 private TableCFsTracker tableCFsTracker;
68
69
70
71
72
73
74
75
76 public ReplicationPeer(Configuration conf, String key, String id) throws ReplicationException {
77 this.conf = conf;
78 this.clusterKey = key;
79 this.id = id;
80 try {
81 this.reloadZkWatcher();
82 } catch (IOException e) {
83 throw new ReplicationException("Error connecting to peer cluster with peerId=" + id, e);
84 }
85 }
86
87
88
89
90
91
92
93
94 public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
95 throws KeeperException {
96 ensurePeerEnabled(zookeeper, peerStateNode);
97 this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
98 this.peerStateTracker.start();
99 try {
100 this.readPeerStateZnode();
101 } catch (DeserializationException e) {
102 throw ZKUtil.convert(e);
103 }
104 }
105
106 private void readPeerStateZnode() throws DeserializationException {
107 this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false)));
108 }
109
110
111
112
113
114
115
116
117 public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
118 throws KeeperException {
119 this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
120 this);
121 this.tableCFsTracker.start();
122 this.readTableCFsZnode();
123 }
124
125 static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
126 if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
127 return null;
128 }
129
130 Map<String, List<String>> tableCFsMap = null;
131
132
133
134 String[] tables = tableCFsConfig.split(";");
135 for (String tab : tables) {
136
137 tab = tab.trim();
138 if (tab.length() == 0) {
139 continue;
140 }
141
142
143 String[] pair = tab.split(":");
144 String tabName = pair[0].trim();
145 if (pair.length > 2 || tabName.length() == 0) {
146 LOG.error("ignore invalid tableCFs setting: " + tab);
147 continue;
148 }
149
150
151 List<String> cfs = null;
152 if (pair.length == 2) {
153 String[] cfsList = pair[1].split(",");
154 for (String cf : cfsList) {
155 String cfName = cf.trim();
156 if (cfName.length() > 0) {
157 if (cfs == null) {
158 cfs = new ArrayList<String>();
159 }
160 cfs.add(cfName);
161 }
162 }
163 }
164
165
166 if (tableCFsMap == null) {
167 tableCFsMap = new HashMap<String, List<String>>();
168 }
169 tableCFsMap.put(tabName, cfs);
170 }
171
172 return tableCFsMap;
173 }
174
175 private void readTableCFsZnode() {
176 String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
177 this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
178 }
179
180
181
182
183
184
185 public String getClusterKey() {
186 return clusterKey;
187 }
188
189
190
191
192
193 public AtomicBoolean getPeerEnabled() {
194 return peerEnabled;
195 }
196
197
198
199
200
201 public Map<String, List<String>> getTableCFs() {
202 return this.tableCFs;
203 }
204
205
206
207
208
209
210 public List<ServerName> getRegionServers() {
211 return regionServers;
212 }
213
214
215
216
217
218 public void setRegionServers(List<ServerName> regionServers) {
219 this.regionServers = regionServers;
220 lastRegionserverUpdate = System.currentTimeMillis();
221 }
222
223
224
225
226
227 public ZooKeeperWatcher getZkw() {
228 return zkw;
229 }
230
231
232
233
234
235
236 public long getLastRegionserverUpdate() {
237 return lastRegionserverUpdate;
238 }
239
240
241
242
243
244 public String getId() {
245 return id;
246 }
247
248
249
250
251
252 public Configuration getConfiguration() {
253 return conf;
254 }
255
256 @Override
257 public void abort(String why, Throwable e) {
258 LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey
259 + " was aborted for the following reason(s):" + why, e);
260 }
261
262
263
264
265
266 public void reloadZkWatcher() throws IOException {
267 if (zkw != null) zkw.close();
268 zkw = new ZooKeeperWatcher(conf,
269 "connection to cluster: " + id, this);
270 }
271
272 @Override
273 public boolean isAborted() {
274
275
276 return false;
277 }
278
279 @Override
280 public void close() throws IOException {
281 if (zkw != null){
282 zkw.close();
283 }
284 }
285
286
287
288
289
290
291
292 public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
293 ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
294 return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
295 }
296
297
298
299
300
301
302 private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
303 throws DeserializationException {
304 ProtobufUtil.expectPBMagicPrefix(bytes);
305 int pblen = ProtobufUtil.lengthOfPBMagic();
306 ZooKeeperProtos.ReplicationState.Builder builder =
307 ZooKeeperProtos.ReplicationState.newBuilder();
308 ZooKeeperProtos.ReplicationState state;
309 try {
310 state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
311 return state.getState();
312 } catch (InvalidProtocolBufferException e) {
313 throw new DeserializationException(e);
314 }
315 }
316
317
318
319
320
321
322
323
324
325 private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
326 throws NodeExistsException, KeeperException {
327 if (ZKUtil.checkExists(zookeeper, path) == -1) {
328
329
330
331 ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
332 ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
333 return true;
334 }
335 return false;
336 }
337
338
339
340
341 public class PeerStateTracker extends ZooKeeperNodeTracker {
342
343 public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
344 Abortable abortable) {
345 super(watcher, peerStateZNode, abortable);
346 }
347
348 @Override
349 public synchronized void nodeDataChanged(String path) {
350 if (path.equals(node)) {
351 super.nodeDataChanged(path);
352 try {
353 readPeerStateZnode();
354 } catch (DeserializationException e) {
355 LOG.warn("Failed deserializing the content of " + path, e);
356 }
357 }
358 }
359 }
360
361
362
363
364 public class TableCFsTracker extends ZooKeeperNodeTracker {
365
366 public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
367 Abortable abortable) {
368 super(watcher, tableCFsZNode, abortable);
369 }
370
371 @Override
372 public synchronized void nodeDataChanged(String path) {
373 if (path.equals(node)) {
374 super.nodeDataChanged(path);
375 readTableCFsZnode();
376 }
377 }
378 }
379 }