1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.catalog;
20
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.net.ConnectException;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import junit.framework.Assert;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Abortable;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HRegionInfo;
37 import org.apache.hadoop.hbase.HRegionLocation;
38 import org.apache.hadoop.hbase.MediumTests;
39 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
40 import org.apache.hadoop.hbase.ServerName;
41 import org.apache.hadoop.hbase.client.HConnection;
42 import org.apache.hadoop.hbase.client.HConnectionManager;
43 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
44 import org.apache.hadoop.hbase.client.Result;
45 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
46 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
47 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
48 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
49 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
50 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
51 import org.apache.hadoop.hbase.util.Threads;
52 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
53 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
54 import org.apache.hadoop.util.Progressable;
55 import org.apache.zookeeper.KeeperException;
56 import org.junit.After;
57 import org.junit.AfterClass;
58 import org.junit.Before;
59 import org.junit.BeforeClass;
60 import org.junit.Test;
61 import org.junit.experimental.categories.Category;
62 import org.mockito.Mockito;
63
64 import com.google.protobuf.RpcController;
65 import com.google.protobuf.ServiceException;
66
67
68
69
70 @Category(MediumTests.class)
71 public class TestCatalogTracker {
72 private static final Log LOG = LogFactory.getLog(TestCatalogTracker.class);
73 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
74 private static final ServerName SN =
75 ServerName.valueOf("example.org", 1234, System.currentTimeMillis());
76 private ZooKeeperWatcher watcher;
77 private Abortable abortable;
78
79 @BeforeClass public static void beforeClass() throws Exception {
80
81 UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
82 UTIL.startMiniZKCluster();
83 }
84
85 @AfterClass public static void afterClass() throws IOException {
86 UTIL.getZkCluster().shutdown();
87 }
88
89 @Before public void before() throws IOException {
90 this.abortable = new Abortable() {
91 @Override
92 public void abort(String why, Throwable e) {
93 LOG.info(why, e);
94 }
95
96 @Override
97 public boolean isAborted() {
98 return false;
99 }
100 };
101 this.watcher = new ZooKeeperWatcher(UTIL.getConfiguration(),
102 this.getClass().getSimpleName(), this.abortable, true);
103 }
104
105 @After public void after() {
106 try {
107
108
109 MetaRegionTracker.deleteMetaLocation(this.watcher);
110 } catch (KeeperException e) {
111 LOG.warn("Unable to delete hbase:meta location", e);
112 }
113
114
115 HConnectionManager.deleteConnection(UTIL.getConfiguration());
116
117 this.watcher.close();
118 }
119
120 private CatalogTracker constructAndStartCatalogTracker(final HConnection c)
121 throws IOException, InterruptedException {
122 CatalogTracker ct = new CatalogTracker(this.watcher, UTIL.getConfiguration(),
123 c, this.abortable);
124 ct.start();
125 return ct;
126 }
127
128
129
130
131
132
133
134 @Test public void testThatIfMETAMovesWeAreNotified()
135 throws IOException, InterruptedException, KeeperException {
136 HConnection connection = Mockito.mock(HConnection.class);
137 constructAndStartCatalogTracker(connection);
138
139 MetaRegionTracker.setMetaLocation(this.watcher,
140 ServerName.valueOf("example.com", 1234, System.currentTimeMillis()));
141 }
142
143
144
145
146
147
148
149 @Test public void testInterruptWaitOnMeta()
150 throws IOException, InterruptedException, ServiceException {
151 final ClientProtos.ClientService.BlockingInterface client =
152 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
153 HConnection connection = mockConnection(null, client);
154
155 Mockito.when(client.get((RpcController)Mockito.any(), (GetRequest)Mockito.any())).
156 thenReturn(GetResponse.newBuilder().build());
157 final CatalogTracker ct = constructAndStartCatalogTracker(connection);
158 ServerName meta = ct.getMetaLocation();
159 Assert.assertNull(meta);
160 Thread t = new Thread() {
161 @Override
162 public void run() {
163 try {
164 ct.waitForMeta();
165 } catch (InterruptedException e) {
166 throw new RuntimeException("Interrupted", e);
167 }
168 }
169 };
170 t.start();
171 while (!t.isAlive())
172 Threads.sleep(1);
173 Threads.sleep(1);
174 assertTrue(t.isAlive());
175 ct.stop();
176
177 t.join();
178 }
179
180 private void testVerifyMetaRegionLocationWithException(Exception ex)
181 throws IOException, InterruptedException, KeeperException, ServiceException {
182
183 final ClientProtos.ClientService.BlockingInterface implementation =
184 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
185 HConnection connection = mockConnection(null, implementation);
186
187
188 Mockito.when(implementation.get((RpcController) Mockito.any(), (GetRequest) Mockito.any())).
189 thenThrow(new ServiceException(ex));
190
191 final CatalogTracker ct = constructAndStartCatalogTracker(connection);
192
193 MetaRegionTracker.setMetaLocation(this.watcher, SN);
194 long timeout = UTIL.getConfiguration().
195 getLong("hbase.catalog.verification.timeout", 1000);
196 Assert.assertFalse(ct.verifyMetaRegionLocation(timeout));
197 }
198
199
200
201
202
203
204
205
206 @Test
207 public void testGetMetaServerConnectionFails()
208 throws IOException, InterruptedException, KeeperException, ServiceException {
209 testVerifyMetaRegionLocationWithException(new ConnectException("Connection refused"));
210 }
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226 @Test
227 public void testVerifyMetaRegionServerNotRunning()
228 throws IOException, InterruptedException, KeeperException, ServiceException {
229 testVerifyMetaRegionLocationWithException(new ServerNotRunningYetException("mock"));
230 }
231
232
233
234
235
236
237
238
239 @Test
240 public void testVerifyMetaRegionLocationFails()
241 throws IOException, InterruptedException, KeeperException, ServiceException {
242 HConnection connection = Mockito.mock(HConnection.class);
243 ServiceException connectException =
244 new ServiceException(new ConnectException("Connection refused"));
245 final AdminProtos.AdminService.BlockingInterface implementation =
246 Mockito.mock(AdminProtos.AdminService.BlockingInterface.class);
247 Mockito.when(implementation.getRegionInfo((RpcController)Mockito.any(),
248 (GetRegionInfoRequest)Mockito.any())).thenThrow(connectException);
249 Mockito.when(connection.getAdmin(Mockito.any(ServerName.class), Mockito.anyBoolean())).
250 thenReturn(implementation);
251 final CatalogTracker ct = constructAndStartCatalogTracker(connection);
252
253 MetaRegionTracker.setMetaLocation(this.watcher,
254 ServerName.valueOf("example.com", 1234, System.currentTimeMillis()));
255 Assert.assertFalse(ct.verifyMetaRegionLocation(100));
256 }
257
258 @Test (expected = NotAllMetaRegionsOnlineException.class)
259 public void testTimeoutWaitForMeta()
260 throws IOException, InterruptedException {
261 HConnection connection = Mockito.mock(HConnection.class);
262 final CatalogTracker ct = constructAndStartCatalogTracker(connection);
263 ct.waitForMeta(100);
264 }
265
266
267
268
269
270
271
272 @Test public void testNoTimeoutWaitForMeta()
273 throws IOException, InterruptedException, KeeperException {
274 HConnection connection = Mockito.mock(HConnection.class);
275 final CatalogTracker ct = constructAndStartCatalogTracker(connection);
276 ServerName hsa = ct.getMetaLocation();
277 Assert.assertNull(hsa);
278
279
280 Thread t = new WaitOnMetaThread(ct);
281 startWaitAliveThenWaitItLives(t, 1);
282
283 hsa = setMetaLocation();
284
285 t.join();
286
287 Assert.assertTrue(ct.getMetaLocation().equals(hsa));
288 }
289
290 private ServerName setMetaLocation() throws KeeperException {
291 MetaRegionTracker.setMetaLocation(this.watcher, SN);
292 return SN;
293 }
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310 private HConnection mockConnection(final AdminProtos.AdminService.BlockingInterface admin,
311 final ClientProtos.ClientService.BlockingInterface client)
312 throws IOException {
313 HConnection connection =
314 HConnectionTestingUtility.getMockedConnection(UTIL.getConfiguration());
315 Mockito.doNothing().when(connection).close();
316
317 final HRegionLocation anyLocation =
318 new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, SN);
319 Mockito.when(connection.getRegionLocation((TableName) Mockito.any(),
320 (byte[]) Mockito.any(), Mockito.anyBoolean())).
321 thenReturn(anyLocation);
322 Mockito.when(connection.locateRegion((TableName) Mockito.any(),
323 (byte[]) Mockito.any())).
324 thenReturn(anyLocation);
325 if (admin != null) {
326
327 Mockito.when(connection.getAdmin(Mockito.any(ServerName.class))).
328 thenReturn(admin);
329 }
330 if (client != null) {
331
332 Mockito.when(connection.getClient(Mockito.any(ServerName.class))).
333 thenReturn(client);
334 }
335 return connection;
336 }
337
338
339
340
341
342
343 private Result getMetaTableRowResult() throws IOException {
344 return MetaMockingUtil.getMetaTableRowResult(HRegionInfo.FIRST_META_REGIONINFO, SN);
345 }
346
347 private void startWaitAliveThenWaitItLives(final Thread t, final int ms) {
348 t.start();
349 while(!t.isAlive()) {
350
351 }
352
353 Threads.sleep(ms);
354 Assert.assertTrue("Assert " + t.getName() + " still waiting", t.isAlive());
355 }
356
357 class CountingProgressable implements Progressable {
358 final AtomicInteger counter = new AtomicInteger(0);
359 @Override
360 public void progress() {
361 this.counter.incrementAndGet();
362 }
363 }
364
365
366
367
368 class WaitOnMetaThread extends Thread {
369 final CatalogTracker ct;
370
371 WaitOnMetaThread(final CatalogTracker ct) {
372 super("WaitOnMeta");
373 this.ct = ct;
374 }
375
376 @Override
377 public void run() {
378 try {
379 doWaiting();
380 } catch (InterruptedException e) {
381 throw new RuntimeException("Failed wait", e);
382 }
383 LOG.info("Exiting " + getName());
384 }
385
386 void doWaiting() throws InterruptedException {
387 try {
388 while (this.ct.waitForMeta(100) == null);
389 } catch (NotAllMetaRegionsOnlineException e) {
390
391 }
392 }
393 }
394
395 }