/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import static org.mockito.Mockito.mock;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.IOException;
import java.util.Iterator;

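/**
 * Test the mapred (classic API) flavor of {@link TableSnapshotInputFormat}:
 * splits are created from a table snapshot and the rows are read back both
 * through mocked record readers and through real MapReduce jobs, including
 * jobs run while HBase itself is offline.
 */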
@Category(LargeTests.class)
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  private static final byte[] aaa = Bytes.toBytes("aaa");
  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  private static final String COLUMNS =
    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);

  @Override
  protected byte[] getStartRow() {
    return aaa;
  }

  @Override
  protected byte[] getEndRow() {
    return after_zzz;
  }

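  /** Map phase of the test job: verifies each row read from the snapshot and emits its key. */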
  static class TestTableSnapshotMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      verifyRowFromMap(key, value);
      collector.collect(key, NullWritable.get());
    }
  }

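  /**
   * Reduce phase of the test job: records every row key it receives and, on
   * close, validates that the whole expected key range was seen.
   */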
  public static class TestTableSnapshotReducer extends MapReduceBase
      implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
        OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      rowTracker.addRow(key.get());
    }

    @Override
    public void close() {
      rowTracker.validate();
    }
  }

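  /**
   * Checks that initTableSnapshotMapJob configures the job for the default
   * LruBlockCache and leaves the off-heap (SlabCache) and BucketCache disabled.
   */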
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      JobConf job = new JobConf(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // TODO: it would be better to examine the cache instance that results from this config
      // directly. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals(
        "Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use SlabCache.",
        0, job.getFloat("hbase.offheapcache.percentage", -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  // TODO: mapred does not support limiting the input range by startrow/endrow,
  // so the following tests must override parameter verification.

  @Test
  @Override
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10);
  }

  @Test
  @Override
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false);
  }

  // run the MR job while HBase is offline
  @Test
  @Override
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true);
  }

  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testWithMockedMapReduce");
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      JobConf job = new JobConf(util.getConfiguration());
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // mapred does not support restricting the scan by start and end keys,
      // so verify against the full key range of the snapshot
      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getHBaseAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
      tearDownCluster();
    }
  }

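  /**
   * Exercises the input format directly, without running a job: asserts the
   * expected number of splits, then reads every row back through a record
   * reader and checks that the full key range is covered.
   */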
  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    InputSplit[] splits = tsif.getSplits(job, 0);

    Assert.assertEquals(expectedNumSplits, splits.length);

    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    for (int i = 0; i < splits.length; i++) {
      // validate input split
      InputSplit split = splits[i];
      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);

      // validate record reader
      Reporter reporter = mock(Reporter.class);
      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);

      // validate we can read all the data back
      ImmutableBytesWritable key = rr.createKey();
      Result value = rr.createValue();
      while (rr.next(key, value)) {
        verifyRowFromMap(key, value);
        rowTracker.addRow(key.copyBytes());
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, expectedNumSplits, shutdownCluster);
  }

  /**
   * Runs a full MapReduce job against the snapshot and asserts success; also
   * called by IntegrationTestTableSnapshotInputFormat.
   */
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      JobConf jobConf = new JobConf(util.getConfiguration());

      jobConf.setJarByClass(util.getClass());
      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf,
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, jobConf, true, tableDir);

      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      jobConf.setNumReduceTasks(1);
      jobConf.setOutputFormat(NullOutputFormat.class);

      RunningJob job = JobClient.runJob(jobConf);
      Assert.assertTrue(job.isSuccessful());
    } finally {
      if (!shutdownCluster) {
        util.getHBaseAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }
}