1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileStatus;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.regionserver.wal.HLog;
36 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
37 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
38 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
39 import org.apache.hadoop.io.Writable;
40 import org.apache.hadoop.mapreduce.InputFormat;
41 import org.apache.hadoop.mapreduce.InputSplit;
42 import org.apache.hadoop.mapreduce.JobContext;
43 import org.apache.hadoop.mapreduce.RecordReader;
44 import org.apache.hadoop.mapreduce.TaskAttemptContext;
45
46
47
48
49 @InterfaceAudience.Public
50 public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
51 private static final Log LOG = LogFactory.getLog(HLogInputFormat.class);
52
53 public static final String START_TIME_KEY = "hlog.start.time";
54 public static final String END_TIME_KEY = "hlog.end.time";
55
56
57
58
59
60 static class HLogSplit extends InputSplit implements Writable {
61 private String logFileName;
62 private long fileSize;
63 private long startTime;
64 private long endTime;
65
66
67 public HLogSplit() {}
68
69
70
71
72
73
74
75
76
77
78 public HLogSplit(String logFileName, long fileSize, long startTime, long endTime) {
79 this.logFileName = logFileName;
80 this.fileSize = fileSize;
81 this.startTime = startTime;
82 this.endTime = endTime;
83 }
84
85 @Override
86 public long getLength() throws IOException, InterruptedException {
87 return fileSize;
88 }
89
90 @Override
91 public String[] getLocations() throws IOException, InterruptedException {
92
93 return new String[] {};
94 }
95
96 public String getLogFileName() {
97 return logFileName;
98 }
99
100 public long getStartTime() {
101 return startTime;
102 }
103
104 public long getEndTime() {
105 return endTime;
106 }
107
108 @Override
109 public void readFields(DataInput in) throws IOException {
110 logFileName = in.readUTF();
111 fileSize = in.readLong();
112 startTime = in.readLong();
113 endTime = in.readLong();
114 }
115
116 @Override
117 public void write(DataOutput out) throws IOException {
118 out.writeUTF(logFileName);
119 out.writeLong(fileSize);
120 out.writeLong(startTime);
121 out.writeLong(endTime);
122 }
123
124 @Override
125 public String toString() {
126 return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
127 }
128 }
129
130
131
132
133 static class HLogRecordReader extends RecordReader<HLogKey, WALEdit> {
134 private HLog.Reader reader = null;
135 private HLog.Entry currentEntry = new HLog.Entry();
136 private long startTime;
137 private long endTime;
138
139 @Override
140 public void initialize(InputSplit split, TaskAttemptContext context)
141 throws IOException, InterruptedException {
142 HLogSplit hsplit = (HLogSplit)split;
143 Path logFile = new Path(hsplit.getLogFileName());
144 Configuration conf = context.getConfiguration();
145 LOG.info("Opening reader for "+split);
146 try {
147 this.reader = HLogFactory.createReader(logFile.getFileSystem(conf),
148 logFile, conf);
149 } catch (EOFException x) {
150 LOG.info("Ignoring corrupted HLog file: " + logFile
151 + " (This is normal when a RegionServer crashed.)");
152 }
153 this.startTime = hsplit.getStartTime();
154 this.endTime = hsplit.getEndTime();
155 }
156
157 @Override
158 public boolean nextKeyValue() throws IOException, InterruptedException {
159 if (reader == null) return false;
160
161 HLog.Entry temp;
162 long i = -1;
163 do {
164
165 try {
166 temp = reader.next(currentEntry);
167 i++;
168 } catch (EOFException x) {
169 LOG.info("Corrupted entry detected. Ignoring the rest of the file."
170 + " (This is normal when a RegionServer crashed.)");
171 return false;
172 }
173 }
174 while(temp != null && temp.getKey().getWriteTime() < startTime);
175
176 if (temp == null) {
177 if (i > 0) LOG.info("Skipped " + i + " entries.");
178 LOG.info("Reached end of file.");
179 return false;
180 } else if (i > 0) {
181 LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
182 }
183 boolean res = temp.getKey().getWriteTime() <= endTime;
184 if (!res) {
185 LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
186 }
187 return res;
188 }
189
190 @Override
191 public HLogKey getCurrentKey() throws IOException, InterruptedException {
192 return currentEntry.getKey();
193 }
194
195 @Override
196 public WALEdit getCurrentValue() throws IOException, InterruptedException {
197 return currentEntry.getEdit();
198 }
199
200 @Override
201 public float getProgress() throws IOException, InterruptedException {
202
203 return 0;
204 }
205
206 @Override
207 public void close() throws IOException {
208 LOG.info("Closing reader");
209 if (reader != null) this.reader.close();
210 }
211 }
212
213 @Override
214 public List<InputSplit> getSplits(JobContext context) throws IOException,
215 InterruptedException {
216 Configuration conf = context.getConfiguration();
217 Path inputDir = new Path(conf.get("mapred.input.dir"));
218
219 long startTime = conf.getLong(START_TIME_KEY, Long.MIN_VALUE);
220 long endTime = conf.getLong(END_TIME_KEY, Long.MAX_VALUE);
221
222 FileSystem fs = inputDir.getFileSystem(conf);
223 List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
224
225 List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
226 for (FileStatus file : files) {
227 splits.add(new HLogSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
228 }
229 return splits;
230 }
231
232 private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
233 throws IOException {
234 List<FileStatus> result = new ArrayList<FileStatus>();
235 LOG.debug("Scanning " + dir.toString() + " for HLog files");
236
237 FileStatus[] files = fs.listStatus(dir);
238 if (files == null) return Collections.emptyList();
239 for (FileStatus file : files) {
240 if (file.isDir()) {
241
242 result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
243 } else {
244 String name = file.getPath().toString();
245 int idx = name.lastIndexOf('.');
246 if (idx > 0) {
247 try {
248 long fileStartTime = Long.parseLong(name.substring(idx+1));
249 if (fileStartTime <= endTime) {
250 LOG.info("Found: " + name);
251 result.add(file);
252 }
253 } catch (NumberFormatException x) {
254 idx = 0;
255 }
256 }
257 if (idx == 0) {
258 LOG.warn("File " + name + " does not appear to be an HLog file. Skipping...");
259 }
260 }
261 }
262 return result;
263 }
264
265 @Override
266 public RecordReader<HLogKey, WALEdit> createRecordReader(InputSplit split,
267 TaskAttemptContext context) throws IOException, InterruptedException {
268 return new HLogRecordReader();
269 }
270 }