/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.KeeperException;
import org.cliffc.high_scale_lib.Counter;
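/**
 * Utility for {@link TableMap} and {@link TableReduce}.
 *
 * <p>A typical driver (sketch only; the table, column and class names below are
 * placeholders) sets up a scan-based map job like this:
 * <pre>
 *   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
 *   TableMapReduceUtil.initTableMapJob("mytable", "info:col",
 *     MyTableMap.class, ImmutableBytesWritable.class, Result.class, job);
 *   JobClient.runJob(job);
 * </pre>
 */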
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {
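  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the JobConf, using {@link TableInputFormat} and shipping dependency jars.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */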
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }
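  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the JobConf, using {@link TableInputFormat}.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   */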
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }
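  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format class to use for the job.
   */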
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // this signature cannot throw IOException, so log and continue
      ioe.printStackTrace();
    }
  }
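  /**
   * Sets up the job for reading from a table snapshot. It bypasses the HBase
   * servers and reads directly from the snapshot files.
   *
   * @param snapshotName  The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir  A temporary directory to restore the snapshot into; the
   *   current user must have write permission to it and it should not be a
   *   subdirectory of the HBase root directory. It can be deleted after the job.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */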
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job,
    boolean addDependencyJars, Path tmpRestoreDir)
      throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
    // reading snapshot files directly needs server-side classes at runtime,
    // so also ship the jar that provides high-scale-lib's Counter
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(job, Counter.class);
  }
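  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */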
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
      throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }
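  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *   the default partitioner.
   * @throws IOException When determining the region count fails.
   */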
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
      throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }
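  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *   the default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */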
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      // cap the number of reduce tasks at the table's region count when using
      // the region-aware partitioner
      int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }
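  /**
   * Propagates any Hadoop delegation token settings from the launching
   * environment to the job and, if HBase security is enabled, obtains an HBase
   * authentication token for the current user (or reuses an existing one for
   * the target cluster) and adds it to the job credentials.
   *
   * @param job  The current job configuration to adjust.
   * @throws IOException When obtaining the authentication token fails.
   */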
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation-related props from the launcher job to the MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // reuse an existing token for this cluster if one is already present,
        // otherwise obtain a new one for the current user
        User user = userProvider.getCurrent();
        Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
        if (authToken == null) {
          user.obtainAuthTokenForJob(job);
        } else {
          job.getCredentials().addToken(authToken.getService(), authToken);
        }
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      }
    }
  }
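  /**
   * Gets the HBase authentication token of the user for the cluster specified
   * in the configuration, looking the cluster id up in ZooKeeper.
   *
   * @return <code>null</code> if the user does not hold a token for the
   *   cluster, otherwise the authentication token.
   */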
  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
      throws IOException, InterruptedException {
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
    try {
      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
      return new AuthenticationTokenSelector().selectToken(new Text(clusterId),
        user.getUGI().getTokens());
    } catch (KeeperException e) {
      throw new IOException(e);
    } finally {
      zkw.close();
    }
  }
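  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */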
  public static void limitNumReduceTasks(String table, JobConf job)
      throws IOException {
    int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }
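  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */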
  public static void limitNumMapTasks(String table, JobConf job)
      throws IOException {
    int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }
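  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */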
  public static void setNumReduceTasks(String table, JobConf job)
      throws IOException {
    job.setNumReduceTasks(MetaReader.getRegionCount(HBaseConfiguration.create(job), table));
  }
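  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */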
  public static void setNumMapTasks(String table, JobConf job)
      throws IOException {
    job.setNumMapTasks(MetaReader.getRegionCount(HBaseConfiguration.create(job), table));
  }
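  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job  The current job configuration to adjust.
   * @param batchSize  The number of rows to return in batch with each scanner
   *   iteration.
   */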
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
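  /**
   * Adds the HBase dependency jars, as well as the jars containing any of the
   * job's configured classes, to the job configuration (tmpjars), so they are
   * shipped to the cluster via the distributed cache.
   *
   * @param job  The current job configuration to adjust.
   * @throws IOException When adding the jars fails.
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */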
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // pull in the jars for the classes the job actually uses:
      // keys, values, partitioner, input/output formats and combiner
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}