
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

@Category(LargeTests.class)
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  public static byte[] bbb = Bytes.toBytes("bbb");
  public static byte[] yyy = Bytes.toBytes("yyy");

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

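  // Exercises TableSnapshotInputFormatImpl.getBestLocations(). Judging by the assertions below,
  // it is expected to return hosts ordered by descending cumulative block weight, omitting hosts
  // whose weight is only a small fraction of the top host's (e.g. "h2" at weight 1 is dropped
  // while "h1" holds 2).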
  @Test
  public void testGetBestLocations() throws IOException {
    TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertEquals(Lists.newArrayList(), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);

    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"), tsif.getBestLocations(conf, blockDistribution));
  }

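  // Counter available to the test mapper/reducer for flagging validation problems; it is not
  // incremented directly in this class.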
  public static enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

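  // Mapper that verifies each row read from the snapshot (via verifyRowFromMap, presumably
  // inherited from the test base) and emits the row key for the reducer to track.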
  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

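  // Reducer that records every row key it receives and, in cleanup(), asserts that the whole
  // expected key range between bbb and yyy was seen.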
  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
        Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      rowTracker.validate();
    }
  }

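  // Verifies that initTableSnapshotMapperJob rewrites the job configuration so the restored
  // snapshot is read with the default LruBlockCache size and with SlabCache and BucketCache
  // disabled.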
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals(
        "Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use SlabCache.",
        0, job.getConfiguration().getFloat("hbase.offheapcache.percentage", -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

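  // Exercises the split and record-reader path directly, without running a real MapReduce job:
  // splits come from TableSnapshotInputFormat and are read back through record readers driven by
  // a mocked TaskAttemptContext, then checked for full row coverage.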
  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testWithMockedMapReduce");
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Job job = new Job(util.getConfiguration());
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, false, tmpTableDir);

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getHBaseAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
      tearDownCluster();
    }
  }

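  // Asserts the expected number of splits, reads every split with a mocked TaskAttemptContext,
  // and checks that together the splits cover all rows between startRow and stopRow.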
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr = tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

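  // Full MapReduce variant invoked by the base class tests; delegates to doTestWithMapReduce.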
  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
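  // It snapshots a freshly written table, optionally shuts down the mini HBase cluster first
  // (presumably to show the job only needs the snapshot files, not live region servers), then
  // runs a real MapReduce job over the snapshot and relies on the reducer to validate row
  // coverage.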
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = new Job(util.getConfiguration());
      Scan scan = new Scan(startRow, endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, true, tableDir);

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getHBaseAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }
}