/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;
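
/**
 * Test for the case where a regionserver going down has enough cycles to do damage to regions
 * that have actually been assigned elsewhere.
 * <p>
 * If a region gets assigned to a new server before its old location is fully done with it -- so
 * it is on two servers at the same time -- all can work fine until the region in its old
 * location decides to compact or otherwise change the region file set. The region in its new
 * location then gets a surprise when it tries to use a file removed by its old incarnation.
 */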
@Category(MediumTests.class)
public class TestIOFencing {
  static final Log LOG = LogFactory.getLog(TestIOFencing.class);
  static {
    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
        .getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
  }

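  /**
   * Abstract region that counts compactions and exposes latches so a test can make a compaction
   * pause at a chosen point and release it later.
   */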
  public abstract static class CompactionBlockerRegion extends HRegion {
    volatile int compactCount = 0;
    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);

    @SuppressWarnings("deprecation")
    public CompactionBlockerRegion(Path tableDir, HLog log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    public void stopCompactions() {
      compactionsBlocked = new CountDownLatch(1);
      compactionsWaiting = new CountDownLatch(1);
    }

    public void allowCompactions() {
      LOG.debug("allowing compactions");
      compactionsBlocked.countDown();
    }

    public void waitForCompactionToBlock() throws IOException {
      try {
        LOG.debug("waiting for compaction to block");
        compactionsWaiting.await();
        LOG.debug("compaction block reached");
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, Store store) throws IOException {
      try {
        return super.compact(compaction, store);
      } finally {
        compactCount++;
      }
    }

    public int countStoreFiles() {
      int count = 0;
      for (Store store : stores.values()) {
        count += store.getStorefilesCount();
      }
      return count;
    }
  }
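
  /**
   * A region whose compactions block in {@link #doRegionCompactionPrep()}, i.e. before the
   * compaction has done any work, until the test releases the latch.
   */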
  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
    public BlockCompactionsInPrepRegion(Path tableDir, HLog log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected void doRegionCompactionPrep() throws IOException {
      compactionsWaiting.countDown();
      try {
        compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.doRegionCompactionPrep();
    }
  }
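
  /**
   * A region whose compactions block in the store's {@code completeCompaction}, i.e. after the
   * compaction has produced its new files but before the store's file set is updated, until the
   * test releases the latch.
   */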
  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
    public BlockCompactionsInCompletionRegion(Path tableDir, HLog log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
      return new BlockCompactionsInCompletionHStore(this, family, this.conf);
    }
  }

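  /**
   * An HStore whose {@code completeCompaction} waits on the owning region's latches before
   * committing the compaction results.
   */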
  public static class BlockCompactionsInCompletionHStore extends HStore {
    CompactionBlockerRegion r;

    protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family,
        Configuration confParam) throws IOException {
      super(region, family, confParam);
      r = (CompactionBlockerRegion) region;
    }

    @Override
    protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
      try {
        r.compactionsWaiting.countDown();
        r.compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.completeCompaction(compactedFiles);
    }
  }
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static TableName TABLE_NAME = TableName.valueOf("tabletest");
  private final static byte[] FAMILY = Bytes.toBytes("family");
  private static final int FIRST_BATCH_COUNT = 4000;
  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
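
  /**
   * Puts the region on two servers at once: a major compaction is requested and blocked before
   * it does any work, the first server's ZK session is expired so the region moves to a new
   * server, and the compaction on the old server is then allowed to finish. Verifies the new
   * server's copy of the region keeps its store files and stays usable.
   */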
  @Ignore("See HBASE-10298")
  @Test
  public void testFencingAroundCompaction() throws Exception {
    doTest(BlockCompactionsInPrepRegion.class, false);
    doTest(BlockCompactionsInPrepRegion.class, true);
  }
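
  /**
   * Same scenario as {@link #testFencingAroundCompaction()}, but the compaction is blocked in
   * {@code completeCompaction}, after its WAL sync and just before its results are committed to
   * the store.
   */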
  @Ignore("See HBASE-10298")
  @Test
  public void testFencingAroundCompactionAfterWALSync() throws Exception {
    doTest(BlockCompactionsInCompletionRegion.class, false);
    doTest(BlockCompactionsInCompletionRegion.class, true);
  }

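  /**
   * Runs the fencing scenario with the given {@link HRegion} implementation, with distributed
   * log replay either off or on.
   */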
  public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
    // Insert our custom region implementation.
    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
    c.setBoolean("dfs.support.append", true);
    // Keep flushes small so we end up with several store files.
    c.setLong("hbase.hregion.memstore.flush.size", 200000);
    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
    // Set the thresholds high so compactions only run when we ask for them.
    c.setInt("hbase.hstore.compactionThreshold", 1000);
    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
    // Check for split/compaction requests frequently.
    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
    LOG.info("Starting mini cluster");
    TEST_UTIL.startMiniCluster(1);
    CompactionBlockerRegion compactingRegion = null;
    HBaseAdmin admin = null;
    try {
      LOG.info("Creating admin");
      admin = new HBaseAdmin(c);
      LOG.info("Creating table");
      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
      HTable table = new HTable(c, TABLE_NAME);
      LOG.info("Loading test table");
      // Find the single region and block its compactions before loading data.
      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
      assertEquals(1, testRegions.size());
      compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
      LOG.info("Blocking compactions");
      compactingRegion.stopCompactions();
      long lastFlushTime = compactingRegion.getLastFlushTime();
      // Load the first batch of rows.
      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);

      // Add a compaction marker to the WAL that references files which do not exist, so that
      // replay of the WAL on region open has to tolerate it.
      HRegionInfo oldHri = new HRegionInfo(table.getName(),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
        FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
        new Path("store_dir"));
      HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
        oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE - 100));

      // Wait till a flush has happened, otherwise there won't be multiple store files.
      long startWaitTime = System.currentTimeMillis();
      while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
          compactingRegion.countStoreFiles() <= 1) {
        LOG.info("Waiting for the region to flush " + compactingRegion.getRegionNameAsString());
        Thread.sleep(1000);
        assertTrue("Timed out waiting for the region to flush",
          System.currentTimeMillis() - startWaitTime < 30000);
      }
      assertTrue(compactingRegion.countStoreFiles() > 1);
      final byte[] REGION_NAME = compactingRegion.getRegionName();
      LOG.info("Asking for compaction");
      admin.majorCompact(TABLE_NAME.getName());
      LOG.info("Waiting for compaction to be about to start");
      compactingRegion.waitForCompactionToBlock();
      LOG.info("Starting a new server");
      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
      final HRegionServer newServer = newServerThread.getRegionServer();
      LOG.info("Killing region server ZK lease");
      TEST_UTIL.expireRegionServerSession(0);
      CompactionBlockerRegion newRegion = null;
      startWaitTime = System.currentTimeMillis();
      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));

      // Wait for the new server to open the region and, when distributed log replay is on, to
      // finish recovering it.
      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          HRegion region = newServer.getOnlineRegion(REGION_NAME);
          return region != null && !region.isRecovering();
        }
      });

      newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);

      // Let the blocked compaction on the old server run to completion while the new server
      // serves the region.
      LOG.info("Allowing compaction to proceed");
      compactingRegion.allowCompactions();
      while (compactingRegion.compactCount == 0) {
        Thread.sleep(1000);
      }
      LOG.info("Compaction finished");

      // Make sure the files the new region is serving still exist after the old region's
      // compaction finished.
      FileSystem fs = newRegion.getFilesystem();
      for (String f : newRegion.getStoreFileList(new byte[][] {FAMILY})) {
        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
      }

      // Make sure the new region can still take writes, compact, and account for every row.
      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
      admin.majorCompact(TABLE_NAME.getName());
      startWaitTime = System.currentTimeMillis();
      while (newRegion.compactCount == 0) {
        Thread.sleep(1000);
        assertTrue("New region never compacted",
          System.currentTimeMillis() - startWaitTime < 180000);
      }
      assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
    } finally {
      if (compactingRegion != null) {
        compactingRegion.allowCompactions();
      }
      if (admin != null) {
        admin.close();
      }
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}