/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

/**
 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TableSnapshotInputFormatImpl {

  /** Configuration key for the name of the snapshot to read from. */
  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";

  /** Configuration key for the directory where the snapshot is restored for scanning. */
  private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /**
   * See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)}: hosts with at least
   * this fraction of the top host's block weight are reported as split locations.
   */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
      "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
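
  // Illustrative sketch (hypothetical values, not part of this class): a job driver could
  // tighten the locality cutoff so that only hosts within 90% of the best host's block weight
  // are reported as split locations:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setFloat("hbase.tablesnapshotinputformat.locality.cutoff.multiplier", 0.9f);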

  /**
   * Implementation class for InputSplit logic shared between the mapred and mapreduce APIs.
   * Carries the encoded region name and the preferred host locations for one region of the
   * snapshot.
   */
  public static class InputSplit implements Writable {
    private String regionName;
    private String[] locations;

    // Writable requires a no-arg constructor for deserialization.
    public InputSplit() { }

    public InputSplit(String regionName, List<String> locations) {
      this.regionName = regionName;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
    }

    public long getLength() {
      // TODO: the region size is not computed, so every split reports zero length.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    // Writes this split as a length-prefixed TableSnapshotRegionSplit protobuf message.
    @Override
    public void write(DataOutput out) throws IOException {
      MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
          MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
              .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
                  .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.ENCODED_REGION_NAME)
                  .setValue(ByteStringer.wrap(Bytes.toBytes(regionName))).build());

      for (String location : locations) {
        builder.addLocations(location);
      }

      MapReduceProtos.TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      MapReduceProtos.TableSnapshotRegionSplit split =
          MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);
    }
  }
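
  // Illustrative sketch: InputSplit round-trips through the Writable contract, so it can be
  // exercised with the standard java.io data streams (stream classes and Arrays.asList are
  // assumed to be imported; the region name is a placeholder):
  //
  //   InputSplit split = new InputSplit("0123456789abcdef0123456789abcdef",
  //       Arrays.asList("host1.example.com", "host2.example.com"));
  //   ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  //   split.write(new DataOutputStream(bytes));
  //   InputSplit copy = new InputSplit();
  //   copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
  //   // copy.getLocations() now holds {"host1.example.com", "host2.example.com"}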

  /**
   * Implementation class for RecordReader logic shared between the mapred and mapreduce APIs.
   * Opens a {@link ClientSideRegionScanner} directly over the restored region files, so no
   * region server is involved in the scan.
   */
  public static class RecordReader {
    InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.split = split;
      String regionName = this.split.regionName;
      String snapshotName = getSnapshotName(conf);
      Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
      FileSystem fs = rootDir.getFileSystem(conf);

      // Directory where the snapshot was restored by setInput().
      Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY));

      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);

      // Load the table descriptor stored with the snapshot.
      HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);

      // Load the region info for this split's region.
      Path regionDir = new Path(snapshotDir, regionName);
      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);

      scan = extractScanFromConf(conf);

      // Snapshot files are immutable, so it is safe to read without MVCC checks.
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // A full region scan would churn the block cache; disable block caching.
      scan.setCacheBlocks(false);

      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        // We are done.
        return false;
      }

      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      // There is no cheap way to estimate progress within a region; always report 0.
      return 0;
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }
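
  // Illustrative sketch of the RecordReader lifecycle, assuming `split` came from getSplits()
  // and `conf` was prepared by setInput():
  //
  //   RecordReader reader = new RecordReader();
  //   reader.initialize(split, conf);
  //   try {
  //     while (reader.nextKeyValue()) {
  //       ImmutableBytesWritable key = reader.getCurrentKey();
  //       Result value = reader.getCurrentValue();
  //       // process one row of the snapshot...
  //     }
  //   } finally {
  //     reader.close();
  //   }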

  /**
   * Computes one input split per snapshot region that overlaps the configured scan range.
   */
  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    // Read the snapshot description; this also verifies that the snapshot is complete and
    // readable.
    HBaseProtos.SnapshotDescription snapshotDesc =
        SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    Set<String> snapshotRegionNames =
        SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
    if (snapshotRegionNames == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    // Load the table descriptor stored with the snapshot.
    HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);

    Scan scan = extractScanFromConf(conf);

    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (String regionName : snapshotRegionNames) {
      // Load the region info, and emit a split only if the region overlaps the scan range.
      Path regionDir = new Path(snapshotDir, regionName);
      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);

      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
          hri.getStartKey(), hri.getEndKey())) {
        // Compute block locations from the snapshot files and keep at most the top three
        // hosts, matching the typical HDFS replication factor.
        List<String> hosts = getBestLocations(conf,
            HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));

        int len = Math.min(3, hosts.size());
        hosts = hosts.subList(0, len);
        splits.add(new InputSplit(regionName, hosts));
      }
    }

    return splits;
  }

  /**
   * Builds the {@link Scan} for this job from the configuration: either a serialized Scan set
   * by the mapreduce API, or a space-separated column list set by the mapred API. This logic is
   * shared by {@link RecordReader#initialize(InputSplit, Configuration)} and
   * {@link #getSplits(Configuration)}.
   */
  private static Scan extractScanFromConf(Configuration conf) throws IOException {
    Scan scan;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("A Scan is not configured for this job");
    }
    return scan;
  }

  /**
   * Computes the preferred locations for a split from the region's block distribution. MR/YARN
   * schedulers do not take weights into account, so rather than blindly passing every host that
   * holds a block, this returns only the hosts whose block weight is at least
   * {@code hbase.tablesnapshotinputformat.locality.cutoff.multiplier} (default 0.8) times the
   * weight of the most local host.
   */
  public static List<String> getBestLocations(
      Configuration conf, HDFSBlocksDistribution blockDistribution) {
    List<String> locations = new ArrayList<String>(3);

    // Hosts are returned sorted by weight, most local first.
    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) {
      return locations;
    }

    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    // Only include hosts within the cutoff fraction of the top host's weight.
    double cutoffMultiplier =
        conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i < hostAndWeights.length; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        // Weights are sorted descending, so no later host can pass the cutoff either.
        break;
      }
    }

    return locations;
  }
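
  // Worked example (hypothetical weights): with the default cutoff multiplier of 0.8, top hosts
  // weighted {h1=100, h2=85, h3=79} give a filter weight of 100 * 0.8 = 80, so getBestLocations
  // returns [h1, h2]; h3 fails the cutoff (79 < 80) and, because the hosts are sorted by
  // descending weight, the loop stops there.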

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to read from the given snapshot: records the snapshot name in the
   * configuration and restores the snapshot into a unique subdirectory of {@code restoreDir}
   * so that region files can be scanned directly from the filesystem.
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
      throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    // Use a unique subdirectory so concurrent jobs restoring the same snapshot do not collide.
    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    // Create a region layout under restoreDir that references the snapshot's store files.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
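
  // Illustrative end-to-end sketch (snapshot name and restore path are placeholders): configure
  // the snapshot input, then compute one split per overlapping region; each split is scanned
  // client-side against the restored region files, without going through region servers.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   TableSnapshotInputFormatImpl.setInput(conf, "mySnapshot", new Path("/tmp/snapshot-restore"));
  //   List<TableSnapshotInputFormatImpl.InputSplit> splits =
  //       TableSnapshotInputFormatImpl.getSplits(conf);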
}