View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication.regionserver;
19  
20  import java.io.IOException;
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Random;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.ServerName;
30  import org.apache.hadoop.hbase.client.HConnection;
31  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
32  import org.apache.hadoop.hbase.replication.ReplicationPeers;
33  
34  import com.google.common.collect.Lists;
35  import com.google.common.collect.Maps;
36  
37  /**
38   * Maintains a collection of peers to replicate to, and randomly selects a
39   * single peer to replicate to per set of data to replicate. Also handles
40   * keeping track of peer availability.
41   */
42  public class ReplicationSinkManager {
43  
44    private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class);
45  
46    /**
47     * Default maximum number of times a replication sink can be reported as bad before
48     * it will no longer be provided as a sink for replication without the pool of
49     * replication sinks being refreshed.
50     */
51    static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
52  
53    /**
54     * Default ratio of the total number of peer cluster region servers to consider
55     * replicating to.
56     */
57    static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.1f;
58  
59  
60    private final HConnection conn;
61  
62    private final String peerClusterId;
63  
64    private final ReplicationPeers replicationPeers;
65  
66    // Count of "bad replication sink" reports per peer sink
67    private final Map<ServerName, Integer> badReportCounts;
68  
69    // Ratio of total number of potential peer region servers to be used
70    private final float ratio;
71  
72    // Maximum number of times a sink can be reported as bad before the pool of
73    // replication sinks is refreshed
74    private final int badSinkThreshold;
75  
76    private final Random random;
77  
78    // A timestamp of the last time the list of replication peers changed
79    private long lastUpdateToPeers;
80  
81    // The current pool of sinks to which replication can be performed
82    private List<ServerName> sinks = Lists.newArrayList();
83  
84    /**
85     * Instantiate for a single replication peer cluster.
86     * @param conn connection to the peer cluster
87     * @param peerClusterId identifier of the peer cluster
88     * @param replicationPeers manages peer clusters being replicated to
89     * @param conf HBase configuration, used for determining replication source ratio and bad peer
90     *          threshold
91     */
92    public ReplicationSinkManager(HConnection conn, String peerClusterId,
93        ReplicationPeers replicationPeers, Configuration conf) {
94      this.conn = conn;
95      this.peerClusterId = peerClusterId;
96      this.replicationPeers = replicationPeers;
97      this.badReportCounts = Maps.newHashMap();
98      this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
99      this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
100                                         DEFAULT_BAD_SINK_THRESHOLD);
101     this.random = new Random();
102   }
103 
104   /**
105    * Get a randomly-chosen replication sink to replicate to.
106    *
107    * @return a replication sink to replicate to
108    */
109   public SinkPeer getReplicationSink() throws IOException {
110     if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
111                                                         > this.lastUpdateToPeers) {
112       LOG.info("Current list of sinks is out of date, updating");
113       chooseSinks();
114     }
115 
116     if (sinks.isEmpty()) {
117       throw new IOException("No replication sinks are available");
118     }
119     ServerName serverName = sinks.get(random.nextInt(sinks.size()));
120     return new SinkPeer(serverName, conn.getAdmin(serverName));
121   }
122 
123   /**
124    * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
125    * failed). If a single SinkPeer is reported as bad more than
126    * replication.bad.sink.threshold times, it will be removed
127    * from the pool of potential replication targets.
128    *
129    * @param sinkPeer
130    *          The SinkPeer that had a failed replication attempt on it
131    */
132   public void reportBadSink(SinkPeer sinkPeer) {
133     ServerName serverName = sinkPeer.getServerName();
134     int badReportCount = (badReportCounts.containsKey(serverName)
135                     ? badReportCounts.get(serverName) : 0) + 1;
136     badReportCounts.put(serverName, badReportCount);
137     if (badReportCount > badSinkThreshold) {
138       this.sinks.remove(serverName);
139       if (sinks.isEmpty()) {
140         chooseSinks();
141       }
142     }
143   }
144 
145   void chooseSinks() {
146     List<ServerName> slaveAddresses =
147                         replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
148     Collections.shuffle(slaveAddresses, random);
149     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
150     sinks = slaveAddresses.subList(0, numSinks);
151     lastUpdateToPeers = System.currentTimeMillis();
152     badReportCounts.clear();
153   }
154 
155   List<ServerName> getSinks() {
156     return sinks;
157   }
158 
159   /**
160    * Wraps a replication region server sink to provide the ability to identify
161    * it.
162    */
163   public static class SinkPeer {
164     private ServerName serverName;
165     private AdminService.BlockingInterface regionServer;
166 
167     public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
168       this.serverName = serverName;
169       this.regionServer = regionServer;
170     }
171 
172     ServerName getServerName() {
173       return serverName;
174     }
175 
176     public AdminService.BlockingInterface getRegionServer() {
177       return regionServer;
178     }
179 
180   }
181 
182 }