1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.NavigableMap;
24 import java.util.TreeMap;
25 import java.util.UUID;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.TimeUnit;
29
30 import com.google.common.util.concurrent.ThreadFactoryBuilder;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.classification.InterfaceAudience;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.hbase.CellScanner;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.HTableDescriptor;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.Server;
43 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
44 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
45 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
46 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
47 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
48 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
49 import org.apache.hadoop.hbase.replication.ReplicationException;
50 import org.apache.hadoop.hbase.replication.ReplicationFactory;
51 import org.apache.hadoop.hbase.replication.ReplicationPeers;
52 import org.apache.hadoop.hbase.replication.ReplicationQueues;
53 import org.apache.hadoop.hbase.replication.ReplicationTracker;
54 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
55 import org.apache.hadoop.hbase.util.Bytes;
56 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
57 import org.apache.zookeeper.KeeperException;
58
59 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
60 import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
61 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
62
63
64
65
66 @InterfaceAudience.Private
67 public class Replication implements WALActionsListener,
68 ReplicationSourceService, ReplicationSinkService {
69 private static final Log LOG =
70 LogFactory.getLog(Replication.class);
71 private boolean replication;
72 private ReplicationSourceManager replicationManager;
73 private ReplicationQueues replicationQueues;
74 private ReplicationPeers replicationPeers;
75 private ReplicationTracker replicationTracker;
76 private Configuration conf;
77 private ReplicationSink replicationSink;
78
79 private Server server;
80
81 private ScheduledExecutorService scheduleThreadPool;
82 private int statsThreadPeriod;
83
84
85
86
87
88
89
90
91
92 public Replication(final Server server, final FileSystem fs,
93 final Path logDir, final Path oldLogDir) throws IOException{
94 initialize(server, fs, logDir, oldLogDir);
95 }
96
97
98
99
100 public Replication() {
101 }
102
103 public void initialize(final Server server, final FileSystem fs,
104 final Path logDir, final Path oldLogDir) throws IOException {
105 this.server = server;
106 this.conf = this.server.getConfiguration();
107 this.replication = isReplication(this.conf);
108 this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
109 new ThreadFactoryBuilder()
110 .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
111 .setDaemon(true)
112 .build());
113 if (replication) {
114 try {
115 this.replicationQueues =
116 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
117 this.replicationQueues.init(this.server.getServerName().toString());
118 this.replicationPeers =
119 ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
120 this.replicationPeers.init();
121 this.replicationTracker =
122 ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
123 this.conf, this.server, this.server);
124 } catch (ReplicationException e) {
125 throw new IOException("Failed replication handler create", e);
126 }
127 UUID clusterId = null;
128 try {
129 clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
130 } catch (KeeperException ke) {
131 throw new IOException("Could not read cluster id", ke);
132 }
133 this.replicationManager =
134 new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
135 conf, this.server, fs, logDir, oldLogDir, clusterId);
136 this.statsThreadPeriod =
137 this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
138 LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
139 } else {
140 this.replicationManager = null;
141 this.replicationQueues = null;
142 this.replicationPeers = null;
143 this.replicationTracker = null;
144 }
145 }
146
147
148
149
150
151 public static boolean isReplication(final Configuration c) {
152 return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
153 }
154
155
156
157
158 public WALActionsListener getWALActionsListener() {
159 return this;
160 }
161
162
163
164 public void stopReplicationService() {
165 join();
166 }
167
168
169
170
171 public void join() {
172 if (this.replication) {
173 this.replicationManager.join();
174 if (this.replicationSink != null) {
175 this.replicationSink.stopReplicationSinkServices();
176 }
177 }
178 scheduleThreadPool.shutdown();
179 }
180
181
182
183
184
185
186
187
188
189 public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
190 if (this.replication) {
191 this.replicationSink.replicateEntries(entries, cells);
192 }
193 }
194
195
196
197
198
199
200 public void startReplicationService() throws IOException {
201 if (this.replication) {
202 try {
203 this.replicationManager.init();
204 } catch (ReplicationException e) {
205 throw new IOException(e);
206 }
207 this.replicationSink = new ReplicationSink(this.conf, this.server);
208 this.scheduleThreadPool.scheduleAtFixedRate(
209 new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
210 statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
211 }
212 }
213
214
215
216
217
218 public ReplicationSourceManager getReplicationManager() {
219 return this.replicationManager;
220 }
221
222 @Override
223 public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
224 WALEdit logEdit) {
225
226 }
227
228 @Override
229 public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
230 WALEdit logEdit) {
231 scopeWALEdits(htd, logKey, logEdit);
232 }
233
234
235
236
237
238
239
240
241 public static void scopeWALEdits(HTableDescriptor htd, HLogKey logKey,
242 WALEdit logEdit) {
243 NavigableMap<byte[], Integer> scopes =
244 new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
245 byte[] family;
246 for (KeyValue kv : logEdit.getKeyValues()) {
247 family = kv.getFamily();
248
249 if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
250
251 assert htd.getFamily(family) != null;
252
253 int scope = htd.getFamily(family).getScope();
254 if (scope != REPLICATION_SCOPE_LOCAL &&
255 !scopes.containsKey(family)) {
256 scopes.put(family, scope);
257 }
258 }
259 if (!scopes.isEmpty()) {
260 logKey.setScopes(scopes);
261 }
262 }
263
264 @Override
265 public void preLogRoll(Path oldPath, Path newPath) throws IOException {
266 getReplicationManager().preLogRoll(newPath);
267 }
268
269 @Override
270 public void postLogRoll(Path oldPath, Path newPath) throws IOException {
271 getReplicationManager().postLogRoll(newPath);
272 }
273
274 @Override
275 public void preLogArchive(Path oldPath, Path newPath) throws IOException {
276
277 }
278
279 @Override
280 public void postLogArchive(Path oldPath, Path newPath) throws IOException {
281
282 }
283
284
285
286
287
288
289 public static void decorateMasterConfiguration(Configuration conf) {
290 if (!isReplication(conf)) {
291 return;
292 }
293 String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
294 String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
295 if (!plugins.contains(cleanerClass)) {
296 conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
297 }
298 }
299
300 @Override
301 public void logRollRequested() {
302
303 }
304
305 @Override
306 public void logCloseRequested() {
307
308 }
309
310
311
312
313 static class ReplicationStatisticsThread extends Thread {
314
315 private final ReplicationSink replicationSink;
316 private final ReplicationSourceManager replicationManager;
317
318 public ReplicationStatisticsThread(final ReplicationSink replicationSink,
319 final ReplicationSourceManager replicationManager) {
320 super("ReplicationStatisticsThread");
321 this.replicationManager = replicationManager;
322 this.replicationSink = replicationSink;
323 }
324
325 @Override
326 public void run() {
327 printStats(this.replicationManager.getStats());
328 printStats(this.replicationSink.getStats());
329 }
330
331 private void printStats(String stats) {
332 if (!stats.isEmpty()) {
333 LOG.info(stats);
334 }
335 }
336 }
337 }