1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertArrayEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.mockito.Mockito.doAnswer;
23 import static org.mockito.Mockito.spy;
24
25 import java.io.IOException;
26 import java.util.Collection;
27 import java.util.Map;
28
29 import org.apache.commons.lang.mutable.MutableBoolean;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.DroppedSnapshotException;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.NamespaceDescriptor;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.client.Connection;
40 import org.apache.hadoop.hbase.client.Get;
41 import org.apache.hadoop.hbase.client.HBaseAdmin;
42 import org.apache.hadoop.hbase.client.Put;
43 import org.apache.hadoop.hbase.client.Result;
44 import org.apache.hadoop.hbase.client.Table;
45 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
46 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
47 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
48 import org.apache.hadoop.hbase.testclassification.MediumTests;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51 import org.apache.hadoop.hbase.wal.WAL;
52 import org.junit.After;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56 import org.mockito.Matchers;
57 import org.mockito.invocation.InvocationOnMock;
58 import org.mockito.stubbing.Answer;
59
60
61
62
63 @Category({ MediumTests.class })
64 public class TestSplitWalDataLoss {
65
66 private static final Log LOG = LogFactory.getLog(TestSplitWalDataLoss.class);
67
68 private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
69
70 private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
71 .build();
72
73 private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");
74
75 private byte[] family = Bytes.toBytes("f");
76
77 private byte[] qualifier = Bytes.toBytes("q");
78
79 @Before
80 public void setUp() throws Exception {
81 testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
82 testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
83 testUtil.startMiniCluster(2);
84 HBaseAdmin admin = testUtil.getHBaseAdmin();
85 admin.createNamespace(namespace);
86 admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
87 testUtil.waitTableAvailable(tableName);
88 }
89
90 @After
91 public void tearDown() throws Exception {
92 testUtil.shutdownMiniCluster();
93 }
94
95 @Test
96 public void test() throws IOException, InterruptedException {
97 final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
98 final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0);
99 HRegion spiedRegion = spy(region);
100 final MutableBoolean flushed = new MutableBoolean(false);
101 final MutableBoolean reported = new MutableBoolean(false);
102 doAnswer(new Answer<FlushResult>() {
103 @Override
104 public FlushResult answer(InvocationOnMock invocation) throws Throwable {
105 synchronized (flushed) {
106 flushed.setValue(true);
107 flushed.notifyAll();
108 }
109 synchronized (reported) {
110 while (!reported.booleanValue()) {
111 reported.wait();
112 }
113 }
114 rs.getWAL(region.getRegionInfo()).abortCacheFlush(
115 region.getRegionInfo().getEncodedNameAsBytes());
116 throw new DroppedSnapshotException("testcase");
117 }
118 }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
119 Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
120 Matchers.<Collection<Store>> any());
121
122 String key = null;
123 for (Map.Entry<String, Region> entry: rs.onlineRegions.entrySet()) {
124 if (entry.getValue().getRegionInfo().getTable().equals(this.tableName)) {
125 key = entry.getKey();
126 break;
127 }
128 }
129 rs.onlineRegions.put(key, spiedRegion);
130 Connection conn = testUtil.getConnection();
131
132 try (Table table = conn.getTable(tableName)) {
133 table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0")));
134 }
135 long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
136 LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
137 assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
138 rs.cacheFlusher.requestFlush(spiedRegion, false);
139 synchronized (flushed) {
140 while (!flushed.booleanValue()) {
141 flushed.wait();
142 }
143 }
144 try (Table table = conn.getTable(tableName)) {
145 table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1")));
146 }
147 long now = EnvironmentEdgeManager.currentTime();
148 rs.tryRegionServerReport(now - 500, now);
149 synchronized (reported) {
150 reported.setValue(true);
151 reported.notifyAll();
152 }
153 while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
154 Thread.sleep(100);
155 }
156 try (Table table = conn.getTable(tableName)) {
157 Result result = table.get(new Get(Bytes.toBytes("row0")));
158 assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
159 }
160 }
161 }