1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;
22  import static org.junit.Assert.*;
23  import static org.mockito.Matchers.any;
24  import static org.mockito.Matchers.anyBoolean;
25  import static org.mockito.Mockito.mock;
26  import static org.mockito.Mockito.spy;
27  import static org.mockito.Mockito.times;
28  import static org.mockito.Mockito.verify;
29  import static org.mockito.Mockito.when;
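    // note: the TestHRegion.* wildcard import above supplies the unqualified static
    // helpers (putData, verifyData, assertGet, initHRegion) used throughout this test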
30  
31  import java.io.FileNotFoundException;
32  import java.io.IOException;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Random;
37  import java.util.UUID;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FSDataOutputStream;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.CellUtil;
47  import org.apache.hadoop.hbase.HBaseTestingUtility;
48  import org.apache.hadoop.hbase.HColumnDescriptor;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.ServerName;
54  import org.apache.hadoop.hbase.TableName;
55  import org.apache.hadoop.hbase.client.Durability;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.client.Put;
58  import org.apache.hadoop.hbase.io.hfile.HFile;
59  import org.apache.hadoop.hbase.io.hfile.HFileContext;
60  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
62  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
63  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
64  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
65  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
66  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
67  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
68  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
69  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
70  import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
71  import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
72  import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
73  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
74  import org.apache.hadoop.hbase.testclassification.MediumTests;
75  import org.apache.hadoop.hbase.util.Bytes;
76  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77  import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
78  import org.apache.hadoop.hbase.util.FSUtils;
79  import org.apache.hadoop.hbase.util.Pair;
80  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
81  import org.apache.hadoop.hbase.wal.WAL;
82  import org.apache.hadoop.hbase.wal.WALFactory;
83  import org.apache.hadoop.hbase.wal.WALKey;
84  import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
85  import org.apache.hadoop.util.StringUtils;
86  import org.junit.After;
87  import org.junit.Before;
88  import org.junit.Rule;
89  import org.junit.Test;
90  import org.junit.experimental.categories.Category;
91  import org.junit.rules.TestName;
92  
93  import com.google.common.collect.Lists;
94  import com.google.protobuf.ByteString;
95  
96  /**
97   * Tests of HRegion methods for replaying flush, compaction, region open, etc. events
98   * for secondary region replicas.
99   */
100 @Category(MediumTests.class)
101 public class TestHRegionReplayEvents {
102 
103   private static final Log LOG = LogFactory.getLog(TestHRegionReplayEvents.class);
104   @Rule public TestName name = new TestName();
105 
106   private static HBaseTestingUtility TEST_UTIL;
107 
108   public static Configuration CONF;
109   private String dir;
110   private static FileSystem FILESYSTEM;
111 
112   private byte[][] families = new byte[][] {
113       Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};
114 
115   // Test names
116   protected byte[] tableName;
117   protected String method;
118   protected final byte[] row = Bytes.toBytes("rowA");
119   protected final byte[] row2 = Bytes.toBytes("rowB");
120   protected byte[] cq = Bytes.toBytes("cq");
121 
122   // per test fields
123   private Path rootDir;
124   private HTableDescriptor htd;
125   private long time;
126   private RegionServerServices rss;
127   private HRegionInfo primaryHri, secondaryHri;
128   private HRegion primaryRegion, secondaryRegion;
129   private WALFactory wals;
130   private WAL walPrimary, walSecondary;
131   private WAL.Reader reader;
132 
133   @Before
134   public void setup() throws IOException {
135     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
136     FILESYSTEM = TEST_UTIL.getTestFileSystem();
137     CONF = TEST_UTIL.getConfiguration();
138     dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
139     method = name.getMethodName();
140     tableName = Bytes.toBytes(method);
141     rootDir = new Path(dir + method);
142     TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
143 
144 
145     htd = new HTableDescriptor(TableName.valueOf(method));
146     for (byte[] family : families) {
147       htd.addFamily(new HColumnDescriptor(family));
148     }
149 
150     time = System.currentTimeMillis();
151 
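        // both region infos span the whole key range; only the replica id (the last
        // constructor argument) differs: 0 is the primary, 1 the secondary replica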
152     primaryHri = new HRegionInfo(htd.getTableName(),
153       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
154       false, time, 0);
155     secondaryHri = new HRegionInfo(htd.getTableName(),
156       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
157       false, time, 1);
158 
159     wals = TestHRegion.createWALFactory(CONF, rootDir);
160     walPrimary = wals.getWAL(primaryHri.getEncodedNameAsBytes());
161     walSecondary = wals.getWAL(secondaryHri.getEncodedNameAsBytes());
162 
163     rss = mock(RegionServerServices.class);
164     when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
165     when(rss.getConfiguration()).thenReturn(CONF);
166     when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
167 
168     primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
169     primaryRegion.close();
170 
171     primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
172     secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
173 
174     reader = null;
175   }
176 
177   @After
178   public void tearDown() throws Exception {
179     if (reader != null) {
180       reader.close();
181     }
182 
183     if (primaryRegion != null) {
184       HRegion.closeHRegion(primaryRegion);
185     }
186     if (secondaryRegion != null) {
187       HRegion.closeHRegion(secondaryRegion);
188     }
189 
190     EnvironmentEdgeManagerTestHelper.reset();
191     LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
192     TEST_UTIL.cleanupTestDir();
193   }
194 
195   String getName() {
196     return name.getMethodName();
197   }
198 
199   // Some of the test cases are as follows:
200   // 1. replay flush start marker again
201   // 2. replay flush with smaller seqId than what is there in memstore snapshot
202   // 3. replay flush with larger seqId than what is there in memstore snapshot
203   // 4. replay flush commit without flush prepare. non-droppable memstore
204   // 5. replay flush commit without flush prepare. droppable memstore
205   // 6. replay open region event
206   // 7. replay open region event after flush start
207   // 8. replay flush from an earlier seqId (test ignoring seqIds)
208   // 9. start flush does not prevent region from closing.
209 
210   @Test
211   public void testRegionReplicaSecondaryCannotFlush() throws IOException {
212     // load some data and request a flush; ensure that the secondary replica will not execute it
213 
214     // load some data to secondary by replaying
215     putDataByReplay(secondaryRegion, 0, 1000, cq, families);
216 
217     verifyData(secondaryRegion, 0, 1000, cq, families);
218 
219     // flush region
220     FlushResultImpl flush = (FlushResultImpl)secondaryRegion.flush(true);
221     assertEquals(flush.result, FlushResultImpl.Result.CANNOT_FLUSH);
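        // a secondary replica never writes store files of its own, so a direct flush
        // request is answered with CANNOT_FLUSH and no files are produced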
222 
223     verifyData(secondaryRegion, 0, 1000, cq, families);
224 
225     // close the region, and inspect that it has not flushed
226     Map<byte[], List<StoreFile>> files = secondaryRegion.close(false);
227     // assert that there are no files (no flush has happened)
228     for (List<StoreFile> f : files.values()) {
229       assertTrue(f.isEmpty());
230     }
231   }
232 
233   /**
234    * Tests a case where we replay only a flush start marker, then the region is closed. This region
235    * should not block indefinitely
236    */
237   @Test (timeout = 60000)
238   public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
239     // load some data to primary and flush
240     int start = 0;
241     LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
242     putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
243     LOG.info("-- Flushing primary, creating 3 files for 3 stores");
244     primaryRegion.flush(true);
245 
246     // now replay the edits and the flush marker
247     reader = createWALReaderForPrimary();
248 
249     LOG.info("-- Replaying edits and flush events in secondary");
250     while (true) {
251       WAL.Entry entry = reader.next();
252       if (entry == null) {
253         break;
254       }
255       FlushDescriptor flushDesc
256         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
257       if (flushDesc != null) {
258         if (flushDesc.getAction() == FlushAction.START_FLUSH) {
259           LOG.info("-- Replaying flush start in secondary");
260           secondaryRegion.replayWALFlushStartMarker(flushDesc);
261         } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
262           LOG.info("-- NOT Replaying flush commit in secondary");
263         }
264       } else {
265         replayEdit(secondaryRegion, entry);
266       }
267     }
268 
269     assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreSize() > 0);
270     // now close the region; the uncommitted flush must not make the close hang
271     secondaryRegion.close();
272 
273     // verify that the memstore size is back to what it was
274     assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreSize());
275   }
276 
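      /**
       * Replays a single WAL entry into the region as a SKIP_WAL Put carrying the
       * entry's original sequence id, and returns the row index encoded in the row
       * key. Meta edits (flush/compaction/region event markers) are skipped.
       */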
277   static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
278     if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
279       return 0; // handled elsewhere
280     }
281     Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
282     for (Cell cell : entry.getEdit().getCells()) put.add(cell);
283     put.setDurability(Durability.SKIP_WAL);
284     MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
285     region.batchReplay(new MutationReplay[] {mutation},
286       entry.getKey().getLogSeqNum());
287     return Integer.parseInt(Bytes.toString(put.getRow()));
288   }
289 
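      /**
       * Opens a reader on the primary WAL's current file so that tests can re-feed
       * the primary's edits and markers into the secondary region.
       */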
290   WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
291     return wals.createReader(TEST_UTIL.getTestFileSystem(),
292       DefaultWALProvider.getCurrentFileName(walPrimary),
293       TEST_UTIL.getConfiguration());
294   }
295 
296   @Test
297   public void testReplayFlushesAndCompactions() throws IOException {
298     // initialize a secondary region with some data.
299 
300     // load some data to primary and flush. 3 flushes and some more unflushed data
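        // putDataWithFlushes here writes 300 rows, flushing every 100 rows, then 100
        // more rows without a flush: rows 0-399 in total (hence the 400 - 1 assert below)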
301     putDataWithFlushes(primaryRegion, 100, 300, 100);
302 
303     // compaction from primary
304     LOG.info("-- Compacting primary, only 1 store");
305     primaryRegion.compactStore(Bytes.toBytes("cf1"),
306       NoLimitCompactionThroughputController.INSTANCE);
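        // compacting cf1 collapses its three flush files into a single file and writes
        // a compaction marker into the primary WAL, to be replayed further below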
307 
308     // now replay the edits and the flush marker
309     reader = createWALReaderForPrimary();
310 
311     LOG.info("-- Replaying edits and flush events in secondary");
312     int lastReplayed = 0;
313     int expectedStoreFileCount = 0;
314     while (true) {
315       WAL.Entry entry = reader.next();
316       if (entry == null) {
317         break;
318       }
319       FlushDescriptor flushDesc
320         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
321       CompactionDescriptor compactionDesc
322         = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
323       if (flushDesc != null) {
324         // first verify that everything is replayed and visible before flush event replay
325         verifyData(secondaryRegion, 0, lastReplayed, cq, families);
326         Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
327         long storeMemstoreSize = store.getMemStoreSize();
328         long regionMemstoreSize = secondaryRegion.getMemstoreSize();
329         long storeFlushableSize = store.getFlushableSize();
330         long storeSize = store.getSize();
331         long storeSizeUncompressed = store.getStoreSizeUncompressed();
332         if (flushDesc.getAction() == FlushAction.START_FLUSH) {
333           LOG.info("-- Replaying flush start in secondary");
334           PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
335           assertNull(result.result);
336           assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());
337 
338           // assert that the store memstore is smaller now
339           long newStoreMemstoreSize = store.getMemStoreSize();
340           LOG.info("Memstore size reduced by:"
341               + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
342           assertTrue(storeMemstoreSize > newStoreMemstoreSize);
343 
344         } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
345           LOG.info("-- Replaying flush commit in secondary");
346           secondaryRegion.replayWALFlushCommitMarker(flushDesc);
347 
348           // assert that the flush files are picked
349           expectedStoreFileCount++;
350           for (Store s : secondaryRegion.getStores()) {
351             assertEquals(expectedStoreFileCount, s.getStorefilesCount());
352           }
353           long newFlushableSize = store.getFlushableSize();
354           assertTrue(storeFlushableSize > newFlushableSize);
355 
356           // assert that the region memstore is smaller now
357           long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
358           assertTrue(regionMemstoreSize > newRegionMemstoreSize);
359 
360           // assert that the store sizes are bigger
361           assertTrue(store.getSize() > storeSize);
362           assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
363           assertEquals(store.getSize(), store.getStorefilesSize());
364         }
365         // after replay verify that everything is still visible
366         verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
367       } else if (compactionDesc != null) {
368         secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);
369 
370         // assert that the compaction is applied
371         for (Store store : secondaryRegion.getStores()) {
372           if (store.getColumnFamilyName().equals("cf1")) {
373             assertEquals(1, store.getStorefilesCount());
374           } else {
375             assertEquals(expectedStoreFileCount, store.getStorefilesCount());
376           }
377         }
378       } else {
379         lastReplayed = replayEdit(secondaryRegion, entry);
380       }
381     }
382 
383     assertEquals(400-1, lastReplayed);
384     LOG.info("-- Verifying edits from secondary");
385     verifyData(secondaryRegion, 0, 400, cq, families);
386 
387     LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
388     verifyData(primaryRegion, 0, lastReplayed, cq, families);
389     for (Store store : primaryRegion.getStores()) {
390       if (store.getColumnFamilyName().equals("cf1")) {
391         assertEquals(1, store.getStorefilesCount());
392       } else {
393         assertEquals(expectedStoreFileCount, store.getStorefilesCount());
394       }
395     }
396   }
397 
398   /**
399    * Tests cases where we prepare a flush with some seqId and then receive other flush start
400    * markers equal to, greater than, or less than the previous flush start marker.
401    */
402   @Test
403   public void testReplayFlushStartMarkers() throws IOException {
404     // load some data to primary and flush. 1 flush and some more unflushed data
405     putDataWithFlushes(primaryRegion, 100, 100, 100);
406     int numRows = 200;
407 
408     // now replay the edits and the flush marker
409     reader = createWALReaderForPrimary();
410 
411     LOG.info("-- Replaying edits and flush events in secondary");
412 
413     FlushDescriptor startFlushDesc = null;
414 
415     int lastReplayed = 0;
416     while (true) {
417       WAL.Entry entry = reader.next();
418       if (entry == null) {
419         break;
420       }
421       FlushDescriptor flushDesc
422         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
423       if (flushDesc != null) {
424         // first verify that everything is replayed and visible before flush event replay
425         Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
426         long storeMemstoreSize = store.getMemStoreSize();
427         long regionMemstoreSize = secondaryRegion.getMemstoreSize();
428         long storeFlushableSize = store.getFlushableSize();
429 
430         if (flushDesc.getAction() == FlushAction.START_FLUSH) {
431           startFlushDesc = flushDesc;
432           LOG.info("-- Replaying flush start in secondary");
433           PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
434           assertNull(result.result);
435           assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
436           assertTrue(regionMemstoreSize > 0);
437           assertTrue(storeFlushableSize > 0);
438 
439           // assert that the store memstore is smaller now
440           long newStoreMemstoreSize = store.getMemStoreSize();
441           LOG.info("Memstore size reduced by:"
442               + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
443           assertTrue(storeMemstoreSize > newStoreMemstoreSize);
444           verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
445 
446         }
447         // after replay verify that everything is still visible
448         verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
449       } else {
450         lastReplayed = replayEdit(secondaryRegion, entry);
451       }
452     }
453 
454     // at this point, there should be some data (rows 0-100) in memstore snapshot
455     // and some more data in memstores (rows 100-200)
456 
457     verifyData(secondaryRegion, 0, numRows, cq, families);
458 
459     // Test case 1: replay the same flush start marker again
460     LOG.info("-- Replaying same flush start in secondary again");
461     PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
462     assertNull(result); // this should return null. Ignoring the flush start marker
463     // assert that we still have prepared flush with the previous setup.
464     assertNotNull(secondaryRegion.getPrepareFlushResult());
465     assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
466       startFlushDesc.getFlushSequenceNumber());
467     assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
468     verifyData(secondaryRegion, 0, numRows, cq, families);
469 
470     // Test case 2: replay a flush start marker with a smaller seqId
471     FlushDescriptor startFlushDescSmallerSeqId
472       = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
473     LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
474     result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
475     assertNull(result); // this should return null. Ignoring the flush start marker
476     // assert that we still have prepared flush with the previous setup.
477     assertNotNull(secondaryRegion.getPrepareFlushResult());
478     assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
479       startFlushDesc.getFlushSequenceNumber());
480     assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
481     verifyData(secondaryRegion, 0, numRows, cq, families);
482 
483     // Test case 3: replay a flush start marker with a larger seqId
484     FlushDescriptor startFlushDescLargerSeqId
485       = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
486     LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
487     result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
488     assertNull(result); // this should return null. Ignoring the flush start marker
489     // assert that we still have prepared flush with the previous setup.
490     assertNotNull(secondaryRegion.getPrepareFlushResult());
491     assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
492       startFlushDesc.getFlushSequenceNumber());
493     assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
494     verifyData(secondaryRegion, 0, numRows, cq, families);
495 
496     LOG.info("-- Verifying edits from secondary");
497     verifyData(secondaryRegion, 0, numRows, cq, families);
498 
499     LOG.info("-- Verifying edits from primary.");
500     verifyData(primaryRegion, 0, numRows, cq, families);
501   }
502 
503   /**
504    * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
505    * less than the previous flush start marker.
506    */
507   @Test
508   public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
509     // load some data to primary and flush. 2 flushes and some more unflushed data
510     putDataWithFlushes(primaryRegion, 100, 200, 100);
511     int numRows = 300;
512 
513     // now replay the edits and the flush marker
514     reader = createWALReaderForPrimary();
515 
516     LOG.info("-- Replaying edits and flush events in secondary");
517     FlushDescriptor startFlushDesc = null;
518     FlushDescriptor commitFlushDesc = null;
519 
520     int lastReplayed = 0;
521     while (true) {
522 
523       WAL.Entry entry = reader.next();
524       if (entry == null) {
525         break;
526       }
527       FlushDescriptor flushDesc
528         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
529       if (flushDesc != null) {
530         if (flushDesc.getAction() == FlushAction.START_FLUSH) {
531           // don't replay the first flush start marker, hold on to it, replay the second one
532           if (startFlushDesc == null) {
533             startFlushDesc = flushDesc;
534           } else {
535             LOG.info("-- Replaying flush start in secondary");
536             startFlushDesc = flushDesc;
537             PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
538             assertNull(result.result);
539           }
540         } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
541           // do not replay any flush commit yet
542           if (commitFlushDesc == null) {
543             commitFlushDesc = flushDesc; // hold on to the first flush commit marker
544           }
545         }
546         // after replay verify that everything is still visible
547         verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
548       } else {
549         lastReplayed = replayEdit(secondaryRegion, entry);
550       }
551     }
552 
553     // at this point, there should be some data (rows 0-200) in memstore snapshot
554     // and some more data in memstores (rows 200-300)
555     verifyData(secondaryRegion, 0, numRows, cq, families);
556 
557     // no store files in the region
558     int expectedStoreFileCount = 0;
559     for (Store s : secondaryRegion.getStores()) {
560       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
561     }
562     long regionMemstoreSize = secondaryRegion.getMemstoreSize();
563 
564     // Test case 1: replay a flush commit marker smaller than what we have prepared
565     LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
566         + startFlushDesc);
567     assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());
568 
569     LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
570     secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
571 
572     // assert that the flush files are picked
573     expectedStoreFileCount++;
574     for (Store s : secondaryRegion.getStores()) {
575       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
576     }
577     Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
578     long newFlushableSize = store.getFlushableSize();
579     assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
580 
581     // assert that the region memstore is same as before
582     long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
583     assertEquals(regionMemstoreSize, newRegionMemstoreSize);
584 
585     assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped
586 
587     LOG.info("-- Verifying edits from secondary");
588     verifyData(secondaryRegion, 0, numRows, cq, families);
589 
590     LOG.info("-- Verifying edits from primary.");
591     verifyData(primaryRegion, 0, numRows, cq, families);
592   }
593 
594   /**
595    * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
596    * larger than the previous flush start marker.
597    */
598   @Test
599   public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
600     // load some data to primary and flush. 1 flush and some more unflushed data
601     putDataWithFlushes(primaryRegion, 100, 100, 100);
602     int numRows = 200;
603 
604     // now replay the edits and the flush marker
605     reader = createWALReaderForPrimary();
606 
607     LOG.info("-- Replaying edits and flush events in secondary");
608     FlushDescriptor startFlushDesc = null;
609     FlushDescriptor commitFlushDesc = null;
610 
611     int lastReplayed = 0;
612     while (true) {
613       WAL.Entry entry = reader.next();
614       if (entry == null) {
615         break;
616       }
617       FlushDescriptor flushDesc
618         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
619       if (flushDesc != null) {
620         if (flushDesc.getAction() == FlushAction.START_FLUSH) {
621           if (startFlushDesc == null) {
622             LOG.info("-- Replaying flush start in secondary");
623             startFlushDesc = flushDesc;
624             PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
625             assertNull(result.result);
626           }
627         } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
628           // do not replay any flush commit yet
629           // hold on to the flush commit marker but simulate a larger
630           // flush commit seqId
631           commitFlushDesc =
632               FlushDescriptor.newBuilder(flushDesc)
633               .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
634               .build();
635         }
636         // after replay verify that everything is still visible
637         verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
638       } else {
639         lastReplayed = replayEdit(secondaryRegion, entry);
640       }
641     }
642 
643     // at this point, there should be some data (rows 0-100) in memstore snapshot
644     // and some more data in memstores (rows 100-200)
645     verifyData(secondaryRegion, 0, numRows, cq, families);
646 
647     // no store files in the region
648     int expectedStoreFileCount = 0;
649     for (Store s : secondaryRegion.getStores()) {
650       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
651     }
652     long regionMemstoreSize = secondaryRegion.getMemstoreSize();
653 
654     // Test case 1: replay a flush commit marker larger than what we have prepared
655     LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
656         + startFlushDesc);
657     assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
658 
659     LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
660     secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
661 
662     // assert that the flush files are picked
663     expectedStoreFileCount++;
664     for (Store s : secondaryRegion.getStores()) {
665       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
666     }
667     Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
668     long newFlushableSize = store.getFlushableSize();
669     assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
670 
671     // assert that the region memstore is smaller than before, but not empty
672     long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
673     assertTrue(newRegionMemstoreSize > 0);
674     assertTrue(regionMemstoreSize > newRegionMemstoreSize);
675 
676     assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped
677 
678     LOG.info("-- Verifying edits from secondary");
679     verifyData(secondaryRegion, 0, numRows, cq, families);
680 
681     LOG.info("-- Verifying edits from primary.");
682     verifyData(primaryRegion, 0, numRows, cq, families);
683   }
684 
685   /**
686    * Tests the case where we receive a flush commit before receiving any flush prepare markers.
687    * The memstore edits should be dropped after the flush commit replay since they should be in
688    * flushed files
689    */
690   @Test
691   public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
692       throws IOException {
693     testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
694   }
695 
696   /**
697    * Tests the case where we receive a flush commit before receiving any flush prepare markers.
698    * The memstore edits should not be dropped after the flush commit replay since not every edit
699    * will be in flushed files (based on seqId)
700    */
701   @Test
702   public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
703       throws IOException {
704     testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
705   }
706 
707   /**
708    * Tests the case where we receive a flush commit before receiving any flush prepare markers
709    */
710   public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
711       throws IOException {
712     // load some data to primary and flush. 1 flush and some more unflushed data.
713     // write more data after the flush depending on droppableMemstore
714     putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
715     int numRows = droppableMemstore ? 100 : 200;
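        // droppable: no edits are written after the flush, so the commit covers the whole
        // memstore and it can be dropped; non-droppable: the 100 later edits must survive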
716 
717     // now replay the edits and the flush marker
718     reader = createWALReaderForPrimary();
719 
720     LOG.info("-- Replaying edits and flush events in secondary");
721     FlushDescriptor commitFlushDesc = null;
722 
723     int lastReplayed = 0;
724     while (true) {
725       WAL.Entry entry = reader.next();
726       if (entry == null) {
727         break;
728       }
729       FlushDescriptor flushDesc
730         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
731       if (flushDesc != null) {
732         if (flushDesc.getAction() == FlushAction.START_FLUSH) {
733           // do not replay flush start marker
734         } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
735           commitFlushDesc = flushDesc; // hold on to the flush commit marker
736         }
737         // after replay verify that everything is still visible
738         verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
739       } else {
740         lastReplayed = replayEdit(secondaryRegion, entry);
741       }
742     }
743 
744     // at this point, rows 0-100 are in the memstore with no prepared snapshot and,
745     // in the non-droppable case, rows 100-200 are there as well
746     verifyData(secondaryRegion, 0, numRows, cq, families);
747 
748     // no store files in the region
749     int expectedStoreFileCount = 0;
750     for (Store s : secondaryRegion.getStores()) {
751       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
752     }
753     long regionMemstoreSize = secondaryRegion.getMemstoreSize();
754 
755     // Test case 1: replay a flush commit marker without start flush marker
756     assertNull(secondaryRegion.getPrepareFlushResult());
757     assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);
758 
759     // ensure all files are visible in secondary
760     for (Store store : secondaryRegion.getStores()) {
761       assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId());
762     }
763 
764     LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
765     secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
766 
767     // assert that the flush files are picked
768     expectedStoreFileCount++;
769     for (Store s : secondaryRegion.getStores()) {
770       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
771     }
772     Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
773     long newFlushableSize = store.getFlushableSize();
774     if (droppableMemstore) {
775       assertTrue(newFlushableSize == 0); // assert that the memstore is dropped
776     } else {
777       assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
778     }
779 
780     // assert that the region memstore was dropped (droppable case) or is unchanged (non-droppable)
781     long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
782     if (droppableMemstore) {
783       assertTrue(0 == newRegionMemstoreSize);
784     } else {
785       assertTrue(regionMemstoreSize == newRegionMemstoreSize);
786     }
787 
788     LOG.info("-- Verifying edits from secondary");
789     verifyData(secondaryRegion, 0, numRows, cq, families);
790 
791     LOG.info("-- Verifying edits from primary.");
792     verifyData(primaryRegion, 0, numRows, cq, families);
793   }
794 
795   private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
796     return FlushDescriptor.newBuilder(flush)
797         .setFlushSequenceNumber(flushSeqId)
798         .build();
799   }
800 
801   /**
802    * Tests replaying region open markers from primary region. Checks whether the files are picked up
803    */
804   @Test
805   public void testReplayRegionOpenEvent() throws IOException {
806     putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
807     int numRows = 100;
808 
809     // close the region and open again.
810     primaryRegion.close();
811     primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
812 
813     // now replay the edits and the flush marker
814     reader = createWALReaderForPrimary();
815     List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
816 
817     LOG.info("-- Replaying edits and region events in secondary");
818     while (true) {
819       WAL.Entry entry = reader.next();
820       if (entry == null) {
821         break;
822       }
823       FlushDescriptor flushDesc
824         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
825       RegionEventDescriptor regionEventDesc
826         = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
827 
828       if (flushDesc != null) {
829         // don't replay flush events
830       } else if (regionEventDesc != null) {
831         regionEvents.add(regionEventDesc);
832       } else {
833         // don't replay edits
834       }
835     }
836 
837     // we should have 1 open, 1 close and 1 open event
838     assertEquals(3, regionEvents.size());
839 
840     // replay the first region open event.
841     secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));
842 
843     // replay the close event as well
844     secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));
845 
846     // no store files in the region
847     int expectedStoreFileCount = 0;
848     for (Store s : secondaryRegion.getStores()) {
849       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
850     }
851     long regionMemstoreSize = secondaryRegion.getMemstoreSize();
852     assertTrue(regionMemstoreSize == 0);
853 
854     // now replay the region open event that should contain new file locations
855     LOG.info("Testing replaying region open event " + regionEvents.get(2));
856     secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
857 
858     // assert that the flush files are picked
859     expectedStoreFileCount++;
860     for (Store s : secondaryRegion.getStores()) {
861       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
862     }
863     Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
864     long newFlushableSize = store.getFlushableSize();
865     assertTrue(newFlushableSize == 0);
866 
867     // assert that the region memstore is empty
868     long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
869     assertTrue(newRegionMemstoreSize == 0);
870 
871     assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any
872 
873     LOG.info("-- Verifying edits from secondary");
874     verifyData(secondaryRegion, 0, numRows, cq, families);
875 
876     LOG.info("-- Verifying edits from primary.");
877     verifyData(primaryRegion, 0, numRows, cq, families);
878   }
879 
880   /**
881    * Tests the case where we replay a region open event after a flush start but before receiving
882    * flush commit
883    */
884   @Test
885   public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
886     putDataWithFlushes(primaryRegion, 100, 100, 100);
887     int numRows = 200;
888 
889     // close the region and open again.
890     primaryRegion.close();
891     primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
892 
893     // now replay the edits and the flush marker
894     reader = createWALReaderForPrimary();
895     List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
896 
897     LOG.info("-- Replaying edits and region events in secondary");
898     while (true) {
899       WAL.Entry entry = reader.next();
900       if (entry == null) {
901         break;
902       }
903       FlushDescriptor flushDesc
904         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
905       RegionEventDescriptor regionEventDesc
906         = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
907 
908       if (flushDesc != null) {
909         // only replay flush start
910         if (flushDesc.getAction() == FlushAction.START_FLUSH) {
911           secondaryRegion.replayWALFlushStartMarker(flushDesc);
912         }
913       } else if (regionEventDesc != null) {
914         regionEvents.add(regionEventDesc);
915       } else {
916         replayEdit(secondaryRegion, entry);
917       }
918     }
919 
920     // at this point, there should be some data (rows 0-100) in the memstore snapshot
921     // and some more data in memstores (rows 100-200)
922     verifyData(secondaryRegion, 0, numRows, cq, families);
923 
924     // we should have 1 open, 1 close and 1 open event
925     assertEquals(3, regionEvents.size());
926 
927     // no store files in the region
928     int expectedStoreFileCount = 0;
929     for (Store s : secondaryRegion.getStores()) {
930       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
931     }
932 
933     // now replay the region open event that should contain new file locations
934     LOG.info("Testing replaying region open event " + regionEvents.get(2));
935     secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
936 
937     // assert that the flush files are picked
938     expectedStoreFileCount = 2; // two flushes happened
939     for (Store s : secondaryRegion.getStores()) {
940       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
941     }
942     Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
943     long newSnapshotSize = store.getSnapshotSize();
944     assertTrue(newSnapshotSize == 0);
945 
946     // assert that the region memstore is empty
947     long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
948     assertTrue(newRegionMemstoreSize == 0);
949 
950     assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any
951 
952     LOG.info("-- Verifying edits from secondary");
953     verifyData(secondaryRegion, 0, numRows, cq, families);
954 
955     LOG.info("-- Verifying edits from primary.");
956     verifyData(primaryRegion, 0, numRows, cq, families);
957   }
958 
959   /**
960    * Tests that edits coming in for replay are skipped if they have a smaller seqId than the
961    * seqId of the last replayed region open event.
962    */
963   @Test
964   public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
965     putDataWithFlushes(primaryRegion, 100, 100, 0);
966     int numRows = 100;
967 
968     // close the region and open again.
969     primaryRegion.close();
970     primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
971 
972     // now replay the edits and the flush marker
973     reader = createWALReaderForPrimary();
974     List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
975     List<WAL.Entry> edits = Lists.newArrayList();
976 
977     LOG.info("-- Replaying edits and region events in secondary");
978     while (true) {
979       WAL.Entry entry = reader.next();
980       if (entry == null) {
981         break;
982       }
983       FlushDescriptor flushDesc
984         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
985       RegionEventDescriptor regionEventDesc
986         = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
987 
988       if (flushDesc != null) {
989         // don't replay flushes
990       } else if (regionEventDesc != null) {
991         regionEvents.add(regionEventDesc);
992       } else {
993         edits.add(entry);
994       }
995     }
996 
997     // replay the region open event of the first open, but with the seqId of the second
998     // open; this way none of the flush files will be picked up.
999     secondaryRegion.replayWALRegionEventMarker(
1000       RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
1001         regionEvents.get(2).getLogSequenceNumber()).build());
1002 
1003 
1004    // replay the edits from before the region close. Replay must skip them (their
1005    // seqIds predate the open event); otherwise the verification below will NOT fail.
1006     for (WAL.Entry entry: edits) {
1007       replayEdit(secondaryRegion, entry);
1008     }
1009 
1010     boolean expectedFail = false;
1011     try {
1012       verifyData(secondaryRegion, 0, numRows, cq, families);
1013     } catch (AssertionError e) {
1014       expectedFail = true; // expected
1015     }
1016     if (!expectedFail) {
1017       fail("Should have failed this verification");
1018     }
1019   }
1020 
1021   @Test
1022   public void testReplayFlushSeqIds() throws IOException {
1023     // load some data to primary and flush
1024     int start = 0;
1025    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
1026     putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
1027     LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1028     primaryRegion.flush(true);
1029 
1030     // now replay the flush marker
1031    reader = createWALReaderForPrimary();
1032 
1033     long flushSeqId = -1;
1034     LOG.info("-- Replaying flush events in secondary");
1035     while (true) {
1036       WAL.Entry entry = reader.next();
1037       if (entry == null) {
1038         break;
1039       }
1040       FlushDescriptor flushDesc
1041         = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1042       if (flushDesc != null) {
1043         if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1044           LOG.info("-- Replaying flush start in secondary");
1045           secondaryRegion.replayWALFlushStartMarker(flushDesc);
1046           flushSeqId = flushDesc.getFlushSequenceNumber();
1047         } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1048           LOG.info("-- Replaying flush commit in secondary");
1049           secondaryRegion.replayWALFlushCommitMarker(flushDesc);
1050           assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
1051         }
1052       }
1053       // else do not replay
1054     }
1055 
1056     // TODO: what to do with this?
1057     // assert that the newly picked up flush file is visible
1058     long readPoint = secondaryRegion.getMVCC().getReadPoint();
1059     assertEquals(flushSeqId, readPoint);
1060 
1061     // after replay verify that everything is still visible
1062     verifyData(secondaryRegion, 0, 100, cq, families);
1063   }
1064 
1065   @Test
1066   public void testSeqIdsFromReplay() throws IOException {
1067    // test the case where seqIds coming from replayed WALEdits are persisted with their
1068    // original values and made visible through the mvcc read point upon replay
1069     String method = name.getMethodName();
1070     byte[] tableName = Bytes.toBytes(method);
1071     byte[] family = Bytes.toBytes("family");
1072 
1073     HRegion region = initHRegion(tableName, method, family);
1074     try {
1075       // replay an entry that is bigger than current read point
1076       long readPoint = region.getMVCC().getReadPoint();
1077       long origSeqId = readPoint + 100;
1078 
1079       Put put = new Put(row).add(family, row, row);
1080       put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
1081       replay(region, put, origSeqId);
1082 
1083       // read point should have advanced to this seqId
1084       assertGet(region, family, row);
1085 
1086       // region seqId should have advanced at least to this seqId
1087       assertEquals(origSeqId, region.getSequenceId());
1088 
1089       // replay an entry that is smaller than current read point
1090       // caution: adding an entry below current read point might cause partial dirty reads. Normal
1091       // replay does not allow reads while replay is going on.
1092       put = new Put(row2).add(family, row2, row2);
1093       put.setDurability(Durability.SKIP_WAL);
1094       replay(region, put, origSeqId - 50);
1095 
1096       assertGet(region, family, row2);
1097     } finally {
1098       region.close();
1099     }
1100   }
1101 
1102   /**
1103    * Tests that a region opened in secondary mode would not write region open / close
1104    * events to its WAL.
1105    * @throws IOException
1106    */
1107   @SuppressWarnings("unchecked")
1108   @Test
1109   public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
1110     secondaryRegion.close();
1111     walSecondary = spy(walSecondary);
1112 
1113     // test for region open and close
1114     secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
1115     verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
1116      (WALKey)any(), (WALEdit)any(), anyBoolean());
1117 
1118     // test for replay prepare flush
1119     putDataByReplay(secondaryRegion, 0, 10, cq, families);
1120    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
1121      .setFlushSequenceNumber(10)
1122       .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1123       .setAction(FlushAction.START_FLUSH)
1124       .setEncodedRegionName(
1125         ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1126       .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1127       .build());
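        // replaying this hand-built START_FLUSH marker, like the open and close events,
        // must not append anything to the secondary's WAL (verified by the times(0) checks)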
1128 
1129     verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
1130       (WALKey)any(), (WALEdit)any(), anyBoolean());
1131 
1132     secondaryRegion.close();
1133     verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
1134      (WALKey)any(), (WALEdit)any(), anyBoolean());
1135   }
1136 
1137   /**
1138   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
1139    */
1140   @Test
1141   public void testRegionReadsEnabledFlag() throws IOException {
1142 
1143     putDataByReplay(secondaryRegion, 0, 100, cq, families);
1144 
1145     verifyData(secondaryRegion, 0, 100, cq, families);
1146 
1147     // now disable reads
1148     secondaryRegion.setReadsEnabled(false);
1149     try {
1150       verifyData(secondaryRegion, 0, 100, cq, families);
1151       fail("Should have failed with IOException");
1152     } catch(IOException ex) {
1153       // expected
1154     }
1155 
1156     // verify that we can still replay data
1157     putDataByReplay(secondaryRegion, 100, 100, cq, families);
1158 
1159     // now enable reads again
1160     secondaryRegion.setReadsEnabled(true);
1161     verifyData(secondaryRegion, 0, 200, cq, families);
1162   }
1163 
1164   /**
1165   * Tests the case where a request for flush cache is sent to the region, but the region cannot flush.
1166    * It should write the flush request marker instead.
1167    */
1168   @Test
1169   public void testWriteFlushRequestMarker() throws IOException {
1170     // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
1171    FlushResultImpl result = (FlushResultImpl) primaryRegion.flushcache(true, false);
1172     assertNotNull(result);
1173     assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
1174     assertFalse(result.wroteFlushWalMarker);
1175 
1176     // request flush again, but this time with writeFlushRequestWalMarker = true
1177    result = (FlushResultImpl) primaryRegion.flushcache(true, true);
1178     assertNotNull(result);
1179     assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
1180     assertTrue(result.wroteFlushWalMarker);
1181 
1182     List<FlushDescriptor> flushes = Lists.newArrayList();
1183     reader = createWALReaderForPrimary();
1184     while (true) {
1185       WAL.Entry entry = reader.next();
1186       if (entry == null) {
1187         break;
1188       }
1189       FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1190       if (flush != null) {
1191         flushes.add(flush);
1192       }
1193     }
1194 
1195     assertEquals(1, flushes.size());
1196     assertNotNull(flushes.get(0));
1197    assertEquals(FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
1198   }
1199 
1200   /**
1201    * Test the case where the secondary region replica is not in reads enabled state because it is
1202    * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH
1203    * flush marker entry should restore the reads enabled status in the region and allow the reads
1204    * to continue.
1205    */
1206   @Test
1207   public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
1208     disableReads(secondaryRegion);
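        // disableReads puts the region into the not-reads-enabled state described in
        // the javadoc above, as if it were still waiting for a marker from the primary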
1209 
1210    // Test case 1: replaying a CANNOT_FLUSH request marker (as written by a
1211    // triggered flush request) restores readsEnabled
1212     primaryRegion.flushcache(true, true);
1213     reader = createWALReaderForPrimary();
1214     while (true) {
1215       WAL.Entry entry = reader.next();
1216       if (entry == null) {
1217         break;
1218       }
1219       FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1220       if (flush != null) {
1221         secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1222       }
1223     }
1224 
1225     // now reads should be enabled
1226     secondaryRegion.get(new Get(Bytes.toBytes(0)));
1227   }
1228 
1229   /**
1230    * Test the case where the secondary region replica is not in reads enabled state because it is
1231    * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1232    * entries should restore the reads enabled status in the region and allow the reads
1233    * to continue.
1234    */
1235   @Test
1236   public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
1237    // Test case 2: replaying FLUSH_START and FLUSH_COMMIT markers (as written by
1238    // a triggered flush) restores readsEnabled
1239     disableReads(secondaryRegion);
1240 
1241     // put some data in primary
1242     putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1243     primaryRegion.flush(true);
1244 
1245     reader = createWALReaderForPrimary();
1246     while (true) {
1247       WAL.Entry entry = reader.next();
1248       if (entry == null) {
1249         break;
1250       }
1251       FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1252       if (flush != null) {
1253         secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1254       } else {
1255         replayEdit(secondaryRegion, entry);
1256       }
1257     }
1258 
1259     // now reads should be enabled
1260     verifyData(secondaryRegion, 0, 100, cq, families);
1261   }
1262 
1263   /**
1264   * Same as the previous test, except the edits themselves are never replayed, so the
1265   * secondary's memstore stays empty. Replaying the flush start and commit entries should
1266   * still restore the reads enabled status; the data then comes from the flush files
1267   * picked up at flush commit rather than from the memstore.
1268    */
1269   @Test
1270   public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
1271    // Test case 2, variant: only the FLUSH_START and FLUSH_COMMIT markers are
1272    // replayed (no edits), and they must still restore readsEnabled
1273     disableReads(secondaryRegion);
1274 
1275     // put some data in primary
1276     putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1277     primaryRegion.flush(true);
1278 
1279     reader = createWALReaderForPrimary();
1280     while (true) {
1281       WAL.Entry entry = reader.next();
1282       if (entry == null) {
1283         break;
1284       }
1285       FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1286       if (flush != null) {
1287         secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1288       }
1289     }
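    // replaying the commit marker should pick up the flushed store files, so the verification
    // below is served from files alone; the secondary memstore received no edits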

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Tests the case where the secondary region replica is not in the reads-enabled state because
   * it is waiting for a flush or region open marker from the primary region. Replaying a region
   * open event entry from the primary should restore the reads-enabled status in the region and
   * allow reads to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
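    // reopening the region writes a REGION_OPEN event marker to the WAL, which the loop below
    // finds and replays on the secondary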

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  @Test
  public void testRefreshStoreFiles() throws IOException {
    assertEquals(0, primaryRegion.getStoreFileList(families).size());
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // Test case 1: refresh with an empty region
    secondaryRegion.refreshStoreFiles();
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // do one flush
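    // (flushInterval == numRows, so this performs exactly one flush, producing one store file
    // per family)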
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: do 3 more flushes, for 4 store files per family in total
    putDataWithFlushes(primaryRegion, 100, 300, 0);
    numRows = 300;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    if (FSUtils.WINDOWS) {
      // compaction cannot move files while they are open in the secondary on Windows; skip the
      // remaining test cases.
      return;
    }

    // Test case 3: compact primary files
    primaryRegion.compactStores();
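    // (compaction should rewrite each family's 4 store files into a single file; the refresh
    // below should make the secondary drop the stale list and pick up the compacted file)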
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that the memstore is dropped.
    assertEquals(0, secondaryRegion.getMemstoreSize());
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush == null) {
        // replay only the data edits; the flush markers are deliberately skipped so that the
        // edits accumulate in the secondary memstore
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemstoreSize() > 0);

    secondaryRegion.refreshStoreFiles();
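    // a refresh that finds store files covering the replayed sequence ids lets the secondary
    // drop its memstore contents, which the following assertion checks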

    assertEquals(0, secondaryRegion.getMemstoreSize());

    LOG.info("-- Verifying edits from primary");
    verifyData(primaryRegion, 0, numRows, cq, families);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
  }

  /**
   * Paths can be qualified or not; this does the assertion after converting String -> Path and
   * stripping the scheme and authority.
   */
  private void assertPathListsEqual(List<String> list1, List<String> list2) {
    List<Path> l1 = new ArrayList<>(list1.size());
    for (String path : list1) {
      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    List<Path> l2 = new ArrayList<>(list2.size());
    for (String path : list2) {
      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    assertEquals(l1, l2);
  }

  private void disableReads(HRegion region) {
    region.setReadsEnabled(false);
    try {
      verifyData(region, 0, 1, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }
  }

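  /**
   * Replays a single put on the given region with an explicit sequence id, bypassing the WAL;
   * a minimal stand-in for how shipped edits are applied to a secondary replica.
   */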
  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
  }

  /**
   * Tests replaying a bulk load event marker from the primary region. Checks whether the bulk
   * loaded files are picked up by the secondary region.
   */
  @Test
  public void testReplayBulkLoadEvent() throws IOException {
    LOG.info("testReplayBulkLoadEvent starts");
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // bulk load a file into primary region
    Random random = new Random();
    byte[] randomValues = new byte[20];
    random.nextBytes(randomValues);
    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();

    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    int expectedLoadFileCount = 0;
    for (byte[] family : families) {
      familyPaths.add(new Pair<byte[], String>(family,
        createHFileForFamilies(testPath, family, randomValues)));
      expectedLoadFileCount++;
    }
    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
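    // the bulk load appends a BULK_LOAD event descriptor to the primary WAL; the scan below
    // stops at the first such marker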

    // now replay the edits and the bulk load marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and region events in secondary");
    BulkLoadDescriptor bulkloadEvent = null;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
      if (bulkloadEvent != null) {
        break;
      }
    }

    // we should have 1 bulk load event
    assertNotNull(bulkloadEvent);
    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());

    // replay the bulk load event
    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);

    List<String> storeFileName = new ArrayList<>();
    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
      storeFileName.addAll(storeDesc.getStoreFileList());
    }
    // assert that all of the bulk loaded files are picked up
    for (Store s : secondaryRegion.getStores()) {
      for (StoreFile sf : s.getStorefiles()) {
        storeFileName.remove(sf.getPath().getName());
      }
    }
    assertTrue("Some store files were not loaded: " + storeFileName, storeFileName.isEmpty());

    LOG.info("-- Verifying edits from secondary");
    for (byte[] family : families) {
      assertGet(secondaryRegion, family, randomValues);
    }
  }

  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying a flush commit marker whose flush output file has already been compacted
    // away on the primary and also deleted from the archive directory
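    // The marker below references paths that no longer exist; replay is expected to tolerate
    // the missing files (warn and skip) rather than throw. The following three tests exercise
    // the same expectation for compaction, region open and bulk load markers.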
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(Long.MAX_VALUE)
      .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
      .setAction(FlushAction.COMMIT_FLUSH)
      .setEncodedRegionName(
        ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
        .setFamilyName(ByteString.copyFrom(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addFlushOutput("/foo/baz/bar")
        .build())
      .build());
  }

  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying a compaction marker whose compaction output file has already been
    // compacted away on the primary and also deleted from the archive directory
    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
      .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
      .setEncodedRegionName(
        ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setFamilyName(ByteString.copyFrom(families[0]))
      .addCompactionInput("/foo")
      .addCompactionOutput("/bar")
      .setStoreHomeDir("/store_home_dir")
      .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
      .build(), true, true, Long.MAX_VALUE);
  }

  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying a region open event marker whose region files have already been compacted
    // away on the primary and also deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
      .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
      .setEncodedRegionName(
        ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
      .setEventType(EventType.REGION_OPEN)
      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
      .setLogSequenceNumber(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(ByteString.copyFrom(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/foo")
        .build())
      .build());
  }

  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying a bulk load event marker whose bulk load files have already been compacted
    // away on the primary and also deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
      .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDesc().getTableName()))
      .setEncodedRegionName(
        ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setBulkloadSeqNum(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(ByteString.copyFrom(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/foo")
        .build())
      .build());
  }

  private String createHFileForFamilies(Path testPath, byte[] family,
      byte[] valueBytes) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    // TODO We need a way to do this without creating files
    Path testFile = new Path(testPath, UUID.randomUUID().toString());
    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
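    // write a minimal HFile containing a single cell whose row, qualifier and value are all
    // valueBytes; the returned path is what bulkLoadHFiles consumes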
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContext());
      HFile.Writer writer = hFileFactory.create();
      try {
        writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
          KeyValue.Type.Put.getCode(), valueBytes)));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return testFile.toString();
  }

  /**
   * Puts a total of numRows + numRowsAfterFlush records, indexed with numeric row keys. Flushes
   * the region after every flushInterval records, then puts numRowsAfterFlush more rows without
   * a trailing flush.
   * @throws IOException
   */
  private void putDataWithFlushes(HRegion region, int flushInterval,
      int numRows, int numRowsAfterFlush) throws IOException {
    int start = 0;
    for (; start < numRows; start += flushInterval) {
      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
      LOG.info("-- Flushing primary, creating " + families.length + " files for "
        + families.length + " stores");
      region.flush(true);
    }
    LOG.info("-- Writing some more data to primary, not flushing");
    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
  }

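  /**
   * Loads rows [startRow, startRow + numRows) into the region through the replay path, using
   * i + 1 as the replay sequence id for row i.
   */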
  private void putDataByReplay(HRegion region,
      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.add(family, qf, EnvironmentEdgeManager.currentTime(), null);
      }
      replay(region, put, i + 1);
    }
  }

  private static HRegion initHRegion(byte[] tableName,
      String callingMethod, byte[]... families) throws IOException {
    return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
  }

  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
      WAL wal, byte[]... families) throws IOException {
    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
      isReadOnly, durability, wal, families);
  }
}