1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FSDataOutputStream;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.KeyValue;
32 import org.apache.hadoop.hbase.codec.Codec;
33 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
34 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
35 import org.apache.hadoop.hbase.util.FSUtils;
36
37
38
39
40 @InterfaceAudience.Private
41 public class ProtobufLogWriter extends WriterBase {
42 private final Log LOG = LogFactory.getLog(this.getClass());
43 protected FSDataOutputStream output;
44 protected Codec.Encoder cellEncoder;
45 protected WALCellCodec.ByteStringCompressor compressor;
46 private boolean trailerWritten;
47 private WALTrailer trailer;
48
49
50 private int trailerWarnSize;
51
52 public ProtobufLogWriter() {
53 super();
54 }
55
56 protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
57 throws IOException {
58 return WALCellCodec.create(conf, compressionContext);
59 }
60
61 protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
62 return builder.build();
63 }
64
65 @Override
66 @SuppressWarnings("deprecation")
67 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException {
68 super.init(fs, path, conf, overwritable);
69 assert this.output == null;
70 boolean doCompress = initializeCompressionContext(conf, path);
71 this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
72 HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
73 int bufferSize = FSUtils.getDefaultBufferSize(fs);
74 short replication = (short)conf.getInt(
75 "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
76 long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize",
77 FSUtils.getDefaultBlockSize(fs, path));
78 output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
79 output.write(ProtobufLogReader.PB_WAL_MAGIC);
80 boolean doTagCompress = doCompress
81 && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
82 buildWALHeader(
83 WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))
84 .writeDelimitedTo(output);
85
86 initAfterHeader(doCompress);
87
88
89 trailer = WALTrailer.newBuilder().build();
90 if (LOG.isTraceEnabled()) {
91 LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
92 }
93 }
94
95 protected void initAfterHeader(boolean doCompress) throws IOException {
96 WALCellCodec codec = getCodec(conf, this.compressionContext);
97 this.cellEncoder = codec.getEncoder(this.output);
98 if (doCompress) {
99 this.compressor = codec.getByteStringCompressor();
100 }
101 }
102
103 @Override
104 public void append(HLog.Entry entry) throws IOException {
105 entry.setCompressionContext(compressionContext);
106 entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size())
107 .build().writeDelimitedTo(output);
108 for (KeyValue kv : entry.getEdit().getKeyValues()) {
109
110 cellEncoder.write(kv);
111 }
112 }
113
114 @Override
115 public void close() throws IOException {
116 if (this.output != null) {
117 try {
118 if (!trailerWritten) writeWALTrailer();
119 this.output.close();
120 } catch (NullPointerException npe) {
121
122 LOG.warn(npe);
123 }
124 this.output = null;
125 }
126 }
127
128 protected WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
129 return builder.build();
130 }
131
132 private void writeWALTrailer() {
133 try {
134 int trailerSize = 0;
135 if (this.trailer == null) {
136
137 LOG.warn("WALTrailer is null. Continuing with default.");
138 this.trailer = buildWALTrailer(WALTrailer.newBuilder());
139 trailerSize = this.trailer.getSerializedSize();
140 } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
141
142 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " +
143 trailerSize + " > " + this.trailerWarnSize);
144 }
145 this.trailer.writeTo(output);
146 output.writeInt(trailerSize);
147 output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC);
148 this.trailerWritten = true;
149 } catch (IOException ioe) {
150 LOG.error("Got IOException while writing trailer", ioe);
151 }
152 }
153
154 @Override
155 public void sync() throws IOException {
156 try {
157 this.output.flush();
158 this.output.sync();
159 } catch (NullPointerException npe) {
160
161 throw new IOException(npe);
162 }
163 }
164
165 @Override
166 public long getLength() throws IOException {
167 try {
168 return this.output.getPos();
169 } catch (NullPointerException npe) {
170
171 throw new IOException(npe);
172 }
173 }
174
175 public FSDataOutputStream getStream() {
176 return this.output;
177 }
178
179 @Override
180 public void setWALTrailer(WALTrailer walTrailer) {
181 this.trailer = walTrailer;
182 }
183 }