1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.junit.Assert.*;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.*;
31 import org.apache.hadoop.hbase.filter.Filter;
32 import org.apache.hadoop.hbase.filter.TimestampsFilter;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.junit.After;
35 import org.junit.AfterClass;
36 import org.junit.Before;
37 import org.junit.BeforeClass;
38 import org.junit.Test;
39 import org.junit.experimental.categories.Category;
40
41
42
43
44
45
46 @Category(MediumTests.class)
47 public class TestTimestampsFilter {
48 final Log LOG = LogFactory.getLog(getClass());
49 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
50
51
52
53
54 @BeforeClass
55 public static void setUpBeforeClass() throws Exception {
56 TEST_UTIL.startMiniCluster();
57 }
58
59
60
61
62 @AfterClass
63 public static void tearDownAfterClass() throws Exception {
64 TEST_UTIL.shutdownMiniCluster();
65 }
66
67
68
69
70 @Before
71 public void setUp() throws Exception {
72
73 }
74
75
76
77
78 @After
79 public void tearDown() throws Exception {
80
81 }
82
83
84
85
86
87
88
89
90
91 @Test
92 public void testTimestampsFilter() throws Exception {
93 byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
94 byte [] FAMILY = Bytes.toBytes("event_log");
95 byte [][] FAMILIES = new byte[][] { FAMILY };
96 Cell kvs[];
97
98
99 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
100
101 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
102 for (int colIdx = 0; colIdx < 5; colIdx++) {
103
104 putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
105
106 putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
107 }
108 }
109
110
111 verifyInsertedValues(ht, FAMILY);
112
113 TEST_UTIL.flush();
114
115
116 verifyInsertedValues(ht, FAMILY);
117
118
119
120 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
121 for (int colIdx = 0; colIdx < 5; colIdx++) {
122 putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
123 putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
124 }
125 }
126
127 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
128 for (int colIdx = 0; colIdx < 5; colIdx++) {
129 kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
130 Arrays.asList(505L, 5L, 105L, 305L, 205L));
131 assertEquals(4, kvs.length);
132 checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
133 checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
134 checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
135 checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
136 }
137 }
138
139
140
141 kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
142 assertEquals(0, kvs == null? 0: kvs.length);
143
144
145
146
147
148 Result[] results = scanNVersions(ht, FAMILY, 0, 4,
149 Arrays.asList(6L, 106L, 306L));
150 assertEquals("# of rows returned from scan", 5, results.length);
151 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
152 kvs = results[rowIdx].rawCells();
153
154
155 assertEquals("Number of KeyValues in result for row:" + rowIdx,
156 3*5, kvs.length);
157 for (int colIdx = 0; colIdx < 5; colIdx++) {
158 int offset = colIdx * 3;
159 checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
160 checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
161 checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
162 }
163 }
164 ht.close();
165 }
166
167 @Test
168 public void testMultiColumns() throws Exception {
169 byte [] TABLE = Bytes.toBytes("testTimestampsFilterMultiColumns");
170 byte [] FAMILY = Bytes.toBytes("event_log");
171 byte [][] FAMILIES = new byte[][] { FAMILY };
172
173
174 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
175
176 Put p = new Put(Bytes.toBytes("row"));
177 p.add(FAMILY, Bytes.toBytes("column0"), 3, Bytes.toBytes("value0-3"));
178 p.add(FAMILY, Bytes.toBytes("column1"), 3, Bytes.toBytes("value1-3"));
179 p.add(FAMILY, Bytes.toBytes("column2"), 1, Bytes.toBytes("value2-1"));
180 p.add(FAMILY, Bytes.toBytes("column2"), 2, Bytes.toBytes("value2-2"));
181 p.add(FAMILY, Bytes.toBytes("column2"), 3, Bytes.toBytes("value2-3"));
182 p.add(FAMILY, Bytes.toBytes("column3"), 2, Bytes.toBytes("value3-2"));
183 p.add(FAMILY, Bytes.toBytes("column4"), 1, Bytes.toBytes("value4-1"));
184 p.add(FAMILY, Bytes.toBytes("column4"), 2, Bytes.toBytes("value4-2"));
185 p.add(FAMILY, Bytes.toBytes("column4"), 3, Bytes.toBytes("value4-3"));
186 ht.put(p);
187
188 ArrayList<Long> timestamps = new ArrayList<Long>();
189 timestamps.add(new Long(3));
190 TimestampsFilter filter = new TimestampsFilter(timestamps);
191
192 Get g = new Get(Bytes.toBytes("row"));
193 g.setFilter(filter);
194 g.setMaxVersions();
195 g.addColumn(FAMILY, Bytes.toBytes("column2"));
196 g.addColumn(FAMILY, Bytes.toBytes("column4"));
197
198 Result result = ht.get(g);
199 for (Cell kv : result.listCells()) {
200 System.out.println("found row " + Bytes.toString(CellUtil.cloneRow(kv)) +
201 ", column " + Bytes.toString(CellUtil.cloneQualifier(kv)) + ", value "
202 + Bytes.toString(CellUtil.cloneValue(kv)));
203 }
204
205 assertEquals(result.listCells().size(), 2);
206 assertTrue(CellUtil.matchingValue(result.listCells().get(0), Bytes.toBytes("value2-3")));
207 assertTrue(CellUtil.matchingValue(result.listCells().get(1), Bytes.toBytes("value4-3")));
208
209 ht.close();
210 }
211
212
213
214
215
216
217 @Test
218 public void testWithVersionDeletes() throws Exception {
219
220
221 testWithVersionDeletes(false);
222
223
224 testWithVersionDeletes(true);
225 }
226
227 private void testWithVersionDeletes(boolean flushTables) throws IOException {
228 byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
229 (flushTables ? "flush" : "noflush"));
230 byte [] FAMILY = Bytes.toBytes("event_log");
231 byte [][] FAMILIES = new byte[][] { FAMILY };
232
233
234 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
235
236
237 putNVersions(ht, FAMILY, 0, 0, 1, 5);
238
239
240 deleteOneVersion(ht, FAMILY, 0, 0, 4);
241
242 if (flushTables) {
243 TEST_UTIL.flush();
244 }
245
246
247
248 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
249 assertEquals(3, kvs.length);
250 checkOneCell(kvs[0], FAMILY, 0, 0, 5);
251 checkOneCell(kvs[1], FAMILY, 0, 0, 3);
252 checkOneCell(kvs[2], FAMILY, 0, 0, 2);
253
254 ht.close();
255 }
256
257 private void verifyInsertedValues(HTable ht, byte[] cf) throws IOException {
258 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
259 for (int colIdx = 0; colIdx < 5; colIdx++) {
260
261 Cell[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
262 Arrays.asList(5L, 300L, 6L, 80L));
263 assertEquals(4, kvs.length);
264 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
265 checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
266 checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
267 checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
268
269
270 kvs = getNVersions(ht, cf, rowIdx, colIdx,
271 Arrays.asList(101L, 102L));
272 assertEquals(0, kvs == null? 0: kvs.length);
273
274
275 kvs = getNVersions(ht, cf, rowIdx, colIdx,
276 Arrays.asList(1L, 300L, 105L, 70L, 115L));
277 assertEquals(3, kvs.length);
278 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
279 checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
280 checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
281 }
282 }
283 }
284
285
286
287
288
289 private void checkOneCell(Cell kv, byte[] cf,
290 int rowIdx, int colIdx, long ts) {
291
292 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
293
294 assertEquals("Row mismatch which checking: " + ctx,
295 "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));
296
297 assertEquals("ColumnFamily mismatch while checking: " + ctx,
298 Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));
299
300 assertEquals("Column qualifier mismatch while checking: " + ctx,
301 "column:" + colIdx,
302 Bytes.toString(CellUtil.cloneQualifier(kv)));
303
304 assertEquals("Timestamp mismatch while checking: " + ctx,
305 ts, kv.getTimestamp());
306
307 assertEquals("Value mismatch while checking: " + ctx,
308 "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
309 }
310
311
312
313
314
315
316 private Cell[] getNVersions(HTable ht, byte[] cf, int rowIdx,
317 int colIdx, List<Long> versions)
318 throws IOException {
319 byte row[] = Bytes.toBytes("row:" + rowIdx);
320 byte column[] = Bytes.toBytes("column:" + colIdx);
321 Filter filter = new TimestampsFilter(versions);
322 Get get = new Get(row);
323 get.addColumn(cf, column);
324 get.setFilter(filter);
325 get.setMaxVersions();
326 Result result = ht.get(get);
327
328 return result.rawCells();
329 }
330
331
332
333
334
335 private Result[] scanNVersions(HTable ht, byte[] cf, int startRowIdx,
336 int endRowIdx, List<Long> versions)
337 throws IOException {
338 byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
339 byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1);
340 Filter filter = new TimestampsFilter(versions);
341 Scan scan = new Scan(startRow, endRow);
342 scan.setFilter(filter);
343 scan.setMaxVersions();
344 ResultScanner scanner = ht.getScanner(scan);
345 return scanner.next(endRowIdx - startRowIdx + 1);
346 }
347
348
349
350
351
352 private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
353 long versionStart, long versionEnd)
354 throws IOException {
355 byte row[] = Bytes.toBytes("row:" + rowIdx);
356 byte column[] = Bytes.toBytes("column:" + colIdx);
357 Put put = new Put(row);
358 put.setDurability(Durability.SKIP_WAL);
359
360 for (long idx = versionStart; idx <= versionEnd; idx++) {
361 put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
362 }
363
364 ht.put(put);
365 }
366
367
368
369
370
371 private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
372 int colIdx, long version)
373 throws IOException {
374 byte row[] = Bytes.toBytes("row:" + rowIdx);
375 byte column[] = Bytes.toBytes("column:" + colIdx);
376 Delete del = new Delete(row);
377 del.deleteColumn(cf, column, version);
378 ht.delete(del);
379 }
380
381 }
382
383