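/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */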
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.JarFinder;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.cliffc.high_scale_lib.Counter;

import com.google.protobuf.InvalidProtocolBufferException;

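/**
 * Utility for {@link TableMapper} and {@link TableReducer}.
 *
 * <p>A typical read job is configured roughly as follows (class and table
 * names here are illustrative, not part of this API):
 *
 * <pre>
 * Job job = new Job(HBaseConfiguration.create(), "MyReadJob");
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableMapperJob("myTable", scan, MyMapper.class,
 *     ImmutableBytesWritable.class, Result.class, job);
 * </pre>
 */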
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableMapReduceUtil {
  static Log LOG = LogFactory.getLog(TableMapReduceUtil.class);

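  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *     carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */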
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

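  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job. Same as the String variant, but takes the table name as a byte
   * array.
   *
   * @throws IOException When setting up the details fails.
   */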
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

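  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param addDependencyJars upload HBase jars and jars for any of the
   *     configured job classes via the distributed cache (tmpjars).
   * @param inputFormatClass The input format class to use.
   * @throws IOException When setting up the details fails.
   */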
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true, inputFormatClass);
  }

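  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *     carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *     configured job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize HBase auth credentials for
   *     the job.
   * @param inputFormatClass The input format class to use.
   * @throws IOException When setting up the details fails.
   */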
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, boolean initCredentials,
      Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

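  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job, taking the table name as a byte array and allowing a custom
   * input format.
   *
   * @throws IOException When setting up the details fails.
   */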
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, inputFormatClass);
  }

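  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job, taking the table name as a byte array and reading with the
   * default {@link TableInputFormat}.
   *
   * @throws IOException When setting up the details fails.
   */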
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

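  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job, reading with the default {@link TableInputFormat}.
   *
   * @throws IOException When setting up the details fails.
   */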
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

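  /**
   * Restores the HFile block cache size to its default and disables the
   * off-heap and bucket caches in the given configuration, so that map tasks
   * opening regions client-side do not try to allocate direct-memory caches.
   *
   * @param conf The configuration to modify.
   */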
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(
        HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat("hbase.offheapcache.percentage", 0f);
    conf.setFloat("hbase.bucketcache.size", 0f);
  }

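  /**
   * Sets up the job for reading from a table snapshot. The snapshot files are
   * first restored into the given temporary directory and are then read
   * directly from the filesystem, bypassing the HBase servers.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *     carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *     configured job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to restore the snapshot into;
   *     the current user should have write permission to it, and it can be
   *     deleted once the job has finished.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */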
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);

    // the scan runs client-side against the snapshot files, so region-server-sized
    // cache settings are reset to avoid over-allocating memory in the map tasks
    resetCacheConfig(job.getConfiguration());

    // the snapshot region reader needs high-scale-lib at runtime, which is not
    // among the default dependency jars
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
  }

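  /**
   * Use this before submitting a Multi TableMap job. It will appropriately
   * set up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   *     carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */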
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        true);
  }

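  /**
   * Use this before submitting a Multi TableMap job. It will appropriately
   * set up the job, optionally adding the dependency jars.
   *
   * @throws IOException When setting up the details fails.
   */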
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job,
      boolean addDependencyJars) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true);
  }

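  /**
   * Use this before submitting a Multi TableMap job. It will appropriately
   * set up the job.
   *
   * @param addDependencyJars upload HBase jars and jars for any of the
   *     configured job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize HBase auth credentials for
   *     the job.
   * @throws IOException When setting up the details fails.
   */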
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job,
      boolean addDependencyJars,
      boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<String>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
        scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }

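  /**
   * Obtains authentication tokens for the job when security is enabled:
   * propagates any launcher-provided Hadoop token file and, when HBase
   * security is on, obtains an HBase authentication token for the current
   * user (and for the remote cluster if an output quorum address is
   * configured).
   *
   * @param job The job for which credentials should be obtained.
   * @throws IOException When the authentication token cannot be obtained.
   */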
  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
            System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // obtain credentials for the remote (output) cluster first, if one is configured
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
          ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
          obtainAuthTokenForJob(job, peerConf, user);
        }

        obtainAuthTokenForJob(job, job.getConfiguration(), user);
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

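  /**
   * Obtain an authentication token for the specified cluster on behalf of the
   * current user and add it to the credentials for the given map reduce job.
   *
   * @param job The job that requires the permission.
   * @param quorumAddress cluster key of the remote cluster (ZooKeeper quorum,
   *     client port and znode parent).
   * @throws IOException When the authentication token cannot be obtained.
   */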
  public static void initCredentialsForCluster(Job job, String quorumAddress)
      throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
        obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
      throws IOException, InterruptedException {
    Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
    if (authToken == null) {
      user.obtainAuthTokenForJob(conf, job);
    } else {
      job.getCredentials().addToken(authToken.getService(), authToken);
    }
  }

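  /**
   * Get the authentication token of the user for the cluster specified in
   * the configuration. Returns {@code null} if the user does not already
   * hold a token for that cluster.
   */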
  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
      throws IOException, InterruptedException {
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
    try {
      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
      return new AuthenticationTokenSelector().selectToken(new Text(clusterId),
          user.getUGI().getTokens());
    } catch (KeeperException e) {
      throw new IOException(e);
    } finally {
      zkw.close();
    }
  }

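  /**
   * Writes the given scan into a Base64 encoded string.
   *
   * @param scan  The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */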
  static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Base64.encodeBytes(proto.toByteArray());
  }

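  /**
   * Converts the given Base64 string back into a Scan instance.
   *
   * @param base64  The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */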
  static Scan convertStringToScan(String base64) throws IOException {
    byte [] decoded = Base64.decode(base64);
    ClientProtos.Scan scan;
    try {
      scan = ClientProtos.Scan.parseFrom(decoded);
    } catch (InvalidProtocolBufferException ipbe) {
      throw new IOException(ipbe);
    }

    return ProtobufUtil.toScan(scan);
  }

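  /**
   * Use this before submitting a TableReduce job. It will appropriately set
   * up the job.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @throws IOException When determining the region count fails.
   */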
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

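  /**
   * Use this before submitting a TableReduce job. It will appropriately set
   * up the job.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @param partitioner  Partitioner to use. Pass {@code null} to use the
   *     default partitioner.
   * @throws IOException When determining the region count fails.
   */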
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }

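  /**
   * Use this before submitting a TableReduce job. It will appropriately set
   * up the job.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   *     carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass {@code null} to use the
   *     default partitioner.
   * @param quorumAddress Distant cluster to write to; default is {@code null}
   *     for output to the cluster designated in hbase-site.xml. Set this to
   *     the cluster key of an alternate remote cluster when the reduce should
   *     write to a cluster other than the default, e.g. when copying tables
   *     between clusters. The format is
   *     "hbase.zookeeper.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent",
   *     such as "server,server2,server3:2181:/hbase".
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */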
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true);
  }

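  /**
   * Use this before submitting a TableReduce job. It will appropriately set
   * up the job: it configures {@link TableOutputFormat} for the given output
   * table, wires in the partitioner, caps the number of reduce tasks at the
   * table's region count when {@link HRegionPartitioner} is used, and
   * initializes credentials.
   *
   * @param addDependencyJars upload HBase jars and jars for any of the
   *     configured job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */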
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format.
      ZKUtil.transformClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaReader.getRegionCount(conf, table);
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }

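  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */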
  public static void limitNumReduceTasks(String table, Job job)
  throws IOException {
    int regions = MetaReader.getRegionCount(job.getConfiguration(), table);
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

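  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */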
  public static void setNumReduceTasks(String table, Job job)
  throws IOException {
    job.setNumReduceTasks(MetaReader.getRegionCount(job.getConfiguration(), table));
  }

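  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   *     iteration.
   */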
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }

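  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this
   * class and its mapred counterpart. It is also useful to external tools
   * that need to build a MapReduce job interacting with HBase but want
   * fine-grained control over the jars shipped to the cluster.
   *
   * @param conf The Configuration object to extend with dependencies.
   */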
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
    addDependencyJars(conf,
        // explicitly pull a class from each HBase module
        org.apache.hadoop.hbase.HConstants.class,
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class,
        org.apache.hadoop.hbase.client.Put.class,
        org.apache.hadoop.hbase.CompatibilityFactory.class,
        org.apache.hadoop.hbase.mapreduce.TableMapper.class,
        // pull the third-party dependencies HBase needs at runtime
        org.apache.zookeeper.ZooKeeper.class,
        org.jboss.netty.channel.ChannelFactory.class,
        com.google.protobuf.Message.class,
        com.google.common.collect.Lists.class,
        org.cloudera.htrace.Trace.class);
  }

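  /**
   * Returns a classpath string built from the content of the "tmpjars" value
   * in {@code conf}.
   */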
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<String>(conf.getStringCollection("tmpjars"));
    if (paths.size() == 0) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries of the form "file:/path/to/some.jar" have the scheme stripped
      // so the result is a plain classpath entry
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }

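  /**
   * Add the HBase dependency jars as well as jars for any of the configured
   * job classes to the job configuration, so that JobClient will ship them
   * to the cluster and add them to the DistributedCache.
   */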
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJars(job.getConfiguration(),
          // pull the classes the job itself is configured to use
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

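  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   */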
  public static void addDependencyJars(Configuration conf,
      Class<?>... classes) throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<String>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // Track classes already packaged into a jar so that we can avoid creating
    // new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<String, String>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz +
            " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class "
            + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }

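  /**
   * Finds the jar for a class, or creates it if it doesn't exist. If the
   * class is in a directory on the classpath, a jar is created on the fly
   * from the contents of that directory (in the system temporary directory).
   * Otherwise an existing jar containing the class is returned. Maintains a
   * mapping from jar contents to the jar used.
   *
   * @param my_class the class to find.
   * @param fs the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to containing jar.
   * @return a jar file that contains the class, or null.
   * @throws IOException if the jar cannot be located or created.
   */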
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
      Map<String, String> packagedClasses)
  throws IOException {
    // attempt to locate an existing jar for the class
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs);
  }

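  /**
   * Add entries to {@code packagedClasses} for the class files contained in
   * {@code jar}.
   *
   * @param jar The jar whose content to list.
   * @param packagedClasses map of class name to containing jar.
   */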
  private static void updateMap(String jar, Map<String, String> packagedClasses)
      throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

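  /**
   * Find a jar that contains a class of the same name, if any. It will return
   * a jar file even if that is not the first thing on the classpath that has
   * a class with the same name. Looks first on the classpath and then in the
   * {@code packagedClasses} map.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */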
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
      throws IOException {
    ClassLoader loader = my_class.getClassLoader();
    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    // first search the classpath
    for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
      URL url = itr.nextElement();
      if ("jar".equals(url.getProtocol())) {
        String toReturn = url.getPath();
        if (toReturn.startsWith("file:")) {
          toReturn = toReturn.substring("file:".length());
        }
        // URLDecoder decodes the x-www-form-urlencoded MIME type rather than
        // actual URL encoding (which the file path has), so it would turn
        // "+" into a space, which is incorrect (spaces are either unencoded
        // or encoded as "%20"). Replace "+" with "%2B" first so that it
        // survives the decoding step unchanged.
        toReturn = toReturn.replaceAll("\\+", "%2B");
        toReturn = URLDecoder.decode(toReturn, "UTF-8");
        return toReturn.replaceAll("!.*$", "");
      }
    }

    // now look in any jars we've packaged using JarFinder; returns null when
    // no jar is found
    return packagedClasses.get(class_file);
  }

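  /**
   * Invoke 'getJar' on a JarFinder implementation: prefer Hadoop's own
   * org.apache.hadoop.util.JarFinder when it is available on the classpath,
   * and fall back to the backported copy otherwise.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */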
  private static String getJar(Class<?> my_class) {
    String ret = null;
    String hadoopJarFinder = "org.apache.hadoop.util.JarFinder";
    Class<?> jarFinder = null;
    try {
      LOG.debug("Looking for " + hadoopJarFinder + ".");
      jarFinder = Class.forName(hadoopJarFinder);
      LOG.debug(hadoopJarFinder + " found.");
      Method getJar = jarFinder.getMethod("getJar", Class.class);
      ret = (String) getJar.invoke(null, my_class);
    } catch (ClassNotFoundException e) {
      LOG.debug("Using backported JarFinder.");
      ret = JarFinder.getJar(my_class);
    } catch (InvocationTargetException e) {
      // getJar was invoked successfully but threw its own exception; unwrap
      // it and pass it on
      throw new RuntimeException(e.getCause());
    } catch (Exception e) {
      // anything else here is a reflection failure
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }
}