View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.nio.ByteBuffer;
26  import java.util.Arrays;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.hbase.codec.Codec;
34  import org.apache.hadoop.hbase.io.LimitInputStream;
35  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
36  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
37  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
38  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
39  import org.apache.hadoop.hbase.util.Bytes;
40  
41  import com.google.protobuf.CodedInputStream;
42  import com.google.protobuf.InvalidProtocolBufferException;
43  
44  /**
45   * A Protobuf based WAL has the following structure:
46   * <p>
47   * &lt;PB_WAL_MAGIC&gt;&lt;WALHeader&gt;&lt;WALEdits&gt;...&lt;WALEdits&gt;&lt;Trailer&gt;
48   * &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
49   * </p>
50   * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
51   * {@link ProtobufLogReader#initReader(FSDataInputStream)}. A WALTrailer is an extensible structure
52   * which is appended at the end of the WAL. This is empty for now; it can contain some meta
53   * information such as Region level stats, etc in future.
54   */
55  @InterfaceAudience.Private
56  public class ProtobufLogReader extends ReaderBase {
57    private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
58    static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
59    static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
60    protected FSDataInputStream inputStream;
61    protected Codec.Decoder cellDecoder;
62    protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
63    protected boolean hasCompression = false;
64    protected boolean hasTagCompression = false;
65    // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry
66    // in the hlog, the inputstream's position is equal to walEditsStopOffset.
67    private long walEditsStopOffset;
68    private boolean trailerPresent;
69  
70    public ProtobufLogReader() {
71      super();
72    }
73  
74    @Override
75    public void close() throws IOException {
76      if (this.inputStream != null) {
77        this.inputStream.close();
78        this.inputStream = null;
79      }
80    }
81  
82    @Override
83    public long getPosition() throws IOException {
84      return inputStream.getPos();
85    }
86  
87    @Override
88    public void reset() throws IOException {
89      initInternal(null, false);
90      initAfterCompression(); // We need a new decoder (at least).
91    }
92  
93    @Override
94    protected void initReader(FSDataInputStream stream) throws IOException {
95      initInternal(stream, true);
96    }
97  
98    protected boolean readHeader(Builder builder, FSDataInputStream stream) throws IOException {
99      return builder.mergeDelimitedFrom(stream);
100   }
101 
102   private void initInternal(FSDataInputStream stream, boolean isFirst) throws IOException {
103     close();
104     long expectedPos = PB_WAL_MAGIC.length;
105     if (stream == null) {
106       stream = fs.open(path);
107       stream.seek(expectedPos);
108     }
109     if (stream.getPos() != expectedPos) {
110       throw new IOException("The stream is at invalid position: " + stream.getPos());
111     }
112     // Initialize metadata or, when we reset, just skip the header.
113     WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
114     boolean hasHeader = readHeader(builder, stream);
115     if (!hasHeader) {
116       throw new EOFException("Couldn't read WAL PB header");
117     }
118     if (isFirst) {
119       WALProtos.WALHeader header = builder.build();
120       this.hasCompression = header.hasHasCompression() && header.getHasCompression();
121       this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
122     }
123     this.inputStream = stream;
124     this.walEditsStopOffset = this.fileLength;
125     long currentPosition = stream.getPos();
126     trailerPresent = setTrailerIfPresent();
127     this.seekOnFs(currentPosition);
128     if (LOG.isTraceEnabled()) {
129       LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
130           + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
131     }
132   }
133 
134   /**
135    * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
136    * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
137    * the trailer, and checks whether the trailer is present at the end or not by comparing the last
138    * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
139    * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
140    * before the trailer.
141    * <ul>
142    * The trailer is ignored in case:
143    * <li>fileLength is 0 or not correct (when file is under recovery, etc).
144    * <li>the trailer size is negative.
145    * </ul>
146    * <p>
147    * In case the trailer size > this.trailerMaxSize, it is read after a WARN message.
148    * @return true if a valid trailer is present
149    * @throws IOException
150    */
151   private boolean setTrailerIfPresent() {
152     try {
153       long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
154       if (trailerSizeOffset <= 0) return false;// no trailer possible.
155       this.seekOnFs(trailerSizeOffset);
156       // read the int as trailer size.
157       int trailerSize = this.inputStream.readInt();
158       ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
159       this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
160       if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
161         LOG.trace("No trailer found.");
162         return false;
163       }
164       if (trailerSize < 0) {
165         LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
166         return false;
167       } else if (trailerSize > this.trailerWarnSize) {
168         // continue reading after warning the user.
169         LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
170           + trailerSize + " > " + this.trailerWarnSize);
171       }
172       // seek to the position where trailer starts.
173       long positionOfTrailer = trailerSizeOffset - trailerSize;
174       this.seekOnFs(positionOfTrailer);
175       // read the trailer.
176       buf = ByteBuffer.allocate(trailerSize);// for trailer.
177       this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
178       trailer = WALTrailer.parseFrom(buf.array());
179       this.walEditsStopOffset = positionOfTrailer;
180       return true;
181     } catch (IOException ioe) {
182       LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
183     }
184     return false;
185   }
186 
187   protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
188       throws IOException {
189     return WALCellCodec.create(conf, compressionContext);
190   }
191 
192   @Override
193   protected void initAfterCompression() throws IOException {
194     WALCellCodec codec = getCodec(this.conf, this.compressionContext);
195     this.cellDecoder = codec.getDecoder(this.inputStream);
196     if (this.hasCompression) {
197       this.byteStringUncompressor = codec.getByteStringUncompressor();
198     }
199   }
200 
201   @Override
202   protected boolean hasCompression() {
203     return this.hasCompression;
204   }
205 
206   @Override
207   protected boolean hasTagCompression() {
208     return this.hasTagCompression;
209   }
210 
211   @Override
212   protected boolean readNext(HLog.Entry entry) throws IOException {
213     while (true) {
214       // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
215       long originalPosition = this.inputStream.getPos();
216       if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
217         return false;
218       }
219       WALKey.Builder builder = WALKey.newBuilder();
220       long size = 0;
221       try {
222         long available = -1;
223         try {
224           int firstByte = this.inputStream.read();
225           if (firstByte == -1) {
226             throw new EOFException("First byte is negative");
227           }
228           size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
229           // available may be < 0 on local fs for instance.  If so, can't depend on it.
230           available = this.inputStream.available();
231           if (available > 0 && available < size) {
232             throw new EOFException("Available stream not enough for edit, " +
233                 "inputStream.available()= " + this.inputStream.available() + ", " +
234                 "entry size= " + size);
235           }
236           final InputStream limitedInput = new LimitInputStream(this.inputStream, size);
237           builder.mergeFrom(limitedInput);
238         } catch (InvalidProtocolBufferException ipbe) {
239           throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
240             originalPosition + ", currentPosition=" + this.inputStream.getPos() +
241             ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
242         }
243         if (!builder.isInitialized()) {
244           // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
245           //       If we can get the KV count, we could, theoretically, try to get next record.
246           throw new EOFException("Partial PB while reading WAL, " +
247               "probably an unexpected EOF, ignoring");
248         }
249         WALKey walKey = builder.build();
250         entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
251         if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
252           LOG.trace("WALKey has no KVs that follow it; trying the next one");
253           continue;
254         }
255         int expectedCells = walKey.getFollowingKvCount();
256         long posBefore = this.inputStream.getPos();
257         try {
258           int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
259           if (expectedCells != actualCells) {
260             throw new EOFException("Only read " + actualCells); // other info added in catch
261           }
262         } catch (Exception ex) {
263           String posAfterStr = "<unknown>";
264           try {
265             posAfterStr = this.inputStream.getPos() + "";
266           } catch (Throwable t) {
267             LOG.trace("Error getting pos for error message - ignoring", t);
268           }
269           String message = " while reading " + expectedCells + " WAL KVs; started reading at "
270               + posBefore + " and read up to " + posAfterStr;
271           IOException realEofEx = extractHiddenEof(ex);
272           throw (EOFException) new EOFException("EOF " + message).
273               initCause(realEofEx != null ? realEofEx : ex);
274         }
275         if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
276           LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
277               + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
278               + this.walEditsStopOffset);
279           throw new EOFException("Read WALTrailer while reading WALEdits");
280         }
281       } catch (EOFException eof) {
282         LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
283         // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
284         if (originalPosition < 0) throw eof;
285         // Else restore our position to original location in hope that next time through we will
286         // read successfully.
287         seekOnFs(originalPosition);
288         return false;
289       }
290       return true;
291     }
292   }
293 
294   private IOException extractHiddenEof(Exception ex) {
295     // There are two problems we are dealing with here. Hadoop stream throws generic exception
296     // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
297     IOException ioEx = null;
298     if (ex instanceof EOFException) {
299       return (EOFException)ex;
300     } else if (ex instanceof IOException) {
301       ioEx = (IOException)ex;
302     } else if (ex instanceof RuntimeException
303         && ex.getCause() != null && ex.getCause() instanceof IOException) {
304       ioEx = (IOException)ex.getCause();
305     }
306     if (ioEx != null) {
307       if (ioEx.getMessage().contains("EOF")) return ioEx;
308       return null;
309     }
310     return null;
311   }
312 
313   @Override
314   public WALTrailer getWALTrailer() {
315     return trailer;
316   }
317 
318   @Override
319   protected void seekOnFs(long pos) throws IOException {
320     this.inputStream.seek(pos);
321   }
322 }