1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
44
45
46
47
48
49
50
51
52
53
54
55
56 public abstract class TestTableInputFormatScanBase {
57
58 static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
59 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
60
61 static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
62 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
63 static final String KEY_STARTROW = "startRow";
64 static final String KEY_LASTROW = "stpRow";
65
66 private static HTable table = null;
67
68 @BeforeClass
69 public static void setUpBeforeClass() throws Exception {
70
71
72 System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
73
74
75 TEST_UTIL.enableDebug(TableInputFormat.class);
76 TEST_UTIL.enableDebug(TableInputFormatBase.class);
77
78 TEST_UTIL.startMiniCluster(3);
79
80 table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
81 TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
82 TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
83
84 TEST_UTIL.startMiniMapReduceCluster();
85 }
86
87 @AfterClass
88 public static void tearDownAfterClass() throws Exception {
89 TEST_UTIL.shutdownMiniMapReduceCluster();
90 TEST_UTIL.shutdownMiniCluster();
91 }
92
93
94
95
96 public static class ScanMapper
97 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
98
99
100
101
102
103
104
105
106
107 @Override
108 public void map(ImmutableBytesWritable key, Result value,
109 Context context)
110 throws IOException, InterruptedException {
111 if (value.size() != 1) {
112 throw new IOException("There should only be one input column");
113 }
114 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
115 cf = value.getMap();
116 if(!cf.containsKey(INPUT_FAMILY)) {
117 throw new IOException("Wrong input columns. Missing: '" +
118 Bytes.toString(INPUT_FAMILY) + "'.");
119 }
120 String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
121 LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
122 ", value -> " + val);
123 context.write(key, key);
124 }
125
126 }
127
128
129
130
131 public static class ScanReducer
132 extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
133 NullWritable, NullWritable> {
134
135 private String first = null;
136 private String last = null;
137
138 protected void reduce(ImmutableBytesWritable key,
139 Iterable<ImmutableBytesWritable> values, Context context)
140 throws IOException ,InterruptedException {
141 int count = 0;
142 for (ImmutableBytesWritable value : values) {
143 String val = Bytes.toStringBinary(value.get());
144 LOG.info("reduce: key[" + count + "] -> " +
145 Bytes.toStringBinary(key.get()) + ", value -> " + val);
146 if (first == null) first = val;
147 last = val;
148 count++;
149 }
150 }
151
152 protected void cleanup(Context context)
153 throws IOException, InterruptedException {
154 Configuration c = context.getConfiguration();
155 String startRow = c.get(KEY_STARTROW);
156 String lastRow = c.get(KEY_LASTROW);
157 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
158 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
159 if (startRow != null && startRow.length() > 0) {
160 assertEquals(startRow, first);
161 }
162 if (lastRow != null && lastRow.length() > 0) {
163 assertEquals(lastRow, last);
164 }
165 }
166
167 }
168
169
170
171
172
173
174
175
176 protected void testScanFromConfiguration(String start, String stop, String last)
177 throws IOException, InterruptedException, ClassNotFoundException {
178 String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") +
179 "To" + (stop != null ? stop.toUpperCase() : "Empty");
180 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
181 c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME));
182 c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
183 c.set(KEY_STARTROW, start != null ? start : "");
184 c.set(KEY_LASTROW, last != null ? last : "");
185
186 if (start != null) {
187 c.set(TableInputFormat.SCAN_ROW_START, start);
188 }
189
190 if (stop != null) {
191 c.set(TableInputFormat.SCAN_ROW_STOP, stop);
192 }
193
194 Job job = new Job(c, jobName);
195 job.setMapperClass(ScanMapper.class);
196 job.setReducerClass(ScanReducer.class);
197 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
198 job.setMapOutputValueClass(ImmutableBytesWritable.class);
199 job.setInputFormatClass(TableInputFormat.class);
200 job.setNumReduceTasks(1);
201 FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
202 TableMapReduceUtil.addDependencyJars(job);
203 assertTrue(job.waitForCompletion(true));
204 }
205
206
207
208
209
210
211
212
213 protected void testScan(String start, String stop, String last)
214 throws IOException, InterruptedException, ClassNotFoundException {
215 String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
216 "To" + (stop != null ? stop.toUpperCase() : "Empty");
217 LOG.info("Before map/reduce startup - job " + jobName);
218 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
219 Scan scan = new Scan();
220 scan.addFamily(INPUT_FAMILY);
221 if (start != null) {
222 scan.setStartRow(Bytes.toBytes(start));
223 }
224 c.set(KEY_STARTROW, start != null ? start : "");
225 if (stop != null) {
226 scan.setStopRow(Bytes.toBytes(stop));
227 }
228 c.set(KEY_LASTROW, last != null ? last : "");
229 LOG.info("scan before: " + scan);
230 Job job = new Job(c, jobName);
231 TableMapReduceUtil.initTableMapperJob(
232 Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
233 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
234 job.setReducerClass(ScanReducer.class);
235 job.setNumReduceTasks(1);
236 FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
237 LOG.info("Started " + job.getJobName());
238 assertTrue(job.waitForCompletion(true));
239 LOG.info("After map/reduce completion - job " + jobName);
240 }
241
242 }
243