View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertArrayEquals;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.NavigableMap;
29  import java.util.Random;
30  import java.util.Set;
31  import java.util.TreeSet;
32  
33  import org.apache.commons.io.IOUtils;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.ChoreService;
38  import org.apache.hadoop.hbase.HBaseTestingUtility;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.HRegionLocation;
42  import org.apache.hadoop.hbase.MetaTableAccessor;
43  import org.apache.hadoop.hbase.NotServingRegionException;
44  import org.apache.hadoop.hbase.ScheduledChore;
45  import org.apache.hadoop.hbase.ServerName;
46  import org.apache.hadoop.hbase.Stoppable;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.Admin;
49  import org.apache.hadoop.hbase.client.Connection;
50  import org.apache.hadoop.hbase.client.ConnectionFactory;
51  import org.apache.hadoop.hbase.client.Get;
52  import org.apache.hadoop.hbase.client.HTable;
53  import org.apache.hadoop.hbase.client.MetaScanner;
54  import org.apache.hadoop.hbase.client.Put;
55  import org.apache.hadoop.hbase.client.Result;
56  import org.apache.hadoop.hbase.client.Scan;
57  import org.apache.hadoop.hbase.client.Table;
58  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
59  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
60  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61  import org.apache.hadoop.hbase.protobuf.RequestConverter;
62  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
64  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
65  import org.apache.hadoop.hbase.testclassification.LargeTests;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.Pair;
68  import org.apache.hadoop.hbase.util.PairOfSameType;
69  import org.apache.hadoop.hbase.util.StoppableImplementation;
70  import org.apache.hadoop.hbase.util.Threads;
71  import org.junit.AfterClass;
72  import org.junit.BeforeClass;
73  import org.junit.Test;
74  import org.junit.experimental.categories.Category;
75  
76  import com.google.common.collect.Iterators;
77  import com.google.common.collect.Sets;
78  import com.google.protobuf.ServiceException;
79  
80  @Category(LargeTests.class)
81  public class TestEndToEndSplitTransaction {
82    private static final Log LOG = LogFactory.getLog(TestEndToEndSplitTransaction.class);
83    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
84    private static final Configuration CONF = TEST_UTIL.getConfiguration();
85  
86    @BeforeClass
87    public static void beforeAllTests() throws Exception {
88      TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
89      TEST_UTIL.startMiniCluster();
90    }
91  
92    @AfterClass
93    public static void afterAllTests() throws Exception {
94      TEST_UTIL.shutdownMiniCluster();
95    }
96  
97    @Test
98    public void testMasterOpsWhileSplitting() throws Exception {
99      TableName tableName = TableName.valueOf("TestSplit");
100     byte[] familyName = Bytes.toBytes("fam");
101     try (HTable ht = TEST_UTIL.createTable(tableName, familyName)) {
102       TEST_UTIL.loadTable(ht, familyName, false);
103     }
104     HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
105     byte[] firstRow = Bytes.toBytes("aaa");
106     byte[] splitRow = Bytes.toBytes("lll");
107     byte[] lastRow = Bytes.toBytes("zzz");
108     try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
109       // this will also cache the region
110       byte[] regionName = conn.getRegionLocator(tableName).getRegionLocation(splitRow)
111           .getRegionInfo().getRegionName();
112       Region region = server.getRegion(regionName);
113       SplitTransactionImpl split = new SplitTransactionImpl((HRegion) region, splitRow);
114       split.prepare();
115 
116       // 1. phase I
117       PairOfSameType<Region> regions = split.createDaughters(server, server, null);
118       assertFalse(test(conn, tableName, firstRow, server));
119       assertFalse(test(conn, tableName, lastRow, server));
120 
121       // passing null as services prevents final step
122       // 2, most of phase II
123       split.openDaughters(server, null, regions.getFirst(), regions.getSecond());
124       assertFalse(test(conn, tableName, firstRow, server));
125       assertFalse(test(conn, tableName, lastRow, server));
126 
127       // 3. finish phase II
128       // note that this replicates some code from SplitTransaction
129       // 2nd daughter first
130       if (split.useZKForAssignment) {
131         server.postOpenDeployTasks(regions.getSecond());
132       } else {
133       server.reportRegionStateTransition(
134         RegionServerStatusProtos.RegionStateTransition.TransitionCode.SPLIT,
135         region.getRegionInfo(), regions.getFirst().getRegionInfo(),
136         regions.getSecond().getRegionInfo());
137       }
138 
139       // Add to online regions
140       server.addToOnlineRegions(regions.getSecond());
141       // THIS is the crucial point:
142       // the 2nd daughter was added, so querying before the split key should fail.
143       assertFalse(test(conn, tableName, firstRow, server));
144       // past splitkey is ok.
145       assertTrue(test(conn, tableName, lastRow, server));
146 
147       // Add to online regions
148       server.addToOnlineRegions(regions.getFirst());
149       assertTrue(test(conn, tableName, firstRow, server));
150       assertTrue(test(conn, tableName, lastRow, server));
151 
152       if (split.useZKForAssignment) {
153         // 4. phase III
154         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
155           .getSplitTransactionCoordination().completeSplitTransaction(server, regions.getFirst(),
156             regions.getSecond(), split.std, region);
157       }
158 
159       assertTrue(test(conn, tableName, firstRow, server));
160       assertTrue(test(conn, tableName, lastRow, server));
161     }
162   }
163 
164   /**
165    * attempt to locate the region and perform a get and scan
166    * @return True if successful, False otherwise.
167    */
168   private boolean test(Connection conn, TableName tableName, byte[] row,
169       HRegionServer server) {
170     // not using HTable to avoid timeouts and retries
171     try {
172       byte[] regionName = conn.getRegionLocator(tableName).getRegionLocation(row, true)
173           .getRegionInfo().getRegionName();
174       // get and scan should now succeed without exception
175       ClientProtos.GetRequest request =
176           RequestConverter.buildGetRequest(regionName, new Get(row));
177       server.getRSRpcServices().get(null, request);
178       ScanRequest scanRequest = RequestConverter.buildScanRequest(
179         regionName, new Scan(row), 1, true);
180       try {
181         server.getRSRpcServices().scan(
182           new PayloadCarryingRpcController(), scanRequest);
183       } catch (ServiceException se) {
184         throw ProtobufUtil.getRemoteException(se);
185       }
186     } catch (IOException e) {
187       return false;
188     } catch (ServiceException e) {
189       return false;
190     }
191     return true;
192   }
193 
194   /**
195    * Tests that the client sees meta table changes as atomic during splits
196    */
197   @Test
198   public void testFromClientSideWhileSplitting() throws Throwable {
199     LOG.info("Starting testFromClientSideWhileSplitting");
200     final TableName TABLENAME =
201         TableName.valueOf("testFromClientSideWhileSplitting");
202     final byte[] FAMILY = Bytes.toBytes("family");
203 
204     //SplitTransaction will update the meta table by offlining the parent region, and adding info
205     //for daughters.
206     Table table = TEST_UTIL.createTable(TABLENAME, FAMILY);
207 
208     Stoppable stopper = new StoppableImplementation();
209     RegionSplitter regionSplitter = new RegionSplitter(table);
210     RegionChecker regionChecker = new RegionChecker(CONF, stopper, TABLENAME);
211     final ChoreService choreService = new ChoreService("TEST_SERVER");
212 
213     choreService.scheduleChore(regionChecker);
214     regionSplitter.start();
215 
216     //wait until the splitter is finished
217     regionSplitter.join();
218     stopper.stop(null);
219 
220     if (regionChecker.ex != null) {
221       throw regionChecker.ex;
222     }
223 
224     if (regionSplitter.ex != null) {
225       throw regionSplitter.ex;
226     }
227 
228     //one final check
229     regionChecker.verify();
230   }
231 
232   static class RegionSplitter extends Thread {
233     final Connection connection;
234     Throwable ex;
235     Table table;
236     TableName tableName;
237     byte[] family;
238     Admin admin;
239     HRegionServer rs;
240 
241     RegionSplitter(Table table) throws IOException {
242       this.table = table;
243       this.tableName = table.getName();
244       this.family = table.getTableDescriptor().getFamiliesKeys().iterator().next();
245       admin = TEST_UTIL.getHBaseAdmin();
246       rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
247       connection = TEST_UTIL.getConnection();
248     }
249 
250     @Override
251     public void run() {
252       try {
253         Random random = new Random();
254         for (int i= 0; i< 5; i++) {
255           NavigableMap<HRegionInfo, ServerName> regions =
256               MetaScanner.allTableRegions(connection, tableName);
257           if (regions.size() == 0) {
258             continue;
259           }
260           int regionIndex = random.nextInt(regions.size());
261 
262           //pick a random region and split it into two
263           HRegionInfo region = Iterators.get(regions.keySet().iterator(), regionIndex);
264 
265           //pick the mid split point
266           int start = 0, end = Integer.MAX_VALUE;
267           if (region.getStartKey().length > 0) {
268             start = Bytes.toInt(region.getStartKey());
269           }
270           if (region.getEndKey().length > 0) {
271             end = Bytes.toInt(region.getEndKey());
272           }
273           int mid = start + ((end - start) / 2);
274           byte[] splitPoint = Bytes.toBytes(mid);
275 
276           //put some rows to the regions
277           addData(start);
278           addData(mid);
279 
280           flushAndBlockUntilDone(admin, rs, region.getRegionName());
281           compactAndBlockUntilDone(admin, rs, region.getRegionName());
282 
283           log("Initiating region split for:" + region.getRegionNameAsString());
284           try {
285             admin.splitRegion(region.getRegionName(), splitPoint);
286             //wait until the split is complete
287             blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);
288 
289           } catch (NotServingRegionException ex) {
290             //ignore
291           }
292         }
293       } catch (Throwable ex) {
294         this.ex = ex;
295       }
296     }
297 
298     void addData(int start) throws IOException {
299       List<Put> puts = new ArrayList<>();
300       for (int i=start; i< start + 100; i++) {
301         Put put = new Put(Bytes.toBytes(i));
302         put.addColumn(family, family, Bytes.toBytes(i));
303         puts.add(put);
304       }
305       table.put(puts);
306     }
307   }
308 
309   /**
310    * Checks regions using MetaScanner, MetaTableAccessor and HTable methods
311    */
312   static class RegionChecker extends ScheduledChore {
313     Connection connection;
314     Configuration conf;
315     TableName tableName;
316     Throwable ex;
317 
318     RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
319       super("RegionChecker", stopper, 10);
320       this.conf = conf;
321       this.tableName = tableName;
322 
323       this.connection = ConnectionFactory.createConnection(conf);
324     }
325 
326     /** verify region boundaries obtained from MetaScanner */
327     void verifyRegionsUsingMetaScanner() throws Exception {
328 
329       //MetaScanner.allTableRegions()
330       NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(connection,
331           tableName);
332       verifyTableRegions(regions.keySet());
333 
334       //MetaScanner.listAllRegions()
335       List<HRegionInfo> regionList = MetaScanner.listAllRegions(conf, connection, false);
336       verifyTableRegions(Sets.newTreeSet(regionList));
337     }
338 
339     /** verify region boundaries obtained from HTable.getStartEndKeys() */
340     void verifyRegionsUsingHTable() throws IOException {
341       HTable table = null;
342       try {
343         table = (HTable) connection.getTable(tableName);
344         Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
345         verifyStartEndKeys(keys);
346 
347         //HTable.getRegionsInfo()
348         Set<HRegionInfo> regions = new TreeSet<HRegionInfo>();
349         for (HRegionLocation loc : table.getRegionLocator().getAllRegionLocations()) {
350           regions.add(loc.getRegionInfo());
351         }
352         verifyTableRegions(regions);
353       } finally {
354         IOUtils.closeQuietly(table);
355       }
356     }
357 
358     void verify() throws Exception {
359       verifyRegionsUsingMetaScanner();
360       verifyRegionsUsingHTable();
361     }
362 
363     void verifyTableRegions(Set<HRegionInfo> regions) {
364       log("Verifying " + regions.size() + " regions: " + regions);
365 
366       byte[][] startKeys = new byte[regions.size()][];
367       byte[][] endKeys = new byte[regions.size()][];
368 
369       int i=0;
370       for (HRegionInfo region : regions) {
371         startKeys[i] = region.getStartKey();
372         endKeys[i] = region.getEndKey();
373         i++;
374       }
375 
376       Pair<byte[][], byte[][]> keys = new Pair<byte[][], byte[][]>(startKeys, endKeys);
377       verifyStartEndKeys(keys);
378     }
379 
380     void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
381       byte[][] startKeys = keys.getFirst();
382       byte[][] endKeys = keys.getSecond();
383       assertEquals(startKeys.length, endKeys.length);
384       assertTrue("Found 0 regions for the table", startKeys.length > 0);
385 
386       assertArrayEquals("Start key for the first region is not byte[0]",
387           HConstants.EMPTY_START_ROW, startKeys[0]);
388       byte[] prevEndKey = HConstants.EMPTY_START_ROW;
389 
390       // ensure that we do not have any gaps
391       for (int i=0; i<startKeys.length; i++) {
392         assertArrayEquals(
393             "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey)
394                 + " ,regionStartKey=" + Bytes.toStringBinary(startKeys[i]), prevEndKey,
395             startKeys[i]);
396         prevEndKey = endKeys[i];
397       }
398       assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
399           endKeys[endKeys.length - 1]);
400     }
401 
402     @Override
403     protected void chore() {
404       try {
405         verify();
406       } catch (Throwable ex) {
407         this.ex = ex;
408         getStopper().stop("caught exception");
409       }
410     }
411   }
412 
413   public static void log(String msg) {
414     LOG.info(msg);
415   }
416 
417   /* some utility methods for split tests */
418 
419   public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
420       throws IOException, InterruptedException {
421     log("flushing region: " + Bytes.toStringBinary(regionName));
422     admin.flushRegion(regionName);
423     log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
424     Threads.sleepWithoutInterrupt(500);
425     while (rs.getOnlineRegion(regionName).getMemstoreSize() > 0) {
426       Threads.sleep(50);
427     }
428   }
429 
430   public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
431       throws IOException, InterruptedException {
432     log("Compacting region: " + Bytes.toStringBinary(regionName));
433     admin.majorCompactRegion(regionName);
434     log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
435     Threads.sleepWithoutInterrupt(500);
436     outer: for (;;) {
437       for (Store store : rs.getOnlineRegion(regionName).getStores()) {
438         if (store.getStorefilesCount() > 1) {
439           Threads.sleep(50);
440           continue outer;
441         }
442       }
443       break;
444     }
445   }
446 
447   /** Blocks until the region split is complete in hbase:meta and region server opens the daughters */
448   public static void blockUntilRegionSplit(Configuration conf, long timeout,
449       final byte[] regionName, boolean waitForDaughters)
450       throws IOException, InterruptedException {
451     long start = System.currentTimeMillis();
452     log("blocking until region is split:" +  Bytes.toStringBinary(regionName));
453     HRegionInfo daughterA = null, daughterB = null;
454     try (Connection conn = ConnectionFactory.createConnection(conf);
455         Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
456       Result result = null;
457       HRegionInfo region = null;
458       while ((System.currentTimeMillis() - start) < timeout) {
459         result = metaTable.get(new Get(regionName));
460         if (result == null) {
461           break;
462         }
463 
464         region = MetaTableAccessor.getHRegionInfo(result);
465         if (region.isSplitParent()) {
466           log("found parent region: " + region.toString());
467           PairOfSameType<HRegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
468           daughterA = pair.getFirst();
469           daughterB = pair.getSecond();
470           break;
471         }
472         Threads.sleep(100);
473       }
474       if (daughterA == null || daughterB == null) {
475         throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB=" +
476           daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName=" + regionName +
477           ", region=" + region);
478       }
479 
480       //if we are here, this means the region split is complete or timed out
481       if (waitForDaughters) {
482         long rem = timeout - (System.currentTimeMillis() - start);
483         blockUntilRegionIsInMeta(conn, rem, daughterA);
484 
485         rem = timeout - (System.currentTimeMillis() - start);
486         blockUntilRegionIsInMeta(conn, rem, daughterB);
487 
488         rem = timeout - (System.currentTimeMillis() - start);
489         blockUntilRegionIsOpened(conf, rem, daughterA);
490 
491         rem = timeout - (System.currentTimeMillis() - start);
492         blockUntilRegionIsOpened(conf, rem, daughterB);
493       }
494     }
495   }
496 
497   public static void blockUntilRegionIsInMeta(Connection conn, long timeout, HRegionInfo hri)
498       throws IOException, InterruptedException {
499     log("blocking until region is in META: " + hri.getRegionNameAsString());
500     long start = System.currentTimeMillis();
501     while (System.currentTimeMillis() - start < timeout) {
502       HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
503       if (loc != null && !loc.getRegionInfo().isOffline()) {
504         log("found region in META: " + hri.getRegionNameAsString());
505         break;
506       }
507       Threads.sleep(10);
508     }
509   }
510 
511   public static void blockUntilRegionIsOpened(Configuration conf, long timeout, HRegionInfo hri)
512       throws IOException, InterruptedException {
513     log("blocking until region is opened for reading:" + hri.getRegionNameAsString());
514     long start = System.currentTimeMillis();
515     try (Connection conn = ConnectionFactory.createConnection(conf);
516         Table table = conn.getTable(hri.getTable())) {
517       byte[] row = hri.getStartKey();
518       // Check for null/empty row. If we find one, use a key that is likely to be in first region.
519       if (row == null || row.length <= 0) row = new byte[] { '0' };
520       Get get = new Get(row);
521       while (System.currentTimeMillis() - start < timeout) {
522         try {
523           table.get(get);
524           break;
525         } catch (IOException ex) {
526           // wait some more
527         }
528         Threads.sleep(10);
529       }
530     }
531   }
532 }
533