1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.lang.reflect.Method;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.classification.InterfaceStability;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.Scan;
35 import org.apache.hadoop.hbase.client.TableSnapshotScanner;
36 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
37 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
38 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
39 import org.apache.hadoop.io.Writable;
40 import org.apache.hadoop.mapreduce.InputFormat;
41 import org.apache.hadoop.mapreduce.InputSplit;
42 import org.apache.hadoop.mapreduce.Job;
43 import org.apache.hadoop.mapreduce.JobContext;
44 import org.apache.hadoop.mapreduce.RecordReader;
45 import org.apache.hadoop.mapreduce.TaskAttemptContext;
46
47 import com.google.common.annotations.VisibleForTesting;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 @InterfaceAudience.Public
87 @InterfaceStability.Evolving
88 public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
89
90 private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);
91
92 @VisibleForTesting
93 static class TableSnapshotRegionSplit extends InputSplit implements Writable {
94 private TableSnapshotInputFormatImpl.InputSplit delegate;
95
96 public TableSnapshotRegionSplit() {
97 this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
98 }
99
100 public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
101 this.delegate = delegate;
102 }
103
104 public TableSnapshotRegionSplit(String regionName, List<String> locations) {
105 this.delegate = new TableSnapshotInputFormatImpl.InputSplit(regionName, locations);
106 }
107
108 @Override
109 public long getLength() throws IOException, InterruptedException {
110 return delegate.getLength();
111 }
112
113 @Override
114 public String[] getLocations() throws IOException, InterruptedException {
115 return delegate.getLocations();
116 }
117
118 @Override
119 public void write(DataOutput out) throws IOException {
120 delegate.write(out);
121 }
122
123 @Override
124 public void readFields(DataInput in) throws IOException {
125 delegate.readFields(in);
126 }
127 }
128
129 @VisibleForTesting
130 static class TableSnapshotRegionRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
131 private TableSnapshotInputFormatImpl.RecordReader delegate =
132 new TableSnapshotInputFormatImpl.RecordReader();
133 private TaskAttemptContext context;
134 private Method getCounter;
135
136 @Override
137 public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
138 InterruptedException {
139
140 this.context = context;
141 getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
142 delegate.initialize(
143 ((TableSnapshotRegionSplit) split).delegate,
144 context.getConfiguration());
145 }
146
147 @Override
148 public boolean nextKeyValue() throws IOException, InterruptedException {
149 boolean result = delegate.nextKeyValue();
150 if (result) {
151 ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
152 if (scanMetrics != null && context != null) {
153 TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
154 }
155 }
156
157 return result;
158 }
159
160 @Override
161 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
162 return delegate.getCurrentKey();
163 }
164
165 @Override
166 public Result getCurrentValue() throws IOException, InterruptedException {
167 return delegate.getCurrentValue();
168 }
169
170 @Override
171 public float getProgress() throws IOException, InterruptedException {
172 return delegate.getProgress();
173 }
174
175 @Override
176 public void close() throws IOException {
177 delegate.close();
178 }
179 }
180
181 @Override
182 public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
183 InputSplit split, TaskAttemptContext context) throws IOException {
184 return new TableSnapshotRegionRecordReader();
185 }
186
187 @Override
188 public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
189 List<InputSplit> results = new ArrayList<InputSplit>();
190 for (TableSnapshotInputFormatImpl.InputSplit split :
191 TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
192 results.add(new TableSnapshotRegionSplit(split));
193 }
194 return results;
195 }
196
197
198
199
200
201
202
203
204
205
206 public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
207 TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
208 }
209 }