1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.util;
18
19 import java.io.BufferedReader;
20 import java.io.BufferedWriter;
21 import java.io.File;
22 import java.io.FileInputStream;
23 import java.io.FileNotFoundException;
24 import java.io.FileWriter;
25 import java.io.FilenameFilter;
26 import java.io.IOException;
27 import java.io.InputStreamReader;
28 import java.io.PrintStream;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Scanner;
36 import java.util.Set;
37 import java.util.TreeMap;
38 import java.util.regex.Matcher;
39 import java.util.regex.Pattern;
40
41 import org.apache.commons.io.FileUtils;
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.hbase.HBaseTestingUtility;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.LargeTests;
48 import org.apache.hadoop.hbase.MiniHBaseCluster;
49 import org.apache.hadoop.hbase.TableName;
50 import org.apache.hadoop.hbase.client.HTable;
51 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
52 import org.apache.hadoop.hdfs.MiniDFSCluster;
53 import org.junit.experimental.categories.Category;
54
55
56
57
58
59
60 @Category(LargeTests.class)
61 public class ProcessBasedLocalHBaseCluster {
62
63 private final String hbaseHome, workDir;
64 private final Configuration conf;
65 private final int numMasters, numRegionServers, numDataNodes;
66 private final List<Integer> rsPorts, masterPorts;
67
68 private final int zkClientPort;
69
70 private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000;
71
72 private static final Log LOG = LogFactory.getLog(
73 ProcessBasedLocalHBaseCluster.class);
74
75 private List<String> daemonPidFiles =
76 Collections.synchronizedList(new ArrayList<String>());;
77
78 private boolean shutdownHookInstalled;
79
80 private String hbaseDaemonScript;
81
82 private MiniDFSCluster dfsCluster;
83
84 private HBaseTestingUtility testUtil;
85
86 private Thread logTailerThread;
87
88 private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>());
89
90 private static enum ServerType {
91 MASTER("master"),
92 RS("regionserver"),
93 ZK("zookeeper");
94
95 private final String fullName;
96
97 private ServerType(String fullName) {
98 this.fullName = fullName;
99 }
100 }
101
102
103
104
105
106 public ProcessBasedLocalHBaseCluster(Configuration conf,
107 int numDataNodes, int numRegionServers) {
108 this.conf = conf;
109 this.hbaseHome = HBaseHomePath.getHomePath();
110 this.numMasters = 1;
111 this.numRegionServers = numRegionServers;
112 this.workDir = hbaseHome + "/target/local_cluster";
113 this.numDataNodes = numDataNodes;
114
115 hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh";
116 zkClientPort = HBaseTestingUtility.randomFreePort();
117
118 this.rsPorts = sortedPorts(numRegionServers);
119 this.masterPorts = sortedPorts(numMasters);
120
121 conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
122 conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
123 }
124
125
126
127
128
129
130 public void startMiniDFS() throws Exception {
131 if (testUtil == null) {
132 testUtil = new HBaseTestingUtility(conf);
133 }
134 dfsCluster = testUtil.startMiniDFSCluster(numDataNodes);
135 }
136
137
138
139
140
141
142 private static List<Integer> sortedPorts(int n) {
143 List<Integer> ports = new ArrayList<Integer>(n);
144 for (int i = 0; i < n; ++i) {
145 ports.add(HBaseTestingUtility.randomFreePort());
146 }
147 Collections.sort(ports);
148 return ports;
149 }
150
151 public void startHBase() throws IOException {
152 startDaemonLogTailer();
153 cleanupOldState();
154
155
156 LOG.info("Starting ZooKeeper on port " + zkClientPort);
157 startZK();
158
159 HBaseTestingUtility.waitForHostPort(HConstants.LOCALHOST, zkClientPort);
160
161 for (int masterPort : masterPorts) {
162 startMaster(masterPort);
163 }
164
165 ZKUtil.waitForBaseZNode(conf);
166
167 for (int rsPort : rsPorts) {
168 startRegionServer(rsPort);
169 }
170
171 LOG.info("Waiting for HBase startup by scanning META");
172 int attemptsLeft = 10;
173 while (attemptsLeft-- > 0) {
174 try {
175 new HTable(conf, TableName.META_TABLE_NAME);
176 } catch (Exception e) {
177 LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft,
178 e);
179 Threads.sleep(1000);
180 }
181 }
182
183 LOG.info("Process-based HBase Cluster with " + numRegionServers +
184 " region servers up and running... \n\n");
185 }
186
187 public void startRegionServer(int port) {
188 startServer(ServerType.RS, port);
189 }
190
191 public void startMaster(int port) {
192 startServer(ServerType.MASTER, port);
193 }
194
195 public void killRegionServer(int port) throws IOException {
196 killServer(ServerType.RS, port);
197 }
198
199 public void killMaster() throws IOException {
200 killServer(ServerType.MASTER, 0);
201 }
202
203 public void startZK() {
204 startServer(ServerType.ZK, 0);
205 }
206
207 private void executeCommand(String command) {
208 executeCommand(command, null);
209 }
210
211 private void executeCommand(String command, Map<String,
212 String> envOverrides) {
213 ensureShutdownHookInstalled();
214 LOG.debug("Command : " + command);
215
216 try {
217 String [] envp = null;
218 if (envOverrides != null) {
219 Map<String, String> map = new HashMap<String, String>(
220 System.getenv());
221 map.putAll(envOverrides);
222 envp = new String[map.size()];
223 int idx = 0;
224 for (Map.Entry<String, String> e: map.entrySet()) {
225 envp[idx++] = e.getKey() + "=" + e.getValue();
226 }
227 }
228
229 Process p = Runtime.getRuntime().exec(command, envp);
230
231 BufferedReader stdInput = new BufferedReader(
232 new InputStreamReader(p.getInputStream()));
233 BufferedReader stdError = new BufferedReader(
234 new InputStreamReader(p.getErrorStream()));
235
236
237 String s = null;
238 while ((s = stdInput.readLine()) != null) {
239 System.out.println(s);
240 }
241
242
243 while ((s = stdError.readLine()) != null) {
244 System.out.println(s);
245 }
246 } catch (IOException e) {
247 LOG.error("Error running: " + command, e);
248 }
249 }
250
251 private void shutdownAllProcesses() {
252 LOG.info("Killing daemons using pid files");
253 final List<String> pidFiles = new ArrayList<String>(daemonPidFiles);
254 for (String pidFile : pidFiles) {
255 int pid = 0;
256 try {
257 pid = readPidFromFile(pidFile);
258 } catch (IOException ex) {
259 LOG.error("Could not read pid from file " + pidFile);
260 }
261
262 if (pid > 0) {
263 LOG.info("Killing pid " + pid + " (" + pidFile + ")");
264 killProcess(pid);
265 }
266 }
267 }
268
269 private void ensureShutdownHookInstalled() {
270 if (shutdownHookInstalled) {
271 return;
272 }
273
274 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
275 @Override
276 public void run() {
277 shutdownAllProcesses();
278 }
279 }));
280
281 shutdownHookInstalled = true;
282 }
283
284 private void cleanupOldState() {
285 executeCommand("rm -rf " + workDir);
286 }
287
288 private void writeStringToFile(String s, String fileName) {
289 try {
290 BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
291 out.write(s);
292 out.close();
293 } catch (IOException e) {
294 LOG.error("Error writing to: " + fileName, e);
295 }
296 }
297
298 private String serverWorkingDir(ServerType serverType, int port) {
299 return workDir + "/" + serverType + "-" + port;
300 }
301
302 private int getServerPID(ServerType serverType, int port) throws IOException {
303 String pidFile = pidFilePath(serverType, port);
304 return readPidFromFile(pidFile);
305 }
306
307 private static int readPidFromFile(String pidFile) throws IOException {
308 Scanner scanner = new Scanner(new File(pidFile));
309 try {
310 return scanner.nextInt();
311 } finally {
312 scanner.close();
313 }
314 }
315
316 private String pidFilePath(ServerType serverType, int port) {
317 String dir = serverWorkingDir(serverType, port);
318 String user = System.getenv("USER");
319 String pidFile = String.format("%s/hbase-%s-%s.pid",
320 dir, user, serverType.fullName);
321 return pidFile;
322 }
323
324 private void killServer(ServerType serverType, int port) throws IOException {
325 int pid = getServerPID(serverType, port);
326 if (pid > 0) {
327 LOG.info("Killing " + serverType + "; pid=" + pid);
328 killProcess(pid);
329 }
330 }
331
332 private void killProcess(int pid) {
333 String cmd = "kill -s KILL " + pid;
334 executeCommand(cmd);
335 }
336
337 private void startServer(ServerType serverType, int rsPort) {
338
339 String dir = serverWorkingDir(serverType, rsPort);
340 String confStr = generateConfig(serverType, rsPort, dir);
341 LOG.debug("Creating directory " + dir);
342 new File(dir).mkdirs();
343
344 writeStringToFile(confStr, dir + "/hbase-site.xml");
345
346
347
348
349
350 writeStringToFile(
351 "unset HBASE_MASTER_OPTS\n" +
352 "unset HBASE_REGIONSERVER_OPTS\n" +
353 "unset HBASE_ZOOKEEPER_OPTS\n" +
354 "HBASE_MASTER_DBG_OPTS=' '\n" +
355 "HBASE_REGIONSERVER_DBG_OPTS=' '\n" +
356 "HBASE_ZOOKEEPER_DBG_OPTS=' '\n" +
357 "HBASE_MASTER_JMX_OPTS=' '\n" +
358 "HBASE_REGIONSERVER_JMX_OPTS=' '\n" +
359 "HBASE_ZOOKEEPER_JMX_OPTS=' '\n",
360 dir + "/hbase-env.sh");
361
362 Map<String, String> envOverrides = new HashMap<String, String>();
363 envOverrides.put("HBASE_LOG_DIR", dir);
364 envOverrides.put("HBASE_PID_DIR", dir);
365 try {
366 FileUtils.copyFile(
367 new File(hbaseHome, "conf/log4j.properties"),
368 new File(dir, "log4j.properties"));
369 } catch (IOException ex) {
370 LOG.error("Could not install log4j.properties into " + dir);
371 }
372
373 executeCommand(hbaseDaemonScript + " --config " + dir +
374 " start " + serverType.fullName, envOverrides);
375 daemonPidFiles.add(pidFilePath(serverType, rsPort));
376 logTailDirs.add(dir);
377 }
378
379 private final String generateConfig(ServerType serverType, int rpcPort,
380 String daemonDir) {
381 StringBuilder sb = new StringBuilder();
382 Map<String, Object> confMap = new TreeMap<String, Object>();
383 confMap.put(HConstants.CLUSTER_DISTRIBUTED, true);
384
385 if (serverType == ServerType.MASTER) {
386 confMap.put(HConstants.MASTER_PORT, rpcPort);
387
388 int masterInfoPort = HBaseTestingUtility.randomFreePort();
389 reportWebUIPort("master", masterInfoPort);
390 confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort);
391 } else if (serverType == ServerType.RS) {
392 confMap.put(HConstants.REGIONSERVER_PORT, rpcPort);
393
394 int rsInfoPort = HBaseTestingUtility.randomFreePort();
395 reportWebUIPort("region server", rsInfoPort);
396 confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort);
397 } else {
398 confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir);
399 }
400
401 confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
402 confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE);
403
404 if (dfsCluster != null) {
405 String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort();
406 confMap.put("fs.default.name", fsURL);
407 confMap.put("fs.defaultFS", fsURL);
408 confMap.put("hbase.rootdir", fsURL + "/hbase_test");
409 }
410
411 sb.append("<configuration>\n");
412 for (Map.Entry<String, Object> entry : confMap.entrySet()) {
413 sb.append(" <property>\n");
414 sb.append(" <name>" + entry.getKey() + "</name>\n");
415 sb.append(" <value>" + entry.getValue() + "</value>\n");
416 sb.append(" </property>\n");
417 }
418 sb.append("</configuration>\n");
419 return sb.toString();
420 }
421
422 private static void reportWebUIPort(String daemon, int port) {
423 LOG.info("Local " + daemon + " web UI is at http://"
424 + HConstants.LOCALHOST + ":" + port);
425 }
426
427 public Configuration getConf() {
428 return conf;
429 }
430
431 public void shutdown() {
432 if (dfsCluster != null) {
433 dfsCluster.shutdown();
434 }
435 shutdownAllProcesses();
436 }
437
438 private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE =
439 Pattern.compile("org\\.apache\\.hadoop\\.hbase\\.");
440
441 private static final Pattern LOG_PATH_FORMAT_RE =
442 Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$");
443
444 private static String processLine(String line) {
445 Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line);
446 return m.replaceAll("");
447 }
448
449 private final class LocalDaemonLogTailer implements Runnable {
450 private final Set<String> tailedFiles = new HashSet<String>();
451 private final List<String> dirList = new ArrayList<String>();
452 private final Object printLock = new Object();
453
454 private final FilenameFilter LOG_FILES = new FilenameFilter() {
455 @Override
456 public boolean accept(File dir, String name) {
457 return name.endsWith(".out") || name.endsWith(".log");
458 }
459 };
460
461 @Override
462 public void run() {
463 try {
464 runInternal();
465 } catch (IOException ex) {
466 LOG.error(ex);
467 }
468 }
469
470 private void runInternal() throws IOException {
471 Thread.currentThread().setName(getClass().getSimpleName());
472 while (true) {
473 scanDirs();
474 try {
475 Thread.sleep(500);
476 } catch (InterruptedException e) {
477 LOG.error("Log tailer thread interrupted", e);
478 break;
479 }
480 }
481 }
482
483 private void scanDirs() throws FileNotFoundException {
484 dirList.clear();
485 dirList.addAll(logTailDirs);
486 for (String d : dirList) {
487 for (File f : new File(d).listFiles(LOG_FILES)) {
488 String filePath = f.getAbsolutePath();
489 if (!tailedFiles.contains(filePath)) {
490 tailedFiles.add(filePath);
491 startTailingFile(filePath);
492 }
493 }
494 }
495 }
496
497 private void startTailingFile(final String filePath) throws FileNotFoundException {
498 final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out;
499 final ServerType serverType;
500 final int serverPort;
501 Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath);
502 if (m.matches()) {
503 serverType = ServerType.valueOf(m.group(1));
504 serverPort = Integer.valueOf(m.group(2));
505 } else {
506 LOG.error("Unrecognized log path format: " + filePath);
507 return;
508 }
509 final String logMsgPrefix =
510 "[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] ";
511
512 LOG.debug("Tailing " + filePath);
513 Thread t = new Thread(new Runnable() {
514 @Override
515 public void run() {
516 try {
517 FileInputStream fis = new FileInputStream(filePath);
518 BufferedReader br = new BufferedReader(new InputStreamReader(fis));
519 String line;
520 while (true) {
521 try {
522 Thread.sleep(200);
523 } catch (InterruptedException e) {
524 LOG.error("Tailer for " + filePath + " interrupted");
525 break;
526 }
527 while ((line = br.readLine()) != null) {
528 line = logMsgPrefix + processLine(line);
529 synchronized (printLock) {
530 if (line.endsWith("\n")) {
531 dest.print(line);
532 } else {
533 dest.println(line);
534 }
535 dest.flush();
536 }
537 }
538 }
539 } catch (IOException ex) {
540 LOG.error("Failed tailing " + filePath, ex);
541 }
542 }
543 });
544 t.setDaemon(true);
545 t.setName("Tailer for " + filePath);
546 t.start();
547 }
548
549 }
550
551 private void startDaemonLogTailer() {
552 logTailerThread = new Thread(new LocalDaemonLogTailer());
553 logTailerThread.setDaemon(true);
554 logTailerThread.start();
555 }
556
557 }
558