View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.CellScanner;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.HRegionLocation;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.client.HConnection;
39  import org.apache.hadoop.hbase.client.RegionServerCallable;
40  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
41  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
42  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
43  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
44  import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
45  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
46  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
47  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  import org.apache.hadoop.hbase.util.Pair;
50  
51  import com.google.protobuf.ServiceException;
52  
53  /**
54   * This class is responsible for replaying the edits coming from a failed region server.
55   * <p/>
56   * This class uses the native HBase client in order to replay WAL entries.
57   * <p/>
58   */
59  @InterfaceAudience.Private
60  public class WALEditsReplaySink {
61  
62    private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
63    private static final int MAX_BATCH_SIZE = 1024;
64  
65    private final Configuration conf;
66    private final HConnection conn;
67    private final TableName tableName;
68    private final MetricsWALEditsReplay metrics;
69    private final AtomicLong totalReplayedEdits = new AtomicLong();
70    private final boolean skipErrors;
71    private final int replayTimeout;
72    private RpcControllerFactory rpcControllerFactory;
73  
74    /**
75     * Create a sink for WAL log entries replay
76     * @param conf
77     * @param tableName
78     * @param conn
79     * @throws IOException
80     */
81    public WALEditsReplaySink(Configuration conf, TableName tableName, HConnection conn)
82        throws IOException {
83      this.conf = conf;
84      this.metrics = new MetricsWALEditsReplay();
85      this.conn = conn;
86      this.tableName = tableName;
87      this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
88        HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
89      // a single replay operation time out and default is 60 seconds
90      this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
91      this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
92    }
93  
94    /**
95     * Replay an array of actions of the same region directly into the newly assigned Region Server
96     * @param entries
97     * @throws IOException
98     */
99    public void replayEntries(List<Pair<HRegionLocation, HLog.Entry>> entries) throws IOException {
100     if (entries.size() == 0) {
101       return;
102     }
103 
104     int batchSize = entries.size();
105     Map<HRegionInfo, List<HLog.Entry>> entriesByRegion =
106         new HashMap<HRegionInfo, List<HLog.Entry>>();
107     HRegionLocation loc = null;
108     HLog.Entry entry = null;
109     List<HLog.Entry> regionEntries = null;
110     // Build the action list.
111     for (int i = 0; i < batchSize; i++) {
112       loc = entries.get(i).getFirst();
113       entry = entries.get(i).getSecond();
114       if (entriesByRegion.containsKey(loc.getRegionInfo())) {
115         regionEntries = entriesByRegion.get(loc.getRegionInfo());
116       } else {
117         regionEntries = new ArrayList<HLog.Entry>();
118         entriesByRegion.put(loc.getRegionInfo(), regionEntries);
119       }
120       regionEntries.add(entry);
121     }
122 
123     long startTime = EnvironmentEdgeManager.currentTimeMillis();
124 
125     // replaying edits by region
126     for (Map.Entry<HRegionInfo, List<HLog.Entry>> _entry : entriesByRegion.entrySet()) {
127       HRegionInfo curRegion = _entry.getKey();
128       List<HLog.Entry> allActions = _entry.getValue();
129       // send edits in chunks
130       int totalActions = allActions.size();
131       int replayedActions = 0;
132       int curBatchSize = 0;
133       for (; replayedActions < totalActions;) {
134         curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
135                 : (totalActions - replayedActions);
136         replayEdits(loc, curRegion, allActions.subList(replayedActions,
137           replayedActions + curBatchSize));
138         replayedActions += curBatchSize;
139       }
140     }
141 
142     long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
143     LOG.debug("number of rows:" + entries.size() + " are sent by batch! spent " + endTime
144         + "(ms)!");
145 
146     metrics.updateReplayTime(endTime);
147     metrics.updateReplayBatchSize(batchSize);
148 
149     this.totalReplayedEdits.addAndGet(batchSize);
150   }
151 
152   /**
153    * Get a string representation of this sink's metrics
154    * @return string with the total replayed edits count
155    */
156   public String getStats() {
157     return this.totalReplayedEdits.get() == 0 ? "" : "Sink: total replayed edits: "
158         + this.totalReplayedEdits;
159   }
160 
161   private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
162       final List<HLog.Entry> entries) throws IOException {
163     try {
164       RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
165       ReplayServerCallable<ReplicateWALEntryResponse> callable =
166           new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
167               regionInfo, entries);
168       factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
169     } catch (IOException ie) {
170       if (skipErrors) {
171         LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
172             + "=true so continuing replayEdits with error:" + ie.getMessage());
173       } else {
174         throw ie;
175       }
176     }
177   }
178 
179   /**
180    * Callable that handles the <code>replay</code> method call going against a single regionserver
181    * @param <R>
182    */
183   class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
184     private HRegionInfo regionInfo;
185     private List<HLog.Entry> entries;
186 
187     ReplayServerCallable(final HConnection connection, final TableName tableName,
188         final HRegionLocation regionLoc, final HRegionInfo regionInfo,
189         final List<HLog.Entry> entries) {
190       super(connection, tableName, null);
191       this.entries = entries;
192       this.regionInfo = regionInfo;
193       setLocation(regionLoc);
194     }
195 
196     @Override
197     public ReplicateWALEntryResponse call() throws IOException {
198       try {
199         replayToServer(this.regionInfo, this.entries);
200       } catch (ServiceException se) {
201         throw ProtobufUtil.getRemoteException(se);
202       }
203       return null;
204     }
205 
206     private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries)
207         throws IOException, ServiceException {
208       if (entries.isEmpty()) return;
209 
210       HLog.Entry[] entriesArray = new HLog.Entry[entries.size()];
211       entriesArray = entries.toArray(entriesArray);
212       AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName());
213 
214       Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
215           ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
216       try {
217         PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
218         remoteSvr.replay(controller, p.getFirst());
219       } catch (ServiceException se) {
220         throw ProtobufUtil.getRemoteException(se);
221       }
222     }
223 
224     @Override
225     public void prepare(boolean reload) throws IOException {
226       if (!reload) return;
227       // relocate regions in case we have a new dead server or network hiccup
228       // if not due to connection issue, the following code should run fast because it uses
229       // cached location
230       boolean skip = false;
231       for (HLog.Entry entry : this.entries) {
232         WALEdit edit = entry.getEdit();
233         List<KeyValue> kvs = edit.getKeyValues();
234         for (KeyValue kv : kvs) {
235           // filtering HLog meta entries
236           setLocation(conn.locateRegion(tableName, kv.getRow()));
237           skip = true;
238           break;
239         }
240         // use first log entry to relocate region because all entries are for one region
241         if (skip) break;
242       }
243     }
244   }
245 }