View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.junit.Assert.*;
22  
23  import java.io.IOException;
24  import java.util.Arrays;
25  import java.util.Collections;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.*;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.junit.After;
33  import org.junit.AfterClass;
34  import org.junit.Before;
35  import org.junit.BeforeClass;
36  import org.junit.Test;
37  import org.junit.experimental.categories.Category;
38  
39  /**
 * Run tests related to {@link TimestampsFilter} using HBase client APIs.
 * Sets up the HBase mini cluster once at start. Each test creates a table
 * named for the method and does its stuff against that.
43   */
44  @Category(LargeTests.class)
45  public class TestMultipleTimestamps {
46    final Log LOG = LogFactory.getLog(getClass());
47    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
48  
49    /**
50     * @throws java.lang.Exception
51     */
52    @BeforeClass
53    public static void setUpBeforeClass() throws Exception {
54      TEST_UTIL.startMiniCluster();
55    }
56  
57    /**
58     * @throws java.lang.Exception
59     */
60    @AfterClass
61    public static void tearDownAfterClass() throws Exception {
62      TEST_UTIL.shutdownMiniCluster();
63    }
64  
65    /**
66     * @throws java.lang.Exception
67     */
68    @Before
69    public void setUp() throws Exception {
70      // Nothing to do.
71    }
72  
73    /**
74     * @throws java.lang.Exception
75     */
76    @After
77    public void tearDown() throws Exception {
78      // Nothing to do.
79    }
80  
81    @Test
82    public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
83      TableName TABLE =
84          TableName.valueOf("testReseeksWithOne" +
85              "ColumnMiltipleTimestamps");
86      byte [] FAMILY = Bytes.toBytes("event_log");
87      byte [][] FAMILIES = new byte[][] { FAMILY };
88  
89      // create table; set versions to max...
90      HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
91  
92      Integer[] putRows = new Integer[] {1, 3, 5, 7};
93      Integer[] putColumns = new Integer[] { 1, 3, 5};
94      Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
95  
96      Integer[] scanRows = new Integer[] {3, 5};
97      Integer[] scanColumns = new Integer[] {3};
98      Long[] scanTimestamps = new Long[] {3L, 4L};
99      int scanMaxVersions = 2;
100 
101     put(ht, FAMILY, putRows, putColumns, putTimestamps);
102 
103     TEST_UTIL.flush(TABLE);
104 
105     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
106         scanTimestamps, scanMaxVersions);
107 
108     Cell [] kvs;
109 
110     kvs = scanner.next().rawCells();
111     assertEquals(2, kvs.length);
112     checkOneCell(kvs[0], FAMILY, 3, 3, 4);
113     checkOneCell(kvs[1], FAMILY, 3, 3, 3);
114     kvs = scanner.next().rawCells();
115     assertEquals(2, kvs.length);
116     checkOneCell(kvs[0], FAMILY, 5, 3, 4);
117     checkOneCell(kvs[1], FAMILY, 5, 3, 3);
118 
119     ht.close();
120   }
121 
122   @Test
123   public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
124     LOG.info("testReseeksWithMultipleColumnOneTimestamp");
125     TableName TABLE =
126         TableName.valueOf("testReseeksWithMultiple" +
127             "ColumnOneTimestamps");
128     byte [] FAMILY = Bytes.toBytes("event_log");
129     byte [][] FAMILIES = new byte[][] { FAMILY };
130 
131     // create table; set versions to max...
132     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
133 
134     Integer[] putRows = new Integer[] {1, 3, 5, 7};
135     Integer[] putColumns = new Integer[] { 1, 3, 5};
136     Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
137 
138     Integer[] scanRows = new Integer[] {3, 5};
139     Integer[] scanColumns = new Integer[] {3,4};
140     Long[] scanTimestamps = new Long[] {3L};
141     int scanMaxVersions = 2;
142 
143     put(ht, FAMILY, putRows, putColumns, putTimestamps);
144 
145     TEST_UTIL.flush(TABLE);
146 
147     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
148         scanTimestamps, scanMaxVersions);
149 
150     Cell[] kvs;
151 
152     kvs = scanner.next().rawCells();
153     assertEquals(1, kvs.length);
154     checkOneCell(kvs[0], FAMILY, 3, 3, 3);
155     kvs = scanner.next().rawCells();
156     assertEquals(1, kvs.length);
157     checkOneCell(kvs[0], FAMILY, 5, 3, 3);
158 
159     ht.close();
160   }
161 
162   @Test
163   public void testReseeksWithMultipleColumnMultipleTimestamp() throws
164   IOException {
165     LOG.info("testReseeksWithMultipleColumnMultipleTimestamp");
166 
167     TableName TABLE =
168         TableName.valueOf("testReseeksWithMultipleColumnMiltipleTimestamps");
169     byte [] FAMILY = Bytes.toBytes("event_log");
170     byte [][] FAMILIES = new byte[][] { FAMILY };
171 
172     // create table; set versions to max...
173     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
174 
175     Integer[] putRows = new Integer[] {1, 3, 5, 7};
176     Integer[] putColumns = new Integer[] { 1, 3, 5};
177     Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
178 
179     Integer[] scanRows = new Integer[] {5, 7};
180     Integer[] scanColumns = new Integer[] {3, 4, 5};
181     Long[] scanTimestamps = new Long[] {2l, 3L};
182     int scanMaxVersions = 2;
183 
184     put(ht, FAMILY, putRows, putColumns, putTimestamps);
185 
186     TEST_UTIL.flush(TABLE);
187     Scan scan = new Scan();
188     scan.setMaxVersions(10);
189     ResultScanner scanner = ht.getScanner(scan);
190     while (true) {
191       Result r = scanner.next();
192       if (r == null) break;
193       LOG.info("r=" + r);
194     }
195     scanner = scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
196 
197     Cell[] kvs;
198 
199     // This looks like wrong answer.  Should be 2.  Even then we are returning wrong result,
200     // timestamps that are 3 whereas should be 2 since min is inclusive.
201     kvs = scanner.next().rawCells();
202     assertEquals(4, kvs.length);
203     checkOneCell(kvs[0], FAMILY, 5, 3, 3);
204     checkOneCell(kvs[1], FAMILY, 5, 3, 2);
205     checkOneCell(kvs[2], FAMILY, 5, 5, 3);
206     checkOneCell(kvs[3], FAMILY, 5, 5, 2);
207     kvs = scanner.next().rawCells();
208     assertEquals(4, kvs.length);
209     checkOneCell(kvs[0], FAMILY, 7, 3, 3);
210     checkOneCell(kvs[1], FAMILY, 7, 3, 2);
211     checkOneCell(kvs[2], FAMILY, 7, 5, 3);
212     checkOneCell(kvs[3], FAMILY, 7, 5, 2);
213 
214     ht.close();
215   }
216 
217   @Test
218   public void testReseeksWithMultipleFiles() throws IOException {
219     LOG.info("testReseeksWithMultipleFiles");
220     TableName TABLE =
221         TableName.valueOf("testReseeksWithMultipleFiles");
222     byte [] FAMILY = Bytes.toBytes("event_log");
223     byte [][] FAMILIES = new byte[][] { FAMILY };
224 
225     // create table; set versions to max...
226     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
227 
228     Integer[] putRows1 = new Integer[] {1, 2, 3};
229     Integer[] putColumns1 = new Integer[] { 2, 5, 6};
230     Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
231 
232     Integer[] putRows2 = new Integer[] {6, 7};
233     Integer[] putColumns2 = new Integer[] {3, 6};
234     Long[] putTimestamps2 = new Long[] {4L, 5L};
235 
236     Integer[] putRows3 = new Integer[] {2, 3, 5};
237     Integer[] putColumns3 = new Integer[] {1, 2, 3};
238     Long[] putTimestamps3 = new Long[] {4L,8L};
239 
240 
241     Integer[] scanRows = new Integer[] {3, 5, 7};
242     Integer[] scanColumns = new Integer[] {3, 4, 5};
243     Long[] scanTimestamps = new Long[] {2l, 4L};
244     int scanMaxVersions = 5;
245 
246     put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
247     TEST_UTIL.flush(TABLE);
248     put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
249     TEST_UTIL.flush(TABLE);
250     put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
251 
252     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
253         scanTimestamps, scanMaxVersions);
254 
255     Cell[] kvs;
256 
257     kvs = scanner.next().rawCells();
258     assertEquals(2, kvs.length);
259     checkOneCell(kvs[0], FAMILY, 3, 3, 4);
260     checkOneCell(kvs[1], FAMILY, 3, 5, 2);
261 
262     kvs = scanner.next().rawCells();
263     assertEquals(1, kvs.length);
264     checkOneCell(kvs[0], FAMILY, 5, 3, 4);
265 
266     kvs = scanner.next().rawCells();
267     assertEquals(1, kvs.length);
268     checkOneCell(kvs[0], FAMILY, 6, 3, 4);
269 
270     kvs = scanner.next().rawCells();
271     assertEquals(1, kvs.length);
272     checkOneCell(kvs[0], FAMILY, 7, 3, 4);
273 
274     ht.close();
275   }
276 
277   @Test
278   public void testWithVersionDeletes() throws Exception {
279 
280     // first test from memstore (without flushing).
281     testWithVersionDeletes(false);
282 
283     // run same test against HFiles (by forcing a flush).
284     testWithVersionDeletes(true);
285   }
286 
287   public void testWithVersionDeletes(boolean flushTables) throws IOException {
288     LOG.info("testWithVersionDeletes_"+ (flushTables ? "flush" : "noflush"));
289     TableName TABLE =
290         TableName.valueOf("testWithVersionDeletes_" + (flushTables ?
291             "flush" : "noflush"));
292     byte [] FAMILY = Bytes.toBytes("event_log");
293     byte [][] FAMILIES = new byte[][] { FAMILY };
294 
295     // create table; set versions to max...
296     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
297 
298     // For row:0, col:0: insert versions 1 through 5.
299     putNVersions(ht, FAMILY, 0, 0, 1, 5);
300 
301     if (flushTables) {
302       TEST_UTIL.flush(TABLE);
303     }
304 
305     // delete version 4.
306     deleteOneVersion(ht, FAMILY, 0, 0, 4);
307 
308     // request a bunch of versions including the deleted version. We should
309     // only get back entries for the versions that exist.
310     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0,
311         Arrays.asList(2L, 3L, 4L, 5L));
312     assertEquals(3, kvs.length);
313     checkOneCell(kvs[0], FAMILY, 0, 0, 5);
314     checkOneCell(kvs[1], FAMILY, 0, 0, 3);
315     checkOneCell(kvs[2], FAMILY, 0, 0, 2);
316 
317     ht.close();
318   }
319 
320   @Test
321   public void testWithMultipleVersionDeletes() throws IOException {
322     LOG.info("testWithMultipleVersionDeletes");
323 
324     TableName TABLE =
325         TableName.valueOf("testWithMultipleVersionDeletes");
326     byte [] FAMILY = Bytes.toBytes("event_log");
327     byte [][] FAMILIES = new byte[][] { FAMILY };
328 
329     // create table; set versions to max...
330     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
331 
332     // For row:0, col:0: insert versions 1 through 5.
333     putNVersions(ht, FAMILY, 0, 0, 1, 5);
334 
335     TEST_UTIL.flush(TABLE);
336 
337     // delete all versions before 4.
338     deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
339 
340     // request a bunch of versions including the deleted version. We should
341     // only get back entries for the versions that exist.
342     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
343     assertEquals(0, kvs.length);
344 
345     ht.close();
346   }
347 
348   @Test
349   public void testWithColumnDeletes() throws IOException {
350     TableName TABLE =
351         TableName.valueOf("testWithColumnDeletes");
352     byte [] FAMILY = Bytes.toBytes("event_log");
353     byte [][] FAMILIES = new byte[][] { FAMILY };
354 
355     // create table; set versions to max...
356     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
357 
358     // For row:0, col:0: insert versions 1 through 5.
359     putNVersions(ht, FAMILY, 0, 0, 1, 5);
360 
361     TEST_UTIL.flush(TABLE);
362 
363     // delete all versions before 4.
364     deleteColumn(ht, FAMILY, 0, 0);
365 
366     // request a bunch of versions including the deleted version. We should
367     // only get back entries for the versions that exist.
368     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
369     assertEquals(0, kvs.length);
370 
371     ht.close();
372   }
373 
374   @Test
375   public void testWithFamilyDeletes() throws IOException {
376     TableName TABLE =
377         TableName.valueOf("testWithFamilyDeletes");
378     byte [] FAMILY = Bytes.toBytes("event_log");
379     byte [][] FAMILIES = new byte[][] { FAMILY };
380 
381     // create table; set versions to max...
382     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
383 
384     // For row:0, col:0: insert versions 1 through 5.
385     putNVersions(ht, FAMILY, 0, 0, 1, 5);
386 
387     TEST_UTIL.flush(TABLE);
388 
389     // delete all versions before 4.
390     deleteFamily(ht, FAMILY, 0);
391 
392     // request a bunch of versions including the deleted version. We should
393     // only get back entries for the versions that exist.
394     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
395     assertEquals(0, kvs.length);
396 
397     ht.close();
398   }
399 
400   /**
401    * Assert that the passed in KeyValue has expected contents for the
402    * specified row, column & timestamp.
403    */
404   private void checkOneCell(Cell kv, byte[] cf,
405       int rowIdx, int colIdx, long ts) {
406 
407     String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
408 
409     assertEquals("Row mismatch which checking: " + ctx,
410         "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));
411 
412     assertEquals("ColumnFamily mismatch while checking: " + ctx,
413         Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));
414 
415     assertEquals("Column qualifier mismatch while checking: " + ctx,
416         "column:" + colIdx,
417         Bytes.toString(CellUtil.cloneQualifier(kv)));
418 
419     assertEquals("Timestamp mismatch while checking: " + ctx,
420         ts, kv.getTimestamp());
421 
422     assertEquals("Value mismatch while checking: " + ctx,
423         "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
424   }
425 
426   /**
427    * Uses the TimestampFilter on a Get to request a specified list of
428    * versions for the row/column specified by rowIdx & colIdx.
429    *
430    */
431   private  Cell[] getNVersions(HTable ht, byte[] cf, int rowIdx,
432       int colIdx, List<Long> versions)
433   throws IOException {
434     byte row[] = Bytes.toBytes("row:" + rowIdx);
435     byte column[] = Bytes.toBytes("column:" + colIdx);
436     Get get = new Get(row);
437     get.addColumn(cf, column);
438     get.setMaxVersions();
439     get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
440     Result result = ht.get(get);
441 
442     return result.rawCells();
443   }
444 
445   private  ResultScanner scan(HTable ht, byte[] cf,
446       Integer[] rowIndexes, Integer[] columnIndexes,
447       Long[] versions, int maxVersions)
448   throws IOException {
449     Arrays.asList(rowIndexes);
450     byte startRow[] = Bytes.toBytes("row:" +
451         Collections.min( Arrays.asList(rowIndexes)));
452     byte endRow[] = Bytes.toBytes("row:" +
453         Collections.max( Arrays.asList(rowIndexes))+1);
454     Scan scan = new Scan(startRow, endRow);
455     for (Integer colIdx: columnIndexes) {
456       byte column[] = Bytes.toBytes("column:" + colIdx);
457       scan.addColumn(cf, column);
458     }
459     scan.setMaxVersions(maxVersions);
460     scan.setTimeRange(Collections.min(Arrays.asList(versions)),
461         Collections.max(Arrays.asList(versions))+1);
462     ResultScanner scanner = ht.getScanner(scan);
463     return scanner;
464   }
465 
466   private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
467       Integer[] columnIndexes, Long[] versions)
468   throws IOException {
469     for (int rowIdx: rowIndexes) {
470       byte row[] = Bytes.toBytes("row:" + rowIdx);
471       Put put = new Put(row);
472       put.setDurability(Durability.SKIP_WAL);
473       for(int colIdx: columnIndexes) {
474         byte column[] = Bytes.toBytes("column:" + colIdx);
475         for (long version: versions) {
476           put.add(cf, column, version, Bytes.toBytes("value-version-" +
477               version));
478         }
479       }
480       ht.put(put);
481     }
482   }
483 
484   /**
485    * Insert in specific row/column versions with timestamps
486    * versionStart..versionEnd.
487    */
488   private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
489       long versionStart, long versionEnd)
490   throws IOException {
491     byte row[] = Bytes.toBytes("row:" + rowIdx);
492     byte column[] = Bytes.toBytes("column:" + colIdx);
493     Put put = new Put(row);
494     put.setDurability(Durability.SKIP_WAL);
495 
496     for (long idx = versionStart; idx <= versionEnd; idx++) {
497       put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
498     }
499 
500     ht.put(put);
501   }
502 
503   /**
504    * For row/column specified by rowIdx/colIdx, delete the cell
505    * corresponding to the specified version.
506    */
507   private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
508       int colIdx, long version)
509   throws IOException {
510     byte row[] = Bytes.toBytes("row:" + rowIdx);
511     byte column[] = Bytes.toBytes("column:" + colIdx);
512     Delete del = new Delete(row);
513     del.deleteColumn(cf, column, version);
514     ht.delete(del);
515   }
516 
517   /**
518    * For row/column specified by rowIdx/colIdx, delete all cells
519    * preceeding the specified version.
520    */
521   private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx,
522       int colIdx, long version)
523   throws IOException {
524     byte row[] = Bytes.toBytes("row:" + rowIdx);
525     byte column[] = Bytes.toBytes("column:" + colIdx);
526     Delete del = new Delete(row);
527     del.deleteColumns(cf, column, version);
528     ht.delete(del);
529   }
530 
531   private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
532     byte row[] = Bytes.toBytes("row:" + rowIdx);
533     byte column[] = Bytes.toBytes("column:" + colIdx);
534     Delete del = new Delete(row);
535     del.deleteColumns(cf, column);
536     ht.delete(del);
537   }
538 
539   private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException {
540     byte row[] = Bytes.toBytes("row:" + rowIdx);
541     Delete del = new Delete(row);
542     del.deleteFamily(cf);
543     ht.delete(del);
544   }
545 
546 }
547 
548