
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Sets;

/**
 * A large test which loads a lot of data with internal references, and
 * verifies the data.
 *
 * In the load step, 200 map tasks are launched, each of which writes loadmapper.num_to_write
 * (default 100K) rows to an HBase table. Rows are written in 100 blocks. Each row in a block
 * contains loadmapper.backrefs (default 50) references to random rows in the previous block.
 *
 * The verify step scans the table and checks that every referenced row is actually
 * present (no data loss). Missing rows are written from the reducers to the job output
 * directory in HDFS so they can be inspected later.
 *
 * This class can be run as a unit test, as an integration test, or from the command line.
 *
 * Originally taken from Apache Bigtop.
 */
@Category(IntegrationTests.class)
public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
  private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");

  private static final String NUM_TO_WRITE_KEY = "loadmapper.num_to_write";
  private static final long NUM_TO_WRITE_DEFAULT = 100*1000;

  private static final String TABLE_NAME_KEY = "loadmapper.table";
  private static final String TABLE_NAME_DEFAULT = "table";

  private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs";
  private static final int NUM_BACKREFS_DEFAULT = 50;

  private static final String NUM_MAP_TASKS_KEY = "loadmapper.map.tasks";
  private static final String NUM_REDUCE_TASKS_KEY = "verify.reduce.tasks";
  private static final int NUM_MAP_TASKS_DEFAULT = 200;
  private static final int NUM_REDUCE_TASKS_DEFAULT = 35;

  private static final int SCANNER_CACHING = 500;

  protected IntegrationTestingUtility util;

  private String toRun = null;

  private enum Counters {
    ROWS_WRITTEN,
    REFERENCES_WRITTEN,
    REFERENCES_CHECKED
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(3);
    this.setConf(util.getConfiguration());
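    // Run with 1/100th of the default row count and map tasks, and 1/10th of the
    // default reduce tasks.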
    getConf().setLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT / 100);
    getConf().setInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT / 100);
    getConf().setInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT / 10);
    if (!util.isDistributedCluster()) {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

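  // Row indexes are written byte-swapped (see swapLong below), presumably so that
  // sequentially increasing indexes are spread across the key space instead of
  // hot-spotting a single region of the presplit table.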
  /**
   * Converts a "long" value between endian systems.
   * Borrowed from Apache Commons IO
   * @param value value to convert
   * @return the converted value
   */
  public static long swapLong(long value)
  {
    return
      ( ( ( value >> 0 ) & 0xff ) << 56 ) +
      ( ( ( value >> 8 ) & 0xff ) << 48 ) +
      ( ( ( value >> 16 ) & 0xff ) << 40 ) +
      ( ( ( value >> 24 ) & 0xff ) << 32 ) +
      ( ( ( value >> 32 ) & 0xff ) << 24 ) +
      ( ( ( value >> 40 ) & 0xff ) << 16 ) +
      ( ( ( value >> 48 ) & 0xff ) << 8 ) +
      ( ( ( value >> 56 ) & 0xff ) << 0 );
  }

  public static class LoadMapper
      extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable>
  {
    protected long recordsToWrite;
    protected HTable table;
    protected Configuration conf;
    protected int numBackReferencesPerRow;
    protected String shortTaskId;

    protected Random rand = new Random();

    protected Counter rowsWritten, refsWritten;

    @Override
    public void setup(Context context) throws IOException {
      conf = context.getConfiguration();
      recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
      String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
      numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
      table = new HTable(conf, tableName);
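      // Buffer puts client-side: 4 MB write buffer with autoflush disabled; the
      // buffer is flushed explicitly at block boundaries in map().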
      table.setWriteBufferSize(4*1024*1024);
      table.setAutoFlush(false, true);

      String taskId = conf.get("mapred.task.id");
      Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
      if (!matcher.matches()) {
        throw new RuntimeException("Strange task ID: " + taskId);
      }
      shortTaskId = matcher.group(1);

      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
      refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);
    }

    @Override
    public void cleanup(Context context) throws IOException {
      table.flushCommits();
      table.close();
    }

    @Override
    protected void map(NullWritable key, NullWritable value,
        Context context) throws IOException, InterruptedException {

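      // Each row key is an 8-byte, byte-swapped record index followed by the
      // "/<shortTaskId>" suffix, so every map task writes its own disjoint set of rows.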
      String suffix = "/" + shortTaskId;
      byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix));

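      // Rows are written in 100 blocks; backreferences only ever point into the
      // immediately preceding block.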
      int BLOCK_SIZE = (int)(recordsToWrite / 100);

      for (long i = 0; i < recordsToWrite;) {
        long blockStart = i;
        for (long idxInBlock = 0;
             idxInBlock < BLOCK_SIZE && i < recordsToWrite;
             idxInBlock++, i++) {

          long byteSwapped = swapLong(i);
          Bytes.putLong(row, 0, byteSwapped);

          Put p = new Put(row);
          p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
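          // Every row outside the first block also carries backreference cells whose
          // qualifiers are the (byte-swapped) keys of random rows in the previous block.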
          if (blockStart > 0) {
            for (int j = 0; j < numBackReferencesPerRow; j++) {
              long referredRow = blockStart - BLOCK_SIZE + rand.nextInt(BLOCK_SIZE);
              Bytes.putLong(row, 0, swapLong(referredRow));
              p.add(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY);
            }
            refsWritten.increment(1);
          }
          rowsWritten.increment(1);
          table.put(p);

          if (i % 100 == 0) {
            context.setStatus("Written " + i + "/" + recordsToWrite + " records");
            context.progress();
          }
        }
        // End of block, flush all of them before we start writing anything
        // pointing to these!
        table.flushCommits();
      }
    }
  }

  public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
    static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY);

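    // Emits (rowKey -> EMPTY) for the row's own q1 cell to mark that the row exists,
    // and (referredRowKey -> referrerRowKey) for every backreference cell.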
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      BytesWritable bwKey = new BytesWritable(key.get());
      BytesWritable bwVal = new BytesWritable();
      for (Cell kv : value.listCells()) {
        if (Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length,
                            kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()) == 0) {
          context.write(bwKey, EMPTY);
        } else {
          bwVal.set(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
          context.write(bwVal, bwKey);
        }
      }
    }
  }

  public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
    private static final Log LOG = LogFactory.getLog(VerifyReducer.class);
    private Counter refsChecked;
    private Counter rowsWritten;

    @Override
    public void setup(Context context) throws IOException {
      refsChecked = context.getCounter(Counters.REFERENCES_CHECKED);
      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
    }

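    // The values for a given key are the keys of all rows that referenced it, plus
    // EMPTY if the row itself was scanned. A missing EMPTY marker means a referenced
    // row was lost; such rows are written to the job output and counted via ROWS_WRITTEN.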
    @Override
    protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers,
        VerifyReducer.Context ctx) throws IOException, InterruptedException {
      boolean gotOriginalRow = false;
      int refCount = 0;

      for (BytesWritable ref : referrers) {
        if (ref.getLength() == 0) {
          assert !gotOriginalRow;
          gotOriginalRow = true;
        } else {
          refCount++;
        }
      }
      refsChecked.increment(refCount);

      if (!gotOriginalRow) {
        String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength());
        String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength());
        LOG.error("Reference error row " + parsedRow);
        ctx.write(new Text(binRow), new Text(parsedRow));
        rowsWritten.increment(1);
      }
    }

    private String makeRowReadable(byte[] bytes, int length) {
      long rowIdx = swapLong(Bytes.toLong(bytes, 0));
      String suffix = Bytes.toString(bytes, 8, length - 8);

      return "Row #" + rowIdx + " suffix " + suffix;
    }
  }

  protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "load-output");

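    // NMapInputFormat just creates the requested number of empty input splits;
    // each LoadMapper generates all of its data itself.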
    NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
    conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());

    Job job = new Job(conf);
    job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
    job.setJarByClass(this.getClass());
    setMapperClass(job);
    job.setInputFormatClass(NMapInputFormat.class);
    job.setNumReduceTasks(0);
    setJobScannerConf(job);
    FileOutputFormat.setOutputPath(job, outputDir);

    TableMapReduceUtil.addDependencyJars(job);

    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
    TableMapReduceUtil.initCredentials(job);
    assertTrue(job.waitForCompletion(true));
    return job;
  }

  protected void setMapperClass(Job job) {
    job.setMapperClass(LoadMapper.class);
  }

  protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "verify-output");

    Job job = new Job(conf);
    job.setJarByClass(this.getClass());
    job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
    setJobScannerConf(job);

    Scan scan = new Scan();

    TableMapReduceUtil.initTableMapperJob(
        htd.getTableName().getNameAsString(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
    int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
    TableMapReduceUtil.setScannerCaching(job, scannerCaching);

    job.setReducerClass(VerifyReducer.class);
    job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
    FileOutputFormat.setOutputPath(job, outputDir);
    assertTrue(job.waitForCompletion(true));

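    // The verify reducer bumps ROWS_WRITTEN once for each referenced row that was not
    // found, so any non-zero value here indicates data loss.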
    long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
    assertEquals(0, numOutputRecords);
  }

  private static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    long lpr = job.getConfiguration().getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT) / 100;
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, (int)lpr);
  }

  public Path getTestDir(String testName, String subdir) throws IOException {
    // HBaseTestingUtility.getDataTestDirOnTestFs() has not been backported.
    FileSystem fs = FileSystem.get(getConf());
    Path base = new Path(fs.getWorkingDirectory(), "test-data");
    String randomStr = UUID.randomUUID().toString();
    Path testDir = new Path(base, randomStr);
    fs.deleteOnExit(testDir);

    return new Path(new Path(testDir, testName), subdir);
  }

  @Test
  public void testLoadAndVerify() throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TEST_NAME));
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    HBaseAdmin admin = getTestingUtil(getConf()).getHBaseAdmin();
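    // Presplit into 40 regions covering the full 8-byte key space (0x00...00 through
    // 0xFF...FF), which matches the byte-swapped row keys written by the load step.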
    admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), 40);

    doLoad(getConf(), htd);
    doVerify(getConf(), htd);

    // Only disable and drop the table if verification succeeded; otherwise it is
    // useful to leave it around for post-mortem inspection.
    getTestingUtil(getConf()).deleteTable(htd.getName());
  }

  public void usage() {
    System.err.println(this.getClass().getSimpleName() + " [-Doptions] <load|verify|loadAndVerify>");
    System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
    System.err.println("Options");
    System.err.println("  -Dloadmapper.table=<name>        Table to write/verify (default IntegrationTestLoadAndVerify)");
    System.err.println("  -Dloadmapper.backrefs=<n>        Number of backreferences per row (default 50)");
    System.err.println("  -Dloadmapper.num_to_write=<n>    Number of rows per mapper (default 100,000)");
    System.err.println("  -Dloadmapper.deleteAfter=<bool>  Delete after a successful verify (default true)");
    System.err.println("  -Dloadmapper.numPresplits=<n>    Number of presplit regions to start with (default 40)");
    System.err.println("  -Dloadmapper.map.tasks=<n>       Number of map tasks for load (default 200)");
    System.err.println("  -Dverify.reduce.tasks=<n>        Number of reduce tasks for verify (default 35)");
    System.err.println("  -Dverify.scannercaching=<n>      Scanner caching (rows fetched per RPC) for verify (default 500)");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    String[] args = cmd.getArgs();
    if (args == null || args.length != 1) {
      usage();
      throw new RuntimeException("Incorrect number of args.");
    }
    toRun = args[0];
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    IntegrationTestingUtility.setUseDistributedCluster(getConf());
    boolean doLoad = false;
    boolean doVerify = false;
    boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true);
    int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);

    if (toRun.equals("load")) {
      doLoad = true;
    } else if (toRun.equals("verify")) {
      doVerify = true;
    } else if (toRun.equals("loadAndVerify")) {
      doLoad = true;
      doVerify = true;
    } else {
      System.err.println("Invalid argument " + toRun);
      usage();
      return 1;
    }

    // Create the HTableDescriptor for the specified table.
    String table = getTablename();
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    HBaseAdmin admin = new HBaseAdmin(getConf());
    if (doLoad) {
      admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
      doLoad(getConf(), htd);
    }
    admin.close();
    if (doVerify) {
      doVerify(getConf(), htd);
      if (doDelete) {
        getTestingUtil(getConf()).deleteTable(htd.getName());
      }
    }
    return 0;
  }

  @Override
  public String getTablename() {
    return getConf().get(TABLE_NAME_KEY, TEST_NAME);
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(TEST_FAMILY));
  }

  public static void main(String[] argv) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(), argv);
    System.exit(ret);
  }
}