1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapred;
19
20 import org.apache.hadoop.fs.Path;
21 import org.apache.hadoop.hbase.client.Result;
22 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
24 import org.apache.hadoop.mapred.InputFormat;
25 import org.apache.hadoop.mapred.InputSplit;
26 import org.apache.hadoop.mapred.JobConf;
27 import org.apache.hadoop.mapred.RecordReader;
28 import org.apache.hadoop.mapred.Reporter;
29
30 import java.io.DataInput;
31 import java.io.DataOutput;
32 import java.io.IOException;
33 import java.util.List;
34
35 public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
36
37 static class TableSnapshotRegionSplit implements InputSplit {
38 private TableSnapshotInputFormatImpl.InputSplit delegate;
39
40
41 public TableSnapshotRegionSplit() {
42 this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
43 }
44
45 public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
46 this.delegate = delegate;
47 }
48
49 public TableSnapshotRegionSplit(String regionName, List<String> locations) {
50 this.delegate = new TableSnapshotInputFormatImpl.InputSplit(regionName, locations);
51 }
52
53 @Override
54 public long getLength() throws IOException {
55 return delegate.getLength();
56 }
57
58 @Override
59 public String[] getLocations() throws IOException {
60 return delegate.getLocations();
61 }
62
63 @Override
64 public void write(DataOutput out) throws IOException {
65 delegate.write(out);
66 }
67
68 @Override
69 public void readFields(DataInput in) throws IOException {
70 delegate.readFields(in);
71 }
72 }
73
74 static class TableSnapshotRecordReader
75 implements RecordReader<ImmutableBytesWritable, Result> {
76
77 private TableSnapshotInputFormatImpl.RecordReader delegate;
78
79 public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
80 throws IOException {
81 delegate = new TableSnapshotInputFormatImpl.RecordReader();
82 delegate.initialize(split.delegate, job);
83 }
84
85 @Override
86 public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
87 if (!delegate.nextKeyValue()) {
88 return false;
89 }
90 ImmutableBytesWritable currentKey = delegate.getCurrentKey();
91 key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
92 value.copyFrom(delegate.getCurrentValue());
93 return true;
94 }
95
96 @Override
97 public ImmutableBytesWritable createKey() {
98 return new ImmutableBytesWritable();
99 }
100
101 @Override
102 public Result createValue() {
103 return new Result();
104 }
105
106 @Override
107 public long getPos() throws IOException {
108 return delegate.getPos();
109 }
110
111 @Override
112 public void close() throws IOException {
113 delegate.close();
114 }
115
116 @Override
117 public float getProgress() throws IOException {
118 return delegate.getProgress();
119 }
120 }
121
122 @Override
123 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
124 List<TableSnapshotInputFormatImpl.InputSplit> splits =
125 TableSnapshotInputFormatImpl.getSplits(job);
126 InputSplit[] results = new InputSplit[splits.size()];
127 for (int i = 0; i < splits.size(); i++) {
128 results[i] = new TableSnapshotRegionSplit(splits.get(i));
129 }
130 return results;
131 }
132
133 @Override
134 public RecordReader<ImmutableBytesWritable, Result>
135 getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
136 return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
137 }
138
139
140
141
142
143
144
145
146
147
148 public static void setInput(JobConf job, String snapshotName, Path restoreDir) throws IOException {
149 TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
150 }
151 }