View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.filter;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellUtil;
32  import org.apache.hadoop.hbase.HBaseTestingUtility;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.client.Durability;
36  import org.apache.hadoop.hbase.client.Put;
37  import org.apache.hadoop.hbase.client.Result;
38  import org.apache.hadoop.hbase.client.ResultScanner;
39  import org.apache.hadoop.hbase.client.Scan;
40  import org.apache.hadoop.hbase.client.HTable;
41  import org.apache.hadoop.hbase.client.Table;
42  import org.apache.hadoop.hbase.filter.FilterList.Operator;
43  import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
44  import org.apache.hadoop.hbase.regionserver.HRegion;
45  import org.apache.hadoop.hbase.regionserver.RegionScanner;
46  import org.apache.hadoop.hbase.testclassification.MediumTests;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.junit.After;
50  import org.junit.AfterClass;
51  import org.junit.Before;
52  import org.junit.BeforeClass;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  
56  import com.google.common.collect.Lists;
57  
58  /**
59   */
60  @Category(MediumTests.class)
61  public class TestFuzzyRowFilterEndToEnd {
62    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
63    private final static byte fuzzyValue = (byte) 63;
64    private static final Log LOG = LogFactory.getLog(TestFuzzyRowFilterEndToEnd.class);
65  
66    private static int firstPartCardinality = 50;
67    private static int secondPartCardinality = 50;
68    private static int thirdPartCardinality = 50;
69    private static int colQualifiersTotal = 5;
70    private static int totalFuzzyKeys = thirdPartCardinality / 2;
71  
72    private static String table = "TestFuzzyRowFilterEndToEnd";
73  
74    /**
75     * @throws java.lang.Exception
76     */
77    @BeforeClass
78    public static void setUpBeforeClass() throws Exception {
79      Configuration conf = TEST_UTIL.getConfiguration();
80      conf.setInt("hbase.client.scanner.caching", 1000);
81      conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
82        ConstantSizeRegionSplitPolicy.class.getName());
83      // set no splits
84      conf.setLong(HConstants.HREGION_MAX_FILESIZE, ((long) 1024) * 1024 * 1024 * 10);
85  
86      TEST_UTIL.startMiniCluster();
87    }
88  
89    /**
90     * @throws java.lang.Exception
91     */
92    @AfterClass
93    public static void tearDownAfterClass() throws Exception {
94      TEST_UTIL.shutdownMiniCluster();
95    }
96  
97    /**
98     * @throws java.lang.Exception
99     */
100   @Before
101   public void setUp() throws Exception {
102     // Nothing to do.
103   }
104 
105   /**
106    * @throws java.lang.Exception
107    */
108   @After
109   public void tearDown() throws Exception {
110     // Nothing to do.
111   }
112 
113   @Test  
114   public void testHBASE14782() throws IOException
115   {
116     String cf = "f";
117     String cq = "q";
118     String table = "HBASE14872";
119 
120     Table ht =
121         TEST_UTIL.createTable(TableName.valueOf(table), Bytes.toBytes(cf), Integer.MAX_VALUE);
122     // Load data
123     String[] rows = new String[]{
124         "\\x9C\\x00\\x044\\x00\\x00\\x00\\x00",
125         "\\x9C\\x00\\x044\\x01\\x00\\x00\\x00", 
126         "\\x9C\\x00\\x044\\x00\\x01\\x00\\x00",
127         "\\x9C\\x00\\x044\\x00\\x00\\x01\\x00",
128         "\\x9C\\x00\\x044\\x00\\x01\\x00\\x01", 
129         "\\x9B\\x00\\x044e\\xBB\\xB2\\xBB", 
130     };
131     
132     String badRow = "\\x9C\\x00\\x03\\xE9e\\xBB{X\\x1Fwts\\x1F\\x15vRX";
133     
134     for(int i=0; i < rows.length; i++){
135       Put p = new Put(Bytes.toBytesBinary(rows[i]));
136       p.addColumn(cf.getBytes(), cq.getBytes(), "value".getBytes());
137       ht.put(p);            
138     }
139     
140     Put p = new Put(Bytes.toBytesBinary(badRow));
141     p.addColumn(cf.getBytes(), cq.getBytes(), "value".getBytes());
142     ht.put(p);            
143 
144     TEST_UTIL.flush();
145 
146     List<Pair<byte[], byte[]>> data =  new ArrayList<Pair<byte[], byte[]>>();
147     byte[] fuzzyKey = Bytes.toBytesBinary("\\x00\\x00\\x044");
148     byte[] mask = new byte[] { 1,0,0,0};
149     data.add(new Pair<byte[], byte[]>(fuzzyKey, mask));
150     FuzzyRowFilter filter = new FuzzyRowFilter(data);
151     
152     Scan scan = new Scan();
153     scan.setFilter(filter);
154     
155     ResultScanner scanner = ht.getScanner(scan);
156     int total = 0;
157     while(scanner.next() != null){
158       total++;
159     }    
160     assertEquals(rows.length, total);
161     TEST_UTIL.deleteTable(TableName.valueOf(table));
162   }
163   
164   @Test
165   public void testEndToEnd() throws Exception {
166     String cf = "f";
167 
168     HTable ht =
169         TEST_UTIL.createTable(TableName.valueOf(table), Bytes.toBytes(cf), Integer.MAX_VALUE);
170 
171     // 10 byte row key - (2 bytes 4 bytes 4 bytes)
172     // 4 byte qualifier
173     // 4 byte value
174 
175     for (int i0 = 0; i0 < firstPartCardinality; i0++) {
176 
177       for (int i1 = 0; i1 < secondPartCardinality; i1++) {
178 
179         for (int i2 = 0; i2 < thirdPartCardinality; i2++) {
180           byte[] rk = new byte[10];
181 
182           ByteBuffer buf = ByteBuffer.wrap(rk);
183           buf.clear();
184           buf.putShort((short) i0);
185           buf.putInt(i1);
186           buf.putInt(i2);
187           for (int c = 0; c < colQualifiersTotal; c++) {
188             byte[] cq = new byte[4];
189             Bytes.putBytes(cq, 0, Bytes.toBytes(c), 0, 4);
190 
191             Put p = new Put(rk);
192             p.setDurability(Durability.SKIP_WAL);
193             p.add(cf.getBytes(), cq, Bytes.toBytes(c));
194             ht.put(p);
195           }
196         }
197       }
198     }
199 
200     TEST_UTIL.flush();
201 
202     // test passes
203     runTest1(ht);
204     runTest2(ht);
205 
206   }
207 
208   private void runTest1(Table hTable) throws IOException {
209     // [0, 2, ?, ?, ?, ?, 0, 0, 0, 1]
210 
211     byte[] mask = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 };
212 
213     List<Pair<byte[], byte[]>> list = new ArrayList<Pair<byte[], byte[]>>();
214     for (int i = 0; i < totalFuzzyKeys; i++) {
215       byte[] fuzzyKey = new byte[10];
216       ByteBuffer buf = ByteBuffer.wrap(fuzzyKey);
217       buf.clear();
218       buf.putShort((short) 2);
219       for (int j = 0; j < 4; j++) {
220         buf.put(fuzzyValue);
221       }
222       buf.putInt(i);
223 
224       Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask);
225       list.add(pair);
226     }
227 
228     int expectedSize = secondPartCardinality * totalFuzzyKeys * colQualifiersTotal;
229     FuzzyRowFilter fuzzyRowFilter0 = new FuzzyRowFilter(list);
230     // Filters are not stateless - we can't reuse them
231     FuzzyRowFilter fuzzyRowFilter1 = new FuzzyRowFilter(list);
232 
233     // regular test
234     runScanner(hTable, expectedSize, fuzzyRowFilter0);
235     // optimized from block cache
236     runScanner(hTable, expectedSize, fuzzyRowFilter1);
237 
238   }
239 
240   private void runTest2(Table hTable) throws IOException {
241     // [0, 0, ?, ?, ?, ?, 0, 0, 0, 0] , [0, 1, ?, ?, ?, ?, 0, 0, 0, 1]...
242 
243     byte[] mask = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 };
244 
245     List<Pair<byte[], byte[]>> list = new ArrayList<Pair<byte[], byte[]>>();
246 
247     for (int i = 0; i < totalFuzzyKeys; i++) {
248       byte[] fuzzyKey = new byte[10];
249       ByteBuffer buf = ByteBuffer.wrap(fuzzyKey);
250       buf.clear();
251       buf.putShort((short) (i * 2));
252       for (int j = 0; j < 4; j++) {
253         buf.put(fuzzyValue);
254       }
255       buf.putInt(i * 2);
256 
257       Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask);
258       list.add(pair);
259     }
260 
261     int expectedSize = totalFuzzyKeys * secondPartCardinality * colQualifiersTotal;
262 
263     FuzzyRowFilter fuzzyRowFilter0 = new FuzzyRowFilter(list);
264     // Filters are not stateless - we can't reuse them
265     FuzzyRowFilter fuzzyRowFilter1 = new FuzzyRowFilter(list);
266 
267     // regular test
268     runScanner(hTable, expectedSize, fuzzyRowFilter0);
269     // optimized from block cache
270     runScanner(hTable, expectedSize, fuzzyRowFilter1);
271 
272   }
273 
274   private void runScanner(Table hTable, int expectedSize, Filter filter) throws IOException {
275 
276     String cf = "f";
277     Scan scan = new Scan();
278     scan.addFamily(cf.getBytes());
279     scan.setFilter(filter);
280     List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table.getBytes());
281     HRegion first = regions.get(0);
282     first.getScanner(scan);
283     RegionScanner scanner = first.getScanner(scan);
284     List<Cell> results = new ArrayList<Cell>();
285     // Result result;
286     long timeBeforeScan = System.currentTimeMillis();
287     int found = 0;
288     while (scanner.next(results)) {
289       found += results.size();
290       results.clear();
291     }
292     found += results.size();
293     long scanTime = System.currentTimeMillis() - timeBeforeScan;
294     scanner.close();
295 
296     LOG.info("\nscan time = " + scanTime + "ms");
297     LOG.info("found " + found + " results\n");
298 
299     assertEquals(expectedSize, found);
300   }
301 
302   @SuppressWarnings("deprecation")
303   @Test
304   public void testFilterList() throws Exception {
305     String cf = "f";
306     String table = "TestFuzzyRowFiltersInFilterList";
307     HTable ht =
308         TEST_UTIL.createTable(TableName.valueOf(table), Bytes.toBytes(cf), Integer.MAX_VALUE);
309 
310     // 10 byte row key - (2 bytes 4 bytes 4 bytes)
311     // 4 byte qualifier
312     // 4 byte value
313 
314     for (int i1 = 0; i1 < 5; i1++) {
315       for (int i2 = 0; i2 < 5; i2++) {
316         byte[] rk = new byte[10];
317 
318         ByteBuffer buf = ByteBuffer.wrap(rk);
319         buf.clear();
320         buf.putShort((short) 2);
321         buf.putInt(i1);
322         buf.putInt(i2);
323 
324         // Each row contains 5 columns
325         for (int c = 0; c < 5; c++) {
326           byte[] cq = new byte[4];
327           Bytes.putBytes(cq, 0, Bytes.toBytes(c), 0, 4);
328 
329           Put p = new Put(rk);
330           p.setDurability(Durability.SKIP_WAL);
331           p.add(cf.getBytes(), cq, Bytes.toBytes(c));
332           ht.put(p);
333           LOG.info("Inserting: rk: " + Bytes.toStringBinary(rk) + " cq: "
334               + Bytes.toStringBinary(cq));
335         }
336       }
337     }
338 
339     TEST_UTIL.flush();
340 
341     // test passes if we get back 5 KV's (1 row)
342     runTest(ht, 5);
343 
344   }
345 
346   @SuppressWarnings("unchecked")
347   private void runTest(HTable hTable, int expectedSize) throws IOException {
348     // [0, 2, ?, ?, ?, ?, 0, 0, 0, 1]
349     byte[] fuzzyKey1 = new byte[10];
350     ByteBuffer buf = ByteBuffer.wrap(fuzzyKey1);
351     buf.clear();
352     buf.putShort((short) 2);
353     for (int i = 0; i < 4; i++)
354       buf.put(fuzzyValue);
355     buf.putInt((short) 1);
356     byte[] mask1 = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 };
357 
358     byte[] fuzzyKey2 = new byte[10];
359     buf = ByteBuffer.wrap(fuzzyKey2);
360     buf.clear();
361     buf.putShort((short) 2);
362     buf.putInt((short) 2);
363     for (int i = 0; i < 4; i++)
364       buf.put(fuzzyValue);
365 
366     byte[] mask2 = new byte[] { 0, 0, 0, 0, 0, 0, 1, 1, 1, 1 };
367 
368     Pair<byte[], byte[]> pair1 = new Pair<byte[], byte[]>(fuzzyKey1, mask1);
369     Pair<byte[], byte[]> pair2 = new Pair<byte[], byte[]>(fuzzyKey2, mask2);
370 
371     FuzzyRowFilter fuzzyRowFilter1 = new FuzzyRowFilter(Lists.newArrayList(pair1));
372     FuzzyRowFilter fuzzyRowFilter2 = new FuzzyRowFilter(Lists.newArrayList(pair2));
373     // regular test - we expect 1 row back (5 KVs)
374     runScanner(hTable, expectedSize, fuzzyRowFilter1, fuzzyRowFilter2);
375   }
376 
377   private void runScanner(Table hTable, int expectedSize, Filter filter1, Filter filter2)
378       throws IOException {
379     String cf = "f";
380     Scan scan = new Scan();
381     scan.addFamily(cf.getBytes());
382     FilterList filterList = new FilterList(Operator.MUST_PASS_ALL, filter1, filter2);
383     scan.setFilter(filterList);
384 
385     ResultScanner scanner = hTable.getScanner(scan);
386     List<Cell> results = new ArrayList<Cell>();
387     Result result;
388     long timeBeforeScan = System.currentTimeMillis();
389     while ((result = scanner.next()) != null) {
390       for (Cell kv : result.listCells()) {
391         LOG.info("Got rk: " + Bytes.toStringBinary(CellUtil.cloneRow(kv)) + " cq: "
392             + Bytes.toStringBinary(CellUtil.cloneQualifier(kv)));
393         results.add(kv);
394       }
395     }
396     long scanTime = System.currentTimeMillis() - timeBeforeScan;
397     scanner.close();
398 
399     LOG.info("scan time = " + scanTime + "ms");
400     LOG.info("found " + results.size() + " results");
401 
402     assertEquals(expectedSize, results.size());
403   }
404 }