/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
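
/**
 * Simple test for {@link HFileOutputFormat}: sets up and runs MapReduce jobs that write
 * hfile output through {@link RandomKVGeneratingMapper} and {@link KeyValueSortReducer},
 * then validates the produced HFiles and the per-family settings that the output format
 * serializes into the job configuration.
 */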
@Category(LargeTests.class)
public class TestHFileOutputFormat {
  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
  private final static int ROWSPERSPLIT = 1024;

  private static final byte[][] FAMILIES =
      { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
        Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName TABLE_NAME = TableName.valueOf("TestTable");

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Log LOG = LogFactory.getLog(TestHFileOutputFormat.class);

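  /**
   * Simple mapper that makes KeyValue output: each map task emits ROWSPERSPLIT random rows,
   * with one KeyValue per configured column family, tagging keys with the task id so rows
   * from different tasks do not collide.
   */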
  static class RandomKVGeneratingMapper
      extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, KeyValue> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2, Context context)
        throws IOException, InterruptedException {
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      for (int i = 0; i < ROWSPERSPLIT; i++) {
        random.nextBytes(keyBytes);
        // Ensure that unique tasks generate unique keys
        keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
        random.nextBytes(valBytes);
        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

        for (byte[] family : TestHFileOutputFormat.FAMILIES) {
          KeyValue kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
          context.write(key, kv);
        }
      }
    }
  }
171
172 private void setupRandomGeneratorMapper(Job job) {
173 job.setInputFormatClass(NMapInputFormat.class);
174 job.setMapperClass(RandomKVGeneratingMapper.class);
175 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
176 job.setMapOutputValueClass(KeyValue.class);
177 }
178
179
180
181
182
183
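  /**
   * Test that the HFileOutputFormat record writer replaces a timestamp of
   * {@link HConstants#LATEST_TIMESTAMP} with the current time when a KeyValue is written,
   * while leaving explicitly set timestamps untouched.
   */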
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat hof = new HFileOutputFormat();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Pass a KeyValue whose timestamp is LATEST_TIMESTAMP; write() should replace it.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(original.getRow(), kv.getRow()));
      assertTrue(CellUtil.matchingColumn(original, kv.getFamily(), kv.getQualifier()));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Now pass a KeyValue with an explicit timestamp; it should come through unchanged.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
        job, "attempt_200707121733_0001_m_000000_0");
    return context;
  }
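
  /**
   * Test that the record writer tracks the timestamps of the KeyValues it writes and
   * stores a TIMERANGE entry in the HFile metadata that reflects the min and max.
   */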
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat hof = new HFileOutputFormat();
      writer = hof.getRecordWriter(context);

      // pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      writer.close(context);

      // locate the file written by this task attempt under the committer's work path
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as an HFile Reader and pull out the TIMERANGE file info
      HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
          new CacheConfig(conf), conf);
      Map<byte[], byte[]> finfo = rd.loadFileInfo();
      byte[] range = finfo.get("TIMERANGE".getBytes());
      assertNotNull(range);

      // unmarshal and check the values
      TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
      Writables.copyWritable(range, timeRangeTracker);
      LOG.info(timeRangeTracker.getMinimumTimestamp() +
          "...." + timeRangeTracker.getMaximumTimestamp());
      assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
      assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }
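
  /**
   * Run a small MapReduce job that writes PerformanceEvaluation-style random data through
   * HFileOutputFormat using a SimpleTotalOrderPartitioner and KeyValueSortReducer, and
   * verify that output files are produced.
   */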
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Keep the map output sort buffer small so the test runs in limited memory.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Use a small max file size so the output format rolls over several HFiles.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job);

    // Partition the full key space evenly across the reducers.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for the partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }
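
  /**
   * Test that configureIncrementalLoad sets the number of reduce tasks to match the
   * number of regions of the target table.
   */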
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set("hbase.fs.tmp.dir", util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
    // The mocked region locator reports four start keys, so we expect four reducers.
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];

    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false);
  }

  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true);
  }

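  /**
   * End-to-end incremental load: start a mini cluster, run the HFileOutputFormat job against
   * a pre-split table, bulk load the result with LoadIncrementalHFiles, and verify the table
   * contents. If shouldChangeRegions is true the table is dropped and recreated with a
   * different split layout between the MR job and the bulk load.
   */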
  private void doIncrementalLoadTest(boolean shouldChangeRegions) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    byte[][] splitKeys = generateRandomSplitKeys(4);
    HBaseAdmin admin = null;
    try {
      util.setJobWithoutMRCluster();
      util.startMiniCluster();
      Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
      admin = util.getHBaseAdmin();
      HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
      assertEquals("Should start with empty table",
          0, util.countRows(table));
      int numRegions = -1;
      try (RegionLocator r = table.getRegionLocator()) {
        numRegions = r.getStartKeys().length;
      }
      assertEquals("Should make 5 regions", 5, numRegions);

      // Generate the bulk load files
      runIncrementalPELoad(conf, table, testDir);
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table",
          0, util.countRows(table));

      // Make sure that a directory was created for every CF
      int dir = 0;
      for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);

      // handle the split case
      if (shouldChangeRegions) {
        LOG.info("Changing regions in table");
        admin.disableTable(table.getTableName());
        while (util.getMiniHBaseCluster().getMaster().getAssignmentManager().
            getRegionStates().isRegionsInTransition()) {
          Threads.sleep(200);
          LOG.info("Waiting on table to finish disabling");
        }
        util.deleteTable(table.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);
        while (table.getRegionLocations().size() != 15 ||
            !admin.isTableAvailable(table.getTableName())) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));
      Scan scan = new Scan();
      ResultScanner results = table.getScanner(scan);
      for (Result res : results) {
        assertEquals(FAMILIES.length, res.rawCells().length);
        Cell first = res.rawCells()[0];
        for (Cell kv : res.rawCells()) {
          assertTrue(CellUtil.matchingRow(first, kv));
          assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
        }
      }
      results.close();
      String tableDigestBefore = util.checksumRows(table);

      // Cause regions to reopen
      admin.disableTable(TABLE_NAME);
      while (!admin.isTableDisabled(TABLE_NAME)) {
        Thread.sleep(200);
        LOG.info("Waiting for table to disable");
      }
      admin.enableTable(TABLE_NAME);
      util.waitTableAvailable(TABLE_NAME);
      assertEquals("Data should remain after reopening of regions",
          tableDigestBefore, util.checksumRows(table));
    } finally {
      if (admin != null) admin.close();
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(Configuration conf, HTable table, Path outDir)
      throws Exception {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    setupRandomGeneratorMapper(job);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
        table.getRegionLocator());
    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertEquals(table.getRegionLocator().getAllRegionLocations().size(), job.getNumReduceTasks());

    assertTrue(job.waitForCompletion(true));
  }
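
  /**
   * Test that the family-to-compression map is correctly serialized into and deserialized
   * from the job configuration via HFileOutputFormat.configureCompression() and
   * createFamilyCompressionMap(), for tables with 0 to 3 column families.
   */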
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
          getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      HFileOutputFormat.configureCompression(table, conf);

      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat
          .createFamilyCompressionMap(conf);

      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setCompressionType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
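
  /**
   * @return a map from column family names to compression algorithms for testing column
   *         family compression; some of the names contain special characters.
   */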
  private Map<String, Compression.Algorithm>
      getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression =
        new HashMap<String, Compression.Algorithm>();

    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }
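
  /**
   * Test that the family-to-bloom-type map is correctly serialized into and deserialized
   * from the job configuration via HFileOutputFormat.configureBloomType() and
   * createFamilyBloomTypeMap().
   */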
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType =
          getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      HFileOutputFormat.configureBloomType(table, conf);

      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
          HFileOutputFormat.createFamilyBloomTypeMap(conf);

      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBloomFilterType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
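
  /**
   * @return a map from column family names to bloom filter types for testing column family
   *         bloom filter settings.
   */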
  private Map<String, BloomType>
      getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<String, BloomType>();

    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD",
          BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }
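
  /**
   * Test that the family-to-block-size map is correctly serialized into and deserialized
   * from the job configuration via HFileOutputFormat.configureBlockSize() and
   * createFamilyBlockSizeMap().
   */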
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize =
          getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      HFileOutputFormat.configureBlockSize(table, conf);

      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
          HFileOutputFormat.createFamilyBlockSizeMap(conf);

      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBlocksize(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
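
  /**
   * @return a map from column family names to block sizes for testing column family
   *         block size settings.
   */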
  private Map<String, Integer>
      getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<String, Integer>();

    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }
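
  /**
   * Test that the family-to-data-block-encoding map is correctly serialized into and
   * deserialized from the job configuration via HFileOutputFormat.configureDataBlockEncoding()
   * and createFamilyDataBlockEncodingMap().
   */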
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
          getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HFileOutputFormat.configureDataBlockEncoding(table, conf);

      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
          HFileOutputFormat.createFamilyDataBlockEncodingMap(conf);

      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals("DataBlockEncoding configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
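
  /**
   * @return a map from column family names to data block encodings for testing column
   *         family data block encoding settings.
   */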
  private Map<String, DataBlockEncoding>
      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        new HashMap<String, DataBlockEncoding>();

    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator regionLocator) throws IOException {
    byte[][] mockKeys = new byte[][] {
        HConstants.EMPTY_BYTE_ARRAY,
        Bytes.toBytes("aaa"),
        Bytes.toBytes("ggg"),
        Bytes.toBytes("zzz")
    };
    Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys();
  }
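
  /**
   * Test that the record writer applies the column family settings (bloom filter type and
   * compression) from the table descriptor to the HFiles it writes.
   */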
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Set up a table descriptor with one column family per generated descriptor
    HTable table = Mockito.mock(HTable.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : this.util.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // Set up the mocked region locator to return some start keys
    setupMockStartKeys(regionLocator);

    try {
      // Partial map-reduce setup, just enough to get an operational writer for testing
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat hof = new HFileOutputFormat();
      writer = hof.getRecordWriter(context);

      // Write out random rows for every column family
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      FileSystem fs = dir.getFileSystem(conf);

      // Commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));

        // Verify that the bloom filter and compression on this file match the family settings
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), conf);
        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();

        byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals("Incorrect bloom filter used for column family " + familyStr +
            "(reader: " + reader + ")",
            hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals("Incorrect compression used for column family " + familyStr +
            "(reader: " + reader + ")",
            hcd.getCompression(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }
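
  /**
   * Write {@code numRows} rows to the given writer, one KeyValue with a random value per
   * requested column family, keyed by the row index.
   */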
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer,
      TaskAttemptContext context, Set<byte[]> families, int numRows)
      throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {
      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }
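
  /**
   * Test that HFiles bulk loaded with the compaction-exclude flag set are not rewritten by a
   * minor compaction of the whole table but are compacted away by a major compaction.
   * Currently ignored as flaky, see HBASE-9051.
   */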
  @Ignore ("Flakey: See HBASE-9051") @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    try {
      util.setJobWithoutMRCluster();
      util.startMiniCluster();
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      HBaseAdmin admin = new HBaseAdmin(conf);
      HTable table = util.createTable(TABLE_NAME, FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir for the first region of the first family
      final Path storePath = HStore.getStoreHomedir(
          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
          admin.getTableRegions(TABLE_NAME).get(0),
          FAMILIES[0]);
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files, marked as excluded from compaction
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
          true);

      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf, table, testDir);
        // Perform the actual load
        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the files
      admin.compact(TABLE_NAME.getName());
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAME.getName());
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }
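
  /**
   * Test that a bulk-loaded HFile marked compaction-excluded survives a minor compaction of
   * the existing store files but is rewritten by a major compaction.
   */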
  @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    try {
      util.setJobWithoutMRCluster();
      util.startMiniCluster();
      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = util.getTestFileSystem();
      HBaseAdmin admin = new HBaseAdmin(conf);
      HTable table = util.createTable(TABLE_NAME, FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir for the first region of the first family
      final Path storePath = HStore.getStoreHomedir(
          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
          admin.getTableRegions(TABLE_NAME).get(0),
          FAMILIES[0]);
      assertEquals(0, fs.listStatus(storePath).length);

      // put some data in the table and flush to create a store file
      Put p = new Put(Bytes.toBytes("test"));
      p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAME.getName());
      assertEquals(1, util.countRows(table));
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file, marked as excluded from compaction
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
          true);
      runIncrementalPELoad(conf, table, testDir);

      // Perform the actual load
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows + 1, util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAME.getName());
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAME.getName());
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }
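
  /**
   * Repeatedly evaluate the callable until it returns true or waitMs elapses; fail the test
   * on timeout.
   */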
  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }

  public static void main(String[] args) throws Exception {
    new TestHFileOutputFormat().manualTest(args);
  }

  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      HTable table = util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      HTable table = new HTable(conf, tname);
      Path outDir = new Path("incremental-out");
      runIncrementalPELoad(conf, table, outDir);
    } else {
      throw new RuntimeException(
          "usage: TestHFileOutputFormat newtable | incremental");
    }
  }

}