1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Random;
29 import java.util.SortedMap;
30 import java.util.SortedSet;
31 import java.util.TreeSet;
32 import java.util.UUID;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.CopyOnWriteArrayList;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.ThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.classification.InterfaceAudience;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.hadoop.hbase.Stoppable;
47 import org.apache.hadoop.hbase.replication.ReplicationException;
48 import org.apache.hadoop.hbase.replication.ReplicationListener;
49 import org.apache.hadoop.hbase.replication.ReplicationPeers;
50 import org.apache.hadoop.hbase.replication.ReplicationQueues;
51 import org.apache.hadoop.hbase.replication.ReplicationTracker;
52 import org.apache.zookeeper.KeeperException;
53
54 import com.google.common.util.concurrent.ThreadFactoryBuilder;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
 * Manages the replication sources of a region server.
 * <p>
 * A {@link ReplicationSourceInterface} is created per connected peer (see {@link #addSource}),
 * and "recovered" sources are created when the queues of a dead region server are claimed
 * (see {@link NodeFailoverWorker}). The manager also tracks, in memory, which HLog files each
 * queue still needs, so that files already shipped can be removed from the underlying
 * {@link ReplicationQueues} storage (see {@link #cleanOldLogs}).
 * <p>
 * Thread-safety: {@code sources} and {@code oldsources} are {@code CopyOnWriteArrayList}s;
 * {@code hlogsById} is guarded by {@code synchronized (this.hlogsById)};
 * {@code hlogsByIdRecoveredQueues} is a {@code ConcurrentHashMap}. Failover work runs on the
 * private {@code executor}.
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs (one per connected peer)
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs (recovered queues)
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster, handed to every source we create
  private final UUID clusterId;
  // Used to check for shutdown and to abort on unrecoverable errors
  private final Stoppable stopper;
  // All HLogs we are currently tracking, keyed by peer id.
  // Guarded by synchronized (this.hlogsById).
  private final Map<String, SortedSet<String>> hlogsById;
  // HLogs tracked for recovered queues, keyed by peer-cluster znode; concurrent map,
  // written by NodeFailoverWorker threads and read on log-position updates.
  private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest log we saw, handed to newly added sources so they start
  // from the current log. Written under the hlogsById monitor in preLogRoll.
  private Path latestPath;
  // Path to the directory of current HLogs
  private final Path logDir;
  // Path to the HLog archive; recovered queues read their files from here
  private final Path oldLogDir;
  // Milliseconds to wait before claiming a dead RS's queues (randomized up to 2x
  // in NodeFailoverWorker so concurrent survivors don't all race for the same queues)
  private final long sleepBeforeFailover;
  // Executor used for the NodeFailoverWorker transfer tasks
  private final ThreadPoolExecutor executor;
  // Randomizes the failover sleep; accessed from worker threads.
  // NOTE(review): java.util.Random is thread-safe, so sharing it here is fine.
  private final Random rand;

  /**
   * Creates a replication manager and registers it as a listener on the replication tracker.
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param replicationTracker tracker of cluster membership / peer-list changes
   * @param conf the configuration to use
   * @param stopper the stopper object for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all HLog directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of the local cluster
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    // CopyOnWriteArrayList is thread-safe; synchronized list is not safe to iterate
    // while being mutated, and these lists are iterated from several threads.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.stopper = stopper;
    this.hlogsById = new HashMap<String, SortedSet<String>>();
    this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
    this.clusterId = clusterId;
    this.replicationTracker.registerListener(this);
    // Seeds the peer cache; return value intentionally unused here.
    this.replicationPeers.getAllPeerIds();
    // Per-worker failover threads; a single worker serializes queue transfers by default.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // The core pool is allowed to time out so idle workers go away
    // (NOTE(review): allowCoreThreadTimeOut is not enabled, so the 100ms keep-alive
    // only applies to threads above core size — with core == max it has no effect).
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
  }

  /**
   * Provide the id of the peer and a log key and this method will figure which
   * HLog it belongs to and will log, for this region server, the current
   * position. It will also clean old logs from the queue.
   * @param log Path to the log currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   * @param holdLogInZK if true, retain the log entry in the queue storage (skip cleanup)
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered, boolean holdLogInZK) {
    String fileName = log.getName();
    this.replicationQueues.setLogPosition(id, fileName, position);
    if (holdLogInZK) {
      return;
    }
    cleanOldLogs(fileName, id, queueRecovered);
  }

  /**
   * Cleans a log file and all older files from the queue storage. Called when we are sure
   * that a whole file has been fully replicated (i.e. {@code key} is now the oldest file
   * the queue still needs).
   * @param key Path to the log (filename only)
   * @param id id of the peer cluster
   * @param queueRecovered whether this queue was recovered from a dead region server
   */
  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
    if (queueRecovered) {
      SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
      // NOTE(review): hlogs.first() throws NoSuchElementException on an empty set —
      // presumably a recovered queue is never tracked empty; confirm against callers.
      if (hlogs != null && !hlogs.first().equals(key)) {
        cleanOldLogs(hlogs, key, id);
      }
    } else {
      synchronized (this.hlogsById) {
        SortedSet<String> hlogs = hlogsById.get(id);
        // NOTE(review): unlike the recovered branch, no null check here — assumes id
        // always has an entry (put in addSource); an unknown id would NPE.
        if (!hlogs.first().equals(key)) {
          cleanOldLogs(hlogs, key, id);
        }
      }
    }
  }

  // Removes from queue storage every log strictly older than key, then drops them
  // from the in-memory set (headSet is a view, so clear() mutates the backing set).
  private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
    SortedSet<String> hlogSet = hlogs.headSet(key);
    LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
    for (String hlog : hlogSet) {
      this.replicationQueues.removeLog(id, hlog);
    }
    hlogSet.clear();
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server HLog queues: any replicator that is no longer a live
   * region server gets its queues transferred to this server.
   * @throws IOException if a source fails to start
   * @throws ReplicationException on queue-storage errors
   */
  protected void init() throws IOException, ReplicationException {
    for (String id : this.replicationPeers.getConnectedPeers()) {
      addSource(id);
    }
    List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
    if (currentReplicators == null || currentReplicators.size() == 0) {
      return;
    }
    List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
        + otherRegionServers);

    // Look if there's anything to process after a restart
    for (String rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * Adds a normal source for the given peer on this region server. For the newly created
   * source, we need to enqueue the latest log of that regionserver and record it in the
   * queue storage.
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException if the source cannot be created
   * @throws ReplicationException if the latest log cannot be recorded in the queue
   *         storage (the stopper is also triggered in that case)
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, stopper, id, this.clusterId);
    // Synchronize on hlogsById so the in-memory set, the queue storage and the source's
    // enqueued log stay consistent with concurrent preLogRoll calls.
    synchronized (this.hlogsById) {
      this.sources.add(src);
      this.hlogsById.put(id, new TreeSet<String>());
      // Add the latest hlog to that source's queue
      if (this.latestPath != null) {
        String name = this.latestPath.getName();
        this.hlogsById.get(id).add(name);
        try {
          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          // Persisting the log failed: the queue would silently lose data, so abort.
          String message =
              "Cannot add log to queue when creating a new source, queueId="
                  + src.getPeerClusterZnode() + ", filename=" + name;
          stopper.stop(message);
          throw e;
        }
        src.enqueueLog(this.latestPath);
      }
    }
    src.startup();
    return src;
  }

  /**
   * Deletes one peer's queue from the queue storage.
   * @param peerId the id of the peer cluster queue to delete
   * @param closeConnection whether to also disconnect from the peer cluster
   */
  public void deleteSource(String peerId, boolean closeConnection) {
    this.replicationQueues.removeQueue(peerId);
    if (closeConnection) {
      this.replicationPeers.disconnectFromPeer(peerId);
    }
  }

  /**
   * Terminates the replication on this region server: stops the failover executor and
   * asks every normal source to stop.
   */
  public void join() {
    this.executor.shutdown();
    if (this.sources.size() == 0) {
      // No sources at all: this RS never replicated, so its queues can all go.
      this.replicationQueues.removeAllQueues();
    }
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Gets the HLogs tracked for normal sources.
   * @return an unmodifiable view of the map of peer id to tracked HLog names.
   *         NOTE(review): the inner SortedSets are still mutable and mutated under the
   *         hlogsById monitor — callers should treat this as read-only.
   */
  protected Map<String, SortedSet<String>> getHLogs() {
    return Collections.unmodifiableMap(hlogsById);
  }

  /**
   * Gets the HLogs tracked for recovered queues.
   * @return an unmodifiable view of the map of peer-cluster znode to tracked HLog names
   */
  protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
  }

  /**
   * Gets the list of normal sources of this region server.
   * @return the live list of sources (CopyOnWriteArrayList — safe to iterate)
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  /**
   * Gets the list of recovered ("old") sources of this region server.
   * @return the live list of recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  // Called before a log roll: records the new log in every queue's storage and in the
  // in-memory tracking sets, and remembers it as latestPath for sources added later.
  void preLogRoll(Path newLog) throws IOException {
    synchronized (this.hlogsById) {
      String name = newLog.getName();
      for (ReplicationSourceInterface source : this.sources) {
        try {
          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue with id="
              + source.getPeerClusterZnode() + ", filename=" + name, e);
        }
      }
      for (SortedSet<String> hlogs : this.hlogsById.values()) {
        if (this.sources.isEmpty()) {
          // If there's no slaves, don't need to keep the old hlogs since
          // we only consider the last one when a new slave comes in
          hlogs.clear();
        }
        hlogs.add(name);
      }
    }
    // Written outside the monitor; read under it in addSource.
    this.latestPath = newLog;
  }

  // Called after a log roll: hands the new log to every running source.
  void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }

  /**
   * Factory method to create a replication source. The implementation class is taken from
   * the "replication.replicationsource.implementation" configuration key, falling back to
   * {@link ReplicationSource} when instantiation fails.
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use (normally {@code this})
   * @param replicationQueues queue-storage interface handed to the source
   * @param replicationPeers peer interface handed to the source
   * @param stopper the stopper object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId the UUID of the local cluster
   * @return the created and initialized source
   * @throws IOException if the source's init fails
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      // Deliberate best-effort fallback: a bad custom class must not disable replication.
      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }
    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
    return src;
  }

  /**
   * Transfer all the queues of the specified dead region server to this one.
   * The actual work happens asynchronously on the failover executor; a rejection
   * (e.g. after shutdown in join()) just cancels the transfer.
   * @param rsZnode the znode of the dead region server
   */
  private void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer =
        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
            this.clusterId);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
    }
  }

  /**
   * Clear the references to the specified recovered source: removes it from the
   * old-source list, deletes its queue, and drops its in-memory log tracking.
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    this.oldsources.remove(src);
    deleteSource(src.getPeerClusterZnode(), false);
    this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
  }

  /**
   * Thie method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    ReplicationSourceInterface srcToRemove = null;
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // First close all the recovered sources for this peer
    for (ReplicationSourceInterface src : oldsources) {
      if (id.equals(src.getPeerClusterId())) {
        oldSourcesToDelete.add(src);
      }
    }
    for (ReplicationSourceInterface src : oldSourcesToDelete) {
      src.terminate(terminateMessage);
      closeRecoveredQueue((src));
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    for (ReplicationSourceInterface src : this.sources) {
      if (id.equals(src.getPeerClusterId())) {
        srcToRemove = src;
        break;
      }
    }
    if (srcToRemove == null) {
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    srcToRemove.terminate(terminateMessage);
    this.sources.remove(srcToRemove);
    deleteSource(id, true);
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
  }

  @Override
  public void peerListChanged(List<String> peerIds) {
    for (String id : peerIds) {
      try {
        boolean added = this.replicationPeers.connectToPeer(id);
        if (added) {
          addSource(id);
        }
      } catch (Exception e) {
        // Best-effort: a single bad peer must not stop the others from being added.
        LOG.error("Error while adding a new peer", e);
      }
    }
  }

  /**
   * Class responsible to setup new ReplicationSources to take care of the
   * queues from dead region servers.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode znode of the dead region server whose queues should be claimed
     * @param replicationQueues queue-storage interface to claim from
     * @param replicationPeers peer interface used to validate claimed queues
     * @param clusterId UUID of the local cluster
     */
    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-"+rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      // Don't try to claim our own queues.
      if (this.rq.isThisOurZnode(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (stopper.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      SortedMap<String, SortedSet<String>> newQueues = null;

      newQueues = this.rq.claimQueues(rsZnode);

      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // HLogs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        try {
          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                stopper, peerId, this.clusterId);
          if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
            src.terminate("Recovered queue doesn't belong to any current peer");
            // NOTE(review): break abandons the remaining claimed queues in newQueues;
            // if only this one peer is gone, continue would seem more appropriate — confirm.
            break;
          }
          oldsources.add(src);
          SortedSet<String> hlogsSet = entry.getValue();
          for (String hlog : hlogsSet) {
            src.enqueueLog(new Path(oldLogDir, hlog));
          }
          src.startup();
          hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Get the directory where HLogs are archived
   * @return the directory where HLogs are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where HLogs are stored by their RSs
   * @return the directory where HLogs are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuffer stats = new StringBuffer();
    for (ReplicationSourceInterface source : sources) {
      stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
      stats.append(oldSource.getStats()+ "\n");
    }
    return stats.toString();
  }
}