View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.lang.ref.SoftReference;
24  import java.security.PrivilegedExceptionAction;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.NavigableSet;
31  import java.util.concurrent.ConcurrentSkipListSet;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataOutputStream;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.FilterFileSystem;
41  import org.apache.hadoop.fs.LocalFileSystem;
42  import org.apache.hadoop.fs.Path;
43  import org.apache.hadoop.fs.permission.FsPermission;
44  import org.apache.hadoop.hbase.Cell;
45  import org.apache.hadoop.hbase.CellUtil;
46  import org.apache.hadoop.hbase.HBaseConfiguration;
47  import org.apache.hadoop.hbase.HBaseTestingUtility;
48  import org.apache.hadoop.hbase.HColumnDescriptor;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.KeyValue;
52  import org.apache.hadoop.hbase.KeyValue.KVComparator;
53  import org.apache.hadoop.hbase.KeyValueUtil;
54  import org.apache.hadoop.hbase.MediumTests;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.io.compress.Compression;
58  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
59  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
60  import org.apache.hadoop.hbase.io.hfile.HFile;
61  import org.apache.hadoop.hbase.io.hfile.HFileContext;
62  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
63  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
64  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
65  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
66  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
67  import org.apache.hadoop.hbase.regionserver.wal.HLog;
68  import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
69  import org.apache.hadoop.hbase.security.User;
70  import org.apache.hadoop.hbase.util.Bytes;
71  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
72  import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
73  import org.apache.hadoop.hbase.util.FSUtils;
74  import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
75  import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
76  import org.apache.hadoop.util.Progressable;
77  import org.junit.After;
78  import org.junit.Assert;
79  import org.junit.Before;
80  import org.junit.Rule;
81  import org.junit.Test;
82  import org.junit.experimental.categories.Category;
83  import org.junit.rules.TestName;
84  import org.mockito.Mockito;
85  
86  /**
87   * Test class for the Store
88   */
89  @Category(MediumTests.class)
90  public class TestStore {
91    public static final Log LOG = LogFactory.getLog(TestStore.class);
92    @Rule public TestName name = new TestName();
93  
94    HStore store;
95    byte [] table = Bytes.toBytes("table");
96    byte [] family = Bytes.toBytes("family");
97  
98    byte [] row = Bytes.toBytes("row");
99    byte [] row2 = Bytes.toBytes("row2");
100   byte [] qf1 = Bytes.toBytes("qf1");
101   byte [] qf2 = Bytes.toBytes("qf2");
102   byte [] qf3 = Bytes.toBytes("qf3");
103   byte [] qf4 = Bytes.toBytes("qf4");
104   byte [] qf5 = Bytes.toBytes("qf5");
105   byte [] qf6 = Bytes.toBytes("qf6");
106 
107   NavigableSet<byte[]> qualifiers =
108     new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
109 
110   List<Cell> expected = new ArrayList<Cell>();
111   List<Cell> result = new ArrayList<Cell>();
112 
113   long id = System.currentTimeMillis();
114   Get get = new Get(row);
115 
116   private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
117   private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
118 
119 
120   /**
121    * Setup
122    * @throws IOException
123    */
124   @Before
125   public void setUp() throws IOException {
126     qualifiers.add(qf1);
127     qualifiers.add(qf3);
128     qualifiers.add(qf5);
129 
130     Iterator<byte[]> iter = qualifiers.iterator();
131     while(iter.hasNext()){
132       byte [] next = iter.next();
133       expected.add(new KeyValue(row, family, next, 1, (byte[])null));
134       get.addColumn(family, next);
135     }
136   }
137 
138   private void init(String methodName) throws IOException {
139     init(methodName, HBaseConfiguration.create());
140   }
141 
142   private void init(String methodName, Configuration conf)
143   throws IOException {
144     HColumnDescriptor hcd = new HColumnDescriptor(family);
145     // some of the tests write 4 versions and then flush
146     // (with HBASE-4241, lower versions are collected on flush)
147     hcd.setMaxVersions(4);
148     init(methodName, conf, hcd);
149   }
150 
151   private void init(String methodName, Configuration conf,
152       HColumnDescriptor hcd) throws IOException {
153     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
154     init(methodName, conf, htd, hcd);
155   }
156 
157   @SuppressWarnings("deprecation")
158   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
159       HColumnDescriptor hcd) throws IOException {
160     //Setting up a Store
161     Path basedir = new Path(DIR+methodName);
162     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
163     String logName = "logs";
164     Path logdir = new Path(basedir, logName);
165 
166     FileSystem fs = FileSystem.get(conf);
167 
168     fs.delete(logdir, true);
169 
170     htd.addFamily(hcd);
171     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
172     HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
173     HRegion region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
174 
175     store = new HStore(region, hcd, conf);
176     return store;
177   }
178 
179   /**
180    * Test we do not lose data if we fail a flush and then close.
181    * Part of HBase-10466
182    * @throws Exception
183    */
184   @Test
185   public void testFlushSizeAccounting() throws Exception {
186     LOG.info("Setting up a faulty file system that cannot write in " +
187       this.name.getMethodName());
188     final Configuration conf = HBaseConfiguration.create();
189     // Only retry once.
190     conf.setInt("hbase.hstore.flush.retries.number", 1);
191     User user = User.createUserForTesting(conf, this.name.getMethodName(),
192       new String[]{"foo"});
193     // Inject our faulty LocalFileSystem
194     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
195     user.runAs(new PrivilegedExceptionAction<Object>() {
196       public Object run() throws Exception {
197         // Make sure it worked (above is sensitive to caching details in hadoop core)
198         FileSystem fs = FileSystem.get(conf);
199         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
200         FaultyFileSystem ffs = (FaultyFileSystem)fs;
201 
202         // Initialize region
203         init(name.getMethodName(), conf);
204 
205         long size = store.memstore.getFlushableSize();
206         Assert.assertEquals(0, size);
207         LOG.info("Adding some data");
208         long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
209         size = store.memstore.getFlushableSize();
210         Assert.assertEquals(kvSize, size);
211         // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
212         try {
213           LOG.info("Flushing");
214           flushStore(store, id++);
215           Assert.fail("Didn't bubble up IOE!");
216         } catch (IOException ioe) {
217           Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
218         }
219         size = store.memstore.getFlushableSize();
220         Assert.assertEquals(kvSize, size);
221         store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
222         // Even though we add a new kv, we expect the flushable size to be 'same' since we have
223         // not yet cleared the snapshot -- the above flush failed.
224         Assert.assertEquals(kvSize, size);
225         ffs.fault.set(false);
226         flushStore(store, id++);
227         size = store.memstore.getFlushableSize();
228         // Size should be the foreground kv size.
229         Assert.assertEquals(kvSize, size);
230         flushStore(store, id++);
231         size = store.memstore.getFlushableSize();
232         Assert.assertEquals(0, size);
233         return null;
234       }
235     });
236   }
237 
238   /**
239    * Verify that compression and data block encoding are respected by the
240    * Store.createWriterInTmp() method, used on store flush.
241    */
242   @Test
243   public void testCreateWriter() throws Exception {
244     Configuration conf = HBaseConfiguration.create();
245     FileSystem fs = FileSystem.get(conf);
246 
247     HColumnDescriptor hcd = new HColumnDescriptor(family);
248     hcd.setCompressionType(Compression.Algorithm.GZ);
249     hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
250     init(name.getMethodName(), conf, hcd);
251 
252     // Test createWriterInTmp()
253     StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
254     Path path = writer.getPath();
255     writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
256     writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
257     writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
258     writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
259     writer.close();
260 
261     // Verify that compression and encoding settings are respected
262     HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
263     Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
264     Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
265     reader.close();
266   }
267 
268   @Test
269   public void testDeleteExpiredStoreFiles() throws Exception {
270     int storeFileNum = 4;
271     int ttl = 4;
272     IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
273     EnvironmentEdgeManagerTestHelper.injectEdge(edge);
274     
275     Configuration conf = HBaseConfiguration.create();
276     // Enable the expired store file deletion
277     conf.setBoolean("hbase.store.delete.expired.storefile", true);
278     HColumnDescriptor hcd = new HColumnDescriptor(family);
279     hcd.setTimeToLive(ttl);
280     init(name.getMethodName(), conf, hcd);
281 
282     long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
283     long timeStamp;
284     // There are 4 store files and the max time stamp difference among these
285     // store files will be (this.store.ttl / storeFileNum)
286     for (int i = 1; i <= storeFileNum; i++) {
287       LOG.info("Adding some data for the store file #" + i);
288       timeStamp = EnvironmentEdgeManager.currentTimeMillis();
289       this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
290       this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
291       this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
292       flush(i);
293       edge.incrementTime(sleepTime);
294     }
295 
296     // Verify the total number of store files
297     Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
298 
299     // Each compaction request will find one expired store file and delete it
300     // by the compaction.
301     for (int i = 1; i <= storeFileNum; i++) {
302       // verify the expired store file.
303       CompactionContext compaction = this.store.requestCompaction();
304       CompactionRequest cr = compaction.getRequest();
305       // the first is expired normally.
306       // If not the first compaction, there is another empty store file,
307       List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
308       Assert.assertEquals(Math.min(i, 2), cr.getFiles().size());
309       for (int j = 0; j < files.size(); j++) {
310         Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
311             .currentTimeMillis() - this.store.getScanInfo().getTtl()));
312       }
313       // Verify that the expired store file is compacted to an empty store file.
314       // Default compaction policy creates just one and only one compacted file.
315       StoreFile compactedFile = this.store.compact(compaction).get(0);
316       // It is an empty store file.
317       Assert.assertEquals(0, compactedFile.getReader().getEntries());
318 
319       // Let the next store file expired.
320       edge.incrementTime(sleepTime);
321     }
322   }
323 
324   @Test
325   public void testLowestModificationTime() throws Exception {
326     Configuration conf = HBaseConfiguration.create();
327     FileSystem fs = FileSystem.get(conf);
328     // Initialize region
329     init(name.getMethodName(), conf);
330     
331     int storeFileNum = 4;
332     for (int i = 1; i <= storeFileNum; i++) {
333       LOG.info("Adding some data for the store file #"+i);
334       this.store.add(new KeyValue(row, family, qf1, i, (byte[])null));
335       this.store.add(new KeyValue(row, family, qf2, i, (byte[])null));
336       this.store.add(new KeyValue(row, family, qf3, i, (byte[])null));
337       flush(i);
338     }
339     // after flush; check the lowest time stamp
340     long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
341     long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
342     Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
343 
344     // after compact; check the lowest time stamp
345     store.compact(store.requestCompaction());
346     lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
347     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
348     Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
349   }
350   
351   private static long getLowestTimeStampFromFS(FileSystem fs, 
352       final Collection<StoreFile> candidates) throws IOException {
353     long minTs = Long.MAX_VALUE;
354     if (candidates.isEmpty()) {
355       return minTs; 
356     }
357     Path[] p = new Path[candidates.size()];
358     int i = 0;
359     for (StoreFile sf : candidates) {
360       p[i] = sf.getPath();
361       ++i;
362     }
363     
364     FileStatus[] stats = fs.listStatus(p);
365     if (stats == null || stats.length == 0) {
366       return minTs;
367     }
368     for (FileStatus s : stats) {
369       minTs = Math.min(minTs, s.getModificationTime());
370     }
371     return minTs;
372   }
373 
374   //////////////////////////////////////////////////////////////////////////////
375   // Get tests
376   //////////////////////////////////////////////////////////////////////////////
377 
378   private static final int BLOCKSIZE_SMALL = 8192;
379   /**
380    * Test for hbase-1686.
381    * @throws IOException
382    */
383   @Test
384   public void testEmptyStoreFile() throws IOException {
385     init(this.name.getMethodName());
386     // Write a store file.
387     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
388     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
389     flush(1);
390     // Now put in place an empty store file.  Its a little tricky.  Have to
391     // do manually with hacked in sequence id.
392     StoreFile f = this.store.getStorefiles().iterator().next();
393     Path storedir = f.getPath().getParent();
394     long seqid = f.getMaxSequenceId();
395     Configuration c = HBaseConfiguration.create();
396     FileSystem fs = FileSystem.get(c);
397     HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
398     StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
399         fs)
400             .withOutputDir(storedir)
401             .withFileContext(meta)
402             .build();
403     w.appendMetadata(seqid + 1, false);
404     w.close();
405     this.store.close();
406     // Reopen it... should pick up two files
407     this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c);
408     Assert.assertEquals(2, this.store.getStorefilesCount());
409 
410     result = HBaseTestingUtility.getFromStoreFile(store,
411         get.getRow(),
412         qualifiers);
413     Assert.assertEquals(1, result.size());
414   }
415 
416   /**
417    * Getting data from memstore only
418    * @throws IOException
419    */
420   @Test
421   public void testGet_FromMemStoreOnly() throws IOException {
422     init(this.name.getMethodName());
423 
424     //Put data in memstore
425     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
426     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
427     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
428     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
429     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
430     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
431 
432     //Get
433     result = HBaseTestingUtility.getFromStoreFile(store,
434         get.getRow(), qualifiers);
435 
436     //Compare
437     assertCheck();
438   }
439 
440   /**
441    * Getting data from files only
442    * @throws IOException
443    */
444   @Test
445   public void testGet_FromFilesOnly() throws IOException {
446     init(this.name.getMethodName());
447 
448     //Put data in memstore
449     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
450     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
451     //flush
452     flush(1);
453 
454     //Add more data
455     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
456     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
457     //flush
458     flush(2);
459 
460     //Add more data
461     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
462     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
463     //flush
464     flush(3);
465 
466     //Get
467     result = HBaseTestingUtility.getFromStoreFile(store,
468         get.getRow(),
469         qualifiers);
470     //this.store.get(get, qualifiers, result);
471 
472     //Need to sort the result since multiple files
473     Collections.sort(result, KeyValue.COMPARATOR);
474 
475     //Compare
476     assertCheck();
477   }
478 
479   /**
480    * Getting data from memstore and files
481    * @throws IOException
482    */
483   @Test
484   public void testGet_FromMemStoreAndFiles() throws IOException {
485     init(this.name.getMethodName());
486 
487     //Put data in memstore
488     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
489     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
490     //flush
491     flush(1);
492 
493     //Add more data
494     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
495     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
496     //flush
497     flush(2);
498 
499     //Add more data
500     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
501     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
502 
503     //Get
504     result = HBaseTestingUtility.getFromStoreFile(store,
505         get.getRow(), qualifiers);
506 
507     //Need to sort the result since multiple files
508     Collections.sort(result, KeyValue.COMPARATOR);
509 
510     //Compare
511     assertCheck();
512   }
513 
514   private void flush(int storeFilessize) throws IOException{
515     this.store.snapshot();
516     flushStore(store, id++);
517     Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
518     Assert.assertEquals(0, this.store.memstore.kvset.size());
519   }
520 
521   private void assertCheck() {
522     Assert.assertEquals(expected.size(), result.size());
523     for(int i=0; i<expected.size(); i++) {
524       Assert.assertEquals(expected.get(i), result.get(i));
525     }
526   }
527 
528   //////////////////////////////////////////////////////////////////////////////
529   // IncrementColumnValue tests
530   //////////////////////////////////////////////////////////////////////////////
531   /*
532    * test the internal details of how ICV works, especially during a flush scenario.
533    */
534   @Test
535   public void testIncrementColumnValue_ICVDuringFlush()
536       throws IOException, InterruptedException {
537     init(this.name.getMethodName());
538 
539     long oldValue = 1L;
540     long newValue = 3L;
541     this.store.add(new KeyValue(row, family, qf1,
542         System.currentTimeMillis(),
543         Bytes.toBytes(oldValue)));
544 
545     // snapshot the store.
546     this.store.snapshot();
547 
548     // add other things:
549     this.store.add(new KeyValue(row, family, qf2,
550         System.currentTimeMillis(),
551         Bytes.toBytes(oldValue)));
552 
553     // update during the snapshot.
554     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
555 
556     // memstore should have grown by some amount.
557     Assert.assertTrue(ret > 0);
558 
559     // then flush.
560     flushStore(store, id++);
561     Assert.assertEquals(1, this.store.getStorefiles().size());
562     // from the one we inserted up there, and a new one
563     Assert.assertEquals(2, this.store.memstore.kvset.size());
564 
565     // how many key/values for this row are there?
566     Get get = new Get(row);
567     get.addColumn(family, qf1);
568     get.setMaxVersions(); // all versions.
569     List<Cell> results = new ArrayList<Cell>();
570 
571     results = HBaseTestingUtility.getFromStoreFile(store, get);
572     Assert.assertEquals(2, results.size());
573 
574     long ts1 = results.get(0).getTimestamp();
575     long ts2 = results.get(1).getTimestamp();
576 
577     Assert.assertTrue(ts1 > ts2);
578 
579     Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
580     Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
581   }
582 
583   @After
584   public void tearDown() throws Exception {
585     EnvironmentEdgeManagerTestHelper.reset();
586   }
587 
588   @Test
589   public void testICV_negMemstoreSize()  throws IOException {
590       init(this.name.getMethodName());
591 
592     long time = 100;
593     ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
594     ee.setValue(time);
595     EnvironmentEdgeManagerTestHelper.injectEdge(ee);
596     long newValue = 3L;
597     long size = 0;
598 
599 
600     size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
601         System.currentTimeMillis(),
602         Bytes.toBytes(newValue)));
603     size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
604         System.currentTimeMillis(),
605         Bytes.toBytes(newValue)));
606     size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
607         System.currentTimeMillis(),
608         Bytes.toBytes(newValue)));
609     size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
610         System.currentTimeMillis(),
611         Bytes.toBytes(newValue)));
612     size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
613         System.currentTimeMillis(),
614         Bytes.toBytes(newValue)));
615 
616 
617     for ( int i = 0 ; i < 10000 ; ++i) {
618       newValue++;
619 
620       long ret = this.store.updateColumnValue(row, family, qf1, newValue);
621       long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
622 
623       if (ret != 0) System.out.println("ret: " + ret);
624       if (ret2 != 0) System.out.println("ret2: " + ret2);
625 
626       Assert.assertTrue("ret: " + ret, ret >= 0);
627       size += ret;
628       Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
629       size += ret2;
630 
631 
632       if (i % 1000 == 0)
633         ee.setValue(++time);
634     }
635 
636     long computedSize=0;
637     for (KeyValue kv : this.store.memstore.kvset) {
638       long kvsize = MemStore.heapSizeChange(kv, true);
639       //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
640       computedSize += kvsize;
641     }
642     Assert.assertEquals(computedSize, size);
643   }
644 
645   @Test
646   public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
647     ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
648     EnvironmentEdgeManagerTestHelper.injectEdge(mee);
649     init(this.name.getMethodName());
650 
651     long oldValue = 1L;
652     long newValue = 3L;
653     this.store.add(new KeyValue(row, family, qf1,
654         EnvironmentEdgeManager.currentTimeMillis(),
655         Bytes.toBytes(oldValue)));
656 
657     // snapshot the store.
658     this.store.snapshot();
659 
660     // update during the snapshot, the exact same TS as the Put (lololol)
661     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
662 
663     // memstore should have grown by some amount.
664     Assert.assertTrue(ret > 0);
665 
666     // then flush.
667     flushStore(store, id++);
668     Assert.assertEquals(1, this.store.getStorefiles().size());
669     Assert.assertEquals(1, this.store.memstore.kvset.size());
670 
671     // now increment again:
672     newValue += 1;
673     this.store.updateColumnValue(row, family, qf1, newValue);
674 
675     // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
676     newValue += 1;
677     this.store.updateColumnValue(row, family, qf1, newValue);
678 
679     // the second TS should be TS=2 or higher., even though 'time=1' right now.
680 
681 
682     // how many key/values for this row are there?
683     Get get = new Get(row);
684     get.addColumn(family, qf1);
685     get.setMaxVersions(); // all versions.
686     List<Cell> results = new ArrayList<Cell>();
687 
688     results = HBaseTestingUtility.getFromStoreFile(store, get);
689     Assert.assertEquals(2, results.size());
690 
691     long ts1 = results.get(0).getTimestamp();
692     long ts2 = results.get(1).getTimestamp();
693 
694     Assert.assertTrue(ts1 > ts2);
695     Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
696     Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
697 
698     mee.setValue(2); // time goes up slightly
699     newValue += 1;
700     this.store.updateColumnValue(row, family, qf1, newValue);
701 
702     results = HBaseTestingUtility.getFromStoreFile(store, get);
703     Assert.assertEquals(2, results.size());
704 
705     ts1 = results.get(0).getTimestamp();
706     ts2 = results.get(1).getTimestamp();
707 
708     Assert.assertTrue(ts1 > ts2);
709     Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
710     Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
711   }
712 
713   @Test
714   public void testHandleErrorsInFlush() throws Exception {
715     LOG.info("Setting up a faulty file system that cannot write");
716 
717     final Configuration conf = HBaseConfiguration.create();
718     User user = User.createUserForTesting(conf,
719         "testhandleerrorsinflush", new String[]{"foo"});
720     // Inject our faulty LocalFileSystem
721     conf.setClass("fs.file.impl", FaultyFileSystem.class,
722         FileSystem.class);
723     user.runAs(new PrivilegedExceptionAction<Object>() {
724       public Object run() throws Exception {
725         // Make sure it worked (above is sensitive to caching details in hadoop core)
726         FileSystem fs = FileSystem.get(conf);
727         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
728 
729         // Initialize region
730         init(name.getMethodName(), conf);
731 
732         LOG.info("Adding some data");
733         store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
734         store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
735         store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
736 
737         LOG.info("Before flush, we should have no files");
738 
739         Collection<StoreFileInfo> files =
740           store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
741         Assert.assertEquals(0, files != null ? files.size() : 0);
742 
743         //flush
744         try {
745           LOG.info("Flushing");
746           flush(1);
747           Assert.fail("Didn't bubble up IOE!");
748         } catch (IOException ioe) {
749           Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
750         }
751 
752         LOG.info("After failed flush, we should still have no files!");
753         files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
754         Assert.assertEquals(0, files != null ? files.size() : 0);
755         store.getHRegion().getLog().closeAndDelete();
756         return null;
757       }
758     });
759     FileSystem.closeAllForUGI(user.getUGI());
760   }
761 
762   /**
763    * Faulty file system that will fail if you write past its fault position the FIRST TIME
764    * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
765    */
766   static class FaultyFileSystem extends FilterFileSystem {
767     List<SoftReference<FaultyOutputStream>> outStreams =
768       new ArrayList<SoftReference<FaultyOutputStream>>();
769     private long faultPos = 200;
770     AtomicBoolean fault = new AtomicBoolean(true);
771 
772     public FaultyFileSystem() {
773       super(new LocalFileSystem());
774       System.err.println("Creating faulty!");
775     }
776 
777     @Override
778     public FSDataOutputStream create(Path p) throws IOException {
779       return new FaultyOutputStream(super.create(p), faultPos, fault);
780     }
781 
782     @Override
783     public FSDataOutputStream create(Path f, FsPermission permission,
784         boolean overwrite, int bufferSize, short replication, long blockSize,
785         Progressable progress) throws IOException {
786       return new FaultyOutputStream(super.create(f, permission,
787           overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
788     }
789 
790     public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
791         int bufferSize, short replication, long blockSize, Progressable progress)
792     throws IOException {
793       // Fake it.  Call create instead.  The default implementation throws an IOE
794       // that this is not supported.
795       return create(f, overwrite, bufferSize, replication, blockSize, progress);
796     }
797   }
798 
799   static class FaultyOutputStream extends FSDataOutputStream {
800     volatile long faultPos = Long.MAX_VALUE;
801     private final AtomicBoolean fault;
802 
803     public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
804     throws IOException {
805       super(out, null);
806       this.faultPos = faultPos;
807       this.fault = fault;
808     }
809 
810     @Override
811     public void write(byte[] buf, int offset, int length) throws IOException {
812       System.err.println("faulty stream write at pos " + getPos());
813       injectFault();
814       super.write(buf, offset, length);
815     }
816 
817     private void injectFault() throws IOException {
818       if (this.fault.get() && getPos() >= faultPos) {
819         throw new IOException("Fault injected");
820       }
821     }
822   }
823 
824   private static void flushStore(HStore store, long id) throws IOException {
825     StoreFlushContext storeFlushCtx = store.createFlushContext(id);
826     storeFlushCtx.prepare();
827     storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
828     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
829   }
830 
831   /**
832    * Generate a list of KeyValues for testing based on given parameters
833    * @param timestamps
834    * @param numRows
835    * @param qualifier
836    * @param family
837    * @return
838    */
839   List<Cell> getKeyValueSet(long[] timestamps, int numRows,
840       byte[] qualifier, byte[] family) {
841     List<Cell> kvList = new ArrayList<Cell>();
842     for (int i=1;i<=numRows;i++) {
843       byte[] b = Bytes.toBytes(i);
844       for (long timestamp: timestamps) {
845         kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
846       }
847     }
848     return kvList;
849   }
850 
851   /**
852    * Test to ensure correctness when using Stores with multiple timestamps
853    * @throws IOException
854    */
855   @Test
856   public void testMultipleTimestamps() throws IOException {
857     int numRows = 1;
858     long[] timestamps1 = new long[] {1,5,10,20};
859     long[] timestamps2 = new long[] {30,80};
860 
861     init(this.name.getMethodName());
862 
863     List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
864     for (Cell kv : kvList1) {
865       this.store.add(KeyValueUtil.ensureKeyValue(kv));
866     }
867 
868     this.store.snapshot();
869     flushStore(store, id++);
870 
871     List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
872     for(Cell kv : kvList2) {
873       this.store.add(KeyValueUtil.ensureKeyValue(kv));
874     }
875 
876     List<Cell> result;
877     Get get = new Get(Bytes.toBytes(1));
878     get.addColumn(family,qf1);
879 
880     get.setTimeRange(0,15);
881     result = HBaseTestingUtility.getFromStoreFile(store, get);
882     Assert.assertTrue(result.size()>0);
883 
884     get.setTimeRange(40,90);
885     result = HBaseTestingUtility.getFromStoreFile(store, get);
886     Assert.assertTrue(result.size()>0);
887 
888     get.setTimeRange(10,45);
889     result = HBaseTestingUtility.getFromStoreFile(store, get);
890     Assert.assertTrue(result.size()>0);
891 
892     get.setTimeRange(80,145);
893     result = HBaseTestingUtility.getFromStoreFile(store, get);
894     Assert.assertTrue(result.size()>0);
895 
896     get.setTimeRange(1,2);
897     result = HBaseTestingUtility.getFromStoreFile(store, get);
898     Assert.assertTrue(result.size()>0);
899 
900     get.setTimeRange(90,200);
901     result = HBaseTestingUtility.getFromStoreFile(store, get);
902     Assert.assertTrue(result.size()==0);
903   }
904 
905   /**
906    * Test for HBASE-3492 - Test split on empty colfam (no store files).
907    *
908    * @throws IOException When the IO operations fail.
909    */
910   @Test
911   public void testSplitWithEmptyColFam() throws IOException {
912     init(this.name.getMethodName());
913     Assert.assertNull(store.getSplitPoint());
914     store.getHRegion().forceSplit(null);
915     Assert.assertNull(store.getSplitPoint());
916     store.getHRegion().clearSplit_TESTS_ONLY();
917   }
918 
919   @Test
920   public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
921     final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
922     long anyValue = 10;
923 
924     // We'll check that it uses correct config and propagates it appropriately by going thru
925     // the simplest "real" path I can find - "throttleCompaction", which just checks whether
926     // a number we pass in is higher than some config value, inside compactionPolicy.
927     Configuration conf = HBaseConfiguration.create();
928     conf.setLong(CONFIG_KEY, anyValue);
929     init(name.getMethodName() + "-xml", conf);
930     Assert.assertTrue(store.throttleCompaction(anyValue + 1));
931     Assert.assertFalse(store.throttleCompaction(anyValue));
932 
933     // HTD overrides XML.
934     --anyValue;
935     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
936     HColumnDescriptor hcd = new HColumnDescriptor(family);
937     htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
938     init(name.getMethodName() + "-htd", conf, htd, hcd);
939     Assert.assertTrue(store.throttleCompaction(anyValue + 1));
940     Assert.assertFalse(store.throttleCompaction(anyValue));
941 
942     // HCD overrides them both.
943     --anyValue;
944     hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
945     init(name.getMethodName() + "-hcd", conf, htd, hcd);
946     Assert.assertTrue(store.throttleCompaction(anyValue + 1));
947     Assert.assertFalse(store.throttleCompaction(anyValue));
948   }
949 
950   public static class DummyStoreEngine extends DefaultStoreEngine {
951     public static DefaultCompactor lastCreatedCompactor = null;
952     @Override
953     protected void createComponents(
954         Configuration conf, Store store, KVComparator comparator) throws IOException {
955       super.createComponents(conf, store, comparator);
956       lastCreatedCompactor = this.compactor;
957     }
958   }
959 
960   @Test
961   public void testStoreUsesSearchEngineOverride() throws Exception {
962     Configuration conf = HBaseConfiguration.create();
963     conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
964     init(this.name.getMethodName(), conf);
965     Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
966       this.store.storeEngine.getCompactor());
967   }
968 }