
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.experimental.categories.Category;

/**
 * Test a Map/Reduce job over HBase tables. The map/reduce process under test
 * is simple: take every row in the table, reverse the value of a particular
 * cell, and write it back to the table.
 */
@Category(LargeTests.class)
public class TestTableMapReduce extends TestTableMapReduceBase {
  private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);

  @Override
  protected Log getLog() { return LOG; }

  /**
   * Pass the given key and the processed record on to reduce.
   */
  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Pass the key, and the reversed value, to reduce.
     *
     * @param key the row key of the input record
     * @param value the columns fetched for the row
     * @param context the task context used to emit output
     * @throws IOException when the input row does not carry exactly one column
     *   from the expected family
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
        cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILY) + "'.");
      }

      // Get the original value and reverse it
      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();
      // Now set the value to be collected
      Put outval = new Put(key.get());
      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      context.write(key, outval);
    }
  }

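  /**
   * Runs the reverse-value job against the given table: scans the input
   * family, maps each row through {@link ProcessContentsMapper}, writes the
   * reversed values back via {@link IdentityTableReducer}, and verifies the
   * result.
   */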
  @Override
  protected void runTestOnTable(HTable table) throws IOException {
    Job job = null;
    try {
      LOG.info("Before map/reduce startup");
      job = new Job(table.getConfiguration(), "process column contents");
      job.setNumReduceTasks(1);
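      // Restrict the scan to the input family so the mapper sees exactly one
      // column per row, as ProcessContentsMapper requires.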
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
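      // Wire the scan and mapper into the job: the mapper receives each row as
      // (ImmutableBytesWritable key, Result value) and emits (key, Put) pairs.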
      TableMapReduceUtil.initTableMapperJob(
        Bytes.toString(table.getTableName()), scan,
        ProcessContentsMapper.class, ImmutableBytesWritable.class,
        Put.class, job);
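      // Send the emitted Puts back into the same table through the identity
      // reducer.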
      TableMapReduceUtil.initTableReducerJob(
        Bytes.toString(table.getTableName()),
        IdentityTableReducer.class, job);
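      // The real output goes to HBase via TableOutputFormat; the job is given
      // a scratch file output path as well.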
      FileOutputFormat.setOutputPath(job, new Path("test"));
      LOG.info("Started " + Bytes.toString(table.getTableName()));
      assertTrue(job.waitForCompletion(true));
      LOG.info("After map/reduce completion");

      // verify map-reduce results
      verify(Bytes.toString(table.getTableName()));
    } catch (InterruptedException e) {
      throw new IOException(e);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
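      // Always release the table and scrub the job's temporary files, even if
      // the job failed.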
      table.close();
      if (job != null) {
        FileUtil.fullyDelete(
          new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }
}