/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.annotations.VisibleForTesting;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
 * bypasses HBase servers and reads the underlying files (hfiles, recovered edits,
 * hlogs, etc.) directly to provide maximum performance. The snapshot does not have to be
 * restored to the live cluster or cloned, and the MapReduce job can be run against an
 * online or offline HBase cluster. The snapshot files can also be exported to a pure-HDFS
 * cluster with the {@link ExportSnapshot} tool, and this InputFormat can then run the
 * MapReduce job directly over the exported files. The snapshot should not be deleted
 * while jobs are reading from its files.
 * <p>
 * Usage is similar to TableInputFormat, and
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
 * can be used to configure the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Scan scan = new Scan();
 * Path tmpRestoreDir = new Path("/tmp/snapshot_restore"); // any writable dir outside hbase.rootdir
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
 *      scan, MyTableMapper.class, MyMapKeyOutput.class,
 *      MyMapOutputValueWritable.class, job, true, tmpRestoreDir);
 * }
 * </pre>
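 * <p>
 * A minimal sketch of the mapper referenced above (the class is illustrative, not part of
 * this API; for simplicity it emits the row key as both map output key and value, standing
 * in for the MyMapKeyOutput/MyMapOutputValueWritable placeholders):
 * <pre>{@code
 * public static class MyTableMapper
 *     extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
 *   @Override
 *   protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
 *       throws IOException, InterruptedException {
 *     // Called once per row of the snapshot's data.
 *     context.write(rowKey, rowKey);
 *   }
 * }
 * }
 * </pre>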
 * <p>
 * Internally, this input format restores the snapshot into the given tmp directory. As with
 * {@link TableInputFormat}, an InputSplit is created per region, and each RecordReader opens
 * its region for reading. An internal RegionScanner is used to execute the {@link Scan}
 * obtained from the user.
 * <p>
 * HBase owns all the data and snapshot files on the filesystem; only the 'hbase' user can
 * read the snapshot and data files. To read snapshot files directly from the file system,
 * the user running the MR job must have sufficient permissions to access the snapshot and
 * reference files. This means the job has to be run as the HBase user, or the user must
 * have group or other privileges in the filesystem (see HBASE-8369). Note that granting
 * other users read access to the snapshot/data files completely circumvents the access
 * control enforced by HBase.
 * @see TableSnapshotScanner
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {

  private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);

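  /**
   * An InputSplit that wraps {@link TableSnapshotInputFormatImpl.InputSplit} and adds Hadoop
   * {@link Writable} serialization, so the framework can ship the split (region name plus
   * preferred locations) to the task that will read it.
   */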
  @VisibleForTesting
  static class TableSnapshotRegionSplit extends InputSplit implements Writable {
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    public TableSnapshotRegionSplit(String regionName, List<String> locations) {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit(regionName, locations);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return delegate.getLocations();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }
  }

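  /**
   * A RecordReader that delegates to {@link TableSnapshotInputFormatImpl.RecordReader} and,
   * after each row, publishes the scanner's {@link ScanMetrics} as MapReduce counters. The
   * counter-increment method is looked up reflectively (see
   * {@link TableRecordReaderImpl#retrieveGetCounterWithStringsParams}) so the same code runs
   * against Hadoop versions with differing counter APIs.
   */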
  @VisibleForTesting
  static class TableSnapshotRegionRecordReader extends
      RecordReader<ImmutableBytesWritable, Result> {
    private TableSnapshotInputFormatImpl.RecordReader delegate =
      new TableSnapshotInputFormatImpl.RecordReader();
    private TaskAttemptContext context;
    private Method getCounter;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
      InterruptedException {
      this.context = context;
      getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
      delegate.initialize(
        ((TableSnapshotRegionSplit) split).delegate,
        context.getConfiguration());
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean result = delegate.nextKeyValue();
      if (result) {
        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
        if (scanMetrics != null && context != null) {
          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
        }
      }
      return result;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return delegate.getCurrentKey();
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return delegate.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
    InputSplit split, TaskAttemptContext context) throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
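    // The impl computes one split per region of the restored snapshot; wrap each one in the
    // Writable split type so the MapReduce framework can serialize it.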
    List<InputSplit> results = new ArrayList<InputSplit>();
    for (TableSnapshotInputFormatImpl.InputSplit split :
      TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
      results.add(new TableSnapshotRegionSplit(split));
    }
    return results;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
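   * <p>
   * Example (a minimal sketch; the snapshot name and restore path below are illustrative):
   * <pre>{@code
   * Job job = new Job(conf);
   * TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
   * job.setInputFormatClass(TableSnapshotInputFormat.class);
   * }</pre>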
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user
   * must have write permissions to this directory, and it should not be a subdirectory of
   * the HBase root dir. After the job is finished, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
  }
}