
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNotSame;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.io.IOException;
29  import java.util.Arrays;
30  import java.util.HashMap;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.concurrent.Callable;
36  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.CategoryBasedTimeout;
46  import org.apache.hadoop.hbase.Cell;
47  import org.apache.hadoop.hbase.CellUtil;
48  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HBaseTestingUtility;
51  import org.apache.hadoop.hbase.HColumnDescriptor;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HTableDescriptor;
54  import org.apache.hadoop.hbase.HadoopShims;
55  import org.apache.hadoop.hbase.KeyValue;
56  import org.apache.hadoop.hbase.PerformanceEvaluation;
57  import org.apache.hadoop.hbase.TableName;
58  import org.apache.hadoop.hbase.client.HBaseAdmin;
59  import org.apache.hadoop.hbase.client.HTable;
60  import org.apache.hadoop.hbase.client.Put;
61  import org.apache.hadoop.hbase.client.RegionLocator;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.ResultScanner;
64  import org.apache.hadoop.hbase.client.Scan;
65  import org.apache.hadoop.hbase.client.Table;
66  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
67  import org.apache.hadoop.hbase.io.compress.Compression;
68  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
69  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
70  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
71  import org.apache.hadoop.hbase.io.hfile.HFile;
72  import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
73  import org.apache.hadoop.hbase.regionserver.BloomType;
74  import org.apache.hadoop.hbase.regionserver.HStore;
75  import org.apache.hadoop.hbase.regionserver.StoreFile;
76  import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
77  import org.apache.hadoop.hbase.testclassification.LargeTests;
78  import org.apache.hadoop.hbase.util.Bytes;
79  import org.apache.hadoop.hbase.util.FSUtils;
80  import org.apache.hadoop.hbase.util.Threads;
81  import org.apache.hadoop.hbase.util.Writables;
82  import org.apache.hadoop.io.NullWritable;
83  import org.apache.hadoop.mapreduce.Job;
84  import org.apache.hadoop.mapreduce.Mapper;
85  import org.apache.hadoop.mapreduce.RecordWriter;
86  import org.apache.hadoop.mapreduce.TaskAttemptContext;
87  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
88  import org.junit.Ignore;
89  import org.junit.Rule;
90  import org.junit.Test;
91  import org.junit.experimental.categories.Category;
92  import org.junit.rules.TestRule;
93  import org.mockito.Mockito;
94  
95  /**
96   * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
97   * Sets up and runs a MapReduce job that writes HFile output.
98   * Creates a few inner classes to implement splits and an InputFormat that
99   * emits keys and values like those of {@link PerformanceEvaluation}.
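     *
     * <p>A rough sketch of the incremental (bulk) load flow these tests exercise,
     * using only calls that already appear in this class ({@code conf}, {@code table}
     * and {@code outDir} stand in for a live Configuration, HTable and output Path):
     * <pre>
     * Job job = new Job(conf, "bulkload");
     * job.setInputFormatClass(NMapInputFormat.class);
     * job.setMapperClass(RandomKVGeneratingMapper.class);   // emits (ImmutableBytesWritable, KeyValue)
     * HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
     *     table.getRegionLocator());                        // wires partitioner, reducer and output format
     * FileOutputFormat.setOutputPath(job, outDir);
     * job.waitForCompletion(true);                          // one HFile directory per column family
     * new LoadIncrementalHFiles(conf).doBulkLoad(outDir, table);  // moves the HFiles into the table
     * </pre>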
100  */
101 @Category(LargeTests.class)
102 public class TestHFileOutputFormat  {
103   @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
104       withTimeout(this.getClass()).withLookingForStuckThread(true).build();
105   private final static int ROWSPERSPLIT = 1024;
106 
107   private static final byte[][] FAMILIES
108     = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
109       , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
110   private static final TableName TABLE_NAME =
111       TableName.valueOf("TestTable");
112 
113   private HBaseTestingUtility util = new HBaseTestingUtility();
114 
115   private static final Log LOG = LogFactory.getLog(TestHFileOutputFormat.class);
116 
117   /**
118    * Simple mapper that makes KeyValue output.
119    */
120   static class RandomKVGeneratingMapper extends
121       Mapper<NullWritable, NullWritable, ImmutableBytesWritable, KeyValue> {
122 
123     private int keyLength;
124     private static final int KEYLEN_DEFAULT = 10;
125     private static final String KEYLEN_CONF = "randomkv.key.length";
126 
127     private int valLength;
128     private static final int VALLEN_DEFAULT = 10;
129     private static final String VALLEN_CONF = "randomkv.val.length";
130     private static final byte [] QUALIFIER = Bytes.toBytes("data");
131 
132     @Override
133     protected void setup(Context context) throws IOException,
134         InterruptedException {
135       super.setup(context);
136 
137       Configuration conf = context.getConfiguration();
138       keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
139       valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
140     }
141 
142     @Override
143     protected void map(
144         NullWritable n1, NullWritable n2,
145         Mapper<NullWritable, NullWritable,
146                ImmutableBytesWritable, KeyValue>.Context context)
147         throws IOException, InterruptedException {
148 
149       byte keyBytes[] = new byte[keyLength];
150       byte valBytes[] = new byte[valLength];
151 
152       int taskId = context.getTaskAttemptID().getTaskID().getId();
153       assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
154 
155       Random random = new Random();
156       for (int i = 0; i < ROWSPERSPLIT; i++) {
157 
158         random.nextBytes(keyBytes);
159         // Ensure that unique tasks generate unique keys
160         keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
161         random.nextBytes(valBytes);
162         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
163 
164         for (byte[] family : TestHFileOutputFormat.FAMILIES) {
165           KeyValue kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
166           context.write(key, kv);
167         }
168       }
169     }
170   }
171 
172   private void setupRandomGeneratorMapper(Job job) {
173     job.setInputFormatClass(NMapInputFormat.class);
174     job.setMapperClass(RandomKVGeneratingMapper.class);
175     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
176     job.setMapOutputValueClass(KeyValue.class);
177   }
178 
179   /**
180    * Test that {@link HFileOutputFormat} RecordWriter amends timestamps if
181    * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
182    * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
183    */
184   @Test
185   public void test_LATEST_TIMESTAMP_isReplaced()
186   throws Exception {
187     Configuration conf = new Configuration(this.util.getConfiguration());
188     RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
189     TaskAttemptContext context = null;
190     Path dir =
191       util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
192     try {
193       Job job = new Job(conf);
194       FileOutputFormat.setOutputPath(job, dir);
195       context = createTestTaskAttemptContext(job);
196       HFileOutputFormat hof = new HFileOutputFormat();
197       writer = hof.getRecordWriter(context);
198       final byte [] b = Bytes.toBytes("b");
199 
200       // Test 1.  Pass a KV that has a ts of LATEST_TIMESTAMP.  It should be
201       // changed by the call to write.  Check that everything in the kv is the same except the ts.
202       KeyValue kv = new KeyValue(b, b, b);
203       KeyValue original = kv.clone();
204       writer.write(new ImmutableBytesWritable(), kv);
205       assertFalse(original.equals(kv));
206       assertTrue(Bytes.equals(original.getRow(), kv.getRow()));
207       assertTrue(CellUtil.matchingColumn(original, kv.getFamily(), kv.getQualifier()));
208       assertTrue(original.getTimestamp() != kv.getTimestamp());
209       assertTrue(HConstants.LATEST_TIMESTAMP != kv.getTimestamp());
210 
211       // Test 2. Now pass a kv that has an explicit ts.  It should not be
212       // changed by the call to the record writer's write.
213       kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
214       original = kv.clone();
215       writer.write(new ImmutableBytesWritable(), kv);
216       assertTrue(original.equals(kv));
217     } finally {
218       if (writer != null && context != null) writer.close(context);
219       dir.getFileSystem(conf).delete(dir, true);
220     }
221   }
222 
223   private TaskAttemptContext createTestTaskAttemptContext(final Job job)
224   throws IOException, Exception {
225     HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
226     TaskAttemptContext context = hadoop.createTestTaskAttemptContext(job, "attempt_200707121733_0001_m_000000_0");
227     return context;
228   }
229 
230   /**
231    * Test that {@link HFileOutputFormat} creates an HFile with TIMERANGE
232    * metadata used by time-restricted scans.
233    */
234   @Test
235   public void test_TIMERANGE() throws Exception {
236     Configuration conf = new Configuration(this.util.getConfiguration());
237     RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
238     TaskAttemptContext context = null;
239     Path dir =
240       util.getDataTestDir("test_TIMERANGE_present");
241     LOG.info("TIMERANGE test writing to dir: " + dir);
242     try {
243       // build a record writer using HFileOutputFormat
244       Job job = new Job(conf);
245       FileOutputFormat.setOutputPath(job, dir);
246       context = createTestTaskAttemptContext(job);
247       HFileOutputFormat hof = new HFileOutputFormat();
248       writer = hof.getRecordWriter(context);
249 
250       // Pass two key values with explicit timestamps
251       final byte [] b = Bytes.toBytes("b");
252 
253       // value 1 with timestamp 2000
254       KeyValue kv = new KeyValue(b, b, b, 2000, b);
255       KeyValue original = kv.clone();
256       writer.write(new ImmutableBytesWritable(), kv);
257       assertEquals(original,kv);
258 
259       // value 2 with timestamp 1000
260       kv = new KeyValue(b, b, b, 1000, b);
261       original = kv.clone();
262       writer.write(new ImmutableBytesWritable(), kv);
263       assertEquals(original, kv);
264 
265       // verify that the file has the proper FileInfo.
266       writer.close(context);
267 
268       // the generated file lives 1 directory down from the attempt directory
269       // and is the only file, e.g.
270       // _attempt__0000_r_000000_0/b/1979617994050536795
271       FileSystem fs = FileSystem.get(conf);
272       Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
273       FileStatus[] sub1 = fs.listStatus(attemptDirectory);
274       FileStatus[] file = fs.listStatus(sub1[0].getPath());
275 
276       // open as HFile Reader and pull out TIMERANGE FileInfo.
277       HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
278           new CacheConfig(conf), conf);
279       Map<byte[],byte[]> finfo = rd.loadFileInfo();
280       byte[] range = finfo.get("TIMERANGE".getBytes());
281       assertNotNull(range);
282 
283       // unmarshall and check values.
284       TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
285       Writables.copyWritable(range, timeRangeTracker);
286       LOG.info(timeRangeTracker.getMinimumTimestamp() +
287           "...." + timeRangeTracker.getMaximumTimestamp());
288       assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
289       assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
290       rd.close();
291     } finally {
292       if (writer != null && context != null) writer.close(context);
293       dir.getFileSystem(conf).delete(dir, true);
294     }
295   }
296 
297   /**
298    * Run small MR job.
299    */
300   @Test
301   public void testWritingPEData() throws Exception {
302     Configuration conf = util.getConfiguration();
303     Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
304     FileSystem fs = testDir.getFileSystem(conf);
305 
306     // Turn down this value or we OOME in Eclipse.
307     conf.setInt("mapreduce.task.io.sort.mb", 20);
308     // Write a few files.
309     conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
310 
311     Job job = new Job(conf, "testWritingPEData");
312     setupRandomGeneratorMapper(job);
313     // This partitioner doesn't work well for number keys but we use it anyway
314     // just to demonstrate how to configure it.
315     byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
316     byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
317 
318     Arrays.fill(startKey, (byte)0);
319     Arrays.fill(endKey, (byte)0xff);
320 
321     job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
322     // Set start and end rows for partitioner.
323     SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
324     SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
325     job.setReducerClass(KeyValueSortReducer.class);
326     job.setOutputFormatClass(HFileOutputFormat.class);
327     job.setNumReduceTasks(4);
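        // Keep the stock io.serializations and append the HBase ones so the shuffle
        // can handle KeyValue map output values (plus Mutation and Result).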
328     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
329         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
330         KeyValueSerialization.class.getName());
331 
332     FileOutputFormat.setOutputPath(job, testDir);
333     assertTrue(job.waitForCompletion(false));
334     FileStatus [] files = fs.listStatus(testDir);
335     assertTrue(files.length > 0);
336   }
337 
338   @Test
339   public void testJobConfiguration() throws Exception {
340     Configuration conf = new Configuration(this.util.getConfiguration());
341     conf.set("hbase.fs.tmp.dir", util.getDataTestDir("testJobConfiguration").toString());
342     Job job = new Job(conf);
343     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
344     HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class);
345     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
346     setupMockStartKeys(regionLocator);
347     HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
348     assertEquals(4, job.getNumReduceTasks());
349   }
350 
351   private byte [][] generateRandomStartKeys(int numKeys) {
352     Random random = new Random();
353     byte[][] ret = new byte[numKeys][];
354     // first region start key is always empty
355     ret[0] = HConstants.EMPTY_BYTE_ARRAY;
356     for (int i = 1; i < numKeys; i++) {
357       ret[i] =
358         PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
359     }
360     return ret;
361   }
362 
363   private byte[][] generateRandomSplitKeys(int numKeys) {
364     Random random = new Random();
365     byte[][] ret = new byte[numKeys][];
366     for (int i = 0; i < numKeys; i++) {
367       ret[i] =
368           PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
369     }
370     return ret;
371   }
372 
373   @Test
374   public void testMRIncrementalLoad() throws Exception {
375     LOG.info("\nStarting test testMRIncrementalLoad\n");
376     doIncrementalLoadTest(false);
377   }
378 
379   @Test
380   public void testMRIncrementalLoadWithSplit() throws Exception {
381     LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
382     doIncrementalLoadTest(true);
383   }
384 
385   private void doIncrementalLoadTest(
386       boolean shouldChangeRegions) throws Exception {
387     util = new HBaseTestingUtility();
388     Configuration conf = util.getConfiguration();
389     byte[][] splitKeys = generateRandomSplitKeys(4);
390     HBaseAdmin admin = null;
391     try {
392       util.setJobWithoutMRCluster();
393       util.startMiniCluster();
394       Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
395       admin = util.getHBaseAdmin();
396       HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
397       assertEquals("Should start with empty table",
398           0, util.countRows(table));
399       int numRegions = -1;
400       try(RegionLocator r = table.getRegionLocator()) {
401         numRegions = r.getStartKeys().length;
402       }
403       assertEquals("Should make 5 regions", numRegions, 5);
404 
405       // Generate the bulk load files
406       runIncrementalPELoad(conf, table, testDir);
407       // This doesn't write into the table, just makes files
408       assertEquals("HFOF should not touch actual table",
409           0, util.countRows(table));
410 
411 
412       // Make sure that a directory was created for every CF
413       int dir = 0;
414       for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
415         for (byte[] family : FAMILIES) {
416           if (Bytes.toString(family).equals(f.getPath().getName())) {
417             ++dir;
418           }
419         }
420       }
421       assertEquals("Column family not found in FS.", FAMILIES.length, dir);
422 
423       // handle the split case
424       if (shouldChangeRegions) {
425         LOG.info("Changing regions in table");
426         admin.disableTable(table.getTableName());
427         while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
428             getRegionStates().isRegionsInTransition()) {
429           Threads.sleep(200);
430           LOG.info("Waiting on table to finish disabling");
431         }
432         util.deleteTable(table.getName());
433         byte[][] newSplitKeys = generateRandomSplitKeys(14);
434         table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);
435         while (table.getRegionLocations().size() != 15 ||
436             !admin.isTableAvailable(table.getTableName())) {
437           Thread.sleep(200);
438           LOG.info("Waiting for new region assignment to happen");
439         }
440       }
441 
442       // Perform the actual load
443       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
444 
445       // Ensure data shows up
446       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
447       assertEquals("LoadIncrementalHFiles should put expected data in table",
448           expectedRows, util.countRows(table));
449       Scan scan = new Scan();
450       ResultScanner results = table.getScanner(scan);
451       for (Result res : results) {
452         assertEquals(FAMILIES.length, res.rawCells().length);
453         Cell first = res.rawCells()[0];
454         for (Cell kv : res.rawCells()) {
455           assertTrue(CellUtil.matchingRow(first, kv));
456           assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
457         }
458       }
459       results.close();
460       String tableDigestBefore = util.checksumRows(table);
461 
462       // Cause regions to reopen
463       admin.disableTable(TABLE_NAME);
464       while (!admin.isTableDisabled(TABLE_NAME)) {
465         Thread.sleep(200);
466         LOG.info("Waiting for table to disable");
467       }
468       admin.enableTable(TABLE_NAME);
469       util.waitTableAvailable(TABLE_NAME);
470       assertEquals("Data should remain after reopening of regions",
471           tableDigestBefore, util.checksumRows(table));
472     } finally {
473       if (admin != null) admin.close();
474       util.shutdownMiniCluster();
475     }
476   }
477 
478   private void runIncrementalPELoad(
479       Configuration conf, HTable table, Path outDir)
480   throws Exception {
481     Job job = new Job(conf, "testLocalMRIncrementalLoad");
482     job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
483     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
484         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
485         KeyValueSerialization.class.getName());
486     setupRandomGeneratorMapper(job);
487     HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
488         table.getRegionLocator());
489     FileOutputFormat.setOutputPath(job, outDir);
490 
491     assertFalse(util.getTestFileSystem().exists(outDir));
492 
493     assertEquals(table.getRegionLocator().getAllRegionLocations().size(), job.getNumReduceTasks());
494 
495     assertTrue(job.waitForCompletion(true));
496   }
497 
498   /**
499    * Test for {@link HFileOutputFormat#configureCompression(org.apache.hadoop.hbase.client.Table,
500    * Configuration)} and
501    * {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}.
502    * Tests that the compression map is correctly serialized into
503    * and deserialized from the configuration.
504    *
505    * @throws IOException
506    */
507   @Test
508   public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
509     for (int numCfs = 0; numCfs <= 3; numCfs++) {
510       Configuration conf = new Configuration(this.util.getConfiguration());
511       Map<String, Compression.Algorithm> familyToCompression =
512           getMockColumnFamiliesForCompression(numCfs);
513       Table table = Mockito.mock(HTable.class);
514       setupMockColumnFamiliesForCompression(table, familyToCompression);
515       HFileOutputFormat.configureCompression(table, conf);
516 
517       // read back family specific compression setting from the configuration
518       Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat
519           .createFamilyCompressionMap(conf);
520 
521       // test that we have a value for all column families that matches with the
522       // used mock values
523       for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
524         assertEquals("Compression configuration incorrect for column family:"
525             + entry.getKey(), entry.getValue(),
526             retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
527       }
528     }
529   }
530 
531   private void setupMockColumnFamiliesForCompression(Table table,
532       Map<String, Compression.Algorithm> familyToCompression) throws IOException {
533     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
534     for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
535       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
536           .setMaxVersions(1)
537           .setCompressionType(entry.getValue())
538           .setBlockCacheEnabled(false)
539           .setTimeToLive(0));
540     }
541     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
542   }
543 
544   /**
545    * @return a map from column family names to compression algorithms for
546    *         testing column family compression. Column family names have special characters
547    */
548   private Map<String, Compression.Algorithm>
549       getMockColumnFamiliesForCompression (int numCfs) {
550     Map<String, Compression.Algorithm> familyToCompression = new HashMap<String, Compression.Algorithm>();
551     // use column family names having special characters
552     if (numCfs-- > 0) {
553       familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
554     }
555     if (numCfs-- > 0) {
556       familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
557     }
558     if (numCfs-- > 0) {
559       familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
560     }
561     if (numCfs-- > 0) {
562       familyToCompression.put("Family3", Compression.Algorithm.NONE);
563     }
564     return familyToCompression;
565   }
566 
567 
568   /**
569    * Test for {@link HFileOutputFormat#configureBloomType(org.apache.hadoop.hbase.client.Table,
570    * Configuration)} and
571    * {@link HFileOutputFormat#createFamilyBloomTypeMap(Configuration)}.
572    * Tests that the bloom type map is correctly serialized into
573    * and deserialized from the configuration.
574    *
575    * @throws IOException
576    */
577   @Test
578   public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
579     for (int numCfs = 0; numCfs <= 2; numCfs++) {
580       Configuration conf = new Configuration(this.util.getConfiguration());
581       Map<String, BloomType> familyToBloomType =
582           getMockColumnFamiliesForBloomType(numCfs);
583       Table table = Mockito.mock(HTable.class);
584       setupMockColumnFamiliesForBloomType(table,
585           familyToBloomType);
586       HFileOutputFormat.configureBloomType(table, conf);
587 
588       // read back family specific bloom type settings from the
589       // configuration
590       Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
591           HFileOutputFormat
592               .createFamilyBloomTypeMap(conf);
593 
594       // test that we have a value for all column families that matches with the
595       // used mock values
596       for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
597         assertEquals("BloomType configuration incorrect for column family:"
598             + entry.getKey(), entry.getValue(),
599             retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
600       }
601     }
602   }
603 
604   private void setupMockColumnFamiliesForBloomType(Table table,
605       Map<String, BloomType> familyToBloomType) throws IOException {
606     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
607     for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
608       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
609           .setMaxVersions(1)
610           .setBloomFilterType(entry.getValue())
611           .setBlockCacheEnabled(false)
612           .setTimeToLive(0));
613     }
614     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
615   }
616 
617   /**
618    * @return a map from column family names to bloom filter types for
619    *         testing column family bloom filter settings. Column family names have special characters.
620    */
621   private Map<String, BloomType>
622   getMockColumnFamiliesForBloomType (int numCfs) {
623     Map<String, BloomType> familyToBloomType =
624         new HashMap<String, BloomType>();
625     // use column family names having special characters
626     if (numCfs-- > 0) {
627       familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
628     }
629     if (numCfs-- > 0) {
630       familyToBloomType.put("Family2=asdads&!AASD",
631           BloomType.ROWCOL);
632     }
633     if (numCfs-- > 0) {
634       familyToBloomType.put("Family3", BloomType.NONE);
635     }
636     return familyToBloomType;
637   }
638 
639   /**
640    * Test for {@link HFileOutputFormat#configureBlockSize(org.apache.hadoop.hbase.client.Table,
641    * Configuration)} and
642    * {@link HFileOutputFormat#createFamilyBlockSizeMap(Configuration)}.
643    * Tests that the block size map is correctly serialized into
644    * and deserialized from the configuration.
645    *
646    * @throws IOException
647    */
648   @Test
649   public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
650     for (int numCfs = 0; numCfs <= 3; numCfs++) {
651       Configuration conf = new Configuration(this.util.getConfiguration());
652       Map<String, Integer> familyToBlockSize =
653           getMockColumnFamiliesForBlockSize(numCfs);
654       Table table = Mockito.mock(HTable.class);
655       setupMockColumnFamiliesForBlockSize(table,
656           familyToBlockSize);
657       HFileOutputFormat.configureBlockSize(table, conf);
658 
659       // read back family specific block size settings from the
660       // configuration
661       Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
662           HFileOutputFormat
663               .createFamilyBlockSizeMap(conf);
664 
665       // test that we have a value for all column families that matches with the
666       // used mock values
667       for (Entry<String, Integer> entry
668           : familyToBlockSize.entrySet()) {
669         assertEquals("BlockSize configuration incorrect for column family:"
670             + entry.getKey(), entry.getValue(),
671             retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
672       }
673     }
674   }
675 
676   private void setupMockColumnFamiliesForBlockSize(Table table,
677       Map<String, Integer> familyToBlockSize) throws IOException {
678     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
679     for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
680       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
681           .setMaxVersions(1)
682           .setBlocksize(entry.getValue())
683           .setBlockCacheEnabled(false)
684           .setTimeToLive(0));
685     }
686     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
687   }
688 
689   /**
690    * @return a map from column family names to block sizes for
691    *         testing column family block size settings. Column family names have special characters.
692    */
693   private Map<String, Integer>
694   getMockColumnFamiliesForBlockSize (int numCfs) {
695     Map<String, Integer> familyToBlockSize =
696         new HashMap<String, Integer>();
697     // use column family names having special characters
698     if (numCfs-- > 0) {
699       familyToBlockSize.put("Family1!@#!@#&", 1234);
700     }
701     if (numCfs-- > 0) {
702       familyToBlockSize.put("Family2=asdads&!AASD",
703           Integer.MAX_VALUE);
704     }
705     if (numCfs-- > 0) {
706       familyToBlockSize.put("Family2=asdads&!AASD",
707           Integer.MAX_VALUE);
708     }
709     if (numCfs-- > 0) {
710       familyToBlockSize.put("Family3", 0);
711     }
712     return familyToBlockSize;
713   }
714 
715   /**
716    * Test for {@link HFileOutputFormat#configureDataBlockEncoding(org.apache.hadoop.hbase.client.Table,
717    * Configuration)} and
718    * {@link HFileOutputFormat#createFamilyDataBlockEncodingMap(Configuration)}.
719    * Tests that the data block encoding map is correctly serialized into
720    * and deserialized from the configuration.
721    *
722    * @throws IOException
723    */
724   @Test
725   public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
726     for (int numCfs = 0; numCfs <= 3; numCfs++) {
727       Configuration conf = new Configuration(this.util.getConfiguration());
728       Map<String, DataBlockEncoding> familyToDataBlockEncoding =
729           getMockColumnFamiliesForDataBlockEncoding(numCfs);
730       Table table = Mockito.mock(HTable.class);
731       setupMockColumnFamiliesForDataBlockEncoding(table,
732           familyToDataBlockEncoding);
733       HFileOutputFormat.configureDataBlockEncoding(table, conf);
734 
735       // read back family specific data block encoding settings from the
736       // configuration
737       Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
738           HFileOutputFormat
739           .createFamilyDataBlockEncodingMap(conf);
740 
741       // test that we have a value for all column families that matches with the
742       // used mock values
743       for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
744         assertEquals("DataBlockEncoding configuration incorrect for column family:"
745             + entry.getKey(), entry.getValue(),
746             retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
747       }
748     }
749   }
750 
751   private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
752       Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
753     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
754     for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
755       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
756           .setMaxVersions(1)
757           .setDataBlockEncoding(entry.getValue())
758           .setBlockCacheEnabled(false)
759           .setTimeToLive(0));
760     }
761     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
762   }
763 
764   /**
765    * @return a map from column family names to data block encodings for
766    *         testing column family data block encoding settings. Column family names have special characters.
767    */
768   private Map<String, DataBlockEncoding>
769       getMockColumnFamiliesForDataBlockEncoding (int numCfs) {
770     Map<String, DataBlockEncoding> familyToDataBlockEncoding =
771         new HashMap<String, DataBlockEncoding>();
772     // use column family names having special characters
773     if (numCfs-- > 0) {
774       familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
775     }
776     if (numCfs-- > 0) {
777       familyToDataBlockEncoding.put("Family2=asdads&!AASD",
778           DataBlockEncoding.FAST_DIFF);
779     }
780     if (numCfs-- > 0) {
781       familyToDataBlockEncoding.put("Family2=asdads&!AASD",
782           DataBlockEncoding.PREFIX);
783     }
784     if (numCfs-- > 0) {
785       familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
786     }
787     return familyToDataBlockEncoding;
788   }
789 
790   private void setupMockStartKeys(RegionLocator regionLocator) throws IOException {
791     byte[][] mockKeys = new byte[][] {
792         HConstants.EMPTY_BYTE_ARRAY,
793         Bytes.toBytes("aaa"),
794         Bytes.toBytes("ggg"),
795         Bytes.toBytes("zzz")
796     };
797     Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys();
798   }
799 
800   /**
801    * Test that {@link HFileOutputFormat} RecordWriter uses compression and
802    * bloom filter settings from the column family descriptor
803    */
804   @Test
805   public void testColumnFamilySettings() throws Exception {
806     Configuration conf = new Configuration(this.util.getConfiguration());
807     RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
808     TaskAttemptContext context = null;
809     Path dir = util.getDataTestDir("testColumnFamilySettings");
810 
811     // Setup table descriptor
812     HTable table = Mockito.mock(HTable.class);
813     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
814     HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
815     Mockito.doReturn(htd).when(table).getTableDescriptor();
816     for (HColumnDescriptor hcd: this.util.generateColumnDescriptors()) {
817       htd.addFamily(hcd);
818     }
819 
820     // set up the table to return some mock keys
821     setupMockStartKeys(regionLocator);
822 
823     try {
824       // partial map red setup to get an operational writer for testing
825       // We turn off the sequence file compression, because DefaultCodec
826       // pollutes the GZip codec pool with an incompatible compressor.
827       conf.set("io.seqfile.compression.type", "NONE");
828       conf.set("hbase.fs.tmp.dir", dir.toString());
829       Job job = new Job(conf, "testLocalMRIncrementalLoad");
830       job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
831       setupRandomGeneratorMapper(job);
832       HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
833       FileOutputFormat.setOutputPath(job, dir);
834       context = createTestTaskAttemptContext(job);
835       HFileOutputFormat hof = new HFileOutputFormat();
836       writer = hof.getRecordWriter(context);
837 
838       // write out random rows
839       writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
840       writer.close(context);
841 
842       // Make sure that a directory was created for every CF
843       FileSystem fs = dir.getFileSystem(conf);
844 
845       // commit so that the filesystem has one directory per column family
846       hof.getOutputCommitter(context).commitTask(context);
847       hof.getOutputCommitter(context).commitJob(context);
848       FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
849       assertEquals(htd.getFamilies().size(), families.length);
850       for (FileStatus f : families) {
851         String familyStr = f.getPath().getName();
852         HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
853         // verify that the compression on this file matches the configured
854         // compression
855         Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
856         Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), conf);
857         Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
858 
859         byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
860         if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
861         assertEquals("Incorrect bloom filter used for column family " + familyStr +
862           "(reader: " + reader + ")",
863           hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
864         assertEquals("Incorrect compression used for column family " + familyStr +
865           "(reader: " + reader + ")", hcd.getCompression(), reader.getFileContext().getCompression());
866       }
867     } finally {
868       dir.getFileSystem(conf).delete(dir, true);
869     }
870   }
871 
872   /**
873    * Write random values to the writer: one KeyValue per row for each of the
874    * given column families.
875    */
876   private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer,
877       TaskAttemptContext context, Set<byte[]> families, int numRows)
878       throws IOException, InterruptedException {
879     byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
880     int valLength = 10;
881     byte valBytes[] = new byte[valLength];
882 
883     int taskId = context.getTaskAttemptID().getTaskID().getId();
884     assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
885     final byte [] qualifier = Bytes.toBytes("data");
886     Random random = new Random();
887     for (int i = 0; i < numRows; i++) {
888 
889       Bytes.putInt(keyBytes, 0, i);
890       random.nextBytes(valBytes);
891       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
892 
893       for (byte[] family : families) {
894         KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes);
895         writer.write(key, kv);
896       }
897     }
898   }
899 
900   /**
901    * Covers the scenario from HBASE-6901: all files are bulk loaded
902    * and excluded from minor compaction.
903    * Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException
904    * will be thrown.
905    */
906   @Ignore ("Flakey: See HBASE-9051") @Test
907   public void testExcludeAllFromMinorCompaction() throws Exception {
908     Configuration conf = util.getConfiguration();
909     conf.setInt("hbase.hstore.compaction.min", 2);
910     generateRandomStartKeys(5);
911 
912     try {
913       util.setJobWithoutMRCluster();
914       util.startMiniCluster();
915       final FileSystem fs = util.getDFSCluster().getFileSystem();
916       HBaseAdmin admin = new HBaseAdmin(conf);
917       HTable table = util.createTable(TABLE_NAME, FAMILIES);
918       assertEquals("Should start with empty table", 0, util.countRows(table));
919 
920       // deep inspection: get the StoreFile dir
921       final Path storePath = HStore.getStoreHomedir(
922           FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
923           admin.getTableRegions(TABLE_NAME).get(0),
924           FAMILIES[0]);
925       assertEquals(0, fs.listStatus(storePath).length);
926 
927       // Generate two bulk load files
928       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
929           true);
930 
931       for (int i = 0; i < 2; i++) {
932         Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
933         runIncrementalPELoad(conf, table, testDir);
934         // Perform the actual load
935         new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
936       }
937 
938       // Ensure data shows up
939       int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
940       assertEquals("LoadIncrementalHFiles should put expected data in table",
941           expectedRows, util.countRows(table));
942 
943       // should have a second StoreFile now
944       assertEquals(2, fs.listStatus(storePath).length);
945 
946       // minor compactions shouldn't get rid of the file
947       admin.compact(TABLE_NAME.getName());
948       try {
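            // quickPoll() ends in fail() -- an AssertionError -- if the store never
            // drops to a single file; that is the expected outcome here because the
            // bulk-loaded files were written with compaction.exclude set, so minor
            // compaction must skip them. If the poll ever succeeds, the IOException
            // below flags the unexpected compaction.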
949         quickPoll(new Callable<Boolean>() {
950           public Boolean call() throws Exception {
951             return fs.listStatus(storePath).length == 1;
952           }
953         }, 5000);
954         throw new IOException("SF# = " + fs.listStatus(storePath).length);
955       } catch (AssertionError ae) {
956         // this is expected behavior
957       }
958 
959       // a major compaction should work though
960       admin.majorCompact(TABLE_NAME.getName());
961       quickPoll(new Callable<Boolean>() {
962         public Boolean call() throws Exception {
963           return fs.listStatus(storePath).length == 1;
964         }
965       }, 5000);
966 
967     } finally {
968       util.shutdownMiniCluster();
969     }
970   }
971 
972   @Test
973   public void testExcludeMinorCompaction() throws Exception {
974     Configuration conf = util.getConfiguration();
975     conf.setInt("hbase.hstore.compaction.min", 2);
976     generateRandomStartKeys(5);
977 
978     try {
979       util.setJobWithoutMRCluster();
980       util.startMiniCluster();
981       Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
982       final FileSystem fs = util.getTestFileSystem();
983       HBaseAdmin admin = new HBaseAdmin(conf);
984       HTable table = util.createTable(TABLE_NAME, FAMILIES);
985       assertEquals("Should start with empty table", 0, util.countRows(table));
986 
987       // deep inspection: get the StoreFile dir
988       final Path storePath = HStore.getStoreHomedir(
989           FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
990           admin.getTableRegions(TABLE_NAME).get(0),
991           FAMILIES[0]);
992       assertEquals(0, fs.listStatus(storePath).length);
993 
994       // put some data in it and flush to create a storefile
995       Put p = new Put(Bytes.toBytes("test"));
996       p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
997       table.put(p);
998       admin.flush(TABLE_NAME.getName());
999       assertEquals(1, util.countRows(table));
1000       quickPoll(new Callable<Boolean>() {
1001         public Boolean call() throws Exception {
1002           return fs.listStatus(storePath).length == 1;
1003         }
1004       }, 5000);
1005 
1006       // Generate a bulk load file with more rows
1007       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1008           true);
1009       runIncrementalPELoad(conf, table, testDir);
1010 
1011       // Perform the actual load
1012       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
1013 
1014       // Ensure data shows up
1015       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1016       assertEquals("LoadIncrementalHFiles should put expected data in table",
1017           expectedRows + 1, util.countRows(table));
1018 
1019       // should have a second StoreFile now
1020       assertEquals(2, fs.listStatus(storePath).length);
1021 
1022       // minor compactions shouldn't get rid of the file
1023       admin.compact(TABLE_NAME.getName());
1024       try {
1025         quickPoll(new Callable<Boolean>() {
1026           public Boolean call() throws Exception {
1027             return fs.listStatus(storePath).length == 1;
1028           }
1029         }, 5000);
1030         throw new IOException("SF# = " + fs.listStatus(storePath).length);
1031       } catch (AssertionError ae) {
1032         // this is expected behavior
1033       }
1034 
1035       // a major compaction should work though
1036       admin.majorCompact(TABLE_NAME.getName());
1037       quickPoll(new Callable<Boolean>() {
1038         public Boolean call() throws Exception {
1039           return fs.listStatus(storePath).length == 1;
1040         }
1041       }, 5000);
1042 
1043     } finally {
1044       util.shutdownMiniCluster();
1045     }
1046   }
1047 
1048   private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1049     int sleepMs = 10;
1050     int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1051     while (retries-- > 0) {
1052       if (c.call().booleanValue()) {
1053         return;
1054       }
1055       Thread.sleep(sleepMs);
1056     }
1057     fail();
1058   }
1059 
1060   public static void main(String args[]) throws Exception {
1061     new TestHFileOutputFormat().manualTest(args);
1062   }
1063 
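      /**
       * Hand-run driver (not invoked by JUnit): {@code newtable <tableName>} creates a
       * pre-split table with this test's families; {@code incremental <tableName>} runs
       * the random-KV incremental load job against an existing table, writing HFiles
       * under ./incremental-out.
       */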
1064   public void manualTest(String args[]) throws Exception {
1065     Configuration conf = HBaseConfiguration.create();
1066     util = new HBaseTestingUtility(conf);
1067     if ("newtable".equals(args[0])) {
1068       TableName tname = TableName.valueOf(args[1]);
1069       byte[][] splitKeys = generateRandomSplitKeys(4);
1070       HTable table = util.createTable(tname, FAMILIES, splitKeys);
1071     } else if ("incremental".equals(args[0])) {
1072       TableName tname = TableName.valueOf(args[1]);
1073       HTable table = new HTable(conf, tname);
1074       Path outDir = new Path("incremental-out");
1075       runIncrementalPELoad(conf, table, outDir);
1076     } else {
1077       throw new RuntimeException(
1078           "usage: TestHFileOutputFormat newtable | incremental");
1079     }
1080   }
1081 
1082 }
1083