/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.concurrent.CountDownLatch;
23
24 import junit.framework.Assert;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HColumnDescriptor;
31 import org.apache.hadoop.hbase.HTableDescriptor;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.testclassification.MediumTests;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.TableNotFoundException;
36 import org.apache.hadoop.hbase.client.Admin;
37 import org.apache.hadoop.hbase.client.Connection;
38 import org.apache.hadoop.hbase.client.HBaseAdmin;
39 import org.apache.hadoop.hbase.client.HTable;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.client.ResultScanner;
43 import org.apache.hadoop.hbase.client.Scan;
44 import org.apache.hadoop.hbase.io.hfile.HFile;
45 import org.apache.hadoop.hbase.io.hfile.HFileContext;
46 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.junit.AfterClass;
49 import org.junit.BeforeClass;
50 import org.junit.Test;
51 import org.junit.experimental.categories.Category;
52
53 @Category(MediumTests.class)
54 public class TestScannerWithBulkload {
55 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
56
57 @BeforeClass
58 public static void setUpBeforeClass() throws Exception {
59 TEST_UTIL.startMiniCluster(1);
60 }
61
62 private static void createTable(Admin admin, TableName tableName) throws IOException {
63 HTableDescriptor desc = new HTableDescriptor(tableName);
64 HColumnDescriptor hcd = new HColumnDescriptor("col");
65 hcd.setMaxVersions(3);
66 desc.addFamily(hcd);
67 admin.createTable(desc);
68 }
69
70 @Test
71 public void testBulkLoad() throws Exception {
72 TableName tableName = TableName.valueOf("testBulkLoad");
73 long l = System.currentTimeMillis();
74 HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
75 createTable(admin, tableName);
76 Scan scan = createScan();
77 final HTable table = init(admin, l, scan, tableName);
78
79 final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
80 false);
81 Configuration conf = TEST_UTIL.getConfiguration();
82 conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
83 final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
84 bulkload.doBulkLoad(hfilePath, table);
85 ResultScanner scanner = table.getScanner(scan);
86 Result result = scanner.next();
87 result = scanAfterBulkLoad(scanner, result, "version2");
88 Put put0 = new Put(Bytes.toBytes("row1"));
89 put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
90 .toBytes("version3")));
91 table.put(put0);
92 admin.flush(tableName);
93 scanner = table.getScanner(scan);
94 result = scanner.next();
95 while (result != null) {
96 List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
97 for (KeyValue _kv : kvs) {
98 if (Bytes.toString(_kv.getRow()).equals("row1")) {
99 System.out.println(Bytes.toString(_kv.getRow()));
100 System.out.println(Bytes.toString(_kv.getQualifier()));
101 System.out.println(Bytes.toString(_kv.getValue()));
102 Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
103 }
104 }
105 result = scanner.next();
106 }
107 scanner.close();
108 table.close();
109 }
110
111 private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expctedVal)
112 throws IOException {
113 while (result != null) {
114 List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
115 for (KeyValue _kv : kvs) {
116 if (Bytes.toString(_kv.getRow()).equals("row1")) {
117 System.out.println(Bytes.toString(_kv.getRow()));
118 System.out.println(Bytes.toString(_kv.getQualifier()));
119 System.out.println(Bytes.toString(_kv.getValue()));
120 Assert.assertEquals(expctedVal, Bytes.toString(_kv.getValue()));
121 }
122 }
123 result = scanner.next();
124 }
125 return result;
126 }
127
128
129
130 private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
131 throws IOException {
132 FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
133 final Path hfilePath = new Path(hFilePath);
134 fs.mkdirs(hfilePath);
135 Path path = new Path(pathStr);
136 HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
137 Assert.assertNotNull(wf);
138 HFileContext context = new HFileContext();
139 HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
140 KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
141 Bytes.toBytes("version2"));
142
143
144 if (nativeHFile) {
145
146
147
148 kv.setSequenceId(9999999);
149 }
150
151 writer.append(kv);
152
153 if (nativeHFile) {
154
155
156
157 writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999)));
158 }
159 else {
160 writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
161 }
162 writer.close();
163 return hfilePath;
164 }
165
166 private HTable init(HBaseAdmin admin, long l, Scan scan, TableName tableName) throws Exception {
167 Connection connection = TEST_UTIL.getConnection();
168 HTable table = (HTable) connection.getTable(tableName);
169 Put put0 = new Put(Bytes.toBytes("row1"));
170 put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
171 .toBytes("version0")));
172 table.put(put0);
173 admin.flush(tableName);
174 Put put1 = new Put(Bytes.toBytes("row2"));
175 put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
176 .toBytes("version0")));
177 table.put(put1);
178 admin.flush(tableName);
179 put0 = new Put(Bytes.toBytes("row1"));
180 put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
181 .toBytes("version1")));
182 table.put(put0);
183 admin.flush(tableName);
184 admin.compact(tableName);
185
186 ResultScanner scanner = table.getScanner(scan);
187 Result result = scanner.next();
188 List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
189 Assert.assertEquals(1, kvs.size());
190 Assert.assertEquals("version1", Bytes.toString(kvs.get(0).getValue()));
191 scanner.close();
192 return table;
193 }
194
195 @Test
196 public void testBulkLoadWithParallelScan() throws Exception {
197 TableName tableName = TableName.valueOf("testBulkLoadWithParallelScan");
198 final long l = System.currentTimeMillis();
199 HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
200 createTable(admin, tableName);
201 Scan scan = createScan();
202 final HTable table = init(admin, l, scan, tableName);
203
204 final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
205 "/temp/testBulkLoadWithParallelScan/col/file", false);
206 Configuration conf = TEST_UTIL.getConfiguration();
207 conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
208 final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
209 ResultScanner scanner = table.getScanner(scan);
210
211 final CountDownLatch latch = new CountDownLatch(1);
212 new Thread() {
213 public void run() {
214 try {
215 Put put1 = new Put(Bytes.toBytes("row5"));
216 put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
217 Bytes.toBytes("version0")));
218 table.put(put1);
219 bulkload.doBulkLoad(hfilePath, (HTable) table);
220 latch.countDown();
221 } catch (TableNotFoundException e) {
222 } catch (IOException e) {
223 }
224 }
225 }.start();
226 latch.await();
227
228
229 Result result = scanner.next();
230 scanAfterBulkLoad(scanner, result, "version1");
231 scanner.close();
232 table.close();
233
234 }
235
236 @Test
237 public void testBulkLoadNativeHFile() throws Exception {
238 TableName tableName = TableName.valueOf("testBulkLoadNativeHFile");
239 long l = System.currentTimeMillis();
240 HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
241 createTable(admin, tableName);
242 Scan scan = createScan();
243 final HTable table = init(admin, l, scan, tableName);
244
245 final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
246 "/temp/testBulkLoadNativeHFile/col/file", true);
247 Configuration conf = TEST_UTIL.getConfiguration();
248 conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
249 final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
250 bulkload.doBulkLoad(hfilePath, table);
251 ResultScanner scanner = table.getScanner(scan);
252 Result result = scanner.next();
253
254
255 result = scanAfterBulkLoad(scanner, result, "version2");
256 Put put0 = new Put(Bytes.toBytes("row1"));
257 put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
258 .toBytes("version3")));
259 table.put(put0);
260 admin.flush(tableName);
261 scanner = table.getScanner(scan);
262 result = scanner.next();
263 while (result != null) {
264 List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
265 for (KeyValue _kv : kvs) {
266 if (Bytes.toString(_kv.getRow()).equals("row1")) {
267 System.out.println(Bytes.toString(_kv.getRow()));
268 System.out.println(Bytes.toString(_kv.getQualifier()));
269 System.out.println(Bytes.toString(_kv.getValue()));
270 Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
271 }
272 }
273 result = scanner.next();
274 }
275 scanner.close();
276 table.close();
277 }
278
279 private Scan createScan() {
280 Scan scan = new Scan();
281 scan.setMaxVersions(3);
282 return scan;
283 }
284
285 @AfterClass
286 public static void tearDownAfterClass() throws Exception {
287 TEST_UTIL.shutdownMiniCluster();
288 }
289 }