1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.apache.hadoop.hbase.filter;
17
18 import static org.junit.Assert.assertEquals;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.Cell;
27 import org.apache.hadoop.hbase.CellUtil;
28 import org.apache.hadoop.hbase.HBaseTestingUtility;
29 import org.apache.hadoop.hbase.MediumTests;
30 import org.apache.hadoop.hbase.client.Durability;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Put;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.Pair;
38 import org.jboss.netty.buffer.ChannelBuffer;
39 import org.jboss.netty.buffer.ChannelBuffers;
40 import org.junit.After;
41 import org.junit.AfterClass;
42 import org.junit.Before;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46
47 import com.google.common.collect.Lists;
48
49
50
51 @Category(MediumTests.class)
52 public class TestFuzzyRowAndColumnRangeFilter {
53 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
54 private final Log LOG = LogFactory.getLog(this.getClass());
55
56
57
58
59 @BeforeClass
60 public static void setUpBeforeClass() throws Exception {
61 TEST_UTIL.startMiniCluster();
62 }
63
64
65
66
67 @AfterClass
68 public static void tearDownAfterClass() throws Exception {
69 TEST_UTIL.shutdownMiniCluster();
70 }
71
72
73
74
75 @Before
76 public void setUp() throws Exception {
77
78 }
79
80
81
82
83 @After
84 public void tearDown() throws Exception {
85
86 }
87
88 @Test
89 public void Test() throws Exception {
90 String cf = "f";
91 String table = "TestFuzzyAndColumnRangeFilterClient";
92 HTable ht = TEST_UTIL.createTable(Bytes.toBytes(table),
93 Bytes.toBytes(cf), Integer.MAX_VALUE);
94
95
96
97
98
99 for (int i1 = 0; i1 < 2; i1++) {
100 for (int i2 = 0; i2 < 5; i2++) {
101 byte[] rk = new byte[10];
102
103 ChannelBuffer buf = ChannelBuffers.wrappedBuffer(rk);
104 buf.clear();
105 buf.writeShort((short) 2);
106 buf.writeInt(i1);
107 buf.writeInt(i2);
108
109 for (int c = 0; c < 5; c++) {
110 byte[] cq = new byte[4];
111 Bytes.putBytes(cq, 0, Bytes.toBytes(c), 0, 4);
112
113 Put p = new Put(rk);
114 p.setDurability(Durability.SKIP_WAL);
115 p.add(cf.getBytes(), cq, Bytes.toBytes(c));
116 ht.put(p);
117 LOG.info("Inserting: rk: " + Bytes.toStringBinary(rk) + " cq: "
118 + Bytes.toStringBinary(cq));
119 }
120 }
121 }
122
123 TEST_UTIL.flush();
124
125
126 runTest(ht, 0, 10);
127
128
129 runTest(ht, 1, 8);
130 }
131
132 private void runTest(HTable hTable, int cqStart, int expectedSize) throws IOException {
133
134 byte[] fuzzyKey = new byte[10];
135 ChannelBuffer buf = ChannelBuffers.wrappedBuffer(fuzzyKey);
136 buf.clear();
137 buf.writeShort((short) 2);
138 for (int i = 0; i < 4; i++)
139 buf.writeByte((short)63);
140 buf.writeInt((short)1);
141
142 byte[] mask = new byte[] {0 , 0, 1, 1, 1, 1, 0, 0, 0, 0};
143
144 Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask);
145 FuzzyRowFilter fuzzyRowFilter = new FuzzyRowFilter(Lists.newArrayList(pair));
146 ColumnRangeFilter columnRangeFilter = new ColumnRangeFilter(Bytes.toBytes(cqStart), true
147 , Bytes.toBytes(4), true);
148
149 runScanner(hTable, expectedSize, fuzzyRowFilter, columnRangeFilter);
150
151 runScanner(hTable, expectedSize, columnRangeFilter, fuzzyRowFilter);
152 }
153
154 private void runScanner(HTable hTable, int expectedSize, Filter... filters) throws IOException {
155 String cf = "f";
156 Scan scan = new Scan();
157 scan.addFamily(cf.getBytes());
158 FilterList filterList = new FilterList(filters);
159 scan.setFilter(filterList);
160
161 ResultScanner scanner = hTable.getScanner(scan);
162 List<Cell> results = new ArrayList<Cell>();
163 Result result;
164 long timeBeforeScan = System.currentTimeMillis();
165 while ((result = scanner.next()) != null) {
166 for (Cell kv : result.listCells()) {
167 LOG.info("Got rk: " + Bytes.toStringBinary(CellUtil.cloneRow(kv)) + " cq: "
168 + Bytes.toStringBinary(CellUtil.cloneQualifier(kv)));
169 results.add(kv);
170 }
171 }
172 long scanTime = System.currentTimeMillis() - timeBeforeScan;
173 scanner.close();
174
175 LOG.info("scan time = " + scanTime + "ms");
176 LOG.info("found " + results.size() + " results");
177
178 assertEquals(expectedSize, results.size());
179 }
180 }