View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FSDataInputStream;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.io.util.LRUDictionary;
33  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
34  import org.apache.hadoop.hbase.util.FSUtils;
35  
36  @InterfaceAudience.Private
37  public abstract class ReaderBase implements HLog.Reader {
38    private static final Log LOG = LogFactory.getLog(ReaderBase.class);
39    protected Configuration conf;
40    protected FileSystem fs;
41    protected Path path;
42    protected long edit = 0;
43    protected long fileLength;
44    protected WALTrailer trailer;
45    // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
46    // than this size, it is written/read respectively, with a WARN message in the log.
47    protected int trailerWarnSize;
48    /**
49     * Compression context to use reading.  Can be null if no compression.
50     */
51    protected CompressionContext compressionContext = null;
52    protected boolean emptyCompressionContext = true;
53  
54    /**
55     * Default constructor.
56     */
57    public ReaderBase() {
58    }
59  
60    @Override
61    public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
62        throws IOException {
63      this.conf = conf;
64      this.path = path;
65      this.fs = fs;
66      this.fileLength = this.fs.getFileStatus(path).getLen();
67      this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
68        HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
69      initReader(stream);
70  
71      boolean compression = hasCompression();
72      if (compression) {
73        // If compression is enabled, new dictionaries are created here.
74        try {
75          if (compressionContext == null) {
76            compressionContext = new CompressionContext(LRUDictionary.class,
77                FSUtils.isRecoveredEdits(path), hasTagCompression());
78          } else {
79            compressionContext.clear();
80          }
81        } catch (Exception e) {
82          throw new IOException("Failed to initialize CompressionContext", e);
83        }
84      }
85      initAfterCompression();
86    }
87  
88    @Override
89    public HLog.Entry next() throws IOException {
90      return next(null);
91    }
92  
93    @Override
94    public HLog.Entry next(HLog.Entry reuse) throws IOException {
95      HLog.Entry e = reuse;
96      if (e == null) {
97        e = new HLog.Entry(new HLogKey(), new WALEdit());
98      }
99      if (compressionContext != null) {
100       e.setCompressionContext(compressionContext);
101     }
102 
103     boolean hasEntry = false;
104     try {
105       hasEntry = readNext(e);
106     } catch (IllegalArgumentException iae) {
107       TableName tableName = e.getKey().getTablename();
108       if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
109         // It is old ROOT table edit, ignore it
110         LOG.info("Got an old ROOT edit, ignoring ");
111         return next(e);
112       }
113       else throw iae;
114     }
115     edit++;
116     if (compressionContext != null && emptyCompressionContext) {
117       emptyCompressionContext = false;
118     }
119     return hasEntry ? e : null;
120   }
121 
122   @Override
123   public void seek(long pos) throws IOException {
124     if (compressionContext != null && emptyCompressionContext) {
125       while (next() != null) {
126         if (getPosition() == pos) {
127           emptyCompressionContext = false;
128           break;
129         }
130       }
131     }
132     seekOnFs(pos);
133   }
134 
135   /**
136    * Initializes the log reader with a particular stream (may be null).
137    * Reader assumes ownership of the stream if not null and may use it. Called once.
138    */
139   protected abstract void initReader(FSDataInputStream stream) throws IOException;
140 
141   /**
142    * Initializes the compression after the shared stuff has been initialized. Called once.
143    */
144   protected abstract void initAfterCompression() throws IOException;
145   /**
146    * @return Whether compression is enabled for this log.
147    */
148   protected abstract boolean hasCompression();
149 
150   /**
151    * @return Whether tag compression is enabled for this log.
152    */
153   protected abstract boolean hasTagCompression();
154 
155   /**
156    * Read next entry.
157    * @param e The entry to read into.
158    * @return Whether there was anything to read.
159    */
160   protected abstract boolean readNext(HLog.Entry e) throws IOException;
161 
162   /**
163    * Performs a filesystem-level seek to a certain position in an underlying file.
164    */
165   protected abstract void seekOnFs(long pos) throws IOException;
166 
167   @Override
168   public WALTrailer getWALTrailer() {
169     return null;
170   }
171 }