/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

/**
 * Test for the case where a regionserver going down has enough cycles to do damage to regions
 * that have actually been assigned elsewhere.
 *
 * <p>If we happen to assign a region before it is fully done with its old location -- i.e. it is
 * on two servers at the same time -- all can work fine until the region on the dying server
 * decides to compact or otherwise change the region file set.  The region in its new location
 * will then get a surprise when it tries to do something with a file removed by the region in
 * its old location on the dying server.
 *
 * <p>Making a test for this case is a little tough in that even if a file is deleted up on the
 * namenode, if the file was opened before the delete it will continue to let reads happen until
 * something changes the state of cached blocks in the dfsclient that was already open (a block
 * from the deleted file is cleaned from the datanode by the NN).
 *
 * <p>What we do below is an explicit check for existence on the files listed in the region that
 * has had some files removed because of a compaction.  This hurries along and makes certain what
 * would otherwise be a chance occurrence.
 */
@Category(MediumTests.class)
public class TestIOFencing {
  static final Log LOG = LogFactory.getLog(TestIOFencing.class);
  static {
    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
        .getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
  }

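  /**
   * HRegion subclass used by the tests below: it counts how many compactions have run and
   * exposes a pair of latches the test uses to park a compaction and later release it.
   */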
  public abstract static class CompactionBlockerRegion extends HRegion {
    volatile int compactCount = 0;
    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);

    @SuppressWarnings("deprecation")
    public CompactionBlockerRegion(Path tableDir, HLog log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

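    /**
     * Arm the latches so that the next compaction parks (in the subclass hook) until
     * {@link #allowCompactions()} is called.
     */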
    public void stopCompactions() {
      compactionsBlocked = new CountDownLatch(1);
      compactionsWaiting = new CountDownLatch(1);
    }

    public void allowCompactions() {
      LOG.debug("allowing compactions");
      compactionsBlocked.countDown();
    }

    public void waitForCompactionToBlock() throws IOException {
      try {
        LOG.debug("waiting for compaction to block");
        compactionsWaiting.await();
        LOG.debug("compaction block reached");
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, Store store) throws IOException {
      try {
        return super.compact(compaction, store);
      } finally {
        compactCount++;
      }
    }

    public int countStoreFiles() {
      int count = 0;
      for (Store store : stores.values()) {
        count += store.getStorefilesCount();
      }
      return count;
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and
   * then, when appropriate for the test, allow them to proceed again.
   */
  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {

    public BlockCompactionsInPrepRegion(Path tableDir, HLog log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected void doRegionCompactionPrep() throws IOException {
      compactionsWaiting.countDown();
      try {
        compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.doRegionCompactionPrep();
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and
   * then, when appropriate for the test, allow them to proceed again.  This allows the
   * compaction entry to go to the WAL before blocking, but blocks afterwards.
   */
  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
    public BlockCompactionsInCompletionRegion(Path tableDir, HLog log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
      return new BlockCompactionsInCompletionHStore(this, family, this.conf);
    }
  }

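  /**
   * HStore used by {@link BlockCompactionsInCompletionRegion}: it parks in
   * {@code completeCompaction} (after the compaction entry has gone to the WAL) until the
   * owning region's latch is released.
   */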
  public static class BlockCompactionsInCompletionHStore extends HStore {
    CompactionBlockerRegion r;

    protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family,
        Configuration confParam) throws IOException {
      super(region, family, confParam);
      r = (CompactionBlockerRegion) region;
    }

    @Override
    protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
      try {
        r.compactionsWaiting.countDown();
        r.compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.completeCompaction(compactedFiles);
    }
  }

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static TableName TABLE_NAME = TableName.valueOf("tabletest");
  private final static byte[] FAMILY = Bytes.toBytes("family");
  private static final int FIRST_BATCH_COUNT = 4000;
  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction until after we have killed the server and the region has come up on
   * a new regionserver altogether.  This fakes the double assignment case where the region in
   * one location changes the files out from underneath a region being served elsewhere.
   */
  @Ignore("See HBASE-10298")
  @Test
  public void testFencingAroundCompaction() throws Exception {
    doTest(BlockCompactionsInPrepRegion.class, false);
    doTest(BlockCompactionsInPrepRegion.class, true);
  }

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction completion until after we have killed the server and the region has come up on
   * a new regionserver altogether.  This fakes the double assignment case where the region in
   * one location changes the files out from underneath a region being served elsewhere.
   */
  @Ignore("See HBASE-10298")
  @Test
  public void testFencingAroundCompactionAfterWALSync() throws Exception {
    doTest(BlockCompactionsInCompletionRegion.class, false);
    doTest(BlockCompactionsInCompletionRegion.class, true);
  }

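  /**
   * Drives the scenario described in the class comment.
   *
   * @param regionClass the {@link CompactionBlockerRegion} subclass that decides where the
   *          compaction parks (before prep, or after the WAL entry)
   * @param distributedLogReplay whether to recover the killed server with distributed log replay
   */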
  public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
    // Insert our custom region
    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
    c.setBoolean("dfs.support.append", true);
    // Encourage plenty of flushes
    c.setLong("hbase.hregion.memstore.flush.size", 200000);
    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
    // Only run compaction when we tell it to
    c.setInt("hbase.hstore.compactionThreshold", 1000);
    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
    // Compact quickly after we tell it to!
    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
    LOG.info("Starting mini cluster");
    TEST_UTIL.startMiniCluster(1);
    CompactionBlockerRegion compactingRegion = null;
    HBaseAdmin admin = null;
    try {
      LOG.info("Creating admin");
      admin = new HBaseAdmin(c);
      LOG.info("Creating table");
      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
      HTable table = new HTable(c, TABLE_NAME);
      LOG.info("Loading test table");
      // Find the region
      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
      assertEquals(1, testRegions.size());
      compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
      LOG.info("Blocking compactions");
      compactingRegion.stopCompactions();
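      // Remember the last flush time so we can tell below when the load has triggered a flush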
      long lastFlushTime = compactingRegion.getLastFlushTime();
      // Load some rows
      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);

      // add a compaction from an older (non-existing) region to see whether we successfully skip
      // those entries
      HRegionInfo oldHri = new HRegionInfo(table.getName(),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
        FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
        new Path("store_dir"));
      HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
        oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE - 100));

      // Wait till flush has happened, otherwise there won't be multiple store files
      long startWaitTime = System.currentTimeMillis();
      while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
          compactingRegion.countStoreFiles() <= 1) {
        LOG.info("Waiting for the region to flush " + compactingRegion.getRegionNameAsString());
        Thread.sleep(1000);
        assertTrue("Timed out waiting for the region to flush",
          System.currentTimeMillis() - startWaitTime < 30000);
      }
      assertTrue(compactingRegion.countStoreFiles() > 1);
      final byte[] REGION_NAME = compactingRegion.getRegionName();
      LOG.info("Asking for compaction");
      admin.majorCompact(TABLE_NAME.getName());
      LOG.info("Waiting for compaction to be about to start");
      compactingRegion.waitForCompactionToBlock();
      LOG.info("Starting a new server");
      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
      final HRegionServer newServer = newServerThread.getRegionServer();
      LOG.info("Killing region server ZK lease");
      TEST_UTIL.expireRegionServerSession(0);
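      // Expiring the ZK session makes the master consider the original server dead and reassign
      // the region to the new server while the old server is still parked in its compaction,
      // i.e. the double-assignment window this test is about.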
      CompactionBlockerRegion newRegion = null;
      startWaitTime = System.currentTimeMillis();
      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));

      // wait for region to be assigned and to go out of log replay if applicable
      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          HRegion newRegion = newServer.getOnlineRegion(REGION_NAME);
          return newRegion != null && !newRegion.isRecovering();
        }
      });

      newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);

      LOG.info("Allowing compaction to proceed");
      compactingRegion.allowCompactions();
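      // Wait for the old server's parked compaction to run to completion; it will swap store
      // files out from under the region that is now also open on the new server.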
      while (compactingRegion.compactCount == 0) {
        Thread.sleep(1000);
      }
      // The server we killed stays up until the compaction that was started before it was killed
      // completes.  In the logs you should see the old regionserver now going down.
      LOG.info("Compaction finished");
      // After compaction of old region finishes on the server that was going down, make sure that
      // all the files we expect are still working when region is up in new location.
      FileSystem fs = newRegion.getFilesystem();
      for (String f: newRegion.getStoreFileList(new byte[][] {FAMILY})) {
        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
      }
      // If we survive the split keep going...
      // Now we make sure that the region isn't totally confused.  Load up more rows.
      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
      admin.majorCompact(TABLE_NAME.getName());
      startWaitTime = System.currentTimeMillis();
      while (newRegion.compactCount == 0) {
        Thread.sleep(1000);
        assertTrue("New region never compacted",
          System.currentTimeMillis() - startWaitTime < 180000);
      }
      assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
    } finally {
      if (compactingRegion != null) {
        compactingRegion.allowCompactions();
      }
      if (admin != null) {
        admin.close();
      }
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}