View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.fail;
24  
25  import java.io.IOException;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.LargeTests;
30  import org.apache.hadoop.hbase.MiniHBaseCluster;
31  import org.apache.hadoop.hbase.client.Get;
32  import org.apache.hadoop.hbase.client.HTable;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.hbase.client.Result;
35  import org.apache.hadoop.hbase.client.ResultScanner;
36  import org.apache.hadoop.hbase.client.Scan;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.JVMClusterUtil;
39  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
40  import org.junit.Before;
41  import org.junit.Test;
42  import org.junit.experimental.categories.Category;
43  
44  /**
45   * Test handling of changes to the number of a peer's regionservers.
46   */
47  @Category(LargeTests.class)
48  public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
49  
50    private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);
51  
52    /**
53     * @throws java.lang.Exception
54     */
55    @Before
56    public void setUp() throws Exception {
57      htable1.setAutoFlush(false, true);
58      // Starting and stopping replication can make us miss new logs,
59      // rolling like this makes sure the most recent one gets added to the queue
60      for (JVMClusterUtil.RegionServerThread r :
61                            utility1.getHBaseCluster().getRegionServerThreads()) {
62        r.getRegionServer().getWAL().rollWriter();
63      }
64      utility1.truncateTable(tableName);
65      // truncating the table will send one Delete per row to the slave cluster
66      // in an async fashion, which is why we cannot just call truncateTable on
67      // utility2 since late writes could make it to the slave in some way.
68      // Instead, we truncate the first table and wait for all the Deletes to
69      // make it to the slave.
70      Scan scan = new Scan();
71      int lastCount = 0;
72      for (int i = 0; i < NB_RETRIES; i++) {
73        if (i == NB_RETRIES - 1) {
74          fail("Waited too much time for truncate");
75        }
76        ResultScanner scanner = htable2.getScanner(scan);
77        Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
78        scanner.close();
79        if (res.length != 0) {
80          if (res.length < lastCount) {
81            i--; // Don't increment timeout if we make progress
82          }
83          lastCount = res.length;
84          LOG.info("Still got " + res.length + " rows");
85          Thread.sleep(SLEEP_TIME);
86        } else {
87          break;
88        }
89      }
90    }
91  
92    @Test(timeout = 300000)
93    public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
94  
95      LOG.info("testSimplePutDelete");
96      MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
97  
98      doPutTest(Bytes.toBytes(1));
99  
100     int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
101     peerCluster.stopRegionServer(rsToStop);
102     peerCluster.waitOnRegionServer(rsToStop);
103 
104     // Sanity check
105     assertEquals(1, peerCluster.getRegionServerThreads().size());
106 
107     doPutTest(Bytes.toBytes(2));
108 
109     peerCluster.startRegionServer();
110 
111     // Sanity check
112     assertEquals(2, peerCluster.getRegionServerThreads().size());
113 
114     doPutTest(Bytes.toBytes(3));
115 
116   }
117 
118   private void doPutTest(byte[] row) throws IOException, InterruptedException {
119     Put put = new Put(row);
120     put.add(famName, row, row);
121 
122     htable1 = new HTable(conf1, tableName);
123     htable1.put(put);
124 
125     Get get = new Get(row);
126     for (int i = 0; i < NB_RETRIES; i++) {
127       if (i == NB_RETRIES - 1) {
128         fail("Waited too much time for put replication");
129       }
130       Result res = htable2.get(get);
131       if (res.size() == 0) {
132         LOG.info("Row not available");
133         Thread.sleep(SLEEP_TIME);
134       } else {
135         assertArrayEquals(res.value(), row);
136         break;
137       }
138     }
139 
140   }
141 
142 }