View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FSDataOutputStream;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.codec.Codec;
33  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
34  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
35  import org.apache.hadoop.hbase.util.FSUtils;
36  
37  /**
38   * Writer for protobuf-based WAL.
39   */
40  @InterfaceAudience.Private
41  public class ProtobufLogWriter extends WriterBase {
42    private final Log LOG = LogFactory.getLog(this.getClass());
43    protected FSDataOutputStream output;
44    protected Codec.Encoder cellEncoder;
45    protected WALCellCodec.ByteStringCompressor compressor;
46    private boolean trailerWritten;
47    private WALTrailer trailer;
48    // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
49    // than this size, it is written/read respectively, with a WARN message in the log.
50    private int trailerWarnSize;
51  
52    public ProtobufLogWriter() {
53      super();
54    }
55  
56    protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
57        throws IOException {
58      return WALCellCodec.create(conf, compressionContext);
59    }
60  
61    protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
62      return builder.build();
63    }
64  
65    @Override
66    @SuppressWarnings("deprecation")
67    public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException {
68      super.init(fs, path, conf, overwritable);
69      assert this.output == null;
70      boolean doCompress = initializeCompressionContext(conf, path);
71      this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
72        HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
73      int bufferSize = FSUtils.getDefaultBufferSize(fs);
74      short replication = (short)conf.getInt(
75          "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
76      long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize",
77          FSUtils.getDefaultBlockSize(fs, path));
78      output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
79      output.write(ProtobufLogReader.PB_WAL_MAGIC);
80      boolean doTagCompress = doCompress
81          && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
82      buildWALHeader(
83          WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))
84          .writeDelimitedTo(output);
85  
86      initAfterHeader(doCompress);
87  
88      // instantiate trailer to default value.
89      trailer = WALTrailer.newBuilder().build();
90      if (LOG.isTraceEnabled()) {
91        LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
92      }
93    }
94  
95    protected void initAfterHeader(boolean doCompress) throws IOException {
96      WALCellCodec codec = getCodec(conf, this.compressionContext);
97      this.cellEncoder = codec.getEncoder(this.output);
98      if (doCompress) {
99        this.compressor = codec.getByteStringCompressor();
100     }
101   }
102 
103   @Override
104   public void append(HLog.Entry entry) throws IOException {
105     entry.setCompressionContext(compressionContext);
106     entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size())
107       .build().writeDelimitedTo(output);
108     for (KeyValue kv : entry.getEdit().getKeyValues()) {
109       // cellEncoder must assume little about the stream, since we write PB and cells in turn.
110       cellEncoder.write(kv);
111     }
112   }
113 
114   @Override
115   public void close() throws IOException {
116     if (this.output != null) {
117       try {
118         if (!trailerWritten) writeWALTrailer();
119         this.output.close();
120       } catch (NullPointerException npe) {
121         // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
122         LOG.warn(npe);
123       }
124       this.output = null;
125     }
126   }
127 
128   protected WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
129     return builder.build();
130   }
131 
132   private void writeWALTrailer() {
133     try {
134       int trailerSize = 0;
135       if (this.trailer == null) {
136         // use default trailer.
137         LOG.warn("WALTrailer is null. Continuing with default.");
138         this.trailer = buildWALTrailer(WALTrailer.newBuilder());
139         trailerSize = this.trailer.getSerializedSize();
140       } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
141         // continue writing after warning the user.
142         LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " +
143           trailerSize + " > " + this.trailerWarnSize);
144       }
145       this.trailer.writeTo(output);
146       output.writeInt(trailerSize);
147       output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC);
148       this.trailerWritten = true;
149     } catch (IOException ioe) {
150       LOG.error("Got IOException while writing trailer", ioe);
151     }
152   }
153 
154   @Override
155   public void sync() throws IOException {
156     try {
157       this.output.flush();
158       this.output.sync();
159     } catch (NullPointerException npe) {
160       // Concurrent close...
161       throw new IOException(npe);
162     }
163   }
164 
165   @Override
166   public long getLength() throws IOException {
167     try {
168       return this.output.getPos();
169     } catch (NullPointerException npe) {
170       // Concurrent close...
171       throw new IOException(npe);
172     }
173   }
174 
175   public FSDataOutputStream getStream() {
176     return this.output;
177   }
178 
179   @Override
180   public void setWALTrailer(WALTrailer walTrailer) {
181     this.trailer = walTrailer;
182   }
183 }