/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.experimental.categories.Category;
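
/**
 * Test a map/reduce job over HBase tables. The job reads every row of the
 * input table, reverses the value of a single cell, and writes the result
 * back out to the table.
 */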
@Category(LargeTests.class)
public class TestTableMapReduce extends TestTableMapReduceBase {
  private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);

  protected Log getLog() { return LOG; }
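
  /**
   * Mapper that reverses the value of its single input cell and emits the
   * row key with the reversed value as a Put.
   */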
  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
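
    /**
     * Reverse the contents of the row's single input column and write the
     * row key and reversed value to the context.
     */
    @Override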
    public void map(ImmutableBytesWritable key, Result value,
      Context context)
    throws IOException, InterruptedException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
        cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILY) + "'.");
      }
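
      // Get the original value and reverse it.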
      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();
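
      // Emit the row key with the reversed value as the new cell contents.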
      Put outval = new Put(key.get());
      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      context.write(key, outval);
    }
  }

  protected void runTestOnTable(HTable table) throws IOException {
    Job job = null;
    try {
      LOG.info("Before map/reduce startup");
      job = new Job(table.getConfiguration(), "process column contents");
      job.setNumReduceTasks(1);
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
      TableMapReduceUtil.initTableMapperJob(
        Bytes.toString(table.getTableName()), scan,
        ProcessContentsMapper.class, ImmutableBytesWritable.class,
        Put.class, job);
      TableMapReduceUtil.initTableReducerJob(
        Bytes.toString(table.getTableName()),
        IdentityTableReducer.class, job);
      FileOutputFormat.setOutputPath(job, new Path("test"));
      LOG.info("Started " + Bytes.toString(table.getTableName()));
      assertTrue(job.waitForCompletion(true));
      LOG.info("After map/reduce completion");
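
      // Verify the map/reduce results against the table contents.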
      verify(table.getName());
    } catch (InterruptedException e) {
      throw new IOException(e);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
      table.close();
      if (job != null) {
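        // Clean up the job's temporary output under hadoop.tmp.dir.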
        FileUtil.fullyDelete(
          new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }
}