1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.cleaner;
19
20 import static org.junit.Assert.assertEquals;
21
22 import java.io.IOException;
23 import java.net.URLEncoder;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FileStatus;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.*;
30 import org.apache.hadoop.hbase.catalog.CatalogTracker;
31 import org.apache.hadoop.hbase.replication.ReplicationFactory;
32 import org.apache.hadoop.hbase.replication.ReplicationQueues;
33 import org.apache.hadoop.hbase.replication.regionserver.Replication;
34 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
35 import org.junit.AfterClass;
36 import org.junit.BeforeClass;
37 import org.junit.Test;
38 import org.junit.experimental.categories.Category;
39
40 @Category(MediumTests.class)
41 public class TestLogsCleaner {
42
43 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
44
45
46
47
48 @BeforeClass
49 public static void setUpBeforeClass() throws Exception {
50 TEST_UTIL.startMiniZKCluster();
51 }
52
53
54
55
56 @AfterClass
57 public static void tearDownAfterClass() throws Exception {
58 TEST_UTIL.shutdownMiniZKCluster();
59 }
60
61 @Test
62 public void testLogCleaning() throws Exception{
63 Configuration conf = TEST_UTIL.getConfiguration();
64
65 long ttl = 10000;
66 conf.setLong("hbase.master.logcleaner.ttl", ttl);
67 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
68 Replication.decorateMasterConfiguration(conf);
69 Server server = new DummyServer();
70 ReplicationQueues repQueues =
71 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
72 repQueues.init(server.getServerName().toString());
73 final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
74 HConstants.HREGION_OLDLOGDIR_NAME);
75 String fakeMachineName =
76 URLEncoder.encode(server.getServerName().toString(), "UTF8");
77
78 final FileSystem fs = FileSystem.get(conf);
79
80
81 long now = System.currentTimeMillis();
82 fs.delete(oldLogDir, true);
83 fs.mkdirs(oldLogDir);
84
85 fs.createNewFile(new Path(oldLogDir, "a"));
86 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
87
88
89 System.out.println("Now is: " + now);
90 for (int i = 1; i < 31; i++) {
91
92
93 Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i) );
94 fs.createNewFile(fileName);
95
96
97
98
99 if (i % (30/3) == 1) {
100 repQueues.addLog(fakeMachineName, fileName.getName());
101 System.out.println("Replication log file: " + fileName);
102 }
103 }
104
105
106 Thread.sleep(ttl);
107 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
108
109
110
111 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) ));
112
113 for (FileStatus stat : fs.listStatus(oldLogDir)) {
114 System.out.println(stat.getPath().toString());
115 }
116
117 assertEquals(34, fs.listStatus(oldLogDir).length);
118
119 LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir);
120 cleaner.chore();
121
122
123
124 TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
125 @Override
126 public boolean evaluate() throws Exception {
127 return 5 == fs.listStatus(oldLogDir).length;
128 }
129 });
130
131 for (FileStatus file : fs.listStatus(oldLogDir)) {
132 System.out.println("Kept log files: " + file.getPath().getName());
133 }
134 }
135
136 static class DummyServer implements Server {
137
138 @Override
139 public Configuration getConfiguration() {
140 return TEST_UTIL.getConfiguration();
141 }
142
143 @Override
144 public ZooKeeperWatcher getZooKeeper() {
145 try {
146 return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
147 } catch (IOException e) {
148 e.printStackTrace();
149 }
150 return null;
151 }
152
153 @Override
154 public CatalogTracker getCatalogTracker() {
155 return null;
156 }
157
158 @Override
159 public ServerName getServerName() {
160 return ServerName.valueOf("regionserver,60020,000000");
161 }
162
163 @Override
164 public void abort(String why, Throwable e) {}
165
166 @Override
167 public boolean isAborted() {
168 return false;
169 }
170
171 @Override
172 public void stop(String why) {}
173
174 @Override
175 public boolean isStopped() {
176 return false;
177 }
178 }
179
180 }
181