View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
22  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
23  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
24  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
25  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
26  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
27  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
28  import static org.junit.Assert.*;
29  import static org.junit.Assert.assertEquals;
30  import static org.junit.Assert.assertFalse;
31  import static org.junit.Assert.assertTrue;
32  import static org.junit.Assert.fail;
33  
34  import java.io.IOException;
35  import java.util.ArrayList;
36  import java.util.Arrays;
37  import java.util.HashSet;
38  import java.util.Iterator;
39  import java.util.LinkedList;
40  import java.util.List;
41  import java.util.NavigableSet;
42  import java.util.Set;
43  import java.util.TreeSet;
44  import java.util.concurrent.ExecutorService;
45  import java.util.concurrent.Executors;
46  import java.util.concurrent.Future;
47  import java.util.concurrent.TimeUnit;
48  import java.util.concurrent.TimeoutException;
49  import java.util.concurrent.atomic.AtomicLong;
50  
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.fs.FSDataOutputStream;
55  import org.apache.hadoop.fs.FileStatus;
56  import org.apache.hadoop.fs.FileSystem;
57  import org.apache.hadoop.fs.Path;
58  import org.apache.hadoop.hbase.HColumnDescriptor;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.HBaseConfiguration;
61  import org.apache.hadoop.hbase.HBaseTestingUtility;
62  import org.apache.hadoop.hbase.HConstants;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.HTableDescriptor;
65  import org.apache.hadoop.hbase.KeyValue;
66  import org.apache.hadoop.hbase.LargeTests;
67  import org.apache.hadoop.hbase.MiniHBaseCluster;
68  import org.apache.hadoop.hbase.NamespaceDescriptor;
69  import org.apache.hadoop.hbase.ServerName;
70  import org.apache.hadoop.hbase.SplitLogCounters;
71  import org.apache.hadoop.hbase.Waiter;
72  import org.apache.hadoop.hbase.client.Delete;
73  import org.apache.hadoop.hbase.client.Get;
74  import org.apache.hadoop.hbase.client.HConnectionManager;
75  import org.apache.hadoop.hbase.client.HTable;
76  import org.apache.hadoop.hbase.client.Increment;
77  import org.apache.hadoop.hbase.client.NonceGenerator;
78  import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
79  import org.apache.hadoop.hbase.client.Put;
80  import org.apache.hadoop.hbase.client.Result;
81  import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
82  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
83  import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
84  import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
85  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
86  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
87  import org.apache.hadoop.hbase.regionserver.HRegion;
88  import org.apache.hadoop.hbase.regionserver.HRegionServer;
89  import org.apache.hadoop.hbase.regionserver.wal.HLog;
90  import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
91  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
92  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
93  import org.apache.hadoop.hbase.util.Bytes;
94  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
95  import org.apache.hadoop.hbase.util.FSUtils;
96  import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
97  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
98  import org.apache.hadoop.hbase.util.Threads;
99  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
100 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
101 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
102 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
103 import org.apache.hadoop.hdfs.MiniDFSCluster;
104 import org.apache.log4j.Level;
105 import org.apache.log4j.Logger;
106 import org.apache.zookeeper.KeeperException;
107 import org.junit.After;
108 import org.junit.AfterClass;
109 import org.junit.Assert;
110 import org.junit.Before;
111 import org.junit.BeforeClass;
112 import org.junit.Test;
113 import org.junit.experimental.categories.Category;
114 
115 @Category(LargeTests.class)
116 public class TestDistributedLogSplitting {
/** Logger for this test; named after this class so its output is attributed correctly. */
private static final Log LOG = LogFactory.getLog(TestDistributedLogSplitting.class);

static {
  // Log everything under org.apache.hadoop.hbase at DEBUG while these tests run.
  Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);

  // test ThreeRSAbort fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. this
  // turns it off for this test.  TODO: Figure out why scr breaks recovery.
  System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
}
126 
127   // Start a cluster with 2 masters and 6 regionservers
128   static final int NUM_MASTERS = 2;
129   static final int NUM_RS = 6;
130 
131   MiniHBaseCluster cluster;
132   HMaster master;
133   Configuration conf;
134   static Configuration originalConf;
135   static HBaseTestingUtility TEST_UTIL;
136   static MiniDFSCluster dfsCluster;
137   static MiniZooKeeperCluster zkCluster;
138 
139   @BeforeClass
140   public static void setup() throws Exception {
141     TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
142     dfsCluster = TEST_UTIL.startMiniDFSCluster(1);
143     zkCluster = TEST_UTIL.startMiniZKCluster();
144     originalConf = TEST_UTIL.getConfiguration();
145   }
146 
147   @AfterClass
148   public static void tearDown() throws IOException {
149     TEST_UTIL.shutdownMiniZKCluster();
150     TEST_UTIL.shutdownMiniDFSCluster();
151   }
152 
153   private void startCluster(int num_rs) throws Exception {
154     SplitLogCounters.resetCounters();
155     LOG.info("Starting cluster");
156     conf.getLong("hbase.splitlog.max.resubmit", 0);
157     // Make the failure test faster
158     conf.setInt("zookeeper.recovery.retry", 0);
159     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
160     conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
161     conf.setInt("hbase.regionserver.wal.max.splitters", 3);
162     conf.setInt("hfile.format.version", 3);
163     TEST_UTIL = new HBaseTestingUtility(conf);
164     TEST_UTIL.setDFSCluster(dfsCluster);
165     TEST_UTIL.setZkCluster(zkCluster);
166     TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
167     cluster = TEST_UTIL.getHBaseCluster();
168     LOG.info("Waiting for active/ready master");
169     cluster.waitForActiveAndReadyMaster();
170     master = cluster.getMaster();
171     while (cluster.getLiveRegionServerThreads().size() < num_rs) {
172       Threads.sleep(1);
173     }
174   }
175 
176   @Before
177   public void before() throws Exception {
178     // refresh configuration
179     conf = HBaseConfiguration.create(originalConf);
180   }
181   
182   @After
183   public void after() throws Exception {
184     try {
185       if (TEST_UTIL.getHBaseCluster() != null) {
186         for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
187           mt.getMaster().abort("closing...", null);
188         }
189       }
190       TEST_UTIL.shutdownMiniHBaseCluster();
191     } finally {
192       TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
193       ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
194     }
195   }
196   
197   @Test (timeout=300000)
198   public void testRecoveredEdits() throws Exception {
199     LOG.info("testRecoveredEdits");
200     conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
201     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
202     startCluster(NUM_RS);
203 
204     final int NUM_LOG_LINES = 1000;
205     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
206     // turn off load balancing to prevent regions from moving around otherwise
207     // they will consume recovered.edits
208     master.balanceSwitch(false);
209     FileSystem fs = master.getMasterFileSystem().getFileSystem();
210 
211     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
212 
213     Path rootdir = FSUtils.getRootDir(conf);
214 
215     installTable(new ZooKeeperWatcher(conf, "table-creation", null),
216         "table", "family", 40);
217     TableName table = TableName.valueOf("table");
218     List<HRegionInfo> regions = null;
219     HRegionServer hrs = null;
220     for (int i = 0; i < NUM_RS; i++) {
221       boolean foundRs = false;
222       hrs = rsts.get(i).getRegionServer();
223       regions = ProtobufUtil.getOnlineRegions(hrs);
224       for (HRegionInfo region : regions) {
225         if (region.getTable().getNameAsString().equalsIgnoreCase("table")) {
226           foundRs = true;
227           break;
228         }
229       }
230       if (foundRs) break;
231     }
232     final Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(hrs
233         .getServerName().toString()));
234 
235     LOG.info("#regions = " + regions.size());
236     Iterator<HRegionInfo> it = regions.iterator();
237     while (it.hasNext()) {
238       HRegionInfo region = it.next();
239       if (region.getTable().getNamespaceAsString()
240           .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
241         it.remove();
242       }
243     }
244     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
245 
246     slm.splitLogDistributed(logDir);
247 
248     int count = 0;
249     for (HRegionInfo hri : regions) {
250 
251       Path tdir = FSUtils.getTableDir(rootdir, table);
252       @SuppressWarnings("deprecation")
253       Path editsdir =
254         HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
255       LOG.debug("checking edits dir " + editsdir);
256       FileStatus[] files = fs.listStatus(editsdir);
257       assertTrue(files.length > 1);
258       for (int i = 0; i < files.length; i++) {
259         int c = countHLog(files[i].getPath(), fs, conf);
260         count += c;
261       }
262       LOG.info(count + " edits in " + files.length + " recovered edits files.");
263     }
264     assertEquals(NUM_LOG_LINES, count);
265   }
266 
267   @Test(timeout = 300000)
268   public void testLogReplayWithNonMetaRSDown() throws Exception {
269     LOG.info("testLogReplayWithNonMetaRSDown");
270     conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
271     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
272     startCluster(NUM_RS);
273     final int NUM_REGIONS_TO_CREATE = 40;
274     final int NUM_LOG_LINES = 1000;
275     // turn off load balancing to prevent regions from moving around otherwise
276     // they will consume recovered.edits
277     master.balanceSwitch(false);
278 
279     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
280     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
281 
282     HRegionServer hrs = findRSToKill(false, "table");
283     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
284     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
285 
286     // wait for abort completes
287     this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
288     ht.close();
289     zkw.close();
290   }
291 
292   private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator {
293     private boolean isDups = false;
294     private LinkedList<Long> nonces = new LinkedList<Long>();
295 
296     public void startDups() {
297       isDups = true;
298     }
299 
300     @Override
301     public long newNonce() {
302       long nonce = isDups ? nonces.removeFirst() : super.newNonce();
303       if (!isDups) {
304         nonces.add(nonce);
305       }
306       return nonce;
307     }
308   }
309 
310   @Test(timeout = 300000)
311   public void testNonceRecovery() throws Exception {
312     LOG.info("testNonceRecovery");
313     final String TABLE_NAME = "table";
314     final String FAMILY_NAME = "family";
315     final int NUM_REGIONS_TO_CREATE = 40;
316 
317     conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
318     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
319     startCluster(NUM_RS);
320     master.balanceSwitch(false);
321 
322     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
323     HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
324     NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
325     NonceGenerator oldNg =
326         HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), ng);
327 
328     try {
329       List<Increment> reqs = new ArrayList<Increment>();
330       for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
331         HRegionServer hrs = rst.getRegionServer();
332         List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs);
333         for (HRegionInfo hri : hris) {
334           if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) {
335             byte[] key = hri.getStartKey();
336             if (key == null || key.length == 0) {
337               key = Bytes.copy(hri.getEndKey());
338               --(key[key.length - 1]);
339             }
340             Increment incr = new Increment(key);
341             incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1);
342             ht.increment(incr);
343             reqs.add(incr);
344           }
345         }
346       }
347 
348       HRegionServer hrs = findRSToKill(false, "table");
349       abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
350       ng.startDups();
351       for (Increment incr : reqs) {
352         try {
353           ht.increment(incr);
354           fail("should have thrown");
355         } catch (OperationConflictException ope) {
356           LOG.debug("Caught as expected: " + ope.getMessage());
357         }
358       }
359     } finally {
360       HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), oldNg);
361       ht.close();
362       zkw.close();
363     }
364   }
365 
366   @Test(timeout = 300000)
367   public void testLogReplayWithMetaRSDown() throws Exception {
368     LOG.info("testRecoveredEditsReplayWithMetaRSDown");
369     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
370     startCluster(NUM_RS);
371     final int NUM_REGIONS_TO_CREATE = 40;
372     final int NUM_LOG_LINES = 1000;
373     // turn off load balancing to prevent regions from moving around otherwise
374     // they will consume recovered.edits
375     master.balanceSwitch(false);
376 
377     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
378     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
379 
380     HRegionServer hrs = findRSToKill(true, "table");
381     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
382     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
383 
384     this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
385     ht.close();
386     zkw.close();
387   }
388 
389   private void abortRSAndVerifyRecovery(HRegionServer hrs, HTable ht, final ZooKeeperWatcher zkw,
390       final int numRegions, final int numofLines) throws Exception {
391 
392     abortRSAndWaitForRecovery(hrs, zkw, numRegions);
393     assertEquals(numofLines, TEST_UTIL.countRows(ht));
394   }
395 
396   private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw,
397       final int numRegions) throws Exception {
398     final MiniHBaseCluster tmpCluster = this.cluster;
399 
400     // abort RS
401     LOG.info("Aborting region server: " + hrs.getServerName());
402     hrs.abort("testing");
403 
404     // wait for abort completes
405     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
406       @Override
407       public boolean evaluate() throws Exception {
408         return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
409       }
410     });
411 
412     // wait for regions come online
413     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
414       @Override
415       public boolean evaluate() throws Exception {
416         return (getAllOnlineRegions(tmpCluster).size() >= (numRegions + 1));
417       }
418     });
419 
420     // wait for all regions are fully recovered
421     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
422       @Override
423       public boolean evaluate() throws Exception {
424         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
425           zkw.recoveringRegionsZNode, false);
426         return (recoveringRegions != null && recoveringRegions.size() == 0);
427       }
428     });
429   }
430 
431   @Test(timeout = 300000)
432   public void testMasterStartsUpWithLogSplittingWork() throws Exception {
433     LOG.info("testMasterStartsUpWithLogSplittingWork");
434     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
435     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
436     startCluster(NUM_RS);
437 
438     final int NUM_REGIONS_TO_CREATE = 40;
439     final int NUM_LOG_LINES = 1000;
440     // turn off load balancing to prevent regions from moving around otherwise
441     // they will consume recovered.edits
442     master.balanceSwitch(false);
443 
444     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
445     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
446     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
447 
448     HRegionServer hrs = findRSToKill(false, "table");
449     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
450     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
451 
452     // abort master
453     abortMaster(cluster);
454 
455     // abort RS
456     LOG.info("Aborting region server: " + hrs.getServerName());
457     hrs.abort("testing");
458 
459     // wait for abort completes
460     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
461       @Override
462       public boolean evaluate() throws Exception {
463         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
464       }
465     });
466 
467     Thread.sleep(2000);
468     LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
469     
470     startMasterAndWaitUntilLogSplit(cluster);
471     
472     // wait for abort completes
473     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
474       @Override
475       public boolean evaluate() throws Exception {
476         return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
477       }
478     });
479 
480     LOG.info("Current Open Regions After Master Node Starts Up:"
481         + getAllOnlineRegions(cluster).size());
482 
483     assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
484 
485     ht.close();
486     zkw.close();
487   }
488   
489   @Test(timeout = 300000)
490   public void testMasterStartsUpWithLogReplayWork() throws Exception {
491     LOG.info("testMasterStartsUpWithLogReplayWork");
492     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
493     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
494     startCluster(NUM_RS);
495 
496     final int NUM_REGIONS_TO_CREATE = 40;
497     final int NUM_LOG_LINES = 1000;
498     // turn off load balancing to prevent regions from moving around otherwise
499     // they will consume recovered.edits
500     master.balanceSwitch(false);
501 
502     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
503     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
504     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
505 
506     HRegionServer hrs = findRSToKill(false, "table");
507     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
508     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
509 
510     // abort master
511     abortMaster(cluster);
512 
513     // abort RS
514     LOG.info("Aborting region server: " + hrs.getServerName());
515     hrs.abort("testing");
516 
517     // wait for the RS dies
518     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
519       @Override
520       public boolean evaluate() throws Exception {
521         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
522       }
523     });
524 
525     Thread.sleep(2000);
526     LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
527 
528     startMasterAndWaitUntilLogSplit(cluster);
529 
530     // wait for all regions are fully recovered
531     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
532       @Override
533       public boolean evaluate() throws Exception {
534         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
535           zkw.recoveringRegionsZNode, false);
536         return (recoveringRegions != null && recoveringRegions.size() == 0);
537       }
538     });
539 
540     LOG.info("Current Open Regions After Master Node Starts Up:"
541         + getAllOnlineRegions(cluster).size());
542 
543     assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
544 
545     ht.close();
546     zkw.close();
547   }
548 
549 
550   @Test(timeout = 300000)
551   public void testLogReplayTwoSequentialRSDown() throws Exception {
552     LOG.info("testRecoveredEditsReplayTwoSequentialRSDown");
553     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
554     startCluster(NUM_RS);
555     final int NUM_REGIONS_TO_CREATE = 40;
556     final int NUM_LOG_LINES = 1000;
557     // turn off load balancing to prevent regions from moving around otherwise
558     // they will consume recovered.edits
559     master.balanceSwitch(false);
560 
561     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
562     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
563     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
564 
565     List<HRegionInfo> regions = null;
566     HRegionServer hrs1 = findRSToKill(false, "table");
567     regions = ProtobufUtil.getOnlineRegions(hrs1);
568 
569     makeHLog(hrs1.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
570 
571     // abort RS1
572     LOG.info("Aborting region server: " + hrs1.getServerName());
573     hrs1.abort("testing");
574 
575     // wait for abort completes
576     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
577       @Override
578       public boolean evaluate() throws Exception {
579         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
580       }
581     });
582 
583     // wait for regions come online
584     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
585       @Override
586       public boolean evaluate() throws Exception {
587         return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
588       }
589     });
590 
591     // sleep a little bit in order to interrupt recovering in the middle
592     Thread.sleep(300);
593     // abort second region server
594     rsts = cluster.getLiveRegionServerThreads();
595     HRegionServer hrs2 = rsts.get(0).getRegionServer();
596     LOG.info("Aborting one more region server: " + hrs2.getServerName());
597     hrs2.abort("testing");
598 
599     // wait for abort completes
600     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
601       @Override
602       public boolean evaluate() throws Exception {
603         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
604       }
605     });
606 
607     // wait for regions come online
608     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
609       @Override
610       public boolean evaluate() throws Exception {
611         return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
612       }
613     });
614 
615     // wait for all regions are fully recovered
616     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
617       @Override
618       public boolean evaluate() throws Exception {
619         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
620           zkw.recoveringRegionsZNode, false);
621         return (recoveringRegions != null && recoveringRegions.size() == 0);
622       }
623     });
624 
625     assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
626     ht.close();
627     zkw.close();
628   }
629 
630   @Test(timeout = 300000)
631   public void testMarkRegionsRecoveringInZK() throws Exception {
632     LOG.info("testMarkRegionsRecoveringInZK");
633     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
634     startCluster(NUM_RS);
635     master.balanceSwitch(false);
636     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
637     final ZooKeeperWatcher zkw = master.getZooKeeperWatcher();
638     HTable ht = installTable(zkw, "table", "family", 40);
639     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
640 
641     Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
642     HRegionInfo region = null;
643     HRegionServer hrs = null;
644     ServerName firstFailedServer = null;
645     ServerName secondFailedServer = null;
646     for (int i = 0; i < NUM_RS; i++) {
647       hrs = rsts.get(i).getRegionServer();
648       List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
649       if (regions.isEmpty()) continue;
650       region = regions.get(0);
651       regionSet.add(region);
652       firstFailedServer = hrs.getServerName();
653       secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName();
654       break;
655     }
656 
657     slm.markRegionsRecoveringInZK(firstFailedServer, regionSet);
658     slm.markRegionsRecoveringInZK(secondFailedServer, regionSet);
659 
660     List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
661       ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
662 
663     assertEquals(recoveringRegions.size(), 2);
664 
665     // wait for splitLogWorker to mark them up because there is no WAL files recorded in ZK
666     final HRegionServer tmphrs = hrs;
667     TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
668       @Override
669       public boolean evaluate() throws Exception {
670         return (tmphrs.getRecoveringRegions().size() == 0);
671       }
672     });
673     ht.close();
674     zkw.close();
675   }
676 
677   @Test(timeout = 300000)
678   public void testReplayCmd() throws Exception {
679     LOG.info("testReplayCmd");
680     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
681     startCluster(NUM_RS);
682     final int NUM_REGIONS_TO_CREATE = 40;
683     // turn off load balancing to prevent regions from moving around otherwise
684     // they will consume recovered.edits
685     master.balanceSwitch(false);
686 
687     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
688     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
689     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
690 
691     List<HRegionInfo> regions = null;
692     HRegionServer hrs = null;
693     for (int i = 0; i < NUM_RS; i++) {
694       boolean isCarryingMeta = false;
695       hrs = rsts.get(i).getRegionServer();
696       regions = ProtobufUtil.getOnlineRegions(hrs);
697       for (HRegionInfo region : regions) {
698         if (region.isMetaRegion()) {
699           isCarryingMeta = true;
700           break;
701         }
702       }
703       if (isCarryingMeta) {
704         continue;
705       }
706       if (regions.size() > 0) break;
707     }
708 
709     this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
710     String originalCheckSum = TEST_UTIL.checksumRows(ht);
711     
712     // abort RA and trigger replay
713     abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
714 
715     assertEquals("Data should remain after reopening of regions", originalCheckSum,
716       TEST_UTIL.checksumRows(ht));
717 
718     ht.close();
719     zkw.close();
720   }
721 
722   @Test(timeout = 300000)
723   public void testLogReplayForDisablingTable() throws Exception {
724     LOG.info("testLogReplayForDisablingTable");
725     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
726     startCluster(NUM_RS);
727     final int NUM_REGIONS_TO_CREATE = 40;
728     final int NUM_LOG_LINES = 1000;
729 
730     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
731     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
732     HTable disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE);
733     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE);
734 
735     // turn off load balancing to prevent regions from moving around otherwise
736     // they will consume recovered.edits
737     master.balanceSwitch(false);
738 
739     List<HRegionInfo> regions = null;
740     HRegionServer hrs = null;
741     boolean hasRegionsForBothTables = false;
742     String tableName = null;
743     for (int i = 0; i < NUM_RS; i++) {
744       tableName = null;
745       hasRegionsForBothTables = false;
746       boolean isCarryingSystem = false;
747       hrs = rsts.get(i).getRegionServer();
748       regions = ProtobufUtil.getOnlineRegions(hrs);
749       for (HRegionInfo region : regions) {
750         if (region.getTable().isSystemTable()) {
751           isCarryingSystem = true;
752           break;
753         }
754         if (tableName != null &&
755             !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) {
756           // make sure that we find a RS has online regions for both "table" and "disableTable"
757           hasRegionsForBothTables = true;
758           break;
759         } else if (tableName == null) {
760           tableName = region.getTable().getNameAsString();
761         }
762       }
763       if (isCarryingSystem) {
764         continue;
765       }
766       if (hasRegionsForBothTables) {
767         break;
768       }
769     }
770 
771     // make sure we found a good RS
772     Assert.assertTrue(hasRegionsForBothTables);
773 
774     LOG.info("#regions = " + regions.size());
775     Iterator<HRegionInfo> it = regions.iterator();
776     while (it.hasNext()) {
777       HRegionInfo region = it.next();
778       if (region.isMetaTable()) {
779         it.remove();
780       }
781     }
782     makeHLog(hrs.getWAL(), regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
783     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
784     
785     LOG.info("Disabling table\n");
786     TEST_UTIL.getHBaseAdmin().disableTable(Bytes.toBytes("disableTable"));
787     
788     // abort RS
789     LOG.info("Aborting region server: " + hrs.getServerName());
790     hrs.abort("testing");
791 
792     // wait for abort completes
793     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
794       @Override
795       public boolean evaluate() throws Exception {
796         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
797       }
798     });
799 
800     // wait for regions come online
801     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
802       @Override
803       public boolean evaluate() throws Exception {
804         return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
805       }
806     });
807 
808     // wait for all regions are fully recovered
809     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
810       @Override
811       public boolean evaluate() throws Exception {
812         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
813           zkw.recoveringRegionsZNode, false);
814         ServerManager serverManager = master.getServerManager();
815         return (!serverManager.areDeadServersInProgress() &&
816             recoveringRegions != null && recoveringRegions.size() == 0);
817       }
818     });
819 
820     int count = 0;
821     FileSystem fs = master.getMasterFileSystem().getFileSystem();
822     Path rootdir = FSUtils.getRootDir(conf);
823     Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable"));
824     for (HRegionInfo hri : regions) {
825       @SuppressWarnings("deprecation")
826       Path editsdir =
827         HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
828       LOG.debug("checking edits dir " + editsdir);
829       if(!fs.exists(editsdir)) continue;
830       FileStatus[] files = fs.listStatus(editsdir);
831       if(files != null) {
832         for(FileStatus file : files) {
833           int c = countHLog(file.getPath(), fs, conf);
834           count += c;
835           LOG.info(c + " edits in " + file.getPath());
836         }
837       }
838     }
839 
840     LOG.info("Verify edits in recovered.edits files");
841     assertEquals(NUM_LOG_LINES, count);
842     LOG.info("Verify replayed edits");
843     assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
844     
845     // clean up
846     for (HRegionInfo hri : regions) {
847       @SuppressWarnings("deprecation")
848       Path editsdir =
849         HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
850       fs.delete(editsdir, true);
851     }
852     disablingHT.close();
853     ht.close();
854     zkw.close();
855   }
856 
857   @Test(timeout = 300000)
858   public void testDisallowWritesInRecovering() throws Exception {
859     LOG.info("testDisallowWritesInRecovering");
860     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
861     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
862     conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
863     startCluster(NUM_RS);
864     final int NUM_REGIONS_TO_CREATE = 40;
865     // turn off load balancing to prevent regions from moving around otherwise
866     // they will consume recovered.edits
867     master.balanceSwitch(false);
868 
869     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
870     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
871     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
872     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
873 
874     Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
875     HRegionInfo region = null;
876     HRegionServer hrs = null;
877     HRegionServer dstRS = null;
878     for (int i = 0; i < NUM_RS; i++) {
879       hrs = rsts.get(i).getRegionServer();
880       List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
881       if (regions.isEmpty()) continue;
882       region = regions.get(0);
883       regionSet.add(region);
884       dstRS = rsts.get((i+1) % NUM_RS).getRegionServer();
885       break;
886     }
887     
888     slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet);
889     // move region in order for the region opened in recovering state
890     final HRegionInfo hri = region;
891     final HRegionServer tmpRS = dstRS;
892     TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(),
893       Bytes.toBytes(dstRS.getServerName().getServerName()));
894     // wait for region move completes
895     final RegionStates regionStates =
896         TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
897     TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
898       @Override
899       public boolean evaluate() throws Exception {
900         ServerName sn = regionStates.getRegionServerOfRegion(hri);
901         return (sn != null && sn.equals(tmpRS.getServerName()));
902       }
903     });
904     
905     try {
906       byte[] key = region.getStartKey();
907       if (key == null || key.length == 0) {
908         key = new byte[] { 0, 0, 0, 0, 1 };
909       }
910       ht.setAutoFlush(true, true);
911       Put put = new Put(key);
912       put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
913       ht.put(put);
914       ht.close();
915     } catch (IOException ioe) {
916       Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
917       RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
918       boolean foundRegionInRecoveryException = false;
919       for (Throwable t : re.getCauses()) {
920         if (t instanceof RegionInRecoveryException) {
921           foundRegionInRecoveryException = true;
922           break;
923         }
924       }
925       Assert.assertTrue(
926         "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(),
927         foundRegionInRecoveryException);
928     }
929 
930     zkw.close();
931   }
932 
933   /**
934    * The original intention of this test was to force an abort of a region
935    * server and to make sure that the failure path in the region servers is
936    * properly evaluated. But it is difficult to ensure that the region server
937    * doesn't finish the log splitting before it aborts. Also now, there is
938    * this code path where the master will preempt the region server when master
939    * detects that the region server has aborted.
940    * @throws Exception
941    */
942   @Test (timeout=300000)
943   public void testWorkerAbort() throws Exception {
944     LOG.info("testWorkerAbort");
945     startCluster(3);
946     final int NUM_LOG_LINES = 10000;
947     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
948     FileSystem fs = master.getMasterFileSystem().getFileSystem();
949 
950     final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
951     HRegionServer hrs = findRSToKill(false, "table");
952     Path rootdir = FSUtils.getRootDir(conf);
953     final Path logDir = new Path(rootdir,
954         HLogUtil.getHLogDirectoryName(hrs.getServerName().toString()));
955 
956     installTable(new ZooKeeperWatcher(conf, "table-creation", null),
957         "table", "family", 40);
958 
959     makeHLog(hrs.getWAL(), ProtobufUtil.getOnlineRegions(hrs), "table", "family", NUM_LOG_LINES,
960       100);
961 
962     new Thread() {
963       public void run() {
964         waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
965         for (RegionServerThread rst : rsts) {
966           rst.getRegionServer().abort("testing");
967           break;
968         }
969       }
970     }.start();
971     // slm.splitLogDistributed(logDir);
972     FileStatus[] logfiles = fs.listStatus(logDir);
973     TaskBatch batch = new TaskBatch();
974     slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
975     //waitForCounter but for one of the 2 counters
976     long curt = System.currentTimeMillis();
977     long waitTime = 80000;
978     long endt = curt + waitTime;
979     while (curt < endt) {
980       if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
981           tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
982           tot_wkr_preempt_task.get()) == 0) {
983         Thread.yield();
984         curt = System.currentTimeMillis();
985       } else {
986         assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
987             tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
988             tot_wkr_preempt_task.get()));
989         return;
990       }
991     }
992     fail("none of the following counters went up in " + waitTime +
993         " milliseconds - " +
994         "tot_wkr_task_resigned, tot_wkr_task_err, " +
995         "tot_wkr_final_transition_failed, tot_wkr_task_done, " +
996         "tot_wkr_preempt_task");
997   }
998 
999   @Test (timeout=300000)
1000   public void testThreeRSAbort() throws Exception {
1001     LOG.info("testThreeRSAbort");
1002     final int NUM_REGIONS_TO_CREATE = 40;
1003     final int NUM_ROWS_PER_REGION = 100;
1004 
1005     startCluster(NUM_RS); // NUM_RS=6.
1006 
1007     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
1008         "distributed log splitting test", null);
1009 
1010     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1011     populateDataInTable(NUM_ROWS_PER_REGION, "family");
1012 
1013 
1014     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1015     assertEquals(NUM_RS, rsts.size());
1016     rsts.get(0).getRegionServer().abort("testing");
1017     rsts.get(1).getRegionServer().abort("testing");
1018     rsts.get(2).getRegionServer().abort("testing");
1019 
1020     long start = EnvironmentEdgeManager.currentTimeMillis();
1021     while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
1022       if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
1023         assertTrue(false);
1024       }
1025       Thread.sleep(200);
1026     }
1027 
1028     start = EnvironmentEdgeManager.currentTimeMillis();
1029     while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 1)) {
1030       if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
1031         assertTrue("Timedout", false);
1032       }
1033       Thread.sleep(200);
1034     }
1035 
1036     // wait for all regions are fully recovered
1037     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
1038       @Override
1039       public boolean evaluate() throws Exception {
1040         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
1041           zkw.recoveringRegionsZNode, false);
1042         return (recoveringRegions != null && recoveringRegions.size() == 0);
1043       }
1044     });
1045 
1046     assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
1047         TEST_UTIL.countRows(ht));
1048     ht.close();
1049     zkw.close();
1050   }
1051 
1052 
1053 
1054   @Test(timeout=30000)
1055   public void testDelayedDeleteOnFailure() throws Exception {
1056     LOG.info("testDelayedDeleteOnFailure");
1057     startCluster(1);
1058     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
1059     final FileSystem fs = master.getMasterFileSystem().getFileSystem();
1060     final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
1061     fs.mkdirs(logDir);
1062     ExecutorService executor = null;
1063     try {
1064       final Path corruptedLogFile = new Path(logDir, "x");
1065       FSDataOutputStream out;
1066       out = fs.create(corruptedLogFile);
1067       out.write(0);
1068       out.write(Bytes.toBytes("corrupted bytes"));
1069       out.close();
1070       slm.ignoreZKDeleteForTesting = true;
1071       executor = Executors.newSingleThreadExecutor();
1072       Runnable runnable = new Runnable() {
1073        @Override
1074        public void run() {
1075           try {
1076             // since the logDir is a fake, corrupted one, so the split log worker
1077             // will finish it quickly with error, and this call will fail and throw
1078             // an IOException.
1079             slm.splitLogDistributed(logDir);
1080           } catch (IOException ioe) {
1081             try {
1082               assertTrue(fs.exists(corruptedLogFile));
1083               // this call will block waiting for the task to be removed from the
1084               // tasks map which is not going to happen since ignoreZKDeleteForTesting
1085               // is set to true, until it is interrupted.
1086               slm.splitLogDistributed(logDir);
1087             } catch (IOException e) {
1088               assertTrue(Thread.currentThread().isInterrupted());
1089               return;
1090             }
1091             fail("did not get the expected IOException from the 2nd call");
1092           }
1093           fail("did not get the expected IOException from the 1st call");
1094         }
1095       };
1096       Future<?> result = executor.submit(runnable);
1097       try {
1098         result.get(2000, TimeUnit.MILLISECONDS);
1099       } catch (TimeoutException te) {
1100         // it is ok, expected.
1101       }
1102       waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
1103       executor.shutdownNow();
1104       executor = null;
1105 
1106       // make sure the runnable is finished with no exception thrown.
1107       result.get();
1108     } finally {
1109       if (executor != null) {
1110         // interrupt the thread in case the test fails in the middle.
1111         // it has no effect if the thread is already terminated.
1112         executor.shutdownNow();
1113       }
1114       fs.delete(logDir, true);
1115     }
1116   }
1117 
1118   @Test(timeout = 300000)
1119   public void testMetaRecoveryInZK() throws Exception {
1120     LOG.info("testMetaRecoveryInZK");
1121     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1122     startCluster(NUM_RS);
1123 
1124     // turn off load balancing to prevent regions from moving around otherwise
1125     // they will consume recovered.edits
1126     master.balanceSwitch(false);
1127     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1128     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1129 
1130     // only testing meta recovery in ZK operation
1131     HRegionServer hrs = findRSToKill(true, null);
1132     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
1133 
1134     LOG.info("#regions = " + regions.size());
1135     Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
1136     tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
1137     master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions);
1138     Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
1139     userRegionSet.addAll(regions);
1140     master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet);
1141     boolean isMetaRegionInRecovery = false;
1142     List<String> recoveringRegions =
1143         zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1144     for (String curEncodedRegionName : recoveringRegions) {
1145       if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1146         isMetaRegionInRecovery = true;
1147         break;
1148       }
1149     }
1150     assertTrue(isMetaRegionInRecovery);
1151 
1152     master.getMasterFileSystem().splitMetaLog(hrs.getServerName());
1153     
1154     isMetaRegionInRecovery = false;
1155     recoveringRegions =
1156         zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1157     for (String curEncodedRegionName : recoveringRegions) {
1158       if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1159         isMetaRegionInRecovery = true;
1160         break;
1161       }
1162     }
1163     // meta region should be recovered
1164     assertFalse(isMetaRegionInRecovery);
1165     zkw.close();
1166   }
1167 
1168   @Test(timeout = 300000)
1169   public void testSameVersionUpdatesRecovery() throws Exception {
1170     LOG.info("testSameVersionUpdatesRecovery");
1171     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1172     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1173     startCluster(NUM_RS);
1174     final AtomicLong sequenceId = new AtomicLong(100);
1175     final int NUM_REGIONS_TO_CREATE = 40;
1176     final int NUM_LOG_LINES = 1000;
1177     // turn off load balancing to prevent regions from moving around otherwise
1178     // they will consume recovered.edits
1179     master.balanceSwitch(false);
1180 
1181     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1182     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1183     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1184 
1185     List<HRegionInfo> regions = null;
1186     HRegionServer hrs = null;
1187     for (int i = 0; i < NUM_RS; i++) {
1188       boolean isCarryingMeta = false;
1189       hrs = rsts.get(i).getRegionServer();
1190       regions = ProtobufUtil.getOnlineRegions(hrs);
1191       for (HRegionInfo region : regions) {
1192         if (region.isMetaRegion()) {
1193           isCarryingMeta = true;
1194           break;
1195         }
1196       }
1197       if (isCarryingMeta) {
1198         continue;
1199       }
1200       break;
1201     }
1202 
1203     LOG.info("#regions = " + regions.size());
1204     Iterator<HRegionInfo> it = regions.iterator();
1205     while (it.hasNext()) {
1206       HRegionInfo region = it.next();
1207       if (region.isMetaTable()
1208           || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1209         it.remove();
1210       }
1211     }
1212     if (regions.size() == 0) return;
1213     HRegionInfo curRegionInfo = regions.get(0);
1214     byte[] startRow = curRegionInfo.getStartKey();
1215     if (startRow == null || startRow.length == 0) {
1216       startRow = new byte[] { 0, 0, 0, 0, 1 };
1217     }
1218     byte[] row = Bytes.incrementBytes(startRow, 1);
1219     // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key
1220     row = Arrays.copyOfRange(row, 3, 8);
1221     long value = 0;
1222     byte[] tableName = Bytes.toBytes("table");
1223     byte[] family = Bytes.toBytes("family");
1224     byte[] qualifier = Bytes.toBytes("c1");
1225     long timeStamp = System.currentTimeMillis();
1226     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1227     htd.addFamily(new HColumnDescriptor(family));
1228     for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1229       WALEdit e = new WALEdit();
1230       value++;
1231       e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1232       hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e, 
1233         System.currentTimeMillis(), htd, sequenceId);
1234     }
1235     hrs.getWAL().sync();
1236     hrs.getWAL().close();
1237 
1238     // wait for abort completes
1239     this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1240 
1241     // verify we got the last value
1242     LOG.info("Verification Starts...");
1243     Get g = new Get(row);
1244     Result r = ht.get(g);
1245     long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1246     assertEquals(value, theStoredVal);
1247 
1248     // after flush
1249     LOG.info("Verification after flush...");
1250     TEST_UTIL.getHBaseAdmin().flush(tableName);
1251     r = ht.get(g);
1252     theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1253     assertEquals(value, theStoredVal);
1254     ht.close();
1255   }
1256 
1257   @Test(timeout = 300000)
1258   public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
1259     LOG.info("testSameVersionUpdatesRecoveryWithWrites");
1260     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1261     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1262     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
1263     conf.setInt("hbase.hstore.compactionThreshold", 3);
1264     startCluster(NUM_RS);
1265     final AtomicLong sequenceId = new AtomicLong(100);
1266     final int NUM_REGIONS_TO_CREATE = 40;
1267     final int NUM_LOG_LINES = 1000;
1268     // turn off load balancing to prevent regions from moving around otherwise
1269     // they will consume recovered.edits
1270     master.balanceSwitch(false);
1271 
1272     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1273     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1274     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1275 
1276     List<HRegionInfo> regions = null;
1277     HRegionServer hrs = null;
1278     for (int i = 0; i < NUM_RS; i++) {
1279       boolean isCarryingMeta = false;
1280       hrs = rsts.get(i).getRegionServer();
1281       regions = ProtobufUtil.getOnlineRegions(hrs);
1282       for (HRegionInfo region : regions) {
1283         if (region.isMetaRegion()) {
1284           isCarryingMeta = true;
1285           break;
1286         }
1287       }
1288       if (isCarryingMeta) {
1289         continue;
1290       }
1291       break;
1292     }
1293 
1294     LOG.info("#regions = " + regions.size());
1295     Iterator<HRegionInfo> it = regions.iterator();
1296     while (it.hasNext()) {
1297       HRegionInfo region = it.next();
1298       if (region.isMetaTable()
1299           || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1300         it.remove();
1301       }
1302     }
1303     if (regions.size() == 0) return;
1304     HRegionInfo curRegionInfo = regions.get(0);
1305     byte[] startRow = curRegionInfo.getStartKey();
1306     if (startRow == null || startRow.length == 0) {
1307       startRow = new byte[] { 0, 0, 0, 0, 1 };
1308     }
1309     byte[] row = Bytes.incrementBytes(startRow, 1);
1310     // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key
1311     row = Arrays.copyOfRange(row, 3, 8);
1312     long value = 0;
1313     final byte[] tableName = Bytes.toBytes("table");
1314     byte[] family = Bytes.toBytes("family");
1315     byte[] qualifier = Bytes.toBytes("c1");
1316     long timeStamp = System.currentTimeMillis();
1317     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1318     htd.addFamily(new HColumnDescriptor(family));
1319     for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1320       WALEdit e = new WALEdit();
1321       value++;
1322       e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1323       hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e, 
1324         System.currentTimeMillis(), htd, sequenceId);
1325     }
1326     hrs.getWAL().sync();
1327     hrs.getWAL().close();
1328 
1329     // wait for abort completes
1330     this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1331  
1332     // verify we got the last value
1333     LOG.info("Verification Starts...");
1334     Get g = new Get(row);
1335     Result r = ht.get(g);
1336     long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1337     assertEquals(value, theStoredVal);
1338 
1339     // after flush & compaction
1340     LOG.info("Verification after flush...");
1341     TEST_UTIL.getHBaseAdmin().flush(tableName);
1342     TEST_UTIL.getHBaseAdmin().compact(tableName);
1343     
1344     // wait for compaction completes
1345     TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
1346       @Override
1347       public boolean evaluate() throws Exception {
1348         return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
1349       }
1350     });
1351 
1352     r = ht.get(g);
1353     theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1354     assertEquals(value, theStoredVal);
1355     ht.close();
1356   }
1357 
1358   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
1359     return installTable(zkw, tname, fname, nrs, 0);
1360   }
1361 
1362   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs, 
1363       int existingRegions) throws Exception {
1364     // Create a table with regions
1365     byte [] table = Bytes.toBytes(tname);
1366     byte [] family = Bytes.toBytes(fname);
1367     LOG.info("Creating table with " + nrs + " regions");
1368     HTable ht = TEST_UTIL.createTable(table, family);
1369     int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, nrs);
1370     assertEquals(nrs, numRegions);
1371       LOG.info("Waiting for no more RIT\n");
1372     blockUntilNoRIT(zkw, master);
1373     // disable-enable cycle to get rid of table's dead regions left behind
1374     // by createMultiRegions
1375     LOG.debug("Disabling table\n");
1376     TEST_UTIL.getHBaseAdmin().disableTable(table);
1377     LOG.debug("Waiting for no more RIT\n");
1378     blockUntilNoRIT(zkw, master);
1379     NavigableSet<String> regions = getAllOnlineRegions(cluster);
1380     LOG.debug("Verifying only catalog and namespace regions are assigned\n");
1381     if (regions.size() != 2) {
1382       for (String oregion : regions)
1383         LOG.debug("Region still online: " + oregion);
1384     }
1385     assertEquals(2 + existingRegions, regions.size());
1386     LOG.debug("Enabling table\n");
1387     TEST_UTIL.getHBaseAdmin().enableTable(table);
1388     LOG.debug("Waiting for no more RIT\n");
1389     blockUntilNoRIT(zkw, master);
1390     LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
1391     regions = getAllOnlineRegions(cluster);
1392     assertEquals(numRegions + 2 + existingRegions, regions.size());
1393     return ht;
1394   }
1395 
1396   void populateDataInTable(int nrows, String fname) throws Exception {
1397     byte [] family = Bytes.toBytes(fname);
1398 
1399     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1400     assertEquals(NUM_RS, rsts.size());
1401 
1402     for (RegionServerThread rst : rsts) {
1403       HRegionServer hrs = rst.getRegionServer();
1404       List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs);
1405       for (HRegionInfo hri : hris) {
1406         if (hri.getTable().isSystemTable()) {
1407           continue;
1408         }
1409         LOG.debug("adding data to rs = " + rst.getName() +
1410             " region = "+ hri.getRegionNameAsString());
1411         HRegion region = hrs.getOnlineRegion(hri.getRegionName());
1412         assertTrue(region != null);
1413         putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
1414       }
1415     }
1416   }
1417 
1418   public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
1419       int num_edits, int edit_size) throws IOException {
1420     makeHLog(log, regions, tname, fname, num_edits, edit_size, true);
1421   }
1422 
1423   public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
1424       int num_edits, int edit_size, boolean closeLog) throws IOException {
1425     TableName fullTName = TableName.valueOf(tname);
1426     // remove root and meta region
1427     regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
1428     // using one sequenceId for edits across all regions is ok.
1429     final AtomicLong sequenceId = new AtomicLong(10);
1430 
1431 
1432     for(Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
1433       HRegionInfo regionInfo = iter.next();
1434       if(regionInfo.getTable().isSystemTable()) {
1435          iter.remove();
1436       }
1437     }
1438     HTableDescriptor htd = new HTableDescriptor(fullTName);
1439     byte[] family = Bytes.toBytes(fname);
1440     htd.addFamily(new HColumnDescriptor(family));
1441     byte[] value = new byte[edit_size];
1442 
1443     List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
1444     for (HRegionInfo region : regions) {
1445       if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) {
1446         continue;
1447       }
1448       hris.add(region);
1449     }
1450     LOG.info("Creating wal edits across " + hris.size() + " regions.");
1451     for (int i = 0; i < edit_size; i++) {
1452       value[i] = (byte) ('a' + (i % 26));
1453     }
1454     int n = hris.size();
1455     int[] counts = new int[n];
1456     if (n > 0) {
1457       for (int i = 0; i < num_edits; i += 1) {
1458         WALEdit e = new WALEdit();
1459         HRegionInfo curRegionInfo = hris.get(i % n);
1460         byte[] startRow = curRegionInfo.getStartKey();
1461         if (startRow == null || startRow.length == 0) {
1462           startRow = new byte[] { 0, 0, 0, 0, 1 };
1463         }
1464         byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
1465         row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
1466                                              // HBaseTestingUtility.createMultiRegions use 5 bytes
1467                                              // key
1468         byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
1469         e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
1470         log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd, sequenceId);
1471         counts[i % n] += 1;
1472       }
1473     }
1474     log.sync();
1475     if(closeLog) {
1476       log.close();
1477     }
1478     for (int i = 0; i < n; i++) {
1479       LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
1480     }
1481     return;
1482   }
1483 
1484   private int countHLog(Path log, FileSystem fs, Configuration conf)
1485   throws IOException {
1486     int count = 0;
1487     HLog.Reader in = HLogFactory.createReader(fs, log, conf);
1488     while (in.next() != null) {
1489       count++;
1490     }
1491     return count;
1492   }
1493 
1494   private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
1495   throws KeeperException, InterruptedException {
1496     ZKAssign.blockUntilNoRIT(zkw);
1497     master.assignmentManager.waitUntilNoRegionsInTransition(60000);
1498   }
1499 
1500   private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf,
1501       byte [] ...families)
1502   throws IOException {
1503     for(int i = 0; i < numRows; i++) {
1504       Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
1505       for(byte [] family : families) {
1506         put.add(family, qf, null);
1507       }
1508       region.put(put);
1509     }
1510   }
1511 
1512   /**
1513    * Load table with puts and deletes with expected values so that we can verify later
1514    */
1515   private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException {
1516     t.setAutoFlush(false, true);
1517     byte[] k = new byte[3];
1518 
1519     // add puts
1520     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1521       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1522         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1523           k[0] = b1;
1524           k[1] = b2;
1525           k[2] = b3;
1526           Put put = new Put(k);
1527           put.add(f, column, k);
1528           t.put(put);
1529         }
1530       }
1531     }
1532     t.flushCommits();
1533     // add deletes
1534     for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1535       k[0] = 'a';
1536       k[1] = 'a';
1537       k[2] = b3;
1538       Delete del = new Delete(k);
1539       t.delete(del);
1540     }
1541     t.flushCommits();
1542   }
1543 
1544   private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
1545       throws IOException {
1546     NavigableSet<String> online = new TreeSet<String>();
1547     for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
1548       for (HRegionInfo region : ProtobufUtil.getOnlineRegions(rst.getRegionServer())) {
1549         online.add(region.getRegionNameAsString());
1550       }
1551     }
1552     return online;
1553   }
1554 
1555   private void waitForCounter(AtomicLong ctr, long oldval, long newval,
1556       long timems) {
1557     long curt = System.currentTimeMillis();
1558     long endt = curt + timems;
1559     while (curt < endt) {
1560       if (ctr.get() == oldval) {
1561         Thread.yield();
1562         curt = System.currentTimeMillis();
1563       } else {
1564         assertEquals(newval, ctr.get());
1565         return;
1566       }
1567     }
1568     assertTrue(false);
1569   }
1570 
1571   private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
1572     for (MasterThread mt : cluster.getLiveMasterThreads()) {
1573       if (mt.getMaster().isActiveMaster()) {
1574         mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
1575         mt.join();
1576         break;
1577       }
1578     }
1579     LOG.debug("Master is aborted");
1580   }
1581 
1582   private void startMasterAndWaitUntilLogSplit(MiniHBaseCluster cluster)
1583       throws IOException, InterruptedException {
1584     cluster.startMaster();
1585     HMaster master = cluster.getMaster();
1586     while (!master.isInitialized()) {
1587       Thread.sleep(100);
1588     }
1589     ServerManager serverManager = master.getServerManager();
1590     while (serverManager.areDeadServersInProgress()) {
1591       Thread.sleep(100);
1592     }
1593   }
1594 
1595   /**
1596    * Find a RS that has regions of a table.
1597    * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
1598    * @param tableName
1599    * @return
1600    * @throws Exception
1601    */
1602   private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception {
1603     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1604     int numOfRSs = rsts.size();
1605     List<HRegionInfo> regions = null;
1606     HRegionServer hrs = null;
1607 
1608     for (int i = 0; i < numOfRSs; i++) {
1609       boolean isCarryingMeta = false;
1610       boolean foundTableRegion = false;
1611       hrs = rsts.get(i).getRegionServer();
1612       regions = ProtobufUtil.getOnlineRegions(hrs);
1613       for (HRegionInfo region : regions) {
1614         if (region.isMetaRegion()) {
1615           isCarryingMeta = true;
1616         }
1617         if (tableName == null || region.getTable().getNameAsString().equals(tableName)) {
1618           foundTableRegion = true;
1619         }
1620         if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
1621           break;
1622         }
1623       }
1624       if (isCarryingMeta && hasMetaRegion) {
1625         // clients ask for a RS with META
1626         if (!foundTableRegion) {
1627           final HRegionServer destRS = hrs;
1628           // the RS doesn't have regions of the specified table so we need move one to this RS
1629           List<HRegionInfo> tableRegions =
1630               TEST_UTIL.getHBaseAdmin().getTableRegions(Bytes.toBytes(tableName));
1631           final HRegionInfo hri = tableRegions.get(0);
1632           TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
1633             Bytes.toBytes(destRS.getServerName().getServerName()));
1634           // wait for region move completes
1635           final RegionStates regionStates =
1636               TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
1637           TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
1638             @Override
1639             public boolean evaluate() throws Exception {
1640               ServerName sn = regionStates.getRegionServerOfRegion(hri);
1641               return (sn != null && sn.equals(destRS.getServerName()));
1642             }
1643           });
1644         }
1645         return hrs;
1646       } else if (hasMetaRegion || isCarryingMeta) {
1647         continue;
1648       }
1649       if (foundTableRegion) break;
1650     }
1651 
1652     return hrs;
1653   }
1654 
1655 }