/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;

/**
 * API-agnostic implementation for mapreduce over table snapshots.
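 * <p>
 * Jobs normally do not use this class directly; they go through the mapred/mapreduce
 * {@code TableSnapshotInputFormat} wrappers. A minimal sketch of the usual wiring, assuming the
 * standard {@code TableMapReduceUtil#initTableSnapshotMapperJob} helper (the snapshot name,
 * mapper class and restore directory below are placeholders, not part of this file):
 * </p>
 * <pre>{@code
 * Job job = Job.getInstance(conf, "scan-over-snapshot");
 * TableMapReduceUtil.initTableSnapshotMapperJob(
 *     "my_snapshot",                        // an existing snapshot
 *     new Scan(),                           // optionally restrict families/columns
 *     MyMapper.class,                       // a TableMapper implementation
 *     ImmutableBytesWritable.class, Result.class,
 *     job, true,
 *     new Path("/tmp/snapshot_restore"));   // restore dir outside the HBase root dir
 * }</pre>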
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TableSnapshotInputFormatImpl {
  // TODO: Snapshot files are owned in fs by the hbase user. There is no
  // easy way to delegate access.

  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // key for specifying the root dir of the restored snapshot
  private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {
    private String regionName;
    private String[] locations;

    // constructor for mapreduce framework / Writable
    public InputSplit() { }

    public InputSplit(String regionName, List<String> locations) {
      this.regionName = regionName;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
    }

    public long getLength() {
      // TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
    // doing this wrapping with Writables.
    @Override
    public void write(DataOutput out) throws IOException {
      MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
        MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.ENCODED_REGION_NAME)
            .setValue(ByteStringer.wrap(Bytes.toBytes(regionName))).build());

      for (String location : locations) {
        builder.addLocations(location);
      }

      MapReduceProtos.TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
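      // Length-prefix the serialized protobuf so that readFields() knows exactly how many
      // bytes to read back from the DataInput.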
      out.writeInt(buf.length);
      out.write(buf);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      MapReduceProtos.TableSnapshotRegionSplit split =
        MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);
    }
  }

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
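   * <p>
   * A sketch of the call sequence the wrapping record readers are expected to follow (the
   * {@code process} call is a placeholder for whatever the caller does with each row):
   * </p>
   * <pre>{@code
   * RecordReader rr = new RecordReader();
   * rr.initialize(split, conf);
   * try {
   *   while (rr.nextKeyValue()) {
   *     process(rr.getCurrentKey(), rr.getCurrentValue());
   *   }
   * } finally {
   *   rr.close();
   * }
   * }</pre>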
   */
  public static class RecordReader {
    InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.split = split;
      String regionName = this.split.regionName;
      String snapshotName = getSnapshotName(conf);
      Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
      FileSystem fs = rootDir.getFileSystem(conf);

      // This is the user-specified root directory where the snapshot was restored
      Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY));

      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);

      // load table descriptor
      HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);

      // load region descriptor
      Path regionDir = new Path(snapshotDir, regionName);
      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);

      // create scan
      // TODO: mapred does not support scan as input API. Work around for now.
      if (conf.get(TableInputFormat.SCAN) != null) {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
      } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
        String[] columns =
          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
        scan = new Scan();
        for (String col : columns) {
          scan.addFamily(Bytes.toBytes(col));
        }
      } else {
        throw new IllegalArgumentException("A Scan is not configured for this job");
      }

      // The restored region is immutable, so reading at this isolation level is fine;
      // otherwise we would have to set the thread read point.
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // disable caching of data blocks
      scan.setCacheBlocks(false);

      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        // we are done
        return false;
      }

      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      return 0; // TODO: use total bytes to estimate
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }

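  /**
   * Computes one {@link InputSplit} per snapshot region whose key range overlaps the configured
   * {@link Scan}, attaching up to three of the best block locations for that region's files.
   */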
  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    HBaseProtos.SnapshotDescription snapshotDesc =
      SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    Set<String> snapshotRegionNames =
      SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
    if (snapshotRegionNames == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    // load table descriptor
    HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);

    // TODO: mapred does not support scan as input API. Work around for now.
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }
    // the temp dir where the snapshot is restored
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (String regionName : snapshotRegionNames) {
      // load region descriptor
      Path regionDir = new Path(snapshotDir, regionName);
      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);

      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
        hri.getStartKey(), hri.getEndKey())) {
        // compute HDFS locations from snapshot files (which will get the locations for
        // referred hfiles)
        List<String> hosts = getBestLocations(conf,
          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));

        int len = Math.min(3, hosts.size());
        hosts = hosts.subList(0, len);
        splits.add(new InputSplit(regionName, hosts));
      }
    }

    return splits;
  }

  /**
   * Computes the locations to be passed from the InputSplit. MR/YARN schedulers do not take
   * weights into account and treat every location passed from an input split as equal. We do not
   * want to blindly pass all the locations: we create one split per region, and a region's blocks
   * are distributed throughout the cluster unless favored node assignment is used. In the
   * expected stable case, only one location holds most of the blocks locally; with favored node
   * assignment, three nodes hold highly local blocks. We therefore apply a simple heuristic and
   * pass every host whose block locality is at least 80%
   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) of that of the host with the best
   * locality.
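   * <p>
   * Illustrative sketch (host names and weights are invented for this example): with the default
   * 0.8 multiplier, any host whose weight is at least 80% of the top host's weight is also
   * returned.
   * </p>
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();   // multiplier defaults to 0.8
   * HDFSBlocksDistribution dist = new HDFSBlocksDistribution();
   * dist.addHostsAndBlockWeight(new String[] { "host1" }, 100);
   * dist.addHostsAndBlockWeight(new String[] { "host2" }, 90);
   * dist.addHostsAndBlockWeight(new String[] { "host3" }, 75);
   * // cutoff = 100 * 0.8 = 80, so only host1 and host2 are returned
   * List<String> hosts = TableSnapshotInputFormatImpl.getBestLocations(conf, dist);
   * }</pre>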
   */
  public static List<String> getBestLocations(
      Configuration conf, HDFSBlocksDistribution blockDistribution) {
    List<String> locations = new ArrayList<String>(3);

    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) {
      return locations;
    }

    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
    double cutoffMultiplier =
      conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i < hostAndWeights.length; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        break;
      }
    }

    return locations;
  }

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
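   * <p>
   * A minimal sketch of direct use (most callers go through the mapred/mapreduce
   * {@code TableSnapshotInputFormat} wrappers instead; the snapshot name and restore path are
   * placeholders):
   * </p>
   * <pre>{@code
   * Configuration conf = job.getConfiguration();
   * TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot", new Path("/tmp/snapshot_restore"));
   * // A unique subdirectory of the restore dir is created and recorded in the configuration.
   * }</pre>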
   * @param conf the job configuration to modify
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user should
   * have write permissions to this directory, and it should not be a subdirectory of the HBase
   * root directory. After the job is finished, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
      throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    // TODO: restore from record readers to parallelize.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
}