1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.lang.ref.SoftReference;
24 import java.security.PrivilegedExceptionAction;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.NavigableSet;
31 import java.util.concurrent.ConcurrentSkipListSet;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FSDataOutputStream;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.FilterFileSystem;
41 import org.apache.hadoop.fs.LocalFileSystem;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.fs.permission.FsPermission;
44 import org.apache.hadoop.hbase.Cell;
45 import org.apache.hadoop.hbase.CellUtil;
46 import org.apache.hadoop.hbase.HBaseConfiguration;
47 import org.apache.hadoop.hbase.HBaseTestingUtility;
48 import org.apache.hadoop.hbase.HColumnDescriptor;
49 import org.apache.hadoop.hbase.HRegionInfo;
50 import org.apache.hadoop.hbase.HTableDescriptor;
51 import org.apache.hadoop.hbase.KeyValue;
52 import org.apache.hadoop.hbase.KeyValue.KVComparator;
53 import org.apache.hadoop.hbase.KeyValueUtil;
54 import org.apache.hadoop.hbase.MediumTests;
55 import org.apache.hadoop.hbase.TableName;
56 import org.apache.hadoop.hbase.client.Get;
57 import org.apache.hadoop.hbase.io.compress.Compression;
58 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
59 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
60 import org.apache.hadoop.hbase.io.hfile.HFile;
61 import org.apache.hadoop.hbase.io.hfile.HFileContext;
62 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
63 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
64 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
65 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
66 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
67 import org.apache.hadoop.hbase.regionserver.wal.HLog;
68 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
69 import org.apache.hadoop.hbase.security.User;
70 import org.apache.hadoop.hbase.util.Bytes;
71 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
72 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
73 import org.apache.hadoop.hbase.util.FSUtils;
74 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
75 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
76 import org.apache.hadoop.util.Progressable;
77 import org.junit.After;
78 import org.junit.Assert;
79 import org.junit.Before;
80 import org.junit.Rule;
81 import org.junit.Test;
82 import org.junit.experimental.categories.Category;
83 import org.junit.rules.TestName;
84 import org.mockito.Mockito;
85
86
87
88
89 @Category(MediumTests.class)
90 public class TestStore {
91 public static final Log LOG = LogFactory.getLog(TestStore.class);
92 @Rule public TestName name = new TestName();
93
94 HStore store;
95 byte [] table = Bytes.toBytes("table");
96 byte [] family = Bytes.toBytes("family");
97
98 byte [] row = Bytes.toBytes("row");
99 byte [] row2 = Bytes.toBytes("row2");
100 byte [] qf1 = Bytes.toBytes("qf1");
101 byte [] qf2 = Bytes.toBytes("qf2");
102 byte [] qf3 = Bytes.toBytes("qf3");
103 byte [] qf4 = Bytes.toBytes("qf4");
104 byte [] qf5 = Bytes.toBytes("qf5");
105 byte [] qf6 = Bytes.toBytes("qf6");
106
107 NavigableSet<byte[]> qualifiers =
108 new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
109
110 List<Cell> expected = new ArrayList<Cell>();
111 List<Cell> result = new ArrayList<Cell>();
112
113 long id = System.currentTimeMillis();
114 Get get = new Get(row);
115
116 private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
117 private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
118
119
120
121
122
123
124 @Before
125 public void setUp() throws IOException {
126 qualifiers.add(qf1);
127 qualifiers.add(qf3);
128 qualifiers.add(qf5);
129
130 Iterator<byte[]> iter = qualifiers.iterator();
131 while(iter.hasNext()){
132 byte [] next = iter.next();
133 expected.add(new KeyValue(row, family, next, 1, (byte[])null));
134 get.addColumn(family, next);
135 }
136 }
137
138 private void init(String methodName) throws IOException {
139 init(methodName, HBaseConfiguration.create());
140 }
141
142 private void init(String methodName, Configuration conf)
143 throws IOException {
144 HColumnDescriptor hcd = new HColumnDescriptor(family);
145
146
147 hcd.setMaxVersions(4);
148 init(methodName, conf, hcd);
149 }
150
151 private void init(String methodName, Configuration conf,
152 HColumnDescriptor hcd) throws IOException {
153 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
154 init(methodName, conf, htd, hcd);
155 }
156
157 @SuppressWarnings("deprecation")
158 private Store init(String methodName, Configuration conf, HTableDescriptor htd,
159 HColumnDescriptor hcd) throws IOException {
160
161 Path basedir = new Path(DIR+methodName);
162 Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
163 String logName = "logs";
164 Path logdir = new Path(basedir, logName);
165
166 FileSystem fs = FileSystem.get(conf);
167
168 fs.delete(logdir, true);
169
170 htd.addFamily(hcd);
171 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
172 HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
173 HRegion region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
174
175 store = new HStore(region, hcd, conf);
176 return store;
177 }
178
179
180
181
182
183
184 @Test
185 public void testFlushSizeAccounting() throws Exception {
186 LOG.info("Setting up a faulty file system that cannot write in " +
187 this.name.getMethodName());
188 final Configuration conf = HBaseConfiguration.create();
189
190 conf.setInt("hbase.hstore.flush.retries.number", 1);
191 User user = User.createUserForTesting(conf, this.name.getMethodName(),
192 new String[]{"foo"});
193
194 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
195 user.runAs(new PrivilegedExceptionAction<Object>() {
196 public Object run() throws Exception {
197
198 FileSystem fs = FileSystem.get(conf);
199 Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
200 FaultyFileSystem ffs = (FaultyFileSystem)fs;
201
202
203 init(name.getMethodName(), conf);
204
205 long size = store.memstore.getFlushableSize();
206 Assert.assertEquals(0, size);
207 LOG.info("Adding some data");
208 long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
209 size = store.memstore.getFlushableSize();
210 Assert.assertEquals(kvSize, size);
211
212 try {
213 LOG.info("Flushing");
214 flushStore(store, id++);
215 Assert.fail("Didn't bubble up IOE!");
216 } catch (IOException ioe) {
217 Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
218 }
219 size = store.memstore.getFlushableSize();
220 Assert.assertEquals(kvSize, size);
221 store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
222
223
224 Assert.assertEquals(kvSize, size);
225 ffs.fault.set(false);
226 flushStore(store, id++);
227 size = store.memstore.getFlushableSize();
228
229 Assert.assertEquals(kvSize, size);
230 flushStore(store, id++);
231 size = store.memstore.getFlushableSize();
232 Assert.assertEquals(0, size);
233 return null;
234 }
235 });
236 }
237
238
239
240
241
242 @Test
243 public void testCreateWriter() throws Exception {
244 Configuration conf = HBaseConfiguration.create();
245 FileSystem fs = FileSystem.get(conf);
246
247 HColumnDescriptor hcd = new HColumnDescriptor(family);
248 hcd.setCompressionType(Compression.Algorithm.GZ);
249 hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
250 init(name.getMethodName(), conf, hcd);
251
252
253 StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
254 Path path = writer.getPath();
255 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
256 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
257 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
258 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
259 writer.close();
260
261
262 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
263 Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
264 Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
265 reader.close();
266 }
267
268 @Test
269 public void testDeleteExpiredStoreFiles() throws Exception {
270 int storeFileNum = 4;
271 int ttl = 4;
272 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
273 EnvironmentEdgeManagerTestHelper.injectEdge(edge);
274
275 Configuration conf = HBaseConfiguration.create();
276
277 conf.setBoolean("hbase.store.delete.expired.storefile", true);
278 HColumnDescriptor hcd = new HColumnDescriptor(family);
279 hcd.setTimeToLive(ttl);
280 init(name.getMethodName(), conf, hcd);
281
282 long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
283 long timeStamp;
284
285
286 for (int i = 1; i <= storeFileNum; i++) {
287 LOG.info("Adding some data for the store file #" + i);
288 timeStamp = EnvironmentEdgeManager.currentTimeMillis();
289 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
290 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
291 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
292 flush(i);
293 edge.incrementTime(sleepTime);
294 }
295
296
297 Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
298
299
300
301 for (int i = 1; i <= storeFileNum; i++) {
302
303 CompactionContext compaction = this.store.requestCompaction();
304 CompactionRequest cr = compaction.getRequest();
305
306
307 List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
308 Assert.assertEquals(Math.min(i, 2), cr.getFiles().size());
309 for (int j = 0; j < files.size(); j++) {
310 Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
311 .currentTimeMillis() - this.store.getScanInfo().getTtl()));
312 }
313
314
315 StoreFile compactedFile = this.store.compact(compaction).get(0);
316
317 Assert.assertEquals(0, compactedFile.getReader().getEntries());
318
319
320 edge.incrementTime(sleepTime);
321 }
322 }
323
324 @Test
325 public void testLowestModificationTime() throws Exception {
326 Configuration conf = HBaseConfiguration.create();
327 FileSystem fs = FileSystem.get(conf);
328
329 init(name.getMethodName(), conf);
330
331 int storeFileNum = 4;
332 for (int i = 1; i <= storeFileNum; i++) {
333 LOG.info("Adding some data for the store file #"+i);
334 this.store.add(new KeyValue(row, family, qf1, i, (byte[])null));
335 this.store.add(new KeyValue(row, family, qf2, i, (byte[])null));
336 this.store.add(new KeyValue(row, family, qf3, i, (byte[])null));
337 flush(i);
338 }
339
340 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
341 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
342 Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
343
344
345 store.compact(store.requestCompaction());
346 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
347 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
348 Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
349 }
350
351 private static long getLowestTimeStampFromFS(FileSystem fs,
352 final Collection<StoreFile> candidates) throws IOException {
353 long minTs = Long.MAX_VALUE;
354 if (candidates.isEmpty()) {
355 return minTs;
356 }
357 Path[] p = new Path[candidates.size()];
358 int i = 0;
359 for (StoreFile sf : candidates) {
360 p[i] = sf.getPath();
361 ++i;
362 }
363
364 FileStatus[] stats = fs.listStatus(p);
365 if (stats == null || stats.length == 0) {
366 return minTs;
367 }
368 for (FileStatus s : stats) {
369 minTs = Math.min(minTs, s.getModificationTime());
370 }
371 return minTs;
372 }
373
374
375
376
377
378 private static final int BLOCKSIZE_SMALL = 8192;
379
380
381
382
383 @Test
384 public void testEmptyStoreFile() throws IOException {
385 init(this.name.getMethodName());
386
387 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
388 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
389 flush(1);
390
391
392 StoreFile f = this.store.getStorefiles().iterator().next();
393 Path storedir = f.getPath().getParent();
394 long seqid = f.getMaxSequenceId();
395 Configuration c = HBaseConfiguration.create();
396 FileSystem fs = FileSystem.get(c);
397 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
398 StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
399 fs)
400 .withOutputDir(storedir)
401 .withFileContext(meta)
402 .build();
403 w.appendMetadata(seqid + 1, false);
404 w.close();
405 this.store.close();
406
407 this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c);
408 Assert.assertEquals(2, this.store.getStorefilesCount());
409
410 result = HBaseTestingUtility.getFromStoreFile(store,
411 get.getRow(),
412 qualifiers);
413 Assert.assertEquals(1, result.size());
414 }
415
416
417
418
419
420 @Test
421 public void testGet_FromMemStoreOnly() throws IOException {
422 init(this.name.getMethodName());
423
424
425 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
426 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
427 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
428 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
429 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
430 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
431
432
433 result = HBaseTestingUtility.getFromStoreFile(store,
434 get.getRow(), qualifiers);
435
436
437 assertCheck();
438 }
439
440
441
442
443
444 @Test
445 public void testGet_FromFilesOnly() throws IOException {
446 init(this.name.getMethodName());
447
448
449 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
450 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
451
452 flush(1);
453
454
455 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
456 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
457
458 flush(2);
459
460
461 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
462 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
463
464 flush(3);
465
466
467 result = HBaseTestingUtility.getFromStoreFile(store,
468 get.getRow(),
469 qualifiers);
470
471
472
473 Collections.sort(result, KeyValue.COMPARATOR);
474
475
476 assertCheck();
477 }
478
479
480
481
482
483 @Test
484 public void testGet_FromMemStoreAndFiles() throws IOException {
485 init(this.name.getMethodName());
486
487
488 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
489 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
490
491 flush(1);
492
493
494 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
495 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
496
497 flush(2);
498
499
500 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
501 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
502
503
504 result = HBaseTestingUtility.getFromStoreFile(store,
505 get.getRow(), qualifiers);
506
507
508 Collections.sort(result, KeyValue.COMPARATOR);
509
510
511 assertCheck();
512 }
513
514 private void flush(int storeFilessize) throws IOException{
515 this.store.snapshot();
516 flushStore(store, id++);
517 Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
518 Assert.assertEquals(0, this.store.memstore.kvset.size());
519 }
520
521 private void assertCheck() {
522 Assert.assertEquals(expected.size(), result.size());
523 for(int i=0; i<expected.size(); i++) {
524 Assert.assertEquals(expected.get(i), result.get(i));
525 }
526 }
527
528
529
530
531
532
533
534 @Test
535 public void testIncrementColumnValue_ICVDuringFlush()
536 throws IOException, InterruptedException {
537 init(this.name.getMethodName());
538
539 long oldValue = 1L;
540 long newValue = 3L;
541 this.store.add(new KeyValue(row, family, qf1,
542 System.currentTimeMillis(),
543 Bytes.toBytes(oldValue)));
544
545
546 this.store.snapshot();
547
548
549 this.store.add(new KeyValue(row, family, qf2,
550 System.currentTimeMillis(),
551 Bytes.toBytes(oldValue)));
552
553
554 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
555
556
557 Assert.assertTrue(ret > 0);
558
559
560 flushStore(store, id++);
561 Assert.assertEquals(1, this.store.getStorefiles().size());
562
563 Assert.assertEquals(2, this.store.memstore.kvset.size());
564
565
566 Get get = new Get(row);
567 get.addColumn(family, qf1);
568 get.setMaxVersions();
569 List<Cell> results = new ArrayList<Cell>();
570
571 results = HBaseTestingUtility.getFromStoreFile(store, get);
572 Assert.assertEquals(2, results.size());
573
574 long ts1 = results.get(0).getTimestamp();
575 long ts2 = results.get(1).getTimestamp();
576
577 Assert.assertTrue(ts1 > ts2);
578
579 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
580 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
581 }
582
583 @After
584 public void tearDown() throws Exception {
585 EnvironmentEdgeManagerTestHelper.reset();
586 }
587
588 @Test
589 public void testICV_negMemstoreSize() throws IOException {
590 init(this.name.getMethodName());
591
592 long time = 100;
593 ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
594 ee.setValue(time);
595 EnvironmentEdgeManagerTestHelper.injectEdge(ee);
596 long newValue = 3L;
597 long size = 0;
598
599
600 size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
601 System.currentTimeMillis(),
602 Bytes.toBytes(newValue)));
603 size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
604 System.currentTimeMillis(),
605 Bytes.toBytes(newValue)));
606 size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
607 System.currentTimeMillis(),
608 Bytes.toBytes(newValue)));
609 size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
610 System.currentTimeMillis(),
611 Bytes.toBytes(newValue)));
612 size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
613 System.currentTimeMillis(),
614 Bytes.toBytes(newValue)));
615
616
617 for ( int i = 0 ; i < 10000 ; ++i) {
618 newValue++;
619
620 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
621 long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
622
623 if (ret != 0) System.out.println("ret: " + ret);
624 if (ret2 != 0) System.out.println("ret2: " + ret2);
625
626 Assert.assertTrue("ret: " + ret, ret >= 0);
627 size += ret;
628 Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
629 size += ret2;
630
631
632 if (i % 1000 == 0)
633 ee.setValue(++time);
634 }
635
636 long computedSize=0;
637 for (KeyValue kv : this.store.memstore.kvset) {
638 long kvsize = MemStore.heapSizeChange(kv, true);
639
640 computedSize += kvsize;
641 }
642 Assert.assertEquals(computedSize, size);
643 }
644
645 @Test
646 public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
647 ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
648 EnvironmentEdgeManagerTestHelper.injectEdge(mee);
649 init(this.name.getMethodName());
650
651 long oldValue = 1L;
652 long newValue = 3L;
653 this.store.add(new KeyValue(row, family, qf1,
654 EnvironmentEdgeManager.currentTimeMillis(),
655 Bytes.toBytes(oldValue)));
656
657
658 this.store.snapshot();
659
660
661 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
662
663
664 Assert.assertTrue(ret > 0);
665
666
667 flushStore(store, id++);
668 Assert.assertEquals(1, this.store.getStorefiles().size());
669 Assert.assertEquals(1, this.store.memstore.kvset.size());
670
671
672 newValue += 1;
673 this.store.updateColumnValue(row, family, qf1, newValue);
674
675
676 newValue += 1;
677 this.store.updateColumnValue(row, family, qf1, newValue);
678
679
680
681
682
683 Get get = new Get(row);
684 get.addColumn(family, qf1);
685 get.setMaxVersions();
686 List<Cell> results = new ArrayList<Cell>();
687
688 results = HBaseTestingUtility.getFromStoreFile(store, get);
689 Assert.assertEquals(2, results.size());
690
691 long ts1 = results.get(0).getTimestamp();
692 long ts2 = results.get(1).getTimestamp();
693
694 Assert.assertTrue(ts1 > ts2);
695 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
696 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
697
698 mee.setValue(2);
699 newValue += 1;
700 this.store.updateColumnValue(row, family, qf1, newValue);
701
702 results = HBaseTestingUtility.getFromStoreFile(store, get);
703 Assert.assertEquals(2, results.size());
704
705 ts1 = results.get(0).getTimestamp();
706 ts2 = results.get(1).getTimestamp();
707
708 Assert.assertTrue(ts1 > ts2);
709 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
710 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
711 }
712
713 @Test
714 public void testHandleErrorsInFlush() throws Exception {
715 LOG.info("Setting up a faulty file system that cannot write");
716
717 final Configuration conf = HBaseConfiguration.create();
718 User user = User.createUserForTesting(conf,
719 "testhandleerrorsinflush", new String[]{"foo"});
720
721 conf.setClass("fs.file.impl", FaultyFileSystem.class,
722 FileSystem.class);
723 user.runAs(new PrivilegedExceptionAction<Object>() {
724 public Object run() throws Exception {
725
726 FileSystem fs = FileSystem.get(conf);
727 Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
728
729
730 init(name.getMethodName(), conf);
731
732 LOG.info("Adding some data");
733 store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
734 store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
735 store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
736
737 LOG.info("Before flush, we should have no files");
738
739 Collection<StoreFileInfo> files =
740 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
741 Assert.assertEquals(0, files != null ? files.size() : 0);
742
743
744 try {
745 LOG.info("Flushing");
746 flush(1);
747 Assert.fail("Didn't bubble up IOE!");
748 } catch (IOException ioe) {
749 Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
750 }
751
752 LOG.info("After failed flush, we should still have no files!");
753 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
754 Assert.assertEquals(0, files != null ? files.size() : 0);
755 store.getHRegion().getLog().closeAndDelete();
756 return null;
757 }
758 });
759 FileSystem.closeAllForUGI(user.getUGI());
760 }
761
762
763
764
765
766 static class FaultyFileSystem extends FilterFileSystem {
767 List<SoftReference<FaultyOutputStream>> outStreams =
768 new ArrayList<SoftReference<FaultyOutputStream>>();
769 private long faultPos = 200;
770 AtomicBoolean fault = new AtomicBoolean(true);
771
772 public FaultyFileSystem() {
773 super(new LocalFileSystem());
774 System.err.println("Creating faulty!");
775 }
776
777 @Override
778 public FSDataOutputStream create(Path p) throws IOException {
779 return new FaultyOutputStream(super.create(p), faultPos, fault);
780 }
781
782 @Override
783 public FSDataOutputStream create(Path f, FsPermission permission,
784 boolean overwrite, int bufferSize, short replication, long blockSize,
785 Progressable progress) throws IOException {
786 return new FaultyOutputStream(super.create(f, permission,
787 overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
788 }
789
790 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
791 int bufferSize, short replication, long blockSize, Progressable progress)
792 throws IOException {
793
794
795 return create(f, overwrite, bufferSize, replication, blockSize, progress);
796 }
797 }
798
799 static class FaultyOutputStream extends FSDataOutputStream {
800 volatile long faultPos = Long.MAX_VALUE;
801 private final AtomicBoolean fault;
802
803 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
804 throws IOException {
805 super(out, null);
806 this.faultPos = faultPos;
807 this.fault = fault;
808 }
809
810 @Override
811 public void write(byte[] buf, int offset, int length) throws IOException {
812 System.err.println("faulty stream write at pos " + getPos());
813 injectFault();
814 super.write(buf, offset, length);
815 }
816
817 private void injectFault() throws IOException {
818 if (this.fault.get() && getPos() >= faultPos) {
819 throw new IOException("Fault injected");
820 }
821 }
822 }
823
824 private static void flushStore(HStore store, long id) throws IOException {
825 StoreFlushContext storeFlushCtx = store.createFlushContext(id);
826 storeFlushCtx.prepare();
827 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
828 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
829 }
830
831
832
833
834
835
836
837
838
839 List<Cell> getKeyValueSet(long[] timestamps, int numRows,
840 byte[] qualifier, byte[] family) {
841 List<Cell> kvList = new ArrayList<Cell>();
842 for (int i=1;i<=numRows;i++) {
843 byte[] b = Bytes.toBytes(i);
844 for (long timestamp: timestamps) {
845 kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
846 }
847 }
848 return kvList;
849 }
850
851
852
853
854
855 @Test
856 public void testMultipleTimestamps() throws IOException {
857 int numRows = 1;
858 long[] timestamps1 = new long[] {1,5,10,20};
859 long[] timestamps2 = new long[] {30,80};
860
861 init(this.name.getMethodName());
862
863 List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
864 for (Cell kv : kvList1) {
865 this.store.add(KeyValueUtil.ensureKeyValue(kv));
866 }
867
868 this.store.snapshot();
869 flushStore(store, id++);
870
871 List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
872 for(Cell kv : kvList2) {
873 this.store.add(KeyValueUtil.ensureKeyValue(kv));
874 }
875
876 List<Cell> result;
877 Get get = new Get(Bytes.toBytes(1));
878 get.addColumn(family,qf1);
879
880 get.setTimeRange(0,15);
881 result = HBaseTestingUtility.getFromStoreFile(store, get);
882 Assert.assertTrue(result.size()>0);
883
884 get.setTimeRange(40,90);
885 result = HBaseTestingUtility.getFromStoreFile(store, get);
886 Assert.assertTrue(result.size()>0);
887
888 get.setTimeRange(10,45);
889 result = HBaseTestingUtility.getFromStoreFile(store, get);
890 Assert.assertTrue(result.size()>0);
891
892 get.setTimeRange(80,145);
893 result = HBaseTestingUtility.getFromStoreFile(store, get);
894 Assert.assertTrue(result.size()>0);
895
896 get.setTimeRange(1,2);
897 result = HBaseTestingUtility.getFromStoreFile(store, get);
898 Assert.assertTrue(result.size()>0);
899
900 get.setTimeRange(90,200);
901 result = HBaseTestingUtility.getFromStoreFile(store, get);
902 Assert.assertTrue(result.size()==0);
903 }
904
905
906
907
908
909
910 @Test
911 public void testSplitWithEmptyColFam() throws IOException {
912 init(this.name.getMethodName());
913 Assert.assertNull(store.getSplitPoint());
914 store.getHRegion().forceSplit(null);
915 Assert.assertNull(store.getSplitPoint());
916 store.getHRegion().clearSplit_TESTS_ONLY();
917 }
918
919 @Test
920 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
921 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
922 long anyValue = 10;
923
924
925
926
927 Configuration conf = HBaseConfiguration.create();
928 conf.setLong(CONFIG_KEY, anyValue);
929 init(name.getMethodName() + "-xml", conf);
930 Assert.assertTrue(store.throttleCompaction(anyValue + 1));
931 Assert.assertFalse(store.throttleCompaction(anyValue));
932
933
934 --anyValue;
935 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
936 HColumnDescriptor hcd = new HColumnDescriptor(family);
937 htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
938 init(name.getMethodName() + "-htd", conf, htd, hcd);
939 Assert.assertTrue(store.throttleCompaction(anyValue + 1));
940 Assert.assertFalse(store.throttleCompaction(anyValue));
941
942
943 --anyValue;
944 hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
945 init(name.getMethodName() + "-hcd", conf, htd, hcd);
946 Assert.assertTrue(store.throttleCompaction(anyValue + 1));
947 Assert.assertFalse(store.throttleCompaction(anyValue));
948 }
949
950 public static class DummyStoreEngine extends DefaultStoreEngine {
951 public static DefaultCompactor lastCreatedCompactor = null;
952 @Override
953 protected void createComponents(
954 Configuration conf, Store store, KVComparator comparator) throws IOException {
955 super.createComponents(conf, store, comparator);
956 lastCreatedCompactor = this.compactor;
957 }
958 }
959
960 @Test
961 public void testStoreUsesSearchEngineOverride() throws Exception {
962 Configuration conf = HBaseConfiguration.create();
963 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
964 init(this.name.getMethodName(), conf);
965 Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
966 this.store.storeEngine.getCompactor());
967 }
968 }