1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.client.Admin;
34 import org.apache.hadoop.hbase.client.HBaseAdmin;
35 import org.apache.hadoop.hbase.client.Table;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
38 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
39 import org.apache.hadoop.hbase.regionserver.HRegion;
40 import org.apache.hadoop.hbase.regionserver.HRegionServer;
41 import org.apache.hadoop.hbase.regionserver.HStore;
42 import org.apache.hadoop.hbase.regionserver.Region;
43 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
44 import org.apache.hadoop.hbase.regionserver.Store;
45 import org.apache.hadoop.hbase.regionserver.StoreFile;
46 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
47 import org.apache.hadoop.hbase.testclassification.MediumTests;
48 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
49 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
50 import org.apache.hadoop.hbase.security.User;
51 import org.apache.hadoop.hbase.util.Bytes;
52 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
53 import org.apache.hadoop.hbase.wal.WAL;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56
57 import com.google.common.collect.Lists;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 @Category(MediumTests.class)
78 public class TestIOFencing {
// Logger shared by the test body and by the nested region/store subclasses below.
private static final Log LOG = LogFactory.getLog(TestIOFencing.class);
static {
  // Intentionally empty static initializer.
  // NOTE(review): presumably a placeholder for per-class debug-logging setup — confirm
  // before removing.
}
90
91 public abstract static class CompactionBlockerRegion extends HRegion {
92 volatile int compactCount = 0;
93 volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
94 volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
95
96 @SuppressWarnings("deprecation")
97 public CompactionBlockerRegion(Path tableDir, WAL log,
98 FileSystem fs, Configuration confParam, HRegionInfo info,
99 HTableDescriptor htd, RegionServerServices rsServices) {
100 super(tableDir, log, fs, confParam, info, htd, rsServices);
101 }
102
103 public void stopCompactions() {
104 compactionsBlocked = new CountDownLatch(1);
105 compactionsWaiting = new CountDownLatch(1);
106 }
107
108 public void allowCompactions() {
109 LOG.debug("allowing compactions");
110 compactionsBlocked.countDown();
111 }
112 public void waitForCompactionToBlock() throws IOException {
113 try {
114 LOG.debug("waiting for compaction to block");
115 compactionsWaiting.await();
116 LOG.debug("compaction block reached");
117 } catch (InterruptedException ex) {
118 throw new IOException(ex);
119 }
120 }
121
122 @Override
123 public boolean compact(CompactionContext compaction, Store store,
124 CompactionThroughputController throughputController) throws IOException {
125 try {
126 return super.compact(compaction, store, throughputController);
127 } finally {
128 compactCount++;
129 }
130 }
131
132 @Override
133 public boolean compact(CompactionContext compaction, Store store,
134 CompactionThroughputController throughputController, User user) throws IOException {
135 try {
136 return super.compact(compaction, store, throughputController, user);
137 } finally {
138 compactCount++;
139 }
140 }
141
142 public int countStoreFiles() {
143 int count = 0;
144 for (Store store : stores.values()) {
145 count += store.getStorefilesCount();
146 }
147 return count;
148 }
149 }
150
151
152
153
154
155 public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
156
157 public BlockCompactionsInPrepRegion(Path tableDir, WAL log,
158 FileSystem fs, Configuration confParam, HRegionInfo info,
159 HTableDescriptor htd, RegionServerServices rsServices) {
160 super(tableDir, log, fs, confParam, info, htd, rsServices);
161 }
162 @Override
163 protected void doRegionCompactionPrep() throws IOException {
164 compactionsWaiting.countDown();
165 try {
166 compactionsBlocked.await();
167 } catch (InterruptedException ex) {
168 throw new IOException();
169 }
170 super.doRegionCompactionPrep();
171 }
172 }
173
174
175
176
177
178
179 public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
180 public BlockCompactionsInCompletionRegion(Path tableDir, WAL log,
181 FileSystem fs, Configuration confParam, HRegionInfo info,
182 HTableDescriptor htd, RegionServerServices rsServices) {
183 super(tableDir, log, fs, confParam, info, htd, rsServices);
184 }
185 @Override
186 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
187 return new BlockCompactionsInCompletionHStore(this, family, this.conf);
188 }
189 }
190
191 public static class BlockCompactionsInCompletionHStore extends HStore {
192 CompactionBlockerRegion r;
193 protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family,
194 Configuration confParam) throws IOException {
195 super(region, family, confParam);
196 r = (CompactionBlockerRegion) region;
197 }
198
199 @Override
200 protected void completeCompaction(final Collection<StoreFile> compactedFiles,
201 boolean removeFiles) throws IOException {
202 try {
203 r.compactionsWaiting.countDown();
204 r.compactionsBlocked.await();
205 } catch (InterruptedException ex) {
206 throw new IOException(ex);
207 }
208 super.completeCompaction(compactedFiles, removeFiles);
209 }
210 @Override
211 protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
212 try {
213 r.compactionsWaiting.countDown();
214 r.compactionsBlocked.await();
215 } catch (InterruptedException ex) {
216 throw new IOException(ex);
217 }
218 super.completeCompaction(compactedFiles);
219 }
220 }
221
222 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
223 private final static TableName TABLE_NAME =
224 TableName.valueOf("tabletest");
225 private final static byte[] FAMILY = Bytes.toBytes("family");
226 private static final int FIRST_BATCH_COUNT = 4000;
227 private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
228
229
230
231
232
233
234
235 @Test
236 public void testFencingAroundCompaction() throws Exception {
237 doTest(BlockCompactionsInPrepRegion.class);
238 }
239
240
241
242
243
244
245
246 @Test
247 public void testFencingAroundCompactionAfterWALSync() throws Exception {
248 doTest(BlockCompactionsInCompletionRegion.class);
249 }
250
251 public void doTest(Class<?> regionClass) throws Exception {
252 Configuration c = TEST_UTIL.getConfiguration();
253
254 c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
255 c.setBoolean("dfs.support.append", true);
256
257 c.setLong("hbase.hregion.memstore.flush.size", 200000);
258 c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
259
260 c.setInt("hbase.hstore.compactionThreshold", 1000);
261 c.setLong("hbase.hstore.blockingStoreFiles", 1000);
262
263 c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
264 LOG.info("Starting mini cluster");
265 TEST_UTIL.startMiniCluster(1);
266 CompactionBlockerRegion compactingRegion = null;
267 Admin admin = null;
268 try {
269 LOG.info("Creating admin");
270 admin = TEST_UTIL.getConnection().getAdmin();
271 LOG.info("Creating table");
272 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
273 Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
274 LOG.info("Loading test table");
275
276 List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
277 assertEquals(1, testRegions.size());
278 compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
279 LOG.info("Blocking compactions");
280 compactingRegion.stopCompactions();
281 long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
282
283 TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
284
285
286
287 HRegionInfo oldHri = new HRegionInfo(table.getName(),
288 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
289 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
290 FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
291 new Path("store_dir"));
292 WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
293 oldHri, compactionDescriptor, compactingRegion.getMVCC());
294
295
296 long startWaitTime = System.currentTimeMillis();
297 while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
298 compactingRegion.countStoreFiles() <= 1) {
299 LOG.info("Waiting for the region to flush " +
300 compactingRegion.getRegionInfo().getRegionNameAsString());
301 Thread.sleep(1000);
302 assertTrue("Timed out waiting for the region to flush",
303 System.currentTimeMillis() - startWaitTime < 30000);
304 }
305 assertTrue(compactingRegion.countStoreFiles() > 1);
306 final byte REGION_NAME[] = compactingRegion.getRegionInfo().getRegionName();
307 LOG.info("Asking for compaction");
308 ((HBaseAdmin)admin).majorCompact(TABLE_NAME.getName());
309 LOG.info("Waiting for compaction to be about to start");
310 compactingRegion.waitForCompactionToBlock();
311 LOG.info("Starting a new server");
312 RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
313 final HRegionServer newServer = newServerThread.getRegionServer();
314 LOG.info("Killing region server ZK lease");
315 TEST_UTIL.expireRegionServerSession(0);
316 CompactionBlockerRegion newRegion = null;
317 startWaitTime = System.currentTimeMillis();
318 LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
319
320
321 Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
322 @Override
323 public boolean evaluate() throws Exception {
324 Region newRegion = newServer.getOnlineRegion(REGION_NAME);
325 return newRegion != null && !newRegion.isRecovering();
326 }
327 });
328
329 newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
330
331
332
333 FileSystem fs = newRegion.getFilesystem();
334 for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) {
335 assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
336 }
337 LOG.info("Allowing compaction to proceed");
338 compactingRegion.allowCompactions();
339 while (compactingRegion.compactCount == 0) {
340 Thread.sleep(1000);
341 }
342
343
344 LOG.info("Compaction finished");
345
346
347
348 TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
349 ((HBaseAdmin)admin).majorCompact(TABLE_NAME.getName());
350 startWaitTime = System.currentTimeMillis();
351 while (newRegion.compactCount == 0) {
352 Thread.sleep(1000);
353 assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 180000);
354 }
355 assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
356 } finally {
357 if (compactingRegion != null) {
358 compactingRegion.allowCompactions();
359 }
360 admin.close();
361 TEST_UTIL.shutdownMiniCluster();
362 }
363 }
364 }