/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
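
/**
 * Test major compactions.
 */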
@Category(MediumTests.class)
public class TestMajorCompaction {
  @Rule public TestName name = new TestName();
  static final Log LOG = LogFactory.getLog(TestMajorCompaction.class.getName());
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private HTableDescriptor htd = null;
  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;
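
  /** Sets a small memstore flush size and picks up the compaction threshold used by these tests. */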
  public TestMajorCompaction() {
    super();

    // Set cache flush size to 1MB.
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to the next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
  }

  @Before
  public void setUp() throws Exception {
    this.htd = UTIL.createTableDescriptor(name.getMethodName());
    this.r = UTIL.createLocalHRegion(htd, null, null);
  }

  @After
  public void tearDown() throws Exception {
    HLog hlog = r.getLog();
    this.r.close();
    hlog.closeAndDelete();
  }
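
  /**
   * Test that on a major compaction, if all cells are expired or deleted, then we'll end up
   * with no product.  Make sure the scanner over the region returns the right answer in this
   * case - and that it just basically works.
   */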
  @Test
  public void testMajorCompactingToNoOutput() throws IOException {
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Now delete everything.
    InternalScanner s = r.getScanner(new Scan());
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      r.delete(new Delete(CellUtil.cloneRow(results.get(0))));
      if (!result) break;
    } while (true);
    s.close();
    // Flush the deletes, then major compact.
    r.flushcache();
    r.compactStores(true);
    // Nothing should remain.
    s = r.getScanner(new Scan());
    int counter = 0;
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      if (!result) break;
      counter++;
    } while (true);
    assertEquals(0, counter);
  }
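
  /**
   * Run compaction while flushing the memstore.
   * Assert deletes get cleaned up.
   */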
  @Test
  public void testMajorCompaction() throws Exception {
    majorCompaction();
  }

  @Test
  public void testDataBlockEncodingInCacheOnly() throws Exception {
    majorCompactionWithDataBlockEncoding(true);
  }

  @Test
  public void testDataBlockEncodingEverywhere() throws Exception {
    majorCompactionWithDataBlockEncoding(false);
  }

  public void majorCompactionWithDataBlockEncoding(boolean inCacheOnly)
      throws Exception {
    Map<HStore, HFileDataBlockEncoder> replaceBlockCache =
        new HashMap<HStore, HFileDataBlockEncoder>();
    for (Entry<byte[], Store> pair : r.getStores().entrySet()) {
      HStore store = (HStore) pair.getValue();
      HFileDataBlockEncoder blockEncoder = store.getDataBlockEncoder();
      replaceBlockCache.put(store, blockEncoder);
      final DataBlockEncoding inCache = DataBlockEncoding.PREFIX;
      final DataBlockEncoding onDisk = inCacheOnly ? DataBlockEncoding.NONE : inCache;
      store.setDataBlockEncoderInTest(new HFileDataBlockEncoderImpl(onDisk));
    }

    majorCompaction();

    // Restore the original block encoders.
    for (Entry<HStore, HFileDataBlockEncoder> entry : replaceBlockCache.entrySet()) {
      entry.getKey().setDataBlockEncoderInTest(entry.getValue());
    }
  }
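
  /**
   * Loads the region past the compaction threshold, major compacts, and then verifies that
   * deleted and expired cells are cleaned up.
   */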
  private void majorCompaction() throws Exception {
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Add more content.
    HBaseTestCase.addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));

    // There are now more versions of each cell than the maximum allowed per column,
    // so a get should return at most compactionThreshold versions.
    Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertEquals(compactionThreshold, result.size());

    // No compaction has run yet, so there should be no progress to report.
    for (Store store : this.r.stores.values()) {
      assertNull(store.getCompactionProgress());
    }

    r.flushcache();
    r.compactStores(true);

    // At least one store should report compaction progress now.
    int storeCount = 0;
    for (Store store : this.r.stores.values()) {
      CompactionProgress progress = store.getCompactionProgress();
      if (progress != null) {
        ++storeCount;
        assertTrue(progress.currentCompactedKVs > 0);
        assertTrue(progress.totalCompactingKVs > 0);
      }
      assertTrue(storeCount > 0);
    }

    // Look at the second row: increment the least significant character so we get to the
    // next row.
    byte[] secondRowBytes = START_KEY_BYTES.clone();
    secondRowBytes[START_KEY_BYTES.length - 1]++;

    // We should still see compactionThreshold versions of the second row.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).
        setMaxVersions(100));
    LOG.debug("Row " + Bytes.toStringBinary(secondRowBytes) + " after " +
        "initial compaction: " + result);
    assertEquals("Invalid number of versions of row "
        + Bytes.toStringBinary(secondRowBytes) + ".", compactionThreshold,
        result.size());

    // Now add deletes to the memstore and then flush it.  That will put us over the
    // compaction threshold of 3 store files.  Compacting these store files should result
    // in a compacted store file that has no references to the deleted row.
    LOG.debug("Adding deletes to memstore and flushing");
    Delete delete = new Delete(secondRowBytes, System.currentTimeMillis());
    delete.deleteFamily(COLUMN_FAMILY);
    r.delete(delete);

    // Assert deleted.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should have been deleted", result.isEmpty());

    r.flushcache();

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should have been deleted", result.isEmpty());

    // Add a bit of data and flush; the delete should still be visible.
    createSmallerStoreFile(this.r);
    r.flushcache();

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Major compact and make sure the deleted row does not resurface.
    r.compactStores(true);
    assertEquals(1, r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size());

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Verify the store file contents: three versions of the first row remain, none of the
    // deleted second row.
    verifyCounts(3, 0);

    // Do a simple TTL test: shrink the TTL on every store, wait for it to expire, and
    // major compact again.  Nothing should remain.
    final int ttl = 1000;
    for (Store hstore : this.r.stores.values()) {
      HStore store = (HStore) hstore;
      ScanInfo old = store.getScanInfo();
      ScanInfo si = new ScanInfo(old.getFamily(),
          old.getMinVersions(), old.getMaxVersions(), ttl,
          old.getKeepDeletedCells(), 0, old.getComparator());
      store.setScanInfo(si);
    }
    Thread.sleep(1000);

    r.compactStores(true);
    int count = count();
    assertEquals("Should not see anything after TTL has expired", 0, count);
  }
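
  /**
   * Verify that a time-delayed major compaction fires once the configured major compaction
   * period (plus jitter) has elapsed.
   */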
  @Test
  public void testTimeBasedMajorCompaction() throws Exception {
    // Create 2 store files and force a major compaction to reset the time.
    int delay = 10 * 1000; // 10 sec
    float jitterPct = 0.20f; // 20%
    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
    conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);

    HStore s = (HStore) r.getStore(COLUMN_FAMILY);
    s.storeEngine.getCompactionPolicy().setConf(conf);
    try {
      createStoreFile(r);
      createStoreFile(r);
      r.compactStores(true);

      // Add one more file and verify that a regular compaction won't work.
      createStoreFile(r);
      r.compactStores(false);
      assertEquals(2, s.getStorefilesCount());

      // Ensure that the major compaction time is deterministic.
      RatioBasedCompactionPolicy c =
          (RatioBasedCompactionPolicy) s.storeEngine.getCompactionPolicy();
      Collection<StoreFile> storeFiles = s.getStorefiles();
      long mcTime = c.getNextMajorCompactTime(storeFiles);
      for (int i = 0; i < 10; ++i) {
        assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
      }

      // Ensure that the major compaction time falls within the jitter window.
      long jitter = Math.round(delay * jitterPct);
      assertTrue(delay - jitter <= mcTime && mcTime <= delay + jitter);

      // Wait until the time-based compaction interval has passed.
      Thread.sleep(mcTime);

      // Trigger a compaction request and ensure that it is upgraded to major.
      r.compactStores(false);
      assertEquals(1, s.getStorefilesCount());
    } finally {
      // Reset the timed compaction settings.
      conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000 * 60 * 60 * 24);
      conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
      // Run a major compaction to clean up.
      createStoreFile(r);
      r.compactStores(true);
      assertEquals(1, s.getStorefilesCount());
    }
  }
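
  /** Counts the cells remaining for the first and second rows across all store files. */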
  private void verifyCounts(int countRow1, int countRow2) throws Exception {
    int count1 = 0;
    int count2 = 0;
    for (StoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      scanner.seekTo();
      do {
        byte[] row = scanner.getKeyValue().getRow();
        if (Bytes.equals(row, STARTROW)) {
          count1++;
        } else if (Bytes.equals(row, secondRowBytes)) {
          count2++;
        }
      } while (scanner.next());
    }
    assertEquals(countRow1, count1);
    assertEquals(countRow2, count2);
  }
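
  /** Returns the total number of cells across all store files of the default column family. */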
  private int count() throws IOException {
    int count = 0;
    for (StoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while (scanner.next());
    }
    return count;
  }

  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  private void createStoreFile(final HRegion region, String family) throws IOException {
    HRegionIncommon loader = new HRegionIncommon(region);
    HBaseTestCase.addContent(loader, family);
    loader.flushcache();
  }

  private void createSmallerStoreFile(final HRegion region) throws IOException {
    HRegionIncommon loader = new HRegionIncommon(region);
    HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), Bytes.toBytes("bbb"), null);
    loader.flushcache();
  }
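
  /**
   * A system-requested major compaction should be downgraded when there are too many store
   * files to compact at once.
   */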
  @Test
  public void testNonUserMajorCompactionRequest() throws Exception {
    Store store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }
    store.triggerMajorCompaction();

    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
    assertNotNull("Expected to receive a compaction request", request);
    assertEquals(
        "System-requested major compaction should not occur if there are too many store files",
        false,
        request.isMajor());
  }
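
  /**
   * A user-requested major compaction should remain major even when there are too many store
   * files to compact at once.
   */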
  @Test
  public void testUserMajorCompactionRequest() throws IOException {
    Store store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }
    store.triggerMajorCompaction();
    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
    assertNotNull("Expected to receive a compaction request", request);
    assertEquals(
        "User-requested major compaction should always occur, even if there are too many store files",
        true,
        request.isMajor());
  }
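
  /**
   * Same as testMajorCompactingToNoOutput, but drives the deletes with a reversed scan; the
   * major compaction should still leave nothing behind.
   */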
  @Test
  public void testMajorCompactingToNoOutputWithReverseScan() throws IOException {
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Delete everything, walking the region with a reversed scanner.
    Scan scan = new Scan();
    scan.setReversed(true);
    InternalScanner s = r.getScanner(scan);
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      assertTrue(!results.isEmpty());
      r.delete(new Delete(CellUtil.cloneRow(results.get(0))));
      if (!result) break;
    } while (true);
    s.close();
    // Flush the deletes, then major compact.
    r.flushcache();
    r.compactStores(true);
    // Nothing should remain.
    scan = new Scan();
    scan.setReversed(true);
    s = r.getScanner(scan);
    int counter = 0;
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      if (!result) break;
      counter++;
    } while (true);
    s.close();
    assertEquals(0, counter);
  }
}