/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
 * Implementation of {@link HLog.Writer} that delegates to SequenceFile.Writer.
 */
@InterfaceAudience.Private
public class SequenceFileLogWriter extends WriterBase {
  private final Log LOG = LogFactory.getLog(this.getClass());

  // The sequence file we delegate to.
  private SequenceFile.Writer writer;

  // The FSDataOutputStream that is the 'out' instance inside the
  // SequenceFile.Writer 'writer' above, surfaced via reflection.
  private FSDataOutputStream writer_out;

  // Metadata keys/values written into the SequenceFile header.
  private static final Text WAL_VERSION_KEY = new Text("version");
  private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
  private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");

  /**
   * Default constructor.
   */
  public SequenceFileLogWriter() {
    super();
  }

  /**
   * Create sequence file Metadata for our WAL file with version and
   * compression type (if any).
   * @param conf configuration to read
   * @param compress whether dictionary compression is enabled
   * @return Metadata instance
   */
  private static Metadata createMetadata(final Configuration conf,
      final boolean compress) {
    TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
    metaMap.put(WAL_VERSION_KEY, new Text("1"));
    if (compress) {
      // Currently we only do one compression type: dictionary compression.
      metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
    }
    return new Metadata(metaMap);
  }
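
  // Illustration (not part of the original source): with compress=true the
  // header metadata produced by createMetadata() carries two entries, roughly
  //   version          -> 1
  //   compression.type -> dictionary
  // A reader is assumed to check the "compression.type" entry to decide
  // whether to set up a dictionary CompressionContext before replaying edits.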

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
      throws IOException {
    super.init(fs, path, conf, overwritable);
    boolean compress = initializeCompressionContext(conf, path);

    // Create a SequenceFile.Writer instance.
    try {
      // Reflect on a version of SequenceFile.createWriter that does not
      // automatically create the parent directory (see HADOOP-6840).
      this.writer = (SequenceFile.Writer) SequenceFile.class
        .getMethod("createWriter", new Class[] {FileSystem.class,
            Configuration.class, Path.class, Class.class, Class.class,
            Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
            CompressionType.class, CompressionCodec.class, Metadata.class})
        .invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class,
            Integer.valueOf(FSUtils.getDefaultBufferSize(fs)),
            Short.valueOf((short)
              conf.getInt("hbase.regionserver.hlog.replication",
                FSUtils.getDefaultReplication(fs, path))),
            Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
                FSUtils.getDefaultBlockSize(fs, path))),
            Boolean.valueOf(false) /* createParent */,
            SequenceFile.CompressionType.NONE, new DefaultCodec(),
            createMetadata(conf, compress)
            });
    } catch (InvocationTargetException ite) {
      // The method was properly invoked but threw its own exception.
      throw new IOException(ite.getCause());
    } catch (Exception e) {
      // Ignore all other exceptions; they mean the reflected method is not
      // available, and we fall back to the old createWriter below.
    }

    // If reflection failed, fall back to the older createWriter overload.
    if (this.writer == null) {
      LOG.debug("new createWriter -- HADOOP-6840 -- not available");
      this.writer = SequenceFile.createWriter(fs, conf, path,
        HLogKey.class, WALEdit.class,
        FSUtils.getDefaultBufferSize(fs),
        (short) conf.getInt("hbase.regionserver.hlog.replication",
          FSUtils.getDefaultReplication(fs, path)),
        conf.getLong("hbase.regionserver.hlog.blocksize",
          FSUtils.getDefaultBlockSize(fs, path)),
        SequenceFile.CompressionType.NONE,
        new DefaultCodec(),
        null,
        createMetadata(conf, compress));
    } else {
      if (LOG.isTraceEnabled()) LOG.trace("Using new createWriter -- HADOOP-6840");
    }

    this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible();
    if (LOG.isTraceEnabled()) LOG.trace("Path=" + path + ", compression=" + compress);
  }

  // Get at the private FSDataOutputStream inside SequenceFile.Writer so we
  // can expose it to callers. Make it accessible.
  private FSDataOutputStream getSequenceFilePrivateFSDataOutputStreamAccessible()
      throws IOException {
    FSDataOutputStream out = null;
    final Field fields [] = this.writer.getClass().getDeclaredFields();
    final String fieldName = "out";
    for (int i = 0; i < fields.length; ++i) {
      if (fieldName.equals(fields[i].getName())) {
        try {
          // Make the private 'out' field in SequenceFile.Writer accessible.
          fields[i].setAccessible(true);
          out = (FSDataOutputStream)fields[i].get(this.writer);
          break;
        } catch (IllegalAccessException ex) {
          throw new IOException("Accessing " + fieldName, ex);
        } catch (SecurityException e) {
          LOG.warn("Does not have access to out field from FSDataOutputStream",
            e);
        }
      }
    }
    return out;
  }

  @Override
  public void append(HLog.Entry entry) throws IOException {
    entry.setCompressionContext(compressionContext);
    try {
      this.writer.append(entry.getKey(), entry.getEdit());
    } catch (NullPointerException npe) {
      // Concurrent close: the writer was nulled out under us.
      throw new IOException(npe);
    }
  }

  @Override
  public void close() throws IOException {
    if (this.writer != null) {
      try {
        this.writer.close();
      } catch (NullPointerException npe) {
        // Can get an NPE coming up from down in DFSClient$DFSOutputStream#close.
        LOG.warn(npe);
      }
      this.writer = null;
    }
  }

  @Override
  public void sync() throws IOException {
    try {
      this.writer.syncFs();
    } catch (NullPointerException npe) {
      // Concurrent close: the writer was nulled out under us.
      throw new IOException(npe);
    }
  }

  @Override
  public long getLength() throws IOException {
    try {
      return this.writer.getLength();
    } catch (NullPointerException npe) {
      // Concurrent close: the writer was nulled out under us.
      throw new IOException(npe);
    }
  }

  /**
   * @return The dfsclient out stream up inside SequenceFile.Writer made
   * accessible, or null if not available.
   */
  public FSDataOutputStream getWriterFSDataOutputStream() {
    return this.writer_out;
  }

  /**
   * Empty: a trailer is written only by the protobuf-based WAL writer.
   */
  @Override
  public void setWALTrailer(WALTrailer walTrailer) {
  }
}
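
// A minimal usage sketch (illustrative only, not part of the original file;
// in HBase this writer is typically created by WAL factory code rather than
// instantiated directly):
//
//   SequenceFileLogWriter writer = new SequenceFileLogWriter();
//   writer.init(fs, new Path("/hbase/.logs/example"), conf, true);
//   writer.append(entry);  // entry is an HLog.Entry (HLogKey + WALEdit)
//   writer.sync();         // delegates to SequenceFile.Writer#syncFs()
//   writer.close();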