View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.ClusterManager.ServiceType;
28  import org.apache.hadoop.hbase.client.HBaseAdmin;
29  import org.apache.hadoop.hbase.client.HConnection;
30  import org.apache.hadoop.hbase.client.HConnectionManager;
31  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
32  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
33  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
34  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
35  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.Threads;
38  
39  import com.google.common.collect.Sets;
40  
41  /**
42   * Manages the interactions with an already deployed distributed cluster (as opposed to
43   * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
44   */
45  @InterfaceAudience.Private
46  public class DistributedHBaseCluster extends HBaseCluster {
47  
48    private HBaseAdmin admin;
49  
50    private ClusterManager clusterManager;
51  
52    public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
53        throws IOException {
54      super(conf);
55      this.clusterManager = clusterManager;
56      this.admin = new HBaseAdmin(conf);
57      this.initialClusterStatus = getClusterStatus();
58    }
59  
60    public void setClusterManager(ClusterManager clusterManager) {
61      this.clusterManager = clusterManager;
62    }
63  
64    public ClusterManager getClusterManager() {
65      return clusterManager;
66    }
67  
68    /**
69     * Returns a ClusterStatus for this HBase cluster
70     * @throws IOException
71     */
72    @Override
73    public ClusterStatus getClusterStatus() throws IOException {
74      return admin.getClusterStatus();
75    }
76  
77    @Override
78    public ClusterStatus getInitialClusterStatus() throws IOException {
79      return initialClusterStatus;
80    }
81  
82    @Override
83    public void close() throws IOException {
84      if (this.admin != null) {
85        admin.close();
86      }
87    }
88  
89    @Override
90    public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
91    throws IOException {
92      return admin.getConnection().getAdmin(serverName);
93    }
94  
95    @Override
96    public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
97    throws IOException {
98      return admin.getConnection().getClient(serverName);
99    }
100 
101   @Override
102   public void startRegionServer(String hostname) throws IOException {
103     LOG.info("Starting RS on: " + hostname);
104     clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname);
105   }
106 
107   @Override
108   public void killRegionServer(ServerName serverName) throws IOException {
109     LOG.info("Aborting RS: " + serverName.getServerName());
110     clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname());
111   }
112 
113   @Override
114   public void stopRegionServer(ServerName serverName) throws IOException {
115     LOG.info("Stopping RS: " + serverName.getServerName());
116     clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname());
117   }
118 
119   @Override
120   public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
121     waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
122   }
123 
124   private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
125     throws IOException {
126     LOG.info("Waiting service:" + service + " to stop: " + serverName.getServerName());
127     long start = System.currentTimeMillis();
128 
129     while ((System.currentTimeMillis() - start) < timeout) {
130       if (!clusterManager.isRunning(service, serverName.getHostname())) {
131         return;
132       }
133       Threads.sleep(1000);
134     }
135     throw new IOException("did timeout waiting for service to stop:" + serverName);
136   }
137 
138   @Override
139   public MasterService.BlockingInterface getMaster()
140   throws IOException {
141     HConnection conn = HConnectionManager.getConnection(conf);
142     return conn.getMaster();
143   }
144 
145   @Override
146   public void startMaster(String hostname) throws IOException {
147     LOG.info("Starting Master on: " + hostname);
148     clusterManager.start(ServiceType.HBASE_MASTER, hostname);
149   }
150 
151   @Override
152   public void killMaster(ServerName serverName) throws IOException {
153     LOG.info("Aborting Master: " + serverName.getServerName());
154     clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname());
155   }
156 
157   @Override
158   public void stopMaster(ServerName serverName) throws IOException {
159     LOG.info("Stopping Master: " + serverName.getServerName());
160     clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname());
161   }
162 
163   @Override
164   public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
165     waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
166   }
167 
168   @Override
169   public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
170     long start = System.currentTimeMillis();
171     while (System.currentTimeMillis() - start < timeout) {
172       try {
173         getMaster();
174         return true;
175       } catch (MasterNotRunningException m) {
176         LOG.warn("Master not started yet " + m);
177       } catch (ZooKeeperConnectionException e) {
178         LOG.warn("Failed to connect to ZK " + e);
179       }
180       Threads.sleep(1000);
181     }
182     return false;
183   }
184 
185   @Override
186   public ServerName getServerHoldingRegion(byte[] regionName) throws IOException {
187     HConnection connection = admin.getConnection();
188     HRegionLocation regionLoc = connection.locateRegion(regionName);
189     if (regionLoc == null) {
190       LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName)
191           + " for table " + HRegionInfo.getTableName(regionName) + ", start key [" +
192           Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
193       return null;
194     }
195 
196     AdminProtos.AdminService.BlockingInterface client =
197       connection.getAdmin(regionLoc.getServerName());
198     ServerInfo info = ProtobufUtil.getServerInfo(client);
199     return ProtobufUtil.toServerName(info.getServerName());
200   }
201 
202   @Override
203   public void waitUntilShutDown() {
204     //Simply wait for a few seconds for now (after issuing serverManager.kill
205     throw new RuntimeException("Not implemented yet");
206   }
207 
208   @Override
209   public void shutdown() throws IOException {
210     //not sure we want this
211     throw new RuntimeException("Not implemented yet");
212   }
213 
214   @Override
215   public boolean isDistributedCluster() {
216     return true;
217   }
218 
219   @Override
220   public boolean restoreClusterStatus(ClusterStatus initial) throws IOException {
221     ClusterStatus current = getClusterStatus();
222 
223     LOG.info("Restoring cluster - started");
224 
225     // do a best effort restore
226     boolean success = true;
227     success = restoreMasters(initial, current) & success;
228     success = restoreRegionServers(initial, current) & success;
229     success = restoreAdmin() & success;
230 
231     LOG.info("Restoring cluster - done");
232     return success;
233   }
234 
235   protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) {
236     List<IOException> deferred = new ArrayList<IOException>();
237     //check whether current master has changed
238     if (!ServerName.isSameHostnameAndPort(initial.getMaster(), current.getMaster())) {
239       LOG.info("Restoring cluster - Initial active master : " + initial.getMaster().getHostname()
240           + " has changed to : " + current.getMaster().getHostname());
241       // If initial master is stopped, start it, before restoring the state.
242       // It will come up as a backup master, if there is already an active master.
243       try {
244         if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, initial.getMaster().getHostname())) {
245           LOG.info("Restoring cluster - starting initial active master at:" + initial.getMaster().getHostname());
246           startMaster(initial.getMaster().getHostname());
247         }
248 
249         //master has changed, we would like to undo this.
250         //1. Kill the current backups
251         //2. Stop current master
252         //3. Start backup masters
253         for (ServerName currentBackup : current.getBackupMasters()) {
254           if (!ServerName.isSameHostnameAndPort(currentBackup, initial.getMaster())) {
255             LOG.info("Restoring cluster - stopping backup master: " + currentBackup);
256             stopMaster(currentBackup);
257           }
258         }
259         LOG.info("Restoring cluster - stopping active master: " + current.getMaster());
260         stopMaster(current.getMaster());
261         waitForActiveAndReadyMaster(); //wait so that active master takes over
262       } catch (IOException ex) {
263         // if we fail to start the initial active master, we do not want to continue stopping
264         // backup masters. Just keep what we have now
265         deferred.add(ex);
266       }
267 
268       //start backup masters
269       for (ServerName backup : initial.getBackupMasters()) {
270         try {
271           //these are not started in backup mode, but we should already have an active master
272           if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname())) {
273             LOG.info("Restoring cluster - starting initial backup master: " + backup.getHostname());
274             startMaster(backup.getHostname());
275           }
276         } catch (IOException ex) {
277           deferred.add(ex);
278         }
279       }
280     } else {
281       //current master has not changed, match up backup masters
282       HashMap<String, ServerName> initialBackups = new HashMap<String, ServerName>();
283       HashMap<String, ServerName> currentBackups = new HashMap<String, ServerName>();
284 
285       for (ServerName server : initial.getBackupMasters()) {
286         initialBackups.put(server.getHostname(), server);
287       }
288       for (ServerName server : current.getBackupMasters()) {
289         currentBackups.put(server.getHostname(), server);
290       }
291 
292       for (String hostname : Sets.difference(initialBackups.keySet(), currentBackups.keySet())) {
293         try {
294           if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, hostname)) {
295             LOG.info("Restoring cluster - starting initial backup master: " + hostname);
296             startMaster(hostname);
297           }
298         } catch (IOException ex) {
299           deferred.add(ex);
300         }
301       }
302 
303       for (String hostname : Sets.difference(currentBackups.keySet(), initialBackups.keySet())) {
304         try {
305           if(clusterManager.isRunning(ServiceType.HBASE_MASTER, hostname)) {
306             LOG.info("Restoring cluster - stopping backup master: " + hostname);
307             stopMaster(currentBackups.get(hostname));
308           }
309         } catch (IOException ex) {
310           deferred.add(ex);
311         }
312       }
313     }
314     if (!deferred.isEmpty()) {
315       LOG.warn("Restoring cluster - restoring region servers reported " + deferred.size() + " errors:");
316       for (int i=0; i<deferred.size() && i < 3; i++) {
317         LOG.warn(deferred.get(i));
318       }
319     }
320 
321     return deferred.isEmpty();
322   }
323 
324   protected boolean restoreRegionServers(ClusterStatus initial, ClusterStatus current) {
325     HashMap<String, ServerName> initialServers = new HashMap<String, ServerName>();
326     HashMap<String, ServerName> currentServers = new HashMap<String, ServerName>();
327 
328     for (ServerName server : initial.getServers()) {
329       initialServers.put(server.getHostname(), server);
330     }
331     for (ServerName server : current.getServers()) {
332       currentServers.put(server.getHostname(), server);
333     }
334 
335     List<IOException> deferred = new ArrayList<IOException>();
336     for (String hostname : Sets.difference(initialServers.keySet(), currentServers.keySet())) {
337       try {
338         if(!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, hostname)) {
339           LOG.info("Restoring cluster - starting initial region server: " + hostname);
340           startRegionServer(hostname);
341         }
342       } catch (IOException ex) {
343         deferred.add(ex);
344       }
345     }
346 
347     for (String hostname : Sets.difference(currentServers.keySet(), initialServers.keySet())) {
348       try {
349         if(clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, hostname)) {
350           LOG.info("Restoring cluster - stopping initial region server: " + hostname);
351           stopRegionServer(currentServers.get(hostname));
352         }
353       } catch (IOException ex) {
354         deferred.add(ex);
355       }
356     }
357     if (!deferred.isEmpty()) {
358       LOG.warn("Restoring cluster - restoring region servers reported " + deferred.size() + " errors:");
359       for (int i=0; i<deferred.size() && i < 3; i++) {
360         LOG.warn(deferred.get(i));
361       }
362     }
363 
364     return deferred.isEmpty();
365   }
366 
367   protected boolean restoreAdmin() throws IOException {
368     // While restoring above, if the HBase Master which was initially the Active one, was down
369     // and the restore put the cluster back to Initial configuration, HAdmin instance will need
370     // to refresh its connections (otherwise it will return incorrect information) or we can
371     // point it to new instance.
372     try {
373       admin.close();
374     } catch (IOException ioe) {
375       LOG.warn("While closing the old connection", ioe);
376     }
377     this.admin = new HBaseAdmin(conf);
378     LOG.info("Added new HBaseAdmin");
379     return true;
380   }
381 }