View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  import static org.mockito.Matchers.any;
28  import static org.mockito.Mockito.doAnswer;
29  import static org.mockito.Mockito.mock;
30  import static org.mockito.Mockito.spy;
31  import static org.mockito.Mockito.when;
32  
33  import java.io.IOException;
34  import java.util.ArrayList;
35  import java.util.Collection;
36  import java.util.Collections;
37  import java.util.List;
38  import java.util.concurrent.CountDownLatch;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FSDataOutputStream;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HBaseTestCase;
49  import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
50  import org.apache.hadoop.hbase.HBaseTestingUtility;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.MediumTests;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Durability;
56  import org.apache.hadoop.hbase.client.Put;
57  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
58  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
59  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
60  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
61  import org.apache.hadoop.hbase.regionserver.wal.HLog;
62  import org.apache.hadoop.hbase.util.Bytes;
63  import org.apache.hadoop.hbase.util.Pair;
64  import org.apache.hadoop.hbase.util.Threads;
65  import org.junit.After;
66  import org.junit.Assume;
67  import org.junit.Before;
68  import org.junit.Rule;
69  import org.junit.Test;
70  import org.junit.experimental.categories.Category;
71  import org.junit.rules.TestName;
72  import org.mockito.Mockito;
73  import org.mockito.invocation.InvocationOnMock;
74  import org.mockito.stubbing.Answer;
75  
76  
77  /**
78   * Test compaction framework and common functions
79   */
80  @Category(MediumTests.class)
81  public class TestCompaction {
82    @Rule public TestName name = new TestName();
83    static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
84    private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
85    protected Configuration conf = UTIL.getConfiguration();
86    
87    private HRegion r = null;
88    private HTableDescriptor htd = null;
89    private static final byte [] COLUMN_FAMILY = fam1;
90    private final byte [] STARTROW = Bytes.toBytes(START_KEY);
91    private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
92    private int compactionThreshold;
93    private byte[] secondRowBytes, thirdRowBytes;
94    private static final long MAX_FILES_TO_COMPACT = 10;
95  
96    /** constructor */
97    public TestCompaction() {
98      super();
99  
100     // Set cache flush size to 1MB
101     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
102     conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
103     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
104 
105     secondRowBytes = START_KEY_BYTES.clone();
106     // Increment the least significant character so we get to next row.
107     secondRowBytes[START_KEY_BYTES.length - 1]++;
108     thirdRowBytes = START_KEY_BYTES.clone();
109     thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
110   }
111 
112   @Before
113   public void setUp() throws Exception {
114     this.htd = UTIL.createTableDescriptor(name.getMethodName());
115     this.r = UTIL.createLocalHRegion(htd, null, null);
116   }
117 
118   @After
119   public void tearDown() throws Exception {
120     HLog hlog = r.getLog();
121     this.r.close();
122     hlog.closeAndDelete();
123   }
124 
125   /**
126    * Verify that you can stop a long-running compaction
127    * (used during RS shutdown)
128    * @throws Exception
129    */
130   @Test
131   public void testInterruptCompaction() throws Exception {
132     assertEquals(0, count());
133 
134     // lower the polling interval for this test
135     int origWI = HStore.closeCheckInterval;
136     HStore.closeCheckInterval = 10*1000; // 10 KB
137 
138     try {
139       // Create a couple store files w/ 15KB (over 10KB interval)
140       int jmax = (int) Math.ceil(15.0/compactionThreshold);
141       byte [] pad = new byte[1000]; // 1 KB chunk
142       for (int i = 0; i < compactionThreshold; i++) {
143         HRegionIncommon loader = new HRegionIncommon(r);
144         Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
145         p.setDurability(Durability.SKIP_WAL);
146         for (int j = 0; j < jmax; j++) {
147           p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
148         }
149         HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
150         loader.put(p);
151         loader.flushcache();
152       }
153 
154       HRegion spyR = spy(r);
155       doAnswer(new Answer() {
156         public Object answer(InvocationOnMock invocation) throws Throwable {
157           r.writestate.writesEnabled = false;
158           return invocation.callRealMethod();
159         }
160       }).when(spyR).doRegionCompactionPrep();
161 
162       // force a minor compaction, but not before requesting a stop
163       spyR.compactStores();
164 
165       // ensure that the compaction stopped, all old files are intact,
166       Store s = r.stores.get(COLUMN_FAMILY);
167       assertEquals(compactionThreshold, s.getStorefilesCount());
168       assertTrue(s.getStorefilesSize() > 15*1000);
169       // and no new store files persisted past compactStores()
170       FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
171       assertEquals(0, ls.length);
172 
173     } finally {
174       // don't mess up future tests
175       r.writestate.writesEnabled = true;
176       HStore.closeCheckInterval = origWI;
177 
178       // Delete all Store information once done using
179       for (int i = 0; i < compactionThreshold; i++) {
180         Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
181         byte [][] famAndQf = {COLUMN_FAMILY, null};
182         delete.deleteFamily(famAndQf[0]);
183         r.delete(delete);
184       }
185       r.flushcache();
186 
187       // Multiple versions allowed for an entry, so the delete isn't enough
188       // Lower TTL and expire to ensure that all our entries have been wiped
189       final int ttl = 1000;
190       for (Store hstore: this.r.stores.values()) {
191         HStore store = (HStore)hstore;
192         ScanInfo old = store.getScanInfo();
193         ScanInfo si = new ScanInfo(old.getFamily(),
194             old.getMinVersions(), old.getMaxVersions(), ttl,
195             old.getKeepDeletedCells(), 0, old.getComparator());
196         store.setScanInfo(si);
197       }
198       Thread.sleep(ttl);
199 
200       r.compactStores(true);
201       assertEquals(0, count());
202     }
203   }
204 
205   private int count() throws IOException {
206     int count = 0;
207     for (StoreFile f: this.r.stores.
208         get(COLUMN_FAMILY_TEXT).getStorefiles()) {
209       HFileScanner scanner = f.getReader().getScanner(false, false);
210       if (!scanner.seekTo()) {
211         continue;
212       }
213       do {
214         count++;
215       } while(scanner.next());
216     }
217     return count;
218   }
219 
220   private void createStoreFile(final HRegion region) throws IOException {
221     createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
222   }
223 
224   private void createStoreFile(final HRegion region, String family) throws IOException {
225     HRegionIncommon loader = new HRegionIncommon(region);
226     HBaseTestCase.addContent(loader, family);
227     loader.flushcache();
228   }
229 
230   @Test
231   public void testCompactionWithCorruptResult() throws Exception {
232     int nfiles = 10;
233     for (int i = 0; i < nfiles; i++) {
234       createStoreFile(r);
235     }
236     HStore store = (HStore) r.getStore(COLUMN_FAMILY);
237 
238     Collection<StoreFile> storeFiles = store.getStorefiles();
239     DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
240     tool.compactForTesting(storeFiles, false);
241 
242     // Now lets corrupt the compacted file.
243     FileSystem fs = store.getFileSystem();
244     // default compaction policy created one and only one new compacted file
245     Path dstPath = store.getRegionFileSystem().createTempName();
246     FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, (long)1024, null);
247     stream.writeChars("CORRUPT FILE!!!!");
248     stream.close();
249     Path origPath = store.getRegionFileSystem().commitStoreFile(
250       Bytes.toString(COLUMN_FAMILY), dstPath);
251 
252     try {
253       ((HStore)store).moveFileIntoPlace(origPath);
254     } catch (Exception e) {
255       // The complete compaction should fail and the corrupt file should remain
256       // in the 'tmp' directory;
257       assert (fs.exists(origPath));
258       assert (!fs.exists(dstPath));
259       System.out.println("testCompactionWithCorruptResult Passed");
260       return;
261     }
262     fail("testCompactionWithCorruptResult failed since no exception was" +
263         "thrown while completing a corrupt file");
264   }
265 
266   /**
267    * Create a custom compaction request and be sure that we can track it through the queue, knowing
268    * when the compaction is completed.
269    */
270   @Test
271   public void testTrackingCompactionRequest() throws Exception {
272     // setup a compact/split thread on a mock server
273     HRegionServer mockServer = Mockito.mock(HRegionServer.class);
274     Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
275     CompactSplitThread thread = new CompactSplitThread(mockServer);
276     Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
277 
278     // setup a region/store with some files
279     Store store = r.getStore(COLUMN_FAMILY);
280     createStoreFile(r);
281     for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
282       createStoreFile(r);
283     }
284 
285     CountDownLatch latch = new CountDownLatch(1);
286     TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
287     thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
288     // wait for the latch to complete.
289     latch.await();
290 
291     thread.interruptIfNecessary();
292   }
293 
294   /**
295    * HBASE-7947: Regression test to ensure adding to the correct list in the
296    * {@link CompactSplitThread}
297    * @throws Exception on failure
298    */
299   @Test
300   public void testMultipleCustomCompactionRequests() throws Exception {
301     // setup a compact/split thread on a mock server
302     HRegionServer mockServer = Mockito.mock(HRegionServer.class);
303     Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
304     CompactSplitThread thread = new CompactSplitThread(mockServer);
305     Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
306 
307     // setup a region/store with some files
308     int numStores = r.getStores().size();
309     List<Pair<CompactionRequest, Store>> requests =
310         new ArrayList<Pair<CompactionRequest, Store>>(numStores);
311     CountDownLatch latch = new CountDownLatch(numStores);
312     // create some store files and setup requests for each store on which we want to do a
313     // compaction
314     for (Store store : r.getStores().values()) {
315       createStoreFile(r, store.getColumnFamilyName());
316       createStoreFile(r, store.getColumnFamilyName());
317       createStoreFile(r, store.getColumnFamilyName());
318       requests
319           .add(new Pair<CompactionRequest, Store>(new TrackableCompactionRequest(latch), store));
320     }
321 
322     thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
323       Collections.unmodifiableList(requests));
324 
325     // wait for the latch to complete.
326     latch.await();
327 
328     thread.interruptIfNecessary();
329   }
330 
331   private class StoreMockMaker extends StatefulStoreMockMaker {
332     public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
333     public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();
334     private ArrayList<Integer> results;
335 
336     public StoreMockMaker(ArrayList<Integer> results) {
337       this.results = results;
338     }
339 
340     public class TestCompactionContext extends CompactionContext {
341       private List<StoreFile> selectedFiles;
342       public TestCompactionContext(List<StoreFile> selectedFiles) {
343         super();
344         this.selectedFiles = selectedFiles;
345       }
346 
347       @Override
348       public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
349         return new ArrayList<StoreFile>();
350       }
351 
352       @Override
353       public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
354           boolean mayUseOffPeak, boolean forceMajor) throws IOException {
355         this.request = new CompactionRequest(selectedFiles);
356         this.request.setPriority(getPriority());
357         return true;
358       }
359 
360       @Override
361       public List<Path> compact() throws IOException {
362         finishCompaction(this.selectedFiles);
363         return new ArrayList<Path>();
364       }
365     }
366 
367     @Override
368     public synchronized CompactionContext selectCompaction() {
369       CompactionContext ctx = new TestCompactionContext(new ArrayList<StoreFile>(notCompacting));
370       compacting.addAll(notCompacting);
371       notCompacting.clear();
372       try {
373         ctx.select(null, false, false, false);
374       } catch (IOException ex) {
375         fail("Shouldn't happen");
376       }
377       return ctx;
378     }
379 
380     @Override
381     public synchronized void cancelCompaction(Object object) {
382       TestCompactionContext ctx = (TestCompactionContext)object;
383       compacting.removeAll(ctx.selectedFiles);
384       notCompacting.addAll(ctx.selectedFiles);
385     }
386 
387     public synchronized void finishCompaction(List<StoreFile> sfs) {
388       if (sfs.isEmpty()) return;
389       synchronized (results) {
390         results.add(sfs.size());
391       }
392       compacting.removeAll(sfs);
393     }
394 
395     @Override
396     public int getPriority() {
397       return 7 - compacting.size() - notCompacting.size();
398     }
399   }
400 
401   public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
402     BlockingCompactionContext blocked = null;
403 
404     public class BlockingCompactionContext extends CompactionContext {
405       public volatile boolean isInCompact = false;
406 
407       public void unblock() {
408         synchronized (this) { this.notifyAll(); }
409       }
410 
411       @Override
412       public List<Path> compact() throws IOException {
413         try {
414           isInCompact = true;
415           synchronized (this) { this.wait(); }
416         } catch (InterruptedException e) {
417            Assume.assumeNoException(e);
418         }
419         return new ArrayList<Path>();
420       }
421 
422       @Override
423       public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
424         return new ArrayList<StoreFile>();
425       }
426 
427       @Override
428       public boolean select(List<StoreFile> f, boolean i, boolean m, boolean e)
429           throws IOException {
430         this.request = new CompactionRequest(new ArrayList<StoreFile>());
431         return true;
432       }
433     }
434 
435     @Override
436     public CompactionContext selectCompaction() {
437       this.blocked = new BlockingCompactionContext();
438       try {
439         this.blocked.select(null, false, false, false);
440       } catch (IOException ex) {
441         fail("Shouldn't happen");
442       }
443       return this.blocked;
444     }
445 
446     @Override
447     public void cancelCompaction(Object object) {}
448 
449     public int getPriority() {
450       return Integer.MIN_VALUE; // some invalid value, see createStoreMock
451     }
452 
453     public BlockingCompactionContext waitForBlocking() {
454       while (this.blocked == null || !this.blocked.isInCompact) {
455         Threads.sleepWithoutInterrupt(50);
456       }
457       BlockingCompactionContext ctx = this.blocked;
458       this.blocked = null;
459       return ctx;
460     }
461 
462     @Override
463     public Store createStoreMock(String name) throws Exception {
464       return createStoreMock(Integer.MIN_VALUE, name);
465     }
466 
467     public Store createStoreMock(int priority, String name) throws Exception {
468       // Override the mock to always return the specified priority.
469       Store s = super.createStoreMock(name);
470       when(s.getCompactPriority()).thenReturn(priority);
471       return s;
472     }
473   }
474 
475   /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
476   @Test
477   public void testCompactionQueuePriorities() throws Exception {
478     // Setup a compact/split thread on a mock server.
479     final Configuration conf = HBaseConfiguration.create();
480     HRegionServer mockServer = mock(HRegionServer.class);
481     when(mockServer.isStopped()).thenReturn(false);
482     when(mockServer.getConfiguration()).thenReturn(conf);
483     CompactSplitThread cst = new CompactSplitThread(mockServer);
484     when(mockServer.getCompactSplitThread()).thenReturn(cst);
485 
486     // Set up the region mock that redirects compactions.
487     HRegion r = mock(HRegion.class);
488     when(r.compact(any(CompactionContext.class), any(Store.class))).then(new Answer<Boolean>() {
489       public Boolean answer(InvocationOnMock invocation) throws Throwable {
490         ((CompactionContext)invocation.getArguments()[0]).compact();
491         return true;
492       }
493     });
494 
495     // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
496     ArrayList<Integer> results = new ArrayList<Integer>();
497     StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
498     Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
499     BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
500 
501     // First, block the compaction thread so that we could muck with queue.
502     cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
503     BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
504 
505     // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
506     for (int i = 0; i < 4; ++i) {
507       sm.notCompacting.add(createFile());
508     }
509     cst.requestSystemCompaction(r, store, "s1-pri3");
510     for (int i = 0; i < 3; ++i) {
511       sm2.notCompacting.add(createFile());
512     }
513     cst.requestSystemCompaction(r, store2, "s2-pri4");
514     // Now add 2 more files to store1 and queue compaction - pri 1.
515     for (int i = 0; i < 2; ++i) {
516       sm.notCompacting.add(createFile());
517     }
518     cst.requestSystemCompaction(r, store, "s1-pri1");
519     // Finally add blocking compaction with priority 2.
520     cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
521 
522     // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
523     currentBlock.unblock();
524     currentBlock = blocker.waitForBlocking();
525     // Pri1 should have "compacted" all 6 files.
526     assertEquals(1, results.size());
527     assertEquals(6, results.get(0).intValue());
528     // Add 2 files to store 1 (it has 2 files now).
529     for (int i = 0; i < 2; ++i) {
530       sm.notCompacting.add(createFile());
531     }
532     // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
533     // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
534     cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
535     currentBlock.unblock();
536     currentBlock = blocker.waitForBlocking();
537     assertEquals(3, results.size());
538     assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
539     assertEquals(2, results.get(2).intValue());
540 
541     currentBlock.unblock();
542     cst.interruptIfNecessary();
543   }
544 
545   private static StoreFile createFile() throws Exception {
546     StoreFile sf = mock(StoreFile.class);
547     when(sf.getPath()).thenReturn(new Path("file"));
548     StoreFile.Reader r = mock(StoreFile.Reader.class);
549     when(r.length()).thenReturn(10L);
550     when(sf.getReader()).thenReturn(r);
551     return sf;
552   }
553 
554   /**
555    * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.
556    */
557   public static class TrackableCompactionRequest extends CompactionRequest {
558     private CountDownLatch done;
559 
560     /**
561      * Constructor for a custom compaction. Uses the setXXX methods to update the state of the
562      * compaction before being used.
563      */
564     public TrackableCompactionRequest(CountDownLatch finished) {
565       super();
566       this.done = finished;
567     }
568 
569     @Override
570     public void afterExecute() {
571       super.afterExecute();
572       this.done.countDown();
573     }
574   }
575 }