View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collections;
26  import java.util.Comparator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.TreeSet;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.HBaseTestCase;
37  import org.apache.hadoop.hbase.HBaseTestingUtility;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.SmallTests;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.client.Scan;
44  import org.apache.hadoop.hbase.io.HFileLink;
45  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
46  import org.apache.hadoop.hbase.io.hfile.BlockCache;
47  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
48  import org.apache.hadoop.hbase.io.hfile.CacheStats;
49  import org.apache.hadoop.hbase.io.hfile.HFileContext;
50  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
51  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
52  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
53  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
54  import org.apache.hadoop.hbase.util.BloomFilterFactory;
55  import org.apache.hadoop.hbase.util.Bytes;
56  import org.apache.hadoop.hbase.util.ChecksumType;
57  import org.apache.hadoop.hbase.util.FSUtils;
58  import org.junit.experimental.categories.Category;
59  import org.mockito.Mockito;
60  
61  import com.google.common.base.Joiner;
62  import com.google.common.collect.Iterables;
63  import com.google.common.collect.Lists;
64  
65  /**
66   * Test HStoreFile
67   */
68  @Category(SmallTests.class)
69  public class TestStoreFile extends HBaseTestCase {
  static final Log LOG = LogFactory.getLog(TestStoreFile.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Cache config built from the test utility's configuration; shared by most tests here.
  private CacheConfig cacheConf =  new CacheConfig(TEST_UTIL.getConfiguration());
  // Root dir (under the test data dir) for store files written outside a region.
  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestStoreFile").toString();
  private static final ChecksumType CKTYPE = ChecksumType.CRC32;
  private static final int CKBYTES = 512;
  // Column family used when committing store files through HRegionFileSystem.
  private static String TEST_FAMILY = "cf";
77  
  @Override
  public void setUp() throws Exception {
    // No extra fixture needed; HBaseTestCase sets up conf, fs and testDir.
    super.setUp();
  }
82  
  @Override
  public void tearDown() throws Exception {
    // No extra cleanup needed beyond what HBaseTestCase does.
    super.tearDown();
  }
87  
88    /**
89     * Write a file and then assert that we can read from top and bottom halves
90     * using two HalfMapFiles.
91     * @throws Exception
92     */
93    public void testBasicHalfMapFile() throws Exception {
94      final HRegionInfo hri =
95          new HRegionInfo(TableName.valueOf("testBasicHalfMapFileTb"));
96      HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
97        conf, fs, new Path(this.testDir, hri.getTable().getNameAsString()), hri);
98  
99      HFileContext meta = new HFileContextBuilder().withBlockSize(2*1024).build();
100     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
101             .withFilePath(regionFs.createTempName())
102             .withFileContext(meta)
103             .build();
104     writeStoreFile(writer);
105 
106     Path sfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
107     StoreFile sf = new StoreFile(this.fs, sfPath, conf, cacheConf,
108       BloomType.NONE);
109     checkHalfHFile(regionFs, sf);
110   }
111 
  /**
   * Writes the standard two-character-row data set (see the static
   * writeStoreFile overload) using the current test name for both the family
   * and the qualifier, then closes the writer.
   */
  private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
    writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName()));
  }
115 
  // Pick a split point roughly halfway through the two-char row space
  // [FIRST_CHAR,FIRST_CHAR]..[LAST_CHAR,LAST_CHAR] produced by writeStoreFile.
  byte[] SPLITKEY = new byte[] { (LAST_CHAR + FIRST_CHAR)/2, FIRST_CHAR};
118 
119   /*
120    * Writes HStoreKey and ImmutableBytes data to passed writer and
121    * then closes it.
122    * @param writer
123    * @throws IOException
124    */
125   public static void writeStoreFile(final StoreFile.Writer writer, byte[] fam, byte[] qualifier)
126   throws IOException {
127     long now = System.currentTimeMillis();
128     try {
129       for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
130         for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
131           byte[] b = new byte[] { (byte) d, (byte) e };
132           writer.append(new KeyValue(b, fam, qualifier, now, b));
133         }
134       }
135     } finally {
136       writer.close();
137     }
138   }
139 
  /**
   * Test that our mechanism of writing store files in one region to reference
   * store files in other regions works.
   * @throws IOException
   */
  public void testReference() throws IOException {
    final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testReferenceTb"));
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
      conf, fs, new Path(this.testDir, hri.getTable().getNameAsString()), hri);

    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    // Make a store file and write data to it.
    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
            .withFilePath(regionFs.createTempName())
            .withFileContext(meta)
            .build();
    writeStoreFile(writer);

    Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
    StoreFile hsf = new StoreFile(this.fs, hsfPath, conf, cacheConf,
      BloomType.NONE);
    StoreFile.Reader reader = hsf.createReader();
    // Split on a row, not in middle of row.  Midkey returned by reader
    // may be in middle of row.  Create new one with empty column and
    // timestamp.
    KeyValue kv = KeyValue.createKeyValueFromKey(reader.midkey());
    byte [] midRow = kv.getRow();
    kv = KeyValue.createKeyValueFromKey(reader.getLastKey());
    byte [] finalRow = kv.getRow();
    // Make a reference for the top half [midRow, end) of the file.
    HRegionInfo splitHri = new HRegionInfo(hri.getTable(), null, midRow);
    Path refPath = splitStoreFile(regionFs, splitHri, TEST_FAMILY, hsf, midRow, true);
    StoreFile refHsf = new StoreFile(this.fs, refPath, conf, cacheConf,
      BloomType.NONE);
    // Now confirm that I can read from the reference and that it only gets
    // keys from top half of the file.
    HFileScanner s = refHsf.createReader().getScanner(false, false);
    // First iteration seeks to the first key; subsequent iterations call next().
    for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
      ByteBuffer bb = s.getKey();
      kv = KeyValue.createKeyValueFromKey(bb);
      if (first) {
        // The first key out of the top reference must be the split row itself.
        assertTrue(Bytes.equals(kv.getRow(), midRow));
        first = false;
      }
    }
    // The last key seen must match the last key of the backing file.
    assertTrue(Bytes.equals(kv.getRow(), finalRow));
  }
187 
188   public void testHFileLink() throws IOException {
189     final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testHFileLinkTb"));
190     // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
191     Configuration testConf = new Configuration(this.conf);
192     FSUtils.setRootDir(testConf, this.testDir);
193     HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
194       testConf, fs, FSUtils.getTableDir(this.testDir, hri.getTable()), hri);
195     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
196 
197     // Make a store file and write data to it.
198     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
199             .withFilePath(regionFs.createTempName())
200             .withFileContext(meta)
201             .build();
202     writeStoreFile(writer);
203 
204     Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
205     Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", TEST_FAMILY));
206     HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
207     Path linkFilePath = new Path(dstPath,
208                   HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
209 
210     // Try to open store file from link
211     StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath);
212     StoreFile hsf = new StoreFile(this.fs, storeFileInfo, testConf, cacheConf,
213       BloomType.NONE);
214     assertTrue(storeFileInfo.isLink());
215 
216     // Now confirm that I can read from the link
217     int count = 1;
218     HFileScanner s = hsf.createReader().getScanner(false, false);
219     s.seekTo();
220     while (s.next()) {
221       count++;
222     }
223     assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
224   }
225 
  /**
   * This test creates an hfile and then the dir structures and files to verify that references
   * to hfilelinks (created by snapshot clones) can be properly interpreted.
   */
  public void testReferenceToHFileLink() throws IOException {
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    FSUtils.setRootDir(testConf, this.testDir);

    // adding legal table name chars to verify regex handles it.
    HRegionInfo hri = new HRegionInfo(TableName.valueOf("_original-evil-name"));
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
      testConf, fs, FSUtils.getTableDir(this.testDir, hri.getTable()), hri);

    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    // Make a store file and write data to it. <root>/<tablename>/<rgn>/<cf>/<file>
    StoreFile.Writer writer = new StoreFile.WriterBuilder(testConf, cacheConf, this.fs)
            .withFilePath(regionFs.createTempName())
            .withFileContext(meta)
            .build();
    writeStoreFile(writer);
    Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());

    // create link to store file. <root>/clone/region/<cf>/<hfile>-<region>-<table>
    HRegionInfo hriClone = new HRegionInfo(TableName.valueOf("clone"));
    // NOTE(review): the table dir below is derived from hri (the original
    // table), not hriClone — confirm this placement is intended.
    HRegionFileSystem cloneRegionFs = HRegionFileSystem.createRegionOnFileSystem(
      testConf, fs, FSUtils.getTableDir(this.testDir, hri.getTable()),
        hriClone);
    Path dstPath = cloneRegionFs.getStoreDir(TEST_FAMILY);
    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
    Path linkFilePath = new Path(dstPath,
                  HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

    // create splits of the link.
    // <root>/clone/splitA/<cf>/<reftohfilelink>,
    // <root>/clone/splitB/<cf>/<reftohfilelink>
    HRegionInfo splitHriA = new HRegionInfo(hri.getTable(), null, SPLITKEY);
    HRegionInfo splitHriB = new HRegionInfo(hri.getTable(), SPLITKEY, null);
    StoreFile f = new StoreFile(fs, linkFilePath, testConf, cacheConf, BloomType.NONE);
    Path pathA = splitStoreFile(cloneRegionFs, splitHriA, TEST_FAMILY, f, SPLITKEY, true); // top
    Path pathB = splitStoreFile(cloneRegionFs, splitHriB, TEST_FAMILY, f, SPLITKEY, false);// bottom

    // OK test the thing
    FSUtils.logFileSystemState(fs, this.testDir, LOG);

    // There is a case where a file with the hfilelink pattern is actually a daughter
    // reference to a hfile link.  There is code in StoreFile that handles this case.

    // Try to open store file from link
    StoreFile hsfA = new StoreFile(this.fs, pathA, testConf, cacheConf,
      BloomType.NONE);

    // Now confirm that I can read from the ref to link (top half).
    // count starts at 1 because the first key is consumed by seekTo().
    int count = 1;
    HFileScanner s = hsfA.createReader().getScanner(false, false);
    s.seekTo();
    while (s.next()) {
      count++;
    }
    assertTrue(count > 0); // read some rows here

    // Try to open store file from link
    StoreFile hsfB = new StoreFile(this.fs, pathB, testConf, cacheConf,
      BloomType.NONE);

    // Now confirm that I can read from the ref to link (bottom half).
    HFileScanner sB = hsfB.createReader().getScanner(false, false);
    sB.seekTo();

    //count++ as seekTo() will advance the scanner
    count++;
    while (sB.next()) {
      count++;
    }

    // Both halves together must cover the full row space written by writeStoreFile.
    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
  }
304 
  /**
   * Splits the passed store file at its midkey and verifies scanners over the
   * two halves: the top half must yield only keys >= midkey, the bottom only
   * keys < midkey.  Then repeats with split points that fall before the first
   * key and after the last key, in which case the empty half's path is null.
   * Deletes the store file on exit.
   * @param regionFs region filesystem the daughter regions are created under
   * @param f store file to split; deleted in the finally block
   * @throws IOException on read or split failure
   */
  private void checkHalfHFile(final HRegionFileSystem regionFs, final StoreFile f)
      throws IOException {
    byte [] midkey = f.createReader().midkey();
    KeyValue midKV = KeyValue.createKeyValueFromKey(midkey);
    byte [] midRow = midKV.getRow();
    // Create top split.
    HRegionInfo topHri = new HRegionInfo(regionFs.getRegionInfo().getTable(),
        null, midRow);
    Path topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, midRow, true);
    // Create bottom split.
    HRegionInfo bottomHri = new HRegionInfo(regionFs.getRegionInfo().getTable(),
        midRow, null);
    Path bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, midRow, false);
    // Make readers on top and bottom.
    StoreFile.Reader top = new StoreFile(
      this.fs, topPath, conf, cacheConf, BloomType.NONE).createReader();
    StoreFile.Reader bottom = new StoreFile(
      this.fs, bottomPath, conf, cacheConf, BloomType.NONE).createReader();
    ByteBuffer previous = null;
    LOG.info("Midkey: " + midKV.toString());
    ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midkey);
    try {
      // Now make two HalfMapFiles and assert they can read the full backing
      // file, one from the top and the other from the bottom.
      // Test bottom half first.
      // Now test reading from the top.
      boolean first = true;
      ByteBuffer key = null;
      HFileScanner topScanner = top.getScanner(false, false);
      while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
             (topScanner.isSeeked() && topScanner.next())) {
        key = topScanner.getKey();

        // Every key in the top half must compare >= midkey.
        if (topScanner.getReader().getComparator().compareFlatKey(key.array(),
          key.arrayOffset(), key.limit(), midkey, 0, midkey.length) < 0) {
          fail("key=" + Bytes.toStringBinary(key) + " < midkey=" +
              Bytes.toStringBinary(midkey));
        }
        if (first) {
          first = false;
          LOG.info("First in top: " + Bytes.toString(Bytes.toBytes(key)));
        }
      }
      LOG.info("Last in top: " + Bytes.toString(Bytes.toBytes(key)));

      first = true;
      HFileScanner bottomScanner = bottom.getScanner(false, false);
      while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
          bottomScanner.next()) {
        previous = bottomScanner.getKey();
        key = bottomScanner.getKey();
        if (first) {
          first = false;
          LOG.info("First in bottom: " +
            Bytes.toString(Bytes.toBytes(previous)));
        }
        // Every key in the bottom half must be strictly below midkey.
        assertTrue(key.compareTo(bbMidkeyBytes) < 0);
      }
      if (previous != null) {
        LOG.info("Last in bottom: " + Bytes.toString(Bytes.toBytes(previous)));
      }
      // Remove references.
      regionFs.cleanupDaughterRegion(topHri);
      regionFs.cleanupDaughterRegion(bottomHri);

      // Next test using a midkey that does not exist in the file.
      // First, do a key that is < than first key. Ensure splits behave
      // properly.
      byte [] badmidkey = Bytes.toBytes("  .");
      assertTrue(fs.exists(f.getPath()));
      topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, badmidkey, true);
      bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);

      // Split key below all keys: the bottom half is empty, so no file is made.
      assertNull(bottomPath);

      top = new StoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE).createReader();
      // Now read from the top.
      first = true;
      topScanner = top.getScanner(false, false);
      while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
          topScanner.next()) {
        key = topScanner.getKey();
        assertTrue(topScanner.getReader().getComparator().compareFlatKey(key.array(),
          key.arrayOffset(), key.limit(), badmidkey, 0, badmidkey.length) >= 0);
        if (first) {
          first = false;
          KeyValue keyKV = KeyValue.createKeyValueFromKey(key);
          LOG.info("First top when key < bottom: " + keyKV);
          // The assertions pin the first row to all-'a' characters.
          String tmp = Bytes.toString(keyKV.getRow());
          for (int i = 0; i < tmp.length(); i++) {
            assertTrue(tmp.charAt(i) == 'a');
          }
        }
      }
      KeyValue keyKV = KeyValue.createKeyValueFromKey(key);
      LOG.info("Last top when key < bottom: " + keyKV);
      // And the last row to all-'z' characters.
      String tmp = Bytes.toString(keyKV.getRow());
      for (int i = 0; i < tmp.length(); i++) {
        assertTrue(tmp.charAt(i) == 'z');
      }
      // Remove references.
      regionFs.cleanupDaughterRegion(topHri);
      regionFs.cleanupDaughterRegion(bottomHri);

      // Test when badkey is > than last key in file ('||' > 'zz').
      badmidkey = Bytes.toBytes("|||");
      topPath = splitStoreFile(regionFs,topHri, TEST_FAMILY, f, badmidkey, true);
      bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
      // Split key above all keys: the top half is empty, so no file is made.
      assertNull(topPath);
      bottom = new StoreFile(this.fs, bottomPath, conf, cacheConf,
        BloomType.NONE).createReader();
      first = true;
      bottomScanner = bottom.getScanner(false, false);
      while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
          bottomScanner.next()) {
        key = bottomScanner.getKey();
        if (first) {
          first = false;
          keyKV = KeyValue.createKeyValueFromKey(key);
          LOG.info("First bottom when key > top: " + keyKV);
          tmp = Bytes.toString(keyKV.getRow());
          for (int i = 0; i < tmp.length(); i++) {
            assertTrue(tmp.charAt(i) == 'a');
          }
        }
      }
      keyKV = KeyValue.createKeyValueFromKey(key);
      LOG.info("Last bottom when key > top: " + keyKV);
      for (int i = 0; i < tmp.length(); i++) {
        assertTrue(Bytes.toString(keyKV.getRow()).charAt(i) == 'z');
      }
    } finally {
      if (top != null) {
        top.close(true); // evict since we are about to delete the file
      }
      if (bottom != null) {
        bottom.close(true); // evict since we are about to delete the file
      }
      fs.delete(f.getPath(), true);
    }
  }
446 
  // Zero-padded 10-digit format so integer row keys sort lexicographically.
  private static final String localFormatter = "%010d";
448 
449   private void bloomWriteRead(StoreFile.Writer writer, FileSystem fs) throws Exception {
450     float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
451     Path f = writer.getPath();
452     long now = System.currentTimeMillis();
453     for (int i = 0; i < 2000; i += 2) {
454       String row = String.format(localFormatter, i);
455       KeyValue kv = new KeyValue(row.getBytes(), "family".getBytes(),
456         "col".getBytes(), now, "value".getBytes());
457       writer.append(kv);
458     }
459     writer.close();
460 
461     StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
462     reader.loadFileInfo();
463     reader.loadBloomfilter();
464     StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
465 
466     // check false positives rate
467     int falsePos = 0;
468     int falseNeg = 0;
469     for (int i = 0; i < 2000; i++) {
470       String row = String.format(localFormatter, i);
471       TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
472       columns.add("family:col".getBytes());
473 
474       Scan scan = new Scan(row.getBytes(),row.getBytes());
475       scan.addColumn("family".getBytes(), "family:col".getBytes());
476       boolean exists = scanner.shouldUseScanner(scan, columns, Long.MIN_VALUE);
477       if (i % 2 == 0) {
478         if (!exists) falseNeg++;
479       } else {
480         if (exists) falsePos++;
481       }
482     }
483     reader.close(true); // evict because we are about to delete the file
484     fs.delete(f, true);
485     assertEquals("False negatives: " + falseNeg, 0, falseNeg);
486     int maxFalsePos = (int) (2 * 2000 * err);
487     assertTrue("Too many false positives: " + falsePos + " (err=" + err
488         + ", expected no more than " + maxFalsePos + ")",
489         falsePos <= maxFalsePos);
490   }
491   
  // Small HFile block size used by the bloom tests so files span many blocks.
  private static final int BLOCKSIZE_SMALL = 8192;
493 
494   public void testBloomFilter() throws Exception {
495     FileSystem fs = FileSystem.getLocal(conf);
496     conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
497     conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
498 
499     // write the file
500     Path f = new Path(ROOT_DIR, getName());
501     HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
502                         .withChecksumType(CKTYPE)
503                         .withBytesPerCheckSum(CKBYTES).build();
504     // Make a store file and write data to it.
505     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
506             .withFilePath(f)
507             .withBloomType(BloomType.ROW)
508             .withMaxKeyCount(2000)
509             .withFileContext(meta)
510             .build();
511     bloomWriteRead(writer, fs);
512   }
513 
514   public void testDeleteFamilyBloomFilter() throws Exception {
515     FileSystem fs = FileSystem.getLocal(conf);
516     conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
517     conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
518     float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
519 
520     // write the file
521     Path f = new Path(ROOT_DIR, getName());
522 
523     HFileContext meta = new HFileContextBuilder()
524                         .withBlockSize(BLOCKSIZE_SMALL)
525                         .withChecksumType(CKTYPE)
526                         .withBytesPerCheckSum(CKBYTES).build();
527     // Make a store file and write data to it.
528     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
529             .withFilePath(f)
530             .withMaxKeyCount(2000)
531             .withFileContext(meta)
532             .build();
533 
534     // add delete family
535     long now = System.currentTimeMillis();
536     for (int i = 0; i < 2000; i += 2) {
537       String row = String.format(localFormatter, i);
538       KeyValue kv = new KeyValue(row.getBytes(), "family".getBytes(),
539           "col".getBytes(), now, KeyValue.Type.DeleteFamily, "value".getBytes());
540       writer.append(kv);
541     }
542     writer.close();
543 
544     StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
545     reader.loadFileInfo();
546     reader.loadBloomfilter();
547 
548     // check false positives rate
549     int falsePos = 0;
550     int falseNeg = 0;
551     for (int i = 0; i < 2000; i++) {
552       String row = String.format(localFormatter, i);
553       byte[] rowKey = Bytes.toBytes(row);
554       boolean exists = reader.passesDeleteFamilyBloomFilter(rowKey, 0,
555           rowKey.length);
556       if (i % 2 == 0) {
557         if (!exists)
558           falseNeg++;
559       } else {
560         if (exists)
561           falsePos++;
562       }
563     }
564     assertEquals(1000, reader.getDeleteFamilyCnt());
565     reader.close(true); // evict because we are about to delete the file
566     fs.delete(f, true);
567     assertEquals("False negatives: " + falseNeg, 0, falseNeg);
568     int maxFalsePos = (int) (2 * 2000 * err);
569     assertTrue("Too many false positives: " + falsePos + " (err=" + err
570         + ", expected no more than " + maxFalsePos, falsePos <= maxFalsePos);
571   }
572 
573   /**
574    * Test for HBASE-8012
575    */
576   public void testReseek() throws Exception {
577     // write the file
578     Path f = new Path(ROOT_DIR, getName());
579     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
580     // Make a store file and write data to it.
581     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
582             .withFilePath(f)
583             .withFileContext(meta)
584             .build();
585 
586     writeStoreFile(writer);
587     writer.close();
588 
589     StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
590 
591     // Now do reseek with empty KV to position to the beginning of the file
592 
593     KeyValue k = KeyValue.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY);
594     StoreFileScanner s = reader.getStoreFileScanner(false, false);
595     s.reseek(k);
596 
597     assertNotNull("Intial reseek should position at the beginning of the file", s.peek());
598   }
599 
  /**
   * Compares ROWCOL and ROW bloom types over the same data set (only even
   * rows / even column qualifiers are written) and checks each produces zero
   * false negatives and an acceptable number of false positives.
   */
  public void testBloomTypes() throws Exception {
    float err = (float) 0.01;
    FileSystem fs = FileSystem.getLocal(conf);
    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err);
    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);

    int rowCount = 50;
    int colCount = 10;
    int versions = 2;

    // run once using columns and once using rows
    BloomType[] bt = {BloomType.ROWCOL, BloomType.ROW};
    int[] expKeys  = {rowCount*colCount, rowCount};
    // below line deserves commentary.  it is expected bloom false positives
    //  column = rowCount*2*colCount inserts
    //  row-level = only rowCount*2 inserts, but failures will be magnified by
    //              2nd for loop for every column (2*colCount)
    float[] expErr   = {2*rowCount*colCount*err, 2*rowCount*2*colCount*err};

    for (int x : new int[]{0,1}) {
      // write the file
      Path f = new Path(ROOT_DIR, getName() + x);
      HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
          .withChecksumType(CKTYPE)
          .withBytesPerCheckSum(CKBYTES).build();
      // Make a store file and write data to it.
      StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
              .withFilePath(f)
              .withBloomType(bt[x])
              .withMaxKeyCount(expKeys[x])
              .withFileContext(meta)
              .build();

      long now = System.currentTimeMillis();
      // Write only even rows and even qualifiers; the odd ones stay absent so
      // they can be probed for false positives in the read loop below.
      for (int i = 0; i < rowCount*2; i += 2) { // rows
        for (int j = 0; j < colCount*2; j += 2) {   // column qualifiers
          String row = String.format(localFormatter, i);
          String col = String.format(localFormatter, j);
          for (int k= 0; k < versions; ++k) { // versions
            KeyValue kv = new KeyValue(row.getBytes(),
              "family".getBytes(), ("col" + col).getBytes(),
                now-k, Bytes.toBytes((long)-1));
            writer.append(kv);
          }
        }
      }
      writer.close();

      StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
      reader.loadFileInfo();
      reader.loadBloomfilter();
      StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
      assertEquals(expKeys[x], reader.generalBloomFilter.getKeyCount());

      // check false positives rate
      int falsePos = 0;
      int falseNeg = 0;
      for (int i = 0; i < rowCount*2; ++i) { // rows
        for (int j = 0; j < colCount*2; ++j) {   // column qualifiers
          String row = String.format(localFormatter, i);
          String col = String.format(localFormatter, j);
          TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
          columns.add(("col" + col).getBytes());

          Scan scan = new Scan(row.getBytes(),row.getBytes());
          scan.addColumn("family".getBytes(), ("col"+col).getBytes());
          boolean exists =
              scanner.shouldUseScanner(scan, columns, Long.MIN_VALUE);
          boolean shouldRowExist = i % 2 == 0;
          boolean shouldColExist = j % 2 == 0;
          // A ROW bloom cannot discriminate by column, so any column within an
          // existing row counts as expected-present.
          shouldColExist = shouldColExist || bt[x] == BloomType.ROW;
          if (shouldRowExist && shouldColExist) {
            if (!exists) falseNeg++;
          } else {
            if (exists) falsePos++;
          }
        }
      }
      reader.close(true); // evict because we are about to delete the file
      fs.delete(f, true);
      System.out.println(bt[x].toString());
      System.out.println("  False negatives: " + falseNeg);
      System.out.println("  False positives: " + falsePos);
      assertEquals(0, falseNeg);
      assertTrue(falsePos < 2*expErr[x]);
    }
  }
687 
  /**
   * Checks StoreFile.Comparators.SEQ_ID: the mock files below are listed in
   * the order the comparator is expected to produce (assertOrdering shuffles
   * the list and re-sorts it).
   */
  public void testSeqIdComparator() {
    // mockStoreFile args: bulkLoad, size, bulkTimestamp, seqId, path
    assertOrdering(StoreFile.Comparators.SEQ_ID,
        mockStoreFile(true,  100,   1000, -1, "/foo/123"),
        mockStoreFile(true,  100,   1000, -1, "/foo/124"),
        mockStoreFile(true,  99,    1000, -1, "/foo/126"),
        mockStoreFile(true,  98,    2000, -1, "/foo/126"),
        mockStoreFile(false, 3453, -1,     1, "/foo/1"),
        mockStoreFile(false, 2,    -1,     3, "/foo/2"),
        mockStoreFile(false, 1000, -1,     5, "/foo/2"),
        mockStoreFile(false, 76,   -1,     5, "/foo/3"));
  }
699 
700   /**
701    * Assert that the given comparator orders the given storefiles in the
702    * same way that they're passed.
703    */
704   private void assertOrdering(Comparator<StoreFile> comparator, StoreFile ... sfs) {
705     ArrayList<StoreFile> sorted = Lists.newArrayList(sfs);
706     Collections.shuffle(sorted);
707     Collections.sort(sorted, comparator);
708     LOG.debug("sfs: " + Joiner.on(",").join(sfs));
709     LOG.debug("sorted: " + Joiner.on(",").join(sorted));
710     assertTrue(Iterables.elementsEqual(Arrays.asList(sfs), sorted));
711   }
712 
713   /**
714    * Create a mock StoreFile with the given attributes.
715    */
716   private StoreFile mockStoreFile(boolean bulkLoad,
717                                   long size,
718                                   long bulkTimestamp,
719                                   long seqId,
720                                   String path) {
721     StoreFile mock = Mockito.mock(StoreFile.class);
722     StoreFile.Reader reader = Mockito.mock(StoreFile.Reader.class);
723 
724     Mockito.doReturn(size).when(reader).length();
725 
726     Mockito.doReturn(reader).when(mock).getReader();
727     Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
728     Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
729     Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
730     Mockito.doReturn(new Path(path)).when(mock).getPath();
731     String name = "mock storefile, bulkLoad=" + bulkLoad +
732       " bulkTimestamp=" + bulkTimestamp +
733       " seqId=" + seqId +
734       " path=" + path;
735     Mockito.doReturn(name).when(mock).toString();
736     return mock;
737   }
738 
739   /**
740    * Generate a list of KeyValues for testing based on given parameters
741    * @param timestamps
742    * @param numRows
743    * @param qualifier
744    * @param family
745    * @return
746    */
747   List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
748       byte[] qualifier, byte[] family) {
749     List<KeyValue> kvList = new ArrayList<KeyValue>();
750     for (int i=1;i<=numRows;i++) {
751       byte[] b = Bytes.toBytes(i) ;
752       LOG.info(Bytes.toString(b));
753       LOG.info(Bytes.toString(b));
754       for (long timestamp: timestamps)
755       {
756         kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
757       }
758     }
759     return kvList;
760   }
761 
762   /**
763    * Test to ensure correctness when using StoreFile with multiple timestamps
764    * @throws IOException
765    */
766   public void testMultipleTimestamps() throws IOException {
767     byte[] family = Bytes.toBytes("familyname");
768     byte[] qualifier = Bytes.toBytes("qualifier");
769     int numRows = 10;
770     long[] timestamps = new long[] {20,10,5,1};
771     Scan scan = new Scan();
772 
773     // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
774     Path storedir = new Path(new Path(this.testDir, "7e0102"), "familyname");
775     Path dir = new Path(storedir, "1234567890");
776     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
777     // Make a store file and write data to it.
778     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
779             .withOutputDir(dir)
780             .withFileContext(meta)
781             .build();
782 
783     List<KeyValue> kvList = getKeyValueSet(timestamps,numRows,
784         family, qualifier);
785 
786     for (KeyValue kv : kvList) {
787       writer.append(kv);
788     }
789     writer.appendMetadata(0, false);
790     writer.close();
791 
792     StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
793       BloomType.NONE);
794     StoreFile.Reader reader = hsf.createReader();
795     StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
796     TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
797     columns.add(qualifier);
798 
799     scan.setTimeRange(20, 100);
800     assertTrue(scanner.shouldUseScanner(scan, columns, Long.MIN_VALUE));
801 
802     scan.setTimeRange(1, 2);
803     assertTrue(scanner.shouldUseScanner(scan, columns, Long.MIN_VALUE));
804 
805     scan.setTimeRange(8, 10);
806     assertTrue(scanner.shouldUseScanner(scan, columns, Long.MIN_VALUE));
807 
808     scan.setTimeRange(7, 50);
809     assertTrue(scanner.shouldUseScanner(scan, columns, Long.MIN_VALUE));
810 
811     // This test relies on the timestamp range optimization
812     scan.setTimeRange(27, 50);
813     assertTrue(!scanner.shouldUseScanner(scan, columns, Long.MIN_VALUE));
814   }
815 
  /**
   * Exercises the cache-on-write and evict-on-close settings end to end by
   * tracking the block cache's hit/miss/eviction counters: a file written
   * with cache-on-write off produces misses on first read; one written with
   * it on produces hits; closing a reader with evict-on-close on evicts that
   * file's blocks, while closing with it off leaves the counters unchanged.
   */
  public void testCacheOnWriteEvictOnClose() throws Exception {
    Configuration conf = this.conf;

    // Find a home for our files (regiondir ("7e0102") and familyname).
    Path baseDir = new Path(new Path(this.testDir, "7e0102"),"twoCOWEOC");

    // Grab the block cache and get the initial hit/miss counts
    BlockCache bc = new CacheConfig(conf).getBlockCache();
    assertNotNull(bc);
    CacheStats cs = bc.getStats();
    long startHit = cs.getHitCount();
    long startMiss = cs.getMissCount();
    long startEvicted = cs.getEvictedCount();

    // Let's write a StoreFile with three blocks, with cache on write off
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false);
    CacheConfig cacheConf = new CacheConfig(conf);
    Path pathCowOff = new Path(baseDir, "123456789");
    StoreFile.Writer writer = writeStoreFile(conf, cacheConf, pathCowOff, 3);
    StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
      BloomType.NONE);
    LOG.debug(hsf.getPath().toString());

    // Read this file, we should see 3 misses
    StoreFile.Reader reader = hsf.createReader();
    reader.loadFileInfo();
    StoreFileScanner scanner = reader.getStoreFileScanner(true, true);
    scanner.seek(KeyValue.LOWESTKEY);
    // Drain the scanner so every block of the file is touched.
    while (scanner.next() != null);
    assertEquals(startHit, cs.getHitCount());
    assertEquals(startMiss + 3, cs.getMissCount());
    assertEquals(startEvicted, cs.getEvictedCount());
    // Advance the baseline so later assertions compare against this point.
    startMiss += 3;
    scanner.close();
    reader.close(cacheConf.shouldEvictOnClose());

    // Now write a StoreFile with three blocks, with cache on write on
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
    cacheConf = new CacheConfig(conf);
    Path pathCowOn = new Path(baseDir, "123456788");
    writer = writeStoreFile(conf, cacheConf, pathCowOn, 3);
    hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
      BloomType.NONE);

    // Read this file, we should see 3 hits
    reader = hsf.createReader();
    scanner = reader.getStoreFileScanner(true, true);
    scanner.seek(KeyValue.LOWESTKEY);
    while (scanner.next() != null);
    assertEquals(startHit + 3, cs.getHitCount());
    assertEquals(startMiss, cs.getMissCount());
    assertEquals(startEvicted, cs.getEvictedCount());
    startHit += 3;
    scanner.close();
    reader.close(cacheConf.shouldEvictOnClose());

    // Let's read back the two files to ensure the blocks exactly match
    hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
      BloomType.NONE);
    StoreFile.Reader readerOne = hsf.createReader();
    readerOne.loadFileInfo();
    StoreFileScanner scannerOne = readerOne.getStoreFileScanner(true, true);
    scannerOne.seek(KeyValue.LOWESTKEY);
    hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
      BloomType.NONE);
    StoreFile.Reader readerTwo = hsf.createReader();
    readerTwo.loadFileInfo();
    StoreFileScanner scannerTwo = readerTwo.getStoreFileScanner(true, true);
    scannerTwo.seek(KeyValue.LOWESTKEY);
    KeyValue kv1 = null;
    KeyValue kv2 = null;
    // Walk both files in lockstep; each pair must match on key and value.
    while ((kv1 = scannerOne.next()) != null) {
      kv2 = scannerTwo.next();
      assertTrue(kv1.equals(kv2));
      assertTrue(Bytes.compareTo(
          kv1.getBuffer(), kv1.getKeyOffset(), kv1.getKeyLength(), 
          kv2.getBuffer(), kv2.getKeyOffset(), kv2.getKeyLength()) == 0);
      assertTrue(Bytes.compareTo(
          kv1.getBuffer(), kv1.getValueOffset(), kv1.getValueLength(),
          kv2.getBuffer(), kv2.getValueOffset(), kv2.getValueLength()) == 0);
    }
    // Both files have the same number of KVs, so file two is exhausted too.
    assertNull(scannerTwo.next());
    // 3 blocks per file, both already cached => 6 hits, no new misses.
    assertEquals(startHit + 6, cs.getHitCount());
    assertEquals(startMiss, cs.getMissCount());
    assertEquals(startEvicted, cs.getEvictedCount());
    startHit += 6;
    scannerOne.close();
    readerOne.close(cacheConf.shouldEvictOnClose());
    scannerTwo.close();
    readerTwo.close(cacheConf.shouldEvictOnClose());

    // Let's close the first file with evict on close turned on
    conf.setBoolean("hbase.rs.evictblocksonclose", true);
    cacheConf = new CacheConfig(conf);
    hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
      BloomType.NONE);
    reader = hsf.createReader();
    reader.close(cacheConf.shouldEvictOnClose());

    // We should have 3 new evictions
    assertEquals(startHit, cs.getHitCount());
    assertEquals(startMiss, cs.getMissCount());
    assertEquals(startEvicted + 3, cs.getEvictedCount());
    startEvicted += 3;

    // Let's close the second file with evict on close turned off
    conf.setBoolean("hbase.rs.evictblocksonclose", false);
    cacheConf = new CacheConfig(conf);
    hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
      BloomType.NONE);
    reader = hsf.createReader();
    reader.close(cacheConf.shouldEvictOnClose());

    // We expect no changes
    assertEquals(startHit, cs.getHitCount());
    assertEquals(startMiss, cs.getMissCount());
    assertEquals(startEvicted, cs.getEvictedCount());
  }
934 
935   private Path splitStoreFile(final HRegionFileSystem regionFs, final HRegionInfo hri,
936       final String family, final StoreFile sf, final byte[] splitKey, boolean isTopRef)
937       throws IOException {
938     FileSystem fs = regionFs.getFileSystem();
939     Path path = regionFs.splitStoreFile(hri, family, sf, splitKey, isTopRef);
940     if (null == path) {
941       return null;
942     }
943     Path regionDir = regionFs.commitDaughterRegion(hri);
944     return new Path(new Path(regionDir, family), path.getName());
945   }
946 
947   private StoreFile.Writer writeStoreFile(Configuration conf,
948       CacheConfig cacheConf, Path path, int numBlocks)
949   throws IOException {
950     // Let's put ~5 small KVs in each block, so let's make 5*numBlocks KVs
951     int numKVs = 5 * numBlocks;
952     List<KeyValue> kvs = new ArrayList<KeyValue>(numKVs);
953     byte [] b = Bytes.toBytes("x");
954     int totalSize = 0;
955     for (int i=numKVs;i>0;i--) {
956       KeyValue kv = new KeyValue(b, b, b, i, b);
957       kvs.add(kv);
958       // kv has memstoreTS 0, which takes 1 byte to store.
959       totalSize += kv.getLength() + 1;
960     }
961     int blockSize = totalSize / numBlocks;
962     HFileContext meta = new HFileContextBuilder().withBlockSize(blockSize)
963                         .withChecksumType(CKTYPE)
964                         .withBytesPerCheckSum(CKBYTES)
965                         .build();
966     // Make a store file and write data to it.
967     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
968             .withFilePath(path)
969             .withMaxKeyCount(2000)
970             .withFileContext(meta)
971             .build();
972     // We'll write N-1 KVs to ensure we don't write an extra block
973     kvs.remove(kvs.size()-1);
974     for (KeyValue kv : kvs) {
975       writer.append(kv);
976     }
977     writer.appendMetadata(0, false);
978     writer.close();
979     return writer;
980   }
981 
982   /**
983    * Check if data block encoding information is saved correctly in HFile's
984    * file info.
985    */
986   public void testDataBlockEncodingMetaData() throws IOException {
987     // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
988     Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname");
989     Path path = new Path(dir, "1234567890");
990 
991     DataBlockEncoding dataBlockEncoderAlgo =
992         DataBlockEncoding.FAST_DIFF;
993     HFileDataBlockEncoder dataBlockEncoder =
994         new HFileDataBlockEncoderImpl(
995             dataBlockEncoderAlgo);
996     cacheConf = new CacheConfig(conf);
997     HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
998         .withChecksumType(CKTYPE)
999         .withBytesPerCheckSum(CKBYTES)
1000         .withDataBlockEncoding(dataBlockEncoderAlgo)
1001         .build();
1002     // Make a store file and write data to it.
1003     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
1004             .withFilePath(path)
1005             .withMaxKeyCount(2000)
1006             .withFileContext(meta)
1007             .build();
1008     writer.close();
1009 
1010     StoreFile storeFile = new StoreFile(fs, writer.getPath(), conf,
1011       cacheConf, BloomType.NONE);
1012     StoreFile.Reader reader = storeFile.createReader();
1013 
1014     Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
1015     byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
1016     assertEquals(dataBlockEncoderAlgo.getNameInBytes(), value);
1017   }
1018 }
1019