1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
22 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
23 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
27 import static org.mockito.Matchers.any;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.when;
32
33 import java.io.IOException;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataOutputStream;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.hbase.HBaseConfiguration;
48 import org.apache.hadoop.hbase.HBaseTestCase;
49 import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
50 import org.apache.hadoop.hbase.HBaseTestingUtility;
51 import org.apache.hadoop.hbase.HConstants;
52 import org.apache.hadoop.hbase.HTableDescriptor;
53 import org.apache.hadoop.hbase.MediumTests;
54 import org.apache.hadoop.hbase.client.Delete;
55 import org.apache.hadoop.hbase.client.Durability;
56 import org.apache.hadoop.hbase.client.Put;
57 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
58 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
59 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
60 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
61 import org.apache.hadoop.hbase.regionserver.wal.HLog;
62 import org.apache.hadoop.hbase.util.Bytes;
63 import org.apache.hadoop.hbase.util.Pair;
64 import org.apache.hadoop.hbase.util.Threads;
65 import org.junit.After;
66 import org.junit.Assume;
67 import org.junit.Before;
68 import org.junit.Rule;
69 import org.junit.Test;
70 import org.junit.experimental.categories.Category;
71 import org.junit.rules.TestName;
72 import org.mockito.Mockito;
73 import org.mockito.invocation.InvocationOnMock;
74 import org.mockito.stubbing.Answer;
75
76
77
78
79
80 @Category(MediumTests.class)
81 public class TestCompaction {
82 @Rule public TestName name = new TestName();
83 static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
84 private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
85 protected Configuration conf = UTIL.getConfiguration();
86
87 private HRegion r = null;
88 private HTableDescriptor htd = null;
89 private static final byte [] COLUMN_FAMILY = fam1;
90 private final byte [] STARTROW = Bytes.toBytes(START_KEY);
91 private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
92 private int compactionThreshold;
93 private byte[] secondRowBytes, thirdRowBytes;
94 private static final long MAX_FILES_TO_COMPACT = 10;
95
96
97 public TestCompaction() {
98 super();
99
100
101 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
102 conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
103 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
104
105 secondRowBytes = START_KEY_BYTES.clone();
106
107 secondRowBytes[START_KEY_BYTES.length - 1]++;
108 thirdRowBytes = START_KEY_BYTES.clone();
109 thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
110 }
111
112 @Before
113 public void setUp() throws Exception {
114 this.htd = UTIL.createTableDescriptor(name.getMethodName());
115 this.r = UTIL.createLocalHRegion(htd, null, null);
116 }
117
118 @After
119 public void tearDown() throws Exception {
120 HLog hlog = r.getLog();
121 this.r.close();
122 hlog.closeAndDelete();
123 }
124
125
126
127
128
129
130 @Test
131 public void testInterruptCompaction() throws Exception {
132 assertEquals(0, count());
133
134
135 int origWI = HStore.closeCheckInterval;
136 HStore.closeCheckInterval = 10*1000;
137
138 try {
139
140 int jmax = (int) Math.ceil(15.0/compactionThreshold);
141 byte [] pad = new byte[1000];
142 for (int i = 0; i < compactionThreshold; i++) {
143 HRegionIncommon loader = new HRegionIncommon(r);
144 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
145 p.setDurability(Durability.SKIP_WAL);
146 for (int j = 0; j < jmax; j++) {
147 p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
148 }
149 HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
150 loader.put(p);
151 loader.flushcache();
152 }
153
154 HRegion spyR = spy(r);
155 doAnswer(new Answer() {
156 public Object answer(InvocationOnMock invocation) throws Throwable {
157 r.writestate.writesEnabled = false;
158 return invocation.callRealMethod();
159 }
160 }).when(spyR).doRegionCompactionPrep();
161
162
163 spyR.compactStores();
164
165
166 Store s = r.stores.get(COLUMN_FAMILY);
167 assertEquals(compactionThreshold, s.getStorefilesCount());
168 assertTrue(s.getStorefilesSize() > 15*1000);
169
170 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
171 assertEquals(0, ls.length);
172
173 } finally {
174
175 r.writestate.writesEnabled = true;
176 HStore.closeCheckInterval = origWI;
177
178
179 for (int i = 0; i < compactionThreshold; i++) {
180 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
181 byte [][] famAndQf = {COLUMN_FAMILY, null};
182 delete.deleteFamily(famAndQf[0]);
183 r.delete(delete);
184 }
185 r.flushcache();
186
187
188
189 final int ttl = 1000;
190 for (Store hstore: this.r.stores.values()) {
191 HStore store = (HStore)hstore;
192 ScanInfo old = store.getScanInfo();
193 ScanInfo si = new ScanInfo(old.getFamily(),
194 old.getMinVersions(), old.getMaxVersions(), ttl,
195 old.getKeepDeletedCells(), 0, old.getComparator());
196 store.setScanInfo(si);
197 }
198 Thread.sleep(ttl);
199
200 r.compactStores(true);
201 assertEquals(0, count());
202 }
203 }
204
205 private int count() throws IOException {
206 int count = 0;
207 for (StoreFile f: this.r.stores.
208 get(COLUMN_FAMILY_TEXT).getStorefiles()) {
209 HFileScanner scanner = f.getReader().getScanner(false, false);
210 if (!scanner.seekTo()) {
211 continue;
212 }
213 do {
214 count++;
215 } while(scanner.next());
216 }
217 return count;
218 }
219
220 private void createStoreFile(final HRegion region) throws IOException {
221 createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
222 }
223
224 private void createStoreFile(final HRegion region, String family) throws IOException {
225 HRegionIncommon loader = new HRegionIncommon(region);
226 HBaseTestCase.addContent(loader, family);
227 loader.flushcache();
228 }
229
230 @Test
231 public void testCompactionWithCorruptResult() throws Exception {
232 int nfiles = 10;
233 for (int i = 0; i < nfiles; i++) {
234 createStoreFile(r);
235 }
236 HStore store = (HStore) r.getStore(COLUMN_FAMILY);
237
238 Collection<StoreFile> storeFiles = store.getStorefiles();
239 DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
240 tool.compactForTesting(storeFiles, false);
241
242
243 FileSystem fs = store.getFileSystem();
244
245 Path dstPath = store.getRegionFileSystem().createTempName();
246 FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, (long)1024, null);
247 stream.writeChars("CORRUPT FILE!!!!");
248 stream.close();
249 Path origPath = store.getRegionFileSystem().commitStoreFile(
250 Bytes.toString(COLUMN_FAMILY), dstPath);
251
252 try {
253 ((HStore)store).moveFileIntoPlace(origPath);
254 } catch (Exception e) {
255
256
257 assert (fs.exists(origPath));
258 assert (!fs.exists(dstPath));
259 System.out.println("testCompactionWithCorruptResult Passed");
260 return;
261 }
262 fail("testCompactionWithCorruptResult failed since no exception was" +
263 "thrown while completing a corrupt file");
264 }
265
266
267
268
269
270 @Test
271 public void testTrackingCompactionRequest() throws Exception {
272
273 HRegionServer mockServer = Mockito.mock(HRegionServer.class);
274 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
275 CompactSplitThread thread = new CompactSplitThread(mockServer);
276 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
277
278
279 Store store = r.getStore(COLUMN_FAMILY);
280 createStoreFile(r);
281 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
282 createStoreFile(r);
283 }
284
285 CountDownLatch latch = new CountDownLatch(1);
286 TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
287 thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
288
289 latch.await();
290
291 thread.interruptIfNecessary();
292 }
293
294
295
296
297
298
299 @Test
300 public void testMultipleCustomCompactionRequests() throws Exception {
301
302 HRegionServer mockServer = Mockito.mock(HRegionServer.class);
303 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
304 CompactSplitThread thread = new CompactSplitThread(mockServer);
305 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
306
307
308 int numStores = r.getStores().size();
309 List<Pair<CompactionRequest, Store>> requests =
310 new ArrayList<Pair<CompactionRequest, Store>>(numStores);
311 CountDownLatch latch = new CountDownLatch(numStores);
312
313
314 for (Store store : r.getStores().values()) {
315 createStoreFile(r, store.getColumnFamilyName());
316 createStoreFile(r, store.getColumnFamilyName());
317 createStoreFile(r, store.getColumnFamilyName());
318 requests
319 .add(new Pair<CompactionRequest, Store>(new TrackableCompactionRequest(latch), store));
320 }
321
322 thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
323 Collections.unmodifiableList(requests));
324
325
326 latch.await();
327
328 thread.interruptIfNecessary();
329 }
330
331 private class StoreMockMaker extends StatefulStoreMockMaker {
332 public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
333 public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();
334 private ArrayList<Integer> results;
335
336 public StoreMockMaker(ArrayList<Integer> results) {
337 this.results = results;
338 }
339
340 public class TestCompactionContext extends CompactionContext {
341 private List<StoreFile> selectedFiles;
342 public TestCompactionContext(List<StoreFile> selectedFiles) {
343 super();
344 this.selectedFiles = selectedFiles;
345 }
346
347 @Override
348 public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
349 return new ArrayList<StoreFile>();
350 }
351
352 @Override
353 public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
354 boolean mayUseOffPeak, boolean forceMajor) throws IOException {
355 this.request = new CompactionRequest(selectedFiles);
356 this.request.setPriority(getPriority());
357 return true;
358 }
359
360 @Override
361 public List<Path> compact() throws IOException {
362 finishCompaction(this.selectedFiles);
363 return new ArrayList<Path>();
364 }
365 }
366
367 @Override
368 public synchronized CompactionContext selectCompaction() {
369 CompactionContext ctx = new TestCompactionContext(new ArrayList<StoreFile>(notCompacting));
370 compacting.addAll(notCompacting);
371 notCompacting.clear();
372 try {
373 ctx.select(null, false, false, false);
374 } catch (IOException ex) {
375 fail("Shouldn't happen");
376 }
377 return ctx;
378 }
379
380 @Override
381 public synchronized void cancelCompaction(Object object) {
382 TestCompactionContext ctx = (TestCompactionContext)object;
383 compacting.removeAll(ctx.selectedFiles);
384 notCompacting.addAll(ctx.selectedFiles);
385 }
386
387 public synchronized void finishCompaction(List<StoreFile> sfs) {
388 if (sfs.isEmpty()) return;
389 synchronized (results) {
390 results.add(sfs.size());
391 }
392 compacting.removeAll(sfs);
393 }
394
395 @Override
396 public int getPriority() {
397 return 7 - compacting.size() - notCompacting.size();
398 }
399 }
400
401 public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
402 BlockingCompactionContext blocked = null;
403
404 public class BlockingCompactionContext extends CompactionContext {
405 public volatile boolean isInCompact = false;
406
407 public void unblock() {
408 synchronized (this) { this.notifyAll(); }
409 }
410
411 @Override
412 public List<Path> compact() throws IOException {
413 try {
414 isInCompact = true;
415 synchronized (this) { this.wait(); }
416 } catch (InterruptedException e) {
417 Assume.assumeNoException(e);
418 }
419 return new ArrayList<Path>();
420 }
421
422 @Override
423 public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
424 return new ArrayList<StoreFile>();
425 }
426
427 @Override
428 public boolean select(List<StoreFile> f, boolean i, boolean m, boolean e)
429 throws IOException {
430 this.request = new CompactionRequest(new ArrayList<StoreFile>());
431 return true;
432 }
433 }
434
435 @Override
436 public CompactionContext selectCompaction() {
437 this.blocked = new BlockingCompactionContext();
438 try {
439 this.blocked.select(null, false, false, false);
440 } catch (IOException ex) {
441 fail("Shouldn't happen");
442 }
443 return this.blocked;
444 }
445
446 @Override
447 public void cancelCompaction(Object object) {}
448
449 public int getPriority() {
450 return Integer.MIN_VALUE;
451 }
452
453 public BlockingCompactionContext waitForBlocking() {
454 while (this.blocked == null || !this.blocked.isInCompact) {
455 Threads.sleepWithoutInterrupt(50);
456 }
457 BlockingCompactionContext ctx = this.blocked;
458 this.blocked = null;
459 return ctx;
460 }
461
462 @Override
463 public Store createStoreMock(String name) throws Exception {
464 return createStoreMock(Integer.MIN_VALUE, name);
465 }
466
467 public Store createStoreMock(int priority, String name) throws Exception {
468
469 Store s = super.createStoreMock(name);
470 when(s.getCompactPriority()).thenReturn(priority);
471 return s;
472 }
473 }
474
475
476 @Test
477 public void testCompactionQueuePriorities() throws Exception {
478
479 final Configuration conf = HBaseConfiguration.create();
480 HRegionServer mockServer = mock(HRegionServer.class);
481 when(mockServer.isStopped()).thenReturn(false);
482 when(mockServer.getConfiguration()).thenReturn(conf);
483 CompactSplitThread cst = new CompactSplitThread(mockServer);
484 when(mockServer.getCompactSplitThread()).thenReturn(cst);
485
486
487 HRegion r = mock(HRegion.class);
488 when(r.compact(any(CompactionContext.class), any(Store.class))).then(new Answer<Boolean>() {
489 public Boolean answer(InvocationOnMock invocation) throws Throwable {
490 ((CompactionContext)invocation.getArguments()[0]).compact();
491 return true;
492 }
493 });
494
495
496 ArrayList<Integer> results = new ArrayList<Integer>();
497 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
498 Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
499 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
500
501
502 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
503 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
504
505
506 for (int i = 0; i < 4; ++i) {
507 sm.notCompacting.add(createFile());
508 }
509 cst.requestSystemCompaction(r, store, "s1-pri3");
510 for (int i = 0; i < 3; ++i) {
511 sm2.notCompacting.add(createFile());
512 }
513 cst.requestSystemCompaction(r, store2, "s2-pri4");
514
515 for (int i = 0; i < 2; ++i) {
516 sm.notCompacting.add(createFile());
517 }
518 cst.requestSystemCompaction(r, store, "s1-pri1");
519
520 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
521
522
523 currentBlock.unblock();
524 currentBlock = blocker.waitForBlocking();
525
526 assertEquals(1, results.size());
527 assertEquals(6, results.get(0).intValue());
528
529 for (int i = 0; i < 2; ++i) {
530 sm.notCompacting.add(createFile());
531 }
532
533
534 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
535 currentBlock.unblock();
536 currentBlock = blocker.waitForBlocking();
537 assertEquals(3, results.size());
538 assertEquals(3, results.get(1).intValue());
539 assertEquals(2, results.get(2).intValue());
540
541 currentBlock.unblock();
542 cst.interruptIfNecessary();
543 }
544
545 private static StoreFile createFile() throws Exception {
546 StoreFile sf = mock(StoreFile.class);
547 when(sf.getPath()).thenReturn(new Path("file"));
548 StoreFile.Reader r = mock(StoreFile.Reader.class);
549 when(r.length()).thenReturn(10L);
550 when(sf.getReader()).thenReturn(r);
551 return sf;
552 }
553
554
555
556
557 public static class TrackableCompactionRequest extends CompactionRequest {
558 private CountDownLatch done;
559
560
561
562
563
564 public TrackableCompactionRequest(CountDownLatch finished) {
565 super();
566 this.done = finished;
567 }
568
569 @Override
570 public void afterExecute() {
571 super.afterExecute();
572 this.done.countDown();
573 }
574 }
575 }