/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

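/**
 * Testing {@link HLog} splitting code.
 */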
@Category(LargeTests.class)
public class TestHLogSplit {
  {
    // Bump up the DFS logging so failures are easier to debug.
    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  }
  private final static Log LOG = LogFactory.getLog(TestHLogSplit.class);

  private Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final Path HBASEDIR = new Path("/hbase");
  private static final Path HLOGDIR = new Path(HBASEDIR, "hlog");
  private static final Path OLDLOGDIR = new Path(HBASEDIR, "hlog.old");
  private static final Path CORRUPTDIR = new Path(HBASEDIR, HConstants.CORRUPT_DIR_NAME);

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = "f1".getBytes();
  private static final byte[] QUALIFIER = "q1".getBytes();
  private static final byte[] VALUE = "v1".getBytes();
  private static final String HLOG_FILE_PREFIX = "hlog.dat.";
  private static List<String> REGIONS = new ArrayList<String>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static final Path TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
  private static String ROBBER;
  private static String ZOMBIE;
  private static String [] GROUP = new String [] {"supergroup"};
  private RecoveryMode mode;

  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), HBASEDIR);
    TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl",
      InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);

    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    // Create a fake mapping of test users to groups and set it on the conf.
    Map<String, String []> u2g_map = new HashMap<String, String []>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(TEST_UTIL.getConfiguration(), u2g_map);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    flushToConsole("Cleaning up cluster for new test\n"
        + "--------------------------");
    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    FileStatus[] entries = fs.listStatus(new Path("/"));
    flushToConsole("Num entries in /:" + entries.length);
    for (FileStatus dir : entries){
      assertTrue("Deleting " + dir.getPath(), fs.delete(dir.getPath(), true));
    }

    fs.mkdirs(HLOGDIR);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedSequenceFileLogWriter.activateFailure = false;
    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
  }

  @After
  public void tearDown() throws Exception {
  }

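  /**
   * Simulates splitting a 'live' WAL: a zombie regionserver keeps appending to its last log
   * while another user splits the log directory out from under it. Verifies that no
   * acknowledged edits are lost.
   */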
  @Test (timeout=300000)
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    Thread zombie = new ZombieLastLogWriterRegionServer(this.conf, counter, stop, region);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till the writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(this.conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          FileSystem fs = FileSystem.get(conf2);
          int expectedFiles = fs.listStatus(HLOGDIR).length;
          HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf2);
          Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
          assertEquals(expectedFiles, logfiles.length);
          int count = 0;
          for (Path logfile: logfiles) {
            count += countHLog(logfile, fs, conf2);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The split log may have at most 1 extra entry, and no fewer. Zombie wrote " +
          counter.get() + " but the split logfile had " + count,
        counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }
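  /**
   * This thread keeps writing to the same 'wal' file even after the split process has
   * started. It simulates a regionserver that was considered dead but woke up and wrote
   * some more to its last log. Does its writing as an alternate user in another filesystem
   * instance to better simulate a separate regionserver process.
   */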
  static class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    // The region to write edits for.
    final String region;
    final Configuration conf;
    final User user;

    public ZombieLastLogWriterRegionServer(final Configuration conf, AtomicLong counter,
        AtomicBoolean stop, final String region)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.conf = HBaseConfiguration.create(conf);
      this.user = User.createUserForTesting(this.conf, ZOMBIE, GROUP);
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open; generateHLogs leaves open the WAL whose
          // index we supply here.
          int walToKeepOpen = 2;
          // How many WAL files to write.
          final int numOfWriters = walToKeepOpen + 1;
          // The below method writes numOfWriters files each with ENTRIES entries per region.
          HLog.Writer[] writers = null;
          try {
            DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
            writers = generateHLogs(dfs, numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update the counter with all edits written to our region so far
          // (ENTRIES edits per file; was 'numOfWriters * NUM_WRITERS', which only
          // worked because NUM_WRITERS and ENTRIES happen to be equal).
          editsCount.addAndGet(numOfWriters * ENTRIES);
          // Now keep appending to the WAL that was left open.
          HLog.Writer writer = writers[walToKeepOpen];
          loop(writer);
          return null;
        }
      });
    }

    private void loop(final HLog.Writer writer) {
      byte [] regionBytes = Bytes.toBytes(this.region);
      while (true) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
            ("r" + editsCount.get()).getBytes(), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          flushToConsole(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // ignore
          }
        } catch (IOException ex) {
          flushToConsole(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            flushToConsole("Juliet: got RemoteException " + ex.getMessage() +
              " while writing " + (editsCount.get() + 1));
          } else {
            flushToConsole(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          flushToConsole(getName() + " HOW? " + t);
          t.printStackTrace();
          break;
        }
      }
      flushToConsole(getName() + " Writer exiting");
    }
  }
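  /**
   * Verifies that the recovered.edits path computed for a meta WAL entry lands under the
   * meta region's directory.
   */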
  @Test (timeout=300000)
  public void testRecoveredEditsPathForMeta() throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
      HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    long now = System.currentTimeMillis();
    HLog.Entry entry =
      new HLog.Entry(new HLogKey(encoded,
        TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
        new WALEdit());
    Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
  }
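  /**
   * Tests that a stale file already sitting at the recovered.edits location gets sidelined
   * rather than breaking the splitter; useful when upgrading from old instances.
   */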
  @Test (timeout=300000)
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
      HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    long now = System.currentTimeMillis();
    HLog.Entry entry =
      new HLog.Entry(new HLogKey(encoded,
        TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
        new WALEdit());
    Path parent = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    // Create a file where the recovered.edits directory should be.
    fs.createNewFile(parent);

    Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
    HLogFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

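  /**
   * Splits a log holding edits for a single region and verifies the split output is
   * identical to the original log.
   */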
  @Test (timeout=300000)
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateHLogs(1, 10, -1);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

  @Test (timeout=300000)
  public void testEmptyLogFiles() throws IOException {
    injectEmptyFile(".empty", true);
    generateHLogs(Integer.MAX_VALUE);
    injectEmptyFile("empty", true);

    fs.initialize(fs.getUri(), conf);

    // less the two injected empty files, which produce no split output
    int expectedFiles = fs.listStatus(HLOGDIR).length - 2;
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countHLog(logfile, fs, conf);
      }
      assertEquals(NUM_WRITERS * ENTRIES, count);
    }
  }

  @Test (timeout=300000)
  public void testEmptyOpenLogFiles() throws IOException {
    injectEmptyFile(".empty", false);
    generateHLogs(Integer.MAX_VALUE);
    injectEmptyFile("empty", false);

    fs.initialize(fs.getUri(), conf);

    // less the two injected empty files, which produce no split output
    int expectedFiles = fs.listStatus(HLOGDIR).length - 2;
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countHLog(logfile, fs, conf);
      }
      assertEquals(NUM_WRITERS * ENTRIES, count);
    }
  }

  @Test (timeout=300000)
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave hlog.dat.5 open.
    generateHLogs(5);

    fs.initialize(fs.getUri(), conf);

    int expectedFiles = fs.listStatus(HLOGDIR).length;
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countHLog(logfile, fs, conf);
      }
      assertEquals(NUM_WRITERS * ENTRIES, count);
    }
  }

  @Test (timeout=300000)
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateHLogs(Integer.MAX_VALUE);
    corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
      Corruptions.APPEND_GARBAGE, true, fs);
    fs.initialize(fs.getUri(), conf);

    int expectedFiles = fs.listStatus(HLOGDIR).length;
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countHLog(logfile, fs, conf);
      }
      assertEquals(NUM_WRITERS * ENTRIES, count);
    }
  }

  @Test (timeout=300000)
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateHLogs(Integer.MAX_VALUE);
    corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
      Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
    fs.initialize(fs.getUri(), conf);

    // less the file corrupted on its first line, which is skipped entirely
    int expectedFiles = fs.listStatus(HLOGDIR).length - 1;
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countHLog(logfile, fs, conf);
      }
      assertEquals((NUM_WRITERS - 1) * ENTRIES, count);
    }
  }

  @Test (timeout=300000)
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateHLogs(Integer.MAX_VALUE);
    corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
      Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
    fs.initialize(fs.getUri(), conf);

    int expectedFiles = fs.listStatus(HLOGDIR).length;
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countHLog(logfile, fs, conf);
      }
      // The garbage sits in the middle of the corrupted file, so we should get all edits
      // from the nine clean files plus at least the first half of the corrupted one.
      int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
      int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
      assertTrue("The file up to the corrupted area hasn't been parsed",
        goodEntries + firstHalfEntries <= count);
    }
  }

  @Test (timeout=300000)
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
    InstrumentedSequenceFileLogWriter.activateFailure = false;
    HLogFactory.resetLogReaderClass();

    try {
      Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
      conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
      for (FaultySequenceFileLogReader.FailureType failureType :
          FaultySequenceFileLogReader.FailureType.values()) {
        conf.set("faultysequencefilelogreader.failuretype", failureType.name());
        generateHLogs(1, ENTRIES, -1);
        fs.initialize(fs.getUri(), conf);
        HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
        FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
        assertEquals("expected the corrupted file to be archived", c1.getName(),
          archivedLogs[0].getPath().getName());
        assertEquals(1, archivedLogs.length);
        fs.delete(new Path(OLDLOGDIR, HLOG_FILE_PREFIX + "0"), false);
      }
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
      HLogFactory.resetLogReaderClass();
    }
  }

  @Test (timeout=300000, expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
    InstrumentedSequenceFileLogWriter.activateFailure = false;
    HLogFactory.resetLogReaderClass();

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
      conf.set("faultysequencefilelogreader.failuretype",
        FaultySequenceFileLogReader.FailureType.BEGINNING.name());
      generateHLogs(Integer.MAX_VALUE);
      fs.initialize(fs.getUri(), conf);
      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
      HLogFactory.resetLogReaderClass();
    }
  }

  @Test (timeout=300000)
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
    InstrumentedSequenceFileLogWriter.activateFailure = false;
    HLogFactory.resetLogReaderClass();

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl",
        FaultySequenceFileLogReader.class, HLog.Reader.class);
      conf.set("faultysequencefilelogreader.failuretype",
        FaultySequenceFileLogReader.FailureType.BEGINNING.name());
      generateHLogs(-1);
      fs.initialize(fs.getUri(), conf);
      try {
        HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
      } catch (IOException e) {
        assertEquals(
          "if skip.errors is false all files should remain in place",
          NUM_WRITERS, fs.listStatus(HLOGDIR).length);
      }
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
      HLogFactory.resetLogReaderClass();
    }
  }

  @Test (timeout=300000)
  public void testEOFisIgnored() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    int entryCount = 10;
    Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
    generateHLogs(1, entryCount, -1);
    corruptHLog(c1, Corruptions.TRUNCATE, true, fs);

    fs.initialize(fs.getUri(), conf);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);

    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
    @SuppressWarnings("unused")
    HLog.Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(entryCount - 1, actualCount);

    // a file truncated mid-entry should hit EOF, not be flagged as corrupt
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }

  @Test (timeout=300000)
  public void testCorruptWALTrailer() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    int entryCount = 10;
    Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
    generateHLogs(1, entryCount, -1);
    corruptHLog(c1, Corruptions.TRUNCATE_TRAILER, true, fs);

    fs.initialize(fs.getUri(), conf);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);

    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
    @SuppressWarnings("unused")
    HLog.Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(entryCount, actualCount);

    // a file with a truncated trailer should still split cleanly, not be flagged as corrupt
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }

  @Test (timeout=300000)
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test (timeout=300000)
  public void testSplit() throws IOException {
    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);

    int expectedFiles = fs.listStatus(HLOGDIR).length;
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countHLog(logfile, fs, conf);
      }
      assertEquals(NUM_WRITERS * ENTRIES, count);
    }
  }

  @Test (timeout=300000)
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    FileStatus [] statuses = null;
    try {
      statuses = fs.listStatus(HLOGDIR);
      if (statuses != null) {
        Assert.fail("Files left in log dir: " +
          Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

  @Test(timeout=300000, expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave the log with index 4 open; writer[4] is used to append below
    HLog.Writer [] writer = generateHLogs(4);

    fs.initialize(fs.getUri(), conf);

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedSequenceFileLogWriter.activateFailure = false;
    appendEntry(writer[4], TABLE_NAME, Bytes.toBytes(region),
      ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
    writer[4].close();

    try {
      InstrumentedSequenceFileLogWriter.activateFailure = true;
      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    } catch (IOException e) {
      assertTrue(e.getMessage().
        contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedSequenceFileLogWriter.activateFailure = false;
    }
  }
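  // Note: deliberately not annotated with @Test, so this test is disabled. The second split
  // produces no output because there are no region dirs left once the first split result
  // has been moved aside, so the consistency comparison below cannot pass as written.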
  public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
    REGIONS.clear();
    for (int i = 0; i < 100; i++) {
      REGIONS.add("region__" + i);
    }

    generateHLogs(1, 100, -1);
    fs.initialize(fs.getUri(), conf);

    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    fs.rename(OLDLOGDIR, HLOGDIR);
    Path firstSplitPath = new Path(HBASEDIR, TABLE_NAME + ".first");
    Path splitPath = new Path(HBASEDIR, TABLE_NAME.getNameAsString());
    fs.rename(splitPath, firstSplitPath);

    fs.initialize(fs.getUri(), conf);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
  }

  @Test (timeout=300000)
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateHLogs(1);
    fs.initialize(fs.getUri(), conf);

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    assertFalse(fs.exists(regiondir));
  }

  @Test (timeout=300000)
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);
    FileStatus[] logfiles = fs.listStatus(HLOGDIR);
    assertTrue("There should be some log file",
      logfiles != null && logfiles.length > 0);

    // Set up a splitter that will throw an IOE on the output side
    HLogSplitter logSplitter = new HLogSplitter(
        conf, HBASEDIR, fs, null, null, this.mode) {
      @Override
      protected HLog.Writer createWriter(FileSystem fs,
          Path logfile, Configuration conf) throws IOException {
        HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(
          mockWriter).append(Mockito.<HLog.Entry>any());
        return mockWriter;
      }
    };

    // Set up a background thread dumper; helps debugging if the test hangs.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[0], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true turns off the background thread dumper.
      stop.set(true);
    }
  }

  @Test (timeout=300000)
  public void testMovedHLogDuringRecovery() throws Exception {
    generateHLogs(-1);

    fs.initialize(fs.getUri(), conf);

    // This partial mock will throw LEE for every file, simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important:
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
      when(spiedFs).append(Mockito.<Path>any());

    try {
      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(HLOGDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  @Test (timeout=300000)
  public void testRetryOpenDuringRecovery() throws Exception {
    generateHLogs(-1);

    fs.initialize(fs.getUri(), conf);

    FileSystem spiedFs = Mockito.spy(fs);

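    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for ... has changed" messages matter: that's how these
    // failures come out of HDFS. When the DFSClient opens a file, HDFS has to
    // locate the last block and get its length; if that block is still under
    // recovery the open can fail transiently, in which case a retry helps.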
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
        "Cannot obtain block length", "Could not obtain the last block",
        "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        // Fail the first three opens, then hand off to the real filesystem.
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(HLOGDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  @Test (timeout=300000)
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateHLogs(1, 10, -1);
    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
    fs.initialize(fs.getUri(), conf);

    final AtomicInteger count = new AtomicInteger();

    // A reporter that asks the split to cancel on its first progress check.
    CancelableProgressable localReporter
      = new CancelableProgressable() {
        @Override
        public boolean progress() {
          count.getAndIncrement();
          return false;
        }
      };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // sleep a while, so the reporter gets asked for progress
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = HLogSplitter.splitLogFile(
        HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode);
      assertFalse("Log splitting should have failed", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset the report period back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }

  /**
   * Test the log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test (timeout=300000)
  public void testThreading() throws Exception {
    doTestThreading(20000, 128*1024*1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test (timeout=300000)
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }
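  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through the pipeline
   * @param bufferSize size of the in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */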
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(HLOGDIR, HLOG_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(fs, regions);

    // Create a splitter that reads and writes the data without touching disk
    HLogSplitter logSplitter = new HLogSplitter(
        localConf, HBASEDIR, fs, null, null, this.mode) {

      /* Produce a mock writer that optionally simulates a slow filesystem */
      @Override
      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
          throws IOException {
        HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            HLog.Entry entry = (Entry) invocation.getArguments()[0];
            WALEdit edit = entry.getEdit();
            List<KeyValue> keyValues = edit.getKeyValues();
            assertEquals(1, keyValues.size());
            KeyValue kv = keyValues.get(0);

            // Check that the edits arrive in the expected order.
            assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<HLog.Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(FileSystem fs, Path curLogFile,
          Configuration conf, CancelableProgressable reporter) throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<HLog.Entry>() {
          int index = 0;

          @Override
          public HLog.Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Round-robin the edits across the regions.
            int regionIdx = index % regions.size();
            byte region[] = new byte[] {(byte) 'r', (byte) (0x30 + regionIdx)};

            HLog.Entry ret = createTestEntry(TABLE_NAME, region,
              Bytes.toBytes((int) (index / regions.size())),
              FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify the number of written edits per region
    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " +
        Bytes.toString(entry.getKey()));
      assertEquals(numFakeEdits / regions.size(), (long) entry.getValue());
    }
    assertEquals(regions.size(), outputCounts.size());
  }
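  /**
   * Verifies that a regionserver cannot roll its WAL once the log directory has been
   * renamed and the split has started: the rogue server's rollWriter() should fail.
   */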
  @Test (timeout=300000)
  @Ignore("Need HADOOP-6886, HADOOP-6840, & HDFS-617 for this. HDFS 0.20.205.1+ should have this")
  public void testLogRollAfterSplitStart() throws IOException {
    HLog log = null;
    String logName = "testLogRollAfterSplitStart";
    Path thisTestsDir = new Path(HBASEDIR, logName);

    try {
      // put some entries in an HLog
      TableName tableName =
        TableName.valueOf(this.getClass().getName());
      HRegionInfo regioninfo = new HRegionInfo(tableName,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      log = HLogFactory.createHLog(fs, HBASEDIR, logName, conf);
      final AtomicLong sequenceId = new AtomicLong(1);

      final int total = 20;
      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
        HTableDescriptor htd = new HTableDescriptor(tableName);
        htd.addFamily(new HColumnDescriptor("column"));
        log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }
      // send the data to HDFS datanodes and close the HDFS writer
      log.sync();
      ((FSHLog) log).cleanupCurrentWriter(log.getFilenum());

      // rename the log directory, as the master does when processing a
      // regionserver shutdown, so a rogue RS can't create more HLogs
      Path rsSplitDir = new Path(thisTestsDir.getParent(),
        thisTestsDir.getName() + "-splitting");
      fs.rename(thisTestsDir, rsSplitDir);
      LOG.debug("Renamed region directory: " + rsSplitDir);

      // process the old log files
      HLogSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf);

      // now try to roll the HLog and verify failure
      try {
        log.rollWriter();
        Assert.fail("rollWriter() did not throw any exception.");
      } catch (IOException ioe) {
        if (ioe.getCause().getMessage().contains("FileNotFound")) {
          LOG.info("Got the expected exception: ", ioe.getCause());
        } else {
          Assert.fail("Unexpected exception: " + ioe);
        }
      }
    } finally {
      if (log != null) {
        log.close();
      }
      if (fs.exists(thisTestsDir)) {
        fs.delete(thisTestsDir, true);
      }
    }
  }
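  /**
   * This thread simulates a region server that was considered dead but woke up: once the
   * split is under way it creates a brand-new WAL file and writes one more edit to it.
   */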
  class ZombieNewLogWriterRegionServer extends Thread {
    AtomicBoolean stop;
    CountDownLatch latch;
    public ZombieNewLogWriterRegionServer(CountDownLatch latch, AtomicBoolean stop) {
      super("ZombieNewLogWriterRegionServer");
      this.latch = latch;
      this.stop = stop;
    }

    @Override
    public void run() {
      if (stop.get()) {
        return;
      }
      Path tableDir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
      Path regionDir = new Path(tableDir, REGIONS.get(0));
      Path recoveredEdits = new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
      String region = "juliet";
      Path julietLog = new Path(HLOGDIR, HLOG_FILE_PREFIX + ".juliet");
      try {
        // wait until the split has actually started before writing the new log
        while (!fs.exists(recoveredEdits) && !stop.get()) {
          LOG.info("Juliet: split not started, sleeping a bit...");
          Threads.sleep(10);
        }

        fs.mkdirs(new Path(tableDir, region));
        HLog.Writer writer = HLogFactory.createWALWriter(fs,
          julietLog, conf);
        appendEntry(writer, TableName.valueOf("juliet"), ("juliet").getBytes(),
          ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
        writer.close();
        LOG.info("Juliet file creator: created file " + julietLog);
        latch.countDown();
      } catch (IOException e1) {
        LOG.error("Failed to create file " + julietLog, e1);
        fail("Failed to create file " + julietLog);
      }
    }
  }

  @Test (timeout=300000)
  public void testSplitLogFileWithOneRegion() throws IOException {
    LOG.info("testSplitLogFileWithOneRegion");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateHLogs(1, 10, -1);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue(logsAreEqual(originalLog, splitLog[0]));
  }

  @Test (timeout=300000)
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateHLogs(1, 10, -1);
    fs.initialize(fs.getUri(), conf);

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);

    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);

    // the deleted region directory should not be recreated by the split
    assertFalse(fs.exists(regiondir));
  }

  @Test (timeout=300000)
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    injectEmptyFile(".empty", true);

    fs.initialize(fs.getUri(), conf);

    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countHLog(fs.listStatus(OLDLOGDIR)[0].getPath(), fs, conf));
  }

  @Test (timeout=300000)
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateHLogs(1, 10, -1);
    fs.initialize(fs.getUri(), conf);

    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
    for (String region : REGIONS) {
      Path[] recovered = getLogForRegion(HBASEDIR, TABLE_NAME, region);
      assertEquals(1, recovered.length);
      assertEquals(10, countHLog(recovered[0], fs, conf));
    }
  }

  @Test (timeout=300000)
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateHLogs(1, 10, -1);
    FileStatus logfile = fs.listStatus(HLOGDIR)[0];

    corruptHLog(logfile.getPath(),
      Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);

    fs.initialize(fs.getUri(), conf);
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);

    final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
      "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
    assertEquals(1, fs.listStatus(corruptDir).length);
  }
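  /**
   * Simulates a region's replayRecoveredEditsIfAny() deleting recovered.edits files while
   * the splitter is still writing them; the split should neither throw nor flag any logs
   * as corrupt.
   */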
  @Test (timeout=300000)
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // generate hlogs for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS = new ArrayList<String>();
    REGIONS.add(regionName);
    generateHLogs(-1);

    HLogFactory.createHLog(fs, regiondir, regionName, conf);
    FileStatus[] logfiles = fs.listStatus(HLOGDIR);
    assertTrue("There should be some log file",
      logfiles != null && logfiles.length > 0);

    HLogSplitter logSplitter = new HLogSplitter(
        conf, HBASEDIR, fs, null, null, this.mode) {
      @Override
      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
          throws IOException {
        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
        // After creating the writer, simulate a region's
        // replayRecoveredEditsIfAny() which gets the SplitEditFiles of this
        // region and deletes them.
        NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.fs.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e);
      Assert.fail("Throws IOException when splitting "
        + "log, it is most likely because writing file does not "
        + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        Assert.fail("There are some corrupt logs, "
          + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

  private static void flushToConsole(String s) {
    System.out.println(s);
    System.out.flush();
  }

  private HLog.Writer [] generateHLogs(int leaveOpen) throws IOException {
    return generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
  }

  private HLog.Writer [] generateHLogs(final int writers, final int entries,
      final int leaveOpen) throws IOException {
    return generateHLogs((DistributedFileSystem) this.fs, writers, entries, leaveOpen);
  }

  private static void makeRegionDirs(FileSystem fs, List<String> regions) throws IOException {
    for (String region : regions) {
      flushToConsole("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }

  private static HLog.Writer [] generateHLogs(final DistributedFileSystem dfs, int writers,
      int entries, int leaveOpen) throws IOException {
    makeRegionDirs(dfs, REGIONS);
    dfs.mkdirs(HLOGDIR);
    HLog.Writer [] ws = new HLog.Writer[writers];
    int seq = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = HLogFactory.createWALWriter(dfs, new Path(HLOGDIR, HLOG_FILE_PREFIX + i),
        dfs.getConf());
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY,
            QUALIFIER, VALUE, seq++);
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    return ws;
  }

  private Path[] getLogForRegion(Path rootdir, TableName table, String region)
      throws IOException {
    Path tdir = FSUtils.getTableDir(rootdir, table);
    @SuppressWarnings("deprecation")
    Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
      Bytes.toString(region.getBytes())));
    FileStatus [] files = this.fs.listStatus(editsdir);
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }

  private void corruptHLog(Path path, Corruptions corruption, boolean close,
      FileSystem fs) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    // read the whole file into memory so we can rewrite corrupted variants of it
    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write("-----".getBytes());
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = corrupted_bytes.length / 2;
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        // chop off the trailer plus part of the last entry so the file ends mid-entry
        out.write(corrupted_bytes, 0, fileSize
          - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        // truncate only the trailer's trailing length int
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);
        closeOrFlush(close, out);
        break;
    }
  }

  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
            "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

  @SuppressWarnings("unused")
  private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
    HLog.Entry entry;
    HLog.Reader in = HLogFactory.createReader(fs, log, conf);
    while ((entry = in.next()) != null) {
      System.out.println(entry);
    }
  }

  private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
    int count = 0;
    HLog.Reader in = HLogFactory.createReader(fs, log, conf);
    while (in.next() != null) {
      count++;
    }
    return count;
  }

  public static long appendEntry(HLog.Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync();
    return seq;
  }

  private static HLog.Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();
    WALEdit edit = new WALEdit();
    seq++;
    edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
    return new HLog.Entry(new HLogKey(region, table, seq, time,
      HConstants.DEFAULT_CLUSTER_ID), edit);
  }

  private void injectEmptyFile(String suffix, boolean closeFile)
      throws IOException {
    HLog.Writer writer = HLogFactory.createWALWriter(
      fs, new Path(HLOGDIR, HLOG_FILE_PREFIX + suffix), conf);
    if (closeFile) writer.close();
  }

  @SuppressWarnings("unused")
  private void listLogs(FileSystem fs, Path dir) throws IOException {
    for (FileStatus file : fs.listStatus(dir)) {
      System.out.println(file.getPath());
    }
  }

  private int compareHLogSplitDirs(Path p1, Path p2) throws IOException {
    FileStatus[] f1 = fs.listStatus(p1);
    FileStatus[] f2 = fs.listStatus(p2);
    assertNotNull("Path " + p1 + " doesn't exist", f1);
    assertNotNull("Path " + p2 + " doesn't exist", f2);

    System.out.println("Files in " + p1 + ": " +
      Joiner.on(",").join(FileUtil.stat2Paths(f1)));
    System.out.println("Files in " + p2 + ": " +
      Joiner.on(",").join(FileUtil.stat2Paths(f2)));
    assertEquals(f1.length, f2.length);

    for (int i = 0; i < f1.length; i++) {
      // Each region has a directory named RECOVERED_EDITS_DIR holding its
      // split edit files; below we presume there is only one per region.
      Path rd1 = HLogUtil.getRegionDirRecoveredEditsDir(f1[i].getPath());
      FileStatus[] rd1fs = fs.listStatus(rd1);
      assertEquals(1, rd1fs.length);
      Path rd2 = HLogUtil.getRegionDirRecoveredEditsDir(f2[i].getPath());
      FileStatus[] rd2fs = fs.listStatus(rd2);
      assertEquals(1, rd2fs.length);
      if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
        return -1;
      }
    }
    return 0;
  }

  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    HLog.Reader in1, in2;
    in1 = HLogFactory.createReader(fs, p1, conf);
    in2 = HLogFactory.createReader(fs, p2, conf);
    HLog.Entry entry1;
    HLog.Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    return true;
  }
}