/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;
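
/**
 * Tests bulk loading of HFiles while scanners run concurrently, checking that
 * a scanner never observes a row whose cells come from two different bulk
 * loads, i.e. that the region server applies each bulk load atomically.
 */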
@Category(LargeTests.class)
public class TestHRegionServerBulkLoad {
  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private final static Configuration conf = UTIL.getConfiguration();
  private final static byte[] QUAL = Bytes.toBytes("qual");
  private final static int NUM_CFS = 10;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  private final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }
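
  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */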
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }
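
  /**
   * Create an HFile with the given number of rows, all holding the specified
   * value under the given family and qualifier.
   */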
  public static void createHFile(FileSystem fs, Path path, byte[] family,
      byte[] qualifier, byte[] value, int numRows) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
        .withCompression(COMPRESSION)
        .build();
    HFile.Writer writer = HFile
        .getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .withFileContext(context)
        .create();
    long now = System.currentTimeMillis();
    try {
      // write rows in ascending key order, as HFile.Writer requires
      for (int i = 0; i < numRows; i++) {
        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
        writer.append(kv);
      }
      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    } finally {
      writer.close();
    }
  }
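
  /**
   * Thread that repeatedly bulk-loads a fresh set of HFiles, one per column
   * family, all carrying the same value. Every tenth load it also asks the
   * region server to compact the region, to keep the number of store files
   * (and open file handles) bounded.
   */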
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
          iteration));
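
      // create HFiles for different column families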
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
          NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
      }
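
      // bulk load the HFiles into the region holding row "aaa"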
      final HConnection conn = UTIL.getHBaseAdmin().getConnection();
      RegionServerCallable<Void> callable =
          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
        @Override
        public Void call(int callTimeout) throws Exception {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()));
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          BulkLoadHFileRequest request =
              RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
          getStub().bulkLoadHFile(null, request);
          return null;
        }
      };
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);
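
      // Periodically request a compaction to keep the number of open store
      // files (and file handles) low.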
      if (numBulkLoads.get() % 10 == 0) {
        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
          @Override
          public Void call(int callTimeout) throws Exception {
            LOG.debug("compacting " + getLocation() + " for row "
                + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
                conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request =
                RequestConverter.buildCompactRegionRequest(
                    getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }
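
  /**
   * Thread that does full scans of the table looking for any partially
   * completed rows.
   */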
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    HTable table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = new HTable(conf, TABLE_NAME);
    }

    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);

      for (Result res : scanner) {
        byte[] lastRow = null, lastFam = null, lastQual = null;
        byte[] gotValue = null;
        for (byte[] family : targetFamilies) {
          byte qualifier[] = QUAL;
          byte thisValue[] = res.getValue(family, qualifier);
          if (gotValue != null && thisValue != null
              && !Bytes.equals(gotValue, thisValue)) {
            // each bulk load writes a single value across all families of a
            // row, so a mismatch means a load was observed partially applied
            StringBuilder msg = new StringBuilder();
            msg.append("Failed on scan ").append(numScans)
                .append(" after scanning ").append(numRowsScanned)
                .append(" rows!\n");
            msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
                + " = " + Bytes.toString(thisValue) + "\n");
            msg.append("Previous was " + Bytes.toString(lastRow) + "/"
                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
                + " = " + Bytes.toString(gotValue));
            throw new RuntimeException(msg.toString());
          }

          lastFam = family;
          lastQual = qualifier;
          lastRow = res.getRow();
          gotValue = thisValue;
        }
        numRowsScanned.getAndIncrement();
      }
      numScans.getAndIncrement();
    }
  }
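
  /**
   * Creates a table with the given name and the specified number of column
   * families, if it does not already exist.
   */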
  private void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      UTIL.getHBaseAdmin().createTable(htd);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }
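
  /**
   * Runs bulk loads and scanners concurrently for 30 seconds, then verifies
   * that a bulk-load marker was written to the WAL.
   */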
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    UTIL.startMiniCluster(1);
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
      throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }
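
  /**
   * Run the test against a real HBase instance for 5 minutes. This assumes
   * that the table under test only has a single region.
   */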
  public static void main(String args[]) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      // exit explicitly; lingering non-daemon client threads can otherwise keep the JVM alive
      System.exit(0);
    }
  }

  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtility(c);
  }

  /**
   * WAL listener that records whether any bulk-load marker edit was written.
   */
  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    private boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}