1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.lang.reflect.Constructor;
22 import java.lang.reflect.InvocationTargetException;
23 import java.lang.reflect.Method;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.mapred.JobConf;
27 import org.apache.hadoop.mapred.MiniMRCluster;
28 import org.apache.hadoop.mapreduce.Job;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.JobID;
31
32
33
34
35
36
37
38 abstract public class MapreduceTestingShim {
39 private static MapreduceTestingShim instance;
40 private static Class[] emptyParam = new Class[] {};
41
42 static {
43 try {
44
45 Class c = Class
46 .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
47 instance = new MapreduceV2Shim();
48 } catch (Exception e) {
49 instance = new MapreduceV1Shim();
50 }
51 }
52
53 abstract public JobContext newJobContext(Configuration jobConf)
54 throws IOException;
55
56 abstract public JobConf obtainJobConf(MiniMRCluster cluster);
57
58 abstract public String obtainMROutputDirProp();
59
60 public static JobContext createJobContext(Configuration jobConf)
61 throws IOException {
62 return instance.newJobContext(jobConf);
63 }
64
65 public static JobConf getJobConf(MiniMRCluster cluster) {
66 return instance.obtainJobConf(cluster);
67 }
68
69 public static String getMROutputDirProp() {
70 return instance.obtainMROutputDirProp();
71 }
72
73 private static class MapreduceV1Shim extends MapreduceTestingShim {
74 public JobContext newJobContext(Configuration jobConf) throws IOException {
75
76
77 JobID jobId = new JobID();
78 Constructor<JobContext> c;
79 try {
80 c = JobContext.class.getConstructor(Configuration.class, JobID.class);
81 return c.newInstance(jobConf, jobId);
82 } catch (Exception e) {
83 throw new IllegalStateException(
84 "Failed to instantiate new JobContext(jobConf, new JobID())", e);
85 }
86 }
87
88 public JobConf obtainJobConf(MiniMRCluster cluster) {
89 if (cluster == null) return null;
90 try {
91 Object runner = cluster.getJobTrackerRunner();
92 Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
93 Object tracker = meth.invoke(runner, new Object []{});
94 Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
95 return (JobConf) m.invoke(tracker, new Object []{});
96 } catch (NoSuchMethodException nsme) {
97 return null;
98 } catch (InvocationTargetException ite) {
99 return null;
100 } catch (IllegalAccessException iae) {
101 return null;
102 }
103 }
104
105 @Override
106 public String obtainMROutputDirProp() {
107 return "mapred.output.dir";
108 }
109 };
110
111 private static class MapreduceV2Shim extends MapreduceTestingShim {
112 public JobContext newJobContext(Configuration jobConf) {
113
114
115 try {
116 Method m = Job.class.getMethod("getInstance", Configuration.class);
117 return (JobContext) m.invoke(null, jobConf);
118 } catch (Exception e) {
119 e.printStackTrace();
120 throw new IllegalStateException(
121 "Failed to return from Job.getInstance(jobConf)");
122 }
123 }
124
125 public JobConf obtainJobConf(MiniMRCluster cluster) {
126 try {
127 Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
128 return (JobConf) meth.invoke(cluster, new Object []{});
129 } catch (NoSuchMethodException nsme) {
130 return null;
131 } catch (InvocationTargetException ite) {
132 return null;
133 } catch (IllegalAccessException iae) {
134 return null;
135 }
136 }
137
138 @Override
139 public String obtainMROutputDirProp() {
140
141
142 return "mapreduce.output.fileoutputformat.outputdir";
143 }
144 };
145
146 }