1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.ClusterManager.ServiceType;
28 import org.apache.hadoop.hbase.client.HBaseAdmin;
29 import org.apache.hadoop.hbase.client.HConnection;
30 import org.apache.hadoop.hbase.client.HConnectionManager;
31 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
32 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
33 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
35 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.Threads;
38
39 import com.google.common.collect.Sets;
40
41
42
43
44
45 @InterfaceAudience.Private
46 public class DistributedHBaseCluster extends HBaseCluster {
47
48 private HBaseAdmin admin;
49
50 private ClusterManager clusterManager;
51
52 public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
53 throws IOException {
54 super(conf);
55 this.clusterManager = clusterManager;
56 this.admin = new HBaseAdmin(conf);
57 this.initialClusterStatus = getClusterStatus();
58 }
59
60 public void setClusterManager(ClusterManager clusterManager) {
61 this.clusterManager = clusterManager;
62 }
63
64 public ClusterManager getClusterManager() {
65 return clusterManager;
66 }
67
68
69
70
71
72 @Override
73 public ClusterStatus getClusterStatus() throws IOException {
74 return admin.getClusterStatus();
75 }
76
77 @Override
78 public ClusterStatus getInitialClusterStatus() throws IOException {
79 return initialClusterStatus;
80 }
81
82 @Override
83 public void close() throws IOException {
84 if (this.admin != null) {
85 admin.close();
86 }
87 }
88
89 @Override
90 public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
91 throws IOException {
92 return admin.getConnection().getAdmin(serverName);
93 }
94
95 @Override
96 public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
97 throws IOException {
98 return admin.getConnection().getClient(serverName);
99 }
100
101 @Override
102 public void startRegionServer(String hostname) throws IOException {
103 LOG.info("Starting RS on: " + hostname);
104 clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname);
105 }
106
107 @Override
108 public void killRegionServer(ServerName serverName) throws IOException {
109 LOG.info("Aborting RS: " + serverName.getServerName());
110 clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname());
111 }
112
113 @Override
114 public void stopRegionServer(ServerName serverName) throws IOException {
115 LOG.info("Stopping RS: " + serverName.getServerName());
116 clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname());
117 }
118
119 @Override
120 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
121 waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
122 }
123
124 private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
125 throws IOException {
126 LOG.info("Waiting service:" + service + " to stop: " + serverName.getServerName());
127 long start = System.currentTimeMillis();
128
129 while ((System.currentTimeMillis() - start) < timeout) {
130 if (!clusterManager.isRunning(service, serverName.getHostname())) {
131 return;
132 }
133 Threads.sleep(1000);
134 }
135 throw new IOException("did timeout waiting for service to stop:" + serverName);
136 }
137
138 @Override
139 public MasterService.BlockingInterface getMaster()
140 throws IOException {
141 HConnection conn = HConnectionManager.getConnection(conf);
142 return conn.getMaster();
143 }
144
145 @Override
146 public void startMaster(String hostname) throws IOException {
147 LOG.info("Starting Master on: " + hostname);
148 clusterManager.start(ServiceType.HBASE_MASTER, hostname);
149 }
150
151 @Override
152 public void killMaster(ServerName serverName) throws IOException {
153 LOG.info("Aborting Master: " + serverName.getServerName());
154 clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname());
155 }
156
157 @Override
158 public void stopMaster(ServerName serverName) throws IOException {
159 LOG.info("Stopping Master: " + serverName.getServerName());
160 clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname());
161 }
162
163 @Override
164 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
165 waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
166 }
167
168 @Override
169 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
170 long start = System.currentTimeMillis();
171 while (System.currentTimeMillis() - start < timeout) {
172 try {
173 getMaster();
174 return true;
175 } catch (MasterNotRunningException m) {
176 LOG.warn("Master not started yet " + m);
177 } catch (ZooKeeperConnectionException e) {
178 LOG.warn("Failed to connect to ZK " + e);
179 }
180 Threads.sleep(1000);
181 }
182 return false;
183 }
184
185 @Override
186 public ServerName getServerHoldingRegion(byte[] regionName) throws IOException {
187 HConnection connection = admin.getConnection();
188 HRegionLocation regionLoc = connection.locateRegion(regionName);
189 if (regionLoc == null) {
190 LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName)
191 + " for table " + HRegionInfo.getTableName(regionName) + ", start key [" +
192 Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
193 return null;
194 }
195
196 AdminProtos.AdminService.BlockingInterface client =
197 connection.getAdmin(regionLoc.getServerName());
198 ServerInfo info = ProtobufUtil.getServerInfo(client);
199 return ProtobufUtil.toServerName(info.getServerName());
200 }
201
202 @Override
203 public void waitUntilShutDown() {
204
205 throw new RuntimeException("Not implemented yet");
206 }
207
208 @Override
209 public void shutdown() throws IOException {
210
211 throw new RuntimeException("Not implemented yet");
212 }
213
214 @Override
215 public boolean isDistributedCluster() {
216 return true;
217 }
218
219 @Override
220 public boolean restoreClusterStatus(ClusterStatus initial) throws IOException {
221 ClusterStatus current = getClusterStatus();
222
223 LOG.info("Restoring cluster - started");
224
225
226 boolean success = true;
227 success = restoreMasters(initial, current) & success;
228 success = restoreRegionServers(initial, current) & success;
229 success = restoreAdmin() & success;
230
231 LOG.info("Restoring cluster - done");
232 return success;
233 }
234
235 protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) {
236 List<IOException> deferred = new ArrayList<IOException>();
237
238 if (!ServerName.isSameHostnameAndPort(initial.getMaster(), current.getMaster())) {
239 LOG.info("Restoring cluster - Initial active master : " + initial.getMaster().getHostname()
240 + " has changed to : " + current.getMaster().getHostname());
241
242
243 try {
244 if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, initial.getMaster().getHostname())) {
245 LOG.info("Restoring cluster - starting initial active master at:" + initial.getMaster().getHostname());
246 startMaster(initial.getMaster().getHostname());
247 }
248
249
250
251
252
253 for (ServerName currentBackup : current.getBackupMasters()) {
254 if (!ServerName.isSameHostnameAndPort(currentBackup, initial.getMaster())) {
255 LOG.info("Restoring cluster - stopping backup master: " + currentBackup);
256 stopMaster(currentBackup);
257 }
258 }
259 LOG.info("Restoring cluster - stopping active master: " + current.getMaster());
260 stopMaster(current.getMaster());
261 waitForActiveAndReadyMaster();
262 } catch (IOException ex) {
263
264
265 deferred.add(ex);
266 }
267
268
269 for (ServerName backup : initial.getBackupMasters()) {
270 try {
271
272 if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname())) {
273 LOG.info("Restoring cluster - starting initial backup master: " + backup.getHostname());
274 startMaster(backup.getHostname());
275 }
276 } catch (IOException ex) {
277 deferred.add(ex);
278 }
279 }
280 } else {
281
282 HashMap<String, ServerName> initialBackups = new HashMap<String, ServerName>();
283 HashMap<String, ServerName> currentBackups = new HashMap<String, ServerName>();
284
285 for (ServerName server : initial.getBackupMasters()) {
286 initialBackups.put(server.getHostname(), server);
287 }
288 for (ServerName server : current.getBackupMasters()) {
289 currentBackups.put(server.getHostname(), server);
290 }
291
292 for (String hostname : Sets.difference(initialBackups.keySet(), currentBackups.keySet())) {
293 try {
294 if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, hostname)) {
295 LOG.info("Restoring cluster - starting initial backup master: " + hostname);
296 startMaster(hostname);
297 }
298 } catch (IOException ex) {
299 deferred.add(ex);
300 }
301 }
302
303 for (String hostname : Sets.difference(currentBackups.keySet(), initialBackups.keySet())) {
304 try {
305 if(clusterManager.isRunning(ServiceType.HBASE_MASTER, hostname)) {
306 LOG.info("Restoring cluster - stopping backup master: " + hostname);
307 stopMaster(currentBackups.get(hostname));
308 }
309 } catch (IOException ex) {
310 deferred.add(ex);
311 }
312 }
313 }
314 if (!deferred.isEmpty()) {
315 LOG.warn("Restoring cluster - restoring region servers reported " + deferred.size() + " errors:");
316 for (int i=0; i<deferred.size() && i < 3; i++) {
317 LOG.warn(deferred.get(i));
318 }
319 }
320
321 return deferred.isEmpty();
322 }
323
324 protected boolean restoreRegionServers(ClusterStatus initial, ClusterStatus current) {
325 HashMap<String, ServerName> initialServers = new HashMap<String, ServerName>();
326 HashMap<String, ServerName> currentServers = new HashMap<String, ServerName>();
327
328 for (ServerName server : initial.getServers()) {
329 initialServers.put(server.getHostname(), server);
330 }
331 for (ServerName server : current.getServers()) {
332 currentServers.put(server.getHostname(), server);
333 }
334
335 List<IOException> deferred = new ArrayList<IOException>();
336 for (String hostname : Sets.difference(initialServers.keySet(), currentServers.keySet())) {
337 try {
338 if(!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, hostname)) {
339 LOG.info("Restoring cluster - starting initial region server: " + hostname);
340 startRegionServer(hostname);
341 }
342 } catch (IOException ex) {
343 deferred.add(ex);
344 }
345 }
346
347 for (String hostname : Sets.difference(currentServers.keySet(), initialServers.keySet())) {
348 try {
349 if(clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, hostname)) {
350 LOG.info("Restoring cluster - stopping initial region server: " + hostname);
351 stopRegionServer(currentServers.get(hostname));
352 }
353 } catch (IOException ex) {
354 deferred.add(ex);
355 }
356 }
357 if (!deferred.isEmpty()) {
358 LOG.warn("Restoring cluster - restoring region servers reported " + deferred.size() + " errors:");
359 for (int i=0; i<deferred.size() && i < 3; i++) {
360 LOG.warn(deferred.get(i));
361 }
362 }
363
364 return deferred.isEmpty();
365 }
366
367 protected boolean restoreAdmin() throws IOException {
368
369
370
371
372 try {
373 admin.close();
374 } catch (IOException ioe) {
375 LOG.warn("While closing the old connection", ioe);
376 }
377 this.admin = new HBaseAdmin(conf);
378 LOG.info("Added new HBaseAdmin");
379 return true;
380 }
381 }