1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import com.google.protobuf.RpcController;
21 import com.google.protobuf.ServiceException;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.hadoop.hbase.*;
24
25 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
26 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
27 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
28 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
29 import org.apache.hadoop.hbase.quotas.ThrottlingException;
30 import org.apache.hadoop.hbase.regionserver.HRegionServer;
31 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
32 import org.apache.hadoop.hbase.testclassification.ClientTests;
33 import org.apache.hadoop.hbase.testclassification.MediumTests;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.hbase.util.JVMClusterUtil;
36 import org.junit.After;
37 import org.junit.AfterClass;
38 import org.junit.Before;
39 import org.junit.BeforeClass;
40 import org.junit.Test;
41 import org.junit.experimental.categories.Category;
42
43 import java.io.IOException;
44 import java.util.ArrayList;
45 import java.util.List;
46
47 import static org.junit.Assert.*;
48
49 @Category({MediumTests.class, ClientTests.class})
50 public class TestMetaCache {
51 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
52 private static final TableName TABLE_NAME = TableName.valueOf("test_table");
53 private static final byte[] FAMILY = Bytes.toBytes("fam1");
54 private static final byte[] QUALIFIER = Bytes.toBytes("qual");
55 private ConnectionManager.HConnectionImplementation conn;
56 private HRegionServer badRS;
57
58
59
60
61 @BeforeClass
62 public static void setUpBeforeClass() throws Exception {
63 Configuration conf = TEST_UTIL.getConfiguration();
64 conf.set("hbase.client.retries.number", "1");
65 TEST_UTIL.startMiniCluster(1);
66 TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
67 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME);
68 }
69
70
71
72
73
74 @AfterClass
75 public static void tearDownAfterClass() throws Exception {
76 TEST_UTIL.shutdownMiniCluster();
77 }
78
79
80
81
82 @Before
83 public void setup() throws Exception {
84 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
85
86 cluster.getConfiguration().setStrings(HConstants.REGION_SERVER_IMPL,
87 RegionServerWithFakeRpcServices.class.getName());
88 JVMClusterUtil.RegionServerThread rsThread = cluster.startRegionServer();
89 rsThread.waitForServerOnline();
90 badRS = rsThread.getRegionServer();
91 assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
92 cluster.getConfiguration().setStrings(HConstants.REGION_SERVER_IMPL,
93 HRegionServer.class.getName());
94
95 assertEquals(2, cluster.getRegionServerThreads().size());
96
97 conn = (ConnectionManager.HConnectionImplementation)ConnectionFactory.createConnection(
98 TEST_UTIL.getConfiguration());
99 HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
100 HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
101 fam.setMaxVersions(2);
102 table.addFamily(fam);
103 try (Admin admin = conn.getAdmin()) {
104 admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
105 }
106 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
107 }
108
109
110
111
112 @After
113 public void tearDown() throws Exception {
114
115 }
116
117 @Test
118 public void testPreserveMetaCacheOnException() throws Exception {
119 Table table = conn.getTable(TABLE_NAME);
120 byte[] row = badRS.getOnlineRegions(TABLE_NAME).get(0).getRegionInfo().getStartKey();
121
122 Put put = new Put(row);
123 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
124 Get get = new Get(row);
125 Append append = new Append(row);
126 append.add(FAMILY, QUALIFIER, Bytes.toBytes(11));
127 Increment increment = new Increment(row);
128 increment.addColumn(FAMILY, QUALIFIER, 10);
129 Delete delete = new Delete(row);
130 delete.addColumn(FAMILY, QUALIFIER);
131 RowMutations mutations = new RowMutations(row);
132 mutations.add(put);
133 mutations.add(delete);
134
135 Exception exp;
136 boolean success;
137 for (int i = 0; i < 50; i++) {
138 exp = null;
139 success =false;
140 try {
141 table.put(put);
142
143 success = true;
144 table.get(get);
145 table.append(append);
146 table.increment(increment);
147 table.delete(delete);
148 table.mutateRow(mutations);
149 } catch (IOException ex) {
150
151 if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
152 exp = ex;
153 }
154 }
155
156 if(exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
157 assertNull(conn.getCachedLocation(TABLE_NAME, row));
158 } else if (success) {
159 assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
160 }
161 }
162 }
163
164 public static List<Throwable> metaCachePreservingExceptions() {
165 return new ArrayList<Throwable>() {{
166 add(new RegionOpeningException(" "));
167 add(new RegionTooBusyException());
168 add(new ThrottlingException(" "));
169 add(new MultiActionResultTooLarge(" "));
170 add(new RetryImmediatelyException(" "));
171 add(new CallQueueTooBigException());
172 }};
173 }
174
175 protected static class RegionServerWithFakeRpcServices extends HRegionServer {
176
177 public RegionServerWithFakeRpcServices(Configuration conf, CoordinatedStateManager cp)
178 throws IOException, InterruptedException {
179 super(conf, cp);
180 }
181
182 @Override
183 protected RSRpcServices createRpcServices() throws IOException {
184 return new FakeRSRpcServices(this);
185 }
186 }
187
188 protected static class FakeRSRpcServices extends RSRpcServices {
189
190 private int numReqs = -1;
191 private int expCount = -1;
192 private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();
193
194 public FakeRSRpcServices(HRegionServer rs) throws IOException {
195 super(rs);
196 }
197
198 @Override
199 public GetResponse get(final RpcController controller,
200 final ClientProtos.GetRequest request) throws ServiceException {
201 throwSomeExceptions();
202 return super.get(controller, request);
203 }
204
205 @Override
206 public ClientProtos.MutateResponse mutate(final RpcController controller,
207 final ClientProtos.MutateRequest request) throws ServiceException {
208 throwSomeExceptions();
209 return super.mutate(controller, request);
210 }
211
212 @Override
213 public ClientProtos.ScanResponse scan(final RpcController controller,
214 final ClientProtos.ScanRequest request) throws ServiceException {
215 throwSomeExceptions();
216 return super.scan(controller, request);
217 }
218
219
220
221
222
223
224 private void throwSomeExceptions() throws ServiceException {
225 numReqs++;
226
227
228 if (numReqs % 5 ==0) {
229 return;
230 } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
231 throw new ServiceException(new NotServingRegionException());
232 }
233
234
235
236
237 expCount++;
238 Throwable t = metaCachePreservingExceptions.get(
239 expCount % metaCachePreservingExceptions.size());
240 throw new ServiceException(t);
241 }
242 }
243 }