View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.fail;
24  
25  import java.io.IOException;
26  import java.security.PrivilegedExceptionAction;
27  import java.util.ArrayList;
28  import java.util.List;
29  import java.util.SortedSet;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileStatus;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HBaseTestingUtility;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.KeyValue;
48  import org.apache.hadoop.hbase.MasterNotRunningException;
49  import org.apache.hadoop.hbase.MediumTests;
50  import org.apache.hadoop.hbase.MiniHBaseCluster;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.TableName;
53  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Get;
56  import org.apache.hadoop.hbase.client.HTable;
57  import org.apache.hadoop.hbase.client.Put;
58  import org.apache.hadoop.hbase.client.Result;
59  import org.apache.hadoop.hbase.client.ResultScanner;
60  import org.apache.hadoop.hbase.client.Scan;
61  import org.apache.hadoop.hbase.master.HMaster;
62  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
63  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
64  import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
65  import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
66  import org.apache.hadoop.hbase.regionserver.FlushRequester;
67  import org.apache.hadoop.hbase.regionserver.HRegion;
68  import org.apache.hadoop.hbase.regionserver.HRegionServer;
69  import org.apache.hadoop.hbase.regionserver.RegionScanner;
70  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
71  import org.apache.hadoop.hbase.regionserver.Store;
72  import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
73  import org.apache.hadoop.hbase.security.User;
74  import org.apache.hadoop.hbase.util.Bytes;
75  import org.apache.hadoop.hbase.util.EnvironmentEdge;
76  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77  import org.apache.hadoop.hbase.util.FSUtils;
78  import org.apache.hadoop.hbase.util.HFileTestUtil;
79  import org.apache.hadoop.hbase.util.Pair;
80  import org.junit.After;
81  import org.junit.AfterClass;
82  import org.junit.Before;
83  import org.junit.BeforeClass;
84  import org.junit.Test;
85  import org.junit.experimental.categories.Category;
86  import org.mockito.Mockito;
87  
88  /**
89   * Test replay of edits out of a WAL split.
90   */
91  @Category(MediumTests.class)
92  public class TestWALReplay {
93    public static final Log LOG = LogFactory.getLog(TestWALReplay.class);
94    static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
95    private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
96    private Path hbaseRootDir = null;
97    private String logName;
98    private Path oldLogDir;
99    private Path logDir;
100   private FileSystem fs;
101   private Configuration conf;
102   private RecoveryMode mode;
103   
104 
105   @BeforeClass
106   public static void setUpBeforeClass() throws Exception {
107     Configuration conf = TEST_UTIL.getConfiguration();
108     conf.setBoolean("dfs.support.append", true);
109     // The below config supported by 0.20-append and CDH3b2
110     conf.setInt("dfs.client.block.recovery.retries", 2);
111     TEST_UTIL.startMiniCluster(3);
112     Path hbaseRootDir =
113       TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
114     LOG.info("hbase.rootdir=" + hbaseRootDir);
115     FSUtils.setRootDir(conf, hbaseRootDir);
116   }
117 
118   @AfterClass
119   public static void tearDownAfterClass() throws Exception {
120     TEST_UTIL.shutdownMiniCluster();
121   }
122 
123   @Before
124   public void setUp() throws Exception {
125     this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
126     this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
127     this.hbaseRootDir = FSUtils.getRootDir(this.conf);
128     this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
129     this.logName = HConstants.HREGION_LOGDIR_NAME;
130     this.logDir = new Path(this.hbaseRootDir, logName);
131     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
132       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
133     }
134     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
135         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
136   }
137 
138   @After
139   public void tearDown() throws Exception {
140     TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
141   }
142 
143   /*
144    * @param p Directory to cleanup
145    */
146   private void deleteDir(final Path p) throws IOException {
147     if (this.fs.exists(p)) {
148       if (!this.fs.delete(p, true)) {
149         throw new IOException("Failed remove of " + p);
150       }
151     }
152   }
153 
154   /**
155    * 
156    * @throws Exception
157    */
158   @Test
159   public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
160     final TableName tableName =
161         TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
162     byte[] family1 = Bytes.toBytes("cf1");
163     byte[] family2 = Bytes.toBytes("cf2");
164     byte[] qualifier = Bytes.toBytes("q");
165     byte[] value = Bytes.toBytes("testV");
166     byte[][] familys = { family1, family2 };
167     TEST_UTIL.createTable(tableName, familys);
168     HTable htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
169     Put put = new Put(Bytes.toBytes("r1"));
170     put.add(family1, qualifier, value);
171     htable.put(put);
172     ResultScanner resultScanner = htable.getScanner(new Scan());
173     int count = 0;
174     while (resultScanner.next() != null) {
175       count++;
176     }
177     resultScanner.close();
178     assertEquals(1, count);
179 
180     MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
181     List<HRegion> regions = hbaseCluster.getRegions(tableName);
182     assertEquals(1, regions.size());
183 
184     // move region to another regionserver
185     HRegion destRegion = regions.get(0);
186     int originServerNum = hbaseCluster
187         .getServerWith(destRegion.getRegionName());
188     assertTrue("Please start more than 1 regionserver", hbaseCluster
189         .getRegionServerThreads().size() > 1);
190     int destServerNum = 0;
191     while (destServerNum == originServerNum) {
192       destServerNum++;
193     }
194     HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
195     HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
196     // move region to destination regionserver
197     moveRegionAndWait(destRegion, destServer);
198 
199     // delete the row
200     Delete del = new Delete(Bytes.toBytes("r1"));
201     htable.delete(del);
202     resultScanner = htable.getScanner(new Scan());
203     count = 0;
204     while (resultScanner.next() != null) {
205       count++;
206     }
207     resultScanner.close();
208     assertEquals(0, count);
209 
210     // flush region and make major compaction
211     destServer.getOnlineRegion(destRegion.getRegionName()).flushcache();
212     // wait to complete major compaction
213     for (Store store : destServer.getOnlineRegion(destRegion.getRegionName())
214         .getStores().values()) {
215       store.triggerMajorCompaction();
216     }
217     destServer.getOnlineRegion(destRegion.getRegionName()).compactStores();
218 
219     // move region to origin regionserver
220     moveRegionAndWait(destRegion, originServer);
221     // abort the origin regionserver
222     originServer.abort("testing");
223 
224     // see what we get
225     Result result = htable.get(new Get(Bytes.toBytes("r1")));
226     if (result != null) {
227       assertTrue("Row is deleted, but we get" + result.toString(),
228           (result == null) || result.isEmpty());
229     }
230     resultScanner.close();
231   }
232 
233   private void moveRegionAndWait(HRegion destRegion, HRegionServer destServer)
234       throws InterruptedException, MasterNotRunningException,
235       ZooKeeperConnectionException, IOException {
236     HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
237     TEST_UTIL.getHBaseAdmin().move(
238         destRegion.getRegionInfo().getEncodedNameAsBytes(),
239         Bytes.toBytes(destServer.getServerName().getServerName()));
240     while (true) {
241       ServerName serverName = master.getAssignmentManager()
242         .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
243       if (serverName != null && serverName.equals(destServer.getServerName())) {
244         TEST_UTIL.assertRegionOnServer(
245           destRegion.getRegionInfo(), serverName, 200);
246         break;
247       }
248       Thread.sleep(10);
249     }
250   }
251 
252   /**
253    * Tests for hbase-2727.
254    * @throws Exception
255    * @see https://issues.apache.org/jira/browse/HBASE-2727
256    */
257   @Test
258   public void test2727() throws Exception {
259     // Test being able to have > 1 set of edits in the recovered.edits directory.
260     // Ensure edits are replayed properly.
261     final TableName tableName =
262         TableName.valueOf("test2727");
263     HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
264     Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
265     deleteDir(basedir);
266 
267     HTableDescriptor htd = createBasic3FamilyHTD(tableName);
268     HRegion region2 = HRegion.createHRegion(hri,
269         hbaseRootDir, this.conf, htd);
270     HRegion.closeHRegion(region2);
271     final byte [] rowName = tableName.getName();
272 
273     HLog wal1 = createWAL(this.conf);
274     // Add 1k to each family.
275     final int countPerFamily = 1000;
276     final AtomicLong sequenceId = new AtomicLong(1);
277     for (HColumnDescriptor hcd: htd.getFamilies()) {
278       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
279           wal1, htd, sequenceId);
280     }
281     wal1.close();
282     runWALSplit(this.conf);
283 
284     HLog wal2 = createWAL(this.conf);
285     // Add 1k to each family.
286     for (HColumnDescriptor hcd: htd.getFamilies()) {
287       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
288           ee, wal2, htd, sequenceId);
289     }
290     wal2.close();
291     runWALSplit(this.conf);
292 
293     HLog wal3 = createWAL(this.conf);
294     try {
295       HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
296       long seqid = region.getOpenSeqNum();
297       // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1.
298       // When opened, this region would apply 6k edits, and increment the sequenceId by 1
299       assertTrue(seqid > sequenceId.get());
300       assertEquals(seqid - 1, sequenceId.get());
301       LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
302           + sequenceId.get());
303 
304       // TODO: Scan all.
305       region.close();
306     } finally {
307       wal3.closeAndDelete();
308     }
309   }
310 
311   /**
312    * Test case of HRegion that is only made out of bulk loaded files.  Assert
313    * that we don't 'crash'.
314    * @throws IOException
315    * @throws IllegalAccessException
316    * @throws NoSuchFieldException
317    * @throws IllegalArgumentException
318    * @throws SecurityException
319    */
320   @Test
321   public void testRegionMadeOfBulkLoadedFilesOnly()
322   throws IOException, SecurityException, IllegalArgumentException,
323       NoSuchFieldException, IllegalAccessException, InterruptedException {
324     final TableName tableName =
325         TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
326     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
327     final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
328     deleteDir(basedir);
329     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
330     HRegion region2 = HRegion.createHRegion(hri,
331         hbaseRootDir, this.conf, htd);
332     HRegion.closeHRegion(region2);
333     HLog wal = createWAL(this.conf);
334     HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
335 
336     byte [] family = htd.getFamilies().iterator().next().getName();
337     Path f =  new Path(basedir, "hfile");
338     HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
339         Bytes.toBytes("z"), 10);
340     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
341     hfs.add(Pair.newPair(family, f.toString()));
342     region.bulkLoadHFiles(hfs, true);
343 
344     // Add an edit so something in the WAL
345     byte [] row = tableName.getName();
346     region.put((new Put(row)).add(family, family, family));
347     wal.sync();
348     final int rowsInsertedCount = 11;
349 
350     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
351 
352     // Now 'crash' the region by stealing its wal
353     final Configuration newConf = HBaseConfiguration.create(this.conf);
354     User user = HBaseTestingUtility.getDifferentUser(newConf,
355         tableName.getNameAsString());
356     user.runAs(new PrivilegedExceptionAction() {
357       public Object run() throws Exception {
358         runWALSplit(newConf);
359         HLog wal2 = createWAL(newConf);
360 
361         HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
362           hbaseRootDir, hri, htd, wal2);
363         long seqid2 = region2.getOpenSeqNum();
364         assertTrue(seqid2 > -1);
365         assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
366 
367         // I can't close wal1.  Its been appropriated when we split.
368         region2.close();
369         wal2.closeAndDelete();
370         return null;
371       }
372     });
373   }
374 
375   /**
376    * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
377    * files) and an edit in the memstore.
378    * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
379    * from being replayed"
380    * @throws IOException
381    * @throws IllegalAccessException
382    * @throws NoSuchFieldException
383    * @throws IllegalArgumentException
384    * @throws SecurityException
385    */
386   @Test
387   public void testCompactedBulkLoadedFiles()
388       throws IOException, SecurityException, IllegalArgumentException,
389       NoSuchFieldException, IllegalAccessException, InterruptedException {
390     final TableName tableName =
391         TableName.valueOf("testCompactedBulkLoadedFiles");
392     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
393     final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
394     deleteDir(basedir);
395     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
396     HRegion region2 = HRegion.createHRegion(hri,
397         hbaseRootDir, this.conf, htd);
398     HRegion.closeHRegion(region2);
399     HLog wal = createWAL(this.conf);
400     HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
401 
402     // Add an edit so something in the WAL
403     byte [] row = tableName.getName();
404     byte [] family = htd.getFamilies().iterator().next().getName();
405     region.put((new Put(row)).add(family, family, family));
406     wal.sync();
407 
408     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
409     for (int i = 0; i < 3; i++) {
410       Path f = new Path(basedir, "hfile"+i);
411       HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
412           Bytes.toBytes(i + "50"), 10);
413       hfs.add(Pair.newPair(family, f.toString()));
414     }
415     region.bulkLoadHFiles(hfs, true);
416     final int rowsInsertedCount = 31;
417     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
418 
419     // major compact to turn all the bulk loaded files into one normal file
420     region.compactStores(true);
421     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
422 
423     // Now 'crash' the region by stealing its wal
424     final Configuration newConf = HBaseConfiguration.create(this.conf);
425     User user = HBaseTestingUtility.getDifferentUser(newConf,
426         tableName.getNameAsString());
427     user.runAs(new PrivilegedExceptionAction() {
428       public Object run() throws Exception {
429         runWALSplit(newConf);
430         HLog wal2 = createWAL(newConf);
431 
432         HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
433             hbaseRootDir, hri, htd, wal2);
434         long seqid2 = region2.getOpenSeqNum();
435         assertTrue(seqid2 > -1);
436         assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
437 
438         // I can't close wal1.  Its been appropriated when we split.
439         region2.close();
440         wal2.closeAndDelete();
441         return null;
442       }
443     });
444   }
445 
446 
447   /**
448    * Test writing edits into an HRegion, closing it, splitting logs, opening
449    * Region again.  Verify seqids.
450    * @throws IOException
451    * @throws IllegalAccessException
452    * @throws NoSuchFieldException
453    * @throws IllegalArgumentException
454    * @throws SecurityException
455    */
456   @Test
457   public void testReplayEditsWrittenViaHRegion()
458   throws IOException, SecurityException, IllegalArgumentException,
459       NoSuchFieldException, IllegalAccessException, InterruptedException {
460     final TableName tableName =
461         TableName.valueOf("testReplayEditsWrittenViaHRegion");
462     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
463     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
464     deleteDir(basedir);
465     final byte[] rowName = tableName.getName();
466     final int countPerFamily = 10;
467     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
468     HRegion region3 = HRegion.createHRegion(hri,
469             hbaseRootDir, this.conf, htd);
470     HRegion.closeHRegion(region3);
471     // Write countPerFamily edits into the three families.  Do a flush on one
472     // of the families during the load of edits so its seqid is not same as
473     // others to test we do right thing when different seqids.
474     HLog wal = createWAL(this.conf);
475     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
476     long seqid = region.getOpenSeqNum();
477     boolean first = true;
478     for (HColumnDescriptor hcd: htd.getFamilies()) {
479       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
480       if (first ) {
481         // If first, so we have at least one family w/ different seqid to rest.
482         region.flushcache();
483         first = false;
484       }
485     }
486     // Now assert edits made it in.
487     final Get g = new Get(rowName);
488     Result result = region.get(g);
489     assertEquals(countPerFamily * htd.getFamilies().size(),
490       result.size());
491     // Now close the region (without flush), split the log, reopen the region and assert that
492     // replay of log has the correct effect, that our seqids are calculated correctly so
493     // all edits in logs are seen as 'stale'/old.
494     region.close(true);
495     wal.close();
496     runWALSplit(this.conf);
497     HLog wal2 = createWAL(this.conf);
498     HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
499     long seqid2 = region2.getOpenSeqNum();
500     assertTrue(seqid + result.size() < seqid2);
501     final Result result1b = region2.get(g);
502     assertEquals(result.size(), result1b.size());
503 
504     // Next test.  Add more edits, then 'crash' this region by stealing its wal
505     // out from under it and assert that replay of the log adds the edits back
506     // correctly when region is opened again.
507     for (HColumnDescriptor hcd: htd.getFamilies()) {
508       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
509     }
510     // Get count of edits.
511     final Result result2 = region2.get(g);
512     assertEquals(2 * result.size(), result2.size());
513     wal2.sync();
514     // Set down maximum recovery so we dfsclient doesn't linger retrying something
515     // long gone.
516     HBaseTestingUtility.setMaxRecoveryErrorCount(((FSHLog) wal2).getOutputStream(), 1);
517     final Configuration newConf = HBaseConfiguration.create(this.conf);
518     User user = HBaseTestingUtility.getDifferentUser(newConf,
519       tableName.getNameAsString());
520     user.runAs(new PrivilegedExceptionAction() {
521       public Object run() throws Exception {
522         runWALSplit(newConf);
523         FileSystem newFS = FileSystem.get(newConf);
524         // Make a new wal for new region open.
525         HLog wal3 = createWAL(newConf);
526         final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
527         HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
528           @Override
529           protected boolean restoreEdit(Store s, KeyValue kv) {
530             boolean b = super.restoreEdit(s, kv);
531             countOfRestoredEdits.incrementAndGet();
532             return b;
533           }
534         };
535         long seqid3 = region3.initialize();
536         Result result3 = region3.get(g);
537         // Assert that count of cells is same as before crash.
538         assertEquals(result2.size(), result3.size());
539         assertEquals(htd.getFamilies().size() * countPerFamily,
540           countOfRestoredEdits.get());
541 
542         // I can't close wal1.  Its been appropriated when we split.
543         region3.close();
544         wal3.closeAndDelete();
545         return null;
546       }
547     });
548   }
549 
550   /**
551    * Test that we recover correctly when there is a failure in between the
552    * flushes. i.e. Some stores got flushed but others did not.
553    *
554    * Unfortunately, there is no easy hook to flush at a store level. The way
555    * we get around this is by flushing at the region level, and then deleting
556    * the recently flushed store file for one of the Stores. This would put us
557    * back in the situation where all but that store got flushed and the region
558    * died.
559    *
560    * We restart Region again, and verify that the edits were replayed.
561    *
562    * @throws IOException
563    * @throws IllegalAccessException
564    * @throws NoSuchFieldException
565    * @throws IllegalArgumentException
566    * @throws SecurityException
567    */
568   @Test
569   public void testReplayEditsAfterPartialFlush()
570   throws IOException, SecurityException, IllegalArgumentException,
571       NoSuchFieldException, IllegalAccessException, InterruptedException {
572     final TableName tableName =
573         TableName.valueOf("testReplayEditsWrittenViaHRegion");
574     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
575     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
576     deleteDir(basedir);
577     final byte[] rowName = tableName.getName();
578     final int countPerFamily = 10;
579     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
580     HRegion region3 = HRegion.createHRegion(hri,
581             hbaseRootDir, this.conf, htd);
582     HRegion.closeHRegion(region3);
583     // Write countPerFamily edits into the three families.  Do a flush on one
584     // of the families during the load of edits so its seqid is not same as
585     // others to test we do right thing when different seqids.
586     HLog wal = createWAL(this.conf);
587     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
588     long seqid = region.getOpenSeqNum();
589     for (HColumnDescriptor hcd: htd.getFamilies()) {
590       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
591     }
592 
593     // Now assert edits made it in.
594     final Get g = new Get(rowName);
595     Result result = region.get(g);
596     assertEquals(countPerFamily * htd.getFamilies().size(),
597       result.size());
598 
599     // Let us flush the region
600     region.flushcache();
601     region.close(true);
602     wal.close();
603 
604     // delete the store files in the second column family to simulate a failure
605     // in between the flushcache();
606     // we have 3 families. killing the middle one ensures that taking the maximum
607     // will make us fail.
608     int cf_count = 0;
609     for (HColumnDescriptor hcd: htd.getFamilies()) {
610       cf_count++;
611       if (cf_count == 2) {
612         region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
613       }
614     }
615 
616 
617     // Let us try to split and recover
618     runWALSplit(this.conf);
619     HLog wal2 = createWAL(this.conf);
620     HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
621     long seqid2 = region2.getOpenSeqNum();
622     assertTrue(seqid + result.size() < seqid2);
623 
624     final Result result1b = region2.get(g);
625     assertEquals(result.size(), result1b.size());
626   }
627 
628 
629   // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
630   // Only throws exception if throwExceptionWhenFlushing is set true.
631   public static class CustomStoreFlusher extends DefaultStoreFlusher {
632     // Switch between throw and not throw exception in flush
633     static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
634 
635     public CustomStoreFlusher(Configuration conf, Store store) {
636       super(conf, store);
637     }
638     @Override
639     public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
640         TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
641             throws IOException {
642       if (throwExceptionWhenFlushing.get()) {
643         throw new IOException("Simulated exception by tests");
644       }
645       return super.flushSnapshot(snapshot, cacheFlushId, snapshotTimeRangeTracker,
646           flushedSize, status);
647     }
648 
649   };
650 
651   /**
652    * Test that we could recover the data correctly after aborting flush. In the
653    * test, first we abort flush after writing some data, then writing more data
654    * and flush again, at last verify the data.
655    * @throws IOException
656    */
657   @Test
658   public void testReplayEditsAfterAbortingFlush() throws IOException {
659     final TableName tableName =
660         TableName.valueOf("testReplayEditsAfterAbortingFlush");
661     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
662     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
663     deleteDir(basedir);
664     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
665     HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
666     region3.close();
667     region3.getLog().closeAndDelete();
668     // Write countPerFamily edits into the three families. Do a flush on one
669     // of the families during the load of edits so its seqid is not same as
670     // others to test we do right thing when different seqids.
671     HLog wal = createWAL(this.conf);
672     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
673     Mockito.doReturn(false).when(rsServices).isAborted();
674     Configuration customConf = new Configuration(this.conf);
675     customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
676         CustomStoreFlusher.class.getName());
677     HRegion region =
678       HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
679     int writtenRowCount = 10;
680     List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
681         htd.getFamilies());
682     for (int i = 0; i < writtenRowCount; i++) {
683       Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
684       put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
685           Bytes.toBytes("val"));
686       region.put(put);
687     }
688 
689     // Now assert edits made it in.
690     RegionScanner scanner = region.getScanner(new Scan());
691     assertEquals(writtenRowCount, getScannedCount(scanner));
692 
693     // Let us flush the region
694     CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
695     try {
696       region.flushcache();
697       fail("Injected exception hasn't been thrown");
698     } catch (Throwable t) {
699       LOG.info("Expected simulated exception when flushing region,"
700           + t.getMessage());
701       // simulated to abort server
702       Mockito.doReturn(true).when(rsServices).isAborted();
703     }
704     // writing more data
705     int moreRow = 10;
706     for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
707       Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
708       put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
709           Bytes.toBytes("val"));
710       region.put(put);
711     }
712     writtenRowCount += moreRow;
713     // call flush again
714     CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
715     try {
716       region.flushcache();
717     } catch (IOException t) {
718       LOG.info("Expected exception when flushing region because server is stopped,"
719           + t.getMessage());
720     }
721 
722     region.close(true);
723     wal.close();
724 
725     // Let us try to split and recover
726     runWALSplit(this.conf);
727     HLog wal2 = createWAL(this.conf);
728     Mockito.doReturn(false).when(rsServices).isAborted();
729     HRegion region2 =
730       HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
731     scanner = region2.getScanner(new Scan());
732     assertEquals(writtenRowCount, getScannedCount(scanner));
733   }
734 
735   private int getScannedCount(RegionScanner scanner) throws IOException {
736     int scannedCount = 0;
737     List<Cell> results = new ArrayList<Cell>();
738     while (true) {
739       boolean existMore = scanner.next(results);
740       if (!results.isEmpty())
741         scannedCount++;
742       if (!existMore)
743         break;
744       results.clear();
745     }
746     return scannedCount;
747   }
748 
749   /**
750    * Create an HRegion with the result of a HLog split and test we only see the
751    * good edits
752    * @throws Exception
753    */
754   @Test
755   public void testReplayEditsWrittenIntoWAL() throws Exception {
756     final TableName tableName =
757         TableName.valueOf("testReplayEditsWrittenIntoWAL");
758     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
759     final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
760     deleteDir(basedir);
761 
762     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
763     HRegion region2 = HRegion.createHRegion(hri,
764             hbaseRootDir, this.conf, htd);
765     HRegion.closeHRegion(region2);
766     final HLog wal = createWAL(this.conf);
767     final byte[] rowName = tableName.getName();
768     final byte[] regionName = hri.getEncodedNameAsBytes();
769     final AtomicLong sequenceId = new AtomicLong(1);
770 
771     // Add 1k to each family.
772     final int countPerFamily = 1000;
773     for (HColumnDescriptor hcd: htd.getFamilies()) {
774       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
775           ee, wal, htd, sequenceId);
776     }
777 
778     // Add a cache flush, shouldn't have any effect
779     wal.startCacheFlush(regionName);
780     wal.completeCacheFlush(regionName);
781 
782     // Add an edit to another family, should be skipped.
783     WALEdit edit = new WALEdit();
784     long now = ee.currentTimeMillis();
785     edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
786       now, rowName));
787     wal.append(hri, tableName, edit, now, htd, sequenceId);
788 
789     // Delete the c family to verify deletes make it over.
790     edit = new WALEdit();
791     now = ee.currentTimeMillis();
792     edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
793       KeyValue.Type.DeleteFamily));
794     wal.append(hri, tableName, edit, now, htd, sequenceId);
795 
796     // Sync.
797     wal.sync();
798     // Set down maximum recovery so we dfsclient doesn't linger retrying something
799     // long gone.
800     HBaseTestingUtility.setMaxRecoveryErrorCount(((FSHLog) wal).getOutputStream(), 1);
801     // Make a new conf and a new fs for the splitter to run on so we can take
802     // over old wal.
803     final Configuration newConf = HBaseConfiguration.create(this.conf);
804     User user = HBaseTestingUtility.getDifferentUser(newConf,
805       ".replay.wal.secondtime");
806     user.runAs(new PrivilegedExceptionAction() {
807       public Object run() throws Exception {
808         runWALSplit(newConf);
809         FileSystem newFS = FileSystem.get(newConf);
810         // 100k seems to make for about 4 flushes during HRegion#initialize.
811         newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
812         // Make a new wal for new region.
813         HLog newWal = createWAL(newConf);
814         final AtomicInteger flushcount = new AtomicInteger(0);
815         try {
816           final HRegion region =
817               new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
818             protected FlushResult internalFlushcache(
819                 final HLog wal, final long myseqid, MonitoredTask status)
820             throws IOException {
821               LOG.info("InternalFlushCache Invoked");
822               FlushResult fs = super.internalFlushcache(wal, myseqid,
823                   Mockito.mock(MonitoredTask.class));
824               flushcount.incrementAndGet();
825               return fs;
826             };
827           };
828           long seqid = region.initialize();
829           // We flushed during init.
830           assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
831           assertTrue(seqid - 1 == sequenceId.get());
832 
833           Get get = new Get(rowName);
834           Result result = region.get(get);
835           // Make sure we only see the good edits
836           assertEquals(countPerFamily * (htd.getFamilies().size() - 1),
837             result.size());
838           region.close();
839         } finally {
840           newWal.closeAndDelete();
841         }
842         return null;
843       }
844     });
845   }
846 
847   @Test
848   // the following test is for HBASE-6065
849   public void testSequentialEditLogSeqNum() throws IOException {
850     final TableName tableName =
851         TableName.valueOf("testSequentialEditLogSeqNum");
852     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
853     final Path basedir =
854         FSUtils.getTableDir(this.hbaseRootDir, tableName);
855     deleteDir(basedir);
856     final byte[] rowName = tableName.getName();
857     final int countPerFamily = 10;
858     final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
859 
860     // Mock the HLog
861     MockHLog wal = createMockWAL(this.conf);
862 
863     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
864     for (HColumnDescriptor hcd : htd.getFamilies()) {
865       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
866     }
867 
868     // Let us flush the region
869     // But this time completeflushcache is not yet done
870     region.flushcache();
871     for (HColumnDescriptor hcd : htd.getFamilies()) {
872       addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
873     }
874     long lastestSeqNumber = region.getSequenceId().get();
875     // get the current seq no
876     wal.doCompleteCacheFlush = true;
877     // allow complete cache flush with the previous seq number got after first
878     // set of edits.
879     wal.completeCacheFlush(hri.getEncodedNameAsBytes());
880     wal.close();
881     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
882     HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
883       this.fs, this.conf, null, null, null, mode);
884     FileStatus[] listStatus1 = this.fs.listStatus(
885         new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
886             new Path(hri.getEncodedName(), "recovered.edits")));
887     int editCount = 0;
888     for (FileStatus fileStatus : listStatus1) {
889       editCount = Integer.parseInt(fileStatus.getPath().getName());
890     }
891     // The sequence number should be same 
892     assertEquals(
893         "The sequence number of the recoverd.edits and the current edit seq should be same",
894         lastestSeqNumber, editCount);
895   }
896 
897   static class MockHLog extends FSHLog {
898     boolean doCompleteCacheFlush = false;
899 
900     public MockHLog(FileSystem fs, Path rootDir, String logName, Configuration conf) throws IOException {
901       super(fs, rootDir, logName, conf);
902     }
903 
904     @Override
905     public void completeCacheFlush(byte[] encodedRegionName) {
906       if (!doCompleteCacheFlush) {
907         return;
908       }
909       super.completeCacheFlush(encodedRegionName);
910     }
911   }
912 
913   private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
914     HTableDescriptor htd = new HTableDescriptor(tableName);
915     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
916     htd.addFamily(a);
917     return htd;
918   }
919   
920   private MockHLog createMockWAL(Configuration conf) throws IOException {
921     MockHLog wal = new MockHLog(FileSystem.get(conf), hbaseRootDir, logName, conf);
922     // Set down maximum recovery so we dfsclient doesn't linger retrying something
923     // long gone.
924     HBaseTestingUtility.setMaxRecoveryErrorCount(((FSHLog) wal).getOutputStream(), 1);
925     return wal;
926   }
927 
928   // Flusher used in this test.  Keep count of how often we are called and
929   // actually run the flush inside here.
930   class TestFlusher implements FlushRequester {
931     private HRegion r;
932 
933     @Override
934     public void requestFlush(HRegion region) {
935       try {
936         r.flushcache();
937       } catch (IOException e) {
938         throw new RuntimeException("Exception flushing", e);
939       }
940     }
941 
942     @Override
943     public void requestDelayedFlush(HRegion region, long when) {
944       // TODO Auto-generated method stub
945       
946     }
947   }
948 
949   private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
950       final byte[] family, final int count, EnvironmentEdge ee, final HLog wal,
951       final HTableDescriptor htd, final AtomicLong sequenceId)
952   throws IOException {
953     String familyStr = Bytes.toString(family);
954     for (int j = 0; j < count; j++) {
955       byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
956       byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
957       WALEdit edit = new WALEdit();
958       edit.add(new KeyValue(rowName, family, qualifierBytes,
959         ee.currentTimeMillis(), columnBytes));
960       wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, sequenceId);
961     }
962   }
963 
964   private void addRegionEdits (final byte [] rowName, final byte [] family,
965       final int count, EnvironmentEdge ee, final HRegion r,
966       final String qualifierPrefix)
967   throws IOException {
968     for (int j = 0; j < count; j++) {
969       byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
970       Put p = new Put(rowName);
971       p.add(family, qualifier, ee.currentTimeMillis(), rowName);
972       r.put(p);
973     }
974   }
975 
976   /*
977    * Creates an HRI around an HTD that has <code>tableName</code> and three
978    * column families named 'a','b', and 'c'.
979    * @param tableName Name of table to use when we create HTableDescriptor.
980    */
981    private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
982     return new HRegionInfo(tableName, null, null, false);
983    }
984 
985   /*
986    * Run the split.  Verify only single split file made.
987    * @param c
988    * @return The single split file made
989    * @throws IOException
990    */
991   private Path runWALSplit(final Configuration c) throws IOException {
992     List<Path> splits = HLogSplitter.split(
993       hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c);
994     // Split should generate only 1 file since there's only 1 region
995     assertEquals("splits=" + splits, 1, splits.size());
996     // Make sure the file exists
997     assertTrue(fs.exists(splits.get(0)));
998     LOG.info("Split file=" + splits.get(0));
999     return splits.get(0);
1000   }
1001 
1002   /*
1003    * @param c
1004    * @return WAL with retries set down from 5 to 1 only.
1005    * @throws IOException
1006    */
1007   private HLog createWAL(final Configuration c) throws IOException {
1008     HLog wal = HLogFactory.createHLog(FileSystem.get(c), 
1009         hbaseRootDir, logName, c);
1010     // Set down maximum recovery so we dfsclient doesn't linger retrying something
1011     // long gone.
1012     HBaseTestingUtility.setMaxRecoveryErrorCount(((FSHLog) wal).getOutputStream(), 1);
1013     return wal;
1014   }
1015 
1016   private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1017     HTableDescriptor htd = new HTableDescriptor(tableName);
1018     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1019     htd.addFamily(a);
1020     HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
1021     htd.addFamily(b);
1022     HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
1023     htd.addFamily(c);
1024     return htd;
1025   }
1026 
1027 }
1028