1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23
24 import java.io.IOException;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.NavigableMap;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.CategoryBasedTimeout;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.client.HTable;
38 import org.apache.hadoop.hbase.client.Put;
39 import org.apache.hadoop.hbase.client.Result;
40 import org.apache.hadoop.hbase.client.ResultScanner;
41 import org.apache.hadoop.hbase.client.Scan;
42 import org.apache.hadoop.hbase.client.Table;
43 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.junit.AfterClass;
46 import org.junit.BeforeClass;
47 import org.junit.Rule;
48 import org.junit.Test;
49 import org.junit.rules.TestRule;
50
51
52
53
54
55
56
57 public abstract class TestTableMapReduceBase {
58 @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
59 withTimeout(this.getClass()).withLookingForStuckThread(true).build();
60 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
61 protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
62 protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
63 protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
64
65 protected static final byte[][] columns = new byte[][] {
66 INPUT_FAMILY,
67 OUTPUT_FAMILY
68 };
69
70
71
72
73 protected abstract Log getLog();
74
75
76
77
78 protected abstract void runTestOnTable(HTable table) throws IOException;
79
80 @BeforeClass
81 public static void beforeClass() throws Exception {
82 UTIL.setJobWithoutMRCluster();
83 UTIL.startMiniCluster();
84 HTable table =
85 UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
86 OUTPUT_FAMILY });
87 UTIL.loadTable(table, INPUT_FAMILY, false);
88 }
89
90 @AfterClass
91 public static void afterClass() throws Exception {
92 UTIL.shutdownMiniCluster();
93 }
94
95
96
97
98
99 @Test
100 public void testMultiRegionTable() throws IOException {
101 runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME));
102 }
103
104 @Test
105 public void testCombiner() throws IOException {
106 Configuration conf = new Configuration(UTIL.getConfiguration());
107
108 conf.setInt("mapreduce.map.combine.minspills", 1);
109 runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
110 }
111
112
113
114
115 protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
116 if (value.size() != 1) {
117 throw new IOException("There should only be one input column");
118 }
119 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
120 cf = value.getMap();
121 if(!cf.containsKey(INPUT_FAMILY)) {
122 throw new IOException("Wrong input columns. Missing: '" +
123 Bytes.toString(INPUT_FAMILY) + "'.");
124 }
125
126
127
128 String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
129 StringBuilder newValue = new StringBuilder(originalValue);
130 newValue.reverse();
131
132
133
134 Put outval = new Put(key.get());
135 outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
136 return outval;
137 }
138
139 protected void verify(TableName tableName) throws IOException {
140 Table table = new HTable(UTIL.getConfiguration(), tableName);
141 boolean verified = false;
142 long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
143 int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
144 for (int i = 0; i < numRetries; i++) {
145 try {
146 getLog().info("Verification attempt #" + i);
147 verifyAttempt(table);
148 verified = true;
149 break;
150 } catch (NullPointerException e) {
151
152
153 getLog().debug("Verification attempt failed: " + e.getMessage());
154 }
155 try {
156 Thread.sleep(pause);
157 } catch (InterruptedException e) {
158
159 }
160 }
161 assertTrue(verified);
162 }
163
164
165
166
167
168
169
170
171 private void verifyAttempt(final Table table) throws IOException, NullPointerException {
172 Scan scan = new Scan();
173 TableInputFormat.addColumns(scan, columns);
174 ResultScanner scanner = table.getScanner(scan);
175 try {
176 Iterator<Result> itr = scanner.iterator();
177 assertTrue(itr.hasNext());
178 while(itr.hasNext()) {
179 Result r = itr.next();
180 if (getLog().isDebugEnabled()) {
181 if (r.size() > 2 ) {
182 throw new IOException("Too many results, expected 2 got " +
183 r.size());
184 }
185 }
186 byte[] firstValue = null;
187 byte[] secondValue = null;
188 int count = 0;
189 for(Cell kv : r.listCells()) {
190 if (count == 0) {
191 firstValue = CellUtil.cloneValue(kv);
192 }
193 if (count == 1) {
194 secondValue = CellUtil.cloneValue(kv);
195 }
196 count++;
197 if (count == 2) {
198 break;
199 }
200 }
201
202
203 if (firstValue == null) {
204 throw new NullPointerException(Bytes.toString(r.getRow()) +
205 ": first value is null");
206 }
207 String first = Bytes.toString(firstValue);
208
209 if (secondValue == null) {
210 throw new NullPointerException(Bytes.toString(r.getRow()) +
211 ": second value is null");
212 }
213 byte[] secondReversed = new byte[secondValue.length];
214 for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
215 secondReversed[i] = secondValue[j];
216 }
217 String second = Bytes.toString(secondReversed);
218
219 if (first.compareTo(second) != 0) {
220 if (getLog().isDebugEnabled()) {
221 getLog().debug("second key is not the reverse of first. row=" +
222 Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
223 ", second value=" + second);
224 }
225 fail();
226 }
227 }
228 } finally {
229 scanner.close();
230 }
231 }
232 }