View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.RejectedExecutionException;
37  import java.util.concurrent.ThreadPoolExecutor;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.Path;
46  import org.apache.hadoop.hbase.Stoppable;
47  import org.apache.hadoop.hbase.replication.ReplicationException;
48  import org.apache.hadoop.hbase.replication.ReplicationListener;
49  import org.apache.hadoop.hbase.replication.ReplicationPeers;
50  import org.apache.hadoop.hbase.replication.ReplicationQueues;
51  import org.apache.hadoop.hbase.replication.ReplicationTracker;
52  import org.apache.zookeeper.KeeperException;
53  
54  import com.google.common.util.concurrent.ThreadFactoryBuilder;
55  
/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <ul>
 * <li> Normal sources are persistent and one per peer cluster</li>
 * <li> Old sources are recovered from a failed region server and our
 * only goal is to finish replicating the HLog queue it had up in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues to a local
 * old source.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
 */
70  @InterfaceAudience.Private
71  public class ReplicationSourceManager implements ReplicationListener {
72    private static final Log LOG =
73        LogFactory.getLog(ReplicationSourceManager.class);
74    // List of all the sources that read this RS's logs
75    private final List<ReplicationSourceInterface> sources;
76    // List of all the sources we got from died RSs
77    private final List<ReplicationSourceInterface> oldsources;
78    private final ReplicationQueues replicationQueues;
79    private final ReplicationTracker replicationTracker;
80    private final ReplicationPeers replicationPeers;
81    // UUID for this cluster
82    private final UUID clusterId;
83    // All about stopping
84    private final Stoppable stopper;
85    // All logs we are currently tracking
86    private final Map<String, SortedSet<String>> hlogsById;
87    // Logs for recovered sources we are currently tracking
88    private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
89    private final Configuration conf;
90    private final FileSystem fs;
91    // The path to the latest log we saw, for new coming sources
92    private Path latestPath;
93    // Path to the hlogs directories
94    private final Path logDir;
95    // Path to the hlog archive
96    private final Path oldLogDir;
97    // The number of ms that we wait before moving znodes, HBASE-3596
98    private final long sleepBeforeFailover;
99    // Homemade executer service for replication
100   private final ThreadPoolExecutor executor;
101 
102   private final Random rand;
103 
104 
105   /**
106    * Creates a replication manager and sets the watch on all the other registered region servers
107    * @param replicationQueues the interface for manipulating replication queues
108    * @param replicationPeers
109    * @param replicationTracker
110    * @param conf the configuration to use
111    * @param stopper the stopper object for this region server
112    * @param fs the file system to use
113    * @param logDir the directory that contains all hlog directories of live RSs
114    * @param oldLogDir the directory where old logs are archived
115    * @param clusterId
116    */
117   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
118       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
119       final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
120       final Path oldLogDir, final UUID clusterId) {
121     //CopyOnWriteArrayList is thread-safe.
122     //Generally, reading is more than modifying. 
123     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
124     this.replicationQueues = replicationQueues;
125     this.replicationPeers = replicationPeers;
126     this.replicationTracker = replicationTracker;
127     this.stopper = stopper;
128     this.hlogsById = new HashMap<String, SortedSet<String>>();
129     this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
130     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
131     this.conf = conf;
132     this.fs = fs;
133     this.logDir = logDir;
134     this.oldLogDir = oldLogDir;
135     this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
136     this.clusterId = clusterId;
137     this.replicationTracker.registerListener(this);
138     this.replicationPeers.getAllPeerIds();
139     // It's preferable to failover 1 RS at a time, but with good zk servers
140     // more could be processed at the same time.
141     int nbWorkers = conf.getInt("replication.executor.workers", 1);
142     // use a short 100ms sleep since this could be done inline with a RS startup
143     // even if we fail, other region servers can take care of it
144     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
145         100, TimeUnit.MILLISECONDS,
146         new LinkedBlockingQueue<Runnable>());
147     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
148     tfb.setNameFormat("ReplicationExecutor-%d");
149     this.executor.setThreadFactory(tfb.build());
150     this.rand = new Random();
151   }
152 
153   /**
154    * Provide the id of the peer and a log key and this method will figure which
155    * hlog it belongs to and will log, for this region server, the current
156    * position. It will also clean old logs from the queue.
157    * @param log Path to the log currently being replicated from
158    * replication status in zookeeper. It will also delete older entries.
159    * @param id id of the peer cluster
160    * @param position current location in the log
161    * @param queueRecovered indicates if this queue comes from another region server
162    * @param holdLogInZK if true then the log is retained in ZK
163    */
164   public void logPositionAndCleanOldLogs(Path log, String id, long position,
165       boolean queueRecovered, boolean holdLogInZK) {
166     String fileName = log.getName();
167     this.replicationQueues.setLogPosition(id, fileName, position);
168     if (holdLogInZK) {
169      return;
170     }
171     cleanOldLogs(fileName, id, queueRecovered);
172   }
173 
174   /**
175    * Cleans a log file and all older files from ZK. Called when we are sure that a
176    * log file is closed and has no more entries.
177    * @param key Path to the log
178    * @param id id of the peer cluster
179    * @param queueRecovered Whether this is a recovered queue
180    */
181   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
182     if (queueRecovered) {
183       SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
184       if (hlogs != null && !hlogs.first().equals(key)) {
185         cleanOldLogs(hlogs, key, id);
186       }
187     } else {
188       synchronized (this.hlogsById) {
189         SortedSet<String> hlogs = hlogsById.get(id);
190         if (!hlogs.first().equals(key)) {
191           cleanOldLogs(hlogs, key, id);
192         }
193       }
194     }
195  }
196   
197   private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
198     SortedSet<String> hlogSet = hlogs.headSet(key);
199     LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
200     for (String hlog : hlogSet) {
201       this.replicationQueues.removeLog(id, hlog);
202     }
203     hlogSet.clear();
204   }
205 
206   /**
207    * Adds a normal source per registered peer cluster and tries to process all
208    * old region server hlog queues
209    */
210   protected void init() throws IOException, ReplicationException {
211     for (String id : this.replicationPeers.getConnectedPeers()) {
212       addSource(id);
213     }
214     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
215     if (currentReplicators == null || currentReplicators.size() == 0) {
216       return;
217     }
218     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
219     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
220         + otherRegionServers);
221 
222     // Look if there's anything to process after a restart
223     for (String rs : currentReplicators) {
224       if (!otherRegionServers.contains(rs)) {
225         transferQueues(rs);
226       }
227     }
228   }
229 
230   /**
231    * Add a new normal source to this region server
232    * @param id the id of the peer cluster
233    * @return the source that was created
234    * @throws IOException
235    */
236   protected ReplicationSourceInterface addSource(String id) throws IOException,
237       ReplicationException {
238     ReplicationSourceInterface src =
239         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
240           this.replicationPeers, stopper, id, this.clusterId);
241     synchronized (this.hlogsById) {
242       this.sources.add(src);
243       this.hlogsById.put(id, new TreeSet<String>());
244       // Add the latest hlog to that source's queue
245       if (this.latestPath != null) {
246         String name = this.latestPath.getName();
247         this.hlogsById.get(id).add(name);
248         try {
249           this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
250         } catch (ReplicationException e) {
251           String message =
252               "Cannot add log to queue when creating a new source, queueId="
253                   + src.getPeerClusterZnode() + ", filename=" + name;
254           stopper.stop(message);
255           throw e;
256         }
257         src.enqueueLog(this.latestPath);
258       }
259     }
260     src.startup();
261     return src;
262   }
263 
264   /**
265    * Delete a complete queue of hlogs associated with a peer cluster
266    * @param peerId Id of the peer cluster queue of hlogs to delete
267    */
268   public void deleteSource(String peerId, boolean closeConnection) {
269     this.replicationQueues.removeQueue(peerId);
270     if (closeConnection) {
271       this.replicationPeers.disconnectFromPeer(peerId);
272     }
273   }
274 
275   /**
276    * Terminate the replication on this region server
277    */
278   public void join() {
279     this.executor.shutdown();
280     if (this.sources.size() == 0) {
281       this.replicationQueues.removeAllQueues();
282     }
283     for (ReplicationSourceInterface source : this.sources) {
284       source.terminate("Region server is closing");
285     }
286   }
287 
288   /**
289    * Get a copy of the hlogs of the first source on this rs
290    * @return a sorted set of hlog names
291    */
292   protected Map<String, SortedSet<String>> getHLogs() {
293     return Collections.unmodifiableMap(hlogsById);
294   }
295   
296   /**
297    * Get a copy of the hlogs of the recovered sources on this rs
298    * @return a sorted set of hlog names
299    */
300   protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
301     return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
302   }
303 
304   /**
305    * Get a list of all the normal sources of this rs
306    * @return lis of all sources
307    */
308   public List<ReplicationSourceInterface> getSources() {
309     return this.sources;
310   }
311 
312   /**
313    * Get a list of all the old sources of this rs
314    * @return list of all old sources
315    */
316   public List<ReplicationSourceInterface> getOldSources() {
317     return this.oldsources;
318   }
319 
320   void preLogRoll(Path newLog) throws IOException {
321     synchronized (this.hlogsById) {
322       String name = newLog.getName();
323       for (ReplicationSourceInterface source : this.sources) {
324         try {
325           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
326         } catch (ReplicationException e) {
327           throw new IOException("Cannot add log to replication queue with id="
328               + source.getPeerClusterZnode() + ", filename=" + name, e);
329         }
330       }
331       for (SortedSet<String> hlogs : this.hlogsById.values()) {
332         if (this.sources.isEmpty()) {
333           // If there's no slaves, don't need to keep the old hlogs since
334           // we only consider the last one when a new slave comes in
335           hlogs.clear();
336         }
337         hlogs.add(name);
338       }
339     }
340 
341     this.latestPath = newLog;
342   }
343 
344   void postLogRoll(Path newLog) throws IOException {
345     // This only updates the sources we own, not the recovered ones
346     for (ReplicationSourceInterface source : this.sources) {
347       source.enqueueLog(newLog);
348     }
349   }
350 
351   /**
352    * Factory method to create a replication source
353    * @param conf the configuration to use
354    * @param fs the file system to use
355    * @param manager the manager to use
356    * @param stopper the stopper object for this region server
357    * @param peerId the id of the peer cluster
358    * @return the created source
359    * @throws IOException
360    */
361   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
362       final FileSystem fs, final ReplicationSourceManager manager,
363       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
364       final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
365     ReplicationSourceInterface src;
366     try {
367       @SuppressWarnings("rawtypes")
368       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
369           ReplicationSource.class.getCanonicalName()));
370       src = (ReplicationSourceInterface) c.newInstance();
371     } catch (Exception e) {
372       LOG.warn("Passed replication source implementation throws errors, " +
373           "defaulting to ReplicationSource", e);
374       src = new ReplicationSource();
375 
376     }
377     src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
378     return src;
379   }
380 
381   /**
382    * Transfer all the queues of the specified to this region server.
383    * First it tries to grab a lock and if it works it will move the
384    * znodes and finally will delete the old znodes.
385    *
386    * It creates one old source for any type of source of the old rs.
387    * @param rsZnode
388    */
389   private void transferQueues(String rsZnode) {
390     NodeFailoverWorker transfer =
391         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
392             this.clusterId);
393     try {
394       this.executor.execute(transfer);
395     } catch (RejectedExecutionException ex) {
396       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
397     }
398   }
399 
400   /**
401    * Clear the references to the specified old source
402    * @param src source to clear
403    */
404   public void closeRecoveredQueue(ReplicationSourceInterface src) {
405     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
406     this.oldsources.remove(src);
407     deleteSource(src.getPeerClusterZnode(), false);
408     this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
409   }
410 
411   /**
412    * Thie method first deletes all the recovered sources for the specified
413    * id, then deletes the normal source (deleting all related data in ZK).
414    * @param id The id of the peer cluster
415    */
416   public void removePeer(String id) {
417     LOG.info("Closing the following queue " + id + ", currently have "
418         + sources.size() + " and another "
419         + oldsources.size() + " that were recovered");
420     String terminateMessage = "Replication stream was removed by a user";
421     ReplicationSourceInterface srcToRemove = null;
422     List<ReplicationSourceInterface> oldSourcesToDelete =
423         new ArrayList<ReplicationSourceInterface>();
424     // First close all the recovered sources for this peer
425     for (ReplicationSourceInterface src : oldsources) {
426       if (id.equals(src.getPeerClusterId())) {
427         oldSourcesToDelete.add(src);
428       }
429     }
430     for (ReplicationSourceInterface src : oldSourcesToDelete) {
431       src.terminate(terminateMessage);
432       closeRecoveredQueue((src));
433     }
434     LOG.info("Number of deleted recovered sources for " + id + ": "
435         + oldSourcesToDelete.size());
436     // Now look for the one on this cluster
437     for (ReplicationSourceInterface src : this.sources) {
438       if (id.equals(src.getPeerClusterId())) {
439         srcToRemove = src;
440         break;
441       }
442     }
443     if (srcToRemove == null) {
444       LOG.error("The queue we wanted to close is missing " + id);
445       return;
446     }
447     srcToRemove.terminate(terminateMessage);
448     this.sources.remove(srcToRemove);
449     deleteSource(id, true);
450   }
451 
452   @Override
453   public void regionServerRemoved(String regionserver) {
454     transferQueues(regionserver);
455   }
456 
457   @Override
458   public void peerRemoved(String peerId) {
459     removePeer(peerId);
460   }
461 
462   @Override
463   public void peerListChanged(List<String> peerIds) {
464     for (String id : peerIds) {
465       try {
466         boolean added = this.replicationPeers.connectToPeer(id);
467         if (added) {
468           addSource(id);
469         }
470       } catch (Exception e) {
471         LOG.error("Error while adding a new peer", e);
472       }
473     }
474   }
475 
476   /**
477    * Class responsible to setup new ReplicationSources to take care of the
478    * queues from dead region servers.
479    */
480   class NodeFailoverWorker extends Thread {
481 
482     private String rsZnode;
483     private final ReplicationQueues rq;
484     private final ReplicationPeers rp;
485     private final UUID clusterId;
486 
487     /**
488      *
489      * @param rsZnode
490      */
491     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
492         final ReplicationPeers replicationPeers, final UUID clusterId) {
493       super("Failover-for-"+rsZnode);
494       this.rsZnode = rsZnode;
495       this.rq = replicationQueues;
496       this.rp = replicationPeers;
497       this.clusterId = clusterId;
498     }
499 
500     @Override
501     public void run() {
502       if (this.rq.isThisOurZnode(rsZnode)) {
503         return;
504       }
505       // Wait a bit before transferring the queues, we may be shutting down.
506       // This sleep may not be enough in some cases.
507       try {
508         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
509       } catch (InterruptedException e) {
510         LOG.warn("Interrupted while waiting before transferring a queue.");
511         Thread.currentThread().interrupt();
512       }
513       // We try to lock that rs' queue directory
514       if (stopper.isStopped()) {
515         LOG.info("Not transferring queue since we are shutting down");
516         return;
517       }
518       SortedMap<String, SortedSet<String>> newQueues = null;
519 
520       newQueues = this.rq.claimQueues(rsZnode);
521 
522       // Copying over the failed queue is completed.
523       if (newQueues.isEmpty()) {
524         // We either didn't get the lock or the failed region server didn't have any outstanding
525         // HLogs to replicate, so we are done.
526         return;
527       }
528 
529       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
530         String peerId = entry.getKey();
531         try {
532           ReplicationSourceInterface src =
533               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
534                 stopper, peerId, this.clusterId);
535           if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
536             src.terminate("Recovered queue doesn't belong to any current peer");
537             break;
538           }
539           oldsources.add(src);
540           SortedSet<String> hlogsSet = entry.getValue();
541           for (String hlog : hlogsSet) {
542             src.enqueueLog(new Path(oldLogDir, hlog));
543           }
544           src.startup();
545           hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
546         } catch (IOException e) {
547           // TODO manage it
548           LOG.error("Failed creating a source", e);
549         }
550       }
551     }
552   }
553 
554   /**
555    * Get the directory where hlogs are archived
556    * @return the directory where hlogs are archived
557    */
558   public Path getOldLogDir() {
559     return this.oldLogDir;
560   }
561 
562   /**
563    * Get the directory where hlogs are stored by their RSs
564    * @return the directory where hlogs are stored by their RSs
565    */
566   public Path getLogDir() {
567     return this.logDir;
568   }
569 
570   /**
571    * Get the handle on the local file system
572    * @return Handle on the local file system
573    */
574   public FileSystem getFs() {
575     return this.fs;
576   }
577 
578   /**
579    * Get a string representation of all the sources' metrics
580    */
581   public String getStats() {
582     StringBuffer stats = new StringBuffer();
583     for (ReplicationSourceInterface source : sources) {
584       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
585       stats.append(source.getStats() + "\n");
586     }
587     for (ReplicationSourceInterface oldSource : oldsources) {
588       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
589       stats.append(oldSource.getStats()+ "\n");
590     }
591     return stats.toString();
592   }
593 }