View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase;
20  
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Locale;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
34  
35  /**
36   * A default cluster manager for HBase. Uses SSH, and hbase shell scripts
37   * to manage the cluster. Assumes Unix-like commands are available like 'ps',
38   * 'kill', etc. Also assumes the user running the test has enough "power" to start & stop
39   * servers on the remote machines (for example, the test user could be the same user as the
40   * user the daemon isrunning as)
41   */
42  @InterfaceAudience.Private
43  public class HBaseClusterManager extends ClusterManager {
44    private String sshUserName;
45    private String sshOptions;
46  
47    /**
48     * The command format that is used to execute the remote command. Arguments:
49     * 1 SSH options, 2 user name , 3 "@" if username is set, 4 host, 5 original command.
50     */
51    private static final String DEFAULT_TUNNEL_CMD = "/usr/bin/ssh %1$s %2$s%3$s%4$s \"%5$s\"";
52    private String tunnelCmd;
53  
54    private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
55    private static final int DEFAULT_RETRY_ATTEMPTS = 5;
56  
57    private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
58    private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;
59  
60    protected RetryCounterFactory retryCounterFactory;
61  
62    @Override
63    public void setConf(Configuration conf) {
64      super.setConf(conf);
65      if (conf == null) {
66        // Configured gets passed null before real conf. Why? I don't know.
67        return;
68      }
69      sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
70      String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
71      sshOptions = System.getenv("HBASE_SSH_OPTS");
72      if (!extraSshOptions.isEmpty()) {
73        sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
74      }
75      sshOptions = (sshOptions == null) ? "" : sshOptions;
76      tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
77      // Print out ssh special config if any.
78      if ((sshUserName != null && sshUserName.length() > 0) ||
79          (sshOptions != null && sshOptions.length() > 0)) {
80        LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
81      }
82  
83      this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
84          .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
85          .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
86    }
87  
88    /**
89     * Executes commands over SSH
90     */
91    protected class RemoteShell extends Shell.ShellCommandExecutor {
92      private String hostname;
93  
94      public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
95          long timeout) {
96        super(execString, dir, env, timeout);
97        this.hostname = hostname;
98      }
99  
100     public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
101       super(execString, dir, env);
102       this.hostname = hostname;
103     }
104 
105     public RemoteShell(String hostname, String[] execString, File dir) {
106       super(execString, dir);
107       this.hostname = hostname;
108     }
109 
110     public RemoteShell(String hostname, String[] execString) {
111       super(execString);
112       this.hostname = hostname;
113     }
114 
115     @Override
116     public String[] getExecString() {
117       String at = sshUserName.isEmpty() ? "" : "@";
118       String remoteCmd = StringUtils.join(super.getExecString(), " ");
119       String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd);
120       LOG.info("Executing full command [" + cmd + "]");
121       return new String[] { "/usr/bin/env", "bash", "-c", cmd };
122     }
123 
124     @Override
125     public void execute() throws IOException {
126       super.execute();
127     }
128   }
129 
130   /**
131    * Provides command strings for services to be executed by Shell. CommandProviders are
132    * pluggable, and different deployments(windows, bigtop, etc) can be managed by
133    * plugging-in custom CommandProvider's or ClusterManager's.
134    */
135   static abstract class CommandProvider {
136 
137     enum Operation {
138       START, STOP, RESTART
139     }
140 
141     public abstract String getCommand(ServiceType service, Operation op);
142 
143     public String isRunningCommand(ServiceType service) {
144       return findPidCommand(service);
145     }
146 
147     protected String findPidCommand(ServiceType service) {
148       return String.format("ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
149           service);
150     }
151 
152     public String signalCommand(ServiceType service, String signal) {
153       return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
154     }
155   }
156 
157   /**
158    * CommandProvider to manage the service using bin/hbase-* scripts
159    */
160   static class HBaseShellCommandProvider extends CommandProvider {
161     private final String hbaseHome;
162     private final String confDir;
163 
164     HBaseShellCommandProvider(Configuration conf) {
165       hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
166         System.getenv("HBASE_HOME"));
167       String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
168         System.getenv("HBASE_CONF_DIR"));
169       if (tmp != null) {
170         confDir = String.format("--config %s", tmp);
171       } else {
172         confDir = "";
173       }
174     }
175 
176     @Override
177     public String getCommand(ServiceType service, Operation op) {
178       return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
179           op.toString().toLowerCase(), service);
180     }
181   }
182 
183   public HBaseClusterManager() {
184     super();
185   }
186 
187   protected CommandProvider getCommandProvider(ServiceType service) {
188     //TODO: make it pluggable, or auto-detect the best command provider, should work with
189     //hadoop daemons as well
190     return new HBaseShellCommandProvider(getConf());
191   }
192 
193   /**
194    * Execute the given command on the host using SSH
195    * @return pair of exit code and command output
196    * @throws IOException if something goes wrong.
197    */
198   private Pair<Integer, String> exec(String hostname, String... cmd) throws IOException {
199     LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);
200 
201     RemoteShell shell = new RemoteShell(hostname, cmd);
202     try {
203       shell.execute();
204     } catch (Shell.ExitCodeException ex) {
205       // capture the stdout of the process as well.
206       String output = shell.getOutput();
207       // add output for the ExitCodeException.
208       throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
209         + ", stdout: " + output);
210     }
211 
212     LOG.info("Executed remote command, exit code:" + shell.getExitCode()
213         + " , output:" + shell.getOutput());
214 
215     return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput());
216   }
217 
218   private Pair<Integer, String> execWithRetries(String hostname, String... cmd)
219       throws IOException {
220     RetryCounter retryCounter = retryCounterFactory.create();
221     while (true) {
222       try {
223         return exec(hostname, cmd);
224       } catch (IOException e) {
225         retryOrThrow(retryCounter, e, hostname, cmd);
226       }
227       try {
228         retryCounter.sleepUntilNextRetry();
229       } catch (InterruptedException ex) {
230         // ignore
231         LOG.warn("Sleep Interrupted:" + ex);
232       }
233     }
234   }
235 
236   private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
237       String hostname, String[] cmd) throws E {
238     if (retryCounter.shouldRetry()) {
239       LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
240         + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
241           + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
242       return;
243     }
244     throw ex;
245   }
246 
247   private void exec(String hostname, ServiceType service, Operation op) throws IOException {
248     execWithRetries(hostname, getCommandProvider(service).getCommand(service, op));
249   }
250 
251   @Override
252   public void start(ServiceType service, String hostname) throws IOException {
253     exec(hostname, service, Operation.START);
254   }
255 
256   @Override
257   public void stop(ServiceType service, String hostname) throws IOException {
258     exec(hostname, service, Operation.STOP);
259   }
260 
261   @Override
262   public void restart(ServiceType service, String hostname) throws IOException {
263     exec(hostname, service, Operation.RESTART);
264   }
265 
266   @Override
267   public void signal(ServiceType service, String signal, String hostname) throws IOException {
268     execWithRetries(hostname, getCommandProvider(service).signalCommand(service, signal));
269   }
270 
271   @Override
272   public boolean isRunning(ServiceType service, String hostname) throws IOException {
273     String ret = execWithRetries(hostname, getCommandProvider(service).isRunningCommand(service))
274         .getSecond();
275     return ret.length() > 0;
276   }
277 
278 }