package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;

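/**
 * Base class for HLog (WAL) readers. Handles the pieces common to every format:
 * wiring up dictionary compression when the log was written with it, iterating over
 * entries, and seeking. Format-specific subclasses implement the abstract hooks
 * declared below.
 */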
@InterfaceAudience.Private
public abstract class ReaderBase implements HLog.Reader {
  private static final Log LOG = LogFactory.getLog(ReaderBase.class);
  protected Configuration conf;
  protected FileSystem fs;
  protected Path path;
  protected long edit = 0;
  protected long fileLength;
  protected WALTrailer trailer;
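  // Trailer-size threshold (in bytes) that subclasses may use to flag unexpectedly large
  // WAL trailers; read from HLog.WAL_TRAILER_WARN_SIZE, defaulting to
  // HLog.DEFAULT_WAL_TRAILER_WARN_SIZE.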
  protected int trailerWarnSize;

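  /**
   * Compression context used while reading. Null when the log was written without
   * dictionary compression.
   */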
  protected CompressionContext compressionContext = null;
  // Stays true until the first entry has been read with a live compression context.
  protected boolean emptyCompressionContext = true;

  public ReaderBase() {
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
      throws IOException {
    this.conf = conf;
    this.path = path;
    this.fs = fs;
    this.fileLength = this.fs.getFileStatus(path).getLen();
    this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
      HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
    initReader(stream);

    boolean compression = hasCompression();
    if (compression) {
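      // Set up dictionary compression: build an LRUDictionary-backed context on first use,
      // otherwise clear the existing one so no stale dictionary state is carried over.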
      try {
        if (compressionContext == null) {
          compressionContext = new CompressionContext(LRUDictionary.class,
              FSUtils.isRecoveredEdits(path), hasTagCompression());
        } else {
          compressionContext.clear();
        }
      } catch (Exception e) {
        throw new IOException("Failed to initialize CompressionContext", e);
      }
    }
    initAfterCompression();
  }

  @Override
  public HLog.Entry next() throws IOException {
    return next(null);
  }

  @Override
  public HLog.Entry next(HLog.Entry reuse) throws IOException {
    HLog.Entry e = reuse;
    if (e == null) {
      e = new HLog.Entry(new HLogKey(), new WALEdit());
    }
    if (compressionContext != null) {
      e.setCompressionContext(compressionContext);
    }

    boolean hasEntry = false;
    try {
      hasEntry = readNext(e);
    } catch (IllegalArgumentException iae) {
      TableName tableName = e.getKey().getTablename();
      if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
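        // Older WALs may carry edits for the defunct -ROOT- catalog table that no longer
        // parse cleanly; treat them as benign and move on to the next entry.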
        LOG.info("Got an old ROOT edit, ignoring ");
        return next(e);
      }
      else throw iae;
    }
    edit++;
    if (compressionContext != null && emptyCompressionContext) {
      emptyCompressionContext = false;
    }
    return hasEntry ? e : null;
  }

  @Override
  public void seek(long pos) throws IOException {
    if (compressionContext != null && emptyCompressionContext) {
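      // Dictionary compression is stateful: the dictionaries are only populated by reading
      // entries in order, so replay from the start up to the target position before seeking.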
      while (next() != null) {
        if (getPosition() == pos) {
          emptyCompressionContext = false;
          break;
        }
      }
    }
    seekOnFs(pos);
  }

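  /**
   * Sets up the underlying reader on the given stream. Called once from init().
   */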
  protected abstract void initReader(FSDataInputStream stream) throws IOException;

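  /**
   * Completes initialization once the compression context (if any) is in place.
   * Called once from init(), after initReader().
   */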
  protected abstract void initAfterCompression() throws IOException;

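  /**
   * @return Whether compression is enabled for this log.
   */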
  protected abstract boolean hasCompression();

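  /**
   * @return Whether tag compression is enabled for this log.
   */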
  protected abstract boolean hasTagCompression();

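  /**
   * Reads the next entry into the supplied target.
   * @param e the entry to fill in
   * @return whether an entry was available to read
   */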
  protected abstract boolean readNext(HLog.Entry e) throws IOException;

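  /**
   * Performs a filesystem-level seek to the given position in the underlying file.
   */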
  protected abstract void seekOnFs(long pos) throws IOException;

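  // No trailer by default; format-specific readers that parse one override this.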
  @Override
  public WALTrailer getWALTrailer() {
    return null;
  }
}