
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.KeeperException;
import org.cliffc.high_scale_lib.Counter;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
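   * <p>A minimal usage sketch; the driver class, mapper, table, and column
   * names below are hypothetical, not part of this API:
   * <pre>
   *   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   *   job.setJobName("example-table-map");
   *   TableMapReduceUtil.initTableMapJob("mytable", "info:name info:age",
   *     MyMapper.class, ImmutableBytesWritable.class, Result.class, job);
   *   // further job setup (reducer, output format, ...) and then:
   *   JobClient.runJob(job);
   * </pre>
   *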
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

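  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */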
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format class to use for reading the table.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase
   * servers and reads directly from snapshot files.
   *
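   * <p>A minimal usage sketch; the snapshot name, restore directory, and
   * {@code MyMapper} below are hypothetical:
   * <pre>
   *   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   *   TableMapReduceUtil.initTableSnapshotMapJob("mysnapshot", "info:name",
   *     MyMapper.class, ImmutableBytesWritable.class, Result.class, job,
   *     true, new Path("/tmp/snapshot-restore"));
   *   JobClient.runJob(job);
   * </pre>
   *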
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   * should have write permissions to this directory, and it should not be a subdirectory of the
   * HBase root directory. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
    // We would need even more libraries that hbase-server depends on
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(job, Counter.class);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
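   * <p>A minimal usage sketch; the table name and {@code MyReducer} below are
   * hypothetical:
   * <pre>
   *   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   *   // ... map-side setup, e.g. via initTableMapJob, and then:
   *   TableMapReduceUtil.initTableReduceJob("mytable", MyReducer.class, job);
   *   JobClient.runJob(job);
   * </pre>
   *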
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
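   * <p>A sketch of passing the supplied {@link HRegionPartitioner}, so output
   * rows are grouped by the region they will be written to (table and reducer
   * names below are hypothetical):
   * <pre>
   *   TableMapReduceUtil.initTableReduceJob("mytable", MyReducer.class, job,
   *     HRegionPartitioner.class);
   * </pre>
   *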
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

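  /**
   * Obtains security credentials for the job so that map and reduce tasks can
   * talk to a secure HBase cluster: propagates any delegation token file
   * configured for the launching process, and acquires (or reuses) an HBase
   * authentication token for the current user. A no-op when security is not
   * enabled.
   *
   * @param job  The job configuration to add credentials to.
   * @throws IOException When obtaining the authentication token fails.
   */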
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
        if (authToken == null) {
          user.obtainAuthTokenForJob(job);
        } else {
          job.getCredentials().addToken(authToken.getService(), authToken);
        }
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Get the authentication token of the user for the cluster specified in the configuration
   * @return null if the user does not have the token, otherwise the auth token for the cluster.
   */
  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
      throws IOException, InterruptedException {
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
    try {
      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
      return new AuthenticationTokenSelector().selectToken(
        new Text(clusterId), user.getUGI().getTokens());
    } catch (KeeperException e) {
      throw new IOException(e);
    } finally {
      zkw.close();
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumMapTasks() > regions)
      job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaReader.getRegionCount(HBaseConfiguration.create(job), table));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaReader.getRegionCount(HBaseConfiguration.create(job), table));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
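   * <p>For example (the value 500 is only illustrative; tune it to the job):
   * <pre>
   *   TableMapReduceUtil.setScannerCaching(job, 500);
   * </pre>
   *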
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return and cache with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Adds the HBase dependency jars, as well as jars for any of the configured
   * job classes, to the job configuration so that they are shipped to the
   * cluster via the distributed cache (tmpjars). Called automatically by the
   * <code>initTableMapJob</code> and <code>initTableReduceJob</code> methods
   * unless <code>addDependencyJars</code> is passed as <code>false</code>.
   *
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}