View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableSet;
31  import java.util.Random;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.HBaseConfiguration;
40  import org.apache.hadoop.hbase.HBaseTestingUtility;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.KeyValue;
43  import org.apache.hadoop.hbase.MediumTests;
44  import org.apache.hadoop.hbase.client.Durability;
45  import org.apache.hadoop.hbase.client.Put;
46  import org.apache.hadoop.hbase.client.Result;
47  import org.apache.hadoop.hbase.client.Scan;
48  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
49  import org.apache.hadoop.hbase.filter.Filter;
50  import org.apache.hadoop.hbase.filter.FilterList;
51  import org.apache.hadoop.hbase.filter.FilterList.Operator;
52  import org.apache.hadoop.hbase.filter.PageFilter;
53  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
54  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
55  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
56  import org.apache.hadoop.hbase.io.hfile.HFileContext;
57  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.Pair;
60  import org.junit.Test;
61  import org.junit.experimental.categories.Category;
62  
63  import com.google.common.collect.Lists;
64  /**
65   * Test cases against ReversibleKeyValueScanner
66   */
67  @Category(MediumTests.class)
68  public class TestReversibleScanners {
69    private static final Log LOG = LogFactory.getLog(TestReversibleScanners.class);
70    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
71  
72    private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
73    private static long TS = System.currentTimeMillis();
74    private static int MAXMVCC = 7;
75    private static byte[] ROW = Bytes.toBytes("testRow");
76    private static final int ROWSIZE = 200;
77    private static byte[][] ROWS = makeN(ROW, ROWSIZE);
78    private static byte[] QUAL = Bytes.toBytes("testQual");
79    private static final int QUALSIZE = 5;
80    private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
81    private static byte[] VALUE = Bytes.toBytes("testValue");
82    private static final int VALUESIZE = 3;
83    private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
84  
85    @Test
86    public void testReversibleStoreFileScanner() throws IOException {
87      FileSystem fs = TEST_UTIL.getTestFileSystem();
88      Path hfilePath = new Path(new Path(
89          TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"),
90          "regionname"), "familyname");
91      CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
92      for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
93        HFileContextBuilder hcBuilder = new HFileContextBuilder();
94        hcBuilder.withBlockSize(2 * 1024);
95        hcBuilder.withDataBlockEncoding(encoding);
96        HFileContext hFileContext = hcBuilder.build();
97        StoreFile.Writer writer = new StoreFile.WriterBuilder(
98            TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(hfilePath)
99            .withFileContext(hFileContext).build();
100       writeStoreFile(writer);
101 
102       StoreFile sf = new StoreFile(fs, writer.getPath(),
103           TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
104 
105       List<StoreFileScanner> scanners = StoreFileScanner
106           .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
107               false, Long.MAX_VALUE);
108       StoreFileScanner scanner = scanners.get(0);
109       seekTestOfReversibleKeyValueScanner(scanner);
110       for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
111         LOG.info("Setting read point to " + readPoint);
112         scanners = StoreFileScanner.getScannersForStoreFiles(
113             Collections.singletonList(sf), false, true, false, readPoint);
114         seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
115       }
116     }
117 
118   }
119 
120   @Test
121   public void testReversibleMemstoreScanner() throws IOException {
122     MemStore memstore = new MemStore();
123     writeMemstore(memstore);
124     List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
125     seekTestOfReversibleKeyValueScanner(scanners.get(0));
126     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
127       LOG.info("Setting read point to " + readPoint);
128       scanners = memstore.getScanners(readPoint);
129       seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
130     }
131 
132   }
133 
134   @Test
135   public void testReversibleKeyValueHeap() throws IOException {
136     // write data to one memstore and two store files
137     FileSystem fs = TEST_UTIL.getTestFileSystem();
138     Path hfilePath = new Path(new Path(
139         TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"),
140         "familyname");
141     CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
142     HFileContextBuilder hcBuilder = new HFileContextBuilder();
143     hcBuilder.withBlockSize(2 * 1024);
144     HFileContext hFileContext = hcBuilder.build();
145     StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
146         TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
147         hfilePath).withFileContext(hFileContext).build();
148     StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
149         TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
150         hfilePath).withFileContext(hFileContext).build();
151 
152     MemStore memstore = new MemStore();
153     writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
154         writer2 });
155 
156     StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
157         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
158 
159     StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
160         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
161     /**
162      * Test without MVCC
163      */
164     int startRowNum = ROWSIZE / 2;
165     ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
166         ROWS[startRowNum], MAXMVCC);
167     internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
168 
169     startRowNum = ROWSIZE - 1;
170     kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
171         HConstants.EMPTY_START_ROW, MAXMVCC);
172     internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
173 
174     /**
175      * Test with MVCC
176      */
177     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
178       LOG.info("Setting read point to " + readPoint);
179       startRowNum = ROWSIZE - 1;
180       kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
181           HConstants.EMPTY_START_ROW, readPoint);
182       for (int i = startRowNum; i >= 0; i--) {
183         if (i - 2 < 0) break;
184         i = i - 2;
185         kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1]));
186         Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
187             i, 0, readPoint);
188         if (nextReadableNum == null) break;
189         KeyValue expecedKey = makeKV(nextReadableNum.getFirst(),
190             nextReadableNum.getSecond());
191         assertEquals(expecedKey, kvHeap.peek());
192         i = nextReadableNum.getFirst();
193         int qualNum = nextReadableNum.getSecond();
194         if (qualNum + 1 < QUALSIZE) {
195           kvHeap.backwardSeek(makeKV(i, qualNum + 1));
196           nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
197               readPoint);
198           if (nextReadableNum == null) break;
199           expecedKey = makeKV(nextReadableNum.getFirst(),
200               nextReadableNum.getSecond());
201           assertEquals(expecedKey, kvHeap.peek());
202           i = nextReadableNum.getFirst();
203           qualNum = nextReadableNum.getSecond();
204         }
205 
206         kvHeap.next();
207 
208         if (qualNum + 1 >= QUALSIZE) {
209           nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0,
210               readPoint);
211         } else {
212           nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
213               readPoint);
214         }
215         if (nextReadableNum == null) break;
216         expecedKey = makeKV(nextReadableNum.getFirst(),
217             nextReadableNum.getSecond());
218         assertEquals(expecedKey, kvHeap.peek());
219         i = nextReadableNum.getFirst();
220       }
221     }
222   }
223 
224   @Test
225   public void testReversibleStoreScanner() throws IOException {
226     // write data to one memstore and two store files
227     FileSystem fs = TEST_UTIL.getTestFileSystem();
228     Path hfilePath = new Path(new Path(
229         TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"),
230         "familyname");
231     CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
232     HFileContextBuilder hcBuilder = new HFileContextBuilder();
233     hcBuilder.withBlockSize(2 * 1024);
234     HFileContext hFileContext = hcBuilder.build();
235     StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
236         TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
237         hfilePath).withFileContext(hFileContext).build();
238     StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
239         TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
240         hfilePath).withFileContext(hFileContext).build();
241 
242     MemStore memstore = new MemStore();
243     writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
244         writer2 });
245 
246     StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
247         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
248 
249     StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
250         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
251 
252     ScanType scanType = ScanType.USER_SCAN;
253     ScanInfo scanInfo = new ScanInfo(FAMILYNAME, 0, Integer.MAX_VALUE,
254         Long.MAX_VALUE, false, 0, KeyValue.COMPARATOR);
255 
256     // Case 1.Test a full reversed scan
257     Scan scan = new Scan();
258     scan.setReversed(true);
259     StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2,
260         scan, scanType, scanInfo, MAXMVCC);
261     verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
262 
263     // Case 2.Test reversed scan with a specified start row
264     int startRowNum = ROWSIZE / 2;
265     byte[] startRow = ROWS[startRowNum];
266     scan.setStartRow(startRow);
267     storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
268         scanType, scanInfo, MAXMVCC);
269     verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
270         startRowNum + 1, false);
271 
272     // Case 3.Test reversed scan with a specified start row and specified
273     // qualifiers
274     assertTrue(QUALSIZE > 2);
275     scan.addColumn(FAMILYNAME, QUALS[0]);
276     scan.addColumn(FAMILYNAME, QUALS[2]);
277     storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
278         scanType, scanInfo, MAXMVCC);
279     verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1,
280         false);
281 
282     // Case 4.Test reversed scan with mvcc based on case 3
283     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
284       LOG.info("Setting read point to " + readPoint);
285       storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
286           scanType, scanInfo, readPoint);
287       int expectedRowCount = 0;
288       int expectedKVCount = 0;
289       for (int i = startRowNum; i >= 0; i--) {
290         int kvCount = 0;
291         if (makeMVCC(i, 0) <= readPoint) {
292           kvCount++;
293         }
294         if (makeMVCC(i, 2) <= readPoint) {
295           kvCount++;
296         }
297         if (kvCount > 0) {
298           expectedRowCount++;
299           expectedKVCount += kvCount;
300         }
301       }
302       verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount,
303           false);
304     }
305   }
306 
307   @Test
308   public void testReversibleRegionScanner() throws IOException {
309     byte[] tableName = Bytes.toBytes("testtable");
310     byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
311     Configuration conf = HBaseConfiguration.create();
312     HRegion region = TEST_UTIL.createLocalHRegion(tableName, null, null,
313         "testReversibleRegionScanner", conf, false, Durability.SYNC_WAL, null,
314         FAMILYNAME, FAMILYNAME2);
315     loadDataToRegion(region, FAMILYNAME2);
316 
317     // verify row count with forward scan
318     Scan scan = new Scan();
319     InternalScanner scanner = region.getScanner(scan);
320     verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
321 
322     // Case1:Full reversed scan
323     scan.setReversed(true);
324     scanner = region.getScanner(scan);
325     verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
326 
327     // Case2:Full reversed scan with one family
328     scan = new Scan();
329     scan.setReversed(true);
330     scan.addFamily(FAMILYNAME);
331     scanner = region.getScanner(scan);
332     verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
333 
334     // Case3:Specify qualifiers + One family
335     byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
336     for (byte[] specifiedQualifier : specifiedQualifiers)
337       scan.addColumn(FAMILYNAME, specifiedQualifier);
338     scanner = region.getScanner(scan);
339     verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
340 
341     // Case4:Specify qualifiers + Two families
342     for (byte[] specifiedQualifier : specifiedQualifiers)
343       scan.addColumn(FAMILYNAME2, specifiedQualifier);
344     scanner = region.getScanner(scan);
345     verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
346 
347     // Case5: Case4 + specify start row
348     int startRowNum = ROWSIZE * 3 / 4;
349     scan.setStartRow(ROWS[startRowNum]);
350     scanner = region.getScanner(scan);
351     verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1),
352         false);
353 
354     // Case6: Case4 + specify stop row
355     int stopRowNum = ROWSIZE / 4;
356     scan.setStartRow(HConstants.EMPTY_BYTE_ARRAY);
357     scan.setStopRow(ROWS[stopRowNum]);
358     scanner = region.getScanner(scan);
359     verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE
360         - stopRowNum - 1), false);
361 
362     // Case7: Case4 + specify start row + specify stop row
363     scan.setStartRow(ROWS[startRowNum]);
364     scanner = region.getScanner(scan);
365     verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2,
366         (startRowNum - stopRowNum), false);
367 
368     // Case8: Case7 + SingleColumnValueFilter
369     int valueNum = startRowNum % VALUESIZE;
370     Filter filter = new SingleColumnValueFilter(FAMILYNAME,
371         specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]);
372     scan.setFilter(filter);
373     scanner = region.getScanner(scan);
374     int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE
375         + (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
376     verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum,
377         false);
378 
379     // Case9: Case7 + PageFilter
380     int pageSize = 10;
381     filter = new PageFilter(pageSize);
382     scan.setFilter(filter);
383     scanner = region.getScanner(scan);
384     int expectedRowNum = pageSize;
385     verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
386 
387     // Case10: Case7 + FilterList+MUST_PASS_ONE
388     SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(
389         FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]);
390     SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(
391         FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]);
392     expectedRowNum = 0;
393     for (int i = startRowNum; i > stopRowNum; i--) {
394       if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
395         expectedRowNum++;
396       }
397     }
398     filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
399     scan.setFilter(filter);
400     scanner = region.getScanner(scan);
401     verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
402 
403     // Case10: Case7 + FilterList+MUST_PASS_ALL
404     filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
405     expectedRowNum = 0;
406     scan.setFilter(filter);
407     scanner = region.getScanner(scan);
408     verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
409   }
410 
411   private StoreScanner getReversibleStoreScanner(MemStore memstore,
412       StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType,
413       ScanInfo scanInfo, int readPoint) throws IOException {
414     List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null,
415         false, readPoint);
416     NavigableSet<byte[]> columns = null;
417     for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
418         .entrySet()) {
419       // Should only one family
420       columns = entry.getValue();
421     }
422     StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo,
423         scanType, columns, scanners);
424     return storeScanner;
425   }
426 
427   private void verifyCountAndOrder(InternalScanner scanner,
428       int expectedKVCount, int expectedRowCount, boolean forward)
429       throws IOException {
430     List<Cell> kvList = new ArrayList<Cell>();
431     Result lastResult = null;
432     int rowCount = 0;
433     int kvCount = 0;
434     try {
435       while (scanner.next(kvList)) {
436         if (kvList.isEmpty()) continue;
437         rowCount++;
438         kvCount += kvList.size();
439         if (lastResult != null) {
440           Result curResult = Result.create(kvList);
441           assertEquals("LastResult:" + lastResult + "CurResult:" + curResult,
442               forward,
443               Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
444         }
445         lastResult = Result.create(kvList);
446         kvList.clear();
447       }
448     } finally {
449       scanner.close();
450     }
451     if (!kvList.isEmpty()) {
452       rowCount++;
453       kvCount += kvList.size();
454       kvList.clear();
455     }
456     assertEquals(expectedKVCount, kvCount);
457     assertEquals(expectedRowCount, rowCount);
458   }
459 
460   private void internalTestSeekAndNextForReversibleKeyValueHeap(
461       ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException {
462     // Test next and seek
463     for (int i = startRowNum; i >= 0; i--) {
464       if (i % 2 == 1 && i - 2 >= 0) {
465         i = i - 2;
466         kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1]));
467       }
468       for (int j = 0; j < QUALSIZE; j++) {
469         if (j % 2 == 1 && (j + 1) < QUALSIZE) {
470           j = j + 1;
471           kvHeap.backwardSeek(makeKV(i, j));
472         }
473         assertEquals(makeKV(i, j), kvHeap.peek());
474         kvHeap.next();
475       }
476     }
477     assertEquals(null, kvHeap.peek());
478   }
479 
480   private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore,
481       StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint)
482       throws IOException {
483     List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow,
484         true, readPoint);
485     ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners,
486         KeyValue.COMPARATOR);
487     return kvHeap;
488   }
489 
490   private List<KeyValueScanner> getScanners(MemStore memstore, StoreFile sf1,
491       StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint)
492       throws IOException {
493     List<StoreFileScanner> fileScanners = StoreFileScanner
494         .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
495             false, readPoint);
496     List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
497     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
498         fileScanners.size() + 1);
499     scanners.addAll(fileScanners);
500     scanners.addAll(memScanners);
501 
502     if (doSeek) {
503       if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
504         for (KeyValueScanner scanner : scanners) {
505           scanner.seekToLastRow();
506         }
507       } else {
508         KeyValue startKey = KeyValue.createFirstOnRow(startRow);
509         for (KeyValueScanner scanner : scanners) {
510           scanner.backwardSeek(startKey);
511         }
512       }
513     }
514     return scanners;
515   }
516 
517   private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner)
518       throws IOException {
519     /**
520      * Test without MVCC
521      */
522     // Test seek to last row
523     assertTrue(scanner.seekToLastRow());
524     assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
525 
526     // Test backward seek in three cases
527     // Case1: seek in the same row in backwardSeek
528     KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
529     assertTrue(scanner.backwardSeek(seekKey));
530     assertEquals(seekKey, scanner.peek());
531 
532     // Case2: seek to the previous row in backwardSeek
533     int seekRowNum = ROWSIZE - 2;
534     assertTrue(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[seekRowNum])));
535     KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
536     assertEquals(expectedKey, scanner.peek());
537 
538     // Case3: unable to backward seek
539     assertFalse(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[0])));
540     assertEquals(null, scanner.peek());
541 
542     // Test seek to previous row
543     seekRowNum = ROWSIZE - 4;
544     assertTrue(scanner.seekToPreviousRow(KeyValue
545         .createFirstOnRow(ROWS[seekRowNum])));
546     expectedKey = makeKV(seekRowNum - 1, 0);
547     assertEquals(expectedKey, scanner.peek());
548 
549     // Test seek to previous row for the first row
550     assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
551     assertEquals(null, scanner.peek());
552 
553   }
554 
555   private void seekTestOfReversibleKeyValueScannerWithMVCC(
556       KeyValueScanner scanner, int readPoint) throws IOException {
557     /**
558      * Test with MVCC
559      */
560       // Test seek to last row
561       KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
562           ROWSIZE - 1, 0, readPoint);
563       assertEquals(expectedKey != null, scanner.seekToLastRow());
564       assertEquals(expectedKey, scanner.peek());
565 
566       // Test backward seek in two cases
567       // Case1: seek in the same row in backwardSeek
568       expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
569           QUALSIZE - 2, readPoint);
570       assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey));
571       assertEquals(expectedKey, scanner.peek());
572 
573       // Case2: seek to the previous row in backwardSeek
574     int seekRowNum = ROWSIZE - 3;
575     KeyValue seekKey = KeyValue.createLastOnRow(ROWS[seekRowNum]);
576       expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
577           readPoint);
578       assertEquals(expectedKey != null, scanner.backwardSeek(seekKey));
579       assertEquals(expectedKey, scanner.peek());
580 
581       // Test seek to previous row
582       seekRowNum = ROWSIZE - 4;
583       expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
584           readPoint);
585       assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValue
586           .createFirstOnRow(ROWS[seekRowNum])));
587       assertEquals(expectedKey, scanner.peek());
588   }
589 
590   private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,
591       int startQualNum, int readPoint) {
592     Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
593         startRowNum, startQualNum, readPoint);
594     if (nextReadableNum == null)
595       return null;
596     return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
597   }
598 
599   private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(
600       int startRowNum, int startQualNum, int readPoint) {
601     Pair<Integer, Integer> nextReadableNum = null;
602     boolean findExpected = false;
603     for (int i = startRowNum; i >= 0; i--) {
604       for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
605         if (makeMVCC(i, j) <= readPoint) {
606           nextReadableNum = new Pair<Integer, Integer>(i, j);
607           findExpected = true;
608           break;
609         }
610       }
611       if (findExpected)
612         break;
613     }
614     return nextReadableNum;
615   }
616 
617   private static void loadDataToRegion(HRegion region, byte[] additionalFamily)
618       throws IOException {
619     for (int i = 0; i < ROWSIZE; i++) {
620       Put put = new Put(ROWS[i]);
621       for (int j = 0; j < QUALSIZE; j++) {
622         put.add(makeKV(i, j));
623         // put additional family
624         put.add(makeKV(i, j, additionalFamily));
625       }
626       region.put(put);
627       if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
628         region.flushcache();
629       }
630     }
631   }
632 
633   private static void writeMemstoreAndStoreFiles(MemStore memstore,
634       final StoreFile.Writer[] writers) throws IOException {
635     Random rand = new Random();
636     try {
637       for (int i = 0; i < ROWSIZE; i++) {
638         for (int j = 0; j < QUALSIZE; j++) {
639           if (i % 2 == 0) {
640             memstore.add(makeKV(i, j));
641           } else {
642             writers[(i + j) % writers.length].append(makeKV(i, j));
643           }
644         }
645       }
646     } finally {
647       for (int i = 0; i < writers.length; i++) {
648         writers[i].close();
649       }
650     }
651   }
652 
653   private static void writeStoreFile(final StoreFile.Writer writer)
654       throws IOException {
655     try {
656       for (int i = 0; i < ROWSIZE; i++) {
657         for (int j = 0; j < QUALSIZE; j++) {
658           writer.append(makeKV(i, j));
659         }
660       }
661     } finally {
662       writer.close();
663     }
664   }
665 
666   private static void writeMemstore(MemStore memstore) throws IOException {
667     // Add half of the keyvalues to memstore
668     for (int i = 0; i < ROWSIZE; i++) {
669       for (int j = 0; j < QUALSIZE; j++) {
670         if ((i + j) % 2 == 0) {
671           memstore.add(makeKV(i, j));
672         }
673       }
674     }
675     memstore.snapshot();
676     // Add another half of the keyvalues to snapshot
677     for (int i = 0; i < ROWSIZE; i++) {
678       for (int j = 0; j < QUALSIZE; j++) {
679         if ((i + j) % 2 == 1) {
680           memstore.add(makeKV(i, j));
681         }
682       }
683     }
684   }
685 
686   private static KeyValue makeKV(int rowNum, int cqNum) {
687     return makeKV(rowNum, cqNum, FAMILYNAME);
688   }
689 
690   private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
691     KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
692         VALUES[rowNum % VALUESIZE]);
693     kv.setMvccVersion(makeMVCC(rowNum, cqNum));
694     return kv;
695   }
696 
697   private static long makeMVCC(int rowNum, int cqNum) {
698     return (rowNum + cqNum) % (MAXMVCC + 1);
699   }
700 
701   private static byte[][] makeN(byte[] base, int n) {
702     byte[][] ret = new byte[n][];
703     for (int i = 0; i < n; i++) {
704       ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
705     }
706     return ret;
707   }
708 }