1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.junit.Assert.*;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.NavigableSet;
26 import java.util.Set;
27 import java.util.TreeSet;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.*;
33 import org.apache.hadoop.hbase.client.HTable;
34 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
37 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
38 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
39 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
40 import org.apache.zookeeper.KeeperException;
41 import org.junit.Test;
42 import org.junit.experimental.categories.Category;
43
44
45
46
47 @Category(LargeTests.class)
48 public class TestRollingRestart {
49 private static final Log LOG = LogFactory.getLog(TestRollingRestart.class);
50
51 @Test (timeout=500000)
52 public void testBasicRollingRestart() throws Exception {
53
54
55 final int NUM_MASTERS = 2;
56 final int NUM_RS = 3;
57 final int NUM_REGIONS_TO_CREATE = 20;
58
59 int expectedNumRS = 3;
60
61
62 log("Starting cluster");
63 Configuration conf = HBaseConfiguration.create();
64 conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
65 conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);
66 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
67 TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
68 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
69 log("Waiting for active/ready master");
70 cluster.waitForActiveAndReadyMaster();
71 ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testRollingRestart",
72 null);
73 HMaster master = cluster.getMaster();
74
75
76 byte [] table = Bytes.toBytes("tableRestart");
77 byte [] family = Bytes.toBytes("family");
78 log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
79 HTable ht = TEST_UTIL.createTable(table, family);
80 int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family,
81 NUM_REGIONS_TO_CREATE);
82 numRegions += 1;
83 log("Waiting for no more RIT\n");
84 blockUntilNoRIT(zkw, master);
85 log("Disabling table\n");
86 TEST_UTIL.getHBaseAdmin().disableTable(table);
87 log("Waiting for no more RIT\n");
88 blockUntilNoRIT(zkw, master);
89 NavigableSet<String> regions = getAllOnlineRegions(cluster);
90 log("Verifying only catalog and namespace regions are assigned\n");
91 if (regions.size() != 2) {
92 for (String oregion : regions) log("Region still online: " + oregion);
93 }
94 assertEquals(2, regions.size());
95 log("Enabling table\n");
96 TEST_UTIL.getHBaseAdmin().enableTable(table);
97 log("Waiting for no more RIT\n");
98 blockUntilNoRIT(zkw, master);
99 log("Verifying there are " + numRegions + " assigned on cluster\n");
100 regions = getAllOnlineRegions(cluster);
101 assertRegionsAssigned(cluster, regions);
102 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
103
104
105 log("Adding a fourth RS");
106 RegionServerThread restarted = cluster.startRegionServer();
107 expectedNumRS++;
108 restarted.waitForServerOnline();
109 log("Additional RS is online");
110 log("Waiting for no more RIT");
111 blockUntilNoRIT(zkw, master);
112 log("Verifying there are " + numRegions + " assigned on cluster");
113 assertRegionsAssigned(cluster, regions);
114 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
115
116
117 List<MasterThread> masterThreads = cluster.getMasterThreads();
118 MasterThread activeMaster = null;
119 MasterThread backupMaster = null;
120 assertEquals(2, masterThreads.size());
121 if (masterThreads.get(0).getMaster().isActiveMaster()) {
122 activeMaster = masterThreads.get(0);
123 backupMaster = masterThreads.get(1);
124 } else {
125 activeMaster = masterThreads.get(1);
126 backupMaster = masterThreads.get(0);
127 }
128
129
130 log("Stopping backup master\n\n");
131 backupMaster.getMaster().stop("Stop of backup during rolling restart");
132 cluster.hbaseCluster.waitOnMaster(backupMaster);
133
134
135 log("Stopping primary master\n\n");
136 activeMaster.getMaster().stop("Stop of active during rolling restart");
137 cluster.hbaseCluster.waitOnMaster(activeMaster);
138
139
140 log("Restarting primary master\n\n");
141 activeMaster = cluster.startMaster();
142 cluster.waitForActiveAndReadyMaster();
143 master = activeMaster.getMaster();
144
145
146 log("Restarting backup master\n\n");
147 backupMaster = cluster.startMaster();
148
149 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
150
151
152
153
154 List<RegionServerThread> regionServers =
155 cluster.getLiveRegionServerThreads();
156 int num = 1;
157 int total = regionServers.size();
158 for (RegionServerThread rst : regionServers) {
159 ServerName serverName = rst.getRegionServer().getServerName();
160 log("Stopping region server " + num + " of " + total + " [ " +
161 serverName + "]");
162 rst.getRegionServer().stop("Stopping RS during rolling restart");
163 cluster.hbaseCluster.waitOnRegionServer(rst);
164 log("Waiting for RS shutdown to be handled by master");
165 waitForRSShutdownToStartAndFinish(activeMaster, serverName);
166 log("RS shutdown done, waiting for no more RIT");
167 blockUntilNoRIT(zkw, master);
168 log("Verifying there are " + numRegions + " assigned on cluster");
169 assertRegionsAssigned(cluster, regions);
170 expectedNumRS--;
171 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
172 log("Restarting region server " + num + " of " + total);
173 restarted = cluster.startRegionServer();
174 restarted.waitForServerOnline();
175 expectedNumRS++;
176 log("Region server " + num + " is back online");
177 log("Waiting for no more RIT");
178 blockUntilNoRIT(zkw, master);
179 log("Verifying there are " + numRegions + " assigned on cluster");
180 assertRegionsAssigned(cluster, regions);
181 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
182 num++;
183 }
184 Thread.sleep(1000);
185 assertRegionsAssigned(cluster, regions);
186
187
188 RegionServerThread metaServer = getServerHostingMeta(cluster);
189 log("Stopping server hosting hbase:meta #1");
190 metaServer.getRegionServer().stop("Stopping hbase:meta server");
191 cluster.hbaseCluster.waitOnRegionServer(metaServer);
192 log("Meta server down #1");
193 expectedNumRS--;
194 log("Waiting for meta server #1 RS shutdown to be handled by master");
195 waitForRSShutdownToStartAndFinish(activeMaster,
196 metaServer.getRegionServer().getServerName());
197 log("Waiting for no more RIT");
198 long start = System.currentTimeMillis();
199 do {
200 blockUntilNoRIT(zkw, master);
201 } while (getNumberOfOnlineRegions(cluster) < numRegions
202 && System.currentTimeMillis()-start < 60000);
203 log("Verifying there are " + numRegions + " assigned on cluster");
204 assertRegionsAssigned(cluster, regions);
205 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
206
207
208 metaServer = getServerHostingMeta(cluster);
209 log("Stopping server hosting hbase:meta #2");
210 metaServer.getRegionServer().stop("Stopping hbase:meta server");
211 cluster.hbaseCluster.waitOnRegionServer(metaServer);
212 log("Meta server down");
213 expectedNumRS--;
214 log("Waiting for RS shutdown to be handled by master");
215 waitForRSShutdownToStartAndFinish(activeMaster,
216 metaServer.getRegionServer().getServerName());
217 log("RS shutdown done, waiting for no more RIT");
218 blockUntilNoRIT(zkw, master);
219 log("Verifying there are " + numRegions + " assigned on cluster");
220 assertRegionsAssigned(cluster, regions);
221 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
222
223
224 cluster.startRegionServer().waitForServerOnline();
225 cluster.startRegionServer().waitForServerOnline();
226 cluster.startRegionServer().waitForServerOnline();
227 Thread.sleep(1000);
228 log("Waiting for no more RIT");
229 blockUntilNoRIT(zkw, master);
230 log("Verifying there are " + numRegions + " assigned on cluster");
231 assertRegionsAssigned(cluster, regions);
232
233 metaServer = getServerHostingMeta(cluster);
234 log("Stopping server hosting hbase:meta (1 of 3)");
235 metaServer.getRegionServer().stop("Stopping hbase:meta server");
236 cluster.hbaseCluster.waitOnRegionServer(metaServer);
237 log("Meta server down (1 of 3)");
238 log("Waiting for RS shutdown to be handled by master");
239 waitForRSShutdownToStartAndFinish(activeMaster,
240 metaServer.getRegionServer().getServerName());
241 log("RS shutdown done, waiting for no more RIT");
242 blockUntilNoRIT(zkw, master);
243 log("Verifying there are " + numRegions + " assigned on cluster");
244 assertRegionsAssigned(cluster, regions);
245
246
247 metaServer = getServerHostingMeta(cluster);
248 log("Stopping server hosting hbase:meta (2 of 3)");
249 metaServer.getRegionServer().stop("Stopping hbase:meta server");
250 cluster.hbaseCluster.waitOnRegionServer(metaServer);
251 log("Meta server down (2 of 3)");
252 log("Waiting for RS shutdown to be handled by master");
253 waitForRSShutdownToStartAndFinish(activeMaster,
254 metaServer.getRegionServer().getServerName());
255 log("RS shutdown done, waiting for no more RIT");
256 blockUntilNoRIT(zkw, master);
257 log("Verifying there are " + numRegions + " assigned on cluster");
258 assertRegionsAssigned(cluster, regions);
259
260
261 metaServer = getServerHostingMeta(cluster);
262 log("Stopping server hosting hbase:meta (3 of 3)");
263 metaServer.getRegionServer().stop("Stopping hbase:meta server");
264 cluster.hbaseCluster.waitOnRegionServer(metaServer);
265 log("Meta server down (3 of 3)");
266 log("Waiting for RS shutdown to be handled by master");
267 waitForRSShutdownToStartAndFinish(activeMaster,
268 metaServer.getRegionServer().getServerName());
269 log("RS shutdown done, waiting for no more RIT");
270 blockUntilNoRIT(zkw, master);
271 log("Verifying there are " + numRegions + " assigned on cluster");
272 assertRegionsAssigned(cluster, regions);
273
274 if (cluster.getRegionServerThreads().size() != 1) {
275 log("Online regionservers:");
276 for (RegionServerThread rst : cluster.getRegionServerThreads()) {
277 log("RS: " + rst.getRegionServer().getServerName());
278 }
279 }
280 assertEquals(2, cluster.getRegionServerThreads().size());
281
282
283
284
285 ht.close();
286
287 TEST_UTIL.shutdownMiniCluster();
288 }
289
290 private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
291 throws KeeperException, InterruptedException {
292 ZKAssign.blockUntilNoRIT(zkw);
293 master.assignmentManager.waitUntilNoRegionsInTransition(60000);
294 }
295
296 private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
297 ServerName serverName) throws InterruptedException {
298 ServerManager sm = activeMaster.getMaster().getServerManager();
299
300 while (!sm.getDeadServers().isDeadServer(serverName)) {
301 log("Waiting for [" + serverName + "] to be listed as dead in master");
302 Thread.sleep(1);
303 }
304 log("Server [" + serverName + "] marked as dead, waiting for it to " +
305 "finish dead processing");
306 while (sm.areDeadServersInProgress()) {
307 log("Server [" + serverName + "] still being processed, waiting");
308 Thread.sleep(100);
309 }
310 log("Server [" + serverName + "] done with server shutdown processing");
311 }
312
313 private void log(String msg) {
314 LOG.debug("\n\nTRR: " + msg + "\n");
315 }
316
317 private RegionServerThread getServerHostingMeta(MiniHBaseCluster cluster)
318 throws IOException {
319 return getServerHosting(cluster, HRegionInfo.FIRST_META_REGIONINFO);
320 }
321
322 private RegionServerThread getServerHosting(MiniHBaseCluster cluster,
323 HRegionInfo region) throws IOException {
324 for (RegionServerThread rst : cluster.getRegionServerThreads()) {
325 if (ProtobufUtil.getOnlineRegions(rst.getRegionServer()).contains(region)) {
326 return rst;
327 }
328 }
329 return null;
330 }
331
332 private int getNumberOfOnlineRegions(MiniHBaseCluster cluster) {
333 int numFound = 0;
334 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
335 numFound += rst.getRegionServer().getNumberOfOnlineRegions();
336 }
337 return numFound;
338 }
339
340 private void assertRegionsAssigned(MiniHBaseCluster cluster,
341 Set<String> expectedRegions) throws IOException {
342 int numFound = getNumberOfOnlineRegions(cluster);
343 if (expectedRegions.size() > numFound) {
344 log("Expected to find " + expectedRegions.size() + " but only found"
345 + " " + numFound);
346 NavigableSet<String> foundRegions = getAllOnlineRegions(cluster);
347 for (String region : expectedRegions) {
348 if (!foundRegions.contains(region)) {
349 log("Missing region: " + region);
350 }
351 }
352 assertEquals(expectedRegions.size(), numFound);
353 } else if (expectedRegions.size() < numFound) {
354 int doubled = numFound - expectedRegions.size();
355 log("Expected to find " + expectedRegions.size() + " but found"
356 + " " + numFound + " (" + doubled + " double assignments?)");
357 NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
358 for (String region : doubleRegions) {
359 log("Region is double assigned: " + region);
360 }
361 assertEquals(expectedRegions.size(), numFound);
362 } else {
363 log("Success! Found expected number of " + numFound + " regions");
364 }
365 }
366
367 private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
368 throws IOException {
369 NavigableSet<String> online = new TreeSet<String>();
370 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
371 for (HRegionInfo region : ProtobufUtil.getOnlineRegions(rst.getRegionServer())) {
372 online.add(region.getRegionNameAsString());
373 }
374 }
375 return online;
376 }
377
378 private NavigableSet<String> getDoubleAssignedRegions(
379 MiniHBaseCluster cluster) throws IOException {
380 NavigableSet<String> online = new TreeSet<String>();
381 NavigableSet<String> doubled = new TreeSet<String>();
382 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
383 for (HRegionInfo region : ProtobufUtil.getOnlineRegions(rst.getRegionServer())) {
384 if(!online.add(region.getRegionNameAsString())) {
385 doubled.add(region.getRegionNameAsString());
386 }
387 }
388 }
389 return doubled;
390 }
391
392
393 }
394