1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Random;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.HBaseTestCase;
33 import org.apache.hadoop.hbase.HColumnDescriptor;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.SmallTests;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.client.Durability;
38 import org.apache.hadoop.hbase.client.Put;
39 import org.apache.hadoop.hbase.client.Scan;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.junit.experimental.categories.Category;
42
43 @Category(SmallTests.class)
44 public class TestWideScanner extends HBaseTestCase {
45 private final Log LOG = LogFactory.getLog(this.getClass());
46
47 static final byte[] A = Bytes.toBytes("A");
48 static final byte[] B = Bytes.toBytes("B");
49 static final byte[] C = Bytes.toBytes("C");
50 static byte[][] COLUMNS = { A, B, C };
51 static final Random rng = new Random();
52 static final HTableDescriptor TESTTABLEDESC =
53 new HTableDescriptor(TableName.valueOf("testwidescan"));
54 static {
55 for (byte[] cfName : new byte[][] { A, B, C }) {
56 TESTTABLEDESC.addFamily(new HColumnDescriptor(cfName)
57
58 .setMaxVersions(100)
59 .setBlocksize(8 * 1024)
60 );
61 }
62 }
63
64
65 HRegion r;
66
67 private int addWideContent(HRegion region) throws IOException {
68 int count = 0;
69 for (char c = 'a'; c <= 'c'; c++) {
70 byte[] row = Bytes.toBytes("ab" + c);
71 int i, j;
72 long ts = System.currentTimeMillis();
73 for (i = 0; i < 100; i++) {
74 byte[] b = Bytes.toBytes(String.format("%10d", i));
75 for (j = 0; j < 100; j++) {
76 Put put = new Put(row);
77 put.setDurability(Durability.SKIP_WAL);
78 put.add(COLUMNS[rng.nextInt(COLUMNS.length)], b, ++ts, b);
79 region.put(put);
80 count++;
81 }
82 }
83 }
84 return count;
85 }
86
87 public void testWideScanBatching() throws IOException {
88 final int batch = 256;
89 try {
90 this.r = createNewHRegion(TESTTABLEDESC, null, null);
91 int inserted = addWideContent(this.r);
92 List<Cell> results = new ArrayList<Cell>();
93 Scan scan = new Scan();
94 scan.addFamily(A);
95 scan.addFamily(B);
96 scan.addFamily(C);
97 scan.setMaxVersions(100);
98 scan.setBatch(batch);
99 InternalScanner s = r.getScanner(scan);
100 int total = 0;
101 int i = 0;
102 boolean more;
103 do {
104 more = s.next(results);
105 i++;
106 LOG.info("iteration #" + i + ", results.size=" + results.size());
107
108
109 assertTrue(results.size() <= batch);
110
111 total += results.size();
112
113 if (results.size() > 0) {
114
115 byte[] row = CellUtil.cloneRow(results.get(0));
116 for (Cell kv: results) {
117 assertTrue(Bytes.equals(row, CellUtil.cloneRow(kv)));
118 }
119 }
120
121 results.clear();
122
123
124 Iterator<KeyValueScanner> scanners =
125 ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
126 while (scanners.hasNext()) {
127 StoreScanner ss = (StoreScanner)scanners.next();
128 ss.updateReaders();
129 }
130 } while (more);
131
132
133 LOG.info("inserted " + inserted + ", scanned " + total);
134 assertEquals(total, inserted);
135
136 s.close();
137 } finally {
138 HRegion.closeHRegion(this.r);
139 }
140 }
141
142 }
143