1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.junit.Assert.*;
22
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.*;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.junit.After;
33 import org.junit.AfterClass;
34 import org.junit.Before;
35 import org.junit.BeforeClass;
36 import org.junit.Test;
37 import org.junit.experimental.categories.Category;
38
39
40
41
42
43
44 @Category(LargeTests.class)
45 public class TestMultipleTimestamps {
46 final Log LOG = LogFactory.getLog(getClass());
47 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
48
49
50
51
52 @BeforeClass
53 public static void setUpBeforeClass() throws Exception {
54 TEST_UTIL.startMiniCluster();
55 }
56
57
58
59
60 @AfterClass
61 public static void tearDownAfterClass() throws Exception {
62 TEST_UTIL.shutdownMiniCluster();
63 }
64
65
66
67
68 @Before
69 public void setUp() throws Exception {
70
71 }
72
73
74
75
76 @After
77 public void tearDown() throws Exception {
78
79 }
80
81 @Test
82 public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
83 TableName TABLE =
84 TableName.valueOf("testReseeksWithOne" +
85 "ColumnMiltipleTimestamps");
86 byte [] FAMILY = Bytes.toBytes("event_log");
87 byte [][] FAMILIES = new byte[][] { FAMILY };
88
89
90 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
91
92 Integer[] putRows = new Integer[] {1, 3, 5, 7};
93 Integer[] putColumns = new Integer[] { 1, 3, 5};
94 Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
95
96 Integer[] scanRows = new Integer[] {3, 5};
97 Integer[] scanColumns = new Integer[] {3};
98 Long[] scanTimestamps = new Long[] {3L, 4L};
99 int scanMaxVersions = 2;
100
101 put(ht, FAMILY, putRows, putColumns, putTimestamps);
102
103 TEST_UTIL.flush(TABLE);
104
105 ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
106 scanTimestamps, scanMaxVersions);
107
108 Cell [] kvs;
109
110 kvs = scanner.next().rawCells();
111 assertEquals(2, kvs.length);
112 checkOneCell(kvs[0], FAMILY, 3, 3, 4);
113 checkOneCell(kvs[1], FAMILY, 3, 3, 3);
114 kvs = scanner.next().rawCells();
115 assertEquals(2, kvs.length);
116 checkOneCell(kvs[0], FAMILY, 5, 3, 4);
117 checkOneCell(kvs[1], FAMILY, 5, 3, 3);
118
119 ht.close();
120 }
121
122 @Test
123 public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
124 LOG.info("testReseeksWithMultipleColumnOneTimestamp");
125 TableName TABLE =
126 TableName.valueOf("testReseeksWithMultiple" +
127 "ColumnOneTimestamps");
128 byte [] FAMILY = Bytes.toBytes("event_log");
129 byte [][] FAMILIES = new byte[][] { FAMILY };
130
131
132 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
133
134 Integer[] putRows = new Integer[] {1, 3, 5, 7};
135 Integer[] putColumns = new Integer[] { 1, 3, 5};
136 Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
137
138 Integer[] scanRows = new Integer[] {3, 5};
139 Integer[] scanColumns = new Integer[] {3,4};
140 Long[] scanTimestamps = new Long[] {3L};
141 int scanMaxVersions = 2;
142
143 put(ht, FAMILY, putRows, putColumns, putTimestamps);
144
145 TEST_UTIL.flush(TABLE);
146
147 ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
148 scanTimestamps, scanMaxVersions);
149
150 Cell[] kvs;
151
152 kvs = scanner.next().rawCells();
153 assertEquals(1, kvs.length);
154 checkOneCell(kvs[0], FAMILY, 3, 3, 3);
155 kvs = scanner.next().rawCells();
156 assertEquals(1, kvs.length);
157 checkOneCell(kvs[0], FAMILY, 5, 3, 3);
158
159 ht.close();
160 }
161
162 @Test
163 public void testReseeksWithMultipleColumnMultipleTimestamp() throws
164 IOException {
165 LOG.info("testReseeksWithMultipleColumnMultipleTimestamp");
166
167 TableName TABLE =
168 TableName.valueOf("testReseeksWithMultipleColumnMiltipleTimestamps");
169 byte [] FAMILY = Bytes.toBytes("event_log");
170 byte [][] FAMILIES = new byte[][] { FAMILY };
171
172
173 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
174
175 Integer[] putRows = new Integer[] {1, 3, 5, 7};
176 Integer[] putColumns = new Integer[] { 1, 3, 5};
177 Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
178
179 Integer[] scanRows = new Integer[] {5, 7};
180 Integer[] scanColumns = new Integer[] {3, 4, 5};
181 Long[] scanTimestamps = new Long[] {2l, 3L};
182 int scanMaxVersions = 2;
183
184 put(ht, FAMILY, putRows, putColumns, putTimestamps);
185
186 TEST_UTIL.flush(TABLE);
187 Scan scan = new Scan();
188 scan.setMaxVersions(10);
189 ResultScanner scanner = ht.getScanner(scan);
190 while (true) {
191 Result r = scanner.next();
192 if (r == null) break;
193 LOG.info("r=" + r);
194 }
195 scanner = scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
196
197 Cell[] kvs;
198
199
200
201 kvs = scanner.next().rawCells();
202 assertEquals(4, kvs.length);
203 checkOneCell(kvs[0], FAMILY, 5, 3, 3);
204 checkOneCell(kvs[1], FAMILY, 5, 3, 2);
205 checkOneCell(kvs[2], FAMILY, 5, 5, 3);
206 checkOneCell(kvs[3], FAMILY, 5, 5, 2);
207 kvs = scanner.next().rawCells();
208 assertEquals(4, kvs.length);
209 checkOneCell(kvs[0], FAMILY, 7, 3, 3);
210 checkOneCell(kvs[1], FAMILY, 7, 3, 2);
211 checkOneCell(kvs[2], FAMILY, 7, 5, 3);
212 checkOneCell(kvs[3], FAMILY, 7, 5, 2);
213
214 ht.close();
215 }
216
217 @Test
218 public void testReseeksWithMultipleFiles() throws IOException {
219 LOG.info("testReseeksWithMultipleFiles");
220 TableName TABLE =
221 TableName.valueOf("testReseeksWithMultipleFiles");
222 byte [] FAMILY = Bytes.toBytes("event_log");
223 byte [][] FAMILIES = new byte[][] { FAMILY };
224
225
226 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
227
228 Integer[] putRows1 = new Integer[] {1, 2, 3};
229 Integer[] putColumns1 = new Integer[] { 2, 5, 6};
230 Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
231
232 Integer[] putRows2 = new Integer[] {6, 7};
233 Integer[] putColumns2 = new Integer[] {3, 6};
234 Long[] putTimestamps2 = new Long[] {4L, 5L};
235
236 Integer[] putRows3 = new Integer[] {2, 3, 5};
237 Integer[] putColumns3 = new Integer[] {1, 2, 3};
238 Long[] putTimestamps3 = new Long[] {4L,8L};
239
240
241 Integer[] scanRows = new Integer[] {3, 5, 7};
242 Integer[] scanColumns = new Integer[] {3, 4, 5};
243 Long[] scanTimestamps = new Long[] {2l, 4L};
244 int scanMaxVersions = 5;
245
246 put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
247 TEST_UTIL.flush(TABLE);
248 put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
249 TEST_UTIL.flush(TABLE);
250 put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
251
252 ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
253 scanTimestamps, scanMaxVersions);
254
255 Cell[] kvs;
256
257 kvs = scanner.next().rawCells();
258 assertEquals(2, kvs.length);
259 checkOneCell(kvs[0], FAMILY, 3, 3, 4);
260 checkOneCell(kvs[1], FAMILY, 3, 5, 2);
261
262 kvs = scanner.next().rawCells();
263 assertEquals(1, kvs.length);
264 checkOneCell(kvs[0], FAMILY, 5, 3, 4);
265
266 kvs = scanner.next().rawCells();
267 assertEquals(1, kvs.length);
268 checkOneCell(kvs[0], FAMILY, 6, 3, 4);
269
270 kvs = scanner.next().rawCells();
271 assertEquals(1, kvs.length);
272 checkOneCell(kvs[0], FAMILY, 7, 3, 4);
273
274 ht.close();
275 }
276
277 @Test
278 public void testWithVersionDeletes() throws Exception {
279
280
281 testWithVersionDeletes(false);
282
283
284 testWithVersionDeletes(true);
285 }
286
287 public void testWithVersionDeletes(boolean flushTables) throws IOException {
288 LOG.info("testWithVersionDeletes_"+ (flushTables ? "flush" : "noflush"));
289 TableName TABLE =
290 TableName.valueOf("testWithVersionDeletes_" + (flushTables ?
291 "flush" : "noflush"));
292 byte [] FAMILY = Bytes.toBytes("event_log");
293 byte [][] FAMILIES = new byte[][] { FAMILY };
294
295
296 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
297
298
299 putNVersions(ht, FAMILY, 0, 0, 1, 5);
300
301 if (flushTables) {
302 TEST_UTIL.flush(TABLE);
303 }
304
305
306 deleteOneVersion(ht, FAMILY, 0, 0, 4);
307
308
309
310 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0,
311 Arrays.asList(2L, 3L, 4L, 5L));
312 assertEquals(3, kvs.length);
313 checkOneCell(kvs[0], FAMILY, 0, 0, 5);
314 checkOneCell(kvs[1], FAMILY, 0, 0, 3);
315 checkOneCell(kvs[2], FAMILY, 0, 0, 2);
316
317 ht.close();
318 }
319
320 @Test
321 public void testWithMultipleVersionDeletes() throws IOException {
322 LOG.info("testWithMultipleVersionDeletes");
323
324 TableName TABLE =
325 TableName.valueOf("testWithMultipleVersionDeletes");
326 byte [] FAMILY = Bytes.toBytes("event_log");
327 byte [][] FAMILIES = new byte[][] { FAMILY };
328
329
330 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
331
332
333 putNVersions(ht, FAMILY, 0, 0, 1, 5);
334
335 TEST_UTIL.flush(TABLE);
336
337
338 deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
339
340
341
342 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
343 assertEquals(0, kvs.length);
344
345 ht.close();
346 }
347
348 @Test
349 public void testWithColumnDeletes() throws IOException {
350 TableName TABLE =
351 TableName.valueOf("testWithColumnDeletes");
352 byte [] FAMILY = Bytes.toBytes("event_log");
353 byte [][] FAMILIES = new byte[][] { FAMILY };
354
355
356 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
357
358
359 putNVersions(ht, FAMILY, 0, 0, 1, 5);
360
361 TEST_UTIL.flush(TABLE);
362
363
364 deleteColumn(ht, FAMILY, 0, 0);
365
366
367
368 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
369 assertEquals(0, kvs.length);
370
371 ht.close();
372 }
373
374 @Test
375 public void testWithFamilyDeletes() throws IOException {
376 TableName TABLE =
377 TableName.valueOf("testWithFamilyDeletes");
378 byte [] FAMILY = Bytes.toBytes("event_log");
379 byte [][] FAMILIES = new byte[][] { FAMILY };
380
381
382 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
383
384
385 putNVersions(ht, FAMILY, 0, 0, 1, 5);
386
387 TEST_UTIL.flush(TABLE);
388
389
390 deleteFamily(ht, FAMILY, 0);
391
392
393
394 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
395 assertEquals(0, kvs.length);
396
397 ht.close();
398 }
399
400
401
402
403
404 private void checkOneCell(Cell kv, byte[] cf,
405 int rowIdx, int colIdx, long ts) {
406
407 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
408
409 assertEquals("Row mismatch which checking: " + ctx,
410 "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));
411
412 assertEquals("ColumnFamily mismatch while checking: " + ctx,
413 Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));
414
415 assertEquals("Column qualifier mismatch while checking: " + ctx,
416 "column:" + colIdx,
417 Bytes.toString(CellUtil.cloneQualifier(kv)));
418
419 assertEquals("Timestamp mismatch while checking: " + ctx,
420 ts, kv.getTimestamp());
421
422 assertEquals("Value mismatch while checking: " + ctx,
423 "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
424 }
425
426
427
428
429
430
431 private Cell[] getNVersions(HTable ht, byte[] cf, int rowIdx,
432 int colIdx, List<Long> versions)
433 throws IOException {
434 byte row[] = Bytes.toBytes("row:" + rowIdx);
435 byte column[] = Bytes.toBytes("column:" + colIdx);
436 Get get = new Get(row);
437 get.addColumn(cf, column);
438 get.setMaxVersions();
439 get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
440 Result result = ht.get(get);
441
442 return result.rawCells();
443 }
444
445 private ResultScanner scan(HTable ht, byte[] cf,
446 Integer[] rowIndexes, Integer[] columnIndexes,
447 Long[] versions, int maxVersions)
448 throws IOException {
449 Arrays.asList(rowIndexes);
450 byte startRow[] = Bytes.toBytes("row:" +
451 Collections.min( Arrays.asList(rowIndexes)));
452 byte endRow[] = Bytes.toBytes("row:" +
453 Collections.max( Arrays.asList(rowIndexes))+1);
454 Scan scan = new Scan(startRow, endRow);
455 for (Integer colIdx: columnIndexes) {
456 byte column[] = Bytes.toBytes("column:" + colIdx);
457 scan.addColumn(cf, column);
458 }
459 scan.setMaxVersions(maxVersions);
460 scan.setTimeRange(Collections.min(Arrays.asList(versions)),
461 Collections.max(Arrays.asList(versions))+1);
462 ResultScanner scanner = ht.getScanner(scan);
463 return scanner;
464 }
465
466 private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
467 Integer[] columnIndexes, Long[] versions)
468 throws IOException {
469 for (int rowIdx: rowIndexes) {
470 byte row[] = Bytes.toBytes("row:" + rowIdx);
471 Put put = new Put(row);
472 put.setDurability(Durability.SKIP_WAL);
473 for(int colIdx: columnIndexes) {
474 byte column[] = Bytes.toBytes("column:" + colIdx);
475 for (long version: versions) {
476 put.add(cf, column, version, Bytes.toBytes("value-version-" +
477 version));
478 }
479 }
480 ht.put(put);
481 }
482 }
483
484
485
486
487
488 private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
489 long versionStart, long versionEnd)
490 throws IOException {
491 byte row[] = Bytes.toBytes("row:" + rowIdx);
492 byte column[] = Bytes.toBytes("column:" + colIdx);
493 Put put = new Put(row);
494 put.setDurability(Durability.SKIP_WAL);
495
496 for (long idx = versionStart; idx <= versionEnd; idx++) {
497 put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
498 }
499
500 ht.put(put);
501 }
502
503
504
505
506
507 private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
508 int colIdx, long version)
509 throws IOException {
510 byte row[] = Bytes.toBytes("row:" + rowIdx);
511 byte column[] = Bytes.toBytes("column:" + colIdx);
512 Delete del = new Delete(row);
513 del.deleteColumn(cf, column, version);
514 ht.delete(del);
515 }
516
517
518
519
520
521 private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx,
522 int colIdx, long version)
523 throws IOException {
524 byte row[] = Bytes.toBytes("row:" + rowIdx);
525 byte column[] = Bytes.toBytes("column:" + colIdx);
526 Delete del = new Delete(row);
527 del.deleteColumns(cf, column, version);
528 ht.delete(del);
529 }
530
531 private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
532 byte row[] = Bytes.toBytes("row:" + rowIdx);
533 byte column[] = Bytes.toBytes("column:" + colIdx);
534 Delete del = new Delete(row);
535 del.deleteColumns(cf, column);
536 ht.delete(del);
537 }
538
539 private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException {
540 byte row[] = Bytes.toBytes("row:" + rowIdx);
541 Delete del = new Delete(row);
542 del.deleteFamily(cf);
543 ht.delete(del);
544 }
545
546 }
547
548