/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;

/**
 * A default cluster manager for HBase. Uses SSH and the HBase shell scripts
 * to manage the cluster. Assumes Unix-like commands such as 'ps' and 'kill' are
 * available on the remote hosts, and that the user running the test has enough
 * privileges to start and stop servers there (for example, the test user could be
 * the same user the daemons run as).
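 *
 * <p>Configuration keys consulted by this manager (see {@code setConf}):
 * "hbase.it.clustermanager.ssh.user", "hbase.it.clustermanager.ssh.opts" (appended to the
 * HBASE_SSH_OPTS environment variable), "hbase.it.clustermanager.ssh.cmd",
 * "hbase.it.clustermanager.retry.attempts" and "hbase.it.clustermanager.retry.sleep.interval".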
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {
  private static final String SIGKILL = "SIGKILL";
  private static final String SIGSTOP = "SIGSTOP";
  private static final String SIGCONT = "SIGCONT";

  protected static final Log LOG = LogFactory.getLog(HBaseClusterManager.class);
  private String sshUserName;
  private String sshOptions;

  /**
   * The command format that is used to execute the remote command. Arguments:
   * 1 SSH options, 2 user name, 3 "@" if username is set, 4 host,
   * 5 original command, 6 service user.
   */
  private static final String DEFAULT_TUNNEL_CMD =
      "/usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
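  // Example expansion of the default template (hypothetical values): with SSH options
  // "-p 22", SSH user "tester", host "rs1.example.com", service user "hbase" and the
  // remote command "ps ux", the executed tunnel command is:
  //   /usr/bin/ssh -p 22 tester@rs1.example.com "sudo -u hbase ps ux"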
  private String tunnelCmd;

  private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  private static final int DEFAULT_RETRY_ATTEMPTS = 5;

  private static final String RETRY_SLEEP_INTERVAL_KEY =
      "hbase.it.clustermanager.retry.sleep.interval";
  private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  protected RetryCounterFactory retryCounterFactory;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) {
      // Configured may be constructed with a null conf before the real one is set.
      return;
    }
    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
    sshOptions = System.getenv("HBASE_SSH_OPTS");
    if (!extraSshOptions.isEmpty()) {
      sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
    }
    sshOptions = (sshOptions == null) ? "" : sshOptions;
    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
    // Log any non-default SSH configuration.
    if ((sshUserName != null && sshUserName.length() > 0) ||
        (sshOptions != null && sshOptions.length() > 0)) {
      LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
    }

    this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
        .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
  }

  private String getServiceUser(ServiceType service) {
    Configuration conf = getConf();
    switch (service) {
      case HADOOP_DATANODE:
        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
      case ZOOKEEPER_SERVER:
        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
      default:
        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
    }
  }

  /**
   * Executes commands over SSH
   */
  protected class RemoteShell extends Shell.ShellCommandExecutor {
    private String hostname;
    private String user;

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String user, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = user;
    }

    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
      LOG.info("Executing full command [" + cmd + "]");
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }

    @Override
    public void execute() throws IOException {
      super.execute();
    }
  }

  /**
   * Provides command strings for services to be executed by Shell. CommandProviders are
   * pluggable, and different deployments (Windows, Bigtop, etc.) can be managed by
   * plugging in custom CommandProviders or ClusterManagers.
   */
  static abstract class CommandProvider {

    enum Operation {
      START, STOP, RESTART
    }

    public abstract String getCommand(ServiceType service, Operation op);

    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

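    // The pipeline below assumes each daemon's command line carries a "proc_<service>" marker
    // (typically set by the HBase and Hadoop start scripts) and prints the matching PIDs, e.g.
    //   ps ux | grep proc_regionserver | grep -v grep | tr -s ' ' | cut -d ' ' -f2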
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
          service);
    }

    public String signalCommand(ServiceType service, String signal) {
      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
    }
  }

  /**
   * CommandProvider to manage the service using bin/hbase-* scripts
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    private final String hbaseHome;
    private final String confDir;

    HBaseShellCommandProvider(Configuration conf) {
      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
        System.getenv("HBASE_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
        System.getenv("HBASE_CONF_DIR"));
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

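    // For example (hypothetical paths, region server service), getCommand() below yields roughly:
    //   /opt/hbase/bin/hbase-daemon.sh --config /etc/hbase/conf start regionserver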
    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
          op.toString().toLowerCase(), service);
    }
  }

  /**
   * CommandProvider to manage the service using sbin/hadoop-* scripts.
   */
  static class HadoopShellCommandProvider extends CommandProvider {
    private final String hadoopHome;
    private final String confDir;

    HadoopShellCommandProvider(Configuration conf) throws IOException {
      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home",
          System.getenv("HADOOP_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hadoop.conf.dir",
          System.getenv("HADOOP_CONF_DIR"));
      if (hadoopHome == null) {
        throw new IOException("Hadoop home is not set; configure " +
          "'hbase.it.clustermanager.hadoop.home' or the HADOOP_HOME environment variable.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
          op.toString().toLowerCase(), service);
    }
  }

  /**
   * CommandProvider to manage the service using bin/zk* scripts.
   */
  static class ZookeeperShellCommandProvider extends CommandProvider {
    private final String zookeeperHome;
    private final String confDir;

    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
      zookeeperHome = conf.get("hbase.it.clustermanager.zookeeper.home",
          System.getenv("ZOOBINDIR"));
      String tmp = conf.get("hbase.it.clustermanager.zookeeper.conf.dir",
          System.getenv("ZOOCFGDIR"));
      if (zookeeperHome == null) {
        throw new IOException("ZooKeeper home is not set; configure " +
          "'hbase.it.clustermanager.zookeeper.home' or the ZOOBINDIR environment variable.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/zkServer.sh %s", zookeeperHome, op.toString().toLowerCase());
    }

    @Override
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
        service);
    }
  }

  public HBaseClusterManager() {
  }

  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
    switch (service) {
      case HADOOP_DATANODE:
        return new HadoopShellCommandProvider(getConf());
      case ZOOKEEPER_SERVER:
        return new ZookeeperShellCommandProvider(getConf());
      default:
        return new HBaseShellCommandProvider(getConf());
    }
  }

  /**
   * Execute the given command on the host using SSH
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);

    RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // capture the stdout of the process as well.
      String output = shell.getOutput();
      // add output for the ExitCodeException.
      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
        + ", stdout: " + output);
    }

    LOG.info("Executed remote command, exit code:" + shell.getExitCode()
        + " , output:" + shell.getOutput());

    return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput());
  }

  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
      throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return exec(hostname, service, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      try {
        retryCounter.sleepUntilNextRetry();
      } catch (InterruptedException ex) {
        // Just log; the surrounding loop retries the command.
        LOG.warn("Sleep Interrupted:" + ex);
      }
    }
  }

  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
      String hostname, String[] cmd) throws E {
    if (retryCounter.shouldRetry()) {
      LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
        + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
        + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
      return;
    }
    throw ex;
  }

  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  public void signal(ServiceType service, String signal, String hostname) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).signalCommand(service, signal));
  }

  @Override
  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
    String ret = execWithRetries(hostname, service,
      getCommandProvider(service).isRunningCommand(service)).getSecond();
    return ret.length() > 0;
  }

  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGKILL, hostname);
  }

  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGSTOP, hostname);
  }

  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGCONT, hostname);
  }
}