/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;

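/**
 * An {@link InputFormat} for the legacy {@code org.apache.hadoop.mapred} API that reads the
 * contents of a table snapshot directly from the snapshot files, without going through the
 * region servers. Split calculation, record reading and job setup are delegated to
 * {@link TableSnapshotInputFormatImpl} from the {@code org.apache.hadoop.hbase.mapreduce}
 * package.
 * <p>
 * A minimal usage sketch; the snapshot name, restore directory and {@code MyMapper} class below
 * are placeholders, not part of this API:
 * <pre>{@code
 * JobConf job = new JobConf(HBaseConfiguration.create());
 * // restoreDir must be writable by the current user and must not be under the HBase root dir
 * Path restoreDir = new Path("/tmp/snapshot-restore");
 * TableSnapshotInputFormat.setInput(job, "mySnapshot", restoreDir);
 * job.setInputFormat(TableSnapshotInputFormat.class);
 * // MyMapper implements Mapper<ImmutableBytesWritable, Result, ..., ...>
 * job.setMapperClass(MyMapper.class);
 * }</pre>
 */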
public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {

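  /**
   * A Writable wrapper around {@link TableSnapshotInputFormatImpl.InputSplit} that exposes the
   * delegate split through the mapred {@link InputSplit} interface.
   */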
  static class TableSnapshotRegionSplit implements InputSplit {
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    // Default constructor, required so the MapReduce framework can instantiate this split
    // and deserialize it via the Writable interface.
    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    public TableSnapshotRegionSplit(String regionName, List<String> locations) {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit(regionName, locations);
    }

    @Override
    public long getLength() throws IOException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException {
      return delegate.getLocations();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }
  }

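  /**
   * Adapts {@link TableSnapshotInputFormatImpl.RecordReader}, which follows the newer
   * "advance, then read current key/value" pattern, to the mapred {@link RecordReader}
   * contract by copying each current key and value into the caller-supplied objects.
   */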
  static class TableSnapshotRecordReader
      implements RecordReader<ImmutableBytesWritable, Result> {

    private TableSnapshotInputFormatImpl.RecordReader delegate;

    public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
        throws IOException {
      delegate = new TableSnapshotInputFormatImpl.RecordReader();
      delegate.initialize(split.delegate, job);
    }

    @Override
    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
      if (!delegate.nextKeyValue()) {
        return false;
      }
      ImmutableBytesWritable currentKey = delegate.getCurrentKey();
      key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
      value.copyFrom(delegate.getCurrentValue());
      return true;
    }

    @Override
    public ImmutableBytesWritable createKey() {
      return new ImmutableBytesWritable();
    }

    @Override
    public Result createValue() {
      return new Result();
    }

    @Override
    public long getPos() throws IOException {
      return delegate.getPos();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public float getProgress() throws IOException {
      return delegate.getProgress();
    }
  }

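  /**
   * Delegates split calculation to {@link TableSnapshotInputFormatImpl} and wraps each resulting
   * split in a Writable {@link TableSnapshotRegionSplit}. The numSplits hint is ignored.
   */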
  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits =
      TableSnapshotInputFormatImpl.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

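  /**
   * Returns a {@link TableSnapshotRecordReader} for the given split, which must be a
   * {@link TableSnapshotRegionSplit} produced by {@link #getSplits(JobConf, int)}.
   */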
  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory into which the snapshot is restored. The current user
   *   must have write permission to this directory, and it must not be a subdirectory of the
   *   HBase root directory. The directory can be deleted once the job has finished.
   * @throws IOException if an error occurs
   */
  public static void setInput(JobConf job, String snapshotName, Path restoreDir)
      throws IOException {
    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
  }
}