View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.io.FileNotFoundException;
28  import java.io.IOException;
29  import java.lang.reflect.Method;
30  import java.security.PrivilegedExceptionAction;
31  import java.util.ArrayList;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableSet;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.commons.logging.impl.Log4JLogger;
45  import org.apache.hadoop.hbase.TableName;
46  import org.apache.log4j.Level;
47  import org.apache.hadoop.hdfs.server.datanode.DataNode;
48  import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
49  import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
50  import org.apache.hadoop.conf.Configuration;
51  import org.apache.hadoop.fs.FSDataInputStream;
52  import org.apache.hadoop.fs.FSDataOutputStream;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.FileUtil;
56  import org.apache.hadoop.fs.Path;
57  import org.apache.hadoop.hbase.HBaseConfiguration;
58  import org.apache.hadoop.hbase.HBaseTestingUtility;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HRegionInfo;
62  import org.apache.hadoop.hbase.HTableDescriptor;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.LargeTests;
65  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
66  import org.apache.hadoop.hbase.regionserver.HRegion;
67  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
68  import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
69  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException;
70  import org.apache.hadoop.hbase.security.User;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.CancelableProgressable;
73  import org.apache.hadoop.hbase.util.FSUtils;
74  import org.apache.hadoop.hbase.util.Threads;
75  import org.apache.hadoop.hdfs.DFSTestUtil;
76  import org.apache.hadoop.hdfs.DistributedFileSystem;
77  import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
78  import org.apache.hadoop.ipc.RemoteException;
79  import org.junit.After;
80  import org.junit.AfterClass;
81  import org.junit.Assert;
82  import org.junit.Before;
83  import org.junit.BeforeClass;
84  import org.junit.Ignore;
85  import org.junit.Test;
86  import org.junit.experimental.categories.Category;
87  import org.mockito.Mockito;
88  import org.mockito.invocation.InvocationOnMock;
89  import org.mockito.stubbing.Answer;
90  
91  import com.google.common.base.Joiner;
92  import com.google.common.collect.ImmutableList;
93  
94  /**
95   * Testing {@link HLog} splitting code.
96   */
97  @Category(LargeTests.class)
98  public class TestHLogSplit {
99    {
100     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
101     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
102     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
103   }
  private final static Log LOG = LogFactory.getLog(TestHLogSplit.class);

  // Per-test handles; both are (re)assigned from TEST_UTIL in setUp().
  private Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Directory layout used by the tests. HBASEDIR is installed as the root dir in
  // setUpBeforeClass(); the other paths are derived from it.
  private static final Path HBASEDIR = new Path("/hbase");
  private static final Path HLOGDIR = new Path(HBASEDIR, "hlog");
  private static final Path OLDLOGDIR = new Path(HBASEDIR, "hlog.old");
  private static final Path CORRUPTDIR = new Path(HBASEDIR, HConstants.CORRUPT_DIR_NAME);

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = "f1".getBytes();
  private static final byte[] QUALIFIER = "q1".getBytes();
  private static final byte[] VALUE = "v1".getBytes();
  // WAL files are addressed as HLOG_FILE_PREFIX + index (e.g. "hlog.dat.5").
  private static final String HLOG_FILE_PREFIX = "hlog.dat.";
  // Region names the tests operate on; reset to "bbb" and "ccc" in setUp().
  private static List<String> REGIONS = new ArrayList<String>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static final Path TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
  // Names of the two fake users; derived from the current user in setUpBeforeClass().
  private static String ROBBER;
  private static String ZOMBIE;
  // Group the fake users are mapped to in setUpBeforeClass().
  private static String [] GROUP = new String [] {"supergroup"};
  // Recovery mode chosen in setUp() from HConstants.DISTRIBUTED_LOG_REPLAY_KEY.
  private RecoveryMode mode;
132 
133   static enum Corruptions {
134     INSERT_GARBAGE_ON_FIRST_LINE,
135     INSERT_GARBAGE_IN_THE_MIDDLE,
136     APPEND_GARBAGE,
137     TRUNCATE,
138     TRUNCATE_TRAILER
139   }
140 
141   @BeforeClass
142   public static void setUpBeforeClass() throws Exception {
143     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), HBASEDIR);
144     TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl",
145       InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
146     TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
147     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
148     // This is how you turn off shortcircuit read currently.  TODO: Fix.  Should read config.
149     System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
150     // Create fake maping user to group and set it to the conf.
151     Map<String, String []> u2g_map = new HashMap<String, String []>(2);
152     ROBBER = User.getCurrent().getName() + "-robber";
153     ZOMBIE = User.getCurrent().getName() + "-zombie";
154     u2g_map.put(ROBBER, GROUP);
155     u2g_map.put(ZOMBIE, GROUP);
156     DFSTestUtil.updateConfWithFakeGroupMapping(TEST_UTIL.getConfiguration(), u2g_map);
157     TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
158     TEST_UTIL.startMiniDFSCluster(2);
159   }
160 
161   @AfterClass
162   public static void tearDownAfterClass() throws Exception {
163     TEST_UTIL.shutdownMiniDFSCluster();
164   }
165 
166   @Before
167   public void setUp() throws Exception {
168     flushToConsole("Cleaning up cluster for new test\n"
169         + "--------------------------");
170     conf = TEST_UTIL.getConfiguration();
171     fs = TEST_UTIL.getDFSCluster().getFileSystem();
172     FileStatus[] entries = fs.listStatus(new Path("/"));
173     flushToConsole("Num entries in /:" + entries.length);
174     for (FileStatus dir : entries){
175       assertTrue("Deleting " + dir.getPath(), fs.delete(dir.getPath(), true));
176     }
177     // create the HLog directory because recursive log creates are not allowed
178     fs.mkdirs(HLOGDIR);
179     REGIONS.clear();
180     Collections.addAll(REGIONS, "bbb", "ccc");
181     InstrumentedSequenceFileLogWriter.activateFailure = false;
182     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
183         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
184   }
185 
186   @After
187   public void tearDown() throws Exception {
188   }
189 
190   /**
191    * Simulates splitting a WAL out from under a regionserver that is still trying to write it.  Ensures we do not
192    * lose edits.
193    * @throws IOException
194    * @throws InterruptedException
195    */
196   @Test (timeout=300000)
197   public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
198     final AtomicLong counter = new AtomicLong(0);
199     AtomicBoolean stop = new AtomicBoolean(false);
200     // Region we'll write edits too and then later examine to make sure they all made it in.
201     final String region = REGIONS.get(0);
202     Thread zombie = new ZombieLastLogWriterRegionServer(this.conf, counter, stop, region);
203     try {
204       long startCount = counter.get();
205       zombie.start();
206       // Wait till writer starts going.
207       while (startCount == counter.get()) Threads.sleep(1);
208       // Give it a second to write a few appends.
209       Threads.sleep(1000);
210       final Configuration conf2 = HBaseConfiguration.create(this.conf);
211       final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
212       int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
213         @Override
214         public Integer run() throws Exception {
215           FileSystem fs = FileSystem.get(conf2);
216           int expectedFiles = fs.listStatus(HLOGDIR).length;
217           HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf2);
218           Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
219           assertEquals(expectedFiles, logfiles.length);
220           int count = 0;
221           for (Path logfile: logfiles) {
222             count += countHLog(logfile, fs, conf2);
223           }
224           return count;
225         }
226       });
227       LOG.info("zombie=" + counter.get() + ", robber=" + count);
228       assertTrue("The log file could have at most 1 extra log entry, but can't have less. Zombie could write " +
229         counter.get() + " and logfile had only " + count,
230         counter.get() == count || counter.get() + 1 == count);
231     } finally {
232       stop.set(true);
233       zombie.interrupt();
234       Threads.threadDumpingIsAlive(zombie);
235     }
236   }
237 
238   /**
239    * This thread will keep writing to a 'wal' file even after the split process has started.
240    * It simulates a region server that was considered dead but woke up and wrote some more to he last log entry.
241    * Does its writing as an alternate user in another filesystem instance to simulate better it being a regionserver.
242    */
243   static class ZombieLastLogWriterRegionServer extends Thread {
244     final AtomicLong editsCount;
245     final AtomicBoolean stop;
246     // final User user;
247     /**
248      * Region to write edits for.
249      */
250     final String region;
251     final Configuration conf;
252     final User user;
253 
254     public ZombieLastLogWriterRegionServer(final Configuration conf, AtomicLong counter, AtomicBoolean stop,
255         final String region)
256     throws IOException, InterruptedException {
257       super("ZombieLastLogWriterRegionServer");
258       setDaemon(true);
259       this.stop = stop;
260       this.editsCount = counter;
261       this.region = region;
262       this.conf = HBaseConfiguration.create(conf);
263       this.user = User.createUserForTesting(this.conf, ZOMBIE, GROUP);
264     }
265 
266     @Override
267     public void run() {
268       try {
269         doWriting();
270       } catch (IOException e) {
271         LOG.warn(getName() + " Writer exiting " + e);
272       } catch (InterruptedException e) {
273         LOG.warn(getName() + " Writer exiting " + e);
274       }
275     }
276 
277     private void doWriting() throws IOException, InterruptedException {
278       this.user.runAs(new PrivilegedExceptionAction<Object>() {
279         @Override
280         public Object run() throws Exception {
281           // Index of the WAL we want to keep open.  generateHLogs will leave open the WAL whose index we supply here.
282           int walToKeepOpen = 2;
283           // How many files to write.
284           final int numOfWriters = walToKeepOpen + 1;
285           // The below method writes numOfWriters files each with ENTRIES entries for a total of numOfWriters * ENTRIES
286           // added per column family in the region.
287           HLog.Writer[] writers = null;
288           try {
289             DistributedFileSystem dfs = (DistributedFileSystem)FileSystem.get(conf);
290             writers = generateHLogs(dfs, numOfWriters, ENTRIES, walToKeepOpen);
291           } catch (IOException e1) {
292             throw new RuntimeException("Failed", e1);
293           }
294           // Update counter so has all edits written so far.
295           editsCount.addAndGet(numOfWriters * NUM_WRITERS);
296           // This WAL should be open still after our call to generateHLogs -- we asked it leave it open.
297           HLog.Writer writer = writers[walToKeepOpen];
298           loop(writer);
299           return null;
300         }
301       });
302     }
303 
304     private void loop(final HLog.Writer writer) {
305       byte [] regionBytes = Bytes.toBytes(this.region);
306       while (true) {
307         try {
308           long seq = appendEntry(writer, TABLE_NAME, regionBytes, ("r" + editsCount.get()).getBytes(),
309             regionBytes, QUALIFIER, VALUE, 0);
310           long count = editsCount.incrementAndGet();
311           flushToConsole(getName() + " sync count=" + count + ", seq=" + seq);
312           try {
313             Thread.sleep(1);
314           } catch (InterruptedException e) {
315             //
316           }
317         } catch (IOException ex) {
318           flushToConsole(getName() + " ex " + ex.toString());
319           if (ex instanceof RemoteException) {
320             flushToConsole("Juliet: got RemoteException " + ex.getMessage() +
321               " while writing " + (editsCount.get() + 1));
322           } else {
323             flushToConsole(getName() + " failed to write....at " + editsCount.get());
324             assertTrue("Failed to write " + editsCount.get(), false);
325           }
326           break;
327         } catch (Throwable t) {
328           flushToConsole(getName() + " HOW? " + t);
329           t.printStackTrace();
330           break;
331         }
332       }
333       flushToConsole(getName() + " Writer exiting");
334     }
335   }
336 
337   /**
338    * @throws IOException
339    * @see https://issues.apache.org/jira/browse/HBASE-3020
340    */
341   @Test (timeout=300000)
342   public void testRecoveredEditsPathForMeta() throws IOException {
343     FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
344     byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
345     Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
346     Path regiondir = new Path(tdir,
347         HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
348     fs.mkdirs(regiondir);
349     long now = System.currentTimeMillis();
350     HLog.Entry entry =
351         new HLog.Entry(new HLogKey(encoded,
352             TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
353       new WALEdit());
354     Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
355     String parentOfParent = p.getParent().getParent().getName();
356     assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
357   }
358 
359   /**
360    * Test old recovered edits file doesn't break HLogSplitter.
361    * This is useful in upgrading old instances.
362    */
363   @Test (timeout=300000)
364   public void testOldRecoveredEditsFileSidelined() throws IOException {
365     FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
366     byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
367     Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
368     Path regiondir = new Path(tdir,
369         HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
370     fs.mkdirs(regiondir);
371     long now = System.currentTimeMillis();
372     HLog.Entry entry =
373         new HLog.Entry(new HLogKey(encoded,
374             TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
375       new WALEdit());
376     Path parent = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
377     assertEquals(parent.getName(), HConstants.RECOVERED_EDITS_DIR);
378     fs.createNewFile(parent); // create a recovered.edits file
379 
380     Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
381     String parentOfParent = p.getParent().getParent().getName();
382     assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
383     HLogFactory.createRecoveredEditsWriter(fs, p, conf).close();
384   }
385 
386   @Test (timeout=300000)
387   public void testSplitPreservesEdits() throws IOException{
388     final String REGION = "region__1";
389     REGIONS.removeAll(REGIONS);
390     REGIONS.add(REGION);
391 
392     generateHLogs(1, 10, -1);
393     fs.initialize(fs.getUri(), conf);
394     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
395     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
396     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
397     assertEquals(1, splitLog.length);
398 
399     assertEquals("edits differ after split", true, logsAreEqual(originalLog, splitLog[0]));
400   }
401 
402 
403   @Test (timeout=300000)
404   public void testEmptyLogFiles() throws IOException {
405 
406     injectEmptyFile(".empty", true);
407     generateHLogs(Integer.MAX_VALUE);
408     injectEmptyFile("empty", true);
409 
410     // make fs act as a different client now
411     // initialize will create a new DFSClient with a new client ID
412     fs.initialize(fs.getUri(), conf);
413 
414     int expectedFiles = fs.listStatus(HLOGDIR).length - 2; // less 2 empty files
415     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
416     for (String region : REGIONS) {
417       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
418       assertEquals(expectedFiles, logfiles.length);
419       int count = 0;
420       for (Path logfile: logfiles) {
421         count += countHLog(logfile, fs, conf);
422       }
423       assertEquals(NUM_WRITERS * ENTRIES, count);
424     }
425   }
426 
427 
428   @Test (timeout=300000)
429   public void testEmptyOpenLogFiles() throws IOException {
430     injectEmptyFile(".empty", false);
431     generateHLogs(Integer.MAX_VALUE);
432     injectEmptyFile("empty", false);
433 
434     // make fs act as a different client now
435     // initialize will create a new DFSClient with a new client ID
436     fs.initialize(fs.getUri(), conf);
437 
438     int expectedFiles = fs.listStatus(HLOGDIR).length - 2 ; // less 2 empty files
439     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
440     for (String region : REGIONS) {
441       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
442       assertEquals(expectedFiles, logfiles.length);
443       int count = 0;
444       for (Path logfile: logfiles) {
445         count += countHLog(logfile, fs, conf);
446       }
447       assertEquals(NUM_WRITERS * ENTRIES, count);
448     }
449   }
450 
451   @Test (timeout=300000)
452   public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
453     // generate logs but leave hlog.dat.5 open.
454     generateHLogs(5);
455 
456     fs.initialize(fs.getUri(), conf);
457 
458     int expectedFiles = fs.listStatus(HLOGDIR).length;
459     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
460     for (String region : REGIONS) {
461       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
462       assertEquals(expectedFiles, logfiles.length);
463       int count = 0;
464       for (Path logfile: logfiles) {
465         count += countHLog(logfile, fs, conf);
466       }
467       assertEquals(NUM_WRITERS * ENTRIES, count);
468     }
469   }
470 
471 
472   @Test (timeout=300000)
473   public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
474     conf.setBoolean(HBASE_SKIP_ERRORS, true);
475     generateHLogs(Integer.MAX_VALUE);
476     corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
477             Corruptions.APPEND_GARBAGE, true, fs);
478     fs.initialize(fs.getUri(), conf);
479 
480     int expectedFiles = fs.listStatus(HLOGDIR).length;
481     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
482     for (String region : REGIONS) {
483       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
484       assertEquals(expectedFiles, logfiles.length);
485       int count = 0;
486       for (Path logfile: logfiles) {
487         count += countHLog(logfile, fs, conf);
488       }
489       assertEquals(NUM_WRITERS * ENTRIES, count);
490     }
491   }
492 
493   @Test (timeout=300000)
494   public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
495     conf.setBoolean(HBASE_SKIP_ERRORS, true);
496     generateHLogs(Integer.MAX_VALUE);
497     corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
498             Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
499     fs.initialize(fs.getUri(), conf);
500 
501     int expectedFiles = fs.listStatus(HLOGDIR).length - 1; // less 1 corrupted file
502     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
503     for (String region : REGIONS) {
504       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
505       assertEquals(expectedFiles, logfiles.length);
506       int count = 0;
507       for (Path logfile: logfiles) {
508         count += countHLog(logfile, fs, conf);
509       }
510       assertEquals((NUM_WRITERS - 1) * ENTRIES, count);
511     }
512   }
513 
514   @Test (timeout=300000)
515   public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
516     conf.setBoolean(HBASE_SKIP_ERRORS, true);
517     generateHLogs(Integer.MAX_VALUE);
518     corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
519             Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
520     fs.initialize(fs.getUri(), conf);
521 
522     int expectedFiles = fs.listStatus(HLOGDIR).length;
523     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
524     for (String region : REGIONS) {
525       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
526       assertEquals(expectedFiles, logfiles.length);
527       int count = 0;
528       for (Path logfile: logfiles) {
529         count += countHLog(logfile, fs, conf);
530       }
531       // the entries in the original logs are alternating regions
532       // considering the sequence file header, the middle corruption should
533       // affect at least half of the entries
534       int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
535       int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
536       assertTrue("The file up to the corrupted area hasn't been parsed",
537               goodEntries + firstHalfEntries <= count);
538     }
539   }
540 
541   @Test (timeout=300000)
542   public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
543     conf.setBoolean(HBASE_SKIP_ERRORS, true);
544     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
545         Reader.class);
546     InstrumentedSequenceFileLogWriter.activateFailure = false;
547     HLogFactory.resetLogReaderClass();
548 
549     try {
550     Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
551       conf.setClass("hbase.regionserver.hlog.reader.impl",
552           FaultySequenceFileLogReader.class, HLog.Reader.class);
553       for (FaultySequenceFileLogReader.FailureType  failureType : FaultySequenceFileLogReader.FailureType.values()) {
554         conf.set("faultysequencefilelogreader.failuretype", failureType.name());
555         generateHLogs(1, ENTRIES, -1);
556         fs.initialize(fs.getUri(), conf);
557         HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
558         FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
559         assertEquals("expected a different file", c1.getName(), archivedLogs[0]
560             .getPath().getName());
561         assertEquals(archivedLogs.length, 1);
562         fs.delete(new Path(OLDLOGDIR, HLOG_FILE_PREFIX + "0"), false);
563       }
564     } finally {
565       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
566           Reader.class);
567       HLogFactory.resetLogReaderClass();
568     }
569   }
570 
571   @Test (timeout=300000, expected = IOException.class)
572   public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
573       throws IOException {
574     conf.setBoolean(HBASE_SKIP_ERRORS, false);
575     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
576         Reader.class);
577     InstrumentedSequenceFileLogWriter.activateFailure = false;
578     HLogFactory.resetLogReaderClass();
579 
580     try {
581       conf.setClass("hbase.regionserver.hlog.reader.impl",
582           FaultySequenceFileLogReader.class, HLog.Reader.class);
583       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
584       generateHLogs(Integer.MAX_VALUE);
585       fs.initialize(fs.getUri(), conf);
586       HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
587     } finally {
588       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
589           Reader.class);
590       HLogFactory.resetLogReaderClass();
591     }
592   }
593 
594   @Test (timeout=300000)
595   public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
596       throws IOException {
597     conf.setBoolean(HBASE_SKIP_ERRORS, false);
598     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
599         Reader.class);
600     InstrumentedSequenceFileLogWriter.activateFailure = false;
601     HLogFactory.resetLogReaderClass();
602 
603     try {
604       conf.setClass("hbase.regionserver.hlog.reader.impl",
605           FaultySequenceFileLogReader.class, HLog.Reader.class);
606       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
607       generateHLogs(-1);
608       fs.initialize(fs.getUri(), conf);
609       try {
610         HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
611       } catch (IOException e) {
612         assertEquals(
613             "if skip.errors is false all files should remain in place",
614             NUM_WRITERS, fs.listStatus(HLOGDIR).length);
615       }
616     } finally {
617       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
618           Reader.class);
619       HLogFactory.resetLogReaderClass();
620     }
621   }
622 
623   @Test (timeout=300000)
624   public void testEOFisIgnored() throws IOException {
625     conf.setBoolean(HBASE_SKIP_ERRORS, false);
626 
627     final String REGION = "region__1";
628     REGIONS.removeAll(REGIONS);
629     REGIONS.add(REGION);
630 
631     int entryCount = 10;
632     Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
633     generateHLogs(1, entryCount, -1);
634     corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
635 
636     fs.initialize(fs.getUri(), conf);
637     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
638 
639     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
640     assertEquals(1, splitLog.length);
641 
642     int actualCount = 0;
643     HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
644     @SuppressWarnings("unused")
645     HLog.Entry entry;
646     while ((entry = in.next()) != null) ++actualCount;
647     assertEquals(entryCount-1, actualCount);
648 
649     // should not have stored the EOF files as corrupt
650     FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
651     assertEquals(archivedLogs.length, 0);
652   }
653 
654   @Test (timeout=300000)
655   public void testCorruptWALTrailer() throws IOException {
656     conf.setBoolean(HBASE_SKIP_ERRORS, false);
657 
658     final String REGION = "region__1";
659     REGIONS.removeAll(REGIONS);
660     REGIONS.add(REGION);
661 
662     int entryCount = 10;
663     Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
664     generateHLogs(1, entryCount, -1);
665     corruptHLog(c1, Corruptions.TRUNCATE_TRAILER, true, fs);
666 
667     fs.initialize(fs.getUri(), conf);
668     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
669 
670     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
671     assertEquals(1, splitLog.length);
672 
673     int actualCount = 0;
674     HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
675     @SuppressWarnings("unused")
676     HLog.Entry entry;
677     while ((entry = in.next()) != null) ++actualCount;
678     assertEquals(entryCount, actualCount);
679 
680     // should not have stored the EOF files as corrupt
681     FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
682     assertEquals(archivedLogs.length, 0);
683   }
684 
685   @Test (timeout=300000)
686   public void testLogsGetArchivedAfterSplit() throws IOException {
687     conf.setBoolean(HBASE_SKIP_ERRORS, false);
688     generateHLogs(-1);
689     fs.initialize(fs.getUri(), conf);
690     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
691     FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
692     assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
693   }
694 
695   @Test (timeout=300000)
696   public void testSplit() throws IOException {
697     generateHLogs(-1);
698     fs.initialize(fs.getUri(), conf);
699 
700     int expectedFiles = fs.listStatus(HLOGDIR).length;
701     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
702     for (String region : REGIONS) {
703       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
704       assertEquals(expectedFiles, logfiles.length);
705       int count = 0;
706       for (Path logfile: logfiles) {
707         count += countHLog(logfile, fs, conf);
708       }
709       assertEquals(NUM_WRITERS * ENTRIES, count);
710     }
711   }
712 
713   @Test (timeout=300000)
714   public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
715   throws IOException {
716     generateHLogs(-1);
717     fs.initialize(fs.getUri(), conf);
718     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
719     FileStatus [] statuses = null;
720     try {
721       statuses = fs.listStatus(HLOGDIR);
722       if (statuses != null) {
723         Assert.fail("Files left in log dir: " +
724             Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
725       }
726     } catch (FileNotFoundException e) {
727       // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
728     }
729   }
730 
731   @Test(timeout=300000, expected = IOException.class)
732   public void testSplitWillFailIfWritingToRegionFails() throws Exception {
733     //leave 5th log open so we could append the "trap"
734     HLog.Writer [] writer = generateHLogs(4);
735 
736     fs.initialize(fs.getUri(), conf);
737 
738     String region = "break";
739     Path regiondir = new Path(TABLEDIR, region);
740     fs.mkdirs(regiondir);
741 
742     InstrumentedSequenceFileLogWriter.activateFailure = false;
743     appendEntry(writer[4], TABLE_NAME, Bytes.toBytes(region),
744         ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
745     writer[4].close();
746 
747     try {
748       InstrumentedSequenceFileLogWriter.activateFailure = true;
749       HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
750     } catch (IOException e) {
751       assertTrue(e.getMessage().
752         contains("This exception is instrumented and should only be thrown for testing"));
753       throw e;
754     } finally {
755       InstrumentedSequenceFileLogWriter.activateFailure = false;
756     }
757   }
758 
759 
760   // @Test TODO this test has been disabled since it was created!
761   // It currently fails because the second split doesn't output anything
762   // -- because there are no region dirs after we move aside the first
763   // split result
764   public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
765 
766     REGIONS.removeAll(REGIONS);
767     for (int i=0; i<100; i++) {
768       REGIONS.add("region__"+i);
769     }
770 
771     generateHLogs(1, 100, -1);
772     fs.initialize(fs.getUri(), conf);
773 
774     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
775     fs.rename(OLDLOGDIR, HLOGDIR);
776     Path firstSplitPath = new Path(HBASEDIR, TABLE_NAME+ ".first");
777     Path splitPath = new Path(HBASEDIR, TABLE_NAME.getNameAsString());
778     fs.rename(splitPath,
779             firstSplitPath);
780 
781     fs.initialize(fs.getUri(), conf);
782     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
783     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
784   }
785 
786   @Test (timeout=300000)
787   public void testSplitDeletedRegion() throws IOException {
788     REGIONS.removeAll(REGIONS);
789     String region = "region_that_splits";
790     REGIONS.add(region);
791 
792     generateHLogs(1);
793     fs.initialize(fs.getUri(), conf);
794 
795     Path regiondir = new Path(TABLEDIR, region);
796     fs.delete(regiondir, true);
797     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
798     assertFalse(fs.exists(regiondir));
799   }
800 
801   @Test (timeout=300000)
802   public void testIOEOnOutputThread() throws Exception {
803     conf.setBoolean(HBASE_SKIP_ERRORS, false);
804 
805     generateHLogs(-1);
806     fs.initialize(fs.getUri(), conf);
807     FileStatus[] logfiles = fs.listStatus(HLOGDIR);
808     assertTrue("There should be some log file",
809       logfiles != null && logfiles.length > 0);
810     // Set up a splitter that will throw an IOE on the output side
811     HLogSplitter logSplitter = new HLogSplitter(
812         conf, HBASEDIR, fs, null, null, this.mode) {
813       protected HLog.Writer createWriter(FileSystem fs,
814           Path logfile, Configuration conf) throws IOException {
815         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
816         Mockito.doThrow(new IOException("Injected")).when(
817           mockWriter).append(Mockito.<HLog.Entry>any());
818         return mockWriter;
819       }
820     };
821     // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
822     // the thread dumping in a background thread so it does not hold up the test.
823     final AtomicBoolean stop = new AtomicBoolean(false);
824     final Thread someOldThread = new Thread("Some-old-thread") {
825       @Override
826       public void run() {
827         while(!stop.get()) Threads.sleep(10);
828       }
829     };
830     someOldThread.setDaemon(true);
831     someOldThread.start();
832     final Thread t = new Thread("Background-thread-dumper") {
833       public void run() {
834         try {
835           Threads.threadDumpingIsAlive(someOldThread);
836         } catch (InterruptedException e) {
837           e.printStackTrace();
838         }
839       }
840     };
841     t.setDaemon(true);
842     t.start();
843     try {
844       logSplitter.splitLogFile(logfiles[0], null);
845       fail("Didn't throw!");
846     } catch (IOException ioe) {
847       assertTrue(ioe.toString().contains("Injected"));
848     } finally {
849       // Setting this to true will turn off the background thread dumper.
850       stop.set(true);
851     }
852   }
853 
854   // Test for HBASE-3412
855   @Test (timeout=300000)
856   public void testMovedHLogDuringRecovery() throws Exception {
857     generateHLogs(-1);
858 
859     fs.initialize(fs.getUri(), conf);
860 
861     // This partial mock will throw LEE for every file simulating
862     // files that were moved
863     FileSystem spiedFs = Mockito.spy(fs);
864     // The "File does not exist" part is very important,
865     // that's how it comes out of HDFS
866     Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
867         when(spiedFs).append(Mockito.<Path>any());
868 
869     try {
870       HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
871       assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
872       assertFalse(fs.exists(HLOGDIR));
873     } catch (IOException e) {
874       fail("There shouldn't be any exception but: " + e.toString());
875     }
876   }
877 
878   @Test (timeout=300000)
879   public void testRetryOpenDuringRecovery() throws Exception {
880     generateHLogs(-1);
881 
882     fs.initialize(fs.getUri(), conf);
883 
884     FileSystem spiedFs = Mockito.spy(fs);
885     // The "Cannot obtain block length", "Could not obtain the last block",
886     // and "Blocklist for [^ ]* has changed.*" part is very important,
887     // that's how it comes out of HDFS. If HDFS changes the exception
888     // message, this test needs to be adjusted accordingly.
889     //
890     // When DFSClient tries to open a file, HDFS needs to locate
891     // the last block of the file and get its length. However, if the
892     // last block is under recovery, HDFS may have problem to obtain
893     // the block length, in which case, retry may help.
894     Mockito.doAnswer(new Answer<FSDataInputStream>() {
895       private final String[] errors = new String[] {
896         "Cannot obtain block length", "Could not obtain the last block",
897         "Blocklist for " + OLDLOGDIR + " has changed"};
898       private int count = 0;
899 
900       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
901             if (count < 3) {
902                 throw new IOException(errors[count++]);
903             }
904             return (FSDataInputStream)invocation.callRealMethod();
905         }
906     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
907 
908     try {
909       HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
910       assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
911       assertFalse(fs.exists(HLOGDIR));
912     } catch (IOException e) {
913       fail("There shouldn't be any exception but: " + e.toString());
914     }
915   }
916 
917   @Test (timeout=300000)
918   public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
919     generateHLogs(1, 10, -1);
920     FileStatus logfile = fs.listStatus(HLOGDIR)[0];
921     fs.initialize(fs.getUri(), conf);
922 
923     final AtomicInteger count = new AtomicInteger();
924 
925     CancelableProgressable localReporter
926       = new CancelableProgressable() {
927         @Override
928         public boolean progress() {
929           count.getAndIncrement();
930           return false;
931         }
932       };
933 
934     FileSystem spiedFs = Mockito.spy(fs);
935     Mockito.doAnswer(new Answer<FSDataInputStream>() {
936       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
937         Thread.sleep(1500); // Sleep a while and wait report status invoked
938         return (FSDataInputStream)invocation.callRealMethod();
939       }
940     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
941 
942     try {
943       conf.setInt("hbase.splitlog.report.period", 1000);
944       boolean ret = HLogSplitter.splitLogFile(
945         HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode);
946       assertFalse("Log splitting should failed", ret);
947       assertTrue(count.get() > 0);
948     } catch (IOException e) {
949       fail("There shouldn't be any exception but: " + e.toString());
950     } finally {
951       // reset it back to its default value
952       conf.setInt("hbase.splitlog.report.period", 59000);
953     }
954   }
955 
956   /**
957    * Test log split process with fake data and lots of edits to trigger threading
958    * issues.
959    */
960   @Test (timeout=300000)
961   public void testThreading() throws Exception {
962     doTestThreading(20000, 128*1024*1024, 0);
963   }
964 
965   /**
966    * Test blocking behavior of the log split process if writers are writing slower
967    * than the reader is reading.
968    */
969   @Test (timeout=300000)
970   public void testThreadingSlowWriterSmallBuffer() throws Exception {
971     doTestThreading(200, 1024, 50);
972   }
973 
974   /**
975    * Sets up a log splitter with a mock reader and writer. The mock reader generates
976    * a specified number of edits spread across 5 regions. The mock writer optionally
977    * sleeps for each edit it is fed.
978    * *
979    * After the split is complete, verifies that the statistics show the correct number
980    * of edits output into each region.
981    *
982    * @param numFakeEdits number of fake edits to push through pipeline
983    * @param bufferSize size of in-memory buffer
984    * @param writerSlowness writer threads will sleep this many ms per edit
985    */
986   private void doTestThreading(final int numFakeEdits,
987       final int bufferSize,
988       final int writerSlowness) throws Exception {
989 
990     Configuration localConf = new Configuration(conf);
991     localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
992 
993     // Create a fake log file (we'll override the reader to produce a stream of edits)
994     Path logPath = new Path(HLOGDIR, HLOG_FILE_PREFIX + ".fake");
995     FSDataOutputStream out = fs.create(logPath);
996     out.close();
997 
998     // Make region dirs for our destination regions so the output doesn't get skipped
999     final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
1000     makeRegionDirs(fs, regions);
1001 
1002     // Create a splitter that reads and writes the data without touching disk
1003     HLogSplitter logSplitter = new HLogSplitter(
1004         localConf, HBASEDIR, fs, null, null, this.mode) {
1005 
1006       /* Produce a mock writer that doesn't write anywhere */
1007       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
1008       throws IOException {
1009         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
1010         Mockito.doAnswer(new Answer<Void>() {
1011           int expectedIndex = 0;
1012 
1013           @Override
1014           public Void answer(InvocationOnMock invocation) {
1015             if (writerSlowness > 0) {
1016               try {
1017                 Thread.sleep(writerSlowness);
1018               } catch (InterruptedException ie) {
1019                 Thread.currentThread().interrupt();
1020               }
1021             }
1022             HLog.Entry entry = (Entry) invocation.getArguments()[0];
1023             WALEdit edit = entry.getEdit();
1024             List<KeyValue> keyValues = edit.getKeyValues();
1025             assertEquals(1, keyValues.size());
1026             KeyValue kv = keyValues.get(0);
1027 
1028             // Check that the edits come in the right order.
1029             assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
1030             expectedIndex++;
1031             return null;
1032           }
1033         }).when(mockWriter).append(Mockito.<HLog.Entry>any());
1034         return mockWriter;
1035       }
1036 
1037       /* Produce a mock reader that generates fake entries */
1038       protected Reader getReader(FileSystem fs, Path curLogFile,
1039           Configuration conf, CancelableProgressable reporter) throws IOException {
1040         Reader mockReader = Mockito.mock(Reader.class);
1041         Mockito.doAnswer(new Answer<HLog.Entry>() {
1042           int index = 0;
1043 
1044           @Override
1045           public HLog.Entry answer(InvocationOnMock invocation) throws Throwable {
1046             if (index >= numFakeEdits) return null;
1047 
1048             // Generate r0 through r4 in round robin fashion
1049             int regionIdx = index % regions.size();
1050             byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
1051 
1052             HLog.Entry ret = createTestEntry(TABLE_NAME, region,
1053                 Bytes.toBytes((int)(index / regions.size())),
1054                 FAMILY, QUALIFIER, VALUE, index);
1055             index++;
1056             return ret;
1057           }
1058         }).when(mockReader).next();
1059         return mockReader;
1060       }
1061     };
1062 
1063     logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
1064 
1065     // Verify number of written edits per region
1066     Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
1067     for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
1068       LOG.info("Got " + entry.getValue() + " output edits for region " +
1069           Bytes.toString(entry.getKey()));
1070       assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
1071     }
1072     assertEquals(regions.size(), outputCounts.size());
1073   }
1074 
1075   // HBASE-2312: tests the case where a RegionServer enters a GC pause,
1076   // comes back online after the master declared it dead and started to split.
1077   // Want log rolling after a master split to fail
1078   @Test (timeout=300000)
1079   @Ignore("Need HADOOP-6886, HADOOP-6840, & HDFS-617 for this. HDFS 0.20.205.1+ should have this")
1080   public void testLogRollAfterSplitStart() throws IOException {
1081     HLog log = null;
1082     String logName = "testLogRollAfterSplitStart";
1083     Path thisTestsDir = new Path(HBASEDIR, logName);
1084 
1085     try {
1086       // put some entries in an HLog
1087       TableName tableName =
1088           TableName.valueOf(this.getClass().getName());
1089       HRegionInfo regioninfo = new HRegionInfo(tableName,
1090           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
1091       log = HLogFactory.createHLog(fs, HBASEDIR, logName, conf);
1092       final AtomicLong sequenceId = new AtomicLong(1);
1093 
1094       final int total = 20;
1095       for (int i = 0; i < total; i++) {
1096         WALEdit kvs = new WALEdit();
1097         kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
1098         HTableDescriptor htd = new HTableDescriptor(tableName);
1099         htd.addFamily(new HColumnDescriptor("column"));
1100         log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
1101       }
1102       // Send the data to HDFS datanodes and close the HDFS writer
1103       log.sync();
1104       ((FSHLog) log).cleanupCurrentWriter(log.getFilenum());
1105 
1106       /* code taken from ProcessServerShutdown.process()
1107        * handles RS shutdowns (as observed by the Master)
1108        */
1109       // rename the directory so a rogue RS doesn't create more HLogs
1110       Path rsSplitDir = new Path(thisTestsDir.getParent(),
1111                                  thisTestsDir.getName() + "-splitting");
1112       fs.rename(thisTestsDir, rsSplitDir);
1113       LOG.debug("Renamed region directory: " + rsSplitDir);
1114 
1115       // Process the old log files
1116       HLogSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf);
1117 
1118       // Now, try to roll the HLog and verify failure
1119       try {
1120         log.rollWriter();
1121         Assert.fail("rollWriter() did not throw any exception.");
1122       } catch (IOException ioe) {
1123         if (ioe.getCause().getMessage().contains("FileNotFound")) {
1124           LOG.info("Got the expected exception: ", ioe.getCause());
1125         } else {
1126           Assert.fail("Unexpected exception: " + ioe);
1127         }
1128       }
1129     } finally {
1130       if (log != null) {
1131         log.close();
1132       }
1133       if (fs.exists(thisTestsDir)) {
1134         fs.delete(thisTestsDir, true);
1135       }
1136     }
1137   }
1138 
1139   /**
1140    * This thread will keep adding new log files
1141    * It simulates a region server that was considered dead but woke up and wrote
1142    * some more to a new hlog
1143    */
1144   class ZombieNewLogWriterRegionServer extends Thread {
1145     AtomicBoolean stop;
1146     CountDownLatch latch;
1147     public ZombieNewLogWriterRegionServer(CountDownLatch latch, AtomicBoolean stop) {
1148       super("ZombieNewLogWriterRegionServer");
1149       this.latch = latch;
1150       this.stop = stop;
1151     }
1152 
1153     @Override
1154     public void run() {
1155       if (stop.get()) {
1156         return;
1157       }
1158       Path tableDir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1159       Path regionDir = new Path(tableDir, REGIONS.get(0));
1160       Path recoveredEdits = new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
1161       String region = "juliet";
1162       Path julietLog = new Path(HLOGDIR, HLOG_FILE_PREFIX + ".juliet");
1163       try {
1164 
1165         while (!fs.exists(recoveredEdits) && !stop.get()) {
1166           LOG.info("Juliet: split not started, sleeping a bit...");
1167           Threads.sleep(10);
1168         }
1169 
1170         fs.mkdirs(new Path(tableDir, region));
1171         HLog.Writer writer = HLogFactory.createWALWriter(fs,
1172           julietLog, conf);
1173         appendEntry(writer, TableName.valueOf("juliet"), ("juliet").getBytes(),
1174             ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
1175         writer.close();
1176         LOG.info("Juliet file creator: created file " + julietLog);
1177         latch.countDown();
1178       } catch (IOException e1) {
1179         LOG.error("Failed to create file " + julietLog, e1);
1180         assertTrue("Failed to create file " + julietLog, false);
1181       }
1182     }
1183   }
1184 
1185   @Test (timeout=300000)
1186   public void testSplitLogFileWithOneRegion() throws IOException {
1187     LOG.info("testSplitLogFileWithOneRegion");
1188     final String REGION = "region__1";
1189     REGIONS.removeAll(REGIONS);
1190     REGIONS.add(REGION);
1191 
1192     generateHLogs(1, 10, -1);
1193     fs.initialize(fs.getUri(), conf);
1194     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1195 
1196     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
1197     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
1198     assertEquals(1, splitLog.length);
1199 
1200     assertEquals(true, logsAreEqual(originalLog, splitLog[0]));
1201   }
1202 
1203   @Test (timeout=300000)
1204   public void testSplitLogFileDeletedRegionDir() throws IOException {
1205     LOG.info("testSplitLogFileDeletedRegionDir");
1206     final String REGION = "region__1";
1207     REGIONS.removeAll(REGIONS);
1208     REGIONS.add(REGION);
1209 
1210     generateHLogs(1, 10, -1);
1211     fs.initialize(fs.getUri(), conf);
1212 
1213     Path regiondir = new Path(TABLEDIR, REGION);
1214     LOG.info("Region directory is" + regiondir);
1215     fs.delete(regiondir, true);
1216 
1217     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1218 
1219     assertTrue(!fs.exists(regiondir));
1220     assertTrue(true);
1221   }
1222 
1223   @Test (timeout=300000)
1224   public void testSplitLogFileEmpty() throws IOException {
1225     LOG.info("testSplitLogFileEmpty");
1226     injectEmptyFile(".empty", true);
1227 
1228     fs.initialize(fs.getUri(), conf);
1229 
1230     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1231     Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1232     assertFalse(fs.exists(tdir));
1233 
1234     assertEquals(0, countHLog(fs.listStatus(OLDLOGDIR)[0].getPath(), fs, conf));
1235   }
1236 
1237   @Test (timeout=300000)
1238   public void testSplitLogFileMultipleRegions() throws IOException {
1239     LOG.info("testSplitLogFileMultipleRegions");
1240     generateHLogs(1, 10, -1);
1241     fs.initialize(fs.getUri(), conf);
1242 
1243     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1244     for (String region : REGIONS) {
1245       Path[] recovered = getLogForRegion(HBASEDIR, TABLE_NAME, region);
1246       assertEquals(1, recovered.length);
1247       assertEquals(10, countHLog(recovered[0], fs, conf));
1248     }
1249   }
1250 
1251   @Test (timeout=300000)
1252   public void testSplitLogFileFirstLineCorruptionLog()
1253   throws IOException {
1254     conf.setBoolean(HBASE_SKIP_ERRORS, true);
1255     generateHLogs(1, 10, -1);
1256     FileStatus logfile = fs.listStatus(HLOGDIR)[0];
1257 
1258     corruptHLog(logfile.getPath(),
1259         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
1260 
1261     fs.initialize(fs.getUri(), conf);
1262     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1263 
1264     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
1265         "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
1266     assertEquals(1, fs.listStatus(corruptDir).length);
1267   }
1268 
1269   /**
1270    * @throws IOException
1271    * @see https://issues.apache.org/jira/browse/HBASE-4862
1272    */
1273   @Test (timeout=300000)
1274   public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
1275     LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
1276     // Generate hlogs for our destination region
1277     String regionName = "r0";
1278     final Path regiondir = new Path(TABLEDIR, regionName);
1279     REGIONS = new ArrayList<String>();
1280     REGIONS.add(regionName);
1281     generateHLogs(-1);
1282 
1283     HLogFactory.createHLog(fs, regiondir, regionName, conf);
1284     FileStatus[] logfiles = fs.listStatus(HLOGDIR);
1285     assertTrue("There should be some log file",
1286       logfiles != null && logfiles.length > 0);
1287 
1288     HLogSplitter logSplitter = new HLogSplitter(
1289         conf, HBASEDIR, fs, null, null, this.mode) {
1290       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
1291       throws IOException {
1292         HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
1293         // After creating writer, simulate region's
1294         // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
1295         // region and delete them, excluding files with '.temp' suffix.
1296         NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
1297         if (files != null && !files.isEmpty()) {
1298           for (Path file : files) {
1299             if (!this.fs.delete(file, false)) {
1300               LOG.error("Failed delete of " + file);
1301             } else {
1302               LOG.debug("Deleted recovered.edits file=" + file);
1303             }
1304           }
1305         }
1306         return writer;
1307       }
1308     };
1309     try{
1310       logSplitter.splitLogFile(logfiles[0], null);
1311     } catch (IOException e) {
1312       LOG.info(e);
1313       Assert.fail("Throws IOException when spliting "
1314           + "log, it is most likely because writing file does not "
1315           + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
1316     }
1317     if (fs.exists(CORRUPTDIR)) {
1318       if (fs.listStatus(CORRUPTDIR).length > 0) {
1319         Assert.fail("There are some corrupt logs, "
1320                 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
1321       }
1322     }
1323   }
1324 
1325   private static void flushToConsole(String s) {
1326     System.out.println(s);
1327     System.out.flush();
1328   }
1329 
1330 
1331   private HLog.Writer [] generateHLogs(int leaveOpen) throws IOException {
1332     return generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
1333   }
1334 
1335   private HLog.Writer [] generateHLogs(final int writers, final int entries, final int leaveOpen) throws IOException {
1336     return generateHLogs((DistributedFileSystem)this.fs, writers, entries, leaveOpen);
1337   }
1338 
1339   private static void makeRegionDirs(FileSystem fs, List<String> regions) throws IOException {
1340     for (String region : regions) {
1341       flushToConsole("Creating dir for region " + region);
1342       fs.mkdirs(new Path(TABLEDIR, region));
1343     }
1344   }
1345 
1346   private static HLog.Writer [] generateHLogs(final DistributedFileSystem dfs, int writers, int entries, int leaveOpen)
1347   throws IOException {
1348     makeRegionDirs(dfs, REGIONS);
1349     dfs.mkdirs(HLOGDIR);
1350     HLog.Writer [] ws = new HLog.Writer[writers];
1351     int seq = 0;
1352     for (int i = 0; i < writers; i++) {
1353       ws[i] = HLogFactory.createWALWriter(dfs, new Path(HLOGDIR, HLOG_FILE_PREFIX + i), dfs.getConf());
1354       for (int j = 0; j < entries; j++) {
1355         int prefix = 0;
1356         for (String region : REGIONS) {
1357           String row_key = region + prefix++ + i + j;
1358           appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq++);
1359         }
1360       }
1361       if (i != leaveOpen) {
1362         ws[i].close();
1363         LOG.info("Closing writer " + i);
1364       }
1365     }
1366     return ws;
1367   }
1368 
1369   private Path[] getLogForRegion(Path rootdir, TableName table, String region)
1370   throws IOException {
1371     Path tdir = FSUtils.getTableDir(rootdir, table);
1372     @SuppressWarnings("deprecation")
1373     Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
1374       Bytes.toString(region.getBytes())));
1375     FileStatus [] files = this.fs.listStatus(editsdir);
1376     Path[] paths = new Path[files.length];
1377     for (int i = 0; i < files.length; i++) {
1378       paths[i] = files[i].getPath();
1379     }
1380     return paths;
1381   }
1382 
1383   private void corruptHLog(Path path, Corruptions corruption, boolean close,
1384                            FileSystem fs) throws IOException {
1385 
1386     FSDataOutputStream out;
1387     int fileSize = (int) fs.listStatus(path)[0].getLen();
1388 
1389     FSDataInputStream in = fs.open(path);
1390     byte[] corrupted_bytes = new byte[fileSize];
1391     in.readFully(0, corrupted_bytes, 0, fileSize);
1392     in.close();
1393 
1394     switch (corruption) {
1395       case APPEND_GARBAGE:
1396         fs.delete(path, false);
1397         out = fs.create(path);
1398         out.write(corrupted_bytes);
1399         out.write("-----".getBytes());
1400         closeOrFlush(close, out);
1401         break;
1402 
1403       case INSERT_GARBAGE_ON_FIRST_LINE:
1404         fs.delete(path, false);
1405         out = fs.create(path);
1406         out.write(0);
1407         out.write(corrupted_bytes);
1408         closeOrFlush(close, out);
1409         break;
1410 
1411       case INSERT_GARBAGE_IN_THE_MIDDLE:
1412         fs.delete(path, false);
1413         out = fs.create(path);
1414         int middle = (int) Math.floor(corrupted_bytes.length / 2);
1415         out.write(corrupted_bytes, 0, middle);
1416         out.write(0);
1417         out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
1418         closeOrFlush(close, out);
1419         break;
1420 
1421       case TRUNCATE:
1422         fs.delete(path, false);
1423         out = fs.create(path);
1424         out.write(corrupted_bytes, 0, fileSize
1425           - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
1426         closeOrFlush(close, out);
1427         break;
1428 
1429       case TRUNCATE_TRAILER:
1430         fs.delete(path, false);
1431         out = fs.create(path);
1432         out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
1433         closeOrFlush(close, out);
1434         break;
1435     }
1436   }
1437 
1438   private void closeOrFlush(boolean close, FSDataOutputStream out)
1439   throws IOException {
1440     if (close) {
1441       out.close();
1442     } else {
1443       Method syncMethod = null;
1444       try {
1445         syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
1446       } catch (NoSuchMethodException e) {
1447         try {
1448           syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
1449         } catch (NoSuchMethodException ex) {
1450           throw new IOException("This version of Hadoop supports " +
1451               "neither Syncable.sync() nor Syncable.hflush().");
1452         }
1453       }
1454       try {
1455         syncMethod.invoke(out, new Object[]{});
1456       } catch (Exception e) {
1457         throw new IOException(e);
1458       }
1459       // Not in 0out.hflush();
1460     }
1461   }
1462 
1463   @SuppressWarnings("unused")
1464   private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
1465     HLog.Entry entry;
1466     HLog.Reader in = HLogFactory.createReader(fs, log, conf);
1467     while ((entry = in.next()) != null) {
1468       System.out.println(entry);
1469     }
1470   }
1471 
1472   private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
1473     int count = 0;
1474     HLog.Reader in = HLogFactory.createReader(fs, log, conf);
1475     while (in.next() != null) {
1476       count++;
1477     }
1478     return count;
1479   }
1480 
1481 
1482   public static long appendEntry(HLog.Writer writer, TableName table, byte[] region,
1483                           byte[] row, byte[] family, byte[] qualifier,
1484                           byte[] value, long seq)
1485           throws IOException {
1486     LOG.info(Thread.currentThread().getName() + " append");
1487     writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1488     LOG.info(Thread.currentThread().getName() + " sync");
1489     writer.sync();
1490     return seq;
1491   }
1492 
1493   private static HLog.Entry createTestEntry(
1494       TableName table, byte[] region,
1495       byte[] row, byte[] family, byte[] qualifier,
1496       byte[] value, long seq) {
1497     long time = System.nanoTime();
1498     WALEdit edit = new WALEdit();
1499     seq++;
1500     edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
1501     return new HLog.Entry(new HLogKey(region, table, seq, time,
1502         HConstants.DEFAULT_CLUSTER_ID), edit);
1503   }
1504 
1505 
1506   private void injectEmptyFile(String suffix, boolean closeFile)
1507           throws IOException {
1508     HLog.Writer writer = HLogFactory.createWALWriter(
1509         fs, new Path(HLOGDIR, HLOG_FILE_PREFIX + suffix), conf);
1510     if (closeFile) writer.close();
1511   }
1512 
1513   @SuppressWarnings("unused")
1514   private void listLogs(FileSystem fs, Path dir) throws IOException {
1515     for (FileStatus file : fs.listStatus(dir)) {
1516       System.out.println(file.getPath());
1517     }
1518 
1519   }
1520 
1521   private int compareHLogSplitDirs(Path p1, Path p2) throws IOException {
1522     FileStatus[] f1 = fs.listStatus(p1);
1523     FileStatus[] f2 = fs.listStatus(p2);
1524     assertNotNull("Path " + p1 + " doesn't exist", f1);
1525     assertNotNull("Path " + p2 + " doesn't exist", f2);
1526 
1527     System.out.println("Files in " + p1 + ": " +
1528         Joiner.on(",").join(FileUtil.stat2Paths(f1)));
1529     System.out.println("Files in " + p2 + ": " +
1530         Joiner.on(",").join(FileUtil.stat2Paths(f2)));
1531     assertEquals(f1.length, f2.length);
1532 
1533     for (int i = 0; i < f1.length; i++) {
1534       // Regions now have a directory named RECOVERED_EDITS_DIR and in here
1535       // are split edit files. In below presume only 1.
1536       Path rd1 = HLogUtil.getRegionDirRecoveredEditsDir(f1[i].getPath());
1537       FileStatus[] rd1fs = fs.listStatus(rd1);
1538       assertEquals(1, rd1fs.length);
1539       Path rd2 = HLogUtil.getRegionDirRecoveredEditsDir(f2[i].getPath());
1540       FileStatus[] rd2fs = fs.listStatus(rd2);
1541       assertEquals(1, rd2fs.length);
1542       if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
1543         return -1;
1544       }
1545     }
1546     return 0;
1547   }
1548 
1549   private boolean logsAreEqual(Path p1, Path p2) throws IOException {
1550     HLog.Reader in1, in2;
1551     in1 = HLogFactory.createReader(fs, p1, conf);
1552     in2 = HLogFactory.createReader(fs, p2, conf);
1553     HLog.Entry entry1;
1554     HLog.Entry entry2;
1555     while ((entry1 = in1.next()) != null) {
1556       entry2 = in2.next();
1557       if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
1558               (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
1559         return false;
1560       }
1561     }
1562     return true;
1563   }
1564 }