View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.security.PrivilegedAction;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.hbase.master.HMaster;
33  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
34  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
35  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
36  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
37  import org.apache.hadoop.hbase.regionserver.HRegion;
38  import org.apache.hadoop.hbase.regionserver.HRegionServer;
39  import org.apache.hadoop.hbase.regionserver.Region;
40  import org.apache.hadoop.hbase.security.User;
41  import org.apache.hadoop.hbase.test.MetricsAssertHelper;
42  import org.apache.hadoop.hbase.util.JVMClusterUtil;
43  import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
44  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
45  import org.apache.hadoop.hbase.util.Threads;
46  
/**
 * This class creates a single process HBase cluster.
 * The master uses the 'default' FileSystem.  The RegionServers,
 * if we are running on DistributedFilesystem, each create their own
 * FileSystem instance and will close down that instance on the way out.
 */
53  @InterfaceAudience.Public
54  @InterfaceStability.Evolving
55  public class MiniHBaseCluster extends HBaseCluster {
56    private static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
57    public LocalHBaseCluster hbaseCluster;
58    private static int index;
59  
60    /**
61     * Start a MiniHBaseCluster.
62     * @param conf Configuration to be used for cluster
63     * @param numRegionServers initial number of region servers to start.
64     * @throws IOException
65     */
66    public MiniHBaseCluster(Configuration conf, int numRegionServers)
67    throws IOException, InterruptedException {
68      this(conf, 1, numRegionServers);
69    }
70  
71    /**
72     * Start a MiniHBaseCluster.
73     * @param conf Configuration to be used for cluster
74     * @param numMasters initial number of masters to start.
75     * @param numRegionServers initial number of region servers to start.
76     * @throws IOException
77     */
78    public MiniHBaseCluster(Configuration conf, int numMasters,
79                               int numRegionServers)
80        throws IOException, InterruptedException {
81      this(conf, numMasters, numRegionServers, null, null);
82    }
83  
84    public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
85           Class<? extends HMaster> masterClass,
86           Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
87        throws IOException, InterruptedException {
88      super(conf);
89      conf.set(HConstants.MASTER_PORT, "0");
90  
91      // Hadoop 2
92      CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
93  
94      init(numMasters, numRegionServers, masterClass, regionserverClass);
95      this.initialClusterStatus = getClusterStatus();
96    }
97  
98    public Configuration getConfiguration() {
99      return this.conf;
100   }
101 
102   /**
103    * Subclass so can get at protected methods (none at moment).  Also, creates
104    * a FileSystem instance per instantiation.  Adds a shutdown own FileSystem
105    * on the way out. Shuts down own Filesystem only, not All filesystems as
106    * the FileSystem system exit hook does.
107    */
108   public static class MiniHBaseClusterRegionServer extends HRegionServer {
109     private Thread shutdownThread = null;
110     private User user = null;
111     public static boolean TEST_SKIP_CLOSE = false;
112 
113     public MiniHBaseClusterRegionServer(Configuration conf, CoordinatedStateManager cp)
114         throws IOException, InterruptedException {
115       super(conf, cp);
116       this.user = User.getCurrent();
117     }
118 
119     /*
120      * @param c
121      * @param currentfs We return this if we did not make a new one.
122      * @param uniqueName Same name used to help identify the created fs.
123      * @return A new fs instance if we are up on DistributeFileSystem.
124      * @throws IOException
125      */
126 
127     @Override
128     protected void handleReportForDutyResponse(
129         final RegionServerStartupResponse c) throws IOException {
130       super.handleReportForDutyResponse(c);
131       // Run this thread to shutdown our filesystem on way out.
132       this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
133     }
134 
135     @Override
136     public void run() {
137       try {
138         this.user.runAs(new PrivilegedAction<Object>(){
139           public Object run() {
140             runRegionServer();
141             return null;
142           }
143         });
144       } catch (Throwable t) {
145         LOG.error("Exception in run", t);
146       } finally {
147         // Run this on the way out.
148         if (this.shutdownThread != null) {
149           this.shutdownThread.start();
150           Threads.shutdown(this.shutdownThread, 30000);
151         }
152       }
153     }
154 
155     private void runRegionServer() {
156       super.run();
157     }
158 
159     @Override
160     public void kill() {
161       super.kill();
162     }
163 
164     public void abort(final String reason, final Throwable cause) {
165       this.user.runAs(new PrivilegedAction<Object>() {
166         public Object run() {
167           abortRegionServer(reason, cause);
168           return null;
169         }
170       });
171     }
172 
173     private void abortRegionServer(String reason, Throwable cause) {
174       super.abort(reason, cause);
175     }
176   }
177 
178   /**
179    * Alternate shutdown hook.
180    * Just shuts down the passed fs, not all as default filesystem hook does.
181    */
182   static class SingleFileSystemShutdownThread extends Thread {
183     private final FileSystem fs;
184     SingleFileSystemShutdownThread(final FileSystem fs) {
185       super("Shutdown of " + fs);
186       this.fs = fs;
187     }
188     @Override
189     public void run() {
190       try {
191         LOG.info("Hook closing fs=" + this.fs);
192         this.fs.close();
193       } catch (NullPointerException npe) {
194         LOG.debug("Need to fix these: " + npe.toString());
195       } catch (IOException e) {
196         LOG.warn("Running hook", e);
197       }
198     }
199   }
200 
201   private void init(final int nMasterNodes, final int nRegionNodes,
202                  Class<? extends HMaster> masterClass,
203                  Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
204   throws IOException, InterruptedException {
205     try {
206       if (masterClass == null){
207         masterClass =  HMaster.class;
208       }
209       if (regionserverClass == null){
210         regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
211       }
212 
213       // start up a LocalHBaseCluster
214       hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
215           masterClass, regionserverClass);
216 
217       // manually add the regionservers as other users
218       for (int i=0; i<nRegionNodes; i++) {
219         Configuration rsConf = HBaseConfiguration.create(conf);
220         User user = HBaseTestingUtility.getDifferentUser(rsConf,
221             ".hfs."+index++);
222         hbaseCluster.addRegionServer(rsConf, i, user);
223       }
224 
225       hbaseCluster.startup();
226     } catch (IOException e) {
227       shutdown();
228       throw e;
229     } catch (Throwable t) {
230       LOG.error("Error starting cluster", t);
231       shutdown();
232       throw new IOException("Shutting down", t);
233     }
234   }
235 
236   @Override
237   public void startRegionServer(String hostname, int port) throws IOException {
238     this.startRegionServer();
239   }
240 
241   @Override
242   public void killRegionServer(ServerName serverName) throws IOException {
243     HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
244     if (server instanceof MiniHBaseClusterRegionServer) {
245       LOG.info("Killing " + server.toString());
246       ((MiniHBaseClusterRegionServer) server).kill();
247     } else {
248       abortRegionServer(getRegionServerIndex(serverName));
249     }
250   }
251 
252   @Override
253   public void stopRegionServer(ServerName serverName) throws IOException {
254     stopRegionServer(getRegionServerIndex(serverName));
255   }
256 
257   @Override
258   public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
259     //ignore timeout for now
260     waitOnRegionServer(getRegionServerIndex(serverName));
261   }
262 
263   @Override
264   public void startZkNode(String hostname, int port) throws IOException {
265     LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
266   }
267 
268   @Override
269   public void killZkNode(ServerName serverName) throws IOException {
270     LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
271   }
272 
273   @Override
274   public void stopZkNode(ServerName serverName) throws IOException {
275     LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
276   }
277 
278   @Override
279   public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
280     LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
281   }
282 
283   @Override
284   public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
285     LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
286   }
287 
288   @Override
289   public void startDataNode(ServerName serverName) throws IOException {
290     LOG.warn("Starting datanodes on mini cluster is not supported");
291   }
292 
293   @Override
294   public void killDataNode(ServerName serverName) throws IOException {
295     LOG.warn("Aborting datanodes on mini cluster is not supported");
296   }
297 
298   @Override
299   public void stopDataNode(ServerName serverName) throws IOException {
300     LOG.warn("Stopping datanodes on mini cluster is not supported");
301   }
302 
303   @Override
304   public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
305     LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
306   }
307 
308   @Override
309   public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
310     LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
311   }
312 
313   @Override
314   public void startMaster(String hostname, int port) throws IOException {
315     this.startMaster();
316   }
317 
318   @Override
319   public void killMaster(ServerName serverName) throws IOException {
320     abortMaster(getMasterIndex(serverName));
321   }
322 
323   @Override
324   public void stopMaster(ServerName serverName) throws IOException {
325     stopMaster(getMasterIndex(serverName));
326   }
327 
328   @Override
329   public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
330     //ignore timeout for now
331     waitOnMaster(getMasterIndex(serverName));
332   }
333 
334   /**
335    * Starts a region server thread running
336    *
337    * @throws IOException
338    * @return New RegionServerThread
339    */
340   public JVMClusterUtil.RegionServerThread startRegionServer()
341       throws IOException {
342     final Configuration newConf = HBaseConfiguration.create(conf);
343     User rsUser =
344         HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
345     JVMClusterUtil.RegionServerThread t =  null;
346     try {
347       t = hbaseCluster.addRegionServer(
348           newConf, hbaseCluster.getRegionServers().size(), rsUser);
349       t.start();
350       t.waitForServerOnline();
351     } catch (InterruptedException ie) {
352       throw new IOException("Interrupted adding regionserver to cluster", ie);
353     }
354     return t;
355   }
356 
357   /**
358    * Cause a region server to exit doing basic clean up only on its way out.
359    * @param serverNumber  Used as index into a list.
360    */
361   public String abortRegionServer(int serverNumber) {
362     HRegionServer server = getRegionServer(serverNumber);
363     LOG.info("Aborting " + server.toString());
364     server.abort("Aborting for tests", new Exception("Trace info"));
365     return server.toString();
366   }
367 
368   /**
369    * Shut down the specified region server cleanly
370    *
371    * @param serverNumber  Used as index into a list.
372    * @return the region server that was stopped
373    */
374   public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
375     return stopRegionServer(serverNumber, true);
376   }
377 
378   /**
379    * Shut down the specified region server cleanly
380    *
381    * @param serverNumber  Used as index into a list.
382    * @param shutdownFS True is we are to shutdown the filesystem as part of this
383    * regionserver's shutdown.  Usually we do but you do not want to do this if
384    * you are running multiple regionservers in a test and you shut down one
385    * before end of the test.
386    * @return the region server that was stopped
387    */
388   public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
389       final boolean shutdownFS) {
390     JVMClusterUtil.RegionServerThread server =
391       hbaseCluster.getRegionServers().get(serverNumber);
392     LOG.info("Stopping " + server.toString());
393     server.getRegionServer().stop("Stopping rs " + serverNumber);
394     return server;
395   }
396 
397   /**
398    * Wait for the specified region server to stop. Removes this thread from list
399    * of running threads.
400    * @param serverNumber
401    * @return Name of region server that just went down.
402    */
403   public String waitOnRegionServer(final int serverNumber) {
404     return this.hbaseCluster.waitOnRegionServer(serverNumber);
405   }
406 
407 
408   /**
409    * Starts a master thread running
410    *
411    * @throws IOException
412    * @return New RegionServerThread
413    */
414   public JVMClusterUtil.MasterThread startMaster() throws IOException {
415     Configuration c = HBaseConfiguration.create(conf);
416     User user =
417         HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
418 
419     JVMClusterUtil.MasterThread t = null;
420     try {
421       t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
422       t.start();
423     } catch (InterruptedException ie) {
424       throw new IOException("Interrupted adding master to cluster", ie);
425     }
426     return t;
427   }
428 
429   /**
430    * Returns the current active master, if available.
431    * @return the active HMaster, null if none is active.
432    */
433   public MasterService.BlockingInterface getMasterAdminService() {
434     return this.hbaseCluster.getActiveMaster().getMasterRpcServices();
435   }
436 
437   /**
438    * Returns the current active master, if available.
439    * @return the active HMaster, null if none is active.
440    */
441   public HMaster getMaster() {
442     return this.hbaseCluster.getActiveMaster();
443   }
444 
445   /**
446    * Returns the current active master thread, if available.
447    * @return the active MasterThread, null if none is active.
448    */
449   public MasterThread getMasterThread() {
450     for (MasterThread mt: hbaseCluster.getLiveMasters()) {
451       if (mt.getMaster().isActiveMaster()) {
452         return mt;
453       }
454     }
455     return null;
456   }
457 
458   /**
459    * Returns the master at the specified index, if available.
460    * @return the active HMaster, null if none is active.
461    */
462   public HMaster getMaster(final int serverNumber) {
463     return this.hbaseCluster.getMaster(serverNumber);
464   }
465 
466   /**
467    * Cause a master to exit without shutting down entire cluster.
468    * @param serverNumber  Used as index into a list.
469    */
470   public String abortMaster(int serverNumber) {
471     HMaster server = getMaster(serverNumber);
472     LOG.info("Aborting " + server.toString());
473     server.abort("Aborting for tests", new Exception("Trace info"));
474     return server.toString();
475   }
476 
477   /**
478    * Shut down the specified master cleanly
479    *
480    * @param serverNumber  Used as index into a list.
481    * @return the region server that was stopped
482    */
483   public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
484     return stopMaster(serverNumber, true);
485   }
486 
487   /**
488    * Shut down the specified master cleanly
489    *
490    * @param serverNumber  Used as index into a list.
491    * @param shutdownFS True is we are to shutdown the filesystem as part of this
492    * master's shutdown.  Usually we do but you do not want to do this if
493    * you are running multiple master in a test and you shut down one
494    * before end of the test.
495    * @return the master that was stopped
496    */
497   public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
498       final boolean shutdownFS) {
499     JVMClusterUtil.MasterThread server =
500       hbaseCluster.getMasters().get(serverNumber);
501     LOG.info("Stopping " + server.toString());
502     server.getMaster().stop("Stopping master " + serverNumber);
503     return server;
504   }
505 
506   /**
507    * Wait for the specified master to stop. Removes this thread from list
508    * of running threads.
509    * @param serverNumber
510    * @return Name of master that just went down.
511    */
512   public String waitOnMaster(final int serverNumber) {
513     return this.hbaseCluster.waitOnMaster(serverNumber);
514   }
515 
516   /**
517    * Blocks until there is an active master and that master has completed
518    * initialization.
519    *
520    * @return true if an active master becomes available.  false if there are no
521    *         masters left.
522    * @throws InterruptedException
523    */
524   public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
525     List<JVMClusterUtil.MasterThread> mts;
526     long start = System.currentTimeMillis();
527     while (!(mts = getMasterThreads()).isEmpty()
528         && (System.currentTimeMillis() - start) < timeout) {
529       for (JVMClusterUtil.MasterThread mt : mts) {
530         if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
531           return true;
532         }
533       }
534 
535       Threads.sleep(100);
536     }
537     return false;
538   }
539 
540   /**
541    * @return List of master threads.
542    */
543   public List<JVMClusterUtil.MasterThread> getMasterThreads() {
544     return this.hbaseCluster.getMasters();
545   }
546 
547   /**
548    * @return List of live master threads (skips the aborted and the killed)
549    */
550   public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
551     return this.hbaseCluster.getLiveMasters();
552   }
553 
554   /**
555    * Wait for Mini HBase Cluster to shut down.
556    */
557   public void join() {
558     this.hbaseCluster.join();
559   }
560 
561   /**
562    * Shut down the mini HBase cluster
563    * @throws IOException
564    */
565   @SuppressWarnings("deprecation")
566   public void shutdown() throws IOException {
567     if (this.hbaseCluster != null) {
568       this.hbaseCluster.shutdown();
569     }
570   }
571 
572   @Override
573   public void close() throws IOException {
574   }
575 
576   @Override
577   public ClusterStatus getClusterStatus() throws IOException {
578     HMaster master = getMaster();
579     return master == null ? null : master.getClusterStatus();
580   }
581 
582   /**
583    * Call flushCache on all regions on all participating regionservers.
584    * @throws IOException
585    */
586   public void flushcache() throws IOException {
587     for (JVMClusterUtil.RegionServerThread t:
588         this.hbaseCluster.getRegionServers()) {
589       for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
590         r.flush(true);
591       }
592     }
593   }
594 
595   /**
596    * Call flushCache on all regions of the specified table.
597    * @throws IOException
598    */
599   public void flushcache(TableName tableName) throws IOException {
600     for (JVMClusterUtil.RegionServerThread t:
601         this.hbaseCluster.getRegionServers()) {
602       for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
603         if(r.getTableDesc().getTableName().equals(tableName)) {
604           r.flush(true);
605         }
606       }
607     }
608   }
609 
610   /**
611    * Call flushCache on all regions on all participating regionservers.
612    * @throws IOException
613    */
614   public void compact(boolean major) throws IOException {
615     for (JVMClusterUtil.RegionServerThread t:
616         this.hbaseCluster.getRegionServers()) {
617       for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
618         r.compact(major);
619       }
620     }
621   }
622 
623   /**
624    * Call flushCache on all regions of the specified table.
625    * @throws IOException
626    */
627   public void compact(TableName tableName, boolean major) throws IOException {
628     for (JVMClusterUtil.RegionServerThread t:
629         this.hbaseCluster.getRegionServers()) {
630       for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
631         if(r.getTableDesc().getTableName().equals(tableName)) {
632           r.compact(major);
633         }
634       }
635     }
636   }
637 
638   /**
639    * @return List of region server threads.
640    */
641   public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
642     return this.hbaseCluster.getRegionServers();
643   }
644 
645   /**
646    * @return List of live region server threads (skips the aborted and the killed)
647    */
648   public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
649     return this.hbaseCluster.getLiveRegionServers();
650   }
651 
652   /**
653    * Grab a numbered region server of your choice.
654    * @param serverNumber
655    * @return region server
656    */
657   public HRegionServer getRegionServer(int serverNumber) {
658     return hbaseCluster.getRegionServer(serverNumber);
659   }
660 
661   public List<HRegion> getRegions(byte[] tableName) {
662     return getRegions(TableName.valueOf(tableName));
663   }
664 
665   public List<HRegion> getRegions(TableName tableName) {
666     List<HRegion> ret = new ArrayList<HRegion>();
667     for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
668       HRegionServer hrs = rst.getRegionServer();
669       for (Region region : hrs.getOnlineRegionsLocalContext()) {
670         if (region.getTableDesc().getTableName().equals(tableName)) {
671           ret.add((HRegion)region);
672         }
673       }
674     }
675     return ret;
676   }
677 
678   /**
679    * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
680    * of HRS carrying regionName. Returns -1 if none found.
681    */
682   public int getServerWithMeta() {
683     return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
684   }
685 
686   /**
687    * Get the location of the specified region
688    * @param regionName Name of the region in bytes
689    * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
690    * of HRS carrying hbase:meta. Returns -1 if none found.
691    */
692   public int getServerWith(byte[] regionName) {
693     int index = -1;
694     int count = 0;
695     for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
696       HRegionServer hrs = rst.getRegionServer();
697       Region region = hrs.getOnlineRegion(regionName);
698       if (region != null) {
699         index = count;
700         break;
701       }
702       count++;
703     }
704     return index;
705   }
706 
707   @Override
708   public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
709   throws IOException {
710     // Assume there is only one master thread which is the active master.
711     // If there are multiple master threads, the backup master threads
712     // should hold some regions. Please refer to #countServedRegions
713     // to see how we find out all regions.
714     HMaster master = getMaster();
715     Region region = master.getOnlineRegion(regionName);
716     if (region != null) {
717       return master.getServerName();
718     }
719     int index = getServerWith(regionName);
720     if (index < 0) {
721       return null;
722     }
723     return getRegionServer(index).getServerName();
724   }
725 
726   /**
727    * Counts the total numbers of regions being served by the currently online
728    * region servers by asking each how many regions they have.  Does not look
729    * at hbase:meta at all.  Count includes catalog tables.
730    * @return number of regions being served by all region servers
731    */
732   public long countServedRegions() {
733     long count = 0;
734     for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
735       count += rst.getRegionServer().getNumberOfOnlineRegions();
736     }
737     for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
738       count += mt.getMaster().getNumberOfOnlineRegions();
739     }
740     return count;
741   }
742 
743   /**
744    * Do a simulated kill all masters and regionservers. Useful when it is
745    * impossible to bring the mini-cluster back for clean shutdown.
746    */
747   public void killAll() {
748     for (RegionServerThread rst : getRegionServerThreads()) {
749       rst.getRegionServer().abort("killAll");
750     }
751     for (MasterThread masterThread : getMasterThreads()) {
752       masterThread.getMaster().abort("killAll", new Throwable());
753     }
754   }
755 
756   @Override
757   public void waitUntilShutDown() {
758     this.hbaseCluster.join();
759   }
760 
761   public List<HRegion> findRegionsForTable(TableName tableName) {
762     ArrayList<HRegion> ret = new ArrayList<HRegion>();
763     for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
764       HRegionServer hrs = rst.getRegionServer();
765       for (Region region : hrs.getOnlineRegions(tableName)) {
766         if (region.getTableDesc().getTableName().equals(tableName)) {
767           ret.add((HRegion)region);
768         }
769       }
770     }
771     return ret;
772   }
773 
774 
775   protected int getRegionServerIndex(ServerName serverName) {
776     //we have a small number of region servers, this should be fine for now.
777     List<RegionServerThread> servers = getRegionServerThreads();
778     for (int i=0; i < servers.size(); i++) {
779       if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
780         return i;
781       }
782     }
783     return -1;
784   }
785 
786   protected int getMasterIndex(ServerName serverName) {
787     List<MasterThread> masters = getMasterThreads();
788     for (int i = 0; i < masters.size(); i++) {
789       if (masters.get(i).getMaster().getServerName().equals(serverName)) {
790         return i;
791       }
792     }
793     return -1;
794   }
795 
796   @Override
797   public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
798     return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
799   }
800 
801   @Override
802   public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
803   throws IOException {
804     return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
805   }
806 }