1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.util.Map;
24
25 import org.apache.commons.lang.StringUtils;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
29 import org.apache.hadoop.hbase.util.Pair;
30 import org.apache.hadoop.hbase.util.RetryCounter;
31 import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
32 import org.apache.hadoop.hbase.util.RetryCounterFactory;
33 import org.apache.hadoop.util.Shell;
34
35
36
37
38
39
40
41
42 @InterfaceAudience.Private
43 public class HBaseClusterManager extends ClusterManager {
/** SSH login user, read in setConf from "hbase.it.clustermanager.ssh.user"; empty when unset. */
private String sshUserName;
/**
 * Options handed to ssh. setConf builds this from the HBASE_SSH_OPTS environment variable
 * followed by the "hbase.it.clustermanager.ssh.opts" setting, and replaces null with "".
 */
private String sshOptions;

/**
 * Default template for the command that tunnels a remote command through ssh. It is filled in
 * by RemoteShell.getExecString with, in order: %1$s the ssh options, %2$s the ssh user,
 * %3$s the "@" separator (empty when no user is set), %4$s the host name and %5$s the
 * remote command line.
 */
private static final String DEFAULT_TUNNEL_CMD = "/usr/bin/ssh %1$s %2$s%3$s%4$s \"%5$s\"";
/** Tunnel command template in effect; "hbase.it.clustermanager.ssh.cmd" or the default above. */
private String tunnelCmd;

/** Configuration key for the maximum number of attempts per remote command. */
private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
/** Attempts used when {@link #RETRY_ATTEMPTS_KEY} is not configured. */
private static final int DEFAULT_RETRY_ATTEMPTS = 5;

/** Configuration key for the sleep interval between attempts. */
private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
/**
 * Sleep interval used when {@link #RETRY_SLEEP_INTERVAL_KEY} is not configured.
 * NOTE(review): presumably milliseconds -- confirm against RetryCounter.RetryConfig.
 */
private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

/** Factory for per-command retry counters; created in setConf and used by execWithRetries. */
protected RetryCounterFactory retryCounterFactory;
61
62 @Override
63 public void setConf(Configuration conf) {
64 super.setConf(conf);
65 if (conf == null) {
66
67 return;
68 }
69 sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
70 String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
71 sshOptions = System.getenv("HBASE_SSH_OPTS");
72 if (!extraSshOptions.isEmpty()) {
73 sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
74 }
75 sshOptions = (sshOptions == null) ? "" : sshOptions;
76 tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
77
78 if ((sshUserName != null && sshUserName.length() > 0) ||
79 (sshOptions != null && sshOptions.length() > 0)) {
80 LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
81 }
82
83 this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
84 .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
85 .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
86 }
87
88
89
90
91 protected class RemoteShell extends Shell.ShellCommandExecutor {
92 private String hostname;
93
94 public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
95 long timeout) {
96 super(execString, dir, env, timeout);
97 this.hostname = hostname;
98 }
99
100 public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
101 super(execString, dir, env);
102 this.hostname = hostname;
103 }
104
105 public RemoteShell(String hostname, String[] execString, File dir) {
106 super(execString, dir);
107 this.hostname = hostname;
108 }
109
110 public RemoteShell(String hostname, String[] execString) {
111 super(execString);
112 this.hostname = hostname;
113 }
114
115 @Override
116 public String[] getExecString() {
117 String at = sshUserName.isEmpty() ? "" : "@";
118 String remoteCmd = StringUtils.join(super.getExecString(), " ");
119 String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd);
120 LOG.info("Executing full command [" + cmd + "]");
121 return new String[] { "/usr/bin/env", "bash", "-c", cmd };
122 }
123
124 @Override
125 public void execute() throws IOException {
126 super.execute();
127 }
128 }
129
130
131
132
133
134
135 static abstract class CommandProvider {
136
137 enum Operation {
138 START, STOP, RESTART
139 }
140
141 public abstract String getCommand(ServiceType service, Operation op);
142
143 public String isRunningCommand(ServiceType service) {
144 return findPidCommand(service);
145 }
146
147 protected String findPidCommand(ServiceType service) {
148 return String.format("ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
149 service);
150 }
151
152 public String signalCommand(ServiceType service, String signal) {
153 return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
154 }
155 }
156
157
158
159
160 static class HBaseShellCommandProvider extends CommandProvider {
161 private final String hbaseHome;
162 private final String confDir;
163
164 HBaseShellCommandProvider(Configuration conf) {
165 hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
166 System.getenv("HBASE_HOME"));
167 String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
168 System.getenv("HBASE_CONF_DIR"));
169 if (tmp != null) {
170 confDir = String.format("--config %s", tmp);
171 } else {
172 confDir = "";
173 }
174 }
175
176 @Override
177 public String getCommand(ServiceType service, Operation op) {
178 return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
179 op.toString().toLowerCase(), service);
180 }
181 }
182
183 public HBaseClusterManager() {
184 super();
185 }
186
187 protected CommandProvider getCommandProvider(ServiceType service) {
188
189
190 return new HBaseShellCommandProvider(getConf());
191 }
192
193
194
195
196
197
198 private Pair<Integer, String> exec(String hostname, String... cmd) throws IOException {
199 LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);
200
201 RemoteShell shell = new RemoteShell(hostname, cmd);
202 try {
203 shell.execute();
204 } catch (Shell.ExitCodeException ex) {
205
206 String output = shell.getOutput();
207
208 throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
209 + ", stdout: " + output);
210 }
211
212 LOG.info("Executed remote command, exit code:" + shell.getExitCode()
213 + " , output:" + shell.getOutput());
214
215 return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput());
216 }
217
218 private Pair<Integer, String> execWithRetries(String hostname, String... cmd)
219 throws IOException {
220 RetryCounter retryCounter = retryCounterFactory.create();
221 while (true) {
222 try {
223 return exec(hostname, cmd);
224 } catch (IOException e) {
225 retryOrThrow(retryCounter, e, hostname, cmd);
226 }
227 try {
228 retryCounter.sleepUntilNextRetry();
229 } catch (InterruptedException ex) {
230
231 LOG.warn("Sleep Interrupted:" + ex);
232 }
233 }
234 }
235
236 private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
237 String hostname, String[] cmd) throws E {
238 if (retryCounter.shouldRetry()) {
239 LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
240 + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
241 + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
242 return;
243 }
244 throw ex;
245 }
246
247 private void exec(String hostname, ServiceType service, Operation op) throws IOException {
248 execWithRetries(hostname, getCommandProvider(service).getCommand(service, op));
249 }
250
251 @Override
252 public void start(ServiceType service, String hostname) throws IOException {
253 exec(hostname, service, Operation.START);
254 }
255
256 @Override
257 public void stop(ServiceType service, String hostname) throws IOException {
258 exec(hostname, service, Operation.STOP);
259 }
260
261 @Override
262 public void restart(ServiceType service, String hostname) throws IOException {
263 exec(hostname, service, Operation.RESTART);
264 }
265
266 @Override
267 public void signal(ServiceType service, String signal, String hostname) throws IOException {
268 execWithRetries(hostname, getCommandProvider(service).signalCommand(service, signal));
269 }
270
271 @Override
272 public boolean isRunning(ServiceType service, String hostname) throws IOException {
273 String ret = execWithRetries(hostname, getCommandProvider(service).isRunningCommand(service))
274 .getSecond();
275 return ret.length() > 0;
276 }
277
278 }