1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.nio.ByteBuffer;
26 import java.util.Arrays;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FSDataInputStream;
33 import org.apache.hadoop.hbase.codec.Codec;
34 import org.apache.hadoop.hbase.io.LimitInputStream;
35 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
36 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
37 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
38 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
39 import org.apache.hadoop.hbase.util.Bytes;
40
41 import com.google.protobuf.CodedInputStream;
42 import com.google.protobuf.InvalidProtocolBufferException;
43
44
45
46
47
48
49
50
51
52
53
54
55 @InterfaceAudience.Private
56 public class ProtobufLogReader extends ReaderBase {
57 private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
58 static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
59 static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
60 protected FSDataInputStream inputStream;
61 protected Codec.Decoder cellDecoder;
62 protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
63 protected boolean hasCompression = false;
64 protected boolean hasTagCompression = false;
65
66
67 private long walEditsStopOffset;
68 private boolean trailerPresent;
69
70 public ProtobufLogReader() {
71 super();
72 }
73
74 @Override
75 public void close() throws IOException {
76 if (this.inputStream != null) {
77 this.inputStream.close();
78 this.inputStream = null;
79 }
80 }
81
82 @Override
83 public long getPosition() throws IOException {
84 return inputStream.getPos();
85 }
86
87 @Override
88 public void reset() throws IOException {
89 initInternal(null, false);
90 initAfterCompression();
91 }
92
93 @Override
94 protected void initReader(FSDataInputStream stream) throws IOException {
95 initInternal(stream, true);
96 }
97
98 protected boolean readHeader(Builder builder, FSDataInputStream stream) throws IOException {
99 return builder.mergeDelimitedFrom(stream);
100 }
101
102 private void initInternal(FSDataInputStream stream, boolean isFirst) throws IOException {
103 close();
104 long expectedPos = PB_WAL_MAGIC.length;
105 if (stream == null) {
106 stream = fs.open(path);
107 stream.seek(expectedPos);
108 }
109 if (stream.getPos() != expectedPos) {
110 throw new IOException("The stream is at invalid position: " + stream.getPos());
111 }
112
113 WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
114 boolean hasHeader = readHeader(builder, stream);
115 if (!hasHeader) {
116 throw new EOFException("Couldn't read WAL PB header");
117 }
118 if (isFirst) {
119 WALProtos.WALHeader header = builder.build();
120 this.hasCompression = header.hasHasCompression() && header.getHasCompression();
121 this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
122 }
123 this.inputStream = stream;
124 this.walEditsStopOffset = this.fileLength;
125 long currentPosition = stream.getPos();
126 trailerPresent = setTrailerIfPresent();
127 this.seekOnFs(currentPosition);
128 if (LOG.isTraceEnabled()) {
129 LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
130 + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
131 }
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151 private boolean setTrailerIfPresent() {
152 try {
153 long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
154 if (trailerSizeOffset <= 0) return false;
155 this.seekOnFs(trailerSizeOffset);
156
157 int trailerSize = this.inputStream.readInt();
158 ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
159 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
160 if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
161 LOG.trace("No trailer found.");
162 return false;
163 }
164 if (trailerSize < 0) {
165 LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
166 return false;
167 } else if (trailerSize > this.trailerWarnSize) {
168
169 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
170 + trailerSize + " > " + this.trailerWarnSize);
171 }
172
173 long positionOfTrailer = trailerSizeOffset - trailerSize;
174 this.seekOnFs(positionOfTrailer);
175
176 buf = ByteBuffer.allocate(trailerSize);
177 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
178 trailer = WALTrailer.parseFrom(buf.array());
179 this.walEditsStopOffset = positionOfTrailer;
180 return true;
181 } catch (IOException ioe) {
182 LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
183 }
184 return false;
185 }
186
187 protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
188 throws IOException {
189 return WALCellCodec.create(conf, compressionContext);
190 }
191
192 @Override
193 protected void initAfterCompression() throws IOException {
194 WALCellCodec codec = getCodec(this.conf, this.compressionContext);
195 this.cellDecoder = codec.getDecoder(this.inputStream);
196 if (this.hasCompression) {
197 this.byteStringUncompressor = codec.getByteStringUncompressor();
198 }
199 }
200
201 @Override
202 protected boolean hasCompression() {
203 return this.hasCompression;
204 }
205
206 @Override
207 protected boolean hasTagCompression() {
208 return this.hasTagCompression;
209 }
210
211 @Override
212 protected boolean readNext(HLog.Entry entry) throws IOException {
213 while (true) {
214
215 long originalPosition = this.inputStream.getPos();
216 if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
217 return false;
218 }
219 WALKey.Builder builder = WALKey.newBuilder();
220 long size = 0;
221 try {
222 long available = -1;
223 try {
224 int firstByte = this.inputStream.read();
225 if (firstByte == -1) {
226 throw new EOFException("First byte is negative");
227 }
228 size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
229
230 available = this.inputStream.available();
231 if (available > 0 && available < size) {
232 throw new EOFException("Available stream not enough for edit, " +
233 "inputStream.available()= " + this.inputStream.available() + ", " +
234 "entry size= " + size);
235 }
236 final InputStream limitedInput = new LimitInputStream(this.inputStream, size);
237 builder.mergeFrom(limitedInput);
238 } catch (InvalidProtocolBufferException ipbe) {
239 throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
240 originalPosition + ", currentPosition=" + this.inputStream.getPos() +
241 ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
242 }
243 if (!builder.isInitialized()) {
244
245
246 throw new EOFException("Partial PB while reading WAL, " +
247 "probably an unexpected EOF, ignoring");
248 }
249 WALKey walKey = builder.build();
250 entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
251 if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
252 LOG.trace("WALKey has no KVs that follow it; trying the next one");
253 continue;
254 }
255 int expectedCells = walKey.getFollowingKvCount();
256 long posBefore = this.inputStream.getPos();
257 try {
258 int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
259 if (expectedCells != actualCells) {
260 throw new EOFException("Only read " + actualCells);
261 }
262 } catch (Exception ex) {
263 String posAfterStr = "<unknown>";
264 try {
265 posAfterStr = this.inputStream.getPos() + "";
266 } catch (Throwable t) {
267 LOG.trace("Error getting pos for error message - ignoring", t);
268 }
269 String message = " while reading " + expectedCells + " WAL KVs; started reading at "
270 + posBefore + " and read up to " + posAfterStr;
271 IOException realEofEx = extractHiddenEof(ex);
272 throw (EOFException) new EOFException("EOF " + message).
273 initCause(realEofEx != null ? realEofEx : ex);
274 }
275 if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
276 LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
277 + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
278 + this.walEditsStopOffset);
279 throw new EOFException("Read WALTrailer while reading WALEdits");
280 }
281 } catch (EOFException eof) {
282 LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
283
284 if (originalPosition < 0) throw eof;
285
286
287 seekOnFs(originalPosition);
288 return false;
289 }
290 return true;
291 }
292 }
293
294 private IOException extractHiddenEof(Exception ex) {
295
296
297 IOException ioEx = null;
298 if (ex instanceof EOFException) {
299 return (EOFException)ex;
300 } else if (ex instanceof IOException) {
301 ioEx = (IOException)ex;
302 } else if (ex instanceof RuntimeException
303 && ex.getCause() != null && ex.getCause() instanceof IOException) {
304 ioEx = (IOException)ex.getCause();
305 }
306 if (ioEx != null) {
307 if (ioEx.getMessage().contains("EOF")) return ioEx;
308 return null;
309 }
310 return null;
311 }
312
313 @Override
314 public WALTrailer getWALTrailer() {
315 return trailer;
316 }
317
318 @Override
319 protected void seekOnFs(long pos) throws IOException {
320 this.inputStream.seek(pos);
321 }
322 }