/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Service.State;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.ipc.RemoteException;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for
 * replicating WAL entries to another HBase cluster. Sink region servers in the peer cluster
 * are picked and tracked by a {@link ReplicationSinkManager}; each shipment is split into
 * multiple batches (edits of a region always stay in the same batch) which are shipped in
 * parallel on a bounded thread pool.
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {

  private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);

  // Connection to the peer cluster, shared by all replicator tasks
  private HConnection conn;

  private Configuration conf;

  // How long should we sleep for each retry
  private long sleepForRetries;

  // Maximum multiplier applied to sleepForRetries before we stop backing off further
  private int maxRetriesMultiplier;

  // Multiplier used when backing off after a socket timeout
  // (defaults to maxRetriesMultiplier); we don't want to hammer a struggling peer
  private int socketTimeoutMultiplier;

  // Metrics for this replication source
  private MetricsSource metrics;

  // Picks and tracks the sink region servers in the peer cluster
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private ThreadPoolExecutor exec;
  private int maxThreads;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // This connection is replication specific, so codec settings applied in decorateConf()
    // only affect RPCs made to the peer cluster.
    this.conn = HConnectionManager.createConnection(this.conf);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
    // Upper bound on the number of replicator tasks submitted per shipment
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
        HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);

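    // Fixed-size pool for the per-batch Replicator tasks; because core == max and
    // allowCoreThreadTimeOut(true) is set below, idle threads are reclaimed after 60 seconds
    // rather than kept alive for the lifetime of the endpoint.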
    this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>());
    this.exec.allowCoreThreadTimeOut(true);
  }

  // Copy the replication codec (if any) into the RPC codec setting so that cells in outgoing
  // RPCs to the peer cluster are encoded with it.
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Keep trying to find sinks in the peer cluster, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic for retries. The backoff grows linearly: the base sleep time is
   * multiplied by <code>sleepMultiplier</code> on every call.
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>,
   *   i.e. the caller may keep increasing the backoff
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Do the shipping logic: split the entries into batches keyed by region, ship them in
   * parallel, and keep retrying failed batches while the endpoint is running.
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
    List<Entry> entries = replicateContext.getEntries();
    String walGroupId = replicateContext.getWalGroupId();
    int sleepMultiplier = 1;
    int numReplicated = 0;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      LOG.warn("No replication sinks found, returning without replicating. The source should retry"
          + " with the same set of edits.");
      return false;
    }

    // minimum of: configured threads, number of 100-waledit batches,
    // and the number of current sinks
    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);

    List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
    if (n == 1) {
      entryLists.add(entries);
    } else {
      for (int i = 0; i < n; i++) {
        entryLists.add(new ArrayList<Entry>(entries.size() / n + 1));
      }
      // Group the entries: hashing on the encoded region name keeps all edits of a region
      // in the same batch, and hence in order within this shipment.
      for (Entry e : entries) {
        entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n)).add(e);
      }
    }
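    // Worked example (illustrative numbers, not from the source): with maxThreads = 10,
    // 250 entries and 5 live sinks, n = min(min(10, 250/100 + 1), 5) = 3, so the edits are
    // hashed by encoded region name into 3 batches and shipped by 3 parallel Replicators.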
    while (this.isRunning()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicating " + entries.size() +
              " entries of total size " + replicateContext.getSize());
        }

        int futures = 0;
        for (int i = 0; i < entryLists.size(); i++) {
          if (!entryLists.get(i).isEmpty()) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Submitting " + entryLists.get(i).size() +
                  " entries of total size " + replicateContext.getSize());
            }
            // Each replicator returns the index of the batch it shipped
            pool.submit(createReplicator(entryLists.get(i), i));
            futures++;
          }
        }
        IOException iox = null;

        for (int i = 0; i < futures; i++) {
          try {
            // Wait for all futures and remove the successful parts;
            // only the remaining parts will be retried.
            Future<Integer> f = pool.take();
            int index = f.get().intValue();
            int batchSize = entryLists.get(index).size();
            entryLists.set(index, Collections.<Entry>emptyList());
            // The batch is marked as done replicating, record its size
            numReplicated += batchSize;
          } catch (InterruptedException ie) {
            iox = new IOException(ie);
          } catch (ExecutionException ee) {
            // The cause must be an IOException, see Replicator.call()
            iox = (IOException) ee.getCause();
          }
        }
        if (iox != null) {
          // If we had any exceptions, try again
          throw iox;
        }
        if (numReplicated != entries.size()) {
          // Something went wrong here and we don't know what, let's just fail and retry.
          LOG.warn("The number of edits replicated is different from the number received,"
              + " failing for now.");
          return false;
        }
        // Record the write time of the last shipped edit for the age-of-last-shipped-op metric
        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
            walGroupId);
        return true;

      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp(walGroupId);
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
          if (ioe instanceof TableNotFoundException) {
            if (sleepForRetries("A table is missing in the peer cluster. "
                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
              sleepMultiplier++;
            }
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // A socket timeout usually means the peer is down or badly overloaded; back off
            // with the (larger) socketTimeoutMultiplier so we don't hammer it.
            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
                "call to the remote cluster timed out, which is usually " +
                "caused by a machine failure or a massive slowdown",
                this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException) {
            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("Can't replicate because of a local or network error: ", ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // only reached if the endpoint stopped running before replicating
  }

  /**
   * Check whether the replication peer this endpoint ships to is enabled.
   * @return true if the peer is enabled, otherwise false
   */
  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
  }

  @Override
  protected void doStop() {
    disconnect();
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("Failed to close the connection", e);
      }
    }
    exec.shutdownNow();
    notifyStopped();
  }

  // Make sure doStop() runs (releasing the connection and the thread pool) when the endpoint
  // is stopped and waited on from the outside.
  @Override
  public State stopAndWait() {
    doStop();
    return super.stopAndWait();
  }

  @VisibleForTesting
  protected Replicator createReplicator(List<Entry> entries, int ordinal) {
    return new Replicator(entries, ordinal);
  }
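
  // The factory method above is the testing seam: a test could (hypothetically) subclass the
  // endpoint and hand back a stubbed Replicator so that replicate() runs without any RPCs.
  // Illustrative sketch only, not part of this class:
  //
  //   HBaseInterClusterReplicationEndpoint endpoint = new HBaseInterClusterReplicationEndpoint() {
  //     @Override
  //     protected Replicator createReplicator(List<Entry> entries, int ordinal) {
  //       return new Replicator(entries, ordinal) {
  //         @Override
  //         public Integer call() {
  //           return ordinal;  // pretend the batch shipped successfully
  //         }
  //       };
  //     }
  //   };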

  /**
   * Ships one batch of WAL entries to a single sink region server picked from the
   * {@link ReplicationSinkManager}, reporting the sink as good or bad depending on the outcome.
   * Returns the ordinal of the batch it was given so the caller can mark that batch as done.
   */
  @VisibleForTesting
  protected class Replicator implements Callable<Integer> {
    private List<Entry> entries;
    private int ordinal;

    public Replicator(List<Entry> entries, int ordinal) {
      this.entries = entries;
      this.ordinal = ordinal;
    }

    @Override
    public Integer call() throws IOException {
      SinkPeer sinkPeer = null;
      try {
        sinkPeer = replicationSinkMgr.getReplicationSink();
        BlockingInterface rrs = sinkPeer.getRegionServer();
        ReplicationProtbufUtil.replicateWALEntry(rrs,
            entries.toArray(new Entry[entries.size()]));
        replicationSinkMgr.reportSinkSuccess(sinkPeer);
        return ordinal;
      } catch (IOException ioe) {
        if (sinkPeer != null) {
          replicationSinkMgr.reportBadSink(sinkPeer);
        }
        throw ioe;
      }
    }
  }
}