View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNull;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.io.IOException;
29  import java.lang.reflect.Field;
30  import java.lang.reflect.Modifier;
31  import java.net.SocketTimeoutException;
32  import java.util.ArrayList;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Random;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.SynchronousQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.hbase.HBaseConfiguration;
47  import org.apache.hadoop.hbase.HBaseTestingUtility;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.MediumTests;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.Waiter;
55  import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
56  import org.apache.hadoop.hbase.exceptions.DeserializationException;
57  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
58  import org.apache.hadoop.hbase.filter.Filter;
59  import org.apache.hadoop.hbase.filter.FilterBase;
60  import org.apache.hadoop.hbase.master.HMaster;
61  import org.apache.hadoop.hbase.regionserver.HRegion;
62  import org.apache.hadoop.hbase.regionserver.HRegionServer;
63  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
64  import org.apache.hadoop.hbase.util.Bytes;
65  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
66  import org.apache.hadoop.hbase.util.JVMClusterUtil;
67  import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
68  import org.apache.hadoop.hbase.util.Threads;
69  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
70  import org.junit.AfterClass;
71  import org.junit.Assert;
72  import org.junit.BeforeClass;
73  import org.junit.Ignore;
74  import org.junit.Test;
75  import org.junit.experimental.categories.Category;
76  
77  import com.google.common.collect.Lists;
78  
79  /**
80   * This class is for testing HCM features
81   */
82  @Category(MediumTests.class)
83  public class TestHCM {
84    private static final Log LOG = LogFactory.getLog(TestHCM.class);
85    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
86    private static final TableName TABLE_NAME =
87        TableName.valueOf("test");
88    private static final TableName TABLE_NAME1 =
89        TableName.valueOf("test1");
90    private static final TableName TABLE_NAME2 =
91        TableName.valueOf("test2");
92    private static final TableName TABLE_NAME3 =
93        TableName.valueOf("test3");
94    private static final TableName TABLE_NAME4 =
95        TableName.valueOf("test4");
96    private static final byte[] FAM_NAM = Bytes.toBytes("f");
97    private static final byte[] ROW = Bytes.toBytes("bbb");
98    private static final byte[] ROW_X = Bytes.toBytes("xxx");
99    private static Random _randy = new Random();
100 
101   @BeforeClass
102   public static void setUpBeforeClass() throws Exception {
103     TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
104     TEST_UTIL.startMiniCluster(2);
105   }
106 
107   @AfterClass public static void tearDownAfterClass() throws Exception {
108     TEST_UTIL.shutdownMiniCluster();
109   }
110 
111 
112   private static int getHConnectionManagerCacheSize(){
113     return HConnectionTestingUtility.getConnectionCount();
114   }
115 
116   @Test
117   public void testClusterConnection() throws IOException {
118     ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
119         5, TimeUnit.SECONDS,
120         new SynchronousQueue<Runnable>(),
121         Threads.newDaemonThreadFactory("test-hcm"));
122 
123     HConnection con1 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
124     HConnection con2 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
125     // make sure the internally created ExecutorService is the one passed
126     assertTrue(otherPool == ((HConnectionImplementation)con2).getCurrentBatchPool());
127 
128     String tableName = "testClusterConnection";
129     TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
130     HTable t = (HTable)con1.getTable(tableName, otherPool);
131     // make sure passing a pool to the getTable does not trigger creation of an internal pool
132     assertNull("Internal Thread pool should be null", ((HConnectionImplementation)con1).getCurrentBatchPool());
133     // table should use the pool passed
134     assertTrue(otherPool == t.getPool());
135     t.close();
136 
137     t = (HTable)con2.getTable(tableName);
138     // table should use the connectin's internal pool
139     assertTrue(otherPool == t.getPool());
140     t.close();
141 
142     t = (HTable)con2.getTable(Bytes.toBytes(tableName));
143     // try other API too
144     assertTrue(otherPool == t.getPool());
145     t.close();
146 
147     t = (HTable)con2.getTable(TableName.valueOf(tableName));
148     // try other API too
149     assertTrue(otherPool == t.getPool());
150     t.close();
151 
152     t = (HTable)con1.getTable(tableName);
153     ExecutorService pool = ((HConnectionImplementation)con1).getCurrentBatchPool();
154     // make sure an internal pool was created
155     assertNotNull("An internal Thread pool should have been created", pool);
156     // and that the table is using it
157     assertTrue(t.getPool() == pool);
158     t.close();
159 
160     t = (HTable)con1.getTable(tableName);
161     // still using the *same* internal pool
162     assertTrue(t.getPool() == pool);
163     t.close();
164 
165     con1.close();
166     // if the pool was created on demand it should be closed upon connectin close
167     assertTrue(pool.isShutdown());
168 
169     con2.close();
170     // if the pool is passed, it is not closed
171     assertFalse(otherPool.isShutdown());
172     otherPool.shutdownNow();
173   }
174 
175   @Ignore ("Fails in IDEs: HBASE-9042") @Test(expected = RegionServerStoppedException.class)
176   public void testClusterStatus() throws Exception {
177     TableName tn =
178         TableName.valueOf("testClusterStatus");
179     byte[] cf = "cf".getBytes();
180     byte[] rk = "rk1".getBytes();
181 
182     JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
183     rs.waitForServerOnline();
184     final ServerName sn = rs.getRegionServer().getServerName();
185 
186     HTable t = TEST_UTIL.createTable(tn, cf);
187     TEST_UTIL.waitTableAvailable(tn.getName());
188 
189     while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
190         getRegionStates().isRegionsInTransition()){
191       Thread.sleep(1);
192     }
193     final HConnectionImplementation hci =  (HConnectionImplementation)t.getConnection();
194     while (t.getRegionLocation(rk).getPort() != sn.getPort()){
195       TEST_UTIL.getHBaseAdmin().move(t.getRegionLocation(rk).getRegionInfo().
196           getEncodedNameAsBytes(), Bytes.toBytes(sn.toString()));
197       while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
198           getRegionStates().isRegionsInTransition()){
199         Thread.sleep(1);
200       }
201       hci.clearRegionCache(tn);
202     }
203     Assert.assertNotNull(hci.clusterStatusListener);
204     TEST_UTIL.assertRegionOnServer(t.getRegionLocation(rk).getRegionInfo(), sn, 20000);
205 
206     Put p1 = new Put(rk);
207     p1.add(cf, "qual".getBytes(), "val".getBytes());
208     t.put(p1);
209 
210     rs.getRegionServer().abort("I'm dead");
211 
212     // We want the status to be updated. That's a least 10 second
213     TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
214       @Override
215       public boolean evaluate() throws Exception {
216         return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
217             getDeadServers().isDeadServer(sn);
218       }
219     });
220 
221     TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
222       @Override
223       public boolean evaluate() throws Exception {
224         return hci.clusterStatusListener.isDeadServer(sn);
225       }
226     });
227 
228     t.close();
229     hci.getClient(sn);  // will throw an exception: RegionServerStoppedException
230   }
231 
232   /**
233    * Test that the connection to the dead server is cut immediately when we receive the
234    *  notification.
235    * @throws Exception
236    */
237   @Test
238   public void testConnectionCut() throws Exception {
239     String tableName = "HCM-testConnectionCut";
240 
241     TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
242     boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
243 
244     Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
245     // We want to work on a separate connection.
246     c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
247     c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
248     c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
249 
250     HTable table = new HTable(c2, tableName);
251 
252     Put p = new Put(FAM_NAM);
253     p.add(FAM_NAM, FAM_NAM, FAM_NAM);
254     table.put(p);
255 
256     final HConnectionImplementation hci =  (HConnectionImplementation)table.getConnection();
257     final HRegionLocation loc = table.getRegionLocation(FAM_NAM);
258 
259     Get get = new Get(FAM_NAM);
260     Assert.assertNotNull(table.get(get));
261 
262     get = new Get(FAM_NAM);
263     get.setFilter(new BlockingFilter());
264 
265     // This thread will mark the server as dead while we're waiting during a get.
266     Thread t = new Thread() {
267       @Override
268       public void run() {
269         synchronized (syncBlockingFilter) {
270           try {
271             syncBlockingFilter.wait();
272           } catch (InterruptedException e) {
273             throw new RuntimeException(e);
274           }
275         }
276         hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
277       }
278     };
279 
280     t.start();
281     try {
282       table.get(get);
283       Assert.fail();
284     } catch (IOException expected) {
285       LOG.debug("Received: " + expected);
286       Assert.assertFalse(expected instanceof SocketTimeoutException);
287       Assert.assertFalse(syncBlockingFilter.get());
288     } finally {
289       syncBlockingFilter.set(true);
290       t.join();
291       HConnectionManager.getConnection(c2).close();
292       TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
293     }
294 
295     table.close();
296   }
297 
298   protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
299 
300   public static class BlockingFilter extends FilterBase {
301     @Override
302     public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
303       int i = 0;
304       while (i++ < 1000 && !syncBlockingFilter.get()) {
305         synchronized (syncBlockingFilter) {
306           syncBlockingFilter.notifyAll();
307         }
308         Threads.sleep(100);
309       }
310       syncBlockingFilter.set(true);
311       return false;
312     }
313 
314     public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
315       return new BlockingFilter();
316     }
317   }
318 
319   @Test
320   public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
321     // Save off current HConnections
322     Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
323         new HashMap<HConnectionKey, HConnectionImplementation>();
324     oldHBaseInstances.putAll(HConnectionManager.CONNECTION_INSTANCES);
325 
326     HConnectionManager.CONNECTION_INSTANCES.clear();
327 
328     try {
329       HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
330       connection.abort("test abortingHConnectionRemovesItselfFromHCM", new Exception(
331           "test abortingHConnectionRemovesItselfFromHCM"));
332       Assert.assertNotSame(connection,
333         HConnectionManager.getConnection(TEST_UTIL.getConfiguration()));
334     } finally {
335       // Put original HConnections back
336       HConnectionManager.CONNECTION_INSTANCES.clear();
337       HConnectionManager.CONNECTION_INSTANCES.putAll(oldHBaseInstances);
338     }
339   }
340 
341   /**
342    * Test that when we delete a location using the first row of a region
343    * that we really delete it.
344    * @throws Exception
345    */
346   @Test
347   public void testRegionCaching() throws Exception{
348     TEST_UTIL.createTable(TABLE_NAME, FAM_NAM).close();
349     Configuration conf =  new Configuration(TEST_UTIL.getConfiguration());
350     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
351     HTable table = new HTable(conf, TABLE_NAME);
352 
353     TEST_UTIL.createMultiRegions(table, FAM_NAM);
354     TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
355     Put put = new Put(ROW);
356     put.add(FAM_NAM, ROW, ROW);
357     table.put(put);
358     HConnectionManager.HConnectionImplementation conn =
359       (HConnectionManager.HConnectionImplementation)table.getConnection();
360 
361     assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
362 
363     final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1;
364     HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW);
365     conn.updateCachedLocation(loc.getRegionInfo(), loc, ServerName.valueOf("127.0.0.1", nextPort,
366         HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
367     Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort);
368 
369     conn.forceDeleteCachedLocation(TABLE_NAME, ROW.clone());
370     HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);
371     assertNull("What is this location?? " + rl, rl);
372 
373     // We're now going to move the region and check that it works for the client
374     // First a new put to add the location in the cache
375     conn.clearRegionCache(TABLE_NAME);
376     Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
377     Put put2 = new Put(ROW);
378     put2.add(FAM_NAM, ROW, ROW);
379     table.put(put2);
380     assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
381     assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
382 
383     TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
384     HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
385 
386     // We can wait for all regions to be online, that makes log reading easier when debugging
387     while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
388       Thread.sleep(1);
389     }
390 
391     // Now moving the region to the second server
392     HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW);
393     byte[] regionName = toMove.getRegionInfo().getRegionName();
394     byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
395 
396     // Choose the other server.
397     int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
398     int destServerId = (curServerId == 0 ? 1 : 0);
399 
400     HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
401     HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
402 
403     ServerName destServerName = destServer.getServerName();
404 
405     // Check that we are in the expected state
406     Assert.assertTrue(curServer != destServer);
407     Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
408     Assert.assertFalse( toMove.getPort() == destServerName.getPort());
409     Assert.assertNotNull(curServer.getOnlineRegion(regionName));
410     Assert.assertNull(destServer.getOnlineRegion(regionName));
411     Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
412         getAssignmentManager().getRegionStates().isRegionsInTransition());
413 
414     // Moving. It's possible that we don't have all the regions online at this point, so
415     //  the test must depends only on the region we're looking at.
416     LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
417     TEST_UTIL.getHBaseAdmin().move(
418       toMove.getRegionInfo().getEncodedNameAsBytes(),
419       destServerName.getServerName().getBytes()
420     );
421 
422     while (destServer.getOnlineRegion(regionName) == null ||
423         destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
424         curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
425         master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
426       // wait for the move to be finished
427       Thread.sleep(1);
428     }
429 
430     LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
431 
432     // Check our new state.
433     Assert.assertNull(curServer.getOnlineRegion(regionName));
434     Assert.assertNotNull(destServer.getOnlineRegion(regionName));
435     Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
436     Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
437 
438 
439     // Cache was NOT updated and points to the wrong server
440     Assert.assertFalse(
441         conn.getCachedLocation(TABLE_NAME, ROW).getPort() == destServerName.getPort());
442 
443     // This part relies on a number of tries equals to 1.
444     // We do a put and expect the cache to be updated, even if we don't retry
445     LOG.info("Put starting");
446     Put put3 = new Put(ROW);
447     put3.add(FAM_NAM, ROW, ROW);
448     try {
449       table.put(put3);
450       Assert.fail("Unreachable point");
451     } catch (RetriesExhaustedWithDetailsException e){
452       LOG.info("Put done, exception caught: " + e.getClass());
453       Assert.assertEquals(1, e.getNumExceptions());
454       Assert.assertEquals(1, e.getCauses().size());
455       Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
456 
457       // Check that we unserialized the exception as expected
458       Throwable cause = HConnectionManager.findException(e.getCause(0));
459       Assert.assertNotNull(cause);
460       Assert.assertTrue(cause instanceof RegionMovedException);
461     }
462     Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
463     Assert.assertEquals(
464         "Previous server was " + curServer.getServerName().getHostAndPort(),
465         destServerName.getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
466 
467     Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
468     Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
469 
470     // We move it back to do another test with a scan
471     LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
472     TEST_UTIL.getHBaseAdmin().move(
473       toMove.getRegionInfo().getEncodedNameAsBytes(),
474       curServer.getServerName().getServerName().getBytes()
475     );
476 
477     while (curServer.getOnlineRegion(regionName) == null ||
478         destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
479         curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
480         master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
481       // wait for the move to be finished
482       Thread.sleep(1);
483     }
484 
485     // Check our new state.
486     Assert.assertNotNull(curServer.getOnlineRegion(regionName));
487     Assert.assertNull(destServer.getOnlineRegion(regionName));
488     LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
489 
490     // Cache was NOT updated and points to the wrong server
491     Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getPort() ==
492       curServer.getServerName().getPort());
493 
494     Scan sc = new Scan();
495     sc.setStopRow(ROW);
496     sc.setStartRow(ROW);
497 
498     // The scanner takes the max retries from the connection configuration, not the table as
499     // the put.
500     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
501 
502     try {
503       ResultScanner rs = table.getScanner(sc);
504       while (rs.next() != null) {
505       }
506       Assert.fail("Unreachable point");
507     } catch (RetriesExhaustedException e) {
508       LOG.info("Scan done, expected exception caught: " + e.getClass());
509     }
510 
511     // Cache is updated with the right value.
512     Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
513     Assert.assertEquals(
514       "Previous server was "+destServer.getServerName().getHostAndPort(),
515       curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
516 
517     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
518         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
519     table.close();
520   }
521 
522   /**
523    * Test that Connection or Pool are not closed when managed externally
524    * @throws Exception
525    */
526   @Test
527   public void testConnectionManagement() throws Exception{
528     HTable table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
529     HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
530     HTableInterface table = conn.getTable(TABLE_NAME1.getName());
531     table.close();
532     assertFalse(conn.isClosed());
533     assertFalse(((HTable)table).getPool().isShutdown());
534     table = conn.getTable(TABLE_NAME1.getName());
535     table.close();
536     assertFalse(((HTable)table).getPool().isShutdown());
537     conn.close();
538     assertTrue(((HTable)table).getPool().isShutdown());
539     table0.close();
540   }
541 
542   /**
543    * Test that stale cache updates don't override newer cached values.
544    */
545   @Test(timeout = 60000)
546   public void testCacheSeqNums() throws Exception{
547     HTable table = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAM);
548     TEST_UTIL.createMultiRegions(table, FAM_NAM);
549     Put put = new Put(ROW);
550     put.add(FAM_NAM, ROW, ROW);
551     table.put(put);
552     HConnectionManager.HConnectionImplementation conn =
553       (HConnectionManager.HConnectionImplementation)table.getConnection();
554 
555     HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW);
556     assertNotNull(location);
557 
558     HRegionLocation anySource = new HRegionLocation(location.getRegionInfo(), ServerName.valueOf(
559         location.getHostname(), location.getPort() - 1, 0L));
560 
561     // Same server as already in cache reporting - overwrites any value despite seqNum.
562     int nextPort = location.getPort() + 1;
563     conn.updateCachedLocation(location.getRegionInfo(), location,
564         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
565     location = conn.getCachedLocation(TABLE_NAME2, ROW);
566     Assert.assertEquals(nextPort, location.getPort());
567 
568     // No source specified - same.
569     nextPort = location.getPort() + 1;
570     conn.updateCachedLocation(location.getRegionInfo(), location,
571         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
572     location = conn.getCachedLocation(TABLE_NAME2, ROW);
573     Assert.assertEquals(nextPort, location.getPort());
574 
575     // Higher seqNum - overwrites lower seqNum.
576     nextPort = location.getPort() + 1;
577     conn.updateCachedLocation(location.getRegionInfo(), anySource,
578         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
579     location = conn.getCachedLocation(TABLE_NAME2, ROW);
580     Assert.assertEquals(nextPort, location.getPort());
581 
582     // Lower seqNum - does not overwrite higher seqNum.
583     nextPort = location.getPort() + 1;
584     conn.updateCachedLocation(location.getRegionInfo(), anySource,
585         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
586     location = conn.getCachedLocation(TABLE_NAME2, ROW);
587     Assert.assertEquals(nextPort - 1, location.getPort());
588     table.close();
589   }
590 
591   /**
592    * Make sure that {@link Configuration} instances that are essentially the
593    * same map to the same {@link HConnection} instance.
594    */
595   @Test
596   public void testConnectionSameness() throws Exception {
597     HConnection previousConnection = null;
598     for (int i = 0; i < 2; i++) {
599       // set random key to differentiate the connection from previous ones
600       Configuration configuration = TEST_UTIL.getConfiguration();
601       configuration.set("some_key", String.valueOf(_randy.nextInt()));
602       LOG.info("The hash code of the current configuration is: "
603           + configuration.hashCode());
604       HConnection currentConnection = HConnectionManager
605           .getConnection(configuration);
606       if (previousConnection != null) {
607         assertTrue(
608             "Did not get the same connection even though its key didn't change",
609             previousConnection == currentConnection);
610       }
611       previousConnection = currentConnection;
612       // change the configuration, so that it is no longer reachable from the
613       // client's perspective. However, since its part of the LRU doubly linked
614       // list, it will eventually get thrown out, at which time it should also
615       // close the corresponding {@link HConnection}.
616       configuration.set("other_key", String.valueOf(_randy.nextInt()));
617     }
618   }
619 
620   /**
621    * Makes sure that there is no leaking of
622    * {@link HConnectionManager.HConnectionImplementation} in the {@link HConnectionManager}
623    * class.
624    */
625   @Test
626   public void testConnectionUniqueness() throws Exception {
627     int zkmaxconnections = TEST_UTIL.getConfiguration().
628       getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
629           HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS);
630     // Test up to a max that is < the maximum number of zk connections.  If we
631     // go above zk connections, we just fall into cycle where we are failing
632     // to set up a session and test runs for a long time.
633     int maxConnections = Math.min(zkmaxconnections - 1, 20);
634     List<HConnection> connections = new ArrayList<HConnection>(maxConnections);
635     HConnection previousConnection = null;
636     try {
637       for (int i = 0; i < maxConnections; i++) {
638         // set random key to differentiate the connection from previous ones
639         Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
640         configuration.set("some_key", String.valueOf(_randy.nextInt()));
641         configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
642             String.valueOf(_randy.nextInt()));
643         LOG.info("The hash code of the current configuration is: "
644             + configuration.hashCode());
645         HConnection currentConnection =
646           HConnectionManager.getConnection(configuration);
647         if (previousConnection != null) {
648           assertTrue("Got the same connection even though its key changed!",
649               previousConnection != currentConnection);
650         }
651         // change the configuration, so that it is no longer reachable from the
652         // client's perspective. However, since its part of the LRU doubly linked
653         // list, it will eventually get thrown out, at which time it should also
654         // close the corresponding {@link HConnection}.
655         configuration.set("other_key", String.valueOf(_randy.nextInt()));
656 
657         previousConnection = currentConnection;
658         LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: "
659             + getHConnectionManagerCacheSize());
660         Thread.sleep(50);
661         connections.add(currentConnection);
662       }
663     } finally {
664       for (HConnection c: connections) {
665         // Clean up connections made so we don't interfere w/ subsequent tests.
666         HConnectionManager.deleteConnection(c.getConfiguration());
667       }
668     }
669   }
670 
671   @Test
672   public void testClosing() throws Exception {
673     Configuration configuration =
674       new Configuration(TEST_UTIL.getConfiguration());
675     configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
676         String.valueOf(_randy.nextInt()));
677 
678     HConnection c1 = HConnectionManager.createConnection(configuration);
679     // We create two connections with the same key.
680     HConnection c2 = HConnectionManager.createConnection(configuration);
681 
682     HConnection c3 = HConnectionManager.getConnection(configuration);
683     HConnection c4 = HConnectionManager.getConnection(configuration);
684     assertTrue(c3 == c4);
685 
686     c1.close();
687     assertTrue(c1.isClosed());
688     assertFalse(c2.isClosed());
689     assertFalse(c3.isClosed());
690 
691     c3.close();
692     // still a reference left
693     assertFalse(c3.isClosed());
694     c3.close();
695     assertTrue(c3.isClosed());
696     // c3 was removed from the cache
697     HConnection c5 = HConnectionManager.getConnection(configuration);
698     assertTrue(c5 != c3);
699 
700     assertFalse(c2.isClosed());
701     c2.close();
702     assertTrue(c2.isClosed());
703     c5.close();
704     assertTrue(c5.isClosed());
705   }
706 
707   /**
708    * Trivial test to verify that nobody messes with
709    * {@link HConnectionManager#createConnection(Configuration)}
710    */
711   @Test
712   public void testCreateConnection() throws Exception {
713     Configuration configuration = TEST_UTIL.getConfiguration();
714     HConnection c1 = HConnectionManager.createConnection(configuration);
715     HConnection c2 = HConnectionManager.createConnection(configuration);
716     // created from the same configuration, yet they are different
717     assertTrue(c1 != c2);
718     assertTrue(c1.getConfiguration() == c2.getConfiguration());
719     // make sure these were not cached
720     HConnection c3 = HConnectionManager.getConnection(configuration);
721     assertTrue(c1 != c3);
722     assertTrue(c2 != c3);
723   }
724 
725 
726   /**
727    * This test checks that one can connect to the cluster with only the
728    *  ZooKeeper quorum set. Other stuff like master address will be read
729    *  from ZK by the client.
730    */
731   @Test(timeout = 60000)
732   public void testConnection() throws Exception{
733     // We create an empty config and add the ZK address.
734     Configuration c = new Configuration();
735     c.set(HConstants.ZOOKEEPER_QUORUM,
736       TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
737     c.set(HConstants.ZOOKEEPER_CLIENT_PORT ,
738       TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
739 
740     // This should be enough to connect
741     HConnection conn = HConnectionManager.getConnection(c);
742     assertTrue( conn.isMasterRunning() );
743     conn.close();
744   }
745 
746   private int setNumTries(HConnectionImplementation hci, int newVal) throws Exception {
747     Field numTries = hci.getClass().getDeclaredField("numTries");
748     numTries.setAccessible(true);
749     Field modifiersField = Field.class.getDeclaredField("modifiers");
750     modifiersField.setAccessible(true);
751     modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
752     final int prevNumRetriesVal = (Integer)numTries.get(hci);
753     numTries.set(hci, newVal);
754 
755     return prevNumRetriesVal;
756   }
757 
758   @Test
759   public void testMulti() throws Exception {
760     HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
761     TEST_UTIL.createMultiRegions(table, FAM_NAM);
762     HConnectionManager.HConnectionImplementation conn =
763       (HConnectionManager.HConnectionImplementation)
764         HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
765 
766     // We're now going to move the region and check that it works for the client
767     // First a new put to add the location in the cache
768     conn.clearRegionCache(TABLE_NAME3);
769     Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
770 
771     TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
772     HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
773 
774     // We can wait for all regions to be online, that makes log reading easier when debugging
775     while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
776       Thread.sleep(1);
777     }
778 
779     Put put = new Put(ROW_X);
780     put.add(FAM_NAM, ROW_X, ROW_X);
781     table.put(put);
782 
783     // Now moving the region to the second server
784     HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X);
785     byte[] regionName = toMove.getRegionInfo().getRegionName();
786     byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
787 
788     // Choose the other server.
789     int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
790     int destServerId = (curServerId == 0 ? 1 : 0);
791 
792     HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
793     HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
794 
795     ServerName destServerName = destServer.getServerName();
796 
797     //find another row in the cur server that is less than ROW_X
798     List<HRegion> regions = curServer.getOnlineRegions(TABLE_NAME3);
799     byte[] otherRow = null;
800     for (HRegion region : regions) {
801       if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
802           && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
803         otherRow = region.getRegionInfo().getStartKey();
804         break;
805       }
806     }
807     assertNotNull(otherRow);
808     // If empty row, set it to first row.-f
809     if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
810     Put put2 = new Put(otherRow);
811     put2.add(FAM_NAM, otherRow, otherRow);
812     table.put(put2); //cache put2's location
813 
814     // Check that we are in the expected state
815     Assert.assertTrue(curServer != destServer);
816     Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
817     Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
818     Assert.assertNotNull(curServer.getOnlineRegion(regionName));
819     Assert.assertNull(destServer.getOnlineRegion(regionName));
820     Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
821         getAssignmentManager().getRegionStates().isRegionsInTransition());
822 
823     // Moving. It's possible that we don't have all the regions online at this point, so
824     //  the test must depends only on the region we're looking at.
825     LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
826     TEST_UTIL.getHBaseAdmin().move(
827       toMove.getRegionInfo().getEncodedNameAsBytes(),
828       destServerName.getServerName().getBytes()
829     );
830 
831     while (destServer.getOnlineRegion(regionName) == null ||
832         destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
833         curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
834         master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
835       // wait for the move to be finished
836       Thread.sleep(1);
837     }
838 
839     LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
840 
841     // Check our new state.
842     Assert.assertNull(curServer.getOnlineRegion(regionName));
843     Assert.assertNotNull(destServer.getOnlineRegion(regionName));
844     Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
845     Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
846 
847 
848     // Cache was NOT updated and points to the wrong server
849     Assert.assertFalse(
850         conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
851 
852     // Hijack the number of retry to fail after 2 tries
853     final int prevNumRetriesVal = setNumTries(conn, 2);
854 
855     Put put3 = new Put(ROW_X);
856     put3.add(FAM_NAM, ROW_X, ROW_X);
857     Put put4 = new Put(otherRow);
858     put4.add(FAM_NAM, otherRow, otherRow);
859 
860     // do multi
861     table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
862                                                  // second we get RegionMovedException.
863 
864     setNumTries(conn, prevNumRetriesVal);
865     table.close();
866     conn.close();
867   }
868 
869   @Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test
870   public void testErrorBackoffTimeCalculation() throws Exception {
871     // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
872     final long ANY_PAUSE = 100;
873     HRegionInfo ri = new HRegionInfo(TABLE_NAME);
874     HRegionLocation location = new HRegionLocation(ri, ServerName.valueOf("127.0.0.1", 1, 0));
875     HRegionLocation diffLocation = new HRegionLocation(ri, ServerName.valueOf("127.0.0.1", 2, 0));
876 
877     ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
878     EnvironmentEdgeManager.injectEdge(timeMachine);
879     try {
880       long timeBase = timeMachine.currentTimeMillis();
881       long largeAmountOfTime = ANY_PAUSE * 1000;
882       HConnectionManager.ServerErrorTracker tracker =
883           new HConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
884 
885       // The default backoff is 0.
886       assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
887 
888       // Check some backoff values from HConstants sequence.
889       tracker.reportServerError(location);
890       assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(location, ANY_PAUSE));
891       tracker.reportServerError(location);
892       tracker.reportServerError(location);
893       tracker.reportServerError(location);
894       assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(location, ANY_PAUSE));
895 
896       // All of this shouldn't affect backoff for different location.
897 
898       assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
899       tracker.reportServerError(diffLocation);
900       assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
901 
902       // But should still work for a different region in the same location.
903       HRegionInfo ri2 = new HRegionInfo(TABLE_NAME2);
904       HRegionLocation diffRegion = new HRegionLocation(ri2, location.getServerName());
905       assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(diffRegion, ANY_PAUSE));
906 
907       // Check with different base.
908       assertEqualsWithJitter(ANY_PAUSE * 10,
909           tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
910 
911       // See that time from last error is taken into account. Time shift is applied after jitter,
912       // so pass the original expected backoff as the base for jitter.
913       long timeShift = (long)(ANY_PAUSE * 0.5);
914       timeMachine.setValue(timeBase + timeShift);
915       assertEqualsWithJitter((ANY_PAUSE * 5) - timeShift,
916         tracker.calculateBackoffTime(location, ANY_PAUSE), ANY_PAUSE * 2);
917 
918       // However we should not go into negative.
919       timeMachine.setValue(timeBase + ANY_PAUSE * 100);
920       assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
921 
922       // We also should not go over the boundary; last retry would be on it.
923       long timeLeft = (long)(ANY_PAUSE * 0.5);
924       timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
925       assertTrue(tracker.canRetryMore(1));
926       tracker.reportServerError(location);
927       assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
928       timeMachine.setValue(timeBase + largeAmountOfTime);
929       assertFalse(tracker.canRetryMore(1));
930     } finally {
931       EnvironmentEdgeManager.reset();
932     }
933   }
934 
935   private static void assertEqualsWithJitter(long expected, long actual) {
936     assertEqualsWithJitter(expected, actual, expected);
937   }
938 
939   private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
940     assertTrue("Value not within jitter: " + expected + " vs " + actual,
941         Math.abs(actual - expected) <= (0.01f * jitterBase));
942   }
943 
944   /**
945    * Tests that a destroyed connection does not have a live zookeeper.
946    * Below is timing based.  We put up a connection to a table and then close the connection while
947    * having a background thread running that is forcing close of the connection to try and
948    * provoke a close catastrophe; we are hoping for a car crash so we can see if we are leaking
949    * zk connections.
950    * @throws Exception
951    */
952   @Ignore ("Flakey test: See HBASE-8996")@Test
953   public void testDeleteForZKConnLeak() throws Exception {
954     TEST_UTIL.createTable(TABLE_NAME4, FAM_NAM);
955     final Configuration config = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
956     config.setInt("zookeeper.recovery.retry", 1);
957     config.setInt("zookeeper.recovery.retry.intervalmill", 1000);
958     config.setInt("hbase.rpc.timeout", 2000);
959     config.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
960 
961     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10,
962       5, TimeUnit.SECONDS,
963       new SynchronousQueue<Runnable>(),
964       Threads.newDaemonThreadFactory("test-hcm-delete"));
965 
966     pool.submit(new Runnable() {
967       @Override
968       public void run() {
969         while (!Thread.interrupted()) {
970           try {
971             HConnection conn = HConnectionManager.getConnection(config);
972             LOG.info("Connection " + conn);
973             HConnectionManager.deleteStaleConnection(conn);
974             LOG.info("Connection closed " + conn);
975             // TODO: This sleep time should be less than the time that it takes to open and close
976             // a table.  Ideally we would do a few runs first to measure.  For now this is
977             // timing based; hopefully we hit the bad condition.
978             Threads.sleep(10);
979           } catch (Exception e) {
980           }
981         }
982       }
983     });
984 
985     // Use connection multiple times.
986     for (int i = 0; i < 30; i++) {
987       HConnection c1 = null;
988       try {
989         c1 = HConnectionManager.getConnection(config);
990         LOG.info("HTable connection " + i + " " + c1);
991         HTable table = new HTable(TABLE_NAME4, c1, pool);
992         table.close();
993         LOG.info("HTable connection " + i + " closed " + c1);
994       } catch (Exception e) {
995         LOG.info("We actually want this to happen!!!!  So we can see if we are leaking zk", e);
996       } finally {
997         if (c1 != null) {
998           if (c1.isClosed()) {
999             // cannot use getZooKeeper as method instantiates watcher if null
1000             Field zkwField = c1.getClass().getDeclaredField("keepAliveZookeeper");
1001             zkwField.setAccessible(true);
1002             Object watcher = zkwField.get(c1);
1003 
1004             if (watcher != null) {
1005               if (((ZooKeeperWatcher)watcher).getRecoverableZooKeeper().getState().isAlive()) {
1006                 // non-synchronized access to watcher; sleep and check again in case zk connection
1007                 // hasn't been cleaned up yet.
1008                 Thread.sleep(1000);
1009                 if (((ZooKeeperWatcher) watcher).getRecoverableZooKeeper().getState().isAlive()) {
1010                   pool.shutdownNow();
1011                   fail("Live zookeeper in closed connection");
1012                 }
1013               }
1014             }
1015           }
1016           c1.close();
1017         }
1018       }
1019     }
1020     pool.shutdownNow();
1021   }
1022 }
1023