1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client.replication;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.classification.InterfaceStability;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Abortable;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.client.HConnection;
38 import org.apache.hadoop.hbase.client.HConnectionManager;
39 import org.apache.hadoop.hbase.replication.ReplicationException;
40 import org.apache.hadoop.hbase.replication.ReplicationFactory;
41 import org.apache.hadoop.hbase.replication.ReplicationPeers;
42 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
43 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 @InterfaceAudience.Public
69 @InterfaceStability.Evolving
70 public class ReplicationAdmin implements Closeable {
71 private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
72
73 public static final String TNAME = "tableName";
74 public static final String CFNAME = "columnFamlyName";
75
76
77
78 public static final String REPLICATIONTYPE = "replicationType";
79 public static final String REPLICATIONGLOBAL = Integer
80 .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
81
82 private final HConnection connection;
83 private final ReplicationQueuesClient replicationQueuesClient;
84 private final ReplicationPeers replicationPeers;
85
86
87
88
89
90
91
92 public ReplicationAdmin(Configuration conf) throws IOException {
93 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
94 HConstants.REPLICATION_ENABLE_DEFAULT)) {
95 throw new RuntimeException("hbase.replication isn't true, please " +
96 "enable it in order to use replication");
97 }
98 this.connection = HConnectionManager.getConnection(conf);
99 ZooKeeperWatcher zkw = createZooKeeperWatcher();
100 try {
101 this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
102 this.replicationPeers.init();
103 this.replicationQueuesClient =
104 ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
105 this.replicationQueuesClient.init();
106
107 } catch (ReplicationException e) {
108 throw new IOException("Error initializing the replication admin client.", e);
109 }
110 }
111
112 private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
113 return new ZooKeeperWatcher(connection.getConfiguration(),
114 "Replication Admin", new Abortable() {
115 @Override
116 public void abort(String why, Throwable e) {
117 LOG.error(why, e);
118 System.exit(1);
119 }
120
121 @Override
122 public boolean isAborted() {
123 return false;
124 }
125
126 });
127 }
128
129
130
131
132
133
134
135
136
137
138 public void addPeer(String id, String clusterKey) throws ReplicationException {
139 this.replicationPeers.addPeer(id, clusterKey);
140 }
141
142 public void addPeer(String id, String clusterKey, String tableCFs)
143 throws ReplicationException {
144 this.replicationPeers.addPeer(id, clusterKey, tableCFs);
145 }
146
147
148
149
150
151 public void removePeer(String id) throws ReplicationException {
152 this.replicationPeers.removePeer(id);
153 }
154
155
156
157
158
159 public void enablePeer(String id) throws ReplicationException {
160 this.replicationPeers.enablePeer(id);
161 }
162
163
164
165
166
167 public void disablePeer(String id) throws ReplicationException {
168 this.replicationPeers.disablePeer(id);
169 }
170
171
172
173
174
175 public int getPeersCount() {
176 return this.replicationPeers.getAllPeerIds().size();
177 }
178
179
180
181
182
183 public Map<String, String> listPeers() {
184 return this.replicationPeers.getAllPeerClusterKeys();
185 }
186
187
188
189
190
191 public String getPeerTableCFs(String id) throws ReplicationException {
192 return this.replicationPeers.getPeerTableCFsConfig(id);
193 }
194
195
196
197
198
199 public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
200 this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
201 }
202
203
204
205
206
207
208
209 public boolean getPeerState(String id) throws ReplicationException {
210 return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
211 }
212
213 @Override
214 public void close() throws IOException {
215 if (this.connection != null) {
216 this.connection.close();
217 }
218 }
219
220
221
222
223
224
225
226
227
228
229
230
231
232 public List<HashMap<String, String>> listReplicated() throws IOException {
233 List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
234 HTableDescriptor[] tables = this.connection.listTables();
235
236 for (HTableDescriptor table : tables) {
237 HColumnDescriptor[] columns = table.getColumnFamilies();
238 String tableName = table.getNameAsString();
239 for (HColumnDescriptor column : columns) {
240 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
241
242 HashMap<String, String> replicationEntry = new HashMap<String, String>();
243 replicationEntry.put(TNAME, tableName);
244 replicationEntry.put(CFNAME, column.getNameAsString());
245 replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
246 replicationColFams.add(replicationEntry);
247 }
248 }
249 }
250
251 return replicationColFams;
252 }
253 }