1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.junit.Assert.*;
22 import static org.mockito.Matchers.any;
23 import static org.mockito.Matchers.anyBoolean;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.when;
29 import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;
30
31 import java.io.FileNotFoundException;
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Random;
37 import java.util.UUID;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.fs.FSDataOutputStream;
43 import org.apache.hadoop.fs.FileSystem;
44 import org.apache.hadoop.fs.Path;
45 import org.apache.hadoop.hbase.Cell;
46 import org.apache.hadoop.hbase.CellUtil;
47 import org.apache.hadoop.hbase.HBaseTestingUtility;
48 import org.apache.hadoop.hbase.HColumnDescriptor;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.HRegionInfo;
51 import org.apache.hadoop.hbase.HTableDescriptor;
52 import org.apache.hadoop.hbase.KeyValue;
53 import org.apache.hadoop.hbase.ServerName;
54 import org.apache.hadoop.hbase.TableName;
55 import org.apache.hadoop.hbase.client.Durability;
56 import org.apache.hadoop.hbase.client.Get;
57 import org.apache.hadoop.hbase.client.Put;
58 import org.apache.hadoop.hbase.io.hfile.HFile;
59 import org.apache.hadoop.hbase.io.hfile.HFileContext;
60 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
62 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
63 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
64 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
65 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
66 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
67 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
68 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
69 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
70 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
71 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
72 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
73 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
74 import org.apache.hadoop.hbase.testclassification.MediumTests;
75 import org.apache.hadoop.hbase.util.Bytes;
76 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
78 import org.apache.hadoop.hbase.util.FSUtils;
79 import org.apache.hadoop.hbase.util.Pair;
80 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
81 import org.apache.hadoop.hbase.wal.WAL;
82 import org.apache.hadoop.hbase.wal.WALFactory;
83 import org.apache.hadoop.hbase.wal.WALKey;
84 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
85 import org.apache.hadoop.util.StringUtils;
86 import org.junit.After;
87 import org.junit.Before;
88 import org.junit.Rule;
89 import org.junit.Test;
90 import org.junit.experimental.categories.Category;
91 import org.junit.rules.TestName;
92
93 import com.google.common.collect.Lists;
94 import com.google.protobuf.ByteString;
95
96
97
98
99
100 @Category(MediumTests.class)
101 public class TestHRegionReplayEvents {
102
// Logger for this test class. It previously used TestHRegion.class (copy-paste), which filed
// this test's output under the wrong logging category.
private static final Log LOG = LogFactory.getLog(TestHRegionReplayEvents.class);

// JUnit rule exposing the name of the currently running test method.
@Rule public TestName name = new TestName();

// The static members below are (re)assigned in setup() before every test.
private static HBaseTestingUtility TEST_UTIL;

public static Configuration CONF;
private String dir;
private static FileSystem FILESYSTEM;

// Column families the test table is created with.
private byte[][] families = new byte[][] {
  Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

// Test specific state, derived from the test method name in setup().
protected byte[] tableName;
protected String method;
protected final byte[] row = Bytes.toBytes("rowA");
protected final byte[] row2 = Bytes.toBytes("rowB");
// Column qualifier used for all data written and verified by the tests.
protected byte[] cq = Bytes.toBytes("cq");

// Per-test fixtures created in setup(): a primary region and a secondary region over the same
// table and key range, the WALs obtained for each, and a WAL reader that tests may open
// (closed in tearDown() when non-null).
private Path rootDir;
private HTableDescriptor htd;
private long time;
private RegionServerServices rss;
private HRegionInfo primaryHri, secondaryHri;
private HRegion primaryRegion, secondaryRegion;
private WALFactory wals;
private WAL walPrimary, walSecondary;
private WAL.Reader reader;
132
133 @Before
134 public void setup() throws IOException {
135 TEST_UTIL = HBaseTestingUtility.createLocalHTU();
136 FILESYSTEM = TEST_UTIL.getTestFileSystem();
137 CONF = TEST_UTIL.getConfiguration();
138 dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
139 method = name.getMethodName();
140 tableName = Bytes.toBytes(name.getMethodName());
141 rootDir = new Path(dir + method);
142 TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
143 method = name.getMethodName();
144
145 htd = new HTableDescriptor(TableName.valueOf(method));
146 for (byte[] family : families) {
147 htd.addFamily(new HColumnDescriptor(family));
148 }
149
150 time = System.currentTimeMillis();
151
152 primaryHri = new HRegionInfo(htd.getTableName(),
153 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
154 false, time, 0);
155 secondaryHri = new HRegionInfo(htd.getTableName(),
156 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
157 false, time, 1);
158
159 wals = TestHRegion.createWALFactory(CONF, rootDir);
160 walPrimary = wals.getWAL(primaryHri.getEncodedNameAsBytes());
161 walSecondary = wals.getWAL(secondaryHri.getEncodedNameAsBytes());
162
163 rss = mock(RegionServerServices.class);
164 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
165 when(rss.getConfiguration()).thenReturn(CONF);
166 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
167
168 primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
169 primaryRegion.close();
170
171 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
172 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
173
174 reader = null;
175 }
176
177 @After
178 public void tearDown() throws Exception {
179 if (reader != null) {
180 reader.close();
181 }
182
183 if (primaryRegion != null) {
184 HRegion.closeHRegion(primaryRegion);
185 }
186 if (secondaryRegion != null) {
187 HRegion.closeHRegion(secondaryRegion);
188 }
189
190 EnvironmentEdgeManagerTestHelper.reset();
191 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
192 TEST_UTIL.cleanupTestDir();
193 }
194
195 String getName() {
196 return name.getMethodName();
197 }
198
199
200
201
202
203
204
205
206
207
208
209
210 @Test
211 public void testRegionReplicaSecondaryCannotFlush() throws IOException {
212
213
214
215 putDataByReplay(secondaryRegion, 0, 1000, cq, families);
216
217 verifyData(secondaryRegion, 0, 1000, cq, families);
218
219
220 FlushResultImpl flush = (FlushResultImpl)secondaryRegion.flush(true);
221 assertEquals(flush.result, FlushResultImpl.Result.CANNOT_FLUSH);
222
223 verifyData(secondaryRegion, 0, 1000, cq, families);
224
225
226 Map<byte[], List<StoreFile>> files = secondaryRegion.close(false);
227
228 for (List<StoreFile> f : files.values()) {
229 assertTrue(f.isEmpty());
230 }
231 }
232
233
234
235
236
237 @Test (timeout = 60000)
238 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
239
240 int start = 0;
241 LOG.info("-- Writing some data to primary from " + start + " to " + (start+100));
242 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
243 LOG.info("-- Flushing primary, creating 3 files for 3 stores");
244 primaryRegion.flush(true);
245
246
247 reader = createWALReaderForPrimary();
248
249 LOG.info("-- Replaying edits and flush events in secondary");
250 while (true) {
251 WAL.Entry entry = reader.next();
252 if (entry == null) {
253 break;
254 }
255 FlushDescriptor flushDesc
256 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
257 if (flushDesc != null) {
258 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
259 LOG.info("-- Replaying flush start in secondary");
260 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
261 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
262 LOG.info("-- NOT Replaying flush commit in secondary");
263 }
264 } else {
265 replayEdit(secondaryRegion, entry);
266 }
267 }
268
269 assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreSize() > 0);
270
271 secondaryRegion.close();
272
273
274 assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreSize());
275 }
276
277 static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
278 if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
279 return 0;
280 }
281 Put put = new Put(entry.getEdit().getCells().get(0).getRow());
282 for (Cell cell : entry.getEdit().getCells()) put.add(cell);
283 put.setDurability(Durability.SKIP_WAL);
284 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
285 region.batchReplay(new MutationReplay[] {mutation},
286 entry.getKey().getLogSeqNum());
287 return Integer.parseInt(Bytes.toString(put.getRow()));
288 }
289
290 WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
291 return wals.createReader(TEST_UTIL.getTestFileSystem(),
292 DefaultWALProvider.getCurrentFileName(walPrimary),
293 TEST_UTIL.getConfiguration());
294 }
295
296 @Test
297 public void testReplayFlushesAndCompactions() throws IOException {
298
299
300
301 putDataWithFlushes(primaryRegion, 100, 300, 100);
302
303
304 LOG.info("-- Compacting primary, only 1 store");
305 primaryRegion.compactStore(Bytes.toBytes("cf1"),
306 NoLimitCompactionThroughputController.INSTANCE);
307
308
309 reader = createWALReaderForPrimary();
310
311 LOG.info("-- Replaying edits and flush events in secondary");
312 int lastReplayed = 0;
313 int expectedStoreFileCount = 0;
314 while (true) {
315 WAL.Entry entry = reader.next();
316 if (entry == null) {
317 break;
318 }
319 FlushDescriptor flushDesc
320 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
321 CompactionDescriptor compactionDesc
322 = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
323 if (flushDesc != null) {
324
325 verifyData(secondaryRegion, 0, lastReplayed, cq, families);
326 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
327 long storeMemstoreSize = store.getMemStoreSize();
328 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
329 long storeFlushableSize = store.getFlushableSize();
330 long storeSize = store.getSize();
331 long storeSizeUncompressed = store.getStoreSizeUncompressed();
332 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
333 LOG.info("-- Replaying flush start in secondary");
334 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
335 assertNull(result.result);
336 assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());
337
338
339 long newStoreMemstoreSize = store.getMemStoreSize();
340 LOG.info("Memstore size reduced by:"
341 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
342 assertTrue(storeMemstoreSize > newStoreMemstoreSize);
343
344 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
345 LOG.info("-- Replaying flush commit in secondary");
346 secondaryRegion.replayWALFlushCommitMarker(flushDesc);
347
348
349 expectedStoreFileCount++;
350 for (Store s : secondaryRegion.getStores()) {
351 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
352 }
353 long newFlushableSize = store.getFlushableSize();
354 assertTrue(storeFlushableSize > newFlushableSize);
355
356
357 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
358 assertTrue(regionMemstoreSize > newRegionMemstoreSize);
359
360
361 assertTrue(store.getSize() > storeSize);
362 assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
363 assertEquals(store.getSize(), store.getStorefilesSize());
364 }
365
366 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
367 } else if (compactionDesc != null) {
368 secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);
369
370
371 for (Store store : secondaryRegion.getStores()) {
372 if (store.getColumnFamilyName().equals("cf1")) {
373 assertEquals(1, store.getStorefilesCount());
374 } else {
375 assertEquals(expectedStoreFileCount, store.getStorefilesCount());
376 }
377 }
378 } else {
379 lastReplayed = replayEdit(secondaryRegion, entry);;
380 }
381 }
382
383 assertEquals(400-1, lastReplayed);
384 LOG.info("-- Verifying edits from secondary");
385 verifyData(secondaryRegion, 0, 400, cq, families);
386
387 LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
388 verifyData(primaryRegion, 0, lastReplayed, cq, families);
389 for (Store store : primaryRegion.getStores()) {
390 if (store.getColumnFamilyName().equals("cf1")) {
391 assertEquals(1, store.getStorefilesCount());
392 } else {
393 assertEquals(expectedStoreFileCount, store.getStorefilesCount());
394 }
395 }
396 }
397
398
399
400
401
402 @Test
403 public void testReplayFlushStartMarkers() throws IOException {
404
405 putDataWithFlushes(primaryRegion, 100, 100, 100);
406 int numRows = 200;
407
408
409 reader = createWALReaderForPrimary();
410
411 LOG.info("-- Replaying edits and flush events in secondary");
412
413 FlushDescriptor startFlushDesc = null;
414
415 int lastReplayed = 0;
416 while (true) {
417 WAL.Entry entry = reader.next();
418 if (entry == null) {
419 break;
420 }
421 FlushDescriptor flushDesc
422 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
423 if (flushDesc != null) {
424
425 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
426 long storeMemstoreSize = store.getMemStoreSize();
427 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
428 long storeFlushableSize = store.getFlushableSize();
429
430 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
431 startFlushDesc = flushDesc;
432 LOG.info("-- Replaying flush start in secondary");
433 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
434 assertNull(result.result);
435 assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
436 assertTrue(regionMemstoreSize > 0);
437 assertTrue(storeFlushableSize > 0);
438
439
440 long newStoreMemstoreSize = store.getMemStoreSize();
441 LOG.info("Memstore size reduced by:"
442 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
443 assertTrue(storeMemstoreSize > newStoreMemstoreSize);
444 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
445
446 }
447
448 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
449 } else {
450 lastReplayed = replayEdit(secondaryRegion, entry);
451 }
452 }
453
454
455
456
457 verifyData(secondaryRegion, 0, numRows, cq, families);
458
459
460 LOG.info("-- Replaying same flush start in secondary again");
461 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
462 assertNull(result);
463
464 assertNotNull(secondaryRegion.getPrepareFlushResult());
465 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
466 startFlushDesc.getFlushSequenceNumber());
467 assertTrue(secondaryRegion.getMemstoreSize() > 0);
468 verifyData(secondaryRegion, 0, numRows, cq, families);
469
470
471 FlushDescriptor startFlushDescSmallerSeqId
472 = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
473 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
474 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
475 assertNull(result);
476
477 assertNotNull(secondaryRegion.getPrepareFlushResult());
478 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
479 startFlushDesc.getFlushSequenceNumber());
480 assertTrue(secondaryRegion.getMemstoreSize() > 0);
481 verifyData(secondaryRegion, 0, numRows, cq, families);
482
483
484 FlushDescriptor startFlushDescLargerSeqId
485 = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
486 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
487 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
488 assertNull(result);
489
490 assertNotNull(secondaryRegion.getPrepareFlushResult());
491 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
492 startFlushDesc.getFlushSequenceNumber());
493 assertTrue(secondaryRegion.getMemstoreSize() > 0);
494 verifyData(secondaryRegion, 0, numRows, cq, families);
495
496 LOG.info("-- Verifying edits from secondary");
497 verifyData(secondaryRegion, 0, numRows, cq, families);
498
499 LOG.info("-- Verifying edits from primary.");
500 verifyData(primaryRegion, 0, numRows, cq, families);
501 }
502
503
504
505
506
507 @Test
508 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
509
510 putDataWithFlushes(primaryRegion, 100, 200, 100);
511 int numRows = 300;
512
513
514 reader = createWALReaderForPrimary();
515
516 LOG.info("-- Replaying edits and flush events in secondary");
517 FlushDescriptor startFlushDesc = null;
518 FlushDescriptor commitFlushDesc = null;
519
520 int lastReplayed = 0;
521 while (true) {
522 System.out.println(lastReplayed);
523 WAL.Entry entry = reader.next();
524 if (entry == null) {
525 break;
526 }
527 FlushDescriptor flushDesc
528 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
529 if (flushDesc != null) {
530 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
531
532 if (startFlushDesc == null) {
533 startFlushDesc = flushDesc;
534 } else {
535 LOG.info("-- Replaying flush start in secondary");
536 startFlushDesc = flushDesc;
537 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
538 assertNull(result.result);
539 }
540 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
541
542 if (commitFlushDesc == null) {
543 commitFlushDesc = flushDesc;
544 }
545 }
546
547 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
548 } else {
549 lastReplayed = replayEdit(secondaryRegion, entry);
550 }
551 }
552
553
554
555 verifyData(secondaryRegion, 0, numRows, cq, families);
556
557
558 int expectedStoreFileCount = 0;
559 for (Store s : secondaryRegion.getStores()) {
560 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
561 }
562 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
563
564
565 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
566 + startFlushDesc);
567 assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());
568
569 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
570 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
571
572
573 expectedStoreFileCount++;
574 for (Store s : secondaryRegion.getStores()) {
575 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
576 }
577 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
578 long newFlushableSize = store.getFlushableSize();
579 assertTrue(newFlushableSize > 0);
580
581
582 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
583 assertEquals(regionMemstoreSize, newRegionMemstoreSize);
584
585 assertNotNull(secondaryRegion.getPrepareFlushResult());
586
587 LOG.info("-- Verifying edits from secondary");
588 verifyData(secondaryRegion, 0, numRows, cq, families);
589
590 LOG.info("-- Verifying edits from primary.");
591 verifyData(primaryRegion, 0, numRows, cq, families);
592 }
593
594
595
596
597
598 @Test
599 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
600
601 putDataWithFlushes(primaryRegion, 100, 100, 100);
602 int numRows = 200;
603
604
605 reader = createWALReaderForPrimary();
606
607 LOG.info("-- Replaying edits and flush events in secondary");
608 FlushDescriptor startFlushDesc = null;
609 FlushDescriptor commitFlushDesc = null;
610
611 int lastReplayed = 0;
612 while (true) {
613 WAL.Entry entry = reader.next();
614 if (entry == null) {
615 break;
616 }
617 FlushDescriptor flushDesc
618 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
619 if (flushDesc != null) {
620 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
621 if (startFlushDesc == null) {
622 LOG.info("-- Replaying flush start in secondary");
623 startFlushDesc = flushDesc;
624 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
625 assertNull(result.result);
626 }
627 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
628
629
630
631 commitFlushDesc =
632 FlushDescriptor.newBuilder(flushDesc)
633 .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
634 .build();
635 }
636
637 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
638 } else {
639 lastReplayed = replayEdit(secondaryRegion, entry);
640 }
641 }
642
643
644
645 verifyData(secondaryRegion, 0, numRows, cq, families);
646
647
648 int expectedStoreFileCount = 0;
649 for (Store s : secondaryRegion.getStores()) {
650 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
651 }
652 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
653
654
655 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
656 + startFlushDesc);
657 assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
658
659 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
660 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
661
662
663 expectedStoreFileCount++;
664 for (Store s : secondaryRegion.getStores()) {
665 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
666 }
667 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
668 long newFlushableSize = store.getFlushableSize();
669 assertTrue(newFlushableSize > 0);
670
671
672 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
673 assertTrue(newRegionMemstoreSize > 0);
674 assertTrue(regionMemstoreSize > newRegionMemstoreSize);
675
676 assertNull(secondaryRegion.getPrepareFlushResult());
677
678 LOG.info("-- Verifying edits from secondary");
679 verifyData(secondaryRegion, 0, numRows, cq, families);
680
681 LOG.info("-- Verifying edits from primary.");
682 verifyData(primaryRegion, 0, numRows, cq, families);
683 }
684
685
686
687
688
689
690 @Test
691 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
692 throws IOException {
693 testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
694 }
695
696
697
698
699
700
701 @Test
702 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
703 throws IOException {
704 testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
705 }
706
707
708
709
710 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
711 throws IOException {
712
713
714 putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
715 int numRows = droppableMemstore ? 100 : 200;
716
717
718 reader = createWALReaderForPrimary();
719
720 LOG.info("-- Replaying edits and flush events in secondary");
721 FlushDescriptor commitFlushDesc = null;
722
723 int lastReplayed = 0;
724 while (true) {
725 WAL.Entry entry = reader.next();
726 if (entry == null) {
727 break;
728 }
729 FlushDescriptor flushDesc
730 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
731 if (flushDesc != null) {
732 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
733
734 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
735 commitFlushDesc = flushDesc;
736 }
737
738 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
739 } else {
740 lastReplayed = replayEdit(secondaryRegion, entry);
741 }
742 }
743
744
745
746 verifyData(secondaryRegion, 0, numRows, cq, families);
747
748
749 int expectedStoreFileCount = 0;
750 for (Store s : secondaryRegion.getStores()) {
751 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
752 }
753 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
754
755
756 assertNull(secondaryRegion.getPrepareFlushResult());
757 assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);
758
759
760 for (Store store : secondaryRegion.getStores()) {
761 assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId());
762 }
763
764 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
765 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
766
767
768 expectedStoreFileCount++;
769 for (Store s : secondaryRegion.getStores()) {
770 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
771 }
772 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
773 long newFlushableSize = store.getFlushableSize();
774 if (droppableMemstore) {
775 assertTrue(newFlushableSize == 0);
776 } else {
777 assertTrue(newFlushableSize > 0);
778 }
779
780
781 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
782 if (droppableMemstore) {
783 assertTrue(0 == newRegionMemstoreSize);
784 } else {
785 assertTrue(regionMemstoreSize == newRegionMemstoreSize);
786 }
787
788 LOG.info("-- Verifying edits from secondary");
789 verifyData(secondaryRegion, 0, numRows, cq, families);
790
791 LOG.info("-- Verifying edits from primary.");
792 verifyData(primaryRegion, 0, numRows, cq, families);
793 }
794
795 private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
796 return FlushDescriptor.newBuilder(flush)
797 .setFlushSequenceNumber(flushSeqId)
798 .build();
799 }
800
801
802
803
804 @Test
805 public void testReplayRegionOpenEvent() throws IOException {
806 putDataWithFlushes(primaryRegion, 100, 0, 100);
807 int numRows = 100;
808
809
810 primaryRegion.close();
811 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
812
813
814 reader = createWALReaderForPrimary();
815 List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
816
817 LOG.info("-- Replaying edits and region events in secondary");
818 while (true) {
819 WAL.Entry entry = reader.next();
820 if (entry == null) {
821 break;
822 }
823 FlushDescriptor flushDesc
824 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
825 RegionEventDescriptor regionEventDesc
826 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
827
828 if (flushDesc != null) {
829
830 } else if (regionEventDesc != null) {
831 regionEvents.add(regionEventDesc);
832 } else {
833
834 }
835 }
836
837
838 assertEquals(3, regionEvents.size());
839
840
841 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));
842
843
844 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));
845
846
847 int expectedStoreFileCount = 0;
848 for (Store s : secondaryRegion.getStores()) {
849 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
850 }
851 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
852 assertTrue(regionMemstoreSize == 0);
853
854
855 LOG.info("Testing replaying region open event " + regionEvents.get(2));
856 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
857
858
859 expectedStoreFileCount++;
860 for (Store s : secondaryRegion.getStores()) {
861 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
862 }
863 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
864 long newFlushableSize = store.getFlushableSize();
865 assertTrue(newFlushableSize == 0);
866
867
868 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
869 assertTrue(newRegionMemstoreSize == 0);
870
871 assertNull(secondaryRegion.getPrepareFlushResult());
872
873 LOG.info("-- Verifying edits from secondary");
874 verifyData(secondaryRegion, 0, numRows, cq, families);
875
876 LOG.info("-- Verifying edits from primary.");
877 verifyData(primaryRegion, 0, numRows, cq, families);
878 }
879
880
881
882
883
884 @Test
885 public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
886 putDataWithFlushes(primaryRegion, 100, 100, 100);
887 int numRows = 200;
888
889
890 primaryRegion.close();
891 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
892
893
894 reader = createWALReaderForPrimary();
895 List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
896
897 LOG.info("-- Replaying edits and region events in secondary");
898 while (true) {
899 WAL.Entry entry = reader.next();
900 if (entry == null) {
901 break;
902 }
903 FlushDescriptor flushDesc
904 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
905 RegionEventDescriptor regionEventDesc
906 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
907
908 if (flushDesc != null) {
909
910 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
911 secondaryRegion.replayWALFlushStartMarker(flushDesc);
912 }
913 } else if (regionEventDesc != null) {
914 regionEvents.add(regionEventDesc);
915 } else {
916 replayEdit(secondaryRegion, entry);
917 }
918 }
919
920
921
922 verifyData(secondaryRegion, 0, numRows, cq, families);
923
924
925 assertEquals(3, regionEvents.size());
926
927
928 int expectedStoreFileCount = 0;
929 for (Store s : secondaryRegion.getStores()) {
930 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
931 }
932
933
934 LOG.info("Testing replaying region open event " + regionEvents.get(2));
935 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
936
937
938 expectedStoreFileCount = 2;
939 for (Store s : secondaryRegion.getStores()) {
940 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
941 }
942 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
943 long newSnapshotSize = store.getSnapshotSize();
944 assertTrue(newSnapshotSize == 0);
945
946
947 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
948 assertTrue(newRegionMemstoreSize == 0);
949
950 assertNull(secondaryRegion.getPrepareFlushResult());
951
952 LOG.info("-- Verifying edits from secondary");
953 verifyData(secondaryRegion, 0, numRows, cq, families);
954
955 LOG.info("-- Verifying edits from primary.");
956 verifyData(primaryRegion, 0, numRows, cq, families);
957 }
958
959
960
961
962
963 @Test
964 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
965 putDataWithFlushes(primaryRegion, 100, 100, 0);
966 int numRows = 100;
967
968
969 primaryRegion.close();
970 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
971
972
973 reader = createWALReaderForPrimary();
974 List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
975 List<WAL.Entry> edits = Lists.newArrayList();
976
977 LOG.info("-- Replaying edits and region events in secondary");
978 while (true) {
979 WAL.Entry entry = reader.next();
980 if (entry == null) {
981 break;
982 }
983 FlushDescriptor flushDesc
984 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
985 RegionEventDescriptor regionEventDesc
986 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
987
988 if (flushDesc != null) {
989
990 } else if (regionEventDesc != null) {
991 regionEvents.add(regionEventDesc);
992 } else {
993 edits.add(entry);
994 }
995 }
996
997
998
999 secondaryRegion.replayWALRegionEventMarker(
1000 RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
1001 regionEvents.get(2).getLogSequenceNumber()).build());
1002
1003
1004
1005
1006 for (WAL.Entry entry: edits) {
1007 replayEdit(secondaryRegion, entry);
1008 }
1009
1010 boolean expectedFail = false;
1011 try {
1012 verifyData(secondaryRegion, 0, numRows, cq, families);
1013 } catch (AssertionError e) {
1014 expectedFail = true;
1015 }
1016 if (!expectedFail) {
1017 fail("Should have failed this verification");
1018 }
1019 }
1020
1021 @Test
1022 public void testReplayFlushSeqIds() throws IOException {
1023
1024 int start = 0;
1025 LOG.info("-- Writing some data to primary from " + start + " to " + (start+100));
1026 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
1027 LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1028 primaryRegion.flush(true);
1029
1030
1031 reader = createWALReaderForPrimary();
1032
1033 long flushSeqId = -1;
1034 LOG.info("-- Replaying flush events in secondary");
1035 while (true) {
1036 WAL.Entry entry = reader.next();
1037 if (entry == null) {
1038 break;
1039 }
1040 FlushDescriptor flushDesc
1041 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1042 if (flushDesc != null) {
1043 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1044 LOG.info("-- Replaying flush start in secondary");
1045 secondaryRegion.replayWALFlushStartMarker(flushDesc);
1046 flushSeqId = flushDesc.getFlushSequenceNumber();
1047 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1048 LOG.info("-- Replaying flush commit in secondary");
1049 secondaryRegion.replayWALFlushCommitMarker(flushDesc);
1050 assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
1051 }
1052 }
1053
1054 }
1055
1056
1057
1058 long readPoint = secondaryRegion.getMVCC().getReadPoint();
1059 assertEquals(flushSeqId, readPoint);
1060
1061
1062 verifyData(secondaryRegion, 0, 100, cq, families);
1063 }
1064
1065 @Test
1066 public void testSeqIdsFromReplay() throws IOException {
1067
1068
1069 String method = name.getMethodName();
1070 byte[] tableName = Bytes.toBytes(method);
1071 byte[] family = Bytes.toBytes("family");
1072
1073 HRegion region = initHRegion(tableName, method, family);
1074 try {
1075
1076 long readPoint = region.getMVCC().getReadPoint();
1077 long origSeqId = readPoint + 100;
1078
1079 Put put = new Put(row).add(family, row, row);
1080 put.setDurability(Durability.SKIP_WAL);
1081 replay(region, put, origSeqId);
1082
1083
1084 assertGet(region, family, row);
1085
1086
1087 assertEquals(origSeqId, region.getSequenceId());
1088
1089
1090
1091
1092 put = new Put(row2).add(family, row2, row2);
1093 put.setDurability(Durability.SKIP_WAL);
1094 replay(region, put, origSeqId - 50);
1095
1096 assertGet(region, family, row2);
1097 } finally {
1098 region.close();
1099 }
1100 }
1101
1102
1103
1104
1105
1106
1107 @SuppressWarnings("unchecked")
1108 @Test
1109 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
1110 secondaryRegion.close();
1111 walSecondary = spy(walSecondary);
1112
1113
1114 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
1115 verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
1116 (WALKey)any(), (WALEdit)any(), anyBoolean());
1117
1118
1119 putDataByReplay(secondaryRegion, 0, 10, cq, families);
1120 secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder().
1121 setFlushSequenceNumber(10)
1122 .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1123 .setAction(FlushAction.START_FLUSH)
1124 .setEncodedRegionName(
1125 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1126 .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1127 .build());
1128
1129 verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
1130 (WALKey)any(), (WALEdit)any(), anyBoolean());
1131
1132 secondaryRegion.close();
1133 verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
1134 (WALKey)any(), (WALEdit)any(), anyBoolean());
1135 }
1136
1137
1138
1139
1140 @Test
1141 public void testRegionReadsEnabledFlag() throws IOException {
1142
1143 putDataByReplay(secondaryRegion, 0, 100, cq, families);
1144
1145 verifyData(secondaryRegion, 0, 100, cq, families);
1146
1147
1148 secondaryRegion.setReadsEnabled(false);
1149 try {
1150 verifyData(secondaryRegion, 0, 100, cq, families);
1151 fail("Should have failed with IOException");
1152 } catch(IOException ex) {
1153
1154 }
1155
1156
1157 putDataByReplay(secondaryRegion, 100, 100, cq, families);
1158
1159
1160 secondaryRegion.setReadsEnabled(true);
1161 verifyData(secondaryRegion, 0, 200, cq, families);
1162 }
1163
1164
1165
1166
1167
1168 @Test
1169 public void testWriteFlushRequestMarker() throws IOException {
1170
1171 FlushResultImpl result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, false);
1172 assertNotNull(result);
1173 assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
1174 assertFalse(result.wroteFlushWalMarker);
1175
1176
1177 result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, true);
1178 assertNotNull(result);
1179 assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
1180 assertTrue(result.wroteFlushWalMarker);
1181
1182 List<FlushDescriptor> flushes = Lists.newArrayList();
1183 reader = createWALReaderForPrimary();
1184 while (true) {
1185 WAL.Entry entry = reader.next();
1186 if (entry == null) {
1187 break;
1188 }
1189 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1190 if (flush != null) {
1191 flushes.add(flush);
1192 }
1193 }
1194
1195 assertEquals(1, flushes.size());
1196 assertNotNull(flushes.get(0));
1197 assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
1198 }
1199
1200
1201
1202
1203
1204
1205
1206 @Test
1207 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
1208 disableReads(secondaryRegion);
1209
1210
1211
1212 primaryRegion.flushcache(true, true);
1213 reader = createWALReaderForPrimary();
1214 while (true) {
1215 WAL.Entry entry = reader.next();
1216 if (entry == null) {
1217 break;
1218 }
1219 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1220 if (flush != null) {
1221 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1222 }
1223 }
1224
1225
1226 secondaryRegion.get(new Get(Bytes.toBytes(0)));
1227 }
1228
1229
1230
1231
1232
1233
1234
1235 @Test
1236 public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
1237
1238
1239 disableReads(secondaryRegion);
1240
1241
1242 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1243 primaryRegion.flush(true);
1244
1245 reader = createWALReaderForPrimary();
1246 while (true) {
1247 WAL.Entry entry = reader.next();
1248 if (entry == null) {
1249 break;
1250 }
1251 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1252 if (flush != null) {
1253 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1254 } else {
1255 replayEdit(secondaryRegion, entry);
1256 }
1257 }
1258
1259
1260 verifyData(secondaryRegion, 0, 100, cq, families);
1261 }
1262
1263
1264
1265
1266
1267
1268
1269 @Test
1270 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
1271
1272
1273 disableReads(secondaryRegion);
1274
1275
1276 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1277 primaryRegion.flush(true);
1278
1279 reader = createWALReaderForPrimary();
1280 while (true) {
1281 WAL.Entry entry = reader.next();
1282 if (entry == null) {
1283 break;
1284 }
1285 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1286 if (flush != null) {
1287 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1288 }
1289 }
1290
1291
1292 verifyData(secondaryRegion, 0, 100, cq, families);
1293 }
1294
1295
1296
1297
1298
1299
1300
1301 @Test
1302 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
1303
1304 disableReads(secondaryRegion);
1305
1306 primaryRegion.close();
1307 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1308
1309 reader = createWALReaderForPrimary();
1310 while (true) {
1311 WAL.Entry entry = reader.next();
1312 if (entry == null) {
1313 break;
1314 }
1315
1316 RegionEventDescriptor regionEventDesc
1317 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1318
1319 if (regionEventDesc != null) {
1320 secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
1321 }
1322 }
1323
1324
1325 secondaryRegion.get(new Get(Bytes.toBytes(0)));
1326 }
1327
1328 @Test
1329 public void testRefreshStoreFiles() throws IOException {
1330 assertEquals(0, primaryRegion.getStoreFileList(families).size());
1331 assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1332
1333
1334 secondaryRegion.refreshStoreFiles();
1335 assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1336
1337
1338 putDataWithFlushes(primaryRegion, 100, 100, 0);
1339 int numRows = 100;
1340
1341
1342 secondaryRegion.refreshStoreFiles();
1343 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1344 secondaryRegion.getStoreFileList(families));
1345 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1346
1347 LOG.info("-- Verifying edits from secondary");
1348 verifyData(secondaryRegion, 0, numRows, cq, families);
1349
1350
1351 putDataWithFlushes(primaryRegion, 100, 300, 0);
1352 numRows = 300;
1353
1354
1355 secondaryRegion.refreshStoreFiles();
1356 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1357 secondaryRegion.getStoreFileList(families));
1358 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1359
1360 LOG.info("-- Verifying edits from secondary");
1361 verifyData(secondaryRegion, 0, numRows, cq, families);
1362
1363 if (FSUtils.WINDOWS) {
1364
1365 return;
1366 }
1367
1368
1369 primaryRegion.compactStores();
1370 secondaryRegion.refreshStoreFiles();
1371 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1372 secondaryRegion.getStoreFileList(families));
1373 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1374
1375 LOG.info("-- Verifying edits from secondary");
1376 verifyData(secondaryRegion, 0, numRows, cq, families);
1377
1378 LOG.info("-- Replaying edits in secondary");
1379
1380
1381 assertTrue(secondaryRegion.getMemstoreSize() == 0);
1382 putDataWithFlushes(primaryRegion, 400, 400, 0);
1383 numRows = 400;
1384
1385 reader = createWALReaderForPrimary();
1386 while (true) {
1387 WAL.Entry entry = reader.next();
1388 if (entry == null) {
1389 break;
1390 }
1391 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1392 if (flush != null) {
1393
1394 } else {
1395 replayEdit(secondaryRegion, entry);
1396 }
1397 }
1398
1399 assertTrue(secondaryRegion.getMemstoreSize() > 0);
1400
1401 secondaryRegion.refreshStoreFiles();
1402
1403 assertTrue(secondaryRegion.getMemstoreSize() == 0);
1404
1405 LOG.info("-- Verifying edits from primary");
1406 verifyData(primaryRegion, 0, numRows, cq, families);
1407 LOG.info("-- Verifying edits from secondary");
1408 verifyData(secondaryRegion, 0, numRows, cq, families);
1409 }
1410
1411
1412
1413
1414 private void assertPathListsEqual(List<String> list1, List<String> list2) {
1415 List<Path> l1 = new ArrayList<>(list1.size());
1416 for (String path : list1) {
1417 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1418 }
1419 List<Path> l2 = new ArrayList<>(list2.size());
1420 for (String path : list2) {
1421 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1422 }
1423 assertEquals(l1, l2);
1424 }
1425
1426 private void disableReads(HRegion region) {
1427 region.setReadsEnabled(false);
1428 try {
1429 verifyData(region, 0, 1, cq, families);
1430 fail("Should have failed with IOException");
1431 } catch(IOException ex) {
1432
1433 }
1434 }
1435
1436 private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
1437 put.setDurability(Durability.SKIP_WAL);
1438 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
1439 region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
1440 }
1441
1442
1443
1444
1445 @Test
1446 public void testReplayBulkLoadEvent() throws IOException {
1447 LOG.info("testReplayBulkLoadEvent starts");
1448 putDataWithFlushes(primaryRegion, 100, 0, 100);
1449
1450
1451 primaryRegion.close();
1452 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1453
1454
1455 Random random = new Random();
1456 byte[] randomValues = new byte[20];
1457 random.nextBytes(randomValues);
1458 Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
1459
1460 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1461 int expectedLoadFileCount = 0;
1462 for (byte[] family : families) {
1463 familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(testPath, family,
1464 randomValues)));
1465 expectedLoadFileCount++;
1466 }
1467 primaryRegion.bulkLoadHFiles(familyPaths, false, null);
1468
1469
1470 reader = createWALReaderForPrimary();
1471
1472 LOG.info("-- Replaying edits and region events in secondary");
1473 BulkLoadDescriptor bulkloadEvent = null;
1474 while (true) {
1475 WAL.Entry entry = reader.next();
1476 if (entry == null) {
1477 break;
1478 }
1479 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
1480 if (bulkloadEvent != null) {
1481 break;
1482 }
1483 }
1484
1485
1486 assertTrue(bulkloadEvent != null);
1487 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
1488
1489
1490 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
1491
1492
1493 List<String> storeFileName = new ArrayList<String>();
1494 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
1495 storeFileName.addAll(storeDesc.getStoreFileList());
1496 }
1497
1498 for (Store s : secondaryRegion.getStores()) {
1499 for (StoreFile sf : s.getStorefiles()) {
1500 storeFileName.remove(sf.getPath().getName());
1501 }
1502 }
1503 assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty());
1504
1505 LOG.info("-- Verifying edits from secondary");
1506 for (byte[] family : families) {
1507 assertGet(secondaryRegion, family, randomValues);
1508 }
1509 }
1510
1511 @Test
1512 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
1513
1514
1515 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder().
1516 setFlushSequenceNumber(Long.MAX_VALUE)
1517 .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1518 .setAction(FlushAction.COMMIT_FLUSH)
1519 .setEncodedRegionName(
1520 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1521 .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1522 .addStoreFlushes(StoreFlushDescriptor.newBuilder()
1523 .setFamilyName(ByteString.copyFrom(families[0]))
1524 .setStoreHomeDir("/store_home_dir")
1525 .addFlushOutput("/foo/baz/bar")
1526 .build())
1527 .build());
1528 }
1529
1530 @Test
1531 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
1532
1533
1534 secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
1535 .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1536 .setEncodedRegionName(
1537 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1538 .setFamilyName(ByteString.copyFrom(families[0]))
1539 .addCompactionInput("/foo")
1540 .addCompactionOutput("/bar")
1541 .setStoreHomeDir("/store_home_dir")
1542 .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1543 .build()
1544 , true, true, Long.MAX_VALUE);
1545 }
1546
1547 @Test
1548 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
1549
1550
1551 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
1552 .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1553 .setEncodedRegionName(
1554 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1555 .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1556 .setEventType(EventType.REGION_OPEN)
1557 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
1558 .setLogSequenceNumber(Long.MAX_VALUE)
1559 .addStores(StoreDescriptor.newBuilder()
1560 .setFamilyName(ByteString.copyFrom(families[0]))
1561 .setStoreHomeDir("/store_home_dir")
1562 .addStoreFile("/foo")
1563 .build())
1564 .build());
1565 }
1566
1567 @Test
1568 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
1569
1570
1571 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
1572 .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDesc().getTableName()))
1573 .setEncodedRegionName(
1574 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1575 .setBulkloadSeqNum(Long.MAX_VALUE)
1576 .addStores(StoreDescriptor.newBuilder()
1577 .setFamilyName(ByteString.copyFrom(families[0]))
1578 .setStoreHomeDir("/store_home_dir")
1579 .addStoreFile("/foo")
1580 .build())
1581 .build());
1582 }
1583
1584 private String createHFileForFamilies(Path testPath, byte[] family,
1585 byte[] valueBytes) throws IOException {
1586 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
1587
1588 Path testFile = new Path(testPath, UUID.randomUUID().toString());
1589 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
1590 try {
1591 hFileFactory.withOutputStream(out);
1592 hFileFactory.withFileContext(new HFileContext());
1593 HFile.Writer writer = hFileFactory.create();
1594 try {
1595 writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0l,
1596 KeyValue.Type.Put.getCode(), valueBytes)));
1597 } finally {
1598 writer.close();
1599 }
1600 } finally {
1601 out.close();
1602 }
1603 return testFile.toString();
1604 }
1605
1606
1607
1608
1609
1610 private void putDataWithFlushes(HRegion region, int flushInterval,
1611 int numRows, int numRowsAfterFlush) throws IOException {
1612 int start = 0;
1613 for (; start < numRows; start += flushInterval) {
1614 LOG.info("-- Writing some data to primary from " + start + " to " + (start+flushInterval));
1615 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
1616 LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1617 region.flush(true);
1618 }
1619 LOG.info("-- Writing some more data to primary, not flushing");
1620 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
1621 }
1622
1623 private void putDataByReplay(HRegion region,
1624 int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
1625 for (int i = startRow; i < startRow + numRows; i++) {
1626 Put put = new Put(Bytes.toBytes("" + i));
1627 put.setDurability(Durability.SKIP_WAL);
1628 for (byte[] family : families) {
1629 put.add(family, qf, EnvironmentEdgeManager.currentTime(), null);
1630 }
1631 replay(region, put, i+1);
1632 }
1633 }
1634
1635 private static HRegion initHRegion(byte[] tableName,
1636 String callingMethod, byte[]... families) throws IOException {
1637 return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
1638 callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
1639 }
1640
1641 private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1642 String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
1643 WAL wal, byte[]... families) throws IOException {
1644 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
1645 isReadOnly, durability, wal, families);
1646 }
1647 }