View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.CoordinatedStateManager;
29  import org.apache.hadoop.hbase.HBaseTestingUtility;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.LocalHBaseCluster;
32  import org.apache.hadoop.hbase.ServerName;
33  import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
34  import org.apache.hadoop.hbase.master.HMaster;
35  import org.apache.hadoop.hbase.master.ServerManager;
36  import org.apache.hadoop.hbase.testclassification.MediumTests;
37  import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
38  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
39  import org.apache.zookeeper.KeeperException;
40  import org.junit.After;
41  import org.junit.Before;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  
45  @Category(MediumTests.class)
46  public class TestRegionServerReportForDuty {
47  
48    private static final Log LOG = LogFactory.getLog(TestRegionServerReportForDuty.class);
49  
50    private static final long SLEEP_INTERVAL = 500;
51  
52    private HBaseTestingUtility testUtil;
53    private LocalHBaseCluster cluster;
54    private RegionServerThread rs;
55    private RegionServerThread rs2;
56    private MasterThread master;
57    private MasterThread backupMaster;
58  
59    @Before
60    public void setUp() throws Exception {
61      testUtil = new HBaseTestingUtility();
62      testUtil.startMiniDFSCluster(1);
63      testUtil.startMiniZKCluster(1);
64      testUtil.createRootDir();
65      cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
66    }
67  
68    @After
69    public void tearDown() throws Exception {
70      cluster.shutdown();
71      cluster.join();
72      testUtil.shutdownMiniZKCluster();
73      testUtil.shutdownMiniDFSCluster();
74    }
75  
76    /**
77     * Tests region sever reportForDuty with backup master becomes primary master after
78     * the first master goes away.
79     */
80    @Test (timeout=180000)
81    public void testReportForDutyWithMasterChange() throws Exception {
82  
83      // Start a master and wait for it to become the active/primary master.
84      // Use a random unique port
85      cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
86      cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
87      cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
88      master = cluster.addMaster();
89      rs = cluster.addRegionServer();
90      LOG.debug("Starting master: " + master.getMaster().getServerName());
91      master.start();
92      rs.start();
93  
94      waitForClusterOnline(master);
95  
96      // Add a 2nd region server
97      cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
98      rs2 = cluster.addRegionServer();
99      // Start the region server. This region server will refresh RPC connection
100     // from the current active master to the next active master before completing
101     // reportForDuty
102     LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
103     rs2.start();
104 
105     waitForSecondRsStarted();
106 
107     // Stop the current master.
108     master.getMaster().stop("Stopping master");
109 
110     // Start a new master and use another random unique port
111     // Also let it wait for exactly 2 region severs to report in.
112     cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
113     cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
114     cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
115     backupMaster = cluster.addMaster();
116     LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
117     backupMaster.start();
118 
119     waitForClusterOnline(backupMaster);
120 
121     // Do some checking/asserts here.
122     assertTrue(backupMaster.getMaster().isActiveMaster());
123     assertTrue(backupMaster.getMaster().isInitialized());
124     assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2);
125 
126   }
127 
128   private void waitForClusterOnline(MasterThread master) throws InterruptedException {
129     while (true) {
130       if (master.getMaster().isInitialized()) {
131         break;
132       }
133       Thread.sleep(SLEEP_INTERVAL);
134       LOG.debug("Waiting for master to come online ...");
135     }
136     rs.waitForServerOnline();
137   }
138 
139   private void waitForSecondRsStarted() throws InterruptedException {
140     while (true) {
141       if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
142         break;
143       }
144       Thread.sleep(SLEEP_INTERVAL);
145       LOG.debug("Waiting 2nd RS to be started ...");
146     }
147   }
148 
149   // Create a Region Server that provide a hook so that we can wait for the master switch over
150   // before continuing reportForDuty to the mater.
151   // The idea is that we get a RPC connection to the first active master, then we wait.
152   // The first master goes down, the second master becomes the active master. The region
153   // server continues reportForDuty. It should succeed with the new master.
154   public static class MyRegionServer extends MiniHBaseClusterRegionServer {
155 
156     private ServerName sn;
157     // This flag is to make sure this rs has obtained the rpcStub to the first master.
158     // The first master will go down after this.
159     private boolean rpcStubCreatedFlag = false;
160     private boolean masterChanged = false;
161 
162     public MyRegionServer(Configuration conf, CoordinatedStateManager cp)
163       throws IOException, KeeperException,
164         InterruptedException {
165       super(conf, cp);
166     }
167 
168     @Override
169     protected synchronized ServerName createRegionServerStatusStub() {
170       sn = super.createRegionServerStatusStub();
171       rpcStubCreatedFlag = true;
172 
173       // Wait for master switch over. Only do this for the second region server.
174       while (!masterChanged) {
175         ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
176         if (newSn != null && !newSn.equals(sn)) {
177           masterChanged = true;
178           break;
179         }
180         try {
181           Thread.sleep(SLEEP_INTERVAL);
182         } catch (InterruptedException e) {
183           return null;
184         }
185         LOG.debug("Waiting for master switch over ... ");
186       }
187       return sn;
188     }
189 
190     public boolean getRpcStubCreatedFlag() {
191       return rpcStubCreatedFlag;
192     }
193   }
194 }