1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.master;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileStatus;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.client.HConnectionManager;
29 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
30 import org.apache.hadoop.hbase.replication.ReplicationException;
31 import org.apache.hadoop.hbase.replication.ReplicationFactory;
32 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
33 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
34 import org.apache.zookeeper.KeeperException;
35
36 import java.io.IOException;
37 import java.util.List;
38 import java.util.Set;
39
40 import com.google.common.base.Predicate;
41 import com.google.common.collect.ImmutableSet;
42 import com.google.common.collect.Iterables;
43 import com.google.common.collect.Sets;
44
45
46
47
48
49 @InterfaceAudience.Private
50 public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
51 private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
52 private ZooKeeperWatcher zkw;
53 private ReplicationQueuesClient replicationQueues;
54 private boolean stopped = false;
55 private boolean aborted;
56
57
58 @Override
59 public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
60
61
62 if (this.getConf() == null) {
63 return files;
64 }
65
66 final Set<String> hlogs = loadHLogsFromQueues();
67 return Iterables.filter(files, new Predicate<FileStatus>() {
68 @Override
69 public boolean apply(FileStatus file) {
70 String hlog = file.getPath().getName();
71 boolean logInReplicationQueue = hlogs.contains(hlog);
72 if (LOG.isDebugEnabled()) {
73 if (logInReplicationQueue) {
74 LOG.debug("Found log in ZK, keeping: " + hlog);
75 } else {
76 LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
77 }
78 }
79 return !logInReplicationQueue;
80 }});
81 }
82
83
84
85
86 private Set<String> loadHLogsFromQueues() {
87 List<String> rss = replicationQueues.getListOfReplicators();
88 if (rss == null) {
89 LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
90 return ImmutableSet.of();
91 }
92 Set<String> hlogs = Sets.newHashSet();
93 for (String rs: rss) {
94 List<String> listOfPeers = replicationQueues.getAllQueues(rs);
95
96 if (listOfPeers == null) {
97 continue;
98 }
99 for (String id : listOfPeers) {
100 List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
101 if (peersHlogs != null) {
102 hlogs.addAll(peersHlogs);
103 }
104 }
105 }
106 return hlogs;
107 }
108
109 @Override
110 public void setConf(Configuration config) {
111
112 if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
113 HConstants.REPLICATION_ENABLE_DEFAULT)) {
114 LOG.warn("Not configured - allowing all hlogs to be deleted");
115 return;
116 }
117
118
119 Configuration conf = new Configuration(config);
120 super.setConf(conf);
121 try {
122 this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
123 this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
124 this.replicationQueues.init();
125 } catch (ReplicationException e) {
126 LOG.error("Error while configuring " + this.getClass().getName(), e);
127 } catch (IOException e) {
128 LOG.error("Error while configuring " + this.getClass().getName(), e);
129 }
130 }
131
132 @Override
133 public void stop(String why) {
134 if (this.stopped) return;
135 this.stopped = true;
136 if (this.zkw != null) {
137 LOG.info("Stopping " + this.zkw);
138 this.zkw.close();
139 }
140
141 HConnectionManager.deleteConnection(this.getConf());
142 }
143
144 @Override
145 public boolean isStopped() {
146 return this.stopped;
147 }
148
149 @Override
150 public void abort(String why, Throwable e) {
151 LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
152 this.aborted = true;
153 stop(why);
154 }
155
156 @Override
157 public boolean isAborted() {
158 return this.aborted;
159 }
160 }