1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
22 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
23 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.junit.Assert.fail;
29
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.List;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.CellUtil;
38 import org.apache.hadoop.hbase.HBaseTestCase;
39 import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
40 import org.apache.hadoop.hbase.HBaseTestCase.ScannerIncommon;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HColumnDescriptor;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HTableDescriptor;
46 import org.apache.hadoop.hbase.KeyValue;
47 import org.apache.hadoop.hbase.SmallTests;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.UnknownScannerException;
50 import org.apache.hadoop.hbase.client.Delete;
51 import org.apache.hadoop.hbase.client.Get;
52 import org.apache.hadoop.hbase.client.Put;
53 import org.apache.hadoop.hbase.client.Result;
54 import org.apache.hadoop.hbase.client.Scan;
55 import org.apache.hadoop.hbase.filter.Filter;
56 import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
57 import org.apache.hadoop.hbase.filter.PrefixFilter;
58 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.junit.Rule;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63 import org.junit.rules.TestName;
64
65
66
67
68
69 @Category(SmallTests.class)
70 public class TestScanner {
71 @Rule public TestName name = new TestName();
72 private final Log LOG = LogFactory.getLog(this.getClass());
73 private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
74
75 private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
76 private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
77 private static final byte [][] EXPLICIT_COLS = {
78 HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
79
80
81 };
82
83 static final HTableDescriptor TESTTABLEDESC =
84 new HTableDescriptor(TableName.valueOf("testscanner"));
85 static {
86 TESTTABLEDESC.addFamily(
87 new HColumnDescriptor(HConstants.CATALOG_FAMILY)
88
89 .setMaxVersions(10)
90 .setBlockCacheEnabled(false)
91 .setBlocksize(8 * 1024)
92 );
93 }
94
95 public static final HRegionInfo REGION_INFO =
96 new HRegionInfo(TESTTABLEDESC.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
97 HConstants.EMPTY_BYTE_ARRAY);
98
99 private static final byte [] ROW_KEY = REGION_INFO.getRegionName();
100
101 private static final long START_CODE = Long.MAX_VALUE;
102
103 private HRegion r;
104 private HRegionIncommon region;
105
106 private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
107 final private byte[] col1, col2;
108
109 public TestScanner() {
110 super();
111
112 firstRowBytes = START_KEY_BYTES;
113 secondRowBytes = START_KEY_BYTES.clone();
114
115 secondRowBytes[START_KEY_BYTES.length - 1]++;
116 thirdRowBytes = START_KEY_BYTES.clone();
117 thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
118 col1 = Bytes.toBytes("column1");
119 col2 = Bytes.toBytes("column2");
120 }
121
122
123
124
125
126 @Test
127 public void testStopRow() throws Exception {
128 byte [] startrow = Bytes.toBytes("bbb");
129 byte [] stoprow = Bytes.toBytes("ccc");
130 try {
131 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
132 HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
133 List<Cell> results = new ArrayList<Cell>();
134
135 Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
136 scan.addFamily(HConstants.CATALOG_FAMILY);
137
138 InternalScanner s = r.getScanner(scan);
139 int count = 0;
140 while (s.next(results)) {
141 count++;
142 }
143 s.close();
144 assertEquals(0, count);
145
146 scan = new Scan(startrow, stoprow);
147 scan.addFamily(HConstants.CATALOG_FAMILY);
148
149 s = r.getScanner(scan);
150 count = 0;
151 Cell kv = null;
152 results = new ArrayList<Cell>();
153 for (boolean first = true; s.next(results);) {
154 kv = results.get(0);
155 if (first) {
156 assertTrue(CellUtil.matchingRow(kv, startrow));
157 first = false;
158 }
159 count++;
160 }
161 assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
162
163 assertTrue(count > 10);
164 s.close();
165 } finally {
166 HRegion.closeHRegion(this.r);
167 }
168 }
169
170 void rowPrefixFilter(Scan scan) throws IOException {
171 List<Cell> results = new ArrayList<Cell>();
172 scan.addFamily(HConstants.CATALOG_FAMILY);
173 InternalScanner s = r.getScanner(scan);
174 boolean hasMore = true;
175 while (hasMore) {
176 hasMore = s.next(results);
177 for (Cell kv : results) {
178 assertEquals((byte)'a', CellUtil.cloneRow(kv)[0]);
179 assertEquals((byte)'b', CellUtil.cloneRow(kv)[1]);
180 }
181 results.clear();
182 }
183 s.close();
184 }
185
186 void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
187 List<Cell> results = new ArrayList<Cell>();
188 scan.addFamily(HConstants.CATALOG_FAMILY);
189 InternalScanner s = r.getScanner(scan);
190 boolean hasMore = true;
191 while (hasMore) {
192 hasMore = s.next(results);
193 for (Cell kv : results) {
194 assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
195 }
196 results.clear();
197 }
198 s.close();
199 }
200
201 @Test
202 public void testFilters() throws IOException {
203 try {
204 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
205 HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
206 byte [] prefix = Bytes.toBytes("ab");
207 Filter newFilter = new PrefixFilter(prefix);
208 Scan scan = new Scan();
209 scan.setFilter(newFilter);
210 rowPrefixFilter(scan);
211
212 byte[] stopRow = Bytes.toBytes("bbc");
213 newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
214 scan = new Scan();
215 scan.setFilter(newFilter);
216 rowInclusiveStopFilter(scan, stopRow);
217
218 } finally {
219 HRegion.closeHRegion(this.r);
220 }
221 }
222
223
224
225
226
227
228 @Test
229 public void testRaceBetweenClientAndTimeout() throws Exception {
230 try {
231 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
232 HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
233 Scan scan = new Scan();
234 InternalScanner s = r.getScanner(scan);
235 List<Cell> results = new ArrayList<Cell>();
236 try {
237 s.next(results);
238 s.close();
239 s.next(results);
240 fail("We don't want anything more, we should be failing");
241 } catch (UnknownScannerException ex) {
242
243 return;
244 }
245 } finally {
246 HRegion.closeHRegion(this.r);
247 }
248 }
249
250
251
252
253 @Test
254 public void testScanner() throws IOException {
255 try {
256 r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
257 region = new HRegionIncommon(r);
258
259
260
261 Put put = new Put(ROW_KEY, System.currentTimeMillis());
262
263 put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
264 REGION_INFO.toByteArray());
265 region.put(put);
266
267
268
269
270 scan(false, null);
271 getRegionInfo();
272
273
274
275 r.close();
276 r = HRegion.openHRegion(r, null);
277 region = new HRegionIncommon(r);
278
279
280
281 scan(false, null);
282 getRegionInfo();
283
284
285
286 String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();
287
288 put = new Put(ROW_KEY, System.currentTimeMillis());
289 put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
290 Bytes.toBytes(address));
291
292
293
294 region.put(put);
295
296
297
298
299 scan(true, address.toString());
300 getRegionInfo();
301
302
303
304 region.flushcache();
305
306
307
308 scan(true, address.toString());
309 getRegionInfo();
310
311
312
313 r.close();
314 r = HRegion.openHRegion(r,null);
315 region = new HRegionIncommon(r);
316
317
318
319 scan(true, address.toString());
320 getRegionInfo();
321
322
323
324 address = "bar.foo.com:4321";
325
326 put = new Put(ROW_KEY, System.currentTimeMillis());
327
328 put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
329 Bytes.toBytes(address));
330 region.put(put);
331
332
333
334 scan(true, address.toString());
335 getRegionInfo();
336
337
338
339 region.flushcache();
340
341
342
343 scan(true, address.toString());
344 getRegionInfo();
345
346
347
348 r.close();
349 r = HRegion.openHRegion(r,null);
350 region = new HRegionIncommon(r);
351
352
353
354 scan(true, address.toString());
355 getRegionInfo();
356
357 } finally {
358
359 HRegion.closeHRegion(r);
360 }
361 }
362
363
364 private void validateRegionInfo(byte [] regionBytes) throws IOException {
365 HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
366
367 assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
368 assertEquals(0, info.getStartKey().length);
369 assertEquals(0, info.getEndKey().length);
370 assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
371
372 }
373
374
375 private void scan(boolean validateStartcode, String serverName)
376 throws IOException {
377 InternalScanner scanner = null;
378 Scan scan = null;
379 List<Cell> results = new ArrayList<Cell>();
380 byte [][][] scanColumns = {
381 COLS,
382 EXPLICIT_COLS
383 };
384
385 for(int i = 0; i < scanColumns.length; i++) {
386 try {
387 scan = new Scan(FIRST_ROW);
388 for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
389 scan.addColumn(COLS[0], EXPLICIT_COLS[ii]);
390 }
391 scanner = r.getScanner(scan);
392 while (scanner.next(results)) {
393 assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
394 HConstants.REGIONINFO_QUALIFIER));
395 byte [] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
396 HConstants.REGIONINFO_QUALIFIER));
397 validateRegionInfo(val);
398 if(validateStartcode) {
399
400
401
402
403 assertNotNull(val);
404 assertFalse(val.length == 0);
405 long startCode = Bytes.toLong(val);
406 assertEquals(START_CODE, startCode);
407 }
408
409 if(serverName != null) {
410 assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
411 HConstants.SERVER_QUALIFIER));
412 val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
413 HConstants.SERVER_QUALIFIER));
414 assertNotNull(val);
415 assertFalse(val.length == 0);
416 String server = Bytes.toString(val);
417 assertEquals(0, server.compareTo(serverName));
418 }
419 }
420 } finally {
421 InternalScanner s = scanner;
422 scanner = null;
423 if(s != null) {
424 s.close();
425 }
426 }
427 }
428 }
429
430 private boolean hasColumn(final List<Cell> kvs, final byte [] family,
431 final byte [] qualifier) {
432 for (Cell kv: kvs) {
433 if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
434 return true;
435 }
436 }
437 return false;
438 }
439
440 private Cell getColumn(final List<Cell> kvs, final byte [] family,
441 final byte [] qualifier) {
442 for (Cell kv: kvs) {
443 if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
444 return kv;
445 }
446 }
447 return null;
448 }
449
450
451
452 private void getRegionInfo() throws IOException {
453 Get get = new Get(ROW_KEY);
454 get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
455 Result result = region.get(get);
456 byte [] bytes = result.value();
457 validateRegionInfo(bytes);
458 }
459
460
461
462
463
464
465
466 @Test
467 public void testScanAndSyncFlush() throws Exception {
468 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
469 HRegionIncommon hri = new HRegionIncommon(r);
470 try {
471 LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
472 Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
473 int count = count(hri, -1, false);
474 assertEquals(count, count(hri, 100, false));
475 } catch (Exception e) {
476 LOG.error("Failed", e);
477 throw e;
478 } finally {
479 HRegion.closeHRegion(this.r);
480 }
481 }
482
483
484
485
486
487
488
489 @Test
490 public void testScanAndRealConcurrentFlush() throws Exception {
491 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
492 HRegionIncommon hri = new HRegionIncommon(r);
493 try {
494 LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
495 Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
496 int count = count(hri, -1, false);
497 assertEquals(count, count(hri, 100, true));
498 } catch (Exception e) {
499 LOG.error("Failed", e);
500 throw e;
501 } finally {
502 HRegion.closeHRegion(this.r);
503 }
504 }
505
506
507
508
509
510
511
512 @Test
513 @SuppressWarnings("deprecation")
514 public void testScanAndConcurrentMajorCompact() throws Exception {
515 HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
516 this.r = TEST_UTIL.createLocalHRegion(htd, null, null);
517 HRegionIncommon hri = new HRegionIncommon(r);
518
519 try {
520 HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
521 firstRowBytes, secondRowBytes);
522 HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
523 firstRowBytes, secondRowBytes);
524
525 Delete dc = new Delete(firstRowBytes);
526
527 dc.deleteColumns(fam1, col1);
528 r.delete(dc);
529 r.flushcache();
530
531 HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
532 secondRowBytes, thirdRowBytes);
533 HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
534 secondRowBytes, thirdRowBytes);
535 r.flushcache();
536
537 InternalScanner s = r.getScanner(new Scan());
538
539 r.compactStores(true);
540
541 List<Cell> results = new ArrayList<Cell>();
542 s.next(results);
543
544
545 assertTrue("result is not correct, keyValues : " + results,
546 results.size() == 1);
547 assertTrue(CellUtil.matchingRow(results.get(0), firstRowBytes));
548 assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
549
550 results = new ArrayList<Cell>();
551 s.next(results);
552
553
554 assertTrue(results.size() == 2);
555 assertTrue(CellUtil.matchingRow(results.get(0), secondRowBytes));
556 assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
557 assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
558 } finally {
559 HRegion.closeHRegion(this.r);
560 }
561 }
562
563
564
565
566
567
568
569
570
571 private int count(final HRegionIncommon hri, final int flushIndex,
572 boolean concurrent)
573 throws IOException {
574 LOG.info("Taking out counting scan");
575 ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
576 HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
577 List<Cell> values = new ArrayList<Cell>();
578 int count = 0;
579 boolean justFlushed = false;
580 while (s.next(values)) {
581 if (justFlushed) {
582 LOG.info("after next() just after next flush");
583 justFlushed=false;
584 }
585 count++;
586 if (flushIndex == count) {
587 LOG.info("Starting flush at flush index " + flushIndex);
588 Thread t = new Thread() {
589 public void run() {
590 try {
591 hri.flushcache();
592 LOG.info("Finishing flush");
593 } catch (IOException e) {
594 LOG.info("Failed flush cache");
595 }
596 }
597 };
598 if (concurrent) {
599 t.start();
600 } else {
601 t.run();
602 }
603 LOG.info("Continuing on after kicking off background flush");
604 justFlushed = true;
605 }
606 }
607 s.close();
608 LOG.info("Found " + count + " items");
609 return count;
610 }
611
612 }