View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.hbase.HBaseTestingUtility;
25  import org.apache.hadoop.hbase.LargeTests;
26  import org.apache.hadoop.hbase.UnknownScannerException;
27  import org.apache.hadoop.hbase.client.Result;
28  import org.apache.hadoop.hbase.client.ResultScanner;
29  import org.apache.hadoop.hbase.client.Scan;
30  import org.junit.experimental.categories.Category;
31  
32  import static org.junit.Assert.fail;
33  
34  @Category(LargeTests.class)
35  public class TestReplicationKillRS extends TestReplicationBase {
36  
37    private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class);
38  
39    /**
40     * Load up 1 tables over 2 region servers and kill a source during
41     * the upload. The failover happens internally.
42     *
43     * WARNING this test sometimes fails because of HBASE-3515
44     *
45     * @throws Exception
46     */
47    public void loadTableAndKillRS(HBaseTestingUtility util) throws Exception {
48      // killing the RS with hbase:meta can result into failed puts until we solve
49      // IO fencing
50      int rsToKill1 =
51          util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
52  
53      // Takes about 20 secs to run the full loading, kill around the middle
54      Thread killer = killARegionServer(util, 5000, rsToKill1);
55  
56      LOG.info("Start loading table");
57      int initialCount = utility1.loadTable(htable1, famName);
58      LOG.info("Done loading table");
59      killer.join(5000);
60      LOG.info("Done waiting for threads");
61  
62      Result[] res;
63      while (true) {
64        try {
65          Scan scan = new Scan();
66          ResultScanner scanner = htable1.getScanner(scan);
67          res = scanner.next(initialCount);
68          scanner.close();
69          break;
70        } catch (UnknownScannerException ex) {
71          LOG.info("Cluster wasn't ready yet, restarting scanner");
72        }
73      }
74      // Test we actually have all the rows, we may miss some because we
75      // don't have IO fencing.
76      if (res.length != initialCount) {
77        LOG.warn("We lost some rows on the master cluster!");
78        // We don't really expect the other cluster to have more rows
79        initialCount = res.length;
80      }
81  
82      int lastCount = 0;
83  
84      final long start = System.currentTimeMillis();
85      int i = 0;
86      while (true) {
87        if (i==NB_RETRIES-1) {
88          fail("Waited too much time for queueFailover replication. " +
89              "Waited "+(System.currentTimeMillis() - start)+"ms.");
90        }
91        Scan scan2 = new Scan();
92        ResultScanner scanner2 = htable2.getScanner(scan2);
93        Result[] res2 = scanner2.next(initialCount * 2);
94        scanner2.close();
95        if (res2.length < initialCount) {
96          if (lastCount < res2.length) {
97            i--; // Don't increment timeout if we make progress
98          } else {
99            i++;
100         }
101         lastCount = res2.length;
102         LOG.info("Only got " + lastCount + " rows instead of " +
103             initialCount + " current i=" + i);
104         Thread.sleep(SLEEP_TIME*2);
105       } else {
106         break;
107       }
108     }
109   }
110 
111   private static Thread killARegionServer(final HBaseTestingUtility utility,
112                                           final long timeout, final int rs) {
113     Thread killer = new Thread() {
114       public void run() {
115         try {
116           Thread.sleep(timeout);
117           utility.getHBaseCluster().getRegionServer(rs).stop("Stopping as part of the test");
118         } catch (Exception e) {
119           LOG.error("Couldn't kill a region server", e);
120         }
121       }
122     };
123     killer.setDaemon(true);
124     killer.start();
125     return killer;
126   }
127 }