1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.CellScanner;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.HRegionLocation;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.client.HConnection;
39 import org.apache.hadoop.hbase.client.RegionServerCallable;
40 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
41 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
42 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
43 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
44 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
45 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
46 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
47 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
48 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49 import org.apache.hadoop.hbase.util.Pair;
50
51 import com.google.protobuf.ServiceException;
52
53
54
55
56
57
58
59 @InterfaceAudience.Private
60 public class WALEditsReplaySink {
61
62 private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
63 private static final int MAX_BATCH_SIZE = 1024;
64
65 private final Configuration conf;
66 private final HConnection conn;
67 private final TableName tableName;
68 private final MetricsWALEditsReplay metrics;
69 private final AtomicLong totalReplayedEdits = new AtomicLong();
70 private final boolean skipErrors;
71 private final int replayTimeout;
72 private RpcControllerFactory rpcControllerFactory;
73
74
75
76
77
78
79
80
81 public WALEditsReplaySink(Configuration conf, TableName tableName, HConnection conn)
82 throws IOException {
83 this.conf = conf;
84 this.metrics = new MetricsWALEditsReplay();
85 this.conn = conn;
86 this.tableName = tableName;
87 this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
88 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
89
90 this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
91 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
92 }
93
94
95
96
97
98
99 public void replayEntries(List<Pair<HRegionLocation, HLog.Entry>> entries) throws IOException {
100 if (entries.size() == 0) {
101 return;
102 }
103
104 int batchSize = entries.size();
105 Map<HRegionInfo, List<HLog.Entry>> entriesByRegion =
106 new HashMap<HRegionInfo, List<HLog.Entry>>();
107 HRegionLocation loc = null;
108 HLog.Entry entry = null;
109 List<HLog.Entry> regionEntries = null;
110
111 for (int i = 0; i < batchSize; i++) {
112 loc = entries.get(i).getFirst();
113 entry = entries.get(i).getSecond();
114 if (entriesByRegion.containsKey(loc.getRegionInfo())) {
115 regionEntries = entriesByRegion.get(loc.getRegionInfo());
116 } else {
117 regionEntries = new ArrayList<HLog.Entry>();
118 entriesByRegion.put(loc.getRegionInfo(), regionEntries);
119 }
120 regionEntries.add(entry);
121 }
122
123 long startTime = EnvironmentEdgeManager.currentTimeMillis();
124
125
126 for (Map.Entry<HRegionInfo, List<HLog.Entry>> _entry : entriesByRegion.entrySet()) {
127 HRegionInfo curRegion = _entry.getKey();
128 List<HLog.Entry> allActions = _entry.getValue();
129
130 int totalActions = allActions.size();
131 int replayedActions = 0;
132 int curBatchSize = 0;
133 for (; replayedActions < totalActions;) {
134 curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
135 : (totalActions - replayedActions);
136 replayEdits(loc, curRegion, allActions.subList(replayedActions,
137 replayedActions + curBatchSize));
138 replayedActions += curBatchSize;
139 }
140 }
141
142 long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
143 LOG.debug("number of rows:" + entries.size() + " are sent by batch! spent " + endTime
144 + "(ms)!");
145
146 metrics.updateReplayTime(endTime);
147 metrics.updateReplayBatchSize(batchSize);
148
149 this.totalReplayedEdits.addAndGet(batchSize);
150 }
151
152
153
154
155
156 public String getStats() {
157 return this.totalReplayedEdits.get() == 0 ? "" : "Sink: total replayed edits: "
158 + this.totalReplayedEdits;
159 }
160
161 private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
162 final List<HLog.Entry> entries) throws IOException {
163 try {
164 RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
165 ReplayServerCallable<ReplicateWALEntryResponse> callable =
166 new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
167 regionInfo, entries);
168 factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
169 } catch (IOException ie) {
170 if (skipErrors) {
171 LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
172 + "=true so continuing replayEdits with error:" + ie.getMessage());
173 } else {
174 throw ie;
175 }
176 }
177 }
178
179
180
181
182
183 class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
184 private HRegionInfo regionInfo;
185 private List<HLog.Entry> entries;
186
187 ReplayServerCallable(final HConnection connection, final TableName tableName,
188 final HRegionLocation regionLoc, final HRegionInfo regionInfo,
189 final List<HLog.Entry> entries) {
190 super(connection, tableName, null);
191 this.entries = entries;
192 this.regionInfo = regionInfo;
193 setLocation(regionLoc);
194 }
195
196 @Override
197 public ReplicateWALEntryResponse call() throws IOException {
198 try {
199 replayToServer(this.regionInfo, this.entries);
200 } catch (ServiceException se) {
201 throw ProtobufUtil.getRemoteException(se);
202 }
203 return null;
204 }
205
206 private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries)
207 throws IOException, ServiceException {
208 if (entries.isEmpty()) return;
209
210 HLog.Entry[] entriesArray = new HLog.Entry[entries.size()];
211 entriesArray = entries.toArray(entriesArray);
212 AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName());
213
214 Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
215 ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
216 try {
217 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
218 remoteSvr.replay(controller, p.getFirst());
219 } catch (ServiceException se) {
220 throw ProtobufUtil.getRemoteException(se);
221 }
222 }
223
224 @Override
225 public void prepare(boolean reload) throws IOException {
226 if (!reload) return;
227
228
229
230 boolean skip = false;
231 for (HLog.Entry entry : this.entries) {
232 WALEdit edit = entry.getEdit();
233 List<KeyValue> kvs = edit.getKeyValues();
234 for (KeyValue kv : kvs) {
235
236 setLocation(conn.locateRegion(tableName, kv.getRow()));
237 skip = true;
238 break;
239 }
240
241 if (skip) break;
242 }
243 }
244 }
245 }