1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertArrayEquals;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NavigableMap;
29 import java.util.Random;
30 import java.util.Set;
31
32 import org.apache.commons.io.IOUtils;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Chore;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.LargeTests;
42 import org.apache.hadoop.hbase.NotServingRegionException;
43 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
44 import org.apache.hadoop.hbase.ServerName;
45 import org.apache.hadoop.hbase.Stoppable;
46 import org.apache.hadoop.hbase.client.Get;
47 import org.apache.hadoop.hbase.client.HBaseAdmin;
48 import org.apache.hadoop.hbase.client.HConnection;
49 import org.apache.hadoop.hbase.client.HConnectionManager;
50 import org.apache.hadoop.hbase.client.HTable;
51 import org.apache.hadoop.hbase.client.MetaScanner;
52 import org.apache.hadoop.hbase.client.Put;
53 import org.apache.hadoop.hbase.client.Result;
54 import org.apache.hadoop.hbase.client.Scan;
55 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
56 import org.apache.hadoop.hbase.protobuf.RequestConverter;
57 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.Pair;
60 import org.apache.hadoop.hbase.util.PairOfSameType;
61 import org.apache.hadoop.hbase.util.StoppableImplementation;
62 import org.apache.hadoop.hbase.util.Threads;
63 import org.junit.AfterClass;
64 import org.junit.BeforeClass;
65 import org.junit.Test;
66 import org.junit.experimental.categories.Category;
67
68 import com.google.common.collect.Iterators;
69 import com.google.common.collect.Sets;
70 import com.google.protobuf.ServiceException;
71
72 @Category(LargeTests.class)
73 public class TestEndToEndSplitTransaction {
74 private static final Log LOG = LogFactory.getLog(TestEndToEndSplitTransaction.class);
75 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
76 private static final Configuration conf = TEST_UTIL.getConfiguration();
77
78 @BeforeClass
79 public static void beforeAllTests() throws Exception {
80 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
81 TEST_UTIL.startMiniCluster();
82 }
83
84 @AfterClass
85 public static void afterAllTests() throws Exception {
86 TEST_UTIL.shutdownMiniCluster();
87 }
88
89 @Test
90 public void testMasterOpsWhileSplitting() throws Exception {
91 TableName tableName =
92 TableName.valueOf("TestSplit");
93 byte[] familyName = Bytes.toBytes("fam");
94 HTable ht = TEST_UTIL.createTable(tableName, familyName);
95 TEST_UTIL.loadTable(ht, familyName, false);
96 ht.close();
97 HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
98 byte []firstRow = Bytes.toBytes("aaa");
99 byte []splitRow = Bytes.toBytes("lll");
100 byte []lastRow = Bytes.toBytes("zzz");
101 HConnection con = HConnectionManager
102 .getConnection(TEST_UTIL.getConfiguration());
103
104 byte[] regionName = con.locateRegion(tableName, splitRow).getRegionInfo()
105 .getRegionName();
106 HRegion region = server.getRegion(regionName);
107 SplitTransaction split = new SplitTransaction(region, splitRow);
108 split.prepare();
109
110
111 PairOfSameType<HRegion> regions = split.createDaughters(server, server);
112 assertFalse(test(con, tableName, firstRow, server));
113 assertFalse(test(con, tableName, lastRow, server));
114
115
116
117 split.openDaughters(server, null, regions.getFirst(), regions.getSecond());
118 assertFalse(test(con, tableName, firstRow, server));
119 assertFalse(test(con, tableName, lastRow, server));
120
121
122
123
124 server.postOpenDeployTasks(regions.getSecond(), server.getCatalogTracker());
125
126 server.addToOnlineRegions(regions.getSecond());
127
128
129 assertFalse(test(con, tableName, firstRow, server));
130
131 assertTrue(test(con, tableName, lastRow, server));
132
133
134 server.postOpenDeployTasks(regions.getFirst(), server.getCatalogTracker());
135
136 server.addToOnlineRegions(regions.getFirst());
137 assertTrue(test(con, tableName, firstRow, server));
138 assertTrue(test(con, tableName, lastRow, server));
139
140
141 split.transitionZKNode(server, server, regions.getFirst(),
142 regions.getSecond());
143 assertTrue(test(con, tableName, firstRow, server));
144 assertTrue(test(con, tableName, lastRow, server));
145 }
146
147
148
149
150
151 private boolean test(HConnection con, TableName tableName, byte[] row,
152 HRegionServer server) {
153
154 try {
155 byte[] regionName = con.relocateRegion(tableName, row).getRegionInfo()
156 .getRegionName();
157
158 ProtobufUtil.get(server, regionName, new Get(row));
159 ScanRequest scanRequest = RequestConverter.buildScanRequest(
160 regionName, new Scan(row), 1, true);
161 try {
162 server.scan(new PayloadCarryingRpcController(), scanRequest);
163 } catch (ServiceException se) {
164 throw ProtobufUtil.getRemoteException(se);
165 }
166 } catch (IOException x) {
167 return false;
168 }
169 return true;
170 }
171
172
173
174
175 @Test
176 public void testFromClientSideWhileSplitting() throws Throwable {
177 LOG.info("Starting testFromClientSideWhileSplitting");
178 final TableName TABLENAME =
179 TableName.valueOf("testFromClientSideWhileSplitting");
180 final byte[] FAMILY = Bytes.toBytes("family");
181
182
183
184 HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY);
185
186 Stoppable stopper = new StoppableImplementation();
187 RegionSplitter regionSplitter = new RegionSplitter(table);
188 RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME);
189
190 regionChecker.start();
191 regionSplitter.start();
192
193
194 regionSplitter.join();
195 stopper.stop(null);
196
197 if (regionChecker.ex != null) {
198 throw regionChecker.ex;
199 }
200
201 if (regionSplitter.ex != null) {
202 throw regionSplitter.ex;
203 }
204
205
206 regionChecker.verify();
207 }
208
209 static class RegionSplitter extends Thread {
210 Throwable ex;
211 HTable table;
212 TableName tableName;
213 byte[] family;
214 HBaseAdmin admin;
215 HRegionServer rs;
216
217 RegionSplitter(HTable table) throws IOException {
218 this.table = table;
219 this.tableName = table.getName();
220 this.family = table.getTableDescriptor().getFamiliesKeys().iterator().next();
221 admin = TEST_UTIL.getHBaseAdmin();
222 rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
223 }
224
225 public void run() {
226 try {
227 Random random = new Random();
228 for (int i=0; i< 5; i++) {
229 NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(conf, null,
230 tableName, false);
231 if (regions.size() == 0) {
232 continue;
233 }
234 int regionIndex = random.nextInt(regions.size());
235
236
237 HRegionInfo region = Iterators.get(regions.keySet().iterator(), regionIndex);
238
239
240 int start = 0, end = Integer.MAX_VALUE;
241 if (region.getStartKey().length > 0) {
242 start = Bytes.toInt(region.getStartKey());
243 }
244 if (region.getEndKey().length > 0) {
245 end = Bytes.toInt(region.getEndKey());
246 }
247 int mid = start + ((end - start) / 2);
248 byte[] splitPoint = Bytes.toBytes(mid);
249
250
251 addData(start);
252 addData(mid);
253
254 flushAndBlockUntilDone(admin, rs, region.getRegionName());
255 compactAndBlockUntilDone(admin, rs, region.getRegionName());
256
257 log("Initiating region split for:" + region.getRegionNameAsString());
258 try {
259 admin.split(region.getRegionName(), splitPoint);
260
261 blockUntilRegionSplit(conf, 50000, region.getRegionName(), true);
262
263 } catch (NotServingRegionException ex) {
264
265 }
266 }
267 } catch (Throwable ex) {
268 this.ex = ex;
269 }
270 }
271
272 void addData(int start) throws IOException {
273 for (int i=start; i< start + 100; i++) {
274 Put put = new Put(Bytes.toBytes(i));
275
276 put.add(family, family, Bytes.toBytes(i));
277 table.put(put);
278 }
279 table.flushCommits();
280 }
281 }
282
283
284
285
286 static class RegionChecker extends Chore {
287 Configuration conf;
288 TableName tableName;
289 Throwable ex;
290
291 RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) {
292 super("RegionChecker", 10, stopper);
293 this.conf = conf;
294 this.tableName = tableName;
295 this.setDaemon(true);
296 }
297
298
299 void verifyRegionsUsingMetaScanner() throws Exception {
300
301
302 NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(conf, null,
303 tableName, false);
304 verifyTableRegions(regions.keySet());
305
306
307 List<HRegionInfo> regionList = MetaScanner.listAllRegions(conf, false);
308 verifyTableRegions(Sets.newTreeSet(regionList));
309 }
310
311
312 void verifyRegionsUsingHTable() throws IOException {
313 HTable table = null;
314 try {
315
316 table = new HTable(conf, tableName);
317 Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
318 verifyStartEndKeys(keys);
319
320
321 Map<HRegionInfo, ServerName> regions = table.getRegionLocations();
322 verifyTableRegions(regions.keySet());
323 } finally {
324 IOUtils.closeQuietly(table);
325 }
326 }
327
328 void verify() throws Exception {
329 verifyRegionsUsingMetaScanner();
330 verifyRegionsUsingHTable();
331 }
332
333 void verifyTableRegions(Set<HRegionInfo> regions) {
334 log("Verifying " + regions.size() + " regions");
335
336 byte[][] startKeys = new byte[regions.size()][];
337 byte[][] endKeys = new byte[regions.size()][];
338
339 int i=0;
340 for (HRegionInfo region : regions) {
341 startKeys[i] = region.getStartKey();
342 endKeys[i] = region.getEndKey();
343 i++;
344 }
345
346 Pair<byte[][], byte[][]> keys = new Pair<byte[][], byte[][]>(startKeys, endKeys);
347 verifyStartEndKeys(keys);
348 }
349
350 void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
351 byte[][] startKeys = keys.getFirst();
352 byte[][] endKeys = keys.getSecond();
353 assertEquals(startKeys.length, endKeys.length);
354 assertTrue("Found 0 regions for the table", startKeys.length > 0);
355
356 assertArrayEquals("Start key for the first region is not byte[0]",
357 HConstants.EMPTY_START_ROW, startKeys[0]);
358 byte[] prevEndKey = HConstants.EMPTY_START_ROW;
359
360
361 for (int i=0; i<startKeys.length; i++) {
362 assertArrayEquals(
363 "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey)
364 + " ,regionStartKey=" + Bytes.toStringBinary(startKeys[i]), prevEndKey,
365 startKeys[i]);
366 prevEndKey = endKeys[i];
367 }
368 assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
369 endKeys[endKeys.length - 1]);
370 }
371
372 @Override
373 protected void chore() {
374 try {
375 verify();
376 } catch (Throwable ex) {
377 this.ex = ex;
378 stopper.stop("caught exception");
379 }
380 }
381 }
382
383 public static void log(String msg) {
384 LOG.info(msg);
385 }
386
387
388
389 public static void flushAndBlockUntilDone(HBaseAdmin admin, HRegionServer rs, byte[] regionName)
390 throws IOException, InterruptedException {
391 log("flushing region: " + Bytes.toStringBinary(regionName));
392 admin.flush(regionName);
393 log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
394 Threads.sleepWithoutInterrupt(500);
395 while (rs.cacheFlusher.getFlushQueueSize() > 0) {
396 Threads.sleep(50);
397 }
398 }
399
400 public static void compactAndBlockUntilDone(HBaseAdmin admin, HRegionServer rs, byte[] regionName)
401 throws IOException, InterruptedException {
402 log("Compacting region: " + Bytes.toStringBinary(regionName));
403 admin.majorCompact(regionName);
404 log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
405 Threads.sleepWithoutInterrupt(500);
406 while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
407 Threads.sleep(50);
408 }
409 }
410
411
412 public static void blockUntilRegionSplit(Configuration conf, long timeout,
413 final byte[] regionName, boolean waitForDaughters)
414 throws IOException, InterruptedException {
415 long start = System.currentTimeMillis();
416 log("blocking until region is split:" + Bytes.toStringBinary(regionName));
417 HRegionInfo daughterA = null, daughterB = null;
418 HTable metaTable = new HTable(conf, TableName.META_TABLE_NAME);
419
420 try {
421 while (System.currentTimeMillis() - start < timeout) {
422 Result result = getRegionRow(metaTable, regionName);
423 if (result == null) {
424 break;
425 }
426
427 HRegionInfo region = HRegionInfo.getHRegionInfo(result);
428 if(region.isSplitParent()) {
429 log("found parent region: " + region.toString());
430 PairOfSameType<HRegionInfo> pair = HRegionInfo.getDaughterRegions(result);
431 daughterA = pair.getFirst();
432 daughterB = pair.getSecond();
433 break;
434 }
435 Threads.sleep(100);
436 }
437
438
439 if (waitForDaughters) {
440 long rem = timeout - (System.currentTimeMillis() - start);
441 blockUntilRegionIsInMeta(metaTable, rem, daughterA);
442
443 rem = timeout - (System.currentTimeMillis() - start);
444 blockUntilRegionIsInMeta(metaTable, rem, daughterB);
445
446 rem = timeout - (System.currentTimeMillis() - start);
447 blockUntilRegionIsOpened(conf, rem, daughterA);
448
449 rem = timeout - (System.currentTimeMillis() - start);
450 blockUntilRegionIsOpened(conf, rem, daughterB);
451 }
452 } finally {
453 IOUtils.closeQuietly(metaTable);
454 }
455 }
456
457 public static Result getRegionRow(HTable metaTable, byte[] regionName) throws IOException {
458 Get get = new Get(regionName);
459 return metaTable.get(get);
460 }
461
462 public static void blockUntilRegionIsInMeta(HTable metaTable, long timeout, HRegionInfo hri)
463 throws IOException, InterruptedException {
464 log("blocking until region is in META: " + hri.getRegionNameAsString());
465 long start = System.currentTimeMillis();
466 while (System.currentTimeMillis() - start < timeout) {
467 Result result = getRegionRow(metaTable, hri.getRegionName());
468 if (result != null) {
469 HRegionInfo info = HRegionInfo.getHRegionInfo(result);
470 if (info != null && !info.isOffline()) {
471 log("found region in META: " + hri.getRegionNameAsString());
472 break;
473 }
474 }
475 Threads.sleep(10);
476 }
477 }
478
479 public static void blockUntilRegionIsOpened(Configuration conf, long timeout, HRegionInfo hri)
480 throws IOException, InterruptedException {
481 log("blocking until region is opened for reading:" + hri.getRegionNameAsString());
482 long start = System.currentTimeMillis();
483 HTable table = new HTable(conf, hri.getTable());
484
485 try {
486 byte [] row = hri.getStartKey();
487
488 if (row == null || row.length <= 0) row = new byte [] {'0'};
489 Get get = new Get(row);
490 while (System.currentTimeMillis() - start < timeout) {
491 try {
492 table.get(get);
493 break;
494 } catch(IOException ex) {
495
496 }
497 Threads.sleep(10);
498 }
499 } finally {
500 IOUtils.closeQuietly(table);
501 }
502 }
503 }
504