/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import static org.junit.Assert.assertTrue;
21 import static org.junit.Assert.fail;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.commons.logging.impl.Log4JLogger;
26 import org.apache.hadoop.hbase.*;
27 import org.apache.hadoop.hbase.client.HBaseAdmin;
28 import org.apache.hadoop.hbase.client.HTable;
29 import org.apache.hadoop.hbase.client.Put;
30 import org.apache.hadoop.hbase.regionserver.HRegion;
31 import org.apache.hadoop.hbase.regionserver.HRegionServer;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.apache.hadoop.hbase.util.FSUtils;
34 import org.apache.hadoop.hdfs.MiniDFSCluster;
35 import org.apache.hadoop.hdfs.server.datanode.DataNode;
36 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
37 import org.apache.log4j.Level;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.BeforeClass;
41 import org.junit.Test;
42 import org.junit.experimental.categories.Category;
43
44
45
46
47
48 @Category(MediumTests.class)
49 public class TestLogRollAbort {
50 private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
51 private static MiniDFSCluster dfsCluster;
52 private static HBaseAdmin admin;
53 private static MiniHBaseCluster cluster;
54 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
55
56
57 {
58 ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
59 ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
60 ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
61 .getLogger().setLevel(Level.ALL);
62 ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
63 ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
64 ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
65 }
66
67
68
69 @BeforeClass
70 public static void setUpBeforeClass() throws Exception {
71
72 TEST_UTIL.getConfiguration().setInt(
73 "hbase.regionserver.logroll.errors.tolerated", 2);
74 TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
75 TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
76 TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
77
78
79 TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 5 * 1000);
80
81
82 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
83
84
85 TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
86 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
87
88
89 TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 10);
90 }
91
92 @Before
93 public void setUp() throws Exception {
94 TEST_UTIL.startMiniCluster(2);
95
96 cluster = TEST_UTIL.getHBaseCluster();
97 dfsCluster = TEST_UTIL.getDFSCluster();
98 admin = TEST_UTIL.getHBaseAdmin();
99
100
101 cluster.getMaster().balanceSwitch(false);
102 }
103
104 @After
105 public void tearDown() throws Exception {
106 TEST_UTIL.shutdownMiniCluster();
107 }
108
109
110
111
112
113 @Test
114 public void testRSAbortWithUnflushedEdits() throws Exception {
115 LOG.info("Starting testRSAbortWithUnflushedEdits()");
116
117
118 new HTable(TEST_UTIL.getConfiguration(),
119 TableName.META_TABLE_NAME).close();
120
121
122 String tableName = this.getClass().getSimpleName();
123 HTableDescriptor desc = new HTableDescriptor(tableName);
124 desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
125 desc.setDeferredLogFlush(true);
126
127 admin.createTable(desc);
128 HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
129
130 HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
131 HLog log = server.getWAL();
132
133 assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
134
135 assertTrue("Need append support for this test",
136 FSUtils.isAppendSupported(TEST_UTIL.getConfiguration()));
137
138 Put p = new Put(Bytes.toBytes("row2001"));
139 p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2001));
140 table.put(p);
141
142 log.sync();
143
144 p = new Put(Bytes.toBytes("row2002"));
145 p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2002));
146 table.put(p);
147
148 dfsCluster.restartDataNodes();
149 LOG.info("Restarted datanodes");
150
151 try {
152 log.rollWriter(true);
153 } catch (FailedLogCloseException flce) {
154 assertTrue("Should have deferred flush log edits outstanding",
155 ((FSHLog) log).hasUnSyncedEntries());
156 }
157 }
158
159 }
160