View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertFalse;
26  import static org.junit.Assert.assertNotNull;
27  import static org.junit.Assert.assertTrue;
28  import static org.junit.Assert.fail;
29  
30  import java.io.IOException;
31  import java.util.ArrayList;
32  import java.util.List;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.HBaseTestCase;
39  import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
40  import org.apache.hadoop.hbase.HBaseTestCase.ScannerIncommon;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.SmallTests;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.UnknownScannerException;
50  import org.apache.hadoop.hbase.client.Delete;
51  import org.apache.hadoop.hbase.client.Get;
52  import org.apache.hadoop.hbase.client.Put;
53  import org.apache.hadoop.hbase.client.Result;
54  import org.apache.hadoop.hbase.client.Scan;
55  import org.apache.hadoop.hbase.filter.Filter;
56  import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
57  import org.apache.hadoop.hbase.filter.PrefixFilter;
58  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.junit.Rule;
61  import org.junit.Test;
62  import org.junit.experimental.categories.Category;
63  import org.junit.rules.TestName;
64  
65  
66  /**
67   * Test of a long-lived scanner validating as we go.
68   */
69  @Category(SmallTests.class)
70  public class TestScanner {
71    @Rule public TestName name = new TestName();
72    private final Log LOG = LogFactory.getLog(this.getClass());
73    private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
74  
75    private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
76    private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
77    private static final byte [][] EXPLICIT_COLS = {
78      HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
79        // TODO ryan
80        //HConstants.STARTCODE_QUALIFIER
81    };
82  
83    static final HTableDescriptor TESTTABLEDESC =
84      new HTableDescriptor(TableName.valueOf("testscanner"));
85    static {
86      TESTTABLEDESC.addFamily(
87          new HColumnDescriptor(HConstants.CATALOG_FAMILY)
88              // Ten is an arbitrary number.  Keep versions to help debugging.
89              .setMaxVersions(10)
90              .setBlockCacheEnabled(false)
91              .setBlocksize(8 * 1024)
92      );
93    }
94    /** HRegionInfo for root region */
95    public static final HRegionInfo REGION_INFO =
96      new HRegionInfo(TESTTABLEDESC.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
97      HConstants.EMPTY_BYTE_ARRAY);
98  
99    private static final byte [] ROW_KEY = REGION_INFO.getRegionName();
100 
101   private static final long START_CODE = Long.MAX_VALUE;
102 
103   private HRegion r;
104   private HRegionIncommon region;
105 
106   private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
107   final private byte[] col1, col2;
108 
109   public TestScanner() {
110     super();
111 
112     firstRowBytes = START_KEY_BYTES;
113     secondRowBytes = START_KEY_BYTES.clone();
114     // Increment the least significant character so we get to next row.
115     secondRowBytes[START_KEY_BYTES.length - 1]++;
116     thirdRowBytes = START_KEY_BYTES.clone();
117     thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
118     col1 = Bytes.toBytes("column1");
119     col2 = Bytes.toBytes("column2");
120   }
121 
122   /**
123    * Test basic stop row filter works.
124    * @throws Exception
125    */
126   @Test
127   public void testStopRow() throws Exception {
128     byte [] startrow = Bytes.toBytes("bbb");
129     byte [] stoprow = Bytes.toBytes("ccc");
130     try {
131       this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
132       HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
133       List<Cell> results = new ArrayList<Cell>();
134       // Do simple test of getting one row only first.
135       Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
136       scan.addFamily(HConstants.CATALOG_FAMILY);
137 
138       InternalScanner s = r.getScanner(scan);
139       int count = 0;
140       while (s.next(results)) {
141         count++;
142       }
143       s.close();
144       assertEquals(0, count);
145       // Now do something a bit more imvolved.
146       scan = new Scan(startrow, stoprow);
147       scan.addFamily(HConstants.CATALOG_FAMILY);
148 
149       s = r.getScanner(scan);
150       count = 0;
151       Cell kv = null;
152       results = new ArrayList<Cell>();
153       for (boolean first = true; s.next(results);) {
154         kv = results.get(0);
155         if (first) {
156           assertTrue(CellUtil.matchingRow(kv,  startrow));
157           first = false;
158         }
159         count++;
160       }
161       assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
162       // We got something back.
163       assertTrue(count > 10);
164       s.close();
165     } finally {
166       HRegion.closeHRegion(this.r);
167     }
168   }
169 
170   void rowPrefixFilter(Scan scan) throws IOException {
171     List<Cell> results = new ArrayList<Cell>();
172     scan.addFamily(HConstants.CATALOG_FAMILY);
173     InternalScanner s = r.getScanner(scan);
174     boolean hasMore = true;
175     while (hasMore) {
176       hasMore = s.next(results);
177       for (Cell kv : results) {
178         assertEquals((byte)'a', CellUtil.cloneRow(kv)[0]);
179         assertEquals((byte)'b', CellUtil.cloneRow(kv)[1]);
180       }
181       results.clear();
182     }
183     s.close();
184   }
185 
186   void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
187     List<Cell> results = new ArrayList<Cell>();
188     scan.addFamily(HConstants.CATALOG_FAMILY);
189     InternalScanner s = r.getScanner(scan);
190     boolean hasMore = true;
191     while (hasMore) {
192       hasMore = s.next(results);
193       for (Cell kv : results) {
194         assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
195       }
196       results.clear();
197     }
198     s.close();
199   }
200 
201   @Test
202   public void testFilters() throws IOException {
203     try {
204       this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
205       HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
206       byte [] prefix = Bytes.toBytes("ab");
207       Filter newFilter = new PrefixFilter(prefix);
208       Scan scan = new Scan();
209       scan.setFilter(newFilter);
210       rowPrefixFilter(scan);
211 
212       byte[] stopRow = Bytes.toBytes("bbc");
213       newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
214       scan = new Scan();
215       scan.setFilter(newFilter);
216       rowInclusiveStopFilter(scan, stopRow);
217 
218     } finally {
219       HRegion.closeHRegion(this.r);
220     }
221   }
222 
223   /**
224    * Test that closing a scanner while a client is using it doesn't throw
225    * NPEs but instead a UnknownScannerException. HBASE-2503
226    * @throws Exception
227    */
228   @Test
229   public void testRaceBetweenClientAndTimeout() throws Exception {
230     try {
231       this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
232       HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
233       Scan scan = new Scan();
234       InternalScanner s = r.getScanner(scan);
235       List<Cell> results = new ArrayList<Cell>();
236       try {
237         s.next(results);
238         s.close();
239         s.next(results);
240         fail("We don't want anything more, we should be failing");
241       } catch (UnknownScannerException ex) {
242         // ok!
243         return;
244       }
245     } finally {
246       HRegion.closeHRegion(this.r);
247     }
248   }
249 
250   /** The test!
251    * @throws IOException
252    */
253   @Test
254   public void testScanner() throws IOException {
255     try {
256       r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
257       region = new HRegionIncommon(r);
258 
259       // Write information to the meta table
260 
261       Put put = new Put(ROW_KEY, System.currentTimeMillis());
262 
263       put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
264           REGION_INFO.toByteArray());
265       region.put(put);
266 
267       // What we just committed is in the memstore. Verify that we can get
268       // it back both with scanning and get
269 
270       scan(false, null);
271       getRegionInfo();
272 
273       // Close and re-open
274 
275       r.close();
276       r = HRegion.openHRegion(r, null);
277       region = new HRegionIncommon(r);
278 
279       // Verify we can get the data back now that it is on disk.
280 
281       scan(false, null);
282       getRegionInfo();
283 
284       // Store some new information
285 
286       String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();
287 
288       put = new Put(ROW_KEY, System.currentTimeMillis());
289       put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
290           Bytes.toBytes(address));
291 
292 //      put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
293 
294       region.put(put);
295 
296       // Validate that we can still get the HRegionInfo, even though it is in
297       // an older row on disk and there is a newer row in the memstore
298 
299       scan(true, address.toString());
300       getRegionInfo();
301 
302       // flush cache
303 
304       region.flushcache();
305 
306       // Validate again
307 
308       scan(true, address.toString());
309       getRegionInfo();
310 
311       // Close and reopen
312 
313       r.close();
314       r = HRegion.openHRegion(r,null);
315       region = new HRegionIncommon(r);
316 
317       // Validate again
318 
319       scan(true, address.toString());
320       getRegionInfo();
321 
322       // Now update the information again
323 
324       address = "bar.foo.com:4321";
325 
326       put = new Put(ROW_KEY, System.currentTimeMillis());
327 
328       put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
329           Bytes.toBytes(address));
330       region.put(put);
331 
332       // Validate again
333 
334       scan(true, address.toString());
335       getRegionInfo();
336 
337       // flush cache
338 
339       region.flushcache();
340 
341       // Validate again
342 
343       scan(true, address.toString());
344       getRegionInfo();
345 
346       // Close and reopen
347 
348       r.close();
349       r = HRegion.openHRegion(r,null);
350       region = new HRegionIncommon(r);
351 
352       // Validate again
353 
354       scan(true, address.toString());
355       getRegionInfo();
356 
357     } finally {
358       // clean up
359       HRegion.closeHRegion(r);
360     }
361   }
362 
363   /** Compare the HRegionInfo we read from HBase to what we stored */
364   private void validateRegionInfo(byte [] regionBytes) throws IOException {
365     HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
366 
367     assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
368     assertEquals(0, info.getStartKey().length);
369     assertEquals(0, info.getEndKey().length);
370     assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
371     //assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
372   }
373 
374   /** Use a scanner to get the region info and then validate the results */
375   private void scan(boolean validateStartcode, String serverName)
376   throws IOException {
377     InternalScanner scanner = null;
378     Scan scan = null;
379     List<Cell> results = new ArrayList<Cell>();
380     byte [][][] scanColumns = {
381         COLS,
382         EXPLICIT_COLS
383     };
384 
385     for(int i = 0; i < scanColumns.length; i++) {
386       try {
387         scan = new Scan(FIRST_ROW);
388         for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
389           scan.addColumn(COLS[0],  EXPLICIT_COLS[ii]);
390         }
391         scanner = r.getScanner(scan);
392         while (scanner.next(results)) {
393           assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
394               HConstants.REGIONINFO_QUALIFIER));
395           byte [] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
396               HConstants.REGIONINFO_QUALIFIER));
397           validateRegionInfo(val);
398           if(validateStartcode) {
399 //            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
400 //                HConstants.STARTCODE_QUALIFIER));
401 //            val = getColumn(results, HConstants.CATALOG_FAMILY,
402 //                HConstants.STARTCODE_QUALIFIER).getValue();
403             assertNotNull(val);
404             assertFalse(val.length == 0);
405             long startCode = Bytes.toLong(val);
406             assertEquals(START_CODE, startCode);
407           }
408 
409           if(serverName != null) {
410             assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
411                 HConstants.SERVER_QUALIFIER));
412             val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
413                 HConstants.SERVER_QUALIFIER));
414             assertNotNull(val);
415             assertFalse(val.length == 0);
416             String server = Bytes.toString(val);
417             assertEquals(0, server.compareTo(serverName));
418           }
419         }
420       } finally {
421         InternalScanner s = scanner;
422         scanner = null;
423         if(s != null) {
424           s.close();
425         }
426       }
427     }
428   }
429 
430   private boolean hasColumn(final List<Cell> kvs, final byte [] family,
431       final byte [] qualifier) {
432     for (Cell kv: kvs) {
433       if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
434         return true;
435       }
436     }
437     return false;
438   }
439 
440   private Cell getColumn(final List<Cell> kvs, final byte [] family,
441       final byte [] qualifier) {
442     for (Cell kv: kvs) {
443       if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
444         return kv;
445       }
446     }
447     return null;
448   }
449 
450 
451   /** Use get to retrieve the HRegionInfo and validate it */
452   private void getRegionInfo() throws IOException {
453     Get get = new Get(ROW_KEY);
454     get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
455     Result result = region.get(get);
456     byte [] bytes = result.value();
457     validateRegionInfo(bytes);
458   }
459 
460   /**
461    * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
462    * update readers code essentially.  This is not highly concurrent, since its all 1 thread.
463    * HBase-910.
464    * @throws Exception
465    */
466   @Test
467   public void testScanAndSyncFlush() throws Exception {
468     this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
469     HRegionIncommon hri = new HRegionIncommon(r);
470     try {
471         LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
472             Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
473       int count = count(hri, -1, false);
474       assertEquals(count, count(hri, 100, false)); // do a sync flush.
475     } catch (Exception e) {
476       LOG.error("Failed", e);
477       throw e;
478     } finally {
479       HRegion.closeHRegion(this.r);
480     }
481   }
482 
483   /**
484    * Tests to do a concurrent flush (using a 2nd thread) while scanning.  This tests both
485    * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
486    *
487    * @throws Exception
488    */
489   @Test
490   public void testScanAndRealConcurrentFlush() throws Exception {
491     this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
492     HRegionIncommon hri = new HRegionIncommon(r);
493     try {
494         LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
495             Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
496       int count = count(hri, -1, false);
497       assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
498     } catch (Exception e) {
499       LOG.error("Failed", e);
500       throw e;
501     } finally {
502       HRegion.closeHRegion(this.r);
503     }
504   }
505 
506   /**
507    * Make sure scanner returns correct result when we run a major compaction
508    * with deletes.
509    *
510    * @throws Exception
511    */
512   @Test
513   @SuppressWarnings("deprecation")
514   public void testScanAndConcurrentMajorCompact() throws Exception {
515     HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
516     this.r = TEST_UTIL.createLocalHRegion(htd, null, null);
517     HRegionIncommon hri = new HRegionIncommon(r);
518 
519     try {
520       HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
521           firstRowBytes, secondRowBytes);
522       HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
523           firstRowBytes, secondRowBytes);
524 
525       Delete dc = new Delete(firstRowBytes);
526       /* delete column1 of firstRow */
527       dc.deleteColumns(fam1, col1);
528       r.delete(dc);
529       r.flushcache();
530 
531       HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
532           secondRowBytes, thirdRowBytes);
533       HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
534           secondRowBytes, thirdRowBytes);
535       r.flushcache();
536 
537       InternalScanner s = r.getScanner(new Scan());
538       // run a major compact, column1 of firstRow will be cleaned.
539       r.compactStores(true);
540 
541       List<Cell> results = new ArrayList<Cell>();
542       s.next(results);
543 
544       // make sure returns column2 of firstRow
545       assertTrue("result is not correct, keyValues : " + results,
546           results.size() == 1);
547       assertTrue(CellUtil.matchingRow(results.get(0), firstRowBytes)); 
548       assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
549 
550       results = new ArrayList<Cell>();
551       s.next(results);
552 
553       // get secondRow
554       assertTrue(results.size() == 2);
555       assertTrue(CellUtil.matchingRow(results.get(0), secondRowBytes));
556       assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
557       assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
558     } finally {
559       HRegion.closeHRegion(this.r);
560     }
561   }
562 
563 
564   /*
565    * @param hri Region
566    * @param flushIndex At what row we start the flush.
567    * @param concurrent if the flush should be concurrent or sync.
568    * @return Count of rows found.
569    * @throws IOException
570    */
571   private int count(final HRegionIncommon hri, final int flushIndex,
572                     boolean concurrent)
573   throws IOException {
574     LOG.info("Taking out counting scan");
575     ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
576         HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
577     List<Cell> values = new ArrayList<Cell>();
578     int count = 0;
579     boolean justFlushed = false;
580     while (s.next(values)) {
581       if (justFlushed) {
582         LOG.info("after next() just after next flush");
583         justFlushed=false;
584       }
585       count++;
586       if (flushIndex == count) {
587         LOG.info("Starting flush at flush index " + flushIndex);
588         Thread t = new Thread() {
589           public void run() {
590             try {
591               hri.flushcache();
592               LOG.info("Finishing flush");
593             } catch (IOException e) {
594               LOG.info("Failed flush cache");
595             }
596           }
597         };
598         if (concurrent) {
599           t.start(); // concurrently flush.
600         } else {
601           t.run(); // sync flush
602         }
603         LOG.info("Continuing on after kicking off background flush");
604         justFlushed = true;
605       }
606     }
607     s.close();
608     LOG.info("Found " + count + " items");
609     return count;
610   }
611 
612 }