View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapred;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.File;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileUtil;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.LargeTests;
37  import org.apache.hadoop.hbase.client.HTable;
38  import org.apache.hadoop.hbase.client.Put;
39  import org.apache.hadoop.hbase.client.Result;
40  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.mapred.JobClient;
43  import org.apache.hadoop.mapred.JobConf;
44  import org.apache.hadoop.mapred.MapReduceBase;
45  import org.apache.hadoop.mapred.OutputCollector;
46  import org.apache.hadoop.mapred.Reporter;
47  import org.apache.hadoop.mapred.RunningJob;
48  import org.junit.AfterClass;
49  import org.junit.Assert;
50  import org.junit.Before;
51  import org.junit.BeforeClass;
52  import org.junit.Test;
53  import org.junit.experimental.categories.Category;
54  
55  import com.google.common.collect.ImmutableMap;
56  import com.google.common.collect.ImmutableSet;
57  
58  @Category(LargeTests.class)
59  public class TestTableMapReduceUtil {
60  
61    private static final Log LOG = LogFactory
62        .getLog(TestTableMapReduceUtil.class);
63  
64    private static HTable presidentsTable;
65    private static final String TABLE_NAME = "People";
66  
67    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
68    private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
69  
70    private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
71        "president1", "president2", "president3");
72    private static Iterator<String> presidentNames = ImmutableSet.of(
73        "John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
74  
75    private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
76        "actor2");
77    private static Iterator<String> actorNames = ImmutableSet.of(
78        "Jack Nicholson", "Martin Freeman").iterator();
79  
80    private static String PRESIDENT_PATTERN = "president";
81    private static String ACTOR_PATTERN = "actor";
82    private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
83        .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
84  
85    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
86  
87    @BeforeClass
88    public static void beforeClass() throws Exception {
89      UTIL.startMiniCluster();
90      presidentsTable = createAndFillTable(Bytes.toBytes(TABLE_NAME));
91      UTIL.startMiniMapReduceCluster();
92    }
93  
94    @AfterClass
95    public static void afterClass() throws Exception {
96      UTIL.shutdownMiniMapReduceCluster();
97      UTIL.shutdownMiniCluster();
98    }
99  
100   @Before
101   public void before() throws IOException {
102     LOG.info("before");
103     UTIL.ensureSomeRegionServersAvailable(1);
104     LOG.info("before done");
105   }
106 
107   public static HTable createAndFillTable(byte[] tableName) throws IOException {
108     HTable table = UTIL.createTable(tableName, COLUMN_FAMILY);
109     createPutCommand(table);
110     return table;
111   }
112 
113   private static void createPutCommand(HTable table) throws IOException {
114     for (String president : presidentsRowKeys) {
115       if (presidentNames.hasNext()) {
116         Put p = new Put(Bytes.toBytes(president));
117         p.add(COLUMN_FAMILY, COLUMN_QUALIFIER,
118             Bytes.toBytes(presidentNames.next()));
119         table.put(p);
120       }
121     }
122 
123     for (String actor : actorsRowKeys) {
124       if (actorNames.hasNext()) {
125         Put p = new Put(Bytes.toBytes(actor));
126         p.add(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
127         table.put(p);
128       }
129     }
130   }
131 
132   /**
133    * Check what the given number of reduce tasks for the given job configuration
134    * does not exceed the number of regions for the given table.
135    */
136   @Test
137   @SuppressWarnings("deprecation")
138   public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
139       throws IOException {
140     Assert.assertNotNull(presidentsTable);
141     Configuration cfg = UTIL.getConfiguration();
142     JobConf jobConf = new JobConf(cfg);
143     TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
144     TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
145     TableMapReduceUtil.setScannerCaching(jobConf, 100);
146     assertEquals(1, jobConf.getNumReduceTasks());
147     assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
148 
149     jobConf.setNumReduceTasks(10);
150     TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
151     TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
152     assertEquals(1, jobConf.getNumReduceTasks());
153   }
154 
155   @Test
156   @SuppressWarnings("deprecation")
157   public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
158       throws IOException {
159     Configuration cfg = UTIL.getConfiguration();
160     JobConf jobConf = new JobConf(cfg);
161     TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
162     TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
163     assertEquals(1, jobConf.getNumMapTasks());
164 
165     jobConf.setNumMapTasks(10);
166     TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
167     TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
168     assertEquals(1, jobConf.getNumMapTasks());
169   }
170 
171   @Test
172   @SuppressWarnings("deprecation")
173   public void shoudBeValidMapReduceEvaluation() throws Exception {
174     Configuration cfg = UTIL.getConfiguration();
175     JobConf jobConf = new JobConf(cfg);
176     try {
177       jobConf.setJobName("process row task");
178       jobConf.setNumReduceTasks(1);
179       TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
180           ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
181           jobConf);
182       TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
183           ClassificatorRowReduce.class, jobConf);
184       RunningJob job = JobClient.runJob(jobConf);
185       assertTrue(job.isSuccessful());
186     } finally {
187       if (jobConf != null)
188         FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
189     }
190   }
191 
192   @Test
193   @SuppressWarnings("deprecation")
194   public void shoudBeValidMapReduceWithPartitionerEvaluation()
195       throws IOException {
196     Configuration cfg = UTIL.getConfiguration();
197     JobConf jobConf = new JobConf(cfg);
198     try {
199       jobConf.setJobName("process row task");
200       jobConf.setNumReduceTasks(2);
201       TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
202           ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
203           jobConf);
204 
205       TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
206           ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
207       RunningJob job = JobClient.runJob(jobConf);
208       assertTrue(job.isSuccessful());
209     } finally {
210       if (jobConf != null)
211         FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
212     }
213   }
214 
215   @SuppressWarnings("deprecation")
216   static class ClassificatorRowReduce extends MapReduceBase implements
217       TableReduce<ImmutableBytesWritable, Put> {
218 
219     @Override
220     public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
221         OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
222         throws IOException {
223       String strKey = Bytes.toString(key.get());
224       List<Put> result = new ArrayList<Put>();
225       while (values.hasNext())
226         result.add(values.next());
227 
228       if (relation.keySet().contains(strKey)) {
229         Set<String> set = relation.get(strKey);
230         if (set != null) {
231           assertEquals(set.size(), result.size());
232         } else {
233           throwAccertionError("Test infrastructure error: set is null");
234         }
235       } else {
236         throwAccertionError("Test infrastructure error: key not found in map");
237       }
238     }
239 
240     private void throwAccertionError(String errorMessage) throws AssertionError {
241       throw new AssertionError(errorMessage);
242     }
243   }
244 
245   @SuppressWarnings("deprecation")
246   static class ClassificatorMapper extends MapReduceBase implements
247       TableMap<ImmutableBytesWritable, Put> {
248 
249     @Override
250     public void map(ImmutableBytesWritable row, Result result,
251         OutputCollector<ImmutableBytesWritable, Put> outCollector,
252         Reporter reporter) throws IOException {
253       String rowKey = Bytes.toString(result.getRow());
254       final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
255           Bytes.toBytes(PRESIDENT_PATTERN));
256       final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
257           Bytes.toBytes(ACTOR_PATTERN));
258       ImmutableBytesWritable outKey = null;
259 
260       if (rowKey.startsWith(PRESIDENT_PATTERN)) {
261         outKey = pKey;
262       } else if (rowKey.startsWith(ACTOR_PATTERN)) {
263         outKey = aKey;
264       } else {
265         throw new AssertionError("unexpected rowKey");
266       }
267 
268       String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
269           COLUMN_QUALIFIER));
270       outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add(
271           COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
272     }
273   }
274 }