View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import com.google.common.collect.Lists;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.fs.FileUtil;
26  import org.apache.hadoop.fs.Path;
27  import org.apache.hadoop.hbase.CategoryBasedTimeout;
28  import org.apache.hadoop.hbase.HBaseTestingUtility;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.client.HTable;
31  import org.apache.hadoop.hbase.client.Result;
32  import org.apache.hadoop.hbase.client.Scan;
33  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.io.NullWritable;
36  import org.apache.hadoop.mapreduce.Job;
37  import org.apache.hadoop.mapreduce.Reducer;
38  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
39  import org.junit.After;
40  import org.junit.AfterClass;
41  import org.junit.BeforeClass;
42  import org.junit.Rule;
43  import org.junit.Test;
44  import org.junit.rules.TestRule;
45  
46  import java.io.File;
47  import java.io.IOException;
48  import java.util.ArrayList;
49  import java.util.List;
50  import java.util.Map;
51  import java.util.NavigableMap;
52  
53  import static org.junit.Assert.assertEquals;
54  import static org.junit.Assert.assertTrue;
55  
56  /**
57   * Base set of tests and setup for input formats touching multiple tables.
58   */
59  public abstract class MultiTableInputFormatTestBase {
/** Logger shared by this base class and its nested mapper and reducer. */
static final Log LOG = LogFactory.getLog(MultiTableInputFormatTestBase.class);

/** Timeout rule built for the concrete test class; also looks for stuck threads. */
@Rule
public final TestRule timeout = CategoryBasedTimeout.builder()
    .withTimeout(this.getClass())
    .withLookingForStuckThread(true)
    .build();

/** Testing utility shared by all tests of the class. */
public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

/** Common prefix of the generated table names. */
static final String TABLE_NAME = "scantest";

/** Column family that is loaded into the tables and added to every scan. */
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");

/** Configuration key carrying the expected first row to the reducer. */
static final String KEY_STARTROW = "startRow";

/** Configuration key carrying the expected last row to the reducer. */
static final String KEY_LASTROW = "stpRow";

/** Names of the tables under test: the prefix followed by 0, 1 and 2. */
static List<String> TABLES = Lists.newArrayList();

static {
  int index = 0;
  while (index < 3) {
    TABLES.add(TABLE_NAME + index);
    index++;
  }
}
76  
77    @BeforeClass
78    public static void setUpBeforeClass() throws Exception {
79      // switch TIF to log at DEBUG level
80      TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
81      TEST_UTIL.setJobWithoutMRCluster();
82      // start mini hbase cluster
83      TEST_UTIL.startMiniCluster(3);
84      // create and fill table
85      for (String tableName : TABLES) {
86        HTable table = null;
87        try {
88          table = TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), INPUT_FAMILY, 4);
89          TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
90        } finally {
91            if (table != null) {
92              table.close();
93            }
94          }
95      }
96    }
97  
98    @AfterClass
99    public static void tearDownAfterClass() throws Exception {
100     TEST_UTIL.shutdownMiniCluster();
101   }
102 
103   @After
104   public void tearDown() throws Exception {
105     Configuration c = TEST_UTIL.getConfiguration();
106     FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
107   }
108 
109   /**
110    * Pass the key and value to reducer.
111    */
112   public static class ScanMapper extends
113       TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
114     /**
115      * Pass the key and value to reduce.
116      *
117      * @param key The key, here "aaa", "aab" etc.
118      * @param value The value is the same as the key.
119      * @param context The task context.
120      * @throws IOException When reading the rows fails.
121      */
122     @Override
123     public void map(ImmutableBytesWritable key, Result value, Context context)
124         throws IOException, InterruptedException {
125       makeAssertions(key, value);
126       context.write(key, key);
127     }
128 
129     public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
130       if (value.size() != 1) {
131         throw new IOException("There should only be one input column");
132       }
133       Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
134           value.getMap();
135       if (!cf.containsKey(INPUT_FAMILY)) {
136         throw new IOException("Wrong input columns. Missing: '" +
137             Bytes.toString(INPUT_FAMILY) + "'.");
138       }
139       String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
140       LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
141           ", value -> " + val);
142     }
143   }
144 
145   /**
146    * Checks the last and first keys seen against the scanner boundaries.
147    */
148   public static class ScanReducer
149       extends
150       Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
151           NullWritable, NullWritable> {
152     private String first = null;
153     private String last = null;
154 
155     @Override
156     protected void reduce(ImmutableBytesWritable key,
157         Iterable<ImmutableBytesWritable> values, Context context)
158         throws IOException, InterruptedException {
159       makeAssertions(key, values);
160     }
161 
162     protected void makeAssertions(ImmutableBytesWritable key,
163         Iterable<ImmutableBytesWritable> values) {
164       int count = 0;
165       for (ImmutableBytesWritable value : values) {
166         String val = Bytes.toStringBinary(value.get());
167         LOG.debug("reduce: key[" + count + "] -> " +
168             Bytes.toStringBinary(key.get()) + ", value -> " + val);
169         if (first == null) first = val;
170         last = val;
171         count++;
172       }
173       assertEquals(3, count);
174     }
175 
176     @Override
177     protected void cleanup(Context context) throws IOException,
178         InterruptedException {
179       Configuration c = context.getConfiguration();
180       cleanup(c);
181     }
182 
183     protected void cleanup(Configuration c) {
184       String startRow = c.get(KEY_STARTROW);
185       String lastRow = c.get(KEY_LASTROW);
186       LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
187           startRow + "\"");
188       LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
189           "\"");
190       if (startRow != null && startRow.length() > 0) {
191         assertEquals(startRow, first);
192       }
193       if (lastRow != null && lastRow.length() > 0) {
194         assertEquals(lastRow, last);
195       }
196     }
197   }
198 
199   @Test
200   public void testScanEmptyToEmpty() throws IOException, InterruptedException,
201       ClassNotFoundException {
202     testScan(null, null, null);
203   }
204 
205   @Test
206   public void testScanEmptyToAPP() throws IOException, InterruptedException,
207       ClassNotFoundException {
208     testScan(null, "app", "apo");
209   }
210 
211   @Test
212   public void testScanOBBToOPP() throws IOException, InterruptedException,
213       ClassNotFoundException {
214     testScan("obb", "opp", "opo");
215   }
216 
217   @Test
218   public void testScanYZYToEmpty() throws IOException, InterruptedException,
219       ClassNotFoundException {
220     testScan("yzy", null, "zzz");
221   }
222 
223   /**
224    * Tests a MR scan using specific start and stop rows.
225    *
226    * @throws IOException
227    * @throws ClassNotFoundException
228    * @throws InterruptedException
229    */
230   private void testScan(String start, String stop, String last)
231       throws IOException, InterruptedException, ClassNotFoundException {
232     String jobName =
233         "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
234             (stop != null ? stop.toUpperCase() : "Empty");
235     LOG.info("Before map/reduce startup - job " + jobName);
236     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
237 
238     c.set(KEY_STARTROW, start != null ? start : "");
239     c.set(KEY_LASTROW, last != null ? last : "");
240 
241     List<Scan> scans = new ArrayList<Scan>();
242 
243     for (String tableName : TABLES) {
244       Scan scan = new Scan();
245 
246       scan.addFamily(INPUT_FAMILY);
247       scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
248 
249       if (start != null) {
250         scan.setStartRow(Bytes.toBytes(start));
251       }
252       if (stop != null) {
253         scan.setStopRow(Bytes.toBytes(stop));
254       }
255 
256       scans.add(scan);
257 
258       LOG.info("scan before: " + scan);
259     }
260 
261     runJob(jobName, c, scans);
262   }
263 
264   protected void runJob(String jobName, Configuration c, List<Scan> scans)
265       throws IOException, InterruptedException, ClassNotFoundException {
266     Job job = new Job(c, jobName);
267 
268     initJob(scans, job);
269     job.setReducerClass(ScanReducer.class);
270     job.setNumReduceTasks(1); // one to get final "first" and "last" key
271     FileOutputFormat.setOutputPath(job,
272       new Path(TEST_UTIL.getDataTestDirOnTestFS(), job.getJobName()));
273     LOG.info("Started " + job.getJobName());
274     job.waitForCompletion(true);
275     assertTrue(job.isSuccessful());
276     LOG.info("After map/reduce completion - job " + jobName);
277   }
278 
279   protected abstract void initJob(List<Scan> scans, Job job) throws IOException;
280 
281 
282 }