View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertFalse;
26  import static org.junit.Assert.assertNotNull;
27  import static org.junit.Assert.assertTrue;
28  import static org.junit.Assert.fail;
29  
30  import java.io.IOException;
31  import java.util.ArrayList;
32  import java.util.List;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.CategoryBasedTimeout;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellUtil;
39  import org.apache.hadoop.hbase.HBaseTestCase;
40  import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
41  import org.apache.hadoop.hbase.HBaseTestCase.ScannerIncommon;
42  import org.apache.hadoop.hbase.HBaseTestingUtility;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.UnknownScannerException;
49  import org.apache.hadoop.hbase.client.Delete;
50  import org.apache.hadoop.hbase.client.Get;
51  import org.apache.hadoop.hbase.client.Put;
52  import org.apache.hadoop.hbase.client.Result;
53  import org.apache.hadoop.hbase.client.Scan;
54  import org.apache.hadoop.hbase.filter.Filter;
55  import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
56  import org.apache.hadoop.hbase.filter.PrefixFilter;
57  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
58  import org.apache.hadoop.hbase.testclassification.SmallTests;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.junit.Rule;
61  import org.junit.Test;
62  import org.junit.experimental.categories.Category;
63  import org.junit.rules.TestName;
64  import org.junit.rules.TestRule;
65  
66  
67  /**
68   * Test of a long-lived scanner validating as we go.
69   */
70  @Category(SmallTests.class)
71  public class TestScanner {
72    @Rule public TestName name = new TestName();
73    @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
74        withTimeout(this.getClass()).withLookingForStuckThread(true).build();
75  
76    private static final Log LOG = LogFactory.getLog(TestScanner.class);
77    private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
78  
79    private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
80    private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
81    private static final byte [][] EXPLICIT_COLS = {
82      HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
83        // TODO ryan
84        //HConstants.STARTCODE_QUALIFIER
85    };
86  
87    static final HTableDescriptor TESTTABLEDESC =
88      new HTableDescriptor(TableName.valueOf("testscanner"));
89    static {
90      TESTTABLEDESC.addFamily(
91          new HColumnDescriptor(HConstants.CATALOG_FAMILY)
92              // Ten is an arbitrary number.  Keep versions to help debugging.
93              .setMaxVersions(10)
94              .setBlockCacheEnabled(false)
95              .setBlocksize(8 * 1024)
96      );
97    }
98    /** HRegionInfo for root region */
99    public static final HRegionInfo REGION_INFO =
100     new HRegionInfo(TESTTABLEDESC.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
101     HConstants.EMPTY_BYTE_ARRAY);
102 
103   private static final byte [] ROW_KEY = REGION_INFO.getRegionName();
104 
105   private static final long START_CODE = Long.MAX_VALUE;
106 
107   private HRegion r;
108   private HRegionIncommon region;
109 
110   private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
111   final private byte[] col1, col2;
112 
113   public TestScanner() {
114     super();
115 
116     firstRowBytes = START_KEY_BYTES;
117     secondRowBytes = START_KEY_BYTES.clone();
118     // Increment the least significant character so we get to next row.
119     secondRowBytes[START_KEY_BYTES.length - 1]++;
120     thirdRowBytes = START_KEY_BYTES.clone();
121     thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
122     col1 = Bytes.toBytes("column1");
123     col2 = Bytes.toBytes("column2");
124   }
125 
126   /**
127    * Test basic stop row filter works.
128    * @throws Exception
129    */
130   @Test
131   public void testStopRow() throws Exception {
132     byte [] startrow = Bytes.toBytes("bbb");
133     byte [] stoprow = Bytes.toBytes("ccc");
134     try {
135       this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
136       HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
137       List<Cell> results = new ArrayList<Cell>();
138       // Do simple test of getting one row only first.
139       Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
140       scan.addFamily(HConstants.CATALOG_FAMILY);
141 
142       InternalScanner s = r.getScanner(scan);
143       int count = 0;
144       while (s.next(results)) {
145         count++;
146       }
147       s.close();
148       assertEquals(0, count);
149       // Now do something a bit more imvolved.
150       scan = new Scan(startrow, stoprow);
151       scan.addFamily(HConstants.CATALOG_FAMILY);
152 
153       s = r.getScanner(scan);
154       count = 0;
155       Cell kv = null;
156       results = new ArrayList<Cell>();
157       for (boolean first = true; s.next(results);) {
158         kv = results.get(0);
159         if (first) {
160           assertTrue(CellUtil.matchingRow(kv,  startrow));
161           first = false;
162         }
163         count++;
164       }
165       assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
166       // We got something back.
167       assertTrue(count > 10);
168       s.close();
169     } finally {
170       HRegion.closeHRegion(this.r);
171     }
172   }
173 
174   void rowPrefixFilter(Scan scan) throws IOException {
175     List<Cell> results = new ArrayList<Cell>();
176     scan.addFamily(HConstants.CATALOG_FAMILY);
177     InternalScanner s = r.getScanner(scan);
178     boolean hasMore = true;
179     while (hasMore) {
180       hasMore = s.next(results);
181       for (Cell kv : results) {
182         assertEquals((byte)'a', CellUtil.cloneRow(kv)[0]);
183         assertEquals((byte)'b', CellUtil.cloneRow(kv)[1]);
184       }
185       results.clear();
186     }
187     s.close();
188   }
189 
190   void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
191     List<Cell> results = new ArrayList<Cell>();
192     scan.addFamily(HConstants.CATALOG_FAMILY);
193     InternalScanner s = r.getScanner(scan);
194     boolean hasMore = true;
195     while (hasMore) {
196       hasMore = s.next(results);
197       for (Cell kv : results) {
198         assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
199       }
200       results.clear();
201     }
202     s.close();
203   }
204 
205   @Test
206   public void testFilters() throws IOException {
207     try {
208       this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
209       HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
210       byte [] prefix = Bytes.toBytes("ab");
211       Filter newFilter = new PrefixFilter(prefix);
212       Scan scan = new Scan();
213       scan.setFilter(newFilter);
214       rowPrefixFilter(scan);
215 
216       byte[] stopRow = Bytes.toBytes("bbc");
217       newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
218       scan = new Scan();
219       scan.setFilter(newFilter);
220       rowInclusiveStopFilter(scan, stopRow);
221 
222     } finally {
223       HRegion.closeHRegion(this.r);
224     }
225   }
226 
227   /**
228    * Test that closing a scanner while a client is using it doesn't throw
229    * NPEs but instead a UnknownScannerException. HBASE-2503
230    * @throws Exception
231    */
232   @Test
233   public void testRaceBetweenClientAndTimeout() throws Exception {
234     try {
235       this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
236       HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
237       Scan scan = new Scan();
238       InternalScanner s = r.getScanner(scan);
239       List<Cell> results = new ArrayList<Cell>();
240       try {
241         s.next(results);
242         s.close();
243         s.next(results);
244         fail("We don't want anything more, we should be failing");
245       } catch (UnknownScannerException ex) {
246         // ok!
247         return;
248       }
249     } finally {
250       HRegion.closeHRegion(this.r);
251     }
252   }
253 
254   /** The test!
255    * @throws IOException
256    */
257   @Test
258   public void testScanner() throws IOException {
259     try {
260       r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
261       region = new HRegionIncommon(r);
262 
263       // Write information to the meta table
264 
265       Put put = new Put(ROW_KEY, System.currentTimeMillis());
266 
267       put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
268           REGION_INFO.toByteArray());
269       region.put(put);
270 
271       // What we just committed is in the memstore. Verify that we can get
272       // it back both with scanning and get
273 
274       scan(false, null);
275       getRegionInfo();
276 
277       // Close and re-open
278 
279       ((HRegion)r).close();
280       r = HRegion.openHRegion(r, null);
281       region = new HRegionIncommon(r);
282 
283       // Verify we can get the data back now that it is on disk.
284 
285       scan(false, null);
286       getRegionInfo();
287 
288       // Store some new information
289 
290       String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();
291 
292       put = new Put(ROW_KEY, System.currentTimeMillis());
293       put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
294           Bytes.toBytes(address));
295 
296 //      put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
297 
298       region.put(put);
299 
300       // Validate that we can still get the HRegionInfo, even though it is in
301       // an older row on disk and there is a newer row in the memstore
302 
303       scan(true, address.toString());
304       getRegionInfo();
305 
306       // flush cache
307 
308       region.flushcache();
309 
310       // Validate again
311 
312       scan(true, address.toString());
313       getRegionInfo();
314 
315       // Close and reopen
316 
317       ((HRegion)r).close();
318       r = HRegion.openHRegion(r,null);
319       region = new HRegionIncommon(r);
320 
321       // Validate again
322 
323       scan(true, address.toString());
324       getRegionInfo();
325 
326       // Now update the information again
327 
328       address = "bar.foo.com:4321";
329 
330       put = new Put(ROW_KEY, System.currentTimeMillis());
331 
332       put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
333           Bytes.toBytes(address));
334       region.put(put);
335 
336       // Validate again
337 
338       scan(true, address.toString());
339       getRegionInfo();
340 
341       // flush cache
342 
343       region.flushcache();
344 
345       // Validate again
346 
347       scan(true, address.toString());
348       getRegionInfo();
349 
350       // Close and reopen
351 
352       ((HRegion)r).close();
353       r = HRegion.openHRegion(r,null);
354       region = new HRegionIncommon(r);
355 
356       // Validate again
357 
358       scan(true, address.toString());
359       getRegionInfo();
360 
361     } finally {
362       // clean up
363       HRegion.closeHRegion(r);
364     }
365   }
366 
367   /** Compare the HRegionInfo we read from HBase to what we stored */
368   private void validateRegionInfo(byte [] regionBytes) throws IOException {
369     HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
370 
371     assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
372     assertEquals(0, info.getStartKey().length);
373     assertEquals(0, info.getEndKey().length);
374     assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
375     //assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
376   }
377 
378   /** Use a scanner to get the region info and then validate the results */
379   private void scan(boolean validateStartcode, String serverName)
380   throws IOException {
381     InternalScanner scanner = null;
382     Scan scan = null;
383     List<Cell> results = new ArrayList<Cell>();
384     byte [][][] scanColumns = {
385         COLS,
386         EXPLICIT_COLS
387     };
388 
389     for(int i = 0; i < scanColumns.length; i++) {
390       try {
391         scan = new Scan(FIRST_ROW);
392         for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
393           scan.addColumn(COLS[0],  EXPLICIT_COLS[ii]);
394         }
395         scanner = r.getScanner(scan);
396         while (scanner.next(results)) {
397           assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
398               HConstants.REGIONINFO_QUALIFIER));
399           byte [] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
400               HConstants.REGIONINFO_QUALIFIER));
401           validateRegionInfo(val);
402           if(validateStartcode) {
403 //            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
404 //                HConstants.STARTCODE_QUALIFIER));
405 //            val = getColumn(results, HConstants.CATALOG_FAMILY,
406 //                HConstants.STARTCODE_QUALIFIER).getValue();
407             assertNotNull(val);
408             assertFalse(val.length == 0);
409             long startCode = Bytes.toLong(val);
410             assertEquals(START_CODE, startCode);
411           }
412 
413           if(serverName != null) {
414             assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
415                 HConstants.SERVER_QUALIFIER));
416             val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
417                 HConstants.SERVER_QUALIFIER));
418             assertNotNull(val);
419             assertFalse(val.length == 0);
420             String server = Bytes.toString(val);
421             assertEquals(0, server.compareTo(serverName));
422           }
423         }
424       } finally {
425         InternalScanner s = scanner;
426         scanner = null;
427         if(s != null) {
428           s.close();
429         }
430       }
431     }
432   }
433 
434   private boolean hasColumn(final List<Cell> kvs, final byte [] family,
435       final byte [] qualifier) {
436     for (Cell kv: kvs) {
437       if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
438         return true;
439       }
440     }
441     return false;
442   }
443 
444   private Cell getColumn(final List<Cell> kvs, final byte [] family,
445       final byte [] qualifier) {
446     for (Cell kv: kvs) {
447       if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
448         return kv;
449       }
450     }
451     return null;
452   }
453 
454 
455   /** Use get to retrieve the HRegionInfo and validate it */
456   private void getRegionInfo() throws IOException {
457     Get get = new Get(ROW_KEY);
458     get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
459     Result result = region.get(get);
460     byte [] bytes = result.value();
461     validateRegionInfo(bytes);
462   }
463 
464   /**
465    * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
466    * update readers code essentially.  This is not highly concurrent, since its all 1 thread.
467    * HBase-910.
468    * @throws Exception
469    */
470   @Test
471   public void testScanAndSyncFlush() throws Exception {
472     this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
473     HRegionIncommon hri = new HRegionIncommon(r);
474     try {
475         LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
476             Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
477       int count = count(hri, -1, false);
478       assertEquals(count, count(hri, 100, false)); // do a sync flush.
479     } catch (Exception e) {
480       LOG.error("Failed", e);
481       throw e;
482     } finally {
483       HRegion.closeHRegion(this.r);
484     }
485   }
486 
487   /**
488    * Tests to do a concurrent flush (using a 2nd thread) while scanning.  This tests both
489    * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
490    *
491    * @throws Exception
492    */
493   @Test
494   public void testScanAndRealConcurrentFlush() throws Exception {
495     this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
496     HRegionIncommon hri = new HRegionIncommon(r);
497     try {
498         LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
499             Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
500       int count = count(hri, -1, false);
501       assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
502     } catch (Exception e) {
503       LOG.error("Failed", e);
504       throw e;
505     } finally {
506       HRegion.closeHRegion(this.r);
507     }
508   }
509 
510   /**
511    * Make sure scanner returns correct result when we run a major compaction
512    * with deletes.
513    *
514    * @throws Exception
515    */
516   @Test
517   @SuppressWarnings("deprecation")
518   public void testScanAndConcurrentMajorCompact() throws Exception {
519     HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
520     this.r = TEST_UTIL.createLocalHRegion(htd, null, null);
521     HRegionIncommon hri = new HRegionIncommon(r);
522 
523     try {
524       HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
525           firstRowBytes, secondRowBytes);
526       HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
527           firstRowBytes, secondRowBytes);
528 
529       Delete dc = new Delete(firstRowBytes);
530       /* delete column1 of firstRow */
531       dc.deleteColumns(fam1, col1);
532       r.delete(dc);
533       r.flush(true);
534 
535       HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
536           secondRowBytes, thirdRowBytes);
537       HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
538           secondRowBytes, thirdRowBytes);
539       r.flush(true);
540 
541       InternalScanner s = r.getScanner(new Scan());
542       // run a major compact, column1 of firstRow will be cleaned.
543       r.compact(true);
544 
545       List<Cell> results = new ArrayList<Cell>();
546       s.next(results);
547 
548       // make sure returns column2 of firstRow
549       assertTrue("result is not correct, keyValues : " + results,
550           results.size() == 1);
551       assertTrue(CellUtil.matchingRow(results.get(0), firstRowBytes)); 
552       assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
553 
554       results = new ArrayList<Cell>();
555       s.next(results);
556 
557       // get secondRow
558       assertTrue(results.size() == 2);
559       assertTrue(CellUtil.matchingRow(results.get(0), secondRowBytes));
560       assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
561       assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
562     } finally {
563       HRegion.closeHRegion(this.r);
564     }
565   }
566 
567 
568   /*
569    * @param hri Region
570    * @param flushIndex At what row we start the flush.
571    * @param concurrent if the flush should be concurrent or sync.
572    * @return Count of rows found.
573    * @throws IOException
574    */
575   private int count(final HRegionIncommon hri, final int flushIndex,
576                     boolean concurrent)
577   throws IOException {
578     LOG.info("Taking out counting scan");
579     ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
580         HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
581     List<Cell> values = new ArrayList<Cell>();
582     int count = 0;
583     boolean justFlushed = false;
584     while (s.next(values)) {
585       if (justFlushed) {
586         LOG.info("after next() just after next flush");
587         justFlushed=false;
588       }
589       count++;
590       if (flushIndex == count) {
591         LOG.info("Starting flush at flush index " + flushIndex);
592         Thread t = new Thread() {
593           public void run() {
594             try {
595               hri.flushcache();
596               LOG.info("Finishing flush");
597             } catch (IOException e) {
598               LOG.info("Failed flush cache");
599             }
600           }
601         };
602         if (concurrent) {
603           t.start(); // concurrently flush.
604         } else {
605           t.run(); // sync flush
606         }
607         LOG.info("Continuing on after kicking off background flush");
608         justFlushed = true;
609       }
610     }
611     s.close();
612     LOG.info("Found " + count + " items");
613     return count;
614   }
615 
616 }