1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.security.PrivilegedAction;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.classification.InterfaceStability;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.hbase.client.HConnectionManager;
33 import org.apache.hadoop.hbase.master.HMaster;
34 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
35 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
36 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
37 import org.apache.hadoop.hbase.regionserver.HRegion;
38 import org.apache.hadoop.hbase.regionserver.HRegionServer;
39 import org.apache.hadoop.hbase.security.User;
40 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
41 import org.apache.hadoop.hbase.util.JVMClusterUtil;
42 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
43 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
44 import org.apache.hadoop.hbase.util.Threads;
45
46
47
48
49
50
51
52 @InterfaceAudience.Public
53 @InterfaceStability.Evolving
54 public class MiniHBaseCluster extends HBaseCluster {
55 static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
56 public LocalHBaseCluster hbaseCluster;
57 private static int index;
58
59
60
61
62
63
64
/**
 * Creates a cluster with exactly one master and the requested number of
 * region servers, using the default master and region server classes.
 *
 * @param conf configuration passed on to the full constructor
 * @param numRegionServers number of region servers to create
 * @throws IOException propagated from the full constructor
 * @throws InterruptedException propagated from the full constructor
 */
public MiniHBaseCluster(Configuration conf, int numRegionServers)
    throws IOException, InterruptedException {
  // Same effect as going through the (conf, numMasters, numRegionServers)
  // constructor with numMasters == 1, which only forwards nulls for the classes.
  this(conf, 1, numRegionServers, null, null);
}
69
70
71
72
73
74
75
76
/**
 * Creates a cluster with the requested number of masters and region servers,
 * leaving the master and region server implementation classes unspecified
 * ({@code null}) so the full constructor picks its defaults.
 *
 * @param conf configuration passed on to the full constructor
 * @param numMasters number of masters to create
 * @param numRegionServers number of region servers to create
 * @throws IOException propagated from the full constructor
 * @throws InterruptedException propagated from the full constructor
 */
public MiniHBaseCluster(
    Configuration conf,
    int numMasters,
    int numRegionServers) throws IOException, InterruptedException {
  this(conf, numMasters, numRegionServers, null, null);
}
82
/**
 * Full constructor: builds and starts the in-process cluster, then records
 * the cluster status observed right after startup.
 *
 * @param conf configuration for the cluster; its {@code HConstants.MASTER_PORT}
 *          entry is overwritten with {@code "0"}
 * @param numMasters number of masters to create
 * @param numRegionServers number of region servers to create
 * @param masterClass master implementation, or {@code null} for the default
 * @param regionserverClass region server implementation, or {@code null} for
 *          the default
 * @throws IOException if cluster start-up fails
 * @throws InterruptedException declared by the start-up path
 */
public MiniHBaseCluster(
    Configuration conf,
    int numMasters,
    int numRegionServers,
    Class<? extends HMaster> masterClass,
    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
    throws IOException, InterruptedException {
  super(conf);

  // NOTE(review): presumably "0" lets the master bind to any free port - confirm.
  conf.set(HConstants.MASTER_PORT, "0");

  // Initialise the metrics assertion helper before any server is started.
  CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();

  init(numMasters, numRegionServers, masterClass, regionserverClass);

  // Snapshot of the status as seen immediately after start-up.
  initialClusterStatus = getClusterStatus();
}
96
97 public Configuration getConfiguration() {
98 return this.conf;
99 }
100
101
102
103
104
105
106
107 public static class MiniHBaseClusterRegionServer extends HRegionServer {
108 private Thread shutdownThread = null;
109 private User user = null;
110 public static boolean TEST_SKIP_CLOSE = false;
111
112 public MiniHBaseClusterRegionServer(Configuration conf)
113 throws IOException, InterruptedException {
114 super(conf);
115 this.user = User.getCurrent();
116 }
117
118
119
120
121
122
123
124
125
126 @Override
127 protected void handleReportForDutyResponse(
128 final RegionServerStartupResponse c) throws IOException {
129 super.handleReportForDutyResponse(c);
130
131 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
132 }
133
134 @Override
135 public void run() {
136 try {
137 this.user.runAs(new PrivilegedAction<Object>(){
138 public Object run() {
139 runRegionServer();
140 return null;
141 }
142 });
143 } catch (Throwable t) {
144 LOG.error("Exception in run", t);
145 } finally {
146
147 if (this.shutdownThread != null) {
148 this.shutdownThread.start();
149 Threads.shutdown(this.shutdownThread, 30000);
150 }
151 }
152 }
153
154 private void runRegionServer() {
155 super.run();
156 }
157
158 @Override
159 public void kill() {
160 super.kill();
161 }
162
163 public void abort(final String reason, final Throwable cause) {
164 this.user.runAs(new PrivilegedAction<Object>() {
165 public Object run() {
166 abortRegionServer(reason, cause);
167 return null;
168 }
169 });
170 }
171
172 private void abortRegionServer(String reason, Throwable cause) {
173 super.abort(reason, cause);
174 }
175 }
176
177
178
179
180
181 static class SingleFileSystemShutdownThread extends Thread {
182 private final FileSystem fs;
183 SingleFileSystemShutdownThread(final FileSystem fs) {
184 super("Shutdown of " + fs);
185 this.fs = fs;
186 }
187 @Override
188 public void run() {
189 try {
190 LOG.info("Hook closing fs=" + this.fs);
191 this.fs.close();
192 } catch (NullPointerException npe) {
193 LOG.debug("Need to fix these: " + npe.toString());
194 } catch (IOException e) {
195 LOG.warn("Running hook", e);
196 }
197 }
198 }
199
200 private void init(final int nMasterNodes, final int nRegionNodes,
201 Class<? extends HMaster> masterClass,
202 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
203 throws IOException, InterruptedException {
204 try {
205 if (masterClass == null){
206 masterClass = HMaster.class;
207 }
208 if (regionserverClass == null){
209 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
210 }
211
212
213 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
214 masterClass, regionserverClass);
215
216
217 for (int i=0; i<nRegionNodes; i++) {
218 Configuration rsConf = HBaseConfiguration.create(conf);
219 User user = HBaseTestingUtility.getDifferentUser(rsConf,
220 ".hfs."+index++);
221 hbaseCluster.addRegionServer(rsConf, i, user);
222 }
223
224 hbaseCluster.startup();
225 } catch (IOException e) {
226 shutdown();
227 throw e;
228 } catch (Throwable t) {
229 LOG.error("Error starting cluster", t);
230 shutdown();
231 throw new IOException("Shutting down", t);
232 }
233 }
234
235 @Override
236 public void startRegionServer(String hostname) throws IOException {
237 this.startRegionServer();
238 }
239
240 @Override
241 public void killRegionServer(ServerName serverName) throws IOException {
242 HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
243 if (server instanceof MiniHBaseClusterRegionServer) {
244 LOG.info("Killing " + server.toString());
245 ((MiniHBaseClusterRegionServer) server).kill();
246 } else {
247 abortRegionServer(getRegionServerIndex(serverName));
248 }
249 }
250
251 @Override
252 public void stopRegionServer(ServerName serverName) throws IOException {
253 stopRegionServer(getRegionServerIndex(serverName));
254 }
255
256 @Override
257 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
258
259 waitOnRegionServer(getRegionServerIndex(serverName));
260 }
261
262 @Override
263 public void startMaster(String hostname) throws IOException {
264 this.startMaster();
265 }
266
267 @Override
268 public void killMaster(ServerName serverName) throws IOException {
269 abortMaster(getMasterIndex(serverName));
270 }
271
272 @Override
273 public void stopMaster(ServerName serverName) throws IOException {
274 stopMaster(getMasterIndex(serverName));
275 }
276
277 @Override
278 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
279
280 waitOnMaster(getMasterIndex(serverName));
281 }
282
283
284
285
286
287
288
289 public JVMClusterUtil.RegionServerThread startRegionServer()
290 throws IOException {
291 final Configuration newConf = HBaseConfiguration.create(conf);
292 User rsUser =
293 HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
294 JVMClusterUtil.RegionServerThread t = null;
295 try {
296 t = hbaseCluster.addRegionServer(
297 newConf, hbaseCluster.getRegionServers().size(), rsUser);
298 t.start();
299 t.waitForServerOnline();
300 } catch (InterruptedException ie) {
301 throw new IOException("Interrupted adding regionserver to cluster", ie);
302 }
303 return t;
304 }
305
306
307
308
309
310 public String abortRegionServer(int serverNumber) {
311 HRegionServer server = getRegionServer(serverNumber);
312 LOG.info("Aborting " + server.toString());
313 server.abort("Aborting for tests", new Exception("Trace info"));
314 return server.toString();
315 }
316
317
318
319
320
321
322
323 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
324 return stopRegionServer(serverNumber, true);
325 }
326
327
328
329
330
331
332
333
334
335
336
337 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
338 final boolean shutdownFS) {
339 JVMClusterUtil.RegionServerThread server =
340 hbaseCluster.getRegionServers().get(serverNumber);
341 LOG.info("Stopping " + server.toString());
342 server.getRegionServer().stop("Stopping rs " + serverNumber);
343 return server;
344 }
345
346
347
348
349
350
351
352 public String waitOnRegionServer(final int serverNumber) {
353 return this.hbaseCluster.waitOnRegionServer(serverNumber);
354 }
355
356
357
358
359
360
361
362
363 public JVMClusterUtil.MasterThread startMaster() throws IOException {
364 Configuration c = HBaseConfiguration.create(conf);
365 User user =
366 HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
367
368 JVMClusterUtil.MasterThread t = null;
369 try {
370 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
371 t.start();
372 } catch (InterruptedException ie) {
373 throw new IOException("Interrupted adding master to cluster", ie);
374 }
375 return t;
376 }
377
378
379
380
381
382 public HMaster getMaster() {
383 return this.hbaseCluster.getActiveMaster();
384 }
385
386
387
388
389
390 public HMaster getMaster(final int serverNumber) {
391 return this.hbaseCluster.getMaster(serverNumber);
392 }
393
394
395
396
397
398 public String abortMaster(int serverNumber) {
399 HMaster server = getMaster(serverNumber);
400 LOG.info("Aborting " + server.toString());
401 server.abort("Aborting for tests", new Exception("Trace info"));
402 return server.toString();
403 }
404
405
406
407
408
409
410
411 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
412 return stopMaster(serverNumber, true);
413 }
414
415
416
417
418
419
420
421
422
423
424
425 public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
426 final boolean shutdownFS) {
427 JVMClusterUtil.MasterThread server =
428 hbaseCluster.getMasters().get(serverNumber);
429 LOG.info("Stopping " + server.toString());
430 server.getMaster().stop("Stopping master " + serverNumber);
431 return server;
432 }
433
434
435
436
437
438
439
440 public String waitOnMaster(final int serverNumber) {
441 return this.hbaseCluster.waitOnMaster(serverNumber);
442 }
443
444
445
446
447
448
449
450
451
452 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
453 List<JVMClusterUtil.MasterThread> mts;
454 long start = System.currentTimeMillis();
455 while (!(mts = getMasterThreads()).isEmpty()
456 && (System.currentTimeMillis() - start) < timeout) {
457 for (JVMClusterUtil.MasterThread mt : mts) {
458 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
459 return true;
460 }
461 }
462
463 Threads.sleep(100);
464 }
465 return false;
466 }
467
468
469
470
471 public List<JVMClusterUtil.MasterThread> getMasterThreads() {
472 return this.hbaseCluster.getMasters();
473 }
474
475
476
477
478 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
479 return this.hbaseCluster.getLiveMasters();
480 }
481
482
483
484
485 public void join() {
486 this.hbaseCluster.join();
487 }
488
489
490
491
492
493 public void shutdown() throws IOException {
494 if (this.hbaseCluster != null) {
495 this.hbaseCluster.shutdown();
496 }
497 HConnectionManager.deleteAllConnections(false);
498 }
499
500 @Override
501 public void close() throws IOException {
502 }
503
504 @Override
505 public ClusterStatus getClusterStatus() throws IOException {
506 HMaster master = getMaster();
507 return master == null ? null : master.getClusterStatus();
508 }
509
510
511
512
513
514 public void flushcache() throws IOException {
515 for (JVMClusterUtil.RegionServerThread t:
516 this.hbaseCluster.getRegionServers()) {
517 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
518 r.flushcache();
519 }
520 }
521 }
522
523
524
525
526
527 public void flushcache(TableName tableName) throws IOException {
528 for (JVMClusterUtil.RegionServerThread t:
529 this.hbaseCluster.getRegionServers()) {
530 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
531 if(r.getTableDesc().getTableName().equals(tableName)) {
532 r.flushcache();
533 }
534 }
535 }
536 }
537
538
539
540
541
542 public void compact(boolean major) throws IOException {
543 for (JVMClusterUtil.RegionServerThread t:
544 this.hbaseCluster.getRegionServers()) {
545 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
546 r.compactStores(major);
547 }
548 }
549 }
550
551
552
553
554
555 public void compact(TableName tableName, boolean major) throws IOException {
556 for (JVMClusterUtil.RegionServerThread t:
557 this.hbaseCluster.getRegionServers()) {
558 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
559 if(r.getTableDesc().getTableName().equals(tableName)) {
560 r.compactStores(major);
561 }
562 }
563 }
564 }
565
566
567
568
569 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
570 return this.hbaseCluster.getRegionServers();
571 }
572
573
574
575
576 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
577 return this.hbaseCluster.getLiveRegionServers();
578 }
579
580
581
582
583
584
585 public HRegionServer getRegionServer(int serverNumber) {
586 return hbaseCluster.getRegionServer(serverNumber);
587 }
588
589 public List<HRegion> getRegions(byte[] tableName) {
590 return getRegions(TableName.valueOf(tableName));
591 }
592
593 public List<HRegion> getRegions(TableName tableName) {
594 List<HRegion> ret = new ArrayList<HRegion>();
595 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
596 HRegionServer hrs = rst.getRegionServer();
597 for (HRegion region : hrs.getOnlineRegionsLocalContext()) {
598 if (region.getTableDesc().getTableName().equals(tableName)) {
599 ret.add(region);
600 }
601 }
602 }
603 return ret;
604 }
605
606
607
608
609
610 public int getServerWithMeta() {
611 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
612 }
613
614
615
616
617
618
619
620 public int getServerWith(byte[] regionName) {
621 int index = -1;
622 int count = 0;
623 for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
624 HRegionServer hrs = rst.getRegionServer();
625 HRegion metaRegion =
626 hrs.getOnlineRegion(regionName);
627 if (metaRegion != null) {
628 index = count;
629 break;
630 }
631 count++;
632 }
633 return index;
634 }
635
636 @Override
637 public ServerName getServerHoldingRegion(byte[] regionName) throws IOException {
638 int index = getServerWith(regionName);
639 if (index < 0) {
640 return null;
641 }
642 return getRegionServer(index).getServerName();
643 }
644
645
646
647
648
649
650
651 public long countServedRegions() {
652 long count = 0;
653 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
654 count += rst.getRegionServer().getNumberOfOnlineRegions();
655 }
656 return count;
657 }
658
659
660
661
662
663 public void killAll() {
664 for (RegionServerThread rst : getRegionServerThreads()) {
665 rst.getRegionServer().abort("killAll");
666 }
667 for (MasterThread masterThread : getMasterThreads()) {
668 masterThread.getMaster().abort("killAll", new Throwable());
669 }
670 }
671
672 @Override
673 public void waitUntilShutDown() {
674 this.hbaseCluster.join();
675 }
676
677 public List<HRegion> findRegionsForTable(TableName tableName) {
678 ArrayList<HRegion> ret = new ArrayList<HRegion>();
679 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
680 HRegionServer hrs = rst.getRegionServer();
681 for (HRegion region : hrs.getOnlineRegions(tableName)) {
682 if (region.getTableDesc().getTableName().equals(tableName)) {
683 ret.add(region);
684 }
685 }
686 }
687 return ret;
688 }
689
690
691 protected int getRegionServerIndex(ServerName serverName) {
692
693 List<RegionServerThread> servers = getRegionServerThreads();
694 for (int i=0; i < servers.size(); i++) {
695 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
696 return i;
697 }
698 }
699 return -1;
700 }
701
702 protected int getMasterIndex(ServerName serverName) {
703 List<MasterThread> masters = getMasterThreads();
704 for (int i = 0; i < masters.size(); i++) {
705 if (masters.get(i).getMaster().getServerName().equals(serverName)) {
706 return i;
707 }
708 }
709 return -1;
710 }
711
712 @Override
713 public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
714 return getRegionServer(getRegionServerIndex(serverName));
715 }
716
717 @Override
718 public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
719 throws IOException {
720 return getRegionServer(getRegionServerIndex(serverName));
721 }
722 }