1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.FilterInputStream;
23 import java.io.IOException;
24 import java.lang.reflect.Field;
25 import java.lang.reflect.Method;
26 import java.util.NavigableMap;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FSDataInputStream;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
36 import org.apache.hadoop.io.SequenceFile;
37 import org.apache.hadoop.io.SequenceFile.Metadata;
38 import org.apache.hadoop.io.Text;
39
40 @InterfaceAudience.Private
41 public class SequenceFileLogReader extends ReaderBase {
42 private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
43
44
45 private static final Text WAL_VERSION_KEY = new Text("version");
46
47
48
49 private static final int COMPRESSION_VERSION = 1;
50 private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
51 private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
52
53
54
55
56
57
58
59
60
61
62
63
64
65 private static class WALReader extends SequenceFile.Reader {
66
67 WALReader(final FileSystem fs, final Path p, final Configuration c)
68 throws IOException {
69 super(fs, p, c);
70 }
71
72 @Override
73 protected FSDataInputStream openFile(FileSystem fs, Path file,
74 int bufferSize, long length)
75 throws IOException {
76 return new WALReaderFSDataInputStream(super.openFile(fs, file,
77 bufferSize, length), length);
78 }
79
80
81
82
83 static class WALReaderFSDataInputStream extends FSDataInputStream {
84 private boolean firstGetPosInvocation = true;
85 private long length;
86
87 WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
88 throws IOException {
89 super(is);
90 this.length = l;
91 }
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 @Override
109 public long getPos() throws IOException {
110 if (this.firstGetPosInvocation) {
111 this.firstGetPosInvocation = false;
112 long adjust = 0;
113
114 try {
115 Field fIn = FilterInputStream.class.getDeclaredField("in");
116 fIn.setAccessible(true);
117 Object realIn = fIn.get(this.in);
118
119
120 if (realIn.getClass().getName().endsWith("DFSInputStream")) {
121 Method getFileLength = realIn.getClass().
122 getDeclaredMethod("getFileLength", new Class<?> []{});
123 getFileLength.setAccessible(true);
124 long realLength = ((Long)getFileLength.
125 invoke(realIn, new Object []{})).longValue();
126 assert(realLength >= this.length);
127 adjust = realLength - this.length;
128 } else {
129 LOG.info("Input stream class: " + realIn.getClass().getName() +
130 ", not adjusting length");
131 }
132 } catch(Exception e) {
133 SequenceFileLogReader.LOG.warn(
134 "Error while trying to get accurate file length. " +
135 "Truncation / data loss may occur if RegionServers die.", e);
136 }
137
138 return adjust + super.getPos();
139 }
140 return super.getPos();
141 }
142 }
143 }
144
145
146 protected SequenceFile.Reader reader;
147 long entryStart = 0;
148
149 public SequenceFileLogReader() {
150 super();
151 }
152
153 @Override
154 public void close() throws IOException {
155 try {
156 if (reader != null) {
157 this.reader.close();
158 this.reader = null;
159 }
160 } catch (IOException ioe) {
161 throw addFileInfoToException(ioe);
162 }
163 }
164
165 @Override
166 public long getPosition() throws IOException {
167 return reader != null ? reader.getPosition() : 0;
168 }
169
170 @Override
171 public void reset() throws IOException {
172
173
174 reader = new WALReader(fs, path, conf);
175 }
176
177 @Override
178 protected void initReader(FSDataInputStream stream) throws IOException {
179
180 if (stream != null) {
181 stream.close();
182 }
183 reset();
184 }
185
186 @Override
187 protected void initAfterCompression() throws IOException {
188
189 }
190
191 @Override
192 protected boolean hasCompression() {
193 return isWALCompressionEnabled(reader.getMetadata());
194 }
195
196 @Override
197 protected boolean hasTagCompression() {
198
199 return false;
200 }
201
202
203
204
205
206 static boolean isWALCompressionEnabled(final Metadata metadata) {
207
208 Text txt = metadata.get(WAL_VERSION_KEY);
209 if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
210 return false;
211 }
212
213 txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
214 return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
215 }
216
217
218 @Override
219 protected boolean readNext(Entry e) throws IOException {
220 try {
221 boolean hasNext = this.reader.next(e.getKey(), e.getEdit());
222 if (!hasNext) return false;
223
224 NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
225 if (scopes != null) {
226 e.getKey().readOlderScopes(scopes);
227 }
228 return true;
229 } catch (IOException ioe) {
230 throw addFileInfoToException(ioe);
231 }
232 }
233
234 @Override
235 protected void seekOnFs(long pos) throws IOException {
236 try {
237 reader.seek(pos);
238 } catch (IOException ioe) {
239 throw addFileInfoToException(ioe);
240 }
241 }
242
243 protected IOException addFileInfoToException(final IOException ioe)
244 throws IOException {
245 long pos = -1;
246 try {
247 pos = getPosition();
248 } catch (IOException e) {
249 LOG.warn("Failed getting position to add to throw", e);
250 }
251
252
253 long end = Long.MAX_VALUE;
254 try {
255 Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
256 fEnd.setAccessible(true);
257 end = fEnd.getLong(this.reader);
258 } catch(NoSuchFieldException nfe) {
259
260 } catch(IllegalAccessException iae) {
261
262 } catch(Exception e) {
263
264 LOG.warn("Unexpected exception when accessing the end field", e);
265 }
266
267 String msg = (this.path == null? "": this.path.toString()) +
268 ", entryStart=" + entryStart + ", pos=" + pos +
269 ((end == Long.MAX_VALUE) ? "" : ", end=" + end) +
270 ", edit=" + this.edit;
271
272
273 try {
274 return (IOException) ioe.getClass()
275 .getConstructor(String.class)
276 .newInstance(msg)
277 .initCause(ioe);
278 } catch(NoSuchMethodException nfe) {
279
280 } catch(IllegalAccessException iae) {
281
282 } catch(Exception e) {
283
284 LOG.warn("Unexpected exception when accessing the end field", e);
285 }
286 return ioe;
287 }
288 }