/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Test major compactions.
 */
@Category(MediumTests.class)
public class TestMajorCompaction {
  @Rule public TestName name = new TestName();
  static final Log LOG = LogFactory.getLog(TestMajorCompaction.class.getName());
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private HTableDescriptor htd = null;
  private static final byte [] COLUMN_FAMILY = fam1;
  private final byte [] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
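  // Presumed per-compaction file cap (the default hbase.hstore.compaction.max is 10);
  // the HBASE-5920 tests below deliberately create more store files than this.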
  private static final long MAX_FILES_TO_COMPACT = 10;

  /** constructor */
  public TestMajorCompaction() {
    super();

    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
    conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
  }

  @Before
  public void setUp() throws Exception {
    this.htd = UTIL.createTableDescriptor(name.getMethodName());
    this.r = UTIL.createLocalHRegion(htd, null, null);
  }

  @After
  public void tearDown() throws Exception {
    HLog hlog = r.getLog();
    this.r.close();
    hlog.closeAndDelete();
  }

  /**
   * Test that on a major compaction, if all cells are expired or deleted, then
   * we'll end up with no product.  Make sure a scanner over the region returns
   * the right answer in this case - and that it just basically works.
   * @throws IOException
   */
  @Test
  public void testMajorCompactingToNoOutput() throws IOException {
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Now delete everything.
    InternalScanner s = r.getScanner(new Scan());
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      r.delete(new Delete(CellUtil.cloneRow(results.get(0))));
      if (!result) break;
    } while(true);
    s.close();
    // Flush
    r.flushcache();
    // Major compact.
    r.compactStores(true);
    s = r.getScanner(new Scan());
    int counter = 0;
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      if (!result) break;
      counter++;
    } while(true);
    assertEquals(0, counter);
  }

  /**
   * Run compaction while flushing the memstore.
   * Assert that deletes get cleaned up.
   * @throws Exception
   */
  @Test
  public void testMajorCompaction() throws Exception {
    majorCompaction();
  }

  @Test
  public void testDataBlockEncodingInCacheOnly() throws Exception {
    majorCompactionWithDataBlockEncoding(true);
  }

  @Test
  public void testDataBlockEncodingEverywhere() throws Exception {
    majorCompactionWithDataBlockEncoding(false);
  }

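  /**
   * Runs {@link #majorCompaction()} after swapping in a different data block encoder on every
   * store: DataBlockEncoding.NONE on disk when <code>inCacheOnly</code> is set, PREFIX otherwise.
   * The original encoders are restored afterwards.
   */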
  public void majorCompactionWithDataBlockEncoding(boolean inCacheOnly)
      throws Exception {
    Map<HStore, HFileDataBlockEncoder> replaceBlockCache =
        new HashMap<HStore, HFileDataBlockEncoder>();
    for (Entry<byte[], Store> pair : r.getStores().entrySet()) {
      HStore store = (HStore) pair.getValue();
      HFileDataBlockEncoder blockEncoder = store.getDataBlockEncoder();
      replaceBlockCache.put(store, blockEncoder);
      final DataBlockEncoding inCache = DataBlockEncoding.PREFIX;
      final DataBlockEncoding onDisk = inCacheOnly ? DataBlockEncoding.NONE : inCache;
      store.setDataBlockEncoderInTest(new HFileDataBlockEncoderImpl(onDisk));
    }

    majorCompaction();

    // restore settings
    for (Entry<HStore, HFileDataBlockEncoder> entry : replaceBlockCache.entrySet()) {
      entry.getKey().setDataBlockEncoderInTest(entry.getValue());
    }
  }

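  /**
   * Creates enough store files to exceed the compaction threshold, then checks that a major
   * compaction trims each column to the configured maximum versions, that deleted rows are
   * purged from the compacted files, and that everything disappears once the store TTL expires.
   */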
  private void majorCompaction() throws Exception {
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Add more content.
    HBaseTestCase.addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));

    // Now there are about 5 versions of each column.
    // By default only 3 (MAXVERSIONS) versions are allowed per column.
    //
    // Assert == 3 when we ask for versions.
    Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertEquals(compactionThreshold, result.size());

    // see if CompactionProgress is in place but null
    for (Store store : this.r.stores.values()) {
      assertNull(store.getCompactionProgress());
    }

    r.flushcache();
    r.compactStores(true);

    // see if CompactionProgress has done its thing on at least one store
    int storeCount = 0;
    for (Store store : this.r.stores.values()) {
      CompactionProgress progress = store.getCompactionProgress();
      if (progress != null) {
        ++storeCount;
        assertTrue(progress.currentCompactedKVs > 0);
        assertTrue(progress.totalCompactingKVs > 0);
      }
      assertTrue(storeCount > 0);
    }

    // look at the second row
    // Increment the least significant character so we get to next row.
    byte [] secondRowBytes = START_KEY_BYTES.clone();
    secondRowBytes[START_KEY_BYTES.length - 1]++;

    // Always 3 versions if that is what max versions is.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    LOG.debug("Row " + Bytes.toStringBinary(secondRowBytes) + " after " +
        "initial compaction: " + result);
    assertEquals("Invalid number of versions of row "
        + Bytes.toStringBinary(secondRowBytes) + ".", compactionThreshold,
        result.size());

    // Now add deletes to memstore and then flush it.
    // That will put us over
    // the compaction threshold of 3 store files.  Compacting these store files
    // should result in a compacted store file that has no references to the
    // deleted row.
    LOG.debug("Adding deletes to memstore and flushing");
    Delete delete = new Delete(secondRowBytes, System.currentTimeMillis());
    byte [][] famAndQf = {COLUMN_FAMILY, null};
    delete.deleteFamily(famAndQf[0]);
    r.delete(delete);

    // Assert deleted.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should have been deleted", result.isEmpty());

    r.flushcache();

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should have been deleted", result.isEmpty());

    // Add a bit of data and flush.  Start adding at 'bbb'.
    createSmallerStoreFile(this.r);
    r.flushcache();
    // Assert that the second row is still deleted.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Force major compaction.
    r.compactStores(true);
    assertEquals(1, r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size());

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Make sure the store files do have some 'aaa' keys in them -- exactly 3.
    // Also, that compacted store files do not have any secondRowBytes because
    // they were deleted.
    verifyCounts(3, 0);

    // Multiple versions allowed for an entry, so the delete isn't enough
    // Lower TTL and expire to ensure that all our entries have been wiped
    final int ttl = 1000;
    for (Store hstore : this.r.stores.values()) {
      HStore store = ((HStore) hstore);
      ScanInfo old = store.getScanInfo();
      ScanInfo si = new ScanInfo(old.getFamily(),
          old.getMinVersions(), old.getMaxVersions(), ttl,
          old.getKeepDeletedCells(), 0, old.getComparator());
      store.setScanInfo(si);
    }
    Thread.sleep(1000);

    r.compactStores(true);
    int count = count();
    assertEquals("Should not see anything after TTL has expired", 0, count);
  }

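  /**
   * Verifies periodic (time-based) major compaction: the next major compaction time is
   * deterministic and stays within the configured jitter, and once the period has elapsed a
   * regular compaction request is upgraded to a major compaction.
   */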
  @Test
  public void testTimeBasedMajorCompaction() throws Exception {
    // create 2 storefiles and force a major compaction to reset the time
    int delay = 10 * 1000; // 10 sec
    float jitterPct = 0.20f; // 20%
    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
    conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);

    HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
    s.storeEngine.getCompactionPolicy().setConf(conf);
    try {
      createStoreFile(r);
      createStoreFile(r);
      r.compactStores(true);

      // add one more file & verify that a regular compaction won't work
      createStoreFile(r);
      r.compactStores(false);
      assertEquals(2, s.getStorefilesCount());

      // ensure that major compaction time is deterministic
      RatioBasedCompactionPolicy c =
          (RatioBasedCompactionPolicy) s.storeEngine.getCompactionPolicy();
      Collection<StoreFile> storeFiles = s.getStorefiles();
      long mcTime = c.getNextMajorCompactTime(storeFiles);
      for (int i = 0; i < 10; ++i) {
        assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
      }

      // ensure that the major compaction time is within the variance
      long jitter = Math.round(delay * jitterPct);
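      // With delay = 10 * 1000 ms and jitterPct = 0.20f this gives jitter = 2000 ms,
      // so mcTime must fall within [8000, 12000] ms.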
      assertTrue(delay - jitter <= mcTime && mcTime <= delay + jitter);

      // wait until the time-based compaction interval
      Thread.sleep(mcTime);

      // trigger a compaction request and ensure that it's upgraded to major
      r.compactStores(false);
      assertEquals(1, s.getStorefilesCount());
    } finally {
      // reset the timed compaction settings
      conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
      conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
      // run a major to reset the cache
      createStoreFile(r);
      r.compactStores(true);
      assertEquals(1, s.getStorefilesCount());
    }
  }

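  /**
   * Scans the raw store files directly (bypassing the region read path) and asserts how many
   * cells remain for the first row (STARTROW) and for the second row.
   */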
  private void verifyCounts(int countRow1, int countRow2) throws Exception {
    int count1 = 0;
    int count2 = 0;
    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      scanner.seekTo();
      do {
        byte [] row = scanner.getKeyValue().getRow();
        if (Bytes.equals(row, STARTROW)) {
          count1++;
        } else if (Bytes.equals(row, secondRowBytes)) {
          count2++;
        }
      } while(scanner.next());
    }
    assertEquals(countRow1, count1);
    assertEquals(countRow2, count2);
  }

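  /** Counts all cells in the test column family's store files by scanning the HFiles directly. */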
  private int count() throws IOException {
    int count = 0;
    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while(scanner.next());
    }
    return count;
  }

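  /** Loads the standard test content into the region's column family and flushes, producing one new store file. */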
  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  private void createStoreFile(final HRegion region, String family) throws IOException {
    HRegionIncommon loader = new HRegionIncommon(region);
    HBaseTestCase.addContent(loader, family);
    loader.flushcache();
  }

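  /** Loads a smaller batch of content starting at row 'bbb' and flushes it to a new store file. */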
  private void createSmallerStoreFile(final HRegion region) throws IOException {
    HRegionIncommon loader = new HRegionIncommon(region);
    HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), Bytes.toBytes("bbb"), null);
    loader.flushcache();
  }

  /**
   * Test for HBASE-5920 - Test that a system-requested major compaction does not occur when
   * there are too many store files.
   */
  @Test
  public void testNonUserMajorCompactionRequest() throws Exception {
    Store store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }
    store.triggerMajorCompaction();

    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
    assertNotNull("Expected to receive a compaction request", request);
    assertEquals(
      "System-requested major compaction should not occur if there are too many store files",
      false,
      request.isMajor());
  }

  /**
   * Test for HBASE-5920 - Test that a user-requested major compaction always occurs, even when
   * there are too many store files.
   */
  @Test
  public void testUserMajorCompactionRequest() throws IOException {
    Store store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }
    store.triggerMajorCompaction();
    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
    assertNotNull("Expected to receive a compaction request", request);
    assertEquals(
      "User-requested major compaction should always occur, even if there are too many store files",
      true,
      request.isMajor());
  }

  /**
   * Test that on a major compaction, if all cells are expired or deleted, then we'll end up with
   * no product.  Make sure a reversed scanner over the region returns the right answer in this
   * case - and that it just basically works.
   * @throws IOException
   */
  @Test
  public void testMajorCompactingToNoOutputWithReverseScan() throws IOException {
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Now delete everything.
    Scan scan = new Scan();
    scan.setReversed(true);
    InternalScanner s = r.getScanner(scan);
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      assertTrue(!results.isEmpty());
      r.delete(new Delete(CellUtil.cloneRow(results.get(0))));
      if (!result) break;
    } while (true);
    s.close();
    // Flush
    r.flushcache();
    // Major compact.
    r.compactStores(true);
    scan = new Scan();
    scan.setReversed(true);
    s = r.getScanner(scan);
    int counter = 0;
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      if (!result) break;
      counter++;
    } while (true);
    s.close();
    assertEquals(0, counter);
  }
}