1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.File;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Set;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileUtil;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.testclassification.LargeTests;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.client.Put;
39 import org.apache.hadoop.hbase.client.Result;
40 import org.apache.hadoop.hbase.client.Table;
41 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.mapred.JobClient;
44 import org.apache.hadoop.mapred.JobConf;
45 import org.apache.hadoop.mapred.MapReduceBase;
46 import org.apache.hadoop.mapred.OutputCollector;
47 import org.apache.hadoop.mapred.Reporter;
48 import org.apache.hadoop.mapred.RunningJob;
49 import org.junit.AfterClass;
50 import org.junit.Assert;
51 import org.junit.Before;
52 import org.junit.BeforeClass;
53 import org.junit.Test;
54 import org.junit.experimental.categories.Category;
55
56 import com.google.common.collect.ImmutableMap;
57 import com.google.common.collect.ImmutableSet;
58
59 @Category(LargeTests.class)
60 public class TestTableMapReduceUtil {
61
62 private static final Log LOG = LogFactory
63 .getLog(TestTableMapReduceUtil.class);
64
65 private static Table presidentsTable;
66 private static final String TABLE_NAME = "People";
67
68 private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
69 private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
70
71 private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
72 "president1", "president2", "president3");
73 private static Iterator<String> presidentNames = ImmutableSet.of(
74 "John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
75
76 private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
77 "actor2");
78 private static Iterator<String> actorNames = ImmutableSet.of(
79 "Jack Nicholson", "Martin Freeman").iterator();
80
81 private static String PRESIDENT_PATTERN = "president";
82 private static String ACTOR_PATTERN = "actor";
83 private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
84 .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
85
86 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
87
88 @BeforeClass
89 public static void beforeClass() throws Exception {
90 UTIL.startMiniCluster();
91 presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
92 UTIL.startMiniMapReduceCluster();
93 }
94
95 @AfterClass
96 public static void afterClass() throws Exception {
97 UTIL.shutdownMiniMapReduceCluster();
98 UTIL.shutdownMiniCluster();
99 }
100
101 @Before
102 public void before() throws IOException {
103 LOG.info("before");
104 UTIL.ensureSomeRegionServersAvailable(1);
105 LOG.info("before done");
106 }
107
108 public static Table createAndFillTable(TableName tableName) throws IOException {
109 Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
110 createPutCommand(table);
111 return table;
112 }
113
114 private static void createPutCommand(Table table) throws IOException {
115 for (String president : presidentsRowKeys) {
116 if (presidentNames.hasNext()) {
117 Put p = new Put(Bytes.toBytes(president));
118 p.add(COLUMN_FAMILY, COLUMN_QUALIFIER,
119 Bytes.toBytes(presidentNames.next()));
120 table.put(p);
121 }
122 }
123
124 for (String actor : actorsRowKeys) {
125 if (actorNames.hasNext()) {
126 Put p = new Put(Bytes.toBytes(actor));
127 p.add(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
128 table.put(p);
129 }
130 }
131 }
132
133
134
135
136
137 @Test
138 public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
139 throws IOException {
140 Assert.assertNotNull(presidentsTable);
141 Configuration cfg = UTIL.getConfiguration();
142 JobConf jobConf = new JobConf(cfg);
143 TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
144 TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
145 TableMapReduceUtil.setScannerCaching(jobConf, 100);
146 assertEquals(1, jobConf.getNumReduceTasks());
147 assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
148
149 jobConf.setNumReduceTasks(10);
150 TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
151 TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
152 assertEquals(1, jobConf.getNumReduceTasks());
153 }
154
155 @Test
156 public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
157 throws IOException {
158 Configuration cfg = UTIL.getConfiguration();
159 JobConf jobConf = new JobConf(cfg);
160 TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
161 TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
162 assertEquals(1, jobConf.getNumMapTasks());
163
164 jobConf.setNumMapTasks(10);
165 TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
166 TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
167 assertEquals(1, jobConf.getNumMapTasks());
168 }
169
170 @Test
171 @SuppressWarnings("deprecation")
172 public void shoudBeValidMapReduceEvaluation() throws Exception {
173 Configuration cfg = UTIL.getConfiguration();
174 JobConf jobConf = new JobConf(cfg);
175 try {
176 jobConf.setJobName("process row task");
177 jobConf.setNumReduceTasks(1);
178 TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
179 ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
180 jobConf);
181 TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
182 ClassificatorRowReduce.class, jobConf);
183 RunningJob job = JobClient.runJob(jobConf);
184 assertTrue(job.isSuccessful());
185 } finally {
186 if (jobConf != null)
187 FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
188 }
189 }
190
191 @Test
192 @SuppressWarnings("deprecation")
193 public void shoudBeValidMapReduceWithPartitionerEvaluation()
194 throws IOException {
195 Configuration cfg = UTIL.getConfiguration();
196 JobConf jobConf = new JobConf(cfg);
197 try {
198 jobConf.setJobName("process row task");
199 jobConf.setNumReduceTasks(2);
200 TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
201 ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
202 jobConf);
203
204 TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
205 ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
206 RunningJob job = JobClient.runJob(jobConf);
207 assertTrue(job.isSuccessful());
208 } finally {
209 if (jobConf != null)
210 FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
211 }
212 }
213
214 @SuppressWarnings("deprecation")
215 static class ClassificatorRowReduce extends MapReduceBase implements
216 TableReduce<ImmutableBytesWritable, Put> {
217
218 @Override
219 public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
220 OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
221 throws IOException {
222 String strKey = Bytes.toString(key.get());
223 List<Put> result = new ArrayList<Put>();
224 while (values.hasNext())
225 result.add(values.next());
226
227 if (relation.keySet().contains(strKey)) {
228 Set<String> set = relation.get(strKey);
229 if (set != null) {
230 assertEquals(set.size(), result.size());
231 } else {
232 throwAccertionError("Test infrastructure error: set is null");
233 }
234 } else {
235 throwAccertionError("Test infrastructure error: key not found in map");
236 }
237 }
238
239 private void throwAccertionError(String errorMessage) throws AssertionError {
240 throw new AssertionError(errorMessage);
241 }
242 }
243
244 @SuppressWarnings("deprecation")
245 static class ClassificatorMapper extends MapReduceBase implements
246 TableMap<ImmutableBytesWritable, Put> {
247
248 @Override
249 public void map(ImmutableBytesWritable row, Result result,
250 OutputCollector<ImmutableBytesWritable, Put> outCollector,
251 Reporter reporter) throws IOException {
252 String rowKey = Bytes.toString(result.getRow());
253 final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
254 Bytes.toBytes(PRESIDENT_PATTERN));
255 final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
256 Bytes.toBytes(ACTOR_PATTERN));
257 ImmutableBytesWritable outKey = null;
258
259 if (rowKey.startsWith(PRESIDENT_PATTERN)) {
260 outKey = pKey;
261 } else if (rowKey.startsWith(ACTOR_PATTERN)) {
262 outKey = aKey;
263 } else {
264 throw new AssertionError("unexpected rowKey");
265 }
266
267 String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
268 COLUMN_QUALIFIER));
269 outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add(
270 COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
271 }
272 }
273 }