View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Comparator;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.TreeSet;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.ClusterManager.ServiceType;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.client.Admin;
31  import org.apache.hadoop.hbase.client.ClusterConnection;
32  import org.apache.hadoop.hbase.client.Connection;
33  import org.apache.hadoop.hbase.client.ConnectionFactory;
34  import org.apache.hadoop.hbase.client.RegionLocator;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
37  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
38  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.Threads;
42  
43  /**
44   * Manages the interactions with an already deployed distributed cluster (as opposed to
45   * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
46   */
47  @InterfaceAudience.Private
48  public class DistributedHBaseCluster extends HBaseCluster {
49    private Admin admin;
50    private final Connection connection;
51  
52    private ClusterManager clusterManager;
53  
54    public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
55        throws IOException {
56      super(conf);
57      this.clusterManager = clusterManager;
58      this.connection = ConnectionFactory.createConnection(conf);
59      this.admin = this.connection.getAdmin();
60      this.initialClusterStatus = getClusterStatus();
61    }
62  
63    public void setClusterManager(ClusterManager clusterManager) {
64      this.clusterManager = clusterManager;
65    }
66  
67    public ClusterManager getClusterManager() {
68      return clusterManager;
69    }
70  
71    /**
72     * Returns a ClusterStatus for this HBase cluster
73     * @throws IOException
74     */
75    @Override
76    public ClusterStatus getClusterStatus() throws IOException {
77      return admin.getClusterStatus();
78    }
79  
80    @Override
81    public ClusterStatus getInitialClusterStatus() throws IOException {
82      return initialClusterStatus;
83    }
84  
85    @Override
86    public void close() throws IOException {
87      if (this.admin != null) {
88        admin.close();
89      }
90      if (this.connection != null && !this.connection.isClosed()) {
91        this.connection.close();
92      }
93    }
94  
95    @Override
96    public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
97    throws IOException {
98      return ((ClusterConnection)this.connection).getAdmin(serverName);
99    }
100 
101   @Override
102   public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
103   throws IOException {
104     return ((ClusterConnection)this.connection).getClient(serverName);
105   }
106 
107   @Override
108   public void startRegionServer(String hostname, int port) throws IOException {
109     LOG.info("Starting RS on: " + hostname);
110     clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
111   }
112 
113   @Override
114   public void killRegionServer(ServerName serverName) throws IOException {
115     LOG.info("Aborting RS: " + serverName.getServerName());
116     clusterManager.kill(ServiceType.HBASE_REGIONSERVER,
117       serverName.getHostname(), serverName.getPort());
118   }
119 
120   @Override
121   public void stopRegionServer(ServerName serverName) throws IOException {
122     LOG.info("Stopping RS: " + serverName.getServerName());
123     clusterManager.stop(ServiceType.HBASE_REGIONSERVER,
124       serverName.getHostname(), serverName.getPort());
125   }
126 
127   @Override
128   public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
129     waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
130   }
131 
132   @Override
133   public void startZkNode(String hostname, int port) throws IOException {
134     LOG.info("Starting Zookeeper node on: " + hostname);
135     clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
136   }
137 
138   @Override
139   public void killZkNode(ServerName serverName) throws IOException {
140     LOG.info("Aborting Zookeeper node on: " + serverName.getServerName());
141     clusterManager.kill(ServiceType.ZOOKEEPER_SERVER,
142       serverName.getHostname(), serverName.getPort());
143   }
144 
145   @Override
146   public void stopZkNode(ServerName serverName) throws IOException {
147     LOG.info("Stopping Zookeeper node: " + serverName.getServerName());
148     clusterManager.stop(ServiceType.ZOOKEEPER_SERVER,
149       serverName.getHostname(), serverName.getPort());
150   }
151 
152   @Override
153   public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
154     waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
155   }
156 
157   @Override
158   public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
159     waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
160   }
161 
162   @Override
163   public void startDataNode(ServerName serverName) throws IOException {
164     LOG.info("Starting data node on: " + serverName.getServerName());
165     clusterManager.start(ServiceType.HADOOP_DATANODE,
166       serverName.getHostname(), serverName.getPort());
167   }
168 
169   @Override
170   public void killDataNode(ServerName serverName) throws IOException {
171     LOG.info("Aborting data node on: " + serverName.getServerName());
172     clusterManager.kill(ServiceType.HADOOP_DATANODE,
173       serverName.getHostname(), serverName.getPort());
174   }
175 
176   @Override
177   public void stopDataNode(ServerName serverName) throws IOException {
178     LOG.info("Stopping data node on: " + serverName.getServerName());
179     clusterManager.stop(ServiceType.HADOOP_DATANODE,
180       serverName.getHostname(), serverName.getPort());
181   }
182 
183   @Override
184   public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
185     waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
186   }
187 
188   @Override
189   public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
190     waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
191   }
192 
193   private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
194     throws IOException {
195     LOG.info("Waiting for service: " + service + " to stop: " + serverName.getServerName());
196     long start = System.currentTimeMillis();
197 
198     while ((System.currentTimeMillis() - start) < timeout) {
199       if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
200         return;
201       }
202       Threads.sleep(100);
203     }
204     throw new IOException("did timeout waiting for service to stop:" + serverName);
205   }
206 
207   private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
208     throws IOException {
209     LOG.info("Waiting for service: " + service + " to start: " + serverName.getServerName());
210     long start = System.currentTimeMillis();
211 
212     while ((System.currentTimeMillis() - start) < timeout) {
213       if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
214         return;
215       }
216       Threads.sleep(100);
217     }
218     throw new IOException("did timeout waiting for service to start:" + serverName);
219   }
220 
221 
222   @Override
223   public MasterService.BlockingInterface getMasterAdminService()
224   throws IOException {
225     return ((ClusterConnection)this.connection).getMaster();
226   }
227 
228   @Override
229   public void startMaster(String hostname, int port) throws IOException {
230     LOG.info("Starting Master on: " + hostname + ":" + port);
231     clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
232   }
233 
234   @Override
235   public void killMaster(ServerName serverName) throws IOException {
236     LOG.info("Aborting Master: " + serverName.getServerName());
237     clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
238   }
239 
240   @Override
241   public void stopMaster(ServerName serverName) throws IOException {
242     LOG.info("Stopping Master: " + serverName.getServerName());
243     clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
244   }
245 
246   @Override
247   public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
248     waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
249   }
250 
251   @Override
252   public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
253     long start = System.currentTimeMillis();
254     while (System.currentTimeMillis() - start < timeout) {
255       try {
256         getMasterAdminService();
257         return true;
258       } catch (MasterNotRunningException m) {
259         LOG.warn("Master not started yet " + m);
260       } catch (ZooKeeperConnectionException e) {
261         LOG.warn("Failed to connect to ZK " + e);
262       }
263       Threads.sleep(1000);
264     }
265     return false;
266   }
267 
268   @Override
269   public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
270     HRegionLocation regionLoc = null;
271     try (RegionLocator locator = connection.getRegionLocator(tn)) {
272       regionLoc = locator.getRegionLocation(regionName);
273     }
274     if (regionLoc == null) {
275       LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) +
276         ", start key [" + Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
277       return null;
278     }
279 
280     AdminProtos.AdminService.BlockingInterface client =
281         ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName());
282     ServerInfo info = ProtobufUtil.getServerInfo(null, client);
283     return ProtobufUtil.toServerName(info.getServerName());
284   }
285 
286   @Override
287   public void waitUntilShutDown() {
288     // Simply wait for a few seconds for now (after issuing serverManager.kill
289     throw new RuntimeException("Not implemented yet");
290   }
291 
292   @Override
293   public void shutdown() throws IOException {
294     // not sure we want this
295     throw new RuntimeException("Not implemented yet");
296   }
297 
298   @Override
299   public boolean isDistributedCluster() {
300     return true;
301   }
302 
303   @Override
304   public boolean restoreClusterStatus(ClusterStatus initial) throws IOException {
305     ClusterStatus current = getClusterStatus();
306 
307     LOG.info("Restoring cluster - started");
308 
309     // do a best effort restore
310     boolean success = true;
311     success = restoreMasters(initial, current) & success;
312     success = restoreRegionServers(initial, current) & success;
313     success = restoreAdmin() & success;
314 
315     LOG.info("Restoring cluster - done");
316     return success;
317   }
318 
319   protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) {
320     List<IOException> deferred = new ArrayList<IOException>();
321     //check whether current master has changed
322     final ServerName initMaster = initial.getMaster();
323     if (!ServerName.isSameHostnameAndPort(initMaster, current.getMaster())) {
324       LOG.info("Restoring cluster - Initial active master : "
325               + initMaster.getHostAndPort()
326               + " has changed to : "
327               + current.getMaster().getHostAndPort());
328       // If initial master is stopped, start it, before restoring the state.
329       // It will come up as a backup master, if there is already an active master.
330       try {
331         if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
332                 initMaster.getHostname(), initMaster.getPort())) {
333           LOG.info("Restoring cluster - starting initial active master at:"
334                   + initMaster.getHostAndPort());
335           startMaster(initMaster.getHostname(), initMaster.getPort());
336         }
337 
338         // master has changed, we would like to undo this.
339         // 1. Kill the current backups
340         // 2. Stop current master
341         // 3. Start backup masters
342         for (ServerName currentBackup : current.getBackupMasters()) {
343           if (!ServerName.isSameHostnameAndPort(currentBackup, initMaster)) {
344             LOG.info("Restoring cluster - stopping backup master: " + currentBackup);
345             stopMaster(currentBackup);
346           }
347         }
348         LOG.info("Restoring cluster - stopping active master: " + current.getMaster());
349         stopMaster(current.getMaster());
350         waitForActiveAndReadyMaster(); // wait so that active master takes over
351       } catch (IOException ex) {
352         // if we fail to start the initial active master, we do not want to continue stopping
353         // backup masters. Just keep what we have now
354         deferred.add(ex);
355       }
356 
357       //start backup masters
358       for (ServerName backup : initial.getBackupMasters()) {
359         try {
360           //these are not started in backup mode, but we should already have an active master
361           if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
362                   backup.getHostname(),
363                   backup.getPort())) {
364             LOG.info("Restoring cluster - starting initial backup master: "
365                     + backup.getHostAndPort());
366             startMaster(backup.getHostname(), backup.getPort());
367           }
368         } catch (IOException ex) {
369           deferred.add(ex);
370         }
371       }
372     } else {
373       //current master has not changed, match up backup masters
374       Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
375       Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
376       toStart.addAll(initial.getBackupMasters());
377       toKill.addAll(current.getBackupMasters());
378 
379       for (ServerName server : current.getBackupMasters()) {
380         toStart.remove(server);
381       }
382       for (ServerName server: initial.getBackupMasters()) {
383         toKill.remove(server);
384       }
385 
386       for (ServerName sn:toStart) {
387         try {
388           if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
389             LOG.info("Restoring cluster - starting initial backup master: " + sn.getHostAndPort());
390             startMaster(sn.getHostname(), sn.getPort());
391           }
392         } catch (IOException ex) {
393           deferred.add(ex);
394         }
395       }
396 
397       for (ServerName sn:toKill) {
398         try {
399           if(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
400             LOG.info("Restoring cluster - stopping backup master: " + sn.getHostAndPort());
401             stopMaster(sn);
402           }
403         } catch (IOException ex) {
404           deferred.add(ex);
405         }
406       }
407     }
408     if (!deferred.isEmpty()) {
409       LOG.warn("Restoring cluster - restoring region servers reported "
410               + deferred.size() + " errors:");
411       for (int i=0; i<deferred.size() && i < 3; i++) {
412         LOG.warn(deferred.get(i));
413       }
414     }
415 
416     return deferred.isEmpty();
417   }
418 
419 
420   private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
421     @Override
422     public int compare(ServerName o1, ServerName o2) {
423       int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
424       if (compare != 0) return compare;
425       compare = o1.getPort() - o2.getPort();
426       if (compare != 0) return compare;
427       return 0;
428     }
429   }
430 
431   protected boolean restoreRegionServers(ClusterStatus initial, ClusterStatus current) {
432     Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
433     Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
434     toStart.addAll(initial.getServers());
435     toKill.addAll(current.getServers());
436 
437     for (ServerName server : current.getServers()) {
438       toStart.remove(server);
439     }
440     for (ServerName server: initial.getServers()) {
441       toKill.remove(server);
442     }
443 
444     List<IOException> deferred = new ArrayList<IOException>();
445 
446     for(ServerName sn:toStart) {
447       try {
448         if (!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
449                 sn.getHostname(),
450                 sn.getPort())) {
451           LOG.info("Restoring cluster - starting initial region server: " + sn.getHostAndPort());
452           startRegionServer(sn.getHostname(), sn.getPort());
453         }
454       } catch (IOException ex) {
455         deferred.add(ex);
456       }
457     }
458 
459     for(ServerName sn:toKill) {
460       try {
461         if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
462                 sn.getHostname(),
463                 sn.getPort())) {
464           LOG.info("Restoring cluster - stopping initial region server: " + sn.getHostAndPort());
465           stopRegionServer(sn);
466         }
467       } catch (IOException ex) {
468         deferred.add(ex);
469       }
470     }
471     if (!deferred.isEmpty()) {
472       LOG.warn("Restoring cluster - restoring region servers reported "
473               + deferred.size() + " errors:");
474       for (int i=0; i<deferred.size() && i < 3; i++) {
475         LOG.warn(deferred.get(i));
476       }
477     }
478 
479     return deferred.isEmpty();
480   }
481 
482   protected boolean restoreAdmin() throws IOException {
483     // While restoring above, if the HBase Master which was initially the Active one, was down
484     // and the restore put the cluster back to Initial configuration, HAdmin instance will need
485     // to refresh its connections (otherwise it will return incorrect information) or we can
486     // point it to new instance.
487     try {
488       admin.close();
489     } catch (IOException ioe) {
490       LOG.warn("While closing the old connection", ioe);
491     }
492     this.admin = this.connection.getAdmin();
493     LOG.info("Added new HBaseAdmin");
494     return true;
495   }
496 }