/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
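/**
 * Class that handles the source of a replication stream: it reads WAL entries
 * from the logs queued for a single peer cluster, filters out the edits that
 * should not be replicated, and ships the remaining entries to a sink region
 * server in the peer cluster, recording the current log position as it goes.
 */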
@InterfaceAudience.Private
public class ReplicationSource extends Thread
    implements ReplicationSourceInterface {

  public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
  // Queue of logs to process
  private PriorityBlockingQueue<Path> queue;
  private HConnection conn;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;
  private Configuration conf;
  private ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;
  // The manager of all sources to which we ping back our progress
  private ReplicationSourceManager manager;
  // Should we stop everything?
  private Stoppable stopper;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Max size in bytes of entries to ship in one batch
  private long replicationQueueSizeCapacity;
  // Max number of entries to ship in one batch
  private int replicationQueueNbCapacity;
  // Our reader for the current log
  private HLog.Reader reader;
  // Last position in the log that we sent to ZooKeeper
  private long lastLoggedPosition = -1;
  // Path of the current log
  private volatile Path currentPath;
  private FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the peer cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private long totalReplicatedEdits = 0;
  // total number of operations we replicated
  private long totalReplicatedOperations = 0;
  // The znode we currently play with
  private String peerClusterZnode;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Current number of operations (Put/Delete) that we need to replicate
  private int currentNbOperations = 0;
  // Current size of data we need to replicate
  private int currentSize = 0;
  // Indicates if this particular source is running
  private volatile boolean running = true;
  // Metrics for this source
  private MetricsSource metrics;
  // Handle on the log reader helper
  private ReplicationHLogReaderManager repLogReader;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  // WARN threshold for the number of queued logs
  private int logQueueWarnThreshold;
  // Throttles the replication bandwidth per peer
  private ReplicationThrottler throttler;
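  /**
   * Instantiation method used by region servers.
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param replicationQueues the object tracking this source's queue of logs
   * @param replicationPeers the object tracking the configured peers
   * @param stopper the stoppable used to check whether we should stop
   * @param peerClusterZnode the name of our znode
   * @param clusterId unique UUID for the cluster
   * @throws IOException
   */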
  public void init(final Configuration conf, final FileSystem fs,
      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final Stoppable stopper,
      final String peerClusterZnode, final UUID clusterId) throws IOException {
    this.stopper = stopper;
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.replicationQueueSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
    this.replicationQueueNbCapacity =
        this.conf.getInt("replication.source.nb.capacity", 25000);
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier * maxRetriesMultiplier);
    this.queue =
        new PriorityBlockingQueue<Path>(
            this.conf.getInt("hbase.regionserver.maxlogs", 32),
            new LogsComparator());

    this.conn = HConnectionManager.getConnection(this.conf);
    long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    this.throttler = new ReplicationThrottler((double) bandwidth / 10.0);
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.manager = manager;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.fs = fs;
    this.metrics = new MetricsSource(peerClusterZnode);
    this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
    this.clusterId = clusterId;

    this.peerClusterZnode = peerClusterZnode;
    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path log) {
    this.queue.put(log);
    int queueSize = queue.size();
    this.metrics.setSizeOfLogQueue(queueSize);
    // Log a warning for each new log that pushes the queue above the warn threshold
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("Queue size: " + queueSize +
          " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }

  private void uninitialize() {
    if (this.conn != null) {
      try {
        this.conn.close();
      } catch (IOException e) {
        LOG.debug("Attempt to close connection failed", e);
      }
    }
    LOG.debug("Source exiting " + this.peerId);
    metrics.clear();
  }

  @Override
  public void run() {
    connectToPeers();
    // We were stopped while looping to connect to sinks, just abort
    if (!this.isActive()) {
      uninitialize();
      return;
    }

    int sleepMultiplier = 1;
    // Look up the peer cluster id here rather than in init() so it runs asynchronously
    while (this.isActive() && this.peerClusterId == null) {
      this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
      if (this.isActive() && this.peerClusterId == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    // We were stopped while looping to contact the peer's zk ensemble, just abort
    if (!this.isActive()) {
      uninitialize();
      return;
    }

    // resetting to 1 to reuse later
    sleepMultiplier = 1;

    // A misconfigured peer can point back at this cluster; replicating to ourselves
    // would loop edits forever, so terminate the source right away
    if (clusterId.equals(peerClusterId)) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId);
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);

    // If this is a recovered queue, it is already full and the first log normally
    // has a position recorded in ZooKeeper, so resume from there
    if (this.replicationQueueInfo.isQueueRecovered()) {
      try {
        this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
            this.queue.peek().getName()));
        if (LOG.isTraceEnabled()) {
          LOG.trace("Recovered queue started with log " + this.queue.peek() +
              " at position " + this.repLogReader.getPosition());
        }
      } catch (ReplicationException e) {
        this.terminate("Couldn't get the position of this recovered queue " +
            this.peerClusterZnode, e);
      }
    }
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      Path oldPath = getCurrentPath();

      // Get a new path if we don't already have one, or keep tailing the current one
      boolean hasCurrentPath = getNextPath();
      if (getCurrentPath() != null && oldPath == null) {
        sleepMultiplier = 1; // reset the sleep multiplier now that we have a log to process
      }
      if (!hasCurrentPath) {
        if (sleepForRetries("No log to process", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      boolean currentWALisBeingWrittenTo = false;
      // For a non-recovered queue with nothing else queued up, the log we are about
      // to read is the one the region server is still writing to, so hitting EOF is
      // expected and must not be treated as the end of the file
      if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
        currentWALisBeingWrittenTo = true;
      }
      // Open a reader on it
      if (!openReader(sleepMultiplier)) {
        // Reset the sleep multiplier, else it'd be reused for the next file
        sleepMultiplier = 1;
        continue;
      }

      // If we got a null reader but didn't continue, then sleep and retry
      if (this.reader == null) {
        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      boolean gotIOE = false;
      currentNbOperations = 0;
      List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
      currentSize = 0;
      try {
        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
          continue;
        }
      } catch (IOException ioe) {
        LOG.warn(this.peerClusterZnode + " Got: ", ioe);
        gotIOE = true;
        if (ioe.getCause() instanceof EOFException) {

          boolean considerDumping = false;
          if (this.replicationQueueInfo.isQueueRecovered()) {
            try {
              FileStatus stat = this.fs.getFileStatus(this.currentPath);
              if (stat.getLen() == 0) {
                LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
              }
              considerDumping = true;
            } catch (IOException e) {
              LOG.warn(this.peerClusterZnode + " Got exception while getting file size: ", e);
            }
          }

          if (considerDumping &&
              sleepMultiplier == this.maxRetriesMultiplier &&
              processEndOfFile()) {
            continue;
          }
        }
      } finally {
        try {
          this.reader = null;
          this.repLogReader.closeReader();
        } catch (IOException e) {
          gotIOE = true;
          LOG.warn("Unable to finalize the tailing of a file", e);
        }
      }

      // If we didn't get anything to replicate, or if we hit an IOE, record the
      // current position, wait a bit and retry; but if we need to stop, don't
      // bother sleeping
      if (this.isActive() && (gotIOE || entries.isEmpty())) {
        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }
        // Reset the sleep multiplier if nothing actually went wrong
        if (!gotIOE) {
          sleepMultiplier = 1;
        }
        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      sleepMultiplier = 1;
      shipEdits(currentWALisBeingWrittenTo, entries);
    }
    uninitialize();
  }
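  /**
   * Read all the entries from the current log file and queue the ones that are
   * eligible for replication, stopping once the batch size or entry-count
   * capacity is reached or the end of the file is hit.
   *
   * @param currentWALisBeingWrittenTo whether the current WAL is still being written to
   * @param entries list that the read entries are added to
   * @return true if the caller should move on to the next file (nothing to ship),
   *         false if the collected entries should be shipped
   * @throws IOException
   */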
  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
      List<HLog.Entry> entries) throws IOException {
    long seenEntries = 0;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Seeking in " + this.currentPath + " at position "
          + this.repLogReader.getPosition());
    }
    this.repLogReader.seek();
    long positionBeforeRead = this.repLogReader.getPosition();
    HLog.Entry entry =
        this.repLogReader.readNextAndSetPosition();
    while (entry != null) {
      WALEdit edit = entry.getEdit();
      this.metrics.incrLogEditsRead();
      seenEntries++;
      // don't replicate if the log entries have already been consumed by the peer cluster
      HLogKey logKey = entry.getKey();
      if (!logKey.getClusterIds().contains(peerClusterId)) {
        removeNonReplicableEdits(entry);
        // Don't replicate catalog (hbase:meta) entries or edits that were entirely filtered out
        if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
            edit.size() != 0) {
          // Mark that the current cluster has the change
          logKey.addClusterId(clusterId);
          currentNbOperations += countDistinctRowKeys(edit);
          entries.add(entry);
          currentSize += entry.getEdit().heapSize();
        } else {
          this.metrics.incrLogEditsFiltered();
        }
      }
      // Stop if the batch has too many entries or is too big
      if (currentSize >= this.replicationQueueSizeCapacity ||
          entries.size() >= this.replicationQueueNbCapacity) {
        break;
      }
      try {
        entry = this.repLogReader.readNextAndSetPosition();
      } catch (IOException ie) {
        LOG.debug("Break on IOE: " + ie.getMessage());
        break;
      }
    }
    metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
    if (currentWALisBeingWrittenTo) {
      return false;
    }
    // If we didn't see anything and the WAL is not being written to anymore,
    // we hit the end of the file for sure
    return seenEntries == 0 && processEndOfFile();
  }

  private void connectToPeers() {
    int sleepMultiplier = 1;

    // Connect to the peer cluster first, unless we have to stop
    while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }
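  /**
   * Poll for the next path to process.
   *
   * @return true if a path was obtained (or we already had one), false otherwise
   */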
  protected boolean getNextPath() {
    try {
      if (this.currentPath == null) {
        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
        this.metrics.setSizeOfLogQueue(queue.size());
        if (this.currentPath != null) {
          this.manager.cleanOldLogs(this.currentPath.getName(),
              this.peerId,
              this.replicationQueueInfo.isQueueRecovered());
          if (LOG.isTraceEnabled()) {
            LOG.trace("New log: " + this.currentPath);
          }
        }
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while reading edits", e);
    }
    return this.currentPath != null;
  }
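  /**
   * Open a reader on the current path.
   *
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if we should continue with that file, false if we are over with it
   */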
  protected boolean openReader(int sleepMultiplier) {
    try {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Opening log " + this.currentPath);
        }
        this.reader = repLogReader.openReader(this.currentPath);
      } catch (FileNotFoundException fnfe) {
        if (this.replicationQueueInfo.isQueueRecovered()) {
          // We didn't find the log where we expected it; look under the directories
          // of the dead region servers this queue was recovered from (there could be
          // a chain of failures to look at)
          List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
          LOG.info("NB dead servers : " + deadRegionServers.size());
          for (String curDeadServerName : deadRegionServers) {
            Path deadRsDirectory =
                new Path(manager.getLogDir().getParent(), curDeadServerName);
            Path[] locs = new Path[] {
                new Path(deadRsDirectory, currentPath.getName()),
                new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
                    currentPath.getName()),
            };
            for (Path possibleLogLocation : locs) {
              LOG.info("Possible location " + possibleLogLocation.toUri().toString());
              if (this.manager.getFs().exists(possibleLogLocation)) {
                // We found the right new location
                LOG.info("Log " + this.currentPath + " still exists at " +
                    possibleLogLocation);
                // Breaking here will make us sleep since reader is null
                return true;
              }
            }
          }
          // During a ReplicationSyncUp run the log may still sit under another
          // region server's directory inside the log dir, so scan those as well
          if (stopper instanceof ReplicationSyncUp.DummyServer) {
            FileStatus[] rss = fs.listStatus(manager.getLogDir());
            for (FileStatus rs : rss) {
              Path rsDir = rs.getPath();
              FileStatus[] logs = fs.listStatus(rsDir);
              for (FileStatus log : logs) {
                Path logPath = new Path(rsDir, log.getPath().getName());
                if (logPath.getName().equals(currentPath.getName())) {
                  currentPath = logPath;
                  LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
                  // Open the log at the new location
                  this.openReader(sleepMultiplier);
                  return true;
                }
              }
            }
          }

          // TODO What happens if the log was missing from every single location?
          // Although we need to check a couple of times as the log could have
          // been moved by the master between the checks
          // It can also happen if a region server went down at the wrong time
          throw new IOException("File from recovered queue is " +
              "nowhere to be found", fnfe);
        } else {
          // If the log was archived, continue reading from there
          Path archivedLogLocation =
              new Path(manager.getOldLogDir(), currentPath.getName());
          if (this.manager.getFs().exists(archivedLogLocation)) {
            currentPath = archivedLogLocation;
            LOG.info("Log " + this.currentPath + " was moved to " +
                archivedLogLocation);
            // Open the log at the new location
            this.openReader(sleepMultiplier);
          }
          // TODO What happens if the log is missing in both places?
        }
      }
    } catch (IOException ioe) {
      if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
      LOG.warn(this.peerClusterZnode + " Got: ", ioe);
      this.reader = null;
      if (ioe.getCause() instanceof NullPointerException) {
        // Workaround for race condition in HDFS-4380
        // which throws a NPE if we open a file before any data node has the most recent block
        // Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
        LOG.warn("Got NPE opening reader, will retry.");
      } else if (sleepMultiplier == this.maxRetriesMultiplier) {
        // TODO Need a better way to determine if a file is really gone
        // without scanning the whole logs dir
        LOG.warn("Waited too long for this file, considering dumping");
        return !processEndOfFile();
      }
    }
    return true;
  }
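  /**
   * Check whether the current log is empty: nothing has been read from it yet,
   * it is not part of a recovered queue and there is nothing else queued up,
   * which is the usual case right after a region server rolls its logs.
   *
   * @return true if the current log is (still) empty
   */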
  private boolean isCurrentLogEmpty() {
    return (this.repLogReader.getPosition() == 0 &&
        !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
  }
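  /**
   * Do the sleeping logic.
   *
   * @param msg why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if sleepMultiplier is &lt; maxRetriesMultiplier
   */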
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
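  /**
   * Remove, in place, the KeyValues that should not be replicated: those whose
   * family is not scoped for replication and those whose table or column family
   * is not in this peer's table-cf list.
   *
   * @param entry the entry to filter
   */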
  protected void removeNonReplicableEdits(HLog.Entry entry) {
    String tabName = entry.getKey().getTablename().getNameAsString();
    ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
    Map<String, List<String>> tableCFs = null;
    try {
      tableCFs = this.replicationPeers.getTableCFs(peerId);
    } catch (IllegalArgumentException e) {
      LOG.error("should not happen: can't get tableCFs for peer " + peerId +
          ", degenerate as if it's not configured by keeping tableCFs==null");
    }
    int size = kvs.size();

    // Clear the kvs (so nothing gets replicated) if the table isn't in this peer's
    // table-cf list; a null tableCFs means every table is replicable
    if (tableCFs != null && !tableCFs.containsKey(tabName)) {
      kvs.clear();
    } else {
      NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
      List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
      for (int i = size - 1; i >= 0; i--) {
        KeyValue kv = kvs.get(i);
        // The scope will be null or empty if there's nothing to replicate in that
        // WALEdit; also drop families that are not in the peer's cf list for this table
        if (scopes == null || !scopes.containsKey(kv.getFamily()) ||
            (cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
          kvs.remove(i);
        }
      }
    }

    if (kvs.size() < size / 2) {
      kvs.trimToSize();
    }
  }
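  /**
   * Count the number of different row keys in the given edit because of mini-batching.
   * We assume that there's at least one KV in the edit.
   *
   * @param edit edit to count row keys from
   * @return number of different row keys
   */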
  private int countDistinctRowKeys(WALEdit edit) {
    List<KeyValue> kvs = edit.getKeyValues();
    int distinctRowKeys = 1;
    KeyValue lastKV = kvs.get(0);
    for (int i = 0; i < edit.size(); i++) {
      if (!kvs.get(i).matchingRow(lastKV)) {
        distinctRowKeys++;
        lastKV = kvs.get(i);
      }
    }
    return distinctRowKeys;
  }
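  /**
   * Do the shipping logic: pick a sink, ship the given entries to it and record
   * the new log position, retrying with backoff until the batch is shipped or
   * the source is stopped.
   *
   * @param currentWALisBeingWrittenTo whether the current WAL is still being written to
   * @param entries the entries to ship
   */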
  protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
    int sleepMultiplier = 1;
    if (entries.isEmpty()) {
      LOG.warn("Was given 0 edits to ship");
      return;
    }
    while (this.isActive()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      SinkPeer sinkPeer = null;
      try {
        if (this.throttler.isEnabled()) {
          long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
          if (sleepTicks > 0) {
            try {
              if (LOG.isTraceEnabled()) {
                LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
              }
              Thread.sleep(sleepTicks);
            } catch (InterruptedException e) {
              LOG.debug("Interrupted while sleeping for throttling control");
              Thread.currentThread().interrupt();
              // The current thread might have been interrupted to terminate,
              // go back to the while() to check the state
              continue;
            }
            // Reset the throttler's cycle start tick when a throttling sleep occurs
            this.throttler.resetStartTick();
          }
        }
        sinkPeer = replicationSinkMgr.getReplicationSink();
        BlockingInterface rrs = sinkPeer.getRegionServer();
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicating " + entries.size() +
              " entries of total size " + currentSize);
        }
        ReplicationProtbufUtil.replicateWALEntry(rrs,
            entries.toArray(new HLog.Entry[entries.size()]));
        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }
        if (this.throttler.isEnabled()) {
          this.throttler.addPushSize(currentSize);
        }
        this.totalReplicatedEdits += entries.size();
        this.totalReplicatedOperations += currentNbOperations;
        this.metrics.shipBatch(this.currentNbOperations, this.currentSize / 1024);
        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime());
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
              + this.totalReplicatedOperations + " operations");
        }
        break;

      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp();
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
          if (ioe instanceof TableNotFoundException) {
            if (sleepForRetries("A table is missing in the peer cluster. "
                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
              sleepMultiplier++;
            }
            // The current thread might have been interrupted to terminate,
            // go back to the while() to check the state
            if (isInterrupted()) {
              continue;
            }
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means the call to the remote cluster timed out, so
            // retrying right away just makes things worse for a struggling sink
            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
                "call to the remote cluster timed out, which is usually " +
                "caused by a machine failure or a massive slowdown",
                this.socketTimeoutMultiplier);
            // The current thread might have been interrupted to terminate,
            // go back to the while() to check the state
            if (isInterrupted()) {
              continue;
            }
          } else if (ioe instanceof ConnectException) {
            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("Can't replicate because of a local or network error: ", ioe);
          }
        }

        if (sinkPeer != null) {
          replicationSinkMgr.reportBadSink(sinkPeer);
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }
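  /**
   * Check whether the peer is enabled or not.
   *
   * @return true if the peer is enabled, otherwise false
   */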
  protected boolean isPeerEnabled() {
    return this.replicationPeers.getStatusOfConnectedPeer(this.peerId);
  }
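  /**
   * If the queue isn't empty, switch to the next log file. Else, if this is a
   * recovered queue, it means we're done. Otherwise we'll just continue trying
   * to read from the current log file.
   *
   * @return true if we're done with the current file, false if we should keep
   *         reading from it
   */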
  protected boolean processEndOfFile() {
    if (this.queue.size() != 0) {
      if (LOG.isTraceEnabled()) {
        String filesize = "N/A";
        try {
          FileStatus stat = this.fs.getFileStatus(this.currentPath);
          filesize = stat.getLen() + "";
        } catch (IOException ex) {}
        LOG.trace("Reached the end of a log, stats: " + getStats() +
            ", and the length of the file is " + filesize);
      }
      this.currentPath = null;
      this.repLogReader.finishCurrentFile();
      this.reader = null;
      return true;
    } else if (this.replicationQueueInfo.isQueueRecovered()) {
      this.manager.closeRecoveredQueue(this);
      LOG.info("Finished recovering the queue with the following stats " + getStats());
      this.running = false;
      return true;
    }
    return false;
  }

  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler =
        new Thread.UncaughtExceptionHandler() {
          public void uncaughtException(final Thread t, final Throwable e) {
            LOG.error("Unexpected exception in ReplicationSource," +
                " currentPath=" + currentPath, e);
          }
        };
    Threads.setDaemonThreadRunning(
        this, n + ".replicationSource," +
        this.peerClusterZnode, handler);
  }

  public void terminate(String reason) {
    terminate(reason, null);
  }

  public void terminate(String reason, Exception cause) {
    if (cause == null) {
      LOG.info("Closing source "
          + this.peerClusterZnode + " because: " + reason);
    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.running = false;
    this.interrupt();
    Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
  }

  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  public String getPeerClusterId() {
    return this.peerId;
  }

  public Path getCurrentPath() {
    return this.currentPath;
  }

  private boolean isActive() {
    return !this.stopper.isStopped() && this.running && !isInterrupted();
  }
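  /**
   * Comparator used to compare logs together based on their start time
   * (the timestamp suffix of the file name).
   */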
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
    }
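    /**
     * Split a log file name to get its start time, e.g. a name of the form
     * host%3Aport.1277499063250 yields 1277499063250.
     *
     * @param p path to split
     * @return start time
     */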
    private long getTS(Path p) {
      String[] parts = p.getName().split("\\.");
      return Long.parseLong(parts[parts.length - 1]);
    }
  }

  @Override
  public String getStats() {
    long position = this.repLogReader.getPosition();
    return "Total replicated edits: " + totalReplicatedEdits +
        ", currently replicating from: " + this.currentPath +
        " at position: " + position;
  }
}