1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.io.PrintWriter;
23 import java.lang.reflect.Constructor;
24 import java.lang.reflect.InvocationTargetException;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.hbase.master.HMaster;
33 import org.apache.hadoop.hbase.regionserver.HRegionServer;
34 import org.apache.hadoop.hbase.regionserver.ShutdownHook;
35 import org.apache.hadoop.util.ReflectionUtils;
36
37
38
39
40 @InterfaceAudience.Private
41 public class JVMClusterUtil {
42 private static final Log LOG = LogFactory.getLog(JVMClusterUtil.class);
43
44
45
46
47 public static class RegionServerThread extends Thread {
48 private final HRegionServer regionServer;
49
50 public RegionServerThread(final HRegionServer r, final int index) {
51 super(r, "RS:" + index + ";" + r.getServerName().toShortString());
52 this.regionServer = r;
53 }
54
55
56 public HRegionServer getRegionServer() {
57 return this.regionServer;
58 }
59
60
61
62
63
64 public void waitForServerOnline() {
65
66
67
68
69 regionServer.waitForServerOnline();
70 }
71 }
72
73
74
75
76
77
78
79
80
81
82 public static JVMClusterUtil.RegionServerThread createRegionServerThread(
83 final Configuration c, final Class<? extends HRegionServer> hrsc,
84 final int index)
85 throws IOException {
86 HRegionServer server;
87 try {
88 Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
89 ctor.setAccessible(true);
90 server = ctor.newInstance(c);
91 } catch (InvocationTargetException ite) {
92 Throwable target = ite.getTargetException();
93 throw new RuntimeException("Failed construction of RegionServer: " +
94 hrsc.toString() + ((target.getCause() != null)?
95 target.getCause().getMessage(): ""), target);
96 } catch (Exception e) {
97 IOException ioe = new IOException();
98 ioe.initCause(e);
99 throw ioe;
100 }
101 return new JVMClusterUtil.RegionServerThread(server, index);
102 }
103
104
105
106
107
108 public static class MasterThread extends Thread {
109 private final HMaster master;
110
111 public MasterThread(final HMaster m, final int index) {
112 super(m, "M:" + index + ";" + m.getServerName().toShortString());
113 this.master = m;
114 }
115
116
117 public HMaster getMaster() {
118 return this.master;
119 }
120 }
121
122
123
124
125
126
127
128
129
130
131 public static JVMClusterUtil.MasterThread createMasterThread(
132 final Configuration c, final Class<? extends HMaster> hmc,
133 final int index)
134 throws IOException {
135 HMaster server;
136 try {
137 server = hmc.getConstructor(Configuration.class).newInstance(c);
138 } catch (InvocationTargetException ite) {
139 Throwable target = ite.getTargetException();
140 throw new RuntimeException("Failed construction of Master: " +
141 hmc.toString() + ((target.getCause() != null)?
142 target.getCause().getMessage(): ""), target);
143 } catch (Exception e) {
144 IOException ioe = new IOException();
145 ioe.initCause(e);
146 throw ioe;
147 }
148 return new JVMClusterUtil.MasterThread(server, index);
149 }
150
151 private static JVMClusterUtil.MasterThread findActiveMaster(
152 List<JVMClusterUtil.MasterThread> masters) {
153 for (JVMClusterUtil.MasterThread t : masters) {
154 if (t.master.isActiveMaster()) {
155 return t;
156 }
157 }
158
159 return null;
160 }
161
162
163
164
165
166
167
168
169 public static String startup(final List<JVMClusterUtil.MasterThread> masters,
170 final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
171
172 if (masters == null || masters.isEmpty()) {
173 return null;
174 }
175
176 for (JVMClusterUtil.MasterThread t : masters) {
177 t.start();
178 }
179
180
181
182
183 long startTime = System.currentTimeMillis();
184 while (findActiveMaster(masters) == null) {
185 try {
186 Thread.sleep(100);
187 } catch (InterruptedException ignored) {
188 }
189 if (System.currentTimeMillis() > startTime + 30000) {
190 throw new RuntimeException("Master not active after 30 seconds");
191 }
192 }
193
194 if (regionservers != null) {
195 for (JVMClusterUtil.RegionServerThread t: regionservers) {
196 HRegionServer hrs = t.getRegionServer();
197 ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
198 .getConfiguration()), hrs, t);
199 t.start();
200 }
201 }
202
203
204
205 startTime = System.currentTimeMillis();
206 final int maxwait = 200000;
207 while (true) {
208 JVMClusterUtil.MasterThread t = findActiveMaster(masters);
209 if (t != null && t.master.isInitialized()) {
210 return t.master.getServerName().toString();
211 }
212
213 if (System.currentTimeMillis() > startTime + 10000) {
214
215 Threads.sleep(1000);
216 }
217 if (System.currentTimeMillis() > startTime + maxwait) {
218 String msg = "Master not initialized after " + maxwait + "ms seconds";
219 ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
220 "Thread dump because: " + msg);
221 throw new RuntimeException(msg);
222 }
223 try {
224 Thread.sleep(100);
225 } catch (InterruptedException ignored) {
226
227 }
228 }
229 }
230
231
232
233
234
235 public static void shutdown(final List<MasterThread> masters,
236 final List<RegionServerThread> regionservers) {
237 LOG.debug("Shutting down HBase Cluster");
238 if (masters != null) {
239
240 JVMClusterUtil.MasterThread activeMaster = null;
241 for (JVMClusterUtil.MasterThread t : masters) {
242 if (!t.master.isActiveMaster()) {
243 t.master.stopMaster();
244 } else {
245 activeMaster = t;
246 }
247 }
248
249 if (activeMaster != null)
250 activeMaster.master.shutdown();
251
252 }
253 boolean wasInterrupted = false;
254 final long maxTime = System.currentTimeMillis() + 30 * 1000;
255 if (regionservers != null) {
256
257 for (RegionServerThread t : regionservers) {
258 t.getRegionServer().stop("Shutdown requested");
259 }
260 for (RegionServerThread t : regionservers) {
261 long now = System.currentTimeMillis();
262 if (t.isAlive() && !wasInterrupted && now < maxTime) {
263 try {
264 t.join(maxTime - now);
265 } catch (InterruptedException e) {
266 LOG.info("Got InterruptedException on shutdown - " +
267 "not waiting anymore on region server ends", e);
268 wasInterrupted = true;
269 }
270 }
271 }
272
273
274 for (int i = 0; i < 100; ++i) {
275 boolean atLeastOneLiveServer = false;
276 for (RegionServerThread t : regionservers) {
277 if (t.isAlive()) {
278 atLeastOneLiveServer = true;
279 try {
280 LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
281 t.join(1000);
282 } catch (InterruptedException e) {
283 wasInterrupted = true;
284 }
285 }
286 }
287 if (!atLeastOneLiveServer) break;
288 for (RegionServerThread t : regionservers) {
289 if (t.isAlive()) {
290 LOG.warn("RegionServerThreads taking too long to stop, interrupting");
291 t.interrupt();
292 }
293 }
294 }
295 }
296
297 if (masters != null) {
298 for (JVMClusterUtil.MasterThread t : masters) {
299 while (t.master.isAlive() && !wasInterrupted) {
300 try {
301
302
303
304 Threads.threadDumpingIsAlive(t.master.getThread());
305 } catch(InterruptedException e) {
306 LOG.info("Got InterruptedException on shutdown - " +
307 "not waiting anymore on master ends", e);
308 wasInterrupted = true;
309 }
310 }
311 }
312 }
313 LOG.info("Shutdown of " +
314 ((masters != null) ? masters.size() : "0") + " master(s) and " +
315 ((regionservers != null) ? regionservers.size() : "0") +
316 " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
317
318 if (wasInterrupted){
319 Thread.currentThread().interrupt();
320 }
321 }
322 }