View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.IOException;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.client.HTable;
36  import org.apache.hadoop.hbase.client.Result;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.io.NullWritable;
41  import org.apache.hadoop.mapreduce.InputSplit;
42  import org.apache.hadoop.mapreduce.Job;
43  import org.apache.hadoop.mapreduce.Reducer;
44  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
45  import org.junit.AfterClass;
46  import org.junit.Assert;
47  import org.junit.BeforeClass;
48  
49  
50  /**
51   * <p>
52   * Tests various scan start and stop row scenarios. This is set in a scan and
53   * tested in a MapReduce job to see if that is handed over and done properly
54   * too.
55   * </p>
56   * <p>
57   * This test is broken into two parts in order to side-step the test timeout
58   * period of 900, as documented in HBASE-8326.
59   * </p>
60   */
61  public abstract class TestTableInputFormatScanBase {
62  
63    private static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
64    static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
65  
66    static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
67    static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
68    static final String KEY_STARTROW = "startRow";
69    static final String KEY_LASTROW = "stpRow";
70  
71    private static HTable table = null;
72  
73    @BeforeClass
74    public static void setUpBeforeClass() throws Exception {
75      // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
76      // this turns it off for this test.  TODO: Figure out why scr breaks recovery. 
77      System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
78  
79      // switch TIF to log at DEBUG level
80      TEST_UTIL.enableDebug(TableInputFormat.class);
81      TEST_UTIL.enableDebug(TableInputFormatBase.class);
82      TEST_UTIL.setJobWithoutMRCluster();
83      // start mini hbase cluster
84      TEST_UTIL.startMiniCluster(3);
85      // create and fill table
86      table = TEST_UTIL.createMultiRegionTable(TableName.valueOf(TABLE_NAME), INPUT_FAMILY);
87      TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
88    }
89  
90    @AfterClass
91    public static void tearDownAfterClass() throws Exception {
92      TEST_UTIL.shutdownMiniCluster();
93    }
94  
95    /**
96     * Pass the key and value to reduce.
97     */
98    public static class ScanMapper
99    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
100 
101     /**
102      * Pass the key and value to reduce.
103      *
104      * @param key  The key, here "aaa", "aab" etc.
105      * @param value  The value is the same as the key.
106      * @param context  The task context.
107      * @throws IOException When reading the rows fails.
108      */
109     @Override
110     public void map(ImmutableBytesWritable key, Result value,
111       Context context)
112     throws IOException, InterruptedException {
113       if (value.size() != 1) {
114         throw new IOException("There should only be one input column");
115       }
116       Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
117         cf = value.getMap();
118       if(!cf.containsKey(INPUT_FAMILY)) {
119         throw new IOException("Wrong input columns. Missing: '" +
120           Bytes.toString(INPUT_FAMILY) + "'.");
121       }
122       String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
123       LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
124         ", value -> " + val);
125       context.write(key, key);
126     }
127 
128   }
129 
130   /**
131    * Checks the last and first key seen against the scanner boundaries.
132    */
133   public static class ScanReducer
134   extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
135                   NullWritable, NullWritable> {
136 
137     private String first = null;
138     private String last = null;
139 
140     protected void reduce(ImmutableBytesWritable key,
141         Iterable<ImmutableBytesWritable> values, Context context)
142     throws IOException ,InterruptedException {
143       int count = 0;
144       for (ImmutableBytesWritable value : values) {
145         String val = Bytes.toStringBinary(value.get());
146         LOG.info("reduce: key[" + count + "] -> " +
147           Bytes.toStringBinary(key.get()) + ", value -> " + val);
148         if (first == null) first = val;
149         last = val;
150         count++;
151       }
152     }
153 
154     protected void cleanup(Context context)
155     throws IOException, InterruptedException {
156       Configuration c = context.getConfiguration();
157       String startRow = c.get(KEY_STARTROW);
158       String lastRow = c.get(KEY_LASTROW);
159       LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
160       LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
161       if (startRow != null && startRow.length() > 0) {
162         assertEquals(startRow, first);
163       }
164       if (lastRow != null && lastRow.length() > 0) {
165         assertEquals(lastRow, last);
166       }
167     }
168 
169   }
170 
171   /**
172    * Tests an MR Scan initialized from properties set in the Configuration.
173    * 
174    * @throws IOException
175    * @throws ClassNotFoundException
176    * @throws InterruptedException
177    */
178   protected void testScanFromConfiguration(String start, String stop, String last)
179   throws IOException, InterruptedException, ClassNotFoundException {
180     String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") +
181       "To" + (stop != null ? stop.toUpperCase() : "Empty");
182     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
183     c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME));
184     c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
185     c.set(KEY_STARTROW, start != null ? start : "");
186     c.set(KEY_LASTROW, last != null ? last : "");
187 
188     if (start != null) {
189       c.set(TableInputFormat.SCAN_ROW_START, start);
190     }
191 
192     if (stop != null) {
193       c.set(TableInputFormat.SCAN_ROW_STOP, stop);
194     }
195 
196     Job job = new Job(c, jobName);
197     job.setMapperClass(ScanMapper.class);
198     job.setReducerClass(ScanReducer.class);
199     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
200     job.setMapOutputValueClass(ImmutableBytesWritable.class);
201     job.setInputFormatClass(TableInputFormat.class);
202     job.setNumReduceTasks(1);
203     FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
204     TableMapReduceUtil.addDependencyJars(job);
205     assertTrue(job.waitForCompletion(true));
206   }
207 
208   /**
209    * Tests a MR scan using specific start and stop rows.
210    *
211    * @throws IOException
212    * @throws ClassNotFoundException
213    * @throws InterruptedException
214    */
215   protected void testScan(String start, String stop, String last)
216   throws IOException, InterruptedException, ClassNotFoundException {
217     String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
218       "To" + (stop != null ? stop.toUpperCase() : "Empty");
219     LOG.info("Before map/reduce startup - job " + jobName);
220     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
221     Scan scan = new Scan();
222     scan.addFamily(INPUT_FAMILY);
223     if (start != null) {
224       scan.setStartRow(Bytes.toBytes(start));
225     }
226     c.set(KEY_STARTROW, start != null ? start : "");
227     if (stop != null) {
228       scan.setStopRow(Bytes.toBytes(stop));
229     }
230     c.set(KEY_LASTROW, last != null ? last : "");
231     LOG.info("scan before: " + scan);
232     Job job = new Job(c, jobName);
233     TableMapReduceUtil.initTableMapperJob(
234       Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
235       ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
236     job.setReducerClass(ScanReducer.class);
237     job.setNumReduceTasks(1); // one to get final "first" and "last" key
238     FileOutputFormat.setOutputPath(job,
239         new Path(TEST_UTIL.getDataTestDir(), job.getJobName()));
240     LOG.info("Started " + job.getJobName());
241     assertTrue(job.waitForCompletion(true));
242     LOG.info("After map/reduce completion - job " + jobName);
243   }
244 
245 
246   /**
247    * Tests a MR scan using data skew auto-balance
248    *
249    * @throws IOException
250    * @throws ClassNotFoundException
251    * @throws InterruptedException
252    */
253   public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
254           InterruptedException,
255           ClassNotFoundException {
256     String jobName = "TestJobForNumOfSplits";
257     LOG.info("Before map/reduce startup - job " + jobName);
258     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
259     Scan scan = new Scan();
260     scan.addFamily(INPUT_FAMILY);
261     c.set("hbase.mapreduce.input.autobalance", "true");
262     c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
263     c.set(KEY_STARTROW, "");
264     c.set(KEY_LASTROW, "");
265     Job job = new Job(c, jobName);
266     TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
267             ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
268     TableInputFormat tif = new TableInputFormat();
269     tif.setConf(job.getConfiguration());
270     Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
271     List<InputSplit> splits = tif.getSplits(job);
272     Assert.assertEquals(expectedNumOfSplits, splits.size());
273   }
274 
275   /**
276    * Tests for the getSplitKey() method in TableInputFormatBase.java
277    */
278   public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
279     byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
280       Assert.assertArrayEquals(splitKey, result);
281   }
282 }
283