1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import static org.junit.Assert.*;
23 import static org.mockito.Matchers.any;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.times;
26 import static org.mockito.Mockito.verify;
27
28 import java.io.IOException;
29 import java.lang.ref.SoftReference;
30 import java.security.PrivilegedExceptionAction;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.NavigableSet;
37 import java.util.concurrent.ConcurrentSkipListSet;
38 import java.util.concurrent.atomic.AtomicBoolean;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataOutputStream;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.FilterFileSystem;
47 import org.apache.hadoop.fs.LocalFileSystem;
48 import org.apache.hadoop.fs.Path;
49 import org.apache.hadoop.fs.permission.FsPermission;
50 import org.apache.hadoop.hbase.Cell;
51 import org.apache.hadoop.hbase.CellUtil;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HBaseTestingUtility;
54 import org.apache.hadoop.hbase.HColumnDescriptor;
55 import org.apache.hadoop.hbase.HRegionInfo;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.KeyValue;
58 import org.apache.hadoop.hbase.KeyValue.KVComparator;
59 import org.apache.hadoop.hbase.KeyValueUtil;
60 import org.apache.hadoop.hbase.testclassification.MediumTests;
61 import org.apache.hadoop.hbase.TableName;
62 import org.apache.hadoop.hbase.client.Get;
63 import org.apache.hadoop.hbase.io.compress.Compression;
64 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
65 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
66 import org.apache.hadoop.hbase.io.hfile.HFile;
67 import org.apache.hadoop.hbase.io.hfile.HFileContext;
68 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
69 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
70 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
71 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
72 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
73 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
74 import org.apache.hadoop.hbase.wal.WALFactory;
75 import org.apache.hadoop.hbase.security.User;
76 import org.apache.hadoop.hbase.util.Bytes;
77 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
78 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
79 import org.apache.hadoop.hbase.util.FSUtils;
80 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
81 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
82 import org.apache.hadoop.util.Progressable;
83 import org.junit.After;
84 import org.junit.Assert;
85 import org.junit.Before;
86 import org.junit.Rule;
87 import org.junit.Test;
88 import org.junit.experimental.categories.Category;
89 import org.junit.rules.TestName;
90 import org.mockito.Mockito;
91
92 import com.google.common.collect.Lists;
93
94
95
96
97 @Category(MediumTests.class)
98 public class TestStore {
99 private static final Log LOG = LogFactory.getLog(TestStore.class);
100 @Rule public TestName name = new TestName();
101
102 HStore store;
103 byte [] table = Bytes.toBytes("table");
104 byte [] family = Bytes.toBytes("family");
105
106 byte [] row = Bytes.toBytes("row");
107 byte [] row2 = Bytes.toBytes("row2");
108 byte [] qf1 = Bytes.toBytes("qf1");
109 byte [] qf2 = Bytes.toBytes("qf2");
110 byte [] qf3 = Bytes.toBytes("qf3");
111 byte [] qf4 = Bytes.toBytes("qf4");
112 byte [] qf5 = Bytes.toBytes("qf5");
113 byte [] qf6 = Bytes.toBytes("qf6");
114
115 NavigableSet<byte[]> qualifiers =
116 new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
117
118 List<Cell> expected = new ArrayList<Cell>();
119 List<Cell> result = new ArrayList<Cell>();
120
121 long id = System.currentTimeMillis();
122 Get get = new Get(row);
123
124 private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
125 private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
126
127
128
129
130
131
132 @Before
133 public void setUp() throws IOException {
134 qualifiers.add(qf1);
135 qualifiers.add(qf3);
136 qualifiers.add(qf5);
137
138 Iterator<byte[]> iter = qualifiers.iterator();
139 while(iter.hasNext()){
140 byte [] next = iter.next();
141 expected.add(new KeyValue(row, family, next, 1, (byte[])null));
142 get.addColumn(family, next);
143 }
144 }
145
146 private void init(String methodName) throws IOException {
147 init(methodName, TEST_UTIL.getConfiguration());
148 }
149
150 private void init(String methodName, Configuration conf)
151 throws IOException {
152 HColumnDescriptor hcd = new HColumnDescriptor(family);
153
154
155 hcd.setMaxVersions(4);
156 init(methodName, conf, hcd);
157 }
158
159 private void init(String methodName, Configuration conf,
160 HColumnDescriptor hcd) throws IOException {
161 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
162 init(methodName, conf, htd, hcd);
163 }
164
165 @SuppressWarnings("deprecation")
166 private Store init(String methodName, Configuration conf, HTableDescriptor htd,
167 HColumnDescriptor hcd) throws IOException {
168
169 Path basedir = new Path(DIR+methodName);
170 Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
171 final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(methodName));
172
173 FileSystem fs = FileSystem.get(conf);
174
175 fs.delete(logdir, true);
176
177 if (htd.hasFamily(hcd.getName())) {
178 htd.modifyFamily(hcd);
179 } else {
180 htd.addFamily(hcd);
181 }
182 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
183 final Configuration walConf = new Configuration(conf);
184 FSUtils.setRootDir(walConf, basedir);
185 final WALFactory wals = new WALFactory(walConf, null, methodName);
186 HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf,
187 info, htd, null);
188
189 store = new HStore(region, hcd, conf);
190 return store;
191 }
192
193
194
195
196
197
198 @Test
199 public void testFlushSizeAccounting() throws Exception {
200 LOG.info("Setting up a faulty file system that cannot write in " +
201 this.name.getMethodName());
202 final Configuration conf = HBaseConfiguration.create();
203
204 conf.setInt("hbase.hstore.flush.retries.number", 1);
205 User user = User.createUserForTesting(conf, this.name.getMethodName(),
206 new String[]{"foo"});
207
208 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
209 user.runAs(new PrivilegedExceptionAction<Object>() {
210 @Override
211 public Object run() throws Exception {
212
213 FileSystem fs = FileSystem.get(conf);
214 Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
215 FaultyFileSystem ffs = (FaultyFileSystem)fs;
216
217
218 init(name.getMethodName(), conf);
219
220 long size = store.memstore.getFlushableSize();
221 Assert.assertEquals(0, size);
222 LOG.info("Adding some data");
223 long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
224 size = store.memstore.getFlushableSize();
225 Assert.assertEquals(kvSize, size);
226
227 try {
228 LOG.info("Flushing");
229 flushStore(store, id++);
230 Assert.fail("Didn't bubble up IOE!");
231 } catch (IOException ioe) {
232 Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
233 }
234 size = store.memstore.getFlushableSize();
235 Assert.assertEquals(kvSize, size);
236 store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
237
238
239 Assert.assertEquals(kvSize, size);
240 ffs.fault.set(false);
241 flushStore(store, id++);
242 size = store.memstore.getFlushableSize();
243
244 Assert.assertEquals(kvSize, size);
245 flushStore(store, id++);
246 size = store.memstore.getFlushableSize();
247 Assert.assertEquals(0, size);
248 return null;
249 }
250 });
251 }
252
253
254
255
256
257 @Test
258 public void testCreateWriter() throws Exception {
259 Configuration conf = HBaseConfiguration.create();
260 FileSystem fs = FileSystem.get(conf);
261
262 HColumnDescriptor hcd = new HColumnDescriptor(family);
263 hcd.setCompressionType(Compression.Algorithm.GZ);
264 hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
265 init(name.getMethodName(), conf, hcd);
266
267
268 StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
269 Path path = writer.getPath();
270 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
271 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
272 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
273 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
274 writer.close();
275
276
277 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
278 Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
279 Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
280 reader.close();
281 }
282
283 @Test
284 public void testDeleteExpiredStoreFiles() throws Exception {
285 testDeleteExpiredStoreFiles(0);
286 testDeleteExpiredStoreFiles(1);
287 }
288
289
290
291
292 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
293 int storeFileNum = 4;
294 int ttl = 4;
295 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
296 EnvironmentEdgeManagerTestHelper.injectEdge(edge);
297
298 Configuration conf = HBaseConfiguration.create();
299
300 conf.setBoolean("hbase.store.delete.expired.storefile", true);
301
302 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
303
304 HColumnDescriptor hcd = new HColumnDescriptor(family);
305 hcd.setMinVersions(minVersions);
306 hcd.setTimeToLive(ttl);
307 init(name.getMethodName() + "-" + minVersions, conf, hcd);
308
309 long storeTtl = this.store.getScanInfo().getTtl();
310 long sleepTime = storeTtl / storeFileNum;
311 long timeStamp;
312
313
314 for (int i = 1; i <= storeFileNum; i++) {
315 LOG.info("Adding some data for the store file #" + i);
316 timeStamp = EnvironmentEdgeManager.currentTime();
317 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
318 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
319 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
320 flush(i);
321 edge.incrementTime(sleepTime);
322 }
323
324
325 Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
326
327
328
329 for (int i = 1; i <= storeFileNum - 1; i++) {
330
331 assertNull(this.store.requestCompaction());
332 Collection<StoreFile> sfs = this.store.getStorefiles();
333
334 if (minVersions == 0) {
335 assertEquals(storeFileNum - i, sfs.size());
336
337 for (StoreFile sf : sfs) {
338 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
339 }
340 } else {
341 assertEquals(storeFileNum, sfs.size());
342 }
343
344 edge.incrementTime(sleepTime);
345 }
346 assertNull(this.store.requestCompaction());
347
348 Collection<StoreFile> sfs = this.store.getStorefiles();
349
350 if (minVersions == 0) {
351 assertEquals(1, sfs.size());
352 }
353 long ts = sfs.iterator().next().getReader().getMaxTimestamp();
354 assertTrue(ts < (edge.currentTime() - storeTtl));
355
356 for (StoreFile sf : sfs) {
357 sf.closeReader(true);
358 }
359 }
360
361 @Test
362 public void testLowestModificationTime() throws Exception {
363 Configuration conf = HBaseConfiguration.create();
364 FileSystem fs = FileSystem.get(conf);
365
366 init(name.getMethodName(), conf);
367
368 int storeFileNum = 4;
369 for (int i = 1; i <= storeFileNum; i++) {
370 LOG.info("Adding some data for the store file #"+i);
371 this.store.add(new KeyValue(row, family, qf1, i, (byte[])null));
372 this.store.add(new KeyValue(row, family, qf2, i, (byte[])null));
373 this.store.add(new KeyValue(row, family, qf3, i, (byte[])null));
374 flush(i);
375 }
376
377 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
378 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
379 Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
380
381
382 store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE);
383 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
384 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
385 Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
386 }
387
388 private static long getLowestTimeStampFromFS(FileSystem fs,
389 final Collection<StoreFile> candidates) throws IOException {
390 long minTs = Long.MAX_VALUE;
391 if (candidates.isEmpty()) {
392 return minTs;
393 }
394 Path[] p = new Path[candidates.size()];
395 int i = 0;
396 for (StoreFile sf : candidates) {
397 p[i] = sf.getPath();
398 ++i;
399 }
400
401 FileStatus[] stats = fs.listStatus(p);
402 if (stats == null || stats.length == 0) {
403 return minTs;
404 }
405 for (FileStatus s : stats) {
406 minTs = Math.min(minTs, s.getModificationTime());
407 }
408 return minTs;
409 }
410
411
412
413
414
415 private static final int BLOCKSIZE_SMALL = 8192;
416
417
418
419
420 @Test
421 public void testEmptyStoreFile() throws IOException {
422 init(this.name.getMethodName());
423
424 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
425 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
426 flush(1);
427
428
429 StoreFile f = this.store.getStorefiles().iterator().next();
430 Path storedir = f.getPath().getParent();
431 long seqid = f.getMaxSequenceId();
432 Configuration c = HBaseConfiguration.create();
433 FileSystem fs = FileSystem.get(c);
434 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
435 StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
436 fs)
437 .withOutputDir(storedir)
438 .withFileContext(meta)
439 .build();
440 w.appendMetadata(seqid + 1, false);
441 w.close();
442 this.store.close();
443
444 this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c);
445 Assert.assertEquals(2, this.store.getStorefilesCount());
446
447 result = HBaseTestingUtility.getFromStoreFile(store,
448 get.getRow(),
449 qualifiers);
450 Assert.assertEquals(1, result.size());
451 }
452
453
454
455
456
457 @Test
458 public void testGet_FromMemStoreOnly() throws IOException {
459 init(this.name.getMethodName());
460
461
462 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
463 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
464 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
465 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
466 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
467 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
468
469
470 result = HBaseTestingUtility.getFromStoreFile(store,
471 get.getRow(), qualifiers);
472
473
474 assertCheck();
475 }
476
477
478
479
480
481 @Test
482 public void testGet_FromFilesOnly() throws IOException {
483 init(this.name.getMethodName());
484
485
486 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
487 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
488
489 flush(1);
490
491
492 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
493 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
494
495 flush(2);
496
497
498 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
499 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
500
501 flush(3);
502
503
504 result = HBaseTestingUtility.getFromStoreFile(store,
505 get.getRow(),
506 qualifiers);
507
508
509
510 Collections.sort(result, KeyValue.COMPARATOR);
511
512
513 assertCheck();
514 }
515
516
517
518
519
520 @Test
521 public void testGet_FromMemStoreAndFiles() throws IOException {
522 init(this.name.getMethodName());
523
524
525 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
526 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
527
528 flush(1);
529
530
531 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
532 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
533
534 flush(2);
535
536
537 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
538 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
539
540
541 result = HBaseTestingUtility.getFromStoreFile(store,
542 get.getRow(), qualifiers);
543
544
545 Collections.sort(result, KeyValue.COMPARATOR);
546
547
548 assertCheck();
549 }
550
551 private void flush(int storeFilessize) throws IOException{
552 this.store.snapshot();
553 flushStore(store, id++);
554 Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
555 Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
556 }
557
558 private void assertCheck() {
559 Assert.assertEquals(expected.size(), result.size());
560 for(int i=0; i<expected.size(); i++) {
561 Assert.assertEquals(expected.get(i), result.get(i));
562 }
563 }
564
565
566
567
568
569
570
571 @Test
572 public void testIncrementColumnValue_ICVDuringFlush()
573 throws IOException, InterruptedException {
574 init(this.name.getMethodName());
575
576 long oldValue = 1L;
577 long newValue = 3L;
578 this.store.add(new KeyValue(row, family, qf1,
579 System.currentTimeMillis(),
580 Bytes.toBytes(oldValue)));
581
582
583 this.store.snapshot();
584
585
586 this.store.add(new KeyValue(row, family, qf2,
587 System.currentTimeMillis(),
588 Bytes.toBytes(oldValue)));
589
590
591 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
592
593
594 Assert.assertTrue(ret > 0);
595
596
597 flushStore(store, id++);
598 Assert.assertEquals(1, this.store.getStorefiles().size());
599
600 Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size());
601
602
603 Get get = new Get(row);
604 get.addColumn(family, qf1);
605 get.setMaxVersions();
606 List<Cell> results = new ArrayList<Cell>();
607
608 results = HBaseTestingUtility.getFromStoreFile(store, get);
609 Assert.assertEquals(2, results.size());
610
611 long ts1 = results.get(0).getTimestamp();
612 long ts2 = results.get(1).getTimestamp();
613
614 Assert.assertTrue(ts1 > ts2);
615
616 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
617 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
618 }
619
620 @After
621 public void tearDown() throws Exception {
622 EnvironmentEdgeManagerTestHelper.reset();
623 }
624
625 @Test
626 public void testICV_negMemstoreSize() throws IOException {
627 init(this.name.getMethodName());
628
629 long time = 100;
630 ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
631 ee.setValue(time);
632 EnvironmentEdgeManagerTestHelper.injectEdge(ee);
633 long newValue = 3L;
634 long size = 0;
635
636
637 size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
638 System.currentTimeMillis(), Bytes.toBytes(newValue)));
639 size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
640 System.currentTimeMillis(), Bytes.toBytes(newValue)));
641 size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
642 System.currentTimeMillis(), Bytes.toBytes(newValue)));
643 size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
644 System.currentTimeMillis(), Bytes.toBytes(newValue)));
645 size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
646 System.currentTimeMillis(), Bytes.toBytes(newValue)));
647
648
649 for ( int i = 0 ; i < 10000 ; ++i) {
650 newValue++;
651
652 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
653 long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
654
655 if (ret != 0) System.out.println("ret: " + ret);
656 if (ret2 != 0) System.out.println("ret2: " + ret2);
657
658 Assert.assertTrue("ret: " + ret, ret >= 0);
659 size += ret;
660 Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
661 size += ret2;
662
663
664 if (i % 1000 == 0)
665 ee.setValue(++time);
666 }
667
668 long computedSize=0;
669 for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) {
670 long kvsize = DefaultMemStore.heapSizeChange(cell, true);
671
672 computedSize += kvsize;
673 }
674 Assert.assertEquals(computedSize, size);
675 }
676
677 @Test
678 public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
679 ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
680 EnvironmentEdgeManagerTestHelper.injectEdge(mee);
681 init(this.name.getMethodName());
682
683 long oldValue = 1L;
684 long newValue = 3L;
685 this.store.add(new KeyValue(row, family, qf1,
686 EnvironmentEdgeManager.currentTime(),
687 Bytes.toBytes(oldValue)));
688
689
690 this.store.snapshot();
691
692
693 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
694
695
696 Assert.assertTrue(ret > 0);
697
698
699 flushStore(store, id++);
700 Assert.assertEquals(1, this.store.getStorefiles().size());
701 Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size());
702
703
704 newValue += 1;
705 this.store.updateColumnValue(row, family, qf1, newValue);
706
707
708 newValue += 1;
709 this.store.updateColumnValue(row, family, qf1, newValue);
710
711
712
713
714
715 Get get = new Get(row);
716 get.addColumn(family, qf1);
717 get.setMaxVersions();
718 List<Cell> results = new ArrayList<Cell>();
719
720 results = HBaseTestingUtility.getFromStoreFile(store, get);
721 Assert.assertEquals(2, results.size());
722
723 long ts1 = results.get(0).getTimestamp();
724 long ts2 = results.get(1).getTimestamp();
725
726 Assert.assertTrue(ts1 > ts2);
727 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
728 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
729
730 mee.setValue(2);
731 newValue += 1;
732 this.store.updateColumnValue(row, family, qf1, newValue);
733
734 results = HBaseTestingUtility.getFromStoreFile(store, get);
735 Assert.assertEquals(2, results.size());
736
737 ts1 = results.get(0).getTimestamp();
738 ts2 = results.get(1).getTimestamp();
739
740 Assert.assertTrue(ts1 > ts2);
741 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
742 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
743 }
744
745 @Test
746 public void testHandleErrorsInFlush() throws Exception {
747 LOG.info("Setting up a faulty file system that cannot write");
748
749 final Configuration conf = HBaseConfiguration.create();
750 User user = User.createUserForTesting(conf,
751 "testhandleerrorsinflush", new String[]{"foo"});
752
753 conf.setClass("fs.file.impl", FaultyFileSystem.class,
754 FileSystem.class);
755 user.runAs(new PrivilegedExceptionAction<Object>() {
756 @Override
757 public Object run() throws Exception {
758
759 FileSystem fs = FileSystem.get(conf);
760 Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
761
762
763 init(name.getMethodName(), conf);
764
765 LOG.info("Adding some data");
766 store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
767 store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
768 store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
769
770 LOG.info("Before flush, we should have no files");
771
772 Collection<StoreFileInfo> files =
773 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
774 Assert.assertEquals(0, files != null ? files.size() : 0);
775
776
777 try {
778 LOG.info("Flushing");
779 flush(1);
780 Assert.fail("Didn't bubble up IOE!");
781 } catch (IOException ioe) {
782 Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
783 }
784
785 LOG.info("After failed flush, we should still have no files!");
786 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
787 Assert.assertEquals(0, files != null ? files.size() : 0);
788 store.getHRegion().getWAL().close();
789 return null;
790 }
791 });
792 FileSystem.closeAllForUGI(user.getUGI());
793 }
794
795
796
797
798
799 static class FaultyFileSystem extends FilterFileSystem {
800 List<SoftReference<FaultyOutputStream>> outStreams =
801 new ArrayList<SoftReference<FaultyOutputStream>>();
802 private long faultPos = 200;
803 AtomicBoolean fault = new AtomicBoolean(true);
804
805 public FaultyFileSystem() {
806 super(new LocalFileSystem());
807 System.err.println("Creating faulty!");
808 }
809
810 @Override
811 public FSDataOutputStream create(Path p) throws IOException {
812 return new FaultyOutputStream(super.create(p), faultPos, fault);
813 }
814
815 @Override
816 public FSDataOutputStream create(Path f, FsPermission permission,
817 boolean overwrite, int bufferSize, short replication, long blockSize,
818 Progressable progress) throws IOException {
819 return new FaultyOutputStream(super.create(f, permission,
820 overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
821 }
822
823 @Override
824 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
825 int bufferSize, short replication, long blockSize, Progressable progress)
826 throws IOException {
827
828
829 return create(f, overwrite, bufferSize, replication, blockSize, progress);
830 }
831 }
832
833 static class FaultyOutputStream extends FSDataOutputStream {
834 volatile long faultPos = Long.MAX_VALUE;
835 private final AtomicBoolean fault;
836
837 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
838 throws IOException {
839 super(out, null);
840 this.faultPos = faultPos;
841 this.fault = fault;
842 }
843
844 @Override
845 public void write(byte[] buf, int offset, int length) throws IOException {
846 System.err.println("faulty stream write at pos " + getPos());
847 injectFault();
848 super.write(buf, offset, length);
849 }
850
851 private void injectFault() throws IOException {
852 if (this.fault.get() && getPos() >= faultPos) {
853 throw new IOException("Fault injected");
854 }
855 }
856 }
857
858 private static void flushStore(HStore store, long id) throws IOException {
859 StoreFlushContext storeFlushCtx = store.createFlushContext(id);
860 storeFlushCtx.prepare();
861 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
862 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
863 }
864
865
866
867
868
869
870
871
872
873 List<Cell> getKeyValueSet(long[] timestamps, int numRows,
874 byte[] qualifier, byte[] family) {
875 List<Cell> kvList = new ArrayList<Cell>();
876 for (int i=1;i<=numRows;i++) {
877 byte[] b = Bytes.toBytes(i);
878 for (long timestamp: timestamps) {
879 kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
880 }
881 }
882 return kvList;
883 }
884
885
886
887
888
889 @Test
890 public void testMultipleTimestamps() throws IOException {
891 int numRows = 1;
892 long[] timestamps1 = new long[] {1,5,10,20};
893 long[] timestamps2 = new long[] {30,80};
894
895 init(this.name.getMethodName());
896
897 List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
898 for (Cell kv : kvList1) {
899 this.store.add(KeyValueUtil.ensureKeyValue(kv));
900 }
901
902 this.store.snapshot();
903 flushStore(store, id++);
904
905 List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
906 for(Cell kv : kvList2) {
907 this.store.add(KeyValueUtil.ensureKeyValue(kv));
908 }
909
910 List<Cell> result;
911 Get get = new Get(Bytes.toBytes(1));
912 get.addColumn(family,qf1);
913
914 get.setTimeRange(0,15);
915 result = HBaseTestingUtility.getFromStoreFile(store, get);
916 Assert.assertTrue(result.size()>0);
917
918 get.setTimeRange(40,90);
919 result = HBaseTestingUtility.getFromStoreFile(store, get);
920 Assert.assertTrue(result.size()>0);
921
922 get.setTimeRange(10,45);
923 result = HBaseTestingUtility.getFromStoreFile(store, get);
924 Assert.assertTrue(result.size()>0);
925
926 get.setTimeRange(80,145);
927 result = HBaseTestingUtility.getFromStoreFile(store, get);
928 Assert.assertTrue(result.size()>0);
929
930 get.setTimeRange(1,2);
931 result = HBaseTestingUtility.getFromStoreFile(store, get);
932 Assert.assertTrue(result.size()>0);
933
934 get.setTimeRange(90,200);
935 result = HBaseTestingUtility.getFromStoreFile(store, get);
936 Assert.assertTrue(result.size()==0);
937 }
938
939
940
941
942
943
944 @Test
945 public void testSplitWithEmptyColFam() throws IOException {
946 init(this.name.getMethodName());
947 Assert.assertNull(store.getSplitPoint());
948 store.getHRegion().forceSplit(null);
949 Assert.assertNull(store.getSplitPoint());
950 store.getHRegion().clearSplit();
951 }
952
953 @Test
954 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
955 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
956 long anyValue = 10;
957
958
959
960
961 Configuration conf = HBaseConfiguration.create();
962 conf.setLong(CONFIG_KEY, anyValue);
963 init(name.getMethodName() + "-xml", conf);
964 Assert.assertTrue(store.throttleCompaction(anyValue + 1));
965 Assert.assertFalse(store.throttleCompaction(anyValue));
966
967
968 --anyValue;
969 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
970 HColumnDescriptor hcd = new HColumnDescriptor(family);
971 htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
972 init(name.getMethodName() + "-htd", conf, htd, hcd);
973 Assert.assertTrue(store.throttleCompaction(anyValue + 1));
974 Assert.assertFalse(store.throttleCompaction(anyValue));
975
976
977 --anyValue;
978 hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
979 init(name.getMethodName() + "-hcd", conf, htd, hcd);
980 Assert.assertTrue(store.throttleCompaction(anyValue + 1));
981 Assert.assertFalse(store.throttleCompaction(anyValue));
982 }
983
984 public static class DummyStoreEngine extends DefaultStoreEngine {
985 public static DefaultCompactor lastCreatedCompactor = null;
986 @Override
987 protected void createComponents(
988 Configuration conf, Store store, KVComparator comparator) throws IOException {
989 super.createComponents(conf, store, comparator);
990 lastCreatedCompactor = this.compactor;
991 }
992 }
993
994 @Test
995 public void testStoreUsesSearchEngineOverride() throws Exception {
996 Configuration conf = HBaseConfiguration.create();
997 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
998 init(this.name.getMethodName(), conf);
999 Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
1000 this.store.storeEngine.getCompactor());
1001 }
1002
1003 private void addStoreFile() throws IOException {
1004 StoreFile f = this.store.getStorefiles().iterator().next();
1005 Path storedir = f.getPath().getParent();
1006 long seqid = this.store.getMaxSequenceId();
1007 Configuration c = TEST_UTIL.getConfiguration();
1008 FileSystem fs = FileSystem.get(c);
1009 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1010 StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
1011 fs)
1012 .withOutputDir(storedir)
1013 .withFileContext(fileContext)
1014 .build();
1015 w.appendMetadata(seqid + 1, false);
1016 w.close();
1017 LOG.info("Added store file:" + w.getPath());
1018 }
1019
1020 private void archiveStoreFile(int index) throws IOException {
1021 Collection<StoreFile> files = this.store.getStorefiles();
1022 StoreFile sf = null;
1023 Iterator<StoreFile> it = files.iterator();
1024 for (int i = 0; i <= index; i++) {
1025 sf = it.next();
1026 }
1027 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
1028 }
1029
1030 @Test
1031 public void testRefreshStoreFiles() throws Exception {
1032 init(name.getMethodName());
1033
1034 assertEquals(0, this.store.getStorefilesCount());
1035
1036
1037 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
1038 flush(1);
1039 assertEquals(1, this.store.getStorefilesCount());
1040
1041
1042 addStoreFile();
1043
1044 assertEquals(1, this.store.getStorefilesCount());
1045 store.refreshStoreFiles();
1046 assertEquals(2, this.store.getStorefilesCount());
1047
1048
1049 addStoreFile();
1050 addStoreFile();
1051 addStoreFile();
1052
1053 assertEquals(2, this.store.getStorefilesCount());
1054 store.refreshStoreFiles();
1055 assertEquals(5, this.store.getStorefilesCount());
1056
1057 archiveStoreFile(0);
1058
1059 assertEquals(5, this.store.getStorefilesCount());
1060 store.refreshStoreFiles();
1061 assertEquals(4, this.store.getStorefilesCount());
1062
1063 archiveStoreFile(0);
1064 archiveStoreFile(1);
1065 archiveStoreFile(2);
1066
1067 assertEquals(4, this.store.getStorefilesCount());
1068 store.refreshStoreFiles();
1069 assertEquals(1, this.store.getStorefilesCount());
1070
1071 archiveStoreFile(0);
1072 store.refreshStoreFiles();
1073 assertEquals(0, this.store.getStorefilesCount());
1074 }
1075
1076 @SuppressWarnings("unchecked")
1077 @Test
1078 public void testRefreshStoreFilesNotChanged() throws IOException {
1079 init(name.getMethodName());
1080
1081 assertEquals(0, this.store.getStorefilesCount());
1082
1083
1084 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
1085 flush(1);
1086
1087 addStoreFile();
1088
1089 HStore spiedStore = spy(store);
1090
1091
1092 spiedStore.refreshStoreFiles();
1093 assertEquals(2, this.store.getStorefilesCount());
1094 verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class));
1095
1096
1097 spiedStore.refreshStoreFiles();
1098
1099
1100 verify(spiedStore, times(0)).replaceStoreFiles(null, null);
1101 }
1102 }