1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.backup.example;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileStatus;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HColumnDescriptor;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.MediumTests;
39 import org.apache.hadoop.hbase.Stoppable;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
42 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
43 import org.apache.hadoop.hbase.regionserver.HRegion;
44 import org.apache.hadoop.hbase.regionserver.Store;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.FSUtils;
47 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
48 import org.apache.hadoop.hbase.util.StoppableImplementation;
49 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
50 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
51 import org.apache.zookeeper.KeeperException;
52 import org.junit.After;
53 import org.junit.AfterClass;
54 import org.junit.BeforeClass;
55 import org.junit.Test;
56 import org.junit.experimental.categories.Category;
57 import org.mockito.Mockito;
58 import org.mockito.invocation.InvocationOnMock;
59 import org.mockito.stubbing.Answer;
60
61
62
63
64
65 @Category(MediumTests.class)
66 public class TestZooKeeperTableArchiveClient {
67
68 private static final Log LOG = LogFactory.getLog(TestZooKeeperTableArchiveClient.class);
69 private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
70 private static final String STRING_TABLE_NAME = "test";
71 private static final byte[] TEST_FAM = Bytes.toBytes("fam");
72 private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
73 private static ZKTableArchiveClient archivingClient;
74 private final List<Path> toCleanup = new ArrayList<Path>();
75
76
77
78
79 @BeforeClass
80 public static void setupCluster() throws Exception {
81 setupConf(UTIL.getConfiguration());
82 UTIL.startMiniZKCluster();
83 archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), UTIL.getHBaseAdmin()
84 .getConnection());
85
86 ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
87 String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
88 ZKUtil.createWithParents(watcher, archivingZNode);
89 }
90
91 private static void setupConf(Configuration conf) {
92
93 conf.setInt("hbase.hstore.compaction.min", 3);
94 }
95
96 @After
97 public void tearDown() throws Exception {
98 try {
99 FileSystem fs = UTIL.getTestFileSystem();
100
101 for (Path file : toCleanup) {
102
103 FSUtils.delete(fs, file, true);
104 }
105 } catch (IOException e) {
106 LOG.warn("Failure to delete archive directory", e);
107 } finally {
108 toCleanup.clear();
109 }
110
111 archivingClient.disableHFileBackup();
112 }
113
114 @AfterClass
115 public static void cleanupTest() throws Exception {
116 try {
117 UTIL.shutdownMiniZKCluster();
118 } catch (Exception e) {
119 LOG.warn("problem shutting down cluster", e);
120 }
121 }
122
123
124
125
126 @Test (timeout=300000)
127 public void testArchivingEnableDisable() throws Exception {
128
129 LOG.debug("----Starting archiving");
130 archivingClient.enableHFileBackupAsync(TABLE_NAME);
131 assertTrue("Archving didn't get turned on", archivingClient
132 .getArchivingEnabled(TABLE_NAME));
133
134
135 archivingClient.disableHFileBackup();
136 assertFalse("Archving didn't get turned off.", archivingClient.getArchivingEnabled(TABLE_NAME));
137
138
139 archivingClient.enableHFileBackupAsync(TABLE_NAME);
140 assertTrue("Archving didn't get turned on", archivingClient
141 .getArchivingEnabled(TABLE_NAME));
142
143
144 archivingClient.disableHFileBackup(TABLE_NAME);
145 assertFalse("Archving didn't get turned off for " + STRING_TABLE_NAME,
146 archivingClient.getArchivingEnabled(TABLE_NAME));
147 }
148
149 @Test (timeout=300000)
150 public void testArchivingOnSingleTable() throws Exception {
151 createArchiveDirectory();
152 FileSystem fs = UTIL.getTestFileSystem();
153 Path archiveDir = getArchiveDir();
154 Path tableDir = getTableDir(STRING_TABLE_NAME);
155 toCleanup.add(archiveDir);
156 toCleanup.add(tableDir);
157
158 Configuration conf = UTIL.getConfiguration();
159
160 Stoppable stop = new StoppableImplementation();
161 HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
162 List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
163 final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
164
165
166 HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
167 HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
168
169 loadFlushAndCompact(region, TEST_FAM);
170
171
172 List<Path> files = getAllFiles(fs, archiveDir);
173 if (files == null) {
174 FSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
175 throw new RuntimeException("Didn't archive any files!");
176 }
177 CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());
178
179 runCleaner(cleaner, finished, stop);
180
181
182 List<Path> archivedFiles = getAllFiles(fs, archiveDir);
183 assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);
184
185
186 assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
187 }
188
189
190
191
192
193 @Test (timeout=300000)
194 public void testMultipleTables() throws Exception {
195 createArchiveDirectory();
196 String otherTable = "otherTable";
197
198 FileSystem fs = UTIL.getTestFileSystem();
199 Path archiveDir = getArchiveDir();
200 Path tableDir = getTableDir(STRING_TABLE_NAME);
201 Path otherTableDir = getTableDir(otherTable);
202
203
204 toCleanup.add(archiveDir);
205 toCleanup.add(tableDir);
206 toCleanup.add(otherTableDir);
207 Configuration conf = UTIL.getConfiguration();
208
209 Stoppable stop = new StoppableImplementation();
210 HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
211 List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
212 final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
213
214
215 HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
216 HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
217 loadFlushAndCompact(region, TEST_FAM);
218
219
220 hcd = new HColumnDescriptor(TEST_FAM);
221 HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
222 loadFlushAndCompact(otherRegion, TEST_FAM);
223
224
225 List<Path> files = getAllFiles(fs, archiveDir);
226 if (files == null) {
227 FSUtils.logFileSystemState(fs, archiveDir, LOG);
228 throw new RuntimeException("Didn't load archive any files!");
229 }
230
231
232 int initialCountForPrimary = 0;
233 int initialCountForOtherTable = 0;
234 for (Path file : files) {
235 String tableName = file.getParent().getParent().getParent().getName();
236
237 if (tableName.equals(otherTable)) initialCountForOtherTable++;
238 else if (tableName.equals(STRING_TABLE_NAME)) initialCountForPrimary++;
239 }
240
241 assertTrue("Didn't archive files for:" + STRING_TABLE_NAME, initialCountForPrimary > 0);
242 assertTrue("Didn't archive files for:" + otherTable, initialCountForOtherTable > 0);
243
244
245
246 CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
247
248 cleaner.start();
249
250 finished.await();
251
252 stop.stop("");
253
254
255 List<Path> archivedFiles = getAllFiles(fs, archiveDir);
256 int archivedForPrimary = 0;
257 for(Path file: archivedFiles) {
258 String tableName = file.getParent().getParent().getParent().getName();
259
260 assertFalse("Have a file from the non-archived table: " + file, tableName.equals(otherTable));
261 if (tableName.equals(STRING_TABLE_NAME)) archivedForPrimary++;
262 }
263
264 assertEquals("Not all archived files for the primary table were retained.", initialCountForPrimary,
265 archivedForPrimary);
266
267
268 assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
269 }
270
271
272 private void createArchiveDirectory() throws IOException {
273
274 FileSystem fs = UTIL.getTestFileSystem();
275 Path archiveDir = getArchiveDir();
276 fs.mkdirs(archiveDir);
277 }
278
279 private Path getArchiveDir() throws IOException {
280 return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
281 }
282
283 private Path getTableDir(String tableName) throws IOException {
284 Path testDataDir = UTIL.getDataTestDir();
285 FSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
286 return new Path(testDataDir, tableName);
287 }
288
289 private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
290 Stoppable stop) {
291 conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
292 LongTermArchivingHFileCleaner.class.getCanonicalName());
293 return new HFileCleaner(1000, stop, conf, fs, archiveDir);
294 }
295
296
297
298
299
300
301
302
303
304 private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
305 throws IOException, KeeperException {
306
307 LOG.debug("----Starting archiving for table:" + tableName);
308 archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
309 assertTrue("Archving didn't get turned on", archivingClient.getArchivingEnabled(tableName));
310
311
312 List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
313 LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
314 while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
315
316 }
317 return cleaners;
318 }
319
320
321
322
323
324
325
326 private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
327 List<BaseHFileCleanerDelegate> cleaners, final int expected) {
328
329 BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
330 final int[] counter = new int[] { 0 };
331 final CountDownLatch finished = new CountDownLatch(1);
332 Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {
333
334 @Override
335 public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
336 counter[0]++;
337 LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for files: "
338 + invocation.getArguments()[0]);
339
340 @SuppressWarnings("unchecked")
341 Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
342 if (counter[0] >= expected) finished.countDown();
343 return ret;
344 }
345 }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
346 cleaners.set(0, delegateSpy);
347
348 return finished;
349 }
350
351
352
353
354
355
356 private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
357 FileStatus[] files = FSUtils.listStatus(fs, dir, null);
358 if (files == null) {
359 LOG.warn("No files under:" + dir);
360 return null;
361 }
362
363 List<Path> allFiles = new ArrayList<Path>();
364 for (FileStatus file : files) {
365 if (file.isDir()) {
366 List<Path> subFiles = getAllFiles(fs, file.getPath());
367 if (subFiles != null) allFiles.addAll(subFiles);
368 continue;
369 }
370 allFiles.add(file.getPath());
371 }
372 return allFiles;
373 }
374
375 private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
376
377 createHFileInRegion(region, family);
378 createHFileInRegion(region, family);
379
380 Store s = region.getStore(family);
381 int count = s.getStorefilesCount();
382 assertTrue("Don't have the expected store files, wanted >= 2 store files, but was:" + count,
383 count >= 2);
384
385
386 LOG.debug("Compacting stores");
387 region.compactStores(true);
388 }
389
390
391
392
393
394
395
396 private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
397
398 Put p = new Put(Bytes.toBytes("row"));
399 p.add(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
400 region.put(p);
401
402 region.flushcache();
403 }
404
405
406
407
408 private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
409 throws InterruptedException {
410
411 cleaner.start();
412
413 finished.await();
414
415 stop.stop("");
416 }
417 }