1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.security.PrivilegedExceptionAction;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.classification.InterfaceAudience;
30 import org.apache.hadoop.classification.InterfaceStability;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.client.HBaseAdmin;
33 import org.apache.hadoop.hbase.regionserver.HRegionServer;
34 import org.apache.hadoop.hbase.security.User;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
37 import org.apache.hadoop.hbase.util.Threads;
38
39 import java.util.concurrent.CopyOnWriteArrayList;
40 import org.apache.hadoop.hbase.master.HMaster;
41 import org.apache.hadoop.hbase.util.JVMClusterUtil;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 @InterfaceAudience.Public
60 @InterfaceStability.Evolving
61 public class LocalHBaseCluster {
62 static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
63 private final List<JVMClusterUtil.MasterThread> masterThreads =
64 new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
65 private final List<JVMClusterUtil.RegionServerThread> regionThreads =
66 new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
67 private final static int DEFAULT_NO = 1;
68
69 public static final String LOCAL = "local";
70
71 public static final String LOCAL_COLON = LOCAL + ":";
72 private final Configuration conf;
73 private final Class<? extends HMaster> masterClass;
74 private final Class<? extends HRegionServer> regionServerClass;
75
76
77
78
79
80
81 public LocalHBaseCluster(final Configuration conf)
82 throws IOException {
83 this(conf, DEFAULT_NO);
84 }
85
86
87
88
89
90
91
92
93 public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
94 throws IOException {
95 this(conf, 1, noRegionServers, getMasterImplementation(conf),
96 getRegionServerImplementation(conf));
97 }
98
99
100
101
102
103
104
105
106
107 public LocalHBaseCluster(final Configuration conf, final int noMasters,
108 final int noRegionServers)
109 throws IOException {
110 this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
111 getRegionServerImplementation(conf));
112 }
113
114 @SuppressWarnings("unchecked")
115 private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
116 return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
117 HRegionServer.class);
118 }
119
120 @SuppressWarnings("unchecked")
121 private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
122 return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
123 HMaster.class);
124 }
125
126
127
128
129
130
131
132
133
134
135
136 @SuppressWarnings("unchecked")
137 public LocalHBaseCluster(final Configuration conf, final int noMasters,
138 final int noRegionServers, final Class<? extends HMaster> masterClass,
139 final Class<? extends HRegionServer> regionServerClass)
140 throws IOException {
141 this.conf = conf;
142
143
144 conf.set(HConstants.MASTER_PORT, "0");
145 conf.set(HConstants.REGIONSERVER_PORT, "0");
146 conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
147
148 this.masterClass = (Class<? extends HMaster>)
149 conf.getClass(HConstants.MASTER_IMPL, masterClass);
150
151 for (int i = 0; i < noMasters; i++) {
152 addMaster(new Configuration(conf), i);
153 }
154
155 this.regionServerClass =
156 (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
157 regionServerClass);
158
159 for (int i = 0; i < noRegionServers; i++) {
160 addRegionServer(new Configuration(conf), i);
161 }
162 }
163
164 public JVMClusterUtil.RegionServerThread addRegionServer()
165 throws IOException {
166 return addRegionServer(new Configuration(conf), this.regionThreads.size());
167 }
168
169 public JVMClusterUtil.RegionServerThread addRegionServer(
170 Configuration config, final int index)
171 throws IOException {
172
173
174
175 JVMClusterUtil.RegionServerThread rst =
176 JVMClusterUtil.createRegionServerThread(config,
177 this.regionServerClass, index);
178 this.regionThreads.add(rst);
179 return rst;
180 }
181
182 public JVMClusterUtil.RegionServerThread addRegionServer(
183 final Configuration config, final int index, User user)
184 throws IOException, InterruptedException {
185 return user.runAs(
186 new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
187 public JVMClusterUtil.RegionServerThread run() throws Exception {
188 return addRegionServer(config, index);
189 }
190 });
191 }
192
193 public JVMClusterUtil.MasterThread addMaster() throws IOException {
194 return addMaster(new Configuration(conf), this.masterThreads.size());
195 }
196
197 public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
198 throws IOException {
199
200
201
202 JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
203 (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
204 this.masterThreads.add(mt);
205 return mt;
206 }
207
208 public JVMClusterUtil.MasterThread addMaster(
209 final Configuration c, final int index, User user)
210 throws IOException, InterruptedException {
211 return user.runAs(
212 new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
213 public JVMClusterUtil.MasterThread run() throws Exception {
214 return addMaster(c, index);
215 }
216 });
217 }
218
219
220
221
222
223 public HRegionServer getRegionServer(int serverNumber) {
224 return regionThreads.get(serverNumber).getRegionServer();
225 }
226
227
228
229
230 public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
231 return Collections.unmodifiableList(this.regionThreads);
232 }
233
234
235
236
237
238
239 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
240 List<JVMClusterUtil.RegionServerThread> liveServers =
241 new ArrayList<JVMClusterUtil.RegionServerThread>();
242 List<RegionServerThread> list = getRegionServers();
243 for (JVMClusterUtil.RegionServerThread rst: list) {
244 if (rst.isAlive()) liveServers.add(rst);
245 else LOG.info("Not alive " + rst.getName());
246 }
247 return liveServers;
248 }
249
250
251
252
253
254
255
256 public String waitOnRegionServer(int serverNumber) {
257 JVMClusterUtil.RegionServerThread regionServerThread =
258 this.regionThreads.remove(serverNumber);
259 while (regionServerThread.isAlive()) {
260 try {
261 LOG.info("Waiting on " +
262 regionServerThread.getRegionServer().toString());
263 regionServerThread.join();
264 } catch (InterruptedException e) {
265 e.printStackTrace();
266 }
267 }
268 return regionServerThread.getName();
269 }
270
271
272
273
274
275
276
277 public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
278 while (rst.isAlive()) {
279 try {
280 LOG.info("Waiting on " +
281 rst.getRegionServer().toString());
282 rst.join();
283 } catch (InterruptedException e) {
284 e.printStackTrace();
285 }
286 }
287 for (int i=0;i<regionThreads.size();i++) {
288 if (regionThreads.get(i) == rst) {
289 regionThreads.remove(i);
290 break;
291 }
292 }
293 return rst.getName();
294 }
295
296
297
298
299
300 public HMaster getMaster(int serverNumber) {
301 return masterThreads.get(serverNumber).getMaster();
302 }
303
304
305
306
307
308
309 public HMaster getActiveMaster() {
310 for (JVMClusterUtil.MasterThread mt : masterThreads) {
311 if (mt.getMaster().isActiveMaster()) {
312
313
314 if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
315 return mt.getMaster();
316 }
317 }
318 }
319 return null;
320 }
321
322
323
324
325 public List<JVMClusterUtil.MasterThread> getMasters() {
326 return Collections.unmodifiableList(this.masterThreads);
327 }
328
329
330
331
332
333
334 public List<JVMClusterUtil.MasterThread> getLiveMasters() {
335 List<JVMClusterUtil.MasterThread> liveServers =
336 new ArrayList<JVMClusterUtil.MasterThread>();
337 List<JVMClusterUtil.MasterThread> list = getMasters();
338 for (JVMClusterUtil.MasterThread mt: list) {
339 if (mt.isAlive()) {
340 liveServers.add(mt);
341 }
342 }
343 return liveServers;
344 }
345
346
347
348
349
350
351
352 public String waitOnMaster(int serverNumber) {
353 JVMClusterUtil.MasterThread masterThread =
354 this.masterThreads.remove(serverNumber);
355 while (masterThread.isAlive()) {
356 try {
357 LOG.info("Waiting on " +
358 masterThread.getMaster().getServerName().toString());
359 masterThread.join();
360 } catch (InterruptedException e) {
361 e.printStackTrace();
362 }
363 }
364 return masterThread.getName();
365 }
366
367
368
369
370
371
372
373 public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
374 while (masterThread.isAlive()) {
375 try {
376 LOG.info("Waiting on " +
377 masterThread.getMaster().getServerName().toString());
378 masterThread.join();
379 } catch (InterruptedException e) {
380 e.printStackTrace();
381 }
382 }
383 for (int i=0;i<masterThreads.size();i++) {
384 if (masterThreads.get(i) == masterThread) {
385 masterThreads.remove(i);
386 break;
387 }
388 }
389 return masterThread.getName();
390 }
391
392
393
394
395
396 public void join() {
397 if (this.regionThreads != null) {
398 for(Thread t: this.regionThreads) {
399 if (t.isAlive()) {
400 try {
401 Threads.threadDumpingIsAlive(t);
402 } catch (InterruptedException e) {
403 LOG.debug("Interrupted", e);
404 }
405 }
406 }
407 }
408 if (this.masterThreads != null) {
409 for (Thread t : this.masterThreads) {
410 if (t.isAlive()) {
411 try {
412 Threads.threadDumpingIsAlive(t);
413 } catch (InterruptedException e) {
414 LOG.debug("Interrupted", e);
415 }
416 }
417 }
418 }
419 }
420
421
422
423
424 public void startup() throws IOException {
425 JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
426 }
427
428
429
430
431 public void shutdown() {
432 JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
433 }
434
435
436
437
438
439 public static boolean isLocal(final Configuration c) {
440 boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
441 return(mode == HConstants.CLUSTER_IS_LOCAL);
442 }
443
444
445
446
447
448
449 public static void main(String[] args) throws IOException {
450 Configuration conf = HBaseConfiguration.create();
451 LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
452 cluster.startup();
453 HBaseAdmin admin = new HBaseAdmin(conf);
454 HTableDescriptor htd =
455 new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
456 admin.createTable(htd);
457 cluster.shutdown();
458 }
459 }