
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test log deletion as logs are rolled.
 */
@Category(LargeTests.class)
public class TestLogRolling {
  private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
  private HRegionServer server;
  private HLog log;
  private String tableName;
  private byte[] value;
  private FileSystem fs;
  private MiniDFSCluster dfsCluster;
  private HBaseAdmin admin;
  private MiniHBaseCluster cluster;
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // verbose logging on classes that are touched in these tests
  {
    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
        .getLogger().setLevel(Level.ALL);
    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) HRegionServer.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) HRegion.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) HLog.LOG).getLogger().setLevel(Level.ALL);
  }

  /**
   * Constructor. Builds a test value longer than 1000 bytes by repeating the
   * class name, so that each Put carries enough data to fill WAL files quickly.
   */
  public TestLogRolling() {
    this.server = null;
    this.log = null;
    this.tableName = null;

    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    this.value = Bytes.toBytes(v.toString());
  }

  // Need to override this setup so we can edit the config before it gets sent
  // to the HDFS & HBase cluster startup.
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRolling ****/
    // Force a region split after every 768KB
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);

    // We roll the log after every 32 writes
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
    TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions, flush after every 2 flushes
    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    TEST_UTIL.getConfiguration().setInt(
        HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);

    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // make sure log.hflush() calls syncFs() to open a pipeline
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry creating a new pipeline multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
  }

  @Before
  public void setUp() throws Exception {
    // 1 master, 1 region server, 2 datanodes
    TEST_UTIL.startMiniCluster(1, 1, 2);

    cluster = TEST_UTIL.getHBaseCluster();
    dfsCluster = TEST_UTIL.getDFSCluster();
    fs = TEST_UTIL.getTestFileSystem();
    admin = TEST_UTIL.getHBaseAdmin();

    // disable region rebalancing (interferes with log watching)
    cluster.getMaster().balanceSwitch(false);
  }

  @After
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

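  /**
   * Waits for the region servers to come up (by opening hbase:meta), creates
   * the test table, and writes 256 rows, pausing after every 32 writes so the
   * log roller gets a chance to run; 256 writes should cause about 8 log rolls.
   */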
  private void startAndWriteData() throws IOException, InterruptedException {
    // When the hbase:meta table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
    this.log = server.getWAL();

    HTable table = createTestTable(this.tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
      doPut(table, i);
      if (i % 32 == 0) {
        // After every 32 writes sleep to let the log roller run
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // continue
        }
      }
    }
  }

  /**
   * Tests that old WAL files are deleted as logs are rolled.
   */
  @Test
  public void testLogRolling() throws Exception {
    this.tableName = getName();
    startAndWriteData();
    LOG.info("after writing there are " + ((FSHLog) log).getNumRolledLogFiles() + " log files");

    // flush all regions
    List<HRegion> regions =
        new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
    for (HRegion r : regions) {
      r.flushcache();
    }

    // Now roll the log
    log.rollWriter();

    int count = ((FSHLog) log).getNumRolledLogFiles();
    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
    assertTrue(("actual count: " + count), count <= 2);
  }

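  /** Returns the fixed name used as the stem for the test tables. */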
  private static String getName() {
    return "TestLogRolling";
  }

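  /**
   * Writes a single row and then sleeps, giving the log roller a chance to
   * run if a roll is pending.
   */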
  void writeData(HTable table, int rownum) throws IOException {
    doPut(table, rownum);

    // sleep to let the log roller run (if it needs to)
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      // continue
    }
  }

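  /**
   * Reads back the row for the given row number and asserts it holds exactly
   * the expected test value in the catalog family.
   */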
  void validateData(HTable table, int rownum) throws IOException {
    String row = "row" + String.format("%1$04d", rownum);
    Get get = new Get(Bytes.toBytes(row));
    get.addFamily(HConstants.CATALOG_FAMILY);
    Result result = table.get(get);
    assertEquals(1, result.size());
    assertTrue(Bytes.equals(value,
        result.getValue(HConstants.CATALOG_FAMILY, null)));
    LOG.info("Validated row " + row);
  }

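  /**
   * Writes ten rows starting at {@code start}, then keeps writing a probe row
   * (which exercises the low-replication check on sync) until the
   * low-replication roll flag matches {@code expect} or {@code timeout}
   * milliseconds elapse.
   */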
  void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
      throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row"
          + String.format("%1$04d", (start + i))));
      put.add(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.add(HConstants.CATALOG_FAMILY, null, value);
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
  }

  /**
   * Returns the HDFS write pipeline for the current WAL file, obtained via
   * reflection on the underlying DFSOutputStream.
   */
  DatanodeInfo[] getPipeline(HLog log) throws IllegalArgumentException,
      IllegalAccessException, InvocationTargetException {
    OutputStream stm = ((FSHLog) log).getOutputStream();
    Method getPipeline = null;
    for (Method m : stm.getClass().getDeclaredMethods()) {
      if (m.getName().endsWith("getPipeline")) {
        getPipeline = m;
        getPipeline.setAccessible(true);
        break;
      }
    }

    assertTrue("Need DFSOutputStream.getPipeline() for this test",
        null != getPipeline);
    Object repl = getPipeline.invoke(stm, new Object[] {} /* NO_ARGS */);
    return (DatanodeInfo[]) repl;
  }

  /**
   * Tests that logs are rolled upon detecting datanode death.
   * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    assertTrue("This test requires HLog file replication set to 2.",
      fs.getDefaultReplication() == 2);
    LOG.info("Replication=" + fs.getDefaultReplication());

    this.server = cluster.getRegionServer(0);
    this.log = server.getWAL();

    // Create the test table and open it
    String tableName = getName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
    assertTrue(table.isAutoFlush());

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();

    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
    // don't run this test without append support (HDFS-200 & HDFS-142)
    assertTrue("Need append support for this test", FSUtils
        .isAppendSupported(TEST_UTIL.getConfiguration()));

    // Bring up extra datanodes, to ensure proper replication when we kill one.
    // This call is synchronous; when it returns, the dfs cluster is active.
    // We start 3 servers and then stop 2 to avoid a directory naming conflict
    // when we stop/start a namenode later, as mentioned in HBASE-5163.
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true,
        null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue("DataNodes " + dfsCluster.getDataNodes().size() +
        " default replication " + fs.getDefaultReplication(),
      dfsCluster.getDataNodes().size() >= fs.getDefaultReplication() + 1);

    writeData(table, 2);

    long curTime = System.currentTimeMillis();
    long oldFilenum = ((FSHLog) log).getFilenum();
    assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == ((FSHLog) log).getFilenum());
    final DatanodeInfo[] pipeline = getPipeline(log);
    assertTrue(pipeline.length == fs.getDefaultReplication());

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This call is synchronous; when it returns, the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = ((FSHLog) log).getFilenum();

    assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      ((FSHLog) log).getFilenum() == newFilenum);
    // kill another datanode in the pipeline, so the live replicas drop below
    // the configured minimum of 2
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, 3, false, 14000);
    assertTrue("LowReplication Roller should've been disabled, current replication="
            + ((FSHLog) log).getLogReplication(),
        !log.isLowReplicationRollEnabled());

    dfsCluster
        .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, 13, true, 10000);
    assertTrue("New log file should have the default replication instead of " +
      ((FSHLog) log).getLogReplication(),
      ((FSHLog) log).getLogReplication() == fs.getDefaultReplication());
    assertTrue("LowReplication Roller should've been enabled",
        log.isLowReplicationRollEnabled());
  }

  /**
   * Tests that the HLog is rolled when all datanodes in the pipeline have been
   * restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires HLog file replication greater than 1.",
      fs.getDefaultReplication() > 1);
    LOG.info("Replication=" + fs.getDefaultReplication());
    // When the hbase:meta table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);

    this.server = cluster.getRegionServer(0);
    this.log = server.getWAL();

    // Create the test table and open it
    String tableName = getName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    final List<Path> paths = new ArrayList<Path>();
    final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
    paths.add(((FSHLog) log).computeFilename());
    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void preLogRoll(Path oldFile, Path newFile) {
        LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
        preLogRolledCalled.add(Integer.valueOf(1));
      }
      @Override
      public void postLogRoll(Path oldFile, Path newFile) {
        paths.add(newFile);
      }
      @Override
      public void preLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void postLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void logRollRequested() {}
      @Override
      public void logCloseRequested() {}
      @Override
      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
          WALEdit logEdit) {}
      @Override
      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
          WALEdit logEdit) {}
    });

    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
    // don't run this test without append support (HDFS-200 & HDFS-142)
    assertTrue("Need append support for this test", FSUtils
        .isAppendSupported(TEST_UTIL.getConfiguration()));

    writeData(table, 1002);

    table.setAutoFlush(true, true);

    long curTime = System.currentTimeMillis();
    long oldFilenum = log.getFilenum();
    assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());

    // restart all datanodes in the pipeline
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1002);

    // this write should succeed, but trigger a log roll
    writeData(table, 1003);
    long newFilenum = log.getFilenum();

    assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
    validateData(table, 1003);

    writeData(table, 1004);

    // restart all datanodes again
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1004);

    // this write should succeed, but trigger a log roll
    writeData(table, 1005);

    // force a log roll to read back and verify previously written logs
    log.rollWriter(true);
    assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

    // read back the data written
    Set<String> loggedRows = new HashSet<String>();
    FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
    for (Path p : paths) {
      LOG.debug("recovering lease for " + p);
      fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

      LOG.debug("Reading HLog " + FSUtils.getPath(p));
      HLog.Reader reader = null;
      try {
        reader = HLogFactory.createReader(fs, p,
            TEST_UTIL.getConfiguration());
        HLog.Entry entry;
        while ((entry = reader.next()) != null) {
          LOG.debug("#" + entry.getKey().getLogSeqNum() + ": " + entry.getEdit().getKeyValues());
          for (KeyValue kv : entry.getEdit().getKeyValues()) {
            loggedRows.add(Bytes.toStringBinary(kv.getRow()));
          }
        }
      } catch (EOFException e) {
        LOG.debug("EOF reading file " + FSUtils.getPath(p));
      } finally {
        if (reader != null) reader.close();
      }
    }

    // verify the written rows are there
    assertTrue(loggedRows.contains("row1002"));
    assertTrue(loggedRows.contains("row1003"));
    assertTrue(loggedRows.contains("row1004"));
    assertTrue(loggedRows.contains("row1005"));

    // flush all regions
    List<HRegion> regions =
        new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
    for (HRegion r : regions) {
      r.flushcache();
    }

    ResultScanner scanner = table.getScanner(new Scan());
    try {
      for (int i = 2; i <= 5; i++) {
        Result r = scanner.next();
        assertNotNull(r);
        assertFalse(r.isEmpty());
        assertEquals("row100" + i, Bytes.toString(r.getRow()));
      }
    } finally {
      scanner.close();
    }

    // verify that no region servers aborted
    for (JVMClusterUtil.RegionServerThread rsThread :
        TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
      assertFalse(rsThread.getRegionServer().isAborted());
    }
  }

  /**
   * Tests that logs are deleted when some region has a compaction
   * record in WAL and no other records. See HBASE-8597.
   */
  @Test
  public void testCompactionRecordDoesntBlockRolling() throws Exception {
    // When the hbase:meta table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);

    String tableName = getName();
    HTable table = createTestTable(tableName);
    String tableName2 = tableName + "1";
    HTable table2 = createTestTable(tableName2);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    FSHLog fshLog = (FSHLog) log;
    HRegion region = server.getOnlineRegions(table2.getName()).get(0);
    Store s = region.getStore(HConstants.CATALOG_FAMILY);

    // Have to flush the namespace table so its edits don't affect the WAL counts below.
    admin.flush(TableName.NAMESPACE_TABLE_NAME.getName());

    // Put some stuff into table2, to make sure we have some files to compact.
    for (int i = 1; i <= 2; ++i) {
      doPut(table2, i);
      admin.flush(table2.getTableName());
    }
    doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
    assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumRolledLogFiles());
    assertEquals(2, s.getStorefilesCount());

    // Roll the log and compact table2, to have a compaction record in the 2nd WAL.
    fshLog.rollWriter();
    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());
    admin.flush(table2.getTableName());
    region.compactStores();
    // Wait for the compaction, in case the flush triggered it before we did.
    Assert.assertNotNull(s);
    for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
      Threads.sleepWithoutInterrupt(200);
    }
    assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());

    // Write some value to the table so the WAL cannot be deleted until table is flushed.
    doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
    fshLog.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());

    // Flush table to make latest WAL obsolete; write another record, and roll again.
    admin.flush(table.getTableName());
    doPut(table, 1);
    fshLog.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
    assertEquals("Should have 1 WAL at the end", 1, fshLog.getNumRolledLogFiles());

    table.close();
    table2.close();
  }

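  /**
   * Writes one row ("row" + zero-padded {@code i}) carrying the large test
   * value into the catalog family of the given table.
   */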
  private void doPut(HTable table, int i) throws IOException {
    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
    put.add(HConstants.CATALOG_FAMILY, null, value);
    table.put(put);
  }

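  /**
   * Creates a table with a single catalog-family column and returns an open
   * handle to it.
   */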
  private HTable createTestTable(String tableName) throws IOException {
    // Create the test table and open it
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc);
    return new HTable(TEST_UTIL.getConfiguration(), tableName);
  }
}