1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.fail;
24
25 import java.io.File;
26 import java.io.IOException;
27 import java.util.Iterator;
28 import java.util.Map;
29 import java.util.NavigableMap;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileUtil;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.CellUtil;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.LargeTests;
41 import org.apache.hadoop.hbase.client.HTable;
42 import org.apache.hadoop.hbase.client.Put;
43 import org.apache.hadoop.hbase.client.Result;
44 import org.apache.hadoop.hbase.client.ResultScanner;
45 import org.apache.hadoop.hbase.client.Scan;
46 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.mapreduce.Job;
49 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
50 import org.junit.AfterClass;
51 import org.junit.BeforeClass;
52 import org.junit.Test;
53 import org.junit.experimental.categories.Category;
54
55
56
57
58
59
60 @Category(LargeTests.class)
61 public class TestTableMapReduce extends TestTableMapReduceBase {
62 private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
63
64 protected Log getLog() { return LOG; }
65
66
67
68
69 static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
70
71
72
73
74
75
76
77
78
79 public void map(ImmutableBytesWritable key, Result value,
80 Context context)
81 throws IOException, InterruptedException {
82 if (value.size() != 1) {
83 throw new IOException("There should only be one input column");
84 }
85 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
86 cf = value.getMap();
87 if(!cf.containsKey(INPUT_FAMILY)) {
88 throw new IOException("Wrong input columns. Missing: '" +
89 Bytes.toString(INPUT_FAMILY) + "'.");
90 }
91
92
93 String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
94 StringBuilder newValue = new StringBuilder(originalValue);
95 newValue.reverse();
96
97 Put outval = new Put(key.get());
98 outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
99 context.write(key, outval);
100 }
101 }
102
103 protected void runTestOnTable(HTable table) throws IOException {
104 Job job = null;
105 try {
106 LOG.info("Before map/reduce startup");
107 job = new Job(table.getConfiguration(), "process column contents");
108 job.setNumReduceTasks(1);
109 Scan scan = new Scan();
110 scan.addFamily(INPUT_FAMILY);
111 TableMapReduceUtil.initTableMapperJob(
112 Bytes.toString(table.getTableName()), scan,
113 ProcessContentsMapper.class, ImmutableBytesWritable.class,
114 Put.class, job);
115 TableMapReduceUtil.initTableReducerJob(
116 Bytes.toString(table.getTableName()),
117 IdentityTableReducer.class, job);
118 FileOutputFormat.setOutputPath(job, new Path("test"));
119 LOG.info("Started " + Bytes.toString(table.getTableName()));
120 assertTrue(job.waitForCompletion(true));
121 LOG.info("After map/reduce completion");
122
123
124 verify(Bytes.toString(table.getTableName()));
125 } catch (InterruptedException e) {
126 throw new IOException(e);
127 } catch (ClassNotFoundException e) {
128 throw new IOException(e);
129 } finally {
130 table.close();
131 if (job != null) {
132 FileUtil.fullyDelete(
133 new File(job.getConfiguration().get("hadoop.tmp.dir")));
134 }
135 }
136 }
137 }