1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
27
28 import java.io.IOException;
29 import java.lang.reflect.Field;
30 import java.lang.reflect.Modifier;
31 import java.net.SocketTimeoutException;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Random;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.SynchronousQueue;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.hbase.HBaseConfiguration;
47 import org.apache.hadoop.hbase.HBaseTestingUtility;
48 import org.apache.hadoop.hbase.HConstants;
49 import org.apache.hadoop.hbase.HRegionInfo;
50 import org.apache.hadoop.hbase.HRegionLocation;
51 import org.apache.hadoop.hbase.MediumTests;
52 import org.apache.hadoop.hbase.ServerName;
53 import org.apache.hadoop.hbase.TableName;
54 import org.apache.hadoop.hbase.Waiter;
55 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
56 import org.apache.hadoop.hbase.exceptions.DeserializationException;
57 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
58 import org.apache.hadoop.hbase.filter.Filter;
59 import org.apache.hadoop.hbase.filter.FilterBase;
60 import org.apache.hadoop.hbase.master.HMaster;
61 import org.apache.hadoop.hbase.regionserver.HRegion;
62 import org.apache.hadoop.hbase.regionserver.HRegionServer;
63 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
64 import org.apache.hadoop.hbase.util.Bytes;
65 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
66 import org.apache.hadoop.hbase.util.JVMClusterUtil;
67 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
68 import org.apache.hadoop.hbase.util.Threads;
69 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
70 import org.junit.AfterClass;
71 import org.junit.Assert;
72 import org.junit.BeforeClass;
73 import org.junit.Ignore;
74 import org.junit.Test;
75 import org.junit.experimental.categories.Category;
76
77 import com.google.common.collect.Lists;
78
79
80
81
82 @Category(MediumTests.class)
83 public class TestHCM {
84 private static final Log LOG = LogFactory.getLog(TestHCM.class);
85 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
86 private static final TableName TABLE_NAME =
87 TableName.valueOf("test");
88 private static final TableName TABLE_NAME1 =
89 TableName.valueOf("test1");
90 private static final TableName TABLE_NAME2 =
91 TableName.valueOf("test2");
92 private static final TableName TABLE_NAME3 =
93 TableName.valueOf("test3");
94 private static final TableName TABLE_NAME4 =
95 TableName.valueOf("test4");
96 private static final byte[] FAM_NAM = Bytes.toBytes("f");
97 private static final byte[] ROW = Bytes.toBytes("bbb");
98 private static final byte[] ROW_X = Bytes.toBytes("xxx");
99 private static Random _randy = new Random();
100
101 @BeforeClass
102 public static void setUpBeforeClass() throws Exception {
103 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
104 TEST_UTIL.startMiniCluster(2);
105 }
106
107 @AfterClass public static void tearDownAfterClass() throws Exception {
108 TEST_UTIL.shutdownMiniCluster();
109 }
110
111
112 private static int getHConnectionManagerCacheSize(){
113 return HConnectionTestingUtility.getConnectionCount();
114 }
115
116 @Test
117 public void testClusterConnection() throws IOException {
118 ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
119 5, TimeUnit.SECONDS,
120 new SynchronousQueue<Runnable>(),
121 Threads.newDaemonThreadFactory("test-hcm"));
122
123 HConnection con1 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
124 HConnection con2 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
125
126 assertTrue(otherPool == ((HConnectionImplementation)con2).getCurrentBatchPool());
127
128 String tableName = "testClusterConnection";
129 TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
130 HTable t = (HTable)con1.getTable(tableName, otherPool);
131
132 assertNull("Internal Thread pool should be null", ((HConnectionImplementation)con1).getCurrentBatchPool());
133
134 assertTrue(otherPool == t.getPool());
135 t.close();
136
137 t = (HTable)con2.getTable(tableName);
138
139 assertTrue(otherPool == t.getPool());
140 t.close();
141
142 t = (HTable)con2.getTable(Bytes.toBytes(tableName));
143
144 assertTrue(otherPool == t.getPool());
145 t.close();
146
147 t = (HTable)con2.getTable(TableName.valueOf(tableName));
148
149 assertTrue(otherPool == t.getPool());
150 t.close();
151
152 t = (HTable)con1.getTable(tableName);
153 ExecutorService pool = ((HConnectionImplementation)con1).getCurrentBatchPool();
154
155 assertNotNull("An internal Thread pool should have been created", pool);
156
157 assertTrue(t.getPool() == pool);
158 t.close();
159
160 t = (HTable)con1.getTable(tableName);
161
162 assertTrue(t.getPool() == pool);
163 t.close();
164
165 con1.close();
166
167 assertTrue(pool.isShutdown());
168
169 con2.close();
170
171 assertFalse(otherPool.isShutdown());
172 otherPool.shutdownNow();
173 }
174
175 @Ignore ("Fails in IDEs: HBASE-9042") @Test(expected = RegionServerStoppedException.class)
176 public void testClusterStatus() throws Exception {
177 TableName tn =
178 TableName.valueOf("testClusterStatus");
179 byte[] cf = "cf".getBytes();
180 byte[] rk = "rk1".getBytes();
181
182 JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
183 rs.waitForServerOnline();
184 final ServerName sn = rs.getRegionServer().getServerName();
185
186 HTable t = TEST_UTIL.createTable(tn, cf);
187 TEST_UTIL.waitTableAvailable(tn.getName());
188
189 while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
190 getRegionStates().isRegionsInTransition()){
191 Thread.sleep(1);
192 }
193 final HConnectionImplementation hci = (HConnectionImplementation)t.getConnection();
194 while (t.getRegionLocation(rk).getPort() != sn.getPort()){
195 TEST_UTIL.getHBaseAdmin().move(t.getRegionLocation(rk).getRegionInfo().
196 getEncodedNameAsBytes(), Bytes.toBytes(sn.toString()));
197 while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
198 getRegionStates().isRegionsInTransition()){
199 Thread.sleep(1);
200 }
201 hci.clearRegionCache(tn);
202 }
203 Assert.assertNotNull(hci.clusterStatusListener);
204 TEST_UTIL.assertRegionOnServer(t.getRegionLocation(rk).getRegionInfo(), sn, 20000);
205
206 Put p1 = new Put(rk);
207 p1.add(cf, "qual".getBytes(), "val".getBytes());
208 t.put(p1);
209
210 rs.getRegionServer().abort("I'm dead");
211
212
213 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
214 @Override
215 public boolean evaluate() throws Exception {
216 return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
217 getDeadServers().isDeadServer(sn);
218 }
219 });
220
221 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
222 @Override
223 public boolean evaluate() throws Exception {
224 return hci.clusterStatusListener.isDeadServer(sn);
225 }
226 });
227
228 t.close();
229 hci.getClient(sn);
230 }
231
232
233
234
235
236
237 @Test
238 public void testConnectionCut() throws Exception {
239 String tableName = "HCM-testConnectionCut";
240
241 TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
242 boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
243
244 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
245
246 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
247 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
248 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
249
250 HTable table = new HTable(c2, tableName);
251
252 Put p = new Put(FAM_NAM);
253 p.add(FAM_NAM, FAM_NAM, FAM_NAM);
254 table.put(p);
255
256 final HConnectionImplementation hci = (HConnectionImplementation)table.getConnection();
257 final HRegionLocation loc = table.getRegionLocation(FAM_NAM);
258
259 Get get = new Get(FAM_NAM);
260 Assert.assertNotNull(table.get(get));
261
262 get = new Get(FAM_NAM);
263 get.setFilter(new BlockingFilter());
264
265
266 Thread t = new Thread() {
267 @Override
268 public void run() {
269 synchronized (syncBlockingFilter) {
270 try {
271 syncBlockingFilter.wait();
272 } catch (InterruptedException e) {
273 throw new RuntimeException(e);
274 }
275 }
276 hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
277 }
278 };
279
280 t.start();
281 try {
282 table.get(get);
283 Assert.fail();
284 } catch (IOException expected) {
285 LOG.debug("Received: " + expected);
286 Assert.assertFalse(expected instanceof SocketTimeoutException);
287 Assert.assertFalse(syncBlockingFilter.get());
288 } finally {
289 syncBlockingFilter.set(true);
290 t.join();
291 HConnectionManager.getConnection(c2).close();
292 TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
293 }
294
295 table.close();
296 }
297
298 protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
299
300 public static class BlockingFilter extends FilterBase {
301 @Override
302 public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
303 int i = 0;
304 while (i++ < 1000 && !syncBlockingFilter.get()) {
305 synchronized (syncBlockingFilter) {
306 syncBlockingFilter.notifyAll();
307 }
308 Threads.sleep(100);
309 }
310 syncBlockingFilter.set(true);
311 return false;
312 }
313
314 public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
315 return new BlockingFilter();
316 }
317 }
318
319 @Test
320 public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
321
322 Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
323 new HashMap<HConnectionKey, HConnectionImplementation>();
324 oldHBaseInstances.putAll(HConnectionManager.CONNECTION_INSTANCES);
325
326 HConnectionManager.CONNECTION_INSTANCES.clear();
327
328 try {
329 HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
330 connection.abort("test abortingHConnectionRemovesItselfFromHCM", new Exception(
331 "test abortingHConnectionRemovesItselfFromHCM"));
332 Assert.assertNotSame(connection,
333 HConnectionManager.getConnection(TEST_UTIL.getConfiguration()));
334 } finally {
335
336 HConnectionManager.CONNECTION_INSTANCES.clear();
337 HConnectionManager.CONNECTION_INSTANCES.putAll(oldHBaseInstances);
338 }
339 }
340
341
342
343
344
345
346 @Test
347 public void testRegionCaching() throws Exception{
348 TEST_UTIL.createTable(TABLE_NAME, FAM_NAM).close();
349 Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
350 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
351 HTable table = new HTable(conf, TABLE_NAME);
352
353 TEST_UTIL.createMultiRegions(table, FAM_NAM);
354 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
355 Put put = new Put(ROW);
356 put.add(FAM_NAM, ROW, ROW);
357 table.put(put);
358 HConnectionManager.HConnectionImplementation conn =
359 (HConnectionManager.HConnectionImplementation)table.getConnection();
360
361 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
362
363 final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1;
364 HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW);
365 conn.updateCachedLocation(loc.getRegionInfo(), loc, ServerName.valueOf("127.0.0.1", nextPort,
366 HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
367 Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort);
368
369 conn.forceDeleteCachedLocation(TABLE_NAME, ROW.clone());
370 HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);
371 assertNull("What is this location?? " + rl, rl);
372
373
374
375 conn.clearRegionCache(TABLE_NAME);
376 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
377 Put put2 = new Put(ROW);
378 put2.add(FAM_NAM, ROW, ROW);
379 table.put(put2);
380 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
381 assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
382
383 TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
384 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
385
386
387 while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
388 Thread.sleep(1);
389 }
390
391
392 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW);
393 byte[] regionName = toMove.getRegionInfo().getRegionName();
394 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
395
396
397 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
398 int destServerId = (curServerId == 0 ? 1 : 0);
399
400 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
401 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
402
403 ServerName destServerName = destServer.getServerName();
404
405
406 Assert.assertTrue(curServer != destServer);
407 Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
408 Assert.assertFalse( toMove.getPort() == destServerName.getPort());
409 Assert.assertNotNull(curServer.getOnlineRegion(regionName));
410 Assert.assertNull(destServer.getOnlineRegion(regionName));
411 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
412 getAssignmentManager().getRegionStates().isRegionsInTransition());
413
414
415
416 LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
417 TEST_UTIL.getHBaseAdmin().move(
418 toMove.getRegionInfo().getEncodedNameAsBytes(),
419 destServerName.getServerName().getBytes()
420 );
421
422 while (destServer.getOnlineRegion(regionName) == null ||
423 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
424 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
425 master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
426
427 Thread.sleep(1);
428 }
429
430 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
431
432
433 Assert.assertNull(curServer.getOnlineRegion(regionName));
434 Assert.assertNotNull(destServer.getOnlineRegion(regionName));
435 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
436 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
437
438
439
440 Assert.assertFalse(
441 conn.getCachedLocation(TABLE_NAME, ROW).getPort() == destServerName.getPort());
442
443
444
445 LOG.info("Put starting");
446 Put put3 = new Put(ROW);
447 put3.add(FAM_NAM, ROW, ROW);
448 try {
449 table.put(put3);
450 Assert.fail("Unreachable point");
451 } catch (RetriesExhaustedWithDetailsException e){
452 LOG.info("Put done, exception caught: " + e.getClass());
453 Assert.assertEquals(1, e.getNumExceptions());
454 Assert.assertEquals(1, e.getCauses().size());
455 Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
456
457
458 Throwable cause = HConnectionManager.findException(e.getCause(0));
459 Assert.assertNotNull(cause);
460 Assert.assertTrue(cause instanceof RegionMovedException);
461 }
462 Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
463 Assert.assertEquals(
464 "Previous server was " + curServer.getServerName().getHostAndPort(),
465 destServerName.getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
466
467 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
468 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
469
470
471 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
472 TEST_UTIL.getHBaseAdmin().move(
473 toMove.getRegionInfo().getEncodedNameAsBytes(),
474 curServer.getServerName().getServerName().getBytes()
475 );
476
477 while (curServer.getOnlineRegion(regionName) == null ||
478 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
479 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
480 master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
481
482 Thread.sleep(1);
483 }
484
485
486 Assert.assertNotNull(curServer.getOnlineRegion(regionName));
487 Assert.assertNull(destServer.getOnlineRegion(regionName));
488 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
489
490
491 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getPort() ==
492 curServer.getServerName().getPort());
493
494 Scan sc = new Scan();
495 sc.setStopRow(ROW);
496 sc.setStartRow(ROW);
497
498
499
500 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
501
502 try {
503 ResultScanner rs = table.getScanner(sc);
504 while (rs.next() != null) {
505 }
506 Assert.fail("Unreachable point");
507 } catch (RetriesExhaustedException e) {
508 LOG.info("Scan done, expected exception caught: " + e.getClass());
509 }
510
511
512 Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
513 Assert.assertEquals(
514 "Previous server was "+destServer.getServerName().getHostAndPort(),
515 curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
516
517 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
518 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
519 table.close();
520 }
521
522
523
524
525
526 @Test
527 public void testConnectionManagement() throws Exception{
528 HTable table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
529 HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
530 HTableInterface table = conn.getTable(TABLE_NAME1.getName());
531 table.close();
532 assertFalse(conn.isClosed());
533 assertFalse(((HTable)table).getPool().isShutdown());
534 table = conn.getTable(TABLE_NAME1.getName());
535 table.close();
536 assertFalse(((HTable)table).getPool().isShutdown());
537 conn.close();
538 assertTrue(((HTable)table).getPool().isShutdown());
539 table0.close();
540 }
541
542
543
544
545 @Test(timeout = 60000)
546 public void testCacheSeqNums() throws Exception{
547 HTable table = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAM);
548 TEST_UTIL.createMultiRegions(table, FAM_NAM);
549 Put put = new Put(ROW);
550 put.add(FAM_NAM, ROW, ROW);
551 table.put(put);
552 HConnectionManager.HConnectionImplementation conn =
553 (HConnectionManager.HConnectionImplementation)table.getConnection();
554
555 HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW);
556 assertNotNull(location);
557
558 HRegionLocation anySource = new HRegionLocation(location.getRegionInfo(), ServerName.valueOf(
559 location.getHostname(), location.getPort() - 1, 0L));
560
561
562 int nextPort = location.getPort() + 1;
563 conn.updateCachedLocation(location.getRegionInfo(), location,
564 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
565 location = conn.getCachedLocation(TABLE_NAME2, ROW);
566 Assert.assertEquals(nextPort, location.getPort());
567
568
569 nextPort = location.getPort() + 1;
570 conn.updateCachedLocation(location.getRegionInfo(), location,
571 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
572 location = conn.getCachedLocation(TABLE_NAME2, ROW);
573 Assert.assertEquals(nextPort, location.getPort());
574
575
576 nextPort = location.getPort() + 1;
577 conn.updateCachedLocation(location.getRegionInfo(), anySource,
578 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
579 location = conn.getCachedLocation(TABLE_NAME2, ROW);
580 Assert.assertEquals(nextPort, location.getPort());
581
582
583 nextPort = location.getPort() + 1;
584 conn.updateCachedLocation(location.getRegionInfo(), anySource,
585 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
586 location = conn.getCachedLocation(TABLE_NAME2, ROW);
587 Assert.assertEquals(nextPort - 1, location.getPort());
588 table.close();
589 }
590
591
592
593
594
595 @Test
596 public void testConnectionSameness() throws Exception {
597 HConnection previousConnection = null;
598 for (int i = 0; i < 2; i++) {
599
600 Configuration configuration = TEST_UTIL.getConfiguration();
601 configuration.set("some_key", String.valueOf(_randy.nextInt()));
602 LOG.info("The hash code of the current configuration is: "
603 + configuration.hashCode());
604 HConnection currentConnection = HConnectionManager
605 .getConnection(configuration);
606 if (previousConnection != null) {
607 assertTrue(
608 "Did not get the same connection even though its key didn't change",
609 previousConnection == currentConnection);
610 }
611 previousConnection = currentConnection;
612
613
614
615
616 configuration.set("other_key", String.valueOf(_randy.nextInt()));
617 }
618 }
619
620
621
622
623
624
625 @Test
626 public void testConnectionUniqueness() throws Exception {
627 int zkmaxconnections = TEST_UTIL.getConfiguration().
628 getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
629 HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS);
630
631
632
633 int maxConnections = Math.min(zkmaxconnections - 1, 20);
634 List<HConnection> connections = new ArrayList<HConnection>(maxConnections);
635 HConnection previousConnection = null;
636 try {
637 for (int i = 0; i < maxConnections; i++) {
638
639 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
640 configuration.set("some_key", String.valueOf(_randy.nextInt()));
641 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
642 String.valueOf(_randy.nextInt()));
643 LOG.info("The hash code of the current configuration is: "
644 + configuration.hashCode());
645 HConnection currentConnection =
646 HConnectionManager.getConnection(configuration);
647 if (previousConnection != null) {
648 assertTrue("Got the same connection even though its key changed!",
649 previousConnection != currentConnection);
650 }
651
652
653
654
655 configuration.set("other_key", String.valueOf(_randy.nextInt()));
656
657 previousConnection = currentConnection;
658 LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: "
659 + getHConnectionManagerCacheSize());
660 Thread.sleep(50);
661 connections.add(currentConnection);
662 }
663 } finally {
664 for (HConnection c: connections) {
665
666 HConnectionManager.deleteConnection(c.getConfiguration());
667 }
668 }
669 }
670
671 @Test
672 public void testClosing() throws Exception {
673 Configuration configuration =
674 new Configuration(TEST_UTIL.getConfiguration());
675 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
676 String.valueOf(_randy.nextInt()));
677
678 HConnection c1 = HConnectionManager.createConnection(configuration);
679
680 HConnection c2 = HConnectionManager.createConnection(configuration);
681
682 HConnection c3 = HConnectionManager.getConnection(configuration);
683 HConnection c4 = HConnectionManager.getConnection(configuration);
684 assertTrue(c3 == c4);
685
686 c1.close();
687 assertTrue(c1.isClosed());
688 assertFalse(c2.isClosed());
689 assertFalse(c3.isClosed());
690
691 c3.close();
692
693 assertFalse(c3.isClosed());
694 c3.close();
695 assertTrue(c3.isClosed());
696
697 HConnection c5 = HConnectionManager.getConnection(configuration);
698 assertTrue(c5 != c3);
699
700 assertFalse(c2.isClosed());
701 c2.close();
702 assertTrue(c2.isClosed());
703 c5.close();
704 assertTrue(c5.isClosed());
705 }
706
707
708
709
710
711 @Test
712 public void testCreateConnection() throws Exception {
713 Configuration configuration = TEST_UTIL.getConfiguration();
714 HConnection c1 = HConnectionManager.createConnection(configuration);
715 HConnection c2 = HConnectionManager.createConnection(configuration);
716
717 assertTrue(c1 != c2);
718 assertTrue(c1.getConfiguration() == c2.getConfiguration());
719
720 HConnection c3 = HConnectionManager.getConnection(configuration);
721 assertTrue(c1 != c3);
722 assertTrue(c2 != c3);
723 }
724
725
726
727
728
729
730
731 @Test(timeout = 60000)
732 public void testConnection() throws Exception{
733
734 Configuration c = new Configuration();
735 c.set(HConstants.ZOOKEEPER_QUORUM,
736 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
737 c.set(HConstants.ZOOKEEPER_CLIENT_PORT ,
738 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
739
740
741 HConnection conn = HConnectionManager.getConnection(c);
742 assertTrue( conn.isMasterRunning() );
743 conn.close();
744 }
745
746 private int setNumTries(HConnectionImplementation hci, int newVal) throws Exception {
747 Field numTries = hci.getClass().getDeclaredField("numTries");
748 numTries.setAccessible(true);
749 Field modifiersField = Field.class.getDeclaredField("modifiers");
750 modifiersField.setAccessible(true);
751 modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
752 final int prevNumRetriesVal = (Integer)numTries.get(hci);
753 numTries.set(hci, newVal);
754
755 return prevNumRetriesVal;
756 }
757
758 @Test
759 public void testMulti() throws Exception {
760 HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
761 TEST_UTIL.createMultiRegions(table, FAM_NAM);
762 HConnectionManager.HConnectionImplementation conn =
763 (HConnectionManager.HConnectionImplementation)
764 HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
765
766
767
768 conn.clearRegionCache(TABLE_NAME3);
769 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
770
771 TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
772 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
773
774
775 while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
776 Thread.sleep(1);
777 }
778
779 Put put = new Put(ROW_X);
780 put.add(FAM_NAM, ROW_X, ROW_X);
781 table.put(put);
782
783
784 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X);
785 byte[] regionName = toMove.getRegionInfo().getRegionName();
786 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
787
788
789 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
790 int destServerId = (curServerId == 0 ? 1 : 0);
791
792 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
793 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
794
795 ServerName destServerName = destServer.getServerName();
796
797
798 List<HRegion> regions = curServer.getOnlineRegions(TABLE_NAME3);
799 byte[] otherRow = null;
800 for (HRegion region : regions) {
801 if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
802 && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
803 otherRow = region.getRegionInfo().getStartKey();
804 break;
805 }
806 }
807 assertNotNull(otherRow);
808
809 if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
810 Put put2 = new Put(otherRow);
811 put2.add(FAM_NAM, otherRow, otherRow);
812 table.put(put2);
813
814
815 Assert.assertTrue(curServer != destServer);
816 Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
817 Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
818 Assert.assertNotNull(curServer.getOnlineRegion(regionName));
819 Assert.assertNull(destServer.getOnlineRegion(regionName));
820 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
821 getAssignmentManager().getRegionStates().isRegionsInTransition());
822
823
824
825 LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
826 TEST_UTIL.getHBaseAdmin().move(
827 toMove.getRegionInfo().getEncodedNameAsBytes(),
828 destServerName.getServerName().getBytes()
829 );
830
831 while (destServer.getOnlineRegion(regionName) == null ||
832 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
833 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
834 master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
835
836 Thread.sleep(1);
837 }
838
839 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
840
841
842 Assert.assertNull(curServer.getOnlineRegion(regionName));
843 Assert.assertNotNull(destServer.getOnlineRegion(regionName));
844 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
845 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
846
847
848
849 Assert.assertFalse(
850 conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
851
852
853 final int prevNumRetriesVal = setNumTries(conn, 2);
854
855 Put put3 = new Put(ROW_X);
856 put3.add(FAM_NAM, ROW_X, ROW_X);
857 Put put4 = new Put(otherRow);
858 put4.add(FAM_NAM, otherRow, otherRow);
859
860
861 table.batch(Lists.newArrayList(put4, put3));
862
863
864 setNumTries(conn, prevNumRetriesVal);
865 table.close();
866 conn.close();
867 }
868
869 @Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test
870 public void testErrorBackoffTimeCalculation() throws Exception {
871
872 final long ANY_PAUSE = 100;
873 HRegionInfo ri = new HRegionInfo(TABLE_NAME);
874 HRegionLocation location = new HRegionLocation(ri, ServerName.valueOf("127.0.0.1", 1, 0));
875 HRegionLocation diffLocation = new HRegionLocation(ri, ServerName.valueOf("127.0.0.1", 2, 0));
876
877 ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
878 EnvironmentEdgeManager.injectEdge(timeMachine);
879 try {
880 long timeBase = timeMachine.currentTimeMillis();
881 long largeAmountOfTime = ANY_PAUSE * 1000;
882 HConnectionManager.ServerErrorTracker tracker =
883 new HConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
884
885
886 assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
887
888
889 tracker.reportServerError(location);
890 assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(location, ANY_PAUSE));
891 tracker.reportServerError(location);
892 tracker.reportServerError(location);
893 tracker.reportServerError(location);
894 assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(location, ANY_PAUSE));
895
896
897
898 assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
899 tracker.reportServerError(diffLocation);
900 assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
901
902
903 HRegionInfo ri2 = new HRegionInfo(TABLE_NAME2);
904 HRegionLocation diffRegion = new HRegionLocation(ri2, location.getServerName());
905 assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(diffRegion, ANY_PAUSE));
906
907
908 assertEqualsWithJitter(ANY_PAUSE * 10,
909 tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
910
911
912
913 long timeShift = (long)(ANY_PAUSE * 0.5);
914 timeMachine.setValue(timeBase + timeShift);
915 assertEqualsWithJitter((ANY_PAUSE * 5) - timeShift,
916 tracker.calculateBackoffTime(location, ANY_PAUSE), ANY_PAUSE * 2);
917
918
919 timeMachine.setValue(timeBase + ANY_PAUSE * 100);
920 assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
921
922
923 long timeLeft = (long)(ANY_PAUSE * 0.5);
924 timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
925 assertTrue(tracker.canRetryMore(1));
926 tracker.reportServerError(location);
927 assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
928 timeMachine.setValue(timeBase + largeAmountOfTime);
929 assertFalse(tracker.canRetryMore(1));
930 } finally {
931 EnvironmentEdgeManager.reset();
932 }
933 }
934
935 private static void assertEqualsWithJitter(long expected, long actual) {
936 assertEqualsWithJitter(expected, actual, expected);
937 }
938
939 private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
940 assertTrue("Value not within jitter: " + expected + " vs " + actual,
941 Math.abs(actual - expected) <= (0.01f * jitterBase));
942 }
943
944
945
946
947
948
949
950
951
952 @Ignore ("Flakey test: See HBASE-8996")@Test
953 public void testDeleteForZKConnLeak() throws Exception {
954 TEST_UTIL.createTable(TABLE_NAME4, FAM_NAM);
955 final Configuration config = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
956 config.setInt("zookeeper.recovery.retry", 1);
957 config.setInt("zookeeper.recovery.retry.intervalmill", 1000);
958 config.setInt("hbase.rpc.timeout", 2000);
959 config.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
960
961 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10,
962 5, TimeUnit.SECONDS,
963 new SynchronousQueue<Runnable>(),
964 Threads.newDaemonThreadFactory("test-hcm-delete"));
965
966 pool.submit(new Runnable() {
967 @Override
968 public void run() {
969 while (!Thread.interrupted()) {
970 try {
971 HConnection conn = HConnectionManager.getConnection(config);
972 LOG.info("Connection " + conn);
973 HConnectionManager.deleteStaleConnection(conn);
974 LOG.info("Connection closed " + conn);
975
976
977
978 Threads.sleep(10);
979 } catch (Exception e) {
980 }
981 }
982 }
983 });
984
985
986 for (int i = 0; i < 30; i++) {
987 HConnection c1 = null;
988 try {
989 c1 = HConnectionManager.getConnection(config);
990 LOG.info("HTable connection " + i + " " + c1);
991 HTable table = new HTable(TABLE_NAME4, c1, pool);
992 table.close();
993 LOG.info("HTable connection " + i + " closed " + c1);
994 } catch (Exception e) {
995 LOG.info("We actually want this to happen!!!! So we can see if we are leaking zk", e);
996 } finally {
997 if (c1 != null) {
998 if (c1.isClosed()) {
999
1000 Field zkwField = c1.getClass().getDeclaredField("keepAliveZookeeper");
1001 zkwField.setAccessible(true);
1002 Object watcher = zkwField.get(c1);
1003
1004 if (watcher != null) {
1005 if (((ZooKeeperWatcher)watcher).getRecoverableZooKeeper().getState().isAlive()) {
1006
1007
1008 Thread.sleep(1000);
1009 if (((ZooKeeperWatcher) watcher).getRecoverableZooKeeper().getState().isAlive()) {
1010 pool.shutdownNow();
1011 fail("Live zookeeper in closed connection");
1012 }
1013 }
1014 }
1015 }
1016 c1.close();
1017 }
1018 }
1019 }
1020 pool.shutdownNow();
1021 }
1022 }
1023