View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.junit.Assert.*;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.*;
31  import org.apache.hadoop.hbase.filter.Filter;
32  import org.apache.hadoop.hbase.filter.TimestampsFilter;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.junit.After;
35  import org.junit.AfterClass;
36  import org.junit.Before;
37  import org.junit.BeforeClass;
38  import org.junit.Test;
39  import org.junit.experimental.categories.Category;
40  
41  /**
42   * Run tests related to {@link TimestampsFilter} using HBase client APIs.
43   * Sets up the HBase mini cluster once at start. Each test creates a table
44   * named for the method and does its stuff against that.
45   */
46  @Category(MediumTests.class)
47  public class TestTimestampsFilter {
48    final Log LOG = LogFactory.getLog(getClass());
49    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
50  
51    /**
52     * @throws java.lang.Exception
53     */
54    @BeforeClass
55    public static void setUpBeforeClass() throws Exception {
56      TEST_UTIL.startMiniCluster();
57    }
58  
59    /**
60     * @throws java.lang.Exception
61     */
62    @AfterClass
63    public static void tearDownAfterClass() throws Exception {
64      TEST_UTIL.shutdownMiniCluster();
65    }
66  
67    /**
68     * @throws java.lang.Exception
69     */
70    @Before
71    public void setUp() throws Exception {
72      // Nothing to do.
73    }
74  
75    /**
76     * @throws java.lang.Exception
77     */
78    @After
79    public void tearDown() throws Exception {
80      // Nothing to do.
81    }
82  
83    /**
84     * Test from client side for TimestampsFilter.
85     *
86     * The TimestampsFilter provides the ability to request cells (KeyValues)
87     * whose timestamp/version is in the specified list of timestamps/version.
88     *
89     * @throws Exception
90     */
91    @Test
92    public void testTimestampsFilter() throws Exception {
93      byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
94      byte [] FAMILY = Bytes.toBytes("event_log");
95      byte [][] FAMILIES = new byte[][] { FAMILY };
96      Cell kvs[];
97  
98      // create table; set versions to max...
99      HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
100 
101     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
102       for (int colIdx = 0; colIdx < 5; colIdx++) {
103         // insert versions 201..300
104         putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
105         // insert versions 1..100
106         putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
107       }
108     }
109 
110     // do some verification before flush
111     verifyInsertedValues(ht, FAMILY);
112 
113     TEST_UTIL.flush();
114 
115     // do some verification after flush
116     verifyInsertedValues(ht, FAMILY);
117 
118     // Insert some more versions after flush. These should be in memstore.
119     // After this we should have data in both memstore & HFiles.
120     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
121       for (int colIdx = 0; colIdx < 5; colIdx++) {
122         putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
123         putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
124       }
125     }
126 
127     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
128       for (int colIdx = 0; colIdx < 5; colIdx++) {
129         kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
130                            Arrays.asList(505L, 5L, 105L, 305L, 205L));
131         assertEquals(4, kvs.length);
132         checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
133         checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
134         checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
135         checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
136       }
137     }
138 
139     // Request an empty list of versions using the Timestamps filter;
140     // Should return none.
141     kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
142     assertEquals(0, kvs == null? 0: kvs.length);
143 
144     //
145     // Test the filter using a Scan operation
146     // Scan rows 0..4. For each row, get all its columns, but only
147     // those versions of the columns with the specified timestamps.
148     Result[] results = scanNVersions(ht, FAMILY, 0, 4,
149                                      Arrays.asList(6L, 106L, 306L));
150     assertEquals("# of rows returned from scan", 5, results.length);
151     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
152       kvs = results[rowIdx].rawCells();
153       // each row should have 5 columns.
154       // And we have requested 3 versions for each.
155       assertEquals("Number of KeyValues in result for row:" + rowIdx,
156                    3*5, kvs.length);
157       for (int colIdx = 0; colIdx < 5; colIdx++) {
158         int offset = colIdx * 3;
159         checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
160         checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
161         checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
162       }
163     }
164     ht.close();
165   }
166 
167   @Test
168   public void testMultiColumns() throws Exception {
169     byte [] TABLE = Bytes.toBytes("testTimestampsFilterMultiColumns");
170     byte [] FAMILY = Bytes.toBytes("event_log");
171     byte [][] FAMILIES = new byte[][] { FAMILY };
172 
173     // create table; set versions to max...
174     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
175 
176     Put p = new Put(Bytes.toBytes("row"));
177     p.add(FAMILY, Bytes.toBytes("column0"), 3, Bytes.toBytes("value0-3"));
178     p.add(FAMILY, Bytes.toBytes("column1"), 3, Bytes.toBytes("value1-3"));
179     p.add(FAMILY, Bytes.toBytes("column2"), 1, Bytes.toBytes("value2-1"));
180     p.add(FAMILY, Bytes.toBytes("column2"), 2, Bytes.toBytes("value2-2"));
181     p.add(FAMILY, Bytes.toBytes("column2"), 3, Bytes.toBytes("value2-3"));
182     p.add(FAMILY, Bytes.toBytes("column3"), 2, Bytes.toBytes("value3-2"));
183     p.add(FAMILY, Bytes.toBytes("column4"), 1, Bytes.toBytes("value4-1"));
184     p.add(FAMILY, Bytes.toBytes("column4"), 2, Bytes.toBytes("value4-2"));
185     p.add(FAMILY, Bytes.toBytes("column4"), 3, Bytes.toBytes("value4-3"));
186     ht.put(p);
187 
188     ArrayList<Long> timestamps = new ArrayList<Long>();
189     timestamps.add(new Long(3));
190     TimestampsFilter filter = new TimestampsFilter(timestamps);
191 
192     Get g = new Get(Bytes.toBytes("row"));
193     g.setFilter(filter);
194     g.setMaxVersions();
195     g.addColumn(FAMILY, Bytes.toBytes("column2"));
196     g.addColumn(FAMILY, Bytes.toBytes("column4"));
197 
198     Result result = ht.get(g);
199     for (Cell kv : result.listCells()) {
200       System.out.println("found row " + Bytes.toString(CellUtil.cloneRow(kv)) +
201           ", column " + Bytes.toString(CellUtil.cloneQualifier(kv)) + ", value "
202           + Bytes.toString(CellUtil.cloneValue(kv)));
203     }
204 
205     assertEquals(result.listCells().size(), 2);
206     assertTrue(CellUtil.matchingValue(result.listCells().get(0), Bytes.toBytes("value2-3")));
207     assertTrue(CellUtil.matchingValue(result.listCells().get(1), Bytes.toBytes("value4-3")));
208 
209     ht.close();
210   }
211 
212   /**
213    * Test TimestampsFilter in the presence of version deletes.
214    *
215    * @throws Exception
216    */
217   @Test
218   public void testWithVersionDeletes() throws Exception {
219 
220     // first test from memstore (without flushing).
221     testWithVersionDeletes(false);
222 
223     // run same test against HFiles (by forcing a flush).
224     testWithVersionDeletes(true);
225   }
226 
227   private void testWithVersionDeletes(boolean flushTables) throws IOException {
228     byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
229                                    (flushTables ? "flush" : "noflush")); 
230     byte [] FAMILY = Bytes.toBytes("event_log");
231     byte [][] FAMILIES = new byte[][] { FAMILY };
232 
233     // create table; set versions to max...
234     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
235 
236     // For row:0, col:0: insert versions 1 through 5.
237     putNVersions(ht, FAMILY, 0, 0, 1, 5);
238 
239     // delete version 4.
240     deleteOneVersion(ht, FAMILY, 0, 0, 4);
241 
242     if (flushTables) {
243       TEST_UTIL.flush();
244     }
245 
246     // request a bunch of versions including the deleted version. We should
247     // only get back entries for the versions that exist.
248     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
249     assertEquals(3, kvs.length);
250     checkOneCell(kvs[0], FAMILY, 0, 0, 5);
251     checkOneCell(kvs[1], FAMILY, 0, 0, 3);
252     checkOneCell(kvs[2], FAMILY, 0, 0, 2);
253 
254     ht.close();
255   }
256 
257   private void verifyInsertedValues(HTable ht, byte[] cf) throws IOException {
258     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
259       for (int colIdx = 0; colIdx < 5; colIdx++) {
260         // ask for versions that exist.
261         Cell[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
262                                       Arrays.asList(5L, 300L, 6L, 80L));
263         assertEquals(4, kvs.length);
264         checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
265         checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
266         checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
267         checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
268 
269         // ask for versions that do not exist.
270         kvs = getNVersions(ht, cf, rowIdx, colIdx,
271                            Arrays.asList(101L, 102L));
272         assertEquals(0, kvs == null? 0: kvs.length);
273 
274         // ask for some versions that exist and some that do not.
275         kvs = getNVersions(ht, cf, rowIdx, colIdx,
276                            Arrays.asList(1L, 300L, 105L, 70L, 115L));
277         assertEquals(3, kvs.length);
278         checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
279         checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
280         checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
281       }
282     }
283   }
284 
285   /**
286    * Assert that the passed in KeyValue has expected contents for the
287    * specified row, column & timestamp.
288    */
289   private void checkOneCell(Cell kv, byte[] cf,
290                              int rowIdx, int colIdx, long ts) {
291 
292     String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
293 
294     assertEquals("Row mismatch which checking: " + ctx,
295                  "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));
296 
297     assertEquals("ColumnFamily mismatch while checking: " + ctx,
298                  Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));
299 
300     assertEquals("Column qualifier mismatch while checking: " + ctx,
301                  "column:" + colIdx,
302                   Bytes.toString(CellUtil.cloneQualifier(kv)));
303 
304     assertEquals("Timestamp mismatch while checking: " + ctx,
305                  ts, kv.getTimestamp());
306 
307     assertEquals("Value mismatch while checking: " + ctx,
308                  "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
309   }
310 
311   /**
312    * Uses the TimestampFilter on a Get to request a specified list of
313    * versions for the row/column specified by rowIdx & colIdx.
314    *
315    */
316   private  Cell[] getNVersions(HTable ht, byte[] cf, int rowIdx,
317                                    int colIdx, List<Long> versions)
318     throws IOException {
319     byte row[] = Bytes.toBytes("row:" + rowIdx);
320     byte column[] = Bytes.toBytes("column:" + colIdx);
321     Filter filter = new TimestampsFilter(versions);
322     Get get = new Get(row);
323     get.addColumn(cf, column);
324     get.setFilter(filter);
325     get.setMaxVersions();
326     Result result = ht.get(get);
327 
328     return result.rawCells();
329   }
330 
331   /**
332    * Uses the TimestampFilter on a Scan to request a specified list of
333    * versions for the rows from startRowIdx to endRowIdx (both inclusive).
334    */
335   private Result[] scanNVersions(HTable ht, byte[] cf, int startRowIdx,
336                                  int endRowIdx, List<Long> versions)
337     throws IOException {
338     byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
339     byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1); // exclusive
340     Filter filter = new TimestampsFilter(versions);
341     Scan scan = new Scan(startRow, endRow);
342     scan.setFilter(filter);
343     scan.setMaxVersions();
344     ResultScanner scanner = ht.getScanner(scan);
345     return scanner.next(endRowIdx - startRowIdx + 1);
346   }
347 
348   /**
349    * Insert in specific row/column versions with timestamps
350    * versionStart..versionEnd.
351    */
352   private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
353                             long versionStart, long versionEnd)
354       throws IOException {
355     byte row[] = Bytes.toBytes("row:" + rowIdx);
356     byte column[] = Bytes.toBytes("column:" + colIdx);
357     Put put = new Put(row);
358     put.setDurability(Durability.SKIP_WAL);
359 
360     for (long idx = versionStart; idx <= versionEnd; idx++) {
361       put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
362     }
363 
364     ht.put(put);
365   }
366 
367   /**
368    * For row/column specified by rowIdx/colIdx, delete the cell
369    * corresponding to the specified version.
370    */
371   private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
372                                 int colIdx, long version)
373     throws IOException {
374     byte row[] = Bytes.toBytes("row:" + rowIdx);
375     byte column[] = Bytes.toBytes("column:" + colIdx);
376     Delete del = new Delete(row);
377     del.deleteColumn(cf, column, version);
378     ht.delete(del);
379   }
380 
381 }
382 
383