/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Sets;

/**
 * A large-scale integration test which loads a table with rows that contain
 * back-references to other rows, then verifies that every referenced row
 * actually exists (i.e. no data was lost).
 *
 * The load step launches loadmapper.map.tasks map tasks (default 200), each
 * writing loadmapper.num_to_write rows (default 100,000) in roughly 100
 * blocks. Every row in a block (except the first block) carries
 * loadmapper.backrefs (default 50) references to random rows of the previous
 * block, stored as extra qualifiers in the row.
 *
 * The verify step scans the table with a mapreduce job: the mapper emits one
 * record per back-reference and one "row exists" marker per row, and the
 * reducer flags any referenced row for which no marker arrived. Failed rows
 * are written to the job output directory for later inspection.
 *
 * The test can run as a JUnit test against a mini cluster, or from the
 * command line against a distributed cluster, e.g. (illustrative invocation):
 * {@code hbase org.apache.hadoop.hbase.test.IntegrationTestLoadAndVerify loadAndVerify}
 */
@Category(IntegrationTests.class)
public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
  private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");

  private static final String NUM_TO_WRITE_KEY = "loadmapper.num_to_write";
  private static final long NUM_TO_WRITE_DEFAULT = 100 * 1000;

  private static final String TABLE_NAME_KEY = "loadmapper.table";
  private static final String TABLE_NAME_DEFAULT = "table";

  private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs";
  private static final int NUM_BACKREFS_DEFAULT = 50;

  private static final String NUM_MAP_TASKS_KEY = "loadmapper.map.tasks";
  private static final String NUM_REDUCE_TASKS_KEY = "verify.reduce.tasks";
  private static final int NUM_MAP_TASKS_DEFAULT = 200;
  private static final int NUM_REDUCE_TASKS_DEFAULT = 35;

  private static final int SCANNER_CACHING = 500;

  protected IntegrationTestingUtility util;

  private String toRun = null;

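  /**
   * Job counters shared by the load and verify jobs. Note that the verify
   * reducer reuses ROWS_WRITTEN to count failure rows it writes to the job
   * output, which is why doVerify() asserts that this counter is zero.
   */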
  private enum Counters {
    ROWS_WRITTEN,
    REFERENCES_WRITTEN,
    REFERENCES_CHECKED
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(3);
    this.setConf(util.getConfiguration());
    // Scale the defaults down so the JUnit run against a mini cluster
    // finishes in a reasonable time.
    getConf().setLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT / 100);
    getConf().setInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT / 100);
    getConf().setInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT / 10);
    if (!util.isDistributedCluster()) {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  /**
   * Reverses the byte order of a long, e.g. 0x0807060504030201L becomes
   * 0x0102030405060708L. Used so that sequentially increasing row indexes
   * map to row keys spread across the whole key space (and hence across
   * the presplit regions).
   */
  public static long swapLong(long value) {
    return ((value >>  0) & 0xff) << 56
         | ((value >>  8) & 0xff) << 48
         | ((value >> 16) & 0xff) << 40
         | ((value >> 24) & 0xff) << 32
         | ((value >> 32) & 0xff) << 24
         | ((value >> 40) & 0xff) << 16
         | ((value >> 48) & 0xff) <<  8
         | ((value >> 56) & 0xff) <<  0;
  }

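  /**
   * Mapper for the load phase. Each task writes {@code loadmapper.num_to_write}
   * rows in roughly 100 equal blocks. A row key is the byte-swapped row index
   * followed by a "/&lt;taskId&gt;" suffix, so consecutive indexes land in
   * different regions. Every row outside the first block also stores
   * {@code loadmapper.backrefs} back-references to random rows of the
   * previous block, each as a qualifier whose name is the referred row's key.
   */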
  public static class LoadMapper
      extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {
    protected long recordsToWrite;
    protected HTable table;
    protected Configuration conf;
    protected int numBackReferencesPerRow;
    protected String shortTaskId;

    protected Random rand = new Random();

    protected Counter rowsWritten, refsWritten;

    @Override
    public void setup(Context context) throws IOException {
      conf = context.getConfiguration();
      recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
      String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
      numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
      table = new HTable(conf, tableName);
      // Batch puts client-side: 4 MB write buffer, auto-flush disabled.
      table.setWriteBufferSize(4 * 1024 * 1024);
      table.setAutoFlush(false, true);

      // Extract the numeric part of the map attempt id, e.g.
      // "attempt_..._m_000001_0" -> "000001_0", for use as a row key suffix.
      String taskId = conf.get("mapred.task.id");
      Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
      if (!matcher.matches()) {
        throw new RuntimeException("Strange task ID: " + taskId);
      }
      shortTaskId = matcher.group(1);

      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
      refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);
    }

    @Override
    public void cleanup(Context context) throws IOException {
      table.flushCommits();
      table.close();
    }

    @Override
    protected void map(NullWritable key, NullWritable value,
        Context context) throws IOException, InterruptedException {

      String suffix = "/" + shortTaskId;
      byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix));

      // Guard against a zero block size when fewer than 100 records are
      // requested, which would otherwise make the loop below spin forever.
      int blockSize = Math.max(1, (int) (recordsToWrite / 100));

      for (long i = 0; i < recordsToWrite;) {
        long blockStart = i;
        for (long idxInBlock = 0;
            idxInBlock < blockSize && i < recordsToWrite;
            idxInBlock++, i++) {

          long byteSwapped = swapLong(i);
          Bytes.putLong(row, 0, byteSwapped);

          Put p = new Put(row);
          p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
          if (blockStart > 0) {
            // Reference random rows from the previous block; the qualifier is
            // the referred row's full key. The Put and each KeyValue copy the
            // bytes, so reusing "row" as scratch space here is safe.
            for (int j = 0; j < numBackReferencesPerRow; j++) {
              long referredRow = blockStart - blockSize + rand.nextInt(blockSize);
              Bytes.putLong(row, 0, swapLong(referredRow));
              p.add(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY);
            }
            refsWritten.increment(numBackReferencesPerRow);
          }
          rowsWritten.increment(1);
          table.put(p);

          if (i % 100 == 0) {
            context.setStatus("Written " + i + "/" + recordsToWrite + " records");
            context.progress();
          }
        }

        // End of block: flush before we start writing rows that refer back
        // to these, so a referenced row is always persisted first.
        table.flushCommits();
      }
    }
  }

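  /**
   * Mapper for the verify phase. For each cell it emits either
   * (rowKey, EMPTY) when the cell is the row's own marker qualifier, or
   * (referredRowKey, rowKey) when the cell is a back-reference. After the
   * shuffle, each reduce key therefore collects a row's existence marker
   * together with every row that references it.
   */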
  public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
    static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      BytesWritable bwKey = new BytesWritable(key.get());
      BytesWritable bwVal = new BytesWritable();
      for (Cell kv : value.listCells()) {
        if (Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length,
            kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()) == 0) {
          context.write(bwKey, EMPTY);
        } else {
          bwVal.set(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
          context.write(bwVal, bwKey);
        }
      }
    }
  }

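  /**
   * Reducer for the verify phase. For each referred row it checks that the
   * empty "row exists" marker was seen among the referrers; if not, the row
   * was lost, a line describing it is written to the job output, and
   * ROWS_WRITTEN is incremented, which makes doVerify() fail its assertion.
   */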
  public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
    private static final Log LOG = LogFactory.getLog(VerifyReducer.class);
    private Counter refsChecked;
    private Counter rowsWritten;

    @Override
    public void setup(Context context) throws IOException {
      refsChecked = context.getCounter(Counters.REFERENCES_CHECKED);
      // Reused here to count failure rows written to the output; the verify
      // job runs no LoadMapper, so this counter should stay at zero.
      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
    }

    @Override
    protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers,
        VerifyReducer.Context ctx) throws IOException, InterruptedException {
      boolean gotOriginalRow = false;
      int refCount = 0;

      for (BytesWritable ref : referrers) {
        if (ref.getLength() == 0) {
          assert !gotOriginalRow;
          gotOriginalRow = true;
        } else {
          refCount++;
        }
      }
      refsChecked.increment(refCount);

      if (!gotOriginalRow) {
        String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength());
        String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength());
        LOG.error("Reference error: row " + parsedRow + " was referenced but never written");
        ctx.write(new Text(binRow), new Text(parsedRow));
        rowsWritten.increment(1);
      }
    }

    private String makeRowReadable(byte[] bytes, int length) {
      long rowIdx = swapLong(Bytes.toLong(bytes, 0));
      String suffix = Bytes.toString(bytes, 8, length - 8);

      return "Row #" + rowIdx + " suffix " + suffix;
    }
  }

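  /**
   * Runs the load job: a map-only job driven by NMapInputFormat (which simply
   * spawns the configured number of map tasks with no real input), each
   * mapper being a LoadMapper writing into the test table.
   */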
  protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "load-output");

    NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
    conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());

    Job job = new Job(conf);
    job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
    job.setJarByClass(this.getClass());
    setMapperClass(job);
    job.setInputFormatClass(NMapInputFormat.class);
    job.setNumReduceTasks(0);
    setJobScannerConf(job);
    FileOutputFormat.setOutputPath(job, outputDir);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
    TableMapReduceUtil.initCredentials(job);
    assertTrue(job.waitForCompletion(true));
    return job;
  }

  protected void setMapperClass(Job job) {
    job.setMapperClass(LoadMapper.class);
  }

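  /**
   * Runs the verify job: a full scan of the test table through VerifyMapper,
   * reduced by VerifyReducer. Fails the test if any referenced row was
   * missing, i.e. if the verify job's ROWS_WRITTEN counter is non-zero.
   */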
  protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "verify-output");

    Job job = new Job(conf);
    job.setJarByClass(this.getClass());
    job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
    setJobScannerConf(job);

    Scan scan = new Scan();

    TableMapReduceUtil.initTableMapperJob(
        htd.getTableName().getNameAsString(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
    int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
    TableMapReduceUtil.setScannerCaching(job, scannerCaching);

    job.setReducerClass(VerifyReducer.class);
    job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
    FileOutputFormat.setOutputPath(job, outputDir);
    assertTrue(job.waitForCompletion(true));

    // Only the reducer increments ROWS_WRITTEN in this job, once per missing
    // row, so a non-zero value means data loss.
    long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
    assertEquals(0, numOutputRecords);
  }

  private static void setJobScannerConf(Job job) {
    // Make the scans chatty in the task logs: log scanner activity, and have
    // the record reader log progress every (num_to_write / 100) rows.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    long lpr = job.getConfiguration().getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT) / 100;
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, (int) lpr);
  }

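  /**
   * Returns a unique per-run directory under the filesystem's working
   * directory; the random subtree is registered for deletion on JVM exit.
   */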
  public Path getTestDir(String testName, String subdir) throws IOException {
    FileSystem fs = FileSystem.get(getConf());
    Path base = new Path(fs.getWorkingDirectory(), "test-data");
    String randomStr = UUID.randomUUID().toString();
    Path testDir = new Path(base, randomStr);
    fs.deleteOnExit(testDir);

    return new Path(new Path(testDir, testName), subdir);
  }

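  /**
   * JUnit entry point: creates the test table presplit into 40 regions across
   * the full 8-byte key space, then runs the (scaled-down) load and verify
   * jobs against the cluster started in setUpCluster().
   */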
  @Test
  public void testLoadAndVerify() throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TEST_NAME));
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    HBaseAdmin admin = getTestingUtil(getConf()).getHBaseAdmin();
    admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), 40);

    doLoad(getConf(), htd);
    doVerify(getConf(), htd);

    // Only reached if verification succeeded; a failed run leaves the table
    // in place for debugging.
    getTestingUtil(getConf()).deleteTable(htd.getName());
  }

  public void usage() {
    System.err.println(this.getClass().getSimpleName()
        + " [-Doptions] <load|verify|loadAndVerify>");
    System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
    System.err.println("Options");
    System.err.println("  -Dloadmapper.table=<name>        Table to write/verify (default IntegrationTestLoadAndVerify)");
    System.err.println("  -Dloadmapper.backrefs=<n>        Number of backreferences per row (default 50)");
    System.err.println("  -Dloadmapper.num_to_write=<n>    Number of rows per mapper (default 100,000)");
    System.err.println("  -Dloadmapper.deleteAfter=<bool>  Delete after a successful verify (default true)");
    System.err.println("  -Dloadmapper.numPresplits=<n>    Number of presplit regions to start with (default 40)");
    System.err.println("  -Dloadmapper.map.tasks=<n>       Number of map tasks for load (default 200)");
    System.err.println("  -Dverify.reduce.tasks=<n>        Number of reduce tasks for verify (default 35)");
    System.err.println("  -Dverify.scannercaching=<n>      Scanner caching (rows per RPC) during verify (default 500)");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    String[] args = cmd.getArgs();
    if (args == null || args.length != 1) {
      usage();
      throw new RuntimeException("Incorrect number of arguments.");
    }
    toRun = args[0];
  }

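  /**
   * Command-line entry point: runs the single command parsed by
   * processOptions() (load, verify, or loadAndVerify), creates the presplit
   * table when loading, and drops it after a successful verify unless
   * -Dloadmapper.deleteAfter=false.
   */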
  @Override
  public int runTestFromCommandLine() throws Exception {
    IntegrationTestingUtility.setUseDistributedCluster(getConf());
    boolean doLoad = false;
    boolean doVerify = false;
    boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true);
    int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);

    if (toRun.equals("load")) {
      doLoad = true;
    } else if (toRun.equals("verify")) {
      doVerify = true;
    } else if (toRun.equals("loadAndVerify")) {
      doLoad = true;
      doVerify = true;
    } else {
      System.err.println("Invalid argument " + toRun);
      usage();
      return 1;
    }

    String table = getTablename();
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    if (doLoad) {
      // The admin is only needed to create the presplit table; close it
      // before kicking off the long-running load job.
      HBaseAdmin admin = new HBaseAdmin(getConf());
      try {
        admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
      } finally {
        admin.close();
      }
      doLoad(getConf(), htd);
    }
    if (doVerify) {
      doVerify(getConf(), htd);
      if (doDelete) {
        getTestingUtil(getConf()).deleteTable(htd.getName());
      }
    }
    return 0;
  }

  @Override
  public String getTablename() {
    return getConf().get(TABLE_NAME_KEY, TEST_NAME);
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(TEST_FAMILY));
  }

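  /**
   * Standalone entry point; runs the tool via ToolRunner so generic Hadoop
   * -D options on the command line are applied to the configuration.
   */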
  public static void main(String[] argv) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(), argv);
    System.exit(ret);
  }
}