/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.experimental.categories.Category;

/**
 * A helper class for process-based mini-cluster tests. Unlike
 * {@link MiniHBaseCluster}, it starts daemons as separate processes, making
 * real kill testing possible.
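 *
 * <p>A minimal usage sketch (the surrounding test is assumed to supply a
 * suitable {@link Configuration}):
 * <pre>
 * ProcessBasedLocalHBaseCluster cluster =
 *     new ProcessBasedLocalHBaseCluster(conf, 2, 3); // 2 DNs, 3 RSs
 * cluster.startMiniDFS();  // optional; must be called before startHBase()
 * cluster.startHBase();
 * // ... kill testing via killRegionServer(port) / killMaster() ...
 * cluster.shutdown();
 * </pre>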
 */
@Category(LargeTests.class)
public class ProcessBasedLocalHBaseCluster {

  private final String hbaseHome, workDir;
  private final Configuration conf;
  private final int numMasters, numRegionServers, numDataNodes;
  private final List<Integer> rsPorts, masterPorts;

  private final int zkClientPort;

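  // Overrides hbase.hregion.max.filesize to 10 MB, presumably so that
  // regions reach the split threshold quickly under test load (the rationale
  // is an inference; the constant is simply applied in generateConfig()).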
  private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000;

  private static final Log LOG = LogFactory.getLog(
      ProcessBasedLocalHBaseCluster.class);

  private List<String> daemonPidFiles =
      Collections.synchronizedList(new ArrayList<String>());

  private boolean shutdownHookInstalled;

  private String hbaseDaemonScript;

  private MiniDFSCluster dfsCluster;

  private HBaseTestingUtility testUtil;

  private Thread logTailerThread;

  private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>());

  private static enum ServerType {
    MASTER("master"),
    RS("regionserver"),
    ZK("zookeeper");

    private final String fullName;

    private ServerType(String fullName) {
      this.fullName = fullName;
    }
  }

  /**
   * Constructor. Modifies the passed configuration by pointing it at the
   * local ZooKeeper quorum and client port. The HBase home directory (the top
   * of the HBase source tree) is derived via {@link HBaseHomePath}, not
   * passed in.
   * @param conf the configuration to use; updated in place
   * @param numDataNodes number of data nodes for the optional mini-DFS cluster
   * @param numRegionServers number of region servers to start
   */
  public ProcessBasedLocalHBaseCluster(Configuration conf,
      int numDataNodes, int numRegionServers) {
    this.conf = conf;
    this.hbaseHome = HBaseHomePath.getHomePath();
    this.numMasters = 1;
    this.numRegionServers = numRegionServers;
    this.workDir = hbaseHome + "/target/local_cluster";
    this.numDataNodes = numDataNodes;

    hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh";
    zkClientPort = HBaseTestingUtility.randomFreePort();

    this.rsPorts = sortedPorts(numRegionServers);
    this.masterPorts = sortedPorts(numMasters);

    conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
  }

  /**
   * Makes this local HBase cluster use a mini-DFS cluster. Must be called
   * before {@link #startHBase()}.
   * @throws Exception if the mini-DFS cluster fails to start
   */
  public void startMiniDFS() throws Exception {
    if (testUtil == null) {
      testUtil = new HBaseTestingUtility(conf);
    }
    dfsCluster = testUtil.startMiniDFSCluster(numDataNodes);
  }

  /**
   * Generates a sorted list of random port numbers. Sorting makes sense if
   * we ever want to refer to these servers by their index in the returned
   * list, e.g. server #0, #1, etc.
   */
  private static List<Integer> sortedPorts(int n) {
    List<Integer> ports = new ArrayList<Integer>(n);
    for (int i = 0; i < n; ++i) {
      ports.add(HBaseTestingUtility.randomFreePort());
    }
    Collections.sort(ports);
    return ports;
  }

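  /**
   * Starts ZooKeeper, then the master(s), then the region servers, and
   * finally waits for META to become readable before returning.
   */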
  public void startHBase() throws IOException {
    startDaemonLogTailer();
    cleanupOldState();

    // start ZK
    LOG.info("Starting ZooKeeper on port " + zkClientPort);
    startZK();

    HBaseTestingUtility.waitForHostPort(HConstants.LOCALHOST, zkClientPort);

    for (int masterPort : masterPorts) {
      startMaster(masterPort);
    }

    ZKUtil.waitForBaseZNode(conf);

    for (int rsPort : rsPorts) {
      startRegionServer(rsPort);
    }

    LOG.info("Waiting for HBase startup by scanning META");
    int attemptsLeft = 10;
    while (attemptsLeft-- > 0) {
      try {
        // Once META is readable, startup is complete; stop probing.
        new HTable(conf, TableName.META_TABLE_NAME).close();
        break;
      } catch (Exception e) {
        LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft,
            e);
        Threads.sleep(1000);
      }
    }

    LOG.info("Process-based HBase Cluster with " + numRegionServers +
        " region servers up and running... \n\n");
  }

  public void startRegionServer(int port) {
    startServer(ServerType.RS, port);
  }

  public void startMaster(int port) {
    startServer(ServerType.MASTER, port);
  }

  public void killRegionServer(int port) throws IOException {
    killServer(ServerType.RS, port);
  }

  public void killMaster() throws IOException {
    killServer(ServerType.MASTER, 0);
  }

  public void startZK() {
    startServer(ServerType.ZK, 0);
  }

  private void executeCommand(String command) {
    executeCommand(command, null);
  }

  private void executeCommand(String command, Map<String,
      String> envOverrides) {
    ensureShutdownHookInstalled();
    LOG.debug("Command : " + command);

    try {
      String [] envp = null;
      if (envOverrides != null) {
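        // Runtime.exec() replaces the whole environment when envp is
        // non-null, so merge the overrides into a copy of the current
        // environment and flatten it to "KEY=VALUE" strings.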
        Map<String, String> map = new HashMap<String, String>(
            System.getenv());
        map.putAll(envOverrides);
        envp = new String[map.size()];
        int idx = 0;
        for (Map.Entry<String, String> e: map.entrySet()) {
          envp[idx++] = e.getKey() + "=" + e.getValue();
        }
      }

      Process p = Runtime.getRuntime().exec(command, envp);

      BufferedReader stdInput = new BufferedReader(
          new InputStreamReader(p.getInputStream()));
      BufferedReader stdError = new BufferedReader(
          new InputStreamReader(p.getErrorStream()));

      // read the output from the command
      String s = null;
      while ((s = stdInput.readLine()) != null) {
        System.out.println(s);
      }

      // read any errors from the attempted command
      while ((s = stdError.readLine()) != null) {
        System.out.println(s);
      }
    } catch (IOException e) {
      LOG.error("Error running: " + command, e);
    }
  }

  private void shutdownAllProcesses() {
    LOG.info("Killing daemons using pid files");
    final List<String> pidFiles = new ArrayList<String>(daemonPidFiles);
    for (String pidFile : pidFiles) {
      int pid = 0;
      try {
        pid = readPidFromFile(pidFile);
      } catch (IOException ex) {
        LOG.error("Could not read pid from file " + pidFile);
      }

      if (pid > 0) {
        LOG.info("Killing pid " + pid + " (" + pidFile + ")");
        killProcess(pid);
      }
    }
  }

  private void ensureShutdownHookInstalled() {
    if (shutdownHookInstalled) {
      return;
    }

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      @Override
      public void run() {
        shutdownAllProcesses();
      }
    }));

    shutdownHookInstalled = true;
  }

  private void cleanupOldState() {
    executeCommand("rm -rf " + workDir);
  }

  private void writeStringToFile(String s, String fileName) {
    try {
      BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
      try {
        out.write(s);
      } finally {
        // Close in a finally block so the writer is released even if the
        // write fails.
        out.close();
      }
    } catch (IOException e) {
      LOG.error("Error writing to: " + fileName, e);
    }
  }

  private String serverWorkingDir(ServerType serverType, int port) {
    return workDir + "/" + serverType + "-" + port;
  }

  private int getServerPID(ServerType serverType, int port) throws IOException {
    String pidFile = pidFilePath(serverType, port);
    return readPidFromFile(pidFile);
  }

  private static int readPidFromFile(String pidFile) throws IOException {
    Scanner scanner = new Scanner(new File(pidFile));
    try {
      return scanner.nextInt();
    } finally {
      scanner.close();
    }
  }

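  // Reconstructs the pid file path written by bin/hbase-daemon.sh, which
  // places the daemon's pid in $HBASE_PID_DIR/hbase-$USER-<daemon>.pid.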
  private String pidFilePath(ServerType serverType, int port) {
    String dir = serverWorkingDir(serverType, port);
    String user = System.getenv("USER");
    String pidFile = String.format("%s/hbase-%s-%s.pid",
                                   dir, user, serverType.fullName);
    return pidFile;
  }

  private void killServer(ServerType serverType, int port) throws IOException {
    int pid = getServerPID(serverType, port);
    if (pid > 0) {
      LOG.info("Killing " + serverType + "; pid=" + pid);
      killProcess(pid);
    }
  }

  private void killProcess(int pid) {
    String cmd = "kill -s KILL " + pid;
    executeCommand(cmd);
  }

  private void startServer(ServerType serverType, int rsPort) {
    // Create the working directory for this server (master, region server,
    // or ZK).
    String dir = serverWorkingDir(serverType, rsPort);
    String confStr = generateConfig(serverType, rsPort, dir);
    LOG.debug("Creating directory " + dir);
    new File(dir).mkdirs();

    writeStringToFile(confStr, dir + "/hbase-site.xml");

    // Set debug options to an empty string so that hbase-config.sh does not
    // configure them using default ports. If we want to run remote debugging
    // on the process-based local cluster's daemons, we can automatically
    // choose non-conflicting JDWP and JMX ports for each daemon and specify
    // them here.
    writeStringToFile(
        "unset HBASE_MASTER_OPTS\n" +
        "unset HBASE_REGIONSERVER_OPTS\n" +
        "unset HBASE_ZOOKEEPER_OPTS\n" +
        "HBASE_MASTER_DBG_OPTS=' '\n" +
        "HBASE_REGIONSERVER_DBG_OPTS=' '\n" +
        "HBASE_ZOOKEEPER_DBG_OPTS=' '\n" +
        "HBASE_MASTER_JMX_OPTS=' '\n" +
        "HBASE_REGIONSERVER_JMX_OPTS=' '\n" +
        "HBASE_ZOOKEEPER_JMX_OPTS=' '\n",
        dir + "/hbase-env.sh");

    Map<String, String> envOverrides = new HashMap<String, String>();
    envOverrides.put("HBASE_LOG_DIR", dir);
    envOverrides.put("HBASE_PID_DIR", dir);
    try {
      FileUtils.copyFile(
          new File(hbaseHome, "conf/log4j.properties"),
          new File(dir, "log4j.properties"));
    } catch (IOException ex) {
      LOG.error("Could not install log4j.properties into " + dir);
    }

    executeCommand(hbaseDaemonScript + " --config " + dir +
                   " start " + serverType.fullName, envOverrides);
    daemonPidFiles.add(pidFilePath(serverType, rsPort));
    logTailDirs.add(dir);
  }

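  /**
   * Builds the contents of the per-daemon hbase-site.xml: the RPC port, a
   * randomly chosen web UI port, the shared ZooKeeper client port, and, when
   * a mini-DFS cluster is running, the HDFS root directory.
   */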
  private final String generateConfig(ServerType serverType, int rpcPort,
      String daemonDir) {
    StringBuilder sb = new StringBuilder();
    Map<String, Object> confMap = new TreeMap<String, Object>();
    confMap.put(HConstants.CLUSTER_DISTRIBUTED, true);

    if (serverType == ServerType.MASTER) {
      confMap.put(HConstants.MASTER_PORT, rpcPort);

      int masterInfoPort = HBaseTestingUtility.randomFreePort();
      reportWebUIPort("master", masterInfoPort);
      confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort);
    } else if (serverType == ServerType.RS) {
      confMap.put(HConstants.REGIONSERVER_PORT, rpcPort);

      int rsInfoPort = HBaseTestingUtility.randomFreePort();
      reportWebUIPort("region server", rsInfoPort);
      confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort);
    } else {
      confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir);
    }

    confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
    confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE);

    if (dfsCluster != null) {
      String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort();
      confMap.put("fs.default.name", fsURL);
      confMap.put("fs.defaultFS", fsURL);
      confMap.put("hbase.rootdir", fsURL + "/hbase_test");
    }

    sb.append("<configuration>\n");
    for (Map.Entry<String, Object> entry : confMap.entrySet()) {
      sb.append("  <property>\n");
      sb.append("    <name>" + entry.getKey() + "</name>\n");
      sb.append("    <value>" + entry.getValue() + "</value>\n");
      sb.append("  </property>\n");
    }
    sb.append("</configuration>\n");
    return sb.toString();
  }

  private static void reportWebUIPort(String daemon, int port) {
    LOG.info("Local " + daemon + " web UI is at http://"
        + HConstants.LOCALHOST + ":" + port);
  }

  public Configuration getConf() {
    return conf;
  }

  public void shutdown() {
    if (dfsCluster != null) {
      dfsCluster.shutdown();
    }
    shutdownAllProcesses();
  }

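  // Strips the verbose "org.apache.hadoop.hbase." package prefix from tailed
  // log lines to keep the multiplexed output readable.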
  private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE =
      Pattern.compile("org\\.apache\\.hadoop\\.hbase\\.");

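  // Matches log paths of the form <workDir>/<SERVERTYPE>-<port>/<file>, as
  // produced by serverWorkingDir(); group 1 is the ServerType enum name and
  // group 2 is the port.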
  private static final Pattern LOG_PATH_FORMAT_RE =
      Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$");

  private static String processLine(String line) {
    Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line);
    return m.replaceAll("");
  }

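  /**
   * Periodically scans the daemon log directories and spawns a tailer thread
   * for each new .log/.out file, multiplexing all output to stdout/stderr
   * with a per-server prefix.
   */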
  private final class LocalDaemonLogTailer implements Runnable {
    private final Set<String> tailedFiles = new HashSet<String>();
    private final List<String> dirList = new ArrayList<String>();
    private final Object printLock = new Object();

    private final FilenameFilter LOG_FILES = new FilenameFilter() {
      @Override
      public boolean accept(File dir, String name) {
        return name.endsWith(".out") || name.endsWith(".log");
      }
    };

    @Override
    public void run() {
      try {
        runInternal();
      } catch (IOException ex) {
        LOG.error(ex);
      }
    }

    private void runInternal() throws IOException {
      Thread.currentThread().setName(getClass().getSimpleName());
      while (true) {
        scanDirs();
        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          LOG.error("Log tailer thread interrupted", e);
          break;
        }
      }
    }

    private void scanDirs() throws FileNotFoundException {
      dirList.clear();
      dirList.addAll(logTailDirs);
      for (String d : dirList) {
        for (File f : new File(d).listFiles(LOG_FILES)) {
          String filePath = f.getAbsolutePath();
          if (!tailedFiles.contains(filePath)) {
            tailedFiles.add(filePath);
            startTailingFile(filePath);
          }
        }
      }
    }

    private void startTailingFile(final String filePath) throws FileNotFoundException {
      final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out;
      final ServerType serverType;
      final int serverPort;
      Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath);
      if (m.matches()) {
        serverType = ServerType.valueOf(m.group(1));
        serverPort = Integer.valueOf(m.group(2));
      } else {
        LOG.error("Unrecognized log path format: " + filePath);
        return;
      }
      final String logMsgPrefix =
          "[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] ";

      LOG.debug("Tailing " + filePath);
      Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            FileInputStream fis = new FileInputStream(filePath);
            BufferedReader br = new BufferedReader(new InputStreamReader(fis));
            String line;
            while (true) {
              try {
                Thread.sleep(200);
              } catch (InterruptedException e) {
                LOG.error("Tailer for " + filePath + " interrupted");
                break;
              }
              while ((line = br.readLine()) != null) {
                line = logMsgPrefix + processLine(line);
                synchronized (printLock) {
                  // readLine() strips line terminators, so always println.
                  dest.println(line);
                  dest.flush();
                }
              }
            }
          } catch (IOException ex) {
            LOG.error("Failed tailing " + filePath, ex);
          }
        }
      });
      t.setDaemon(true);
      t.setName("Tailer for " + filePath);
      t.start();
    }

  }

  private void startDaemonLogTailer() {
    logTailerThread = new Thread(new LocalDaemonLogTailer());
    logTailerThread.setDaemon(true);
    logTailerThread.start();
  }

}