/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;

import com.google.common.collect.Lists;

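/**
 * Test for {@link TableSnapshotInputFormat}: covers best-locations calculation from block
 * distributions, a mocked-out MapReduce pass over snapshot splits, and a full MapReduce job
 * that reads a table snapshot and validates every row between the start and stop rows.
 */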
@Category(LargeTests.class)
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
  @Rule public final TestRule timeout = CategoryBasedTimeout.builder()
      .withTimeout(this.getClass()).withLookingForStuckThread(true).build();

  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

  @After
  public void tearDown() throws Exception {
    // Intentionally empty: each test manages the cluster via setupCluster()/tearDownCluster().
  }

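  /**
   * getBestLocations should return hosts ordered by descending block weight, keeping only hosts
   * whose weight is reasonably close to the heaviest host's (the exact cutoff ratio is an
   * implementation detail of {@link TableSnapshotInputFormatImpl}).
   */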
  @Test
  public void testGetBestLocations() throws IOException {
    TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
    Configuration conf = UTIL.getConfiguration();

    // no hosts at all -> empty list
    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertEquals(Lists.newArrayList(), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    // h2 has only half of h1's weight, so it falls below the cutoff
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    // bumping h2 to 9 brings it close enough to h1's 10 to be included
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      tsif.getBestLocations(conf, blockDistribution));

    // h2 (12) now outweighs h1 (10), so the order flips
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);

    // weights: h2=12, h3=11, h4=10, h1=10 -> all within the cutoff, ordered by weight
    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"),
      tsif.getBestLocations(conf, blockDistribution));
  }

  public static enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

  public static class TestTableSnapshotMapper
      extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {
      // Verify the row contents, then emit only the row key.
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

  public static class TestTableSnapshotReducer
      extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(bbb, yyy);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
        Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      // Assert that every expected row between bbb and yyy was seen.
      rowTracker.validate();
    }
  }

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getRandomDir();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // the snapshot job should leave block-cache settings at their defaults
      Assert.assertEquals(
        "Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
      String snapshotName, Path tmpTableDir) throws Exception {
    Job job = new Job(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
      new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);
  }

  @Override
  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testWithMockedMapReduce");
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Job job = new Job(util.getConfiguration());
      Path tmpTableDir = util.getRandomDir();
      Scan scan = new Scan(getStartRow(), getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
    } finally {
      util.getHBaseAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
      tearDownCluster();
    }
  }

  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    for (int i = 0; i < splits.size(); i++) {
      // validate the input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);

      // validate the record reader, using a mocked task context that returns the job configuration
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
          tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate the data read from this split
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate that all rows between startRow and stopRow were seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, expectedNumSplits, shutdownCluster);
  }

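  // Public and static so that other snapshot tests can reuse this end-to-end verification.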
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {
    // create the table, load data, and take the snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      // verify that the job can still run from the snapshot files with the cluster offline
      util.shutdownMiniHBaseCluster();
    }

    try {
      // run a real MapReduce job over the snapshot
      Job job = new Job(util.getConfiguration());
      Scan scan = new Scan(startRow, endRow); // limit the scan to the expected row range

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, true, tableDir);

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getHBaseAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }
}