View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import static org.junit.Assert.assertEquals;
22  
23  import java.io.IOException;
24  import java.util.List;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.HBaseTestingUtility;
30  import org.apache.hadoop.hbase.HRegionInfo;
31  import org.apache.hadoop.hbase.MediumTests;
32  import org.apache.hadoop.hbase.MiniHBaseCluster;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.client.HBaseAdmin;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.regionserver.HRegion;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.JVMClusterUtil;
41  
42  import org.junit.AfterClass;
43  import org.junit.BeforeClass;
44  import org.junit.Test;
45  
46  import com.google.common.base.Joiner;
47  import org.junit.experimental.categories.Category;
48  
49  @Category(MediumTests.class)
50  public class TestAssignmentListener {
51    private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class);
52  
53    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
54  
55    static class DummyListener {
56      protected AtomicInteger modified = new AtomicInteger(0);
57  
58      public void awaitModifications(int count) throws InterruptedException {
59        while (!modified.compareAndSet(count, 0)) {
60          Thread.sleep(100);
61        }
62      }
63    }
64  
65    static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
66      private AtomicInteger closeCount = new AtomicInteger(0);
67      private AtomicInteger openCount = new AtomicInteger(0);
68  
69      public DummyAssignmentListener() {
70      }
71  
72      public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
73        LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
74        openCount.incrementAndGet();
75        modified.incrementAndGet();
76      }
77  
78      public void regionClosed(final HRegionInfo regionInfo) {
79        LOG.info("Assignment close region=" + regionInfo);
80        closeCount.incrementAndGet();
81        modified.incrementAndGet();
82      }
83  
84      public void reset() {
85        openCount.set(0);
86        closeCount.set(0);
87      }
88  
89      public int getLoadCount() {
90        return openCount.get();
91      }
92  
93      public int getCloseCount() {
94        return closeCount.get();
95      }
96    }
97  
98    static class DummyServerListener extends DummyListener implements ServerListener {
99      private AtomicInteger removedCount = new AtomicInteger(0);
100     private AtomicInteger addedCount = new AtomicInteger(0);
101 
102     public DummyServerListener() {
103     }
104 
105     public void serverAdded(final ServerName serverName) {
106       LOG.info("Server added " + serverName);
107       addedCount.incrementAndGet();
108       modified.incrementAndGet();
109     }
110 
111     public void serverRemoved(final ServerName serverName) {
112       LOG.info("Server removed " + serverName);
113       removedCount.incrementAndGet();
114       modified.incrementAndGet();
115     }
116 
117     public void reset() {
118       addedCount.set(0);
119       removedCount.set(0);
120     }
121 
122     public int getAddedCount() {
123       return addedCount.get();
124     }
125 
126     public int getRemovedCount() {
127       return removedCount.get();
128     }
129   }
130 
131   @BeforeClass
132   public static void beforeAllTests() throws Exception {
133     TEST_UTIL.startMiniCluster(2);
134   }
135 
136   @AfterClass
137   public static void afterAllTests() throws Exception {
138     TEST_UTIL.shutdownMiniCluster();
139   }
140 
141   @Test(timeout=60000)
142   public void testServerListener() throws IOException, InterruptedException {
143     ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
144 
145     DummyServerListener listener = new DummyServerListener();
146     serverManager.registerListener(listener);
147     try {
148       MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
149 
150       // Start a new Region Server
151       miniCluster.startRegionServer();
152       listener.awaitModifications(1);
153       assertEquals(1, listener.getAddedCount());
154       assertEquals(0, listener.getRemovedCount());
155 
156       // Start another Region Server
157       listener.reset();
158       miniCluster.startRegionServer();
159       listener.awaitModifications(1);
160       assertEquals(1, listener.getAddedCount());
161       assertEquals(0, listener.getRemovedCount());
162 
163       int nrs = miniCluster.getRegionServerThreads().size();
164 
165       // Stop a Region Server
166       listener.reset();
167       miniCluster.stopRegionServer(nrs - 1);
168       listener.awaitModifications(1);
169       assertEquals(0, listener.getAddedCount());
170       assertEquals(1, listener.getRemovedCount());
171 
172       // Stop another Region Server
173       listener.reset();
174       miniCluster.stopRegionServer(nrs - 2);
175       listener.awaitModifications(1);
176       assertEquals(0, listener.getAddedCount());
177       assertEquals(1, listener.getRemovedCount());
178     } finally {
179       serverManager.unregisterListener(listener);
180     }
181   }
182 
183   @Test(timeout=60000)
184   public void testAssignmentListener() throws IOException, InterruptedException {
185     AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
186     HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
187 
188     DummyAssignmentListener listener = new DummyAssignmentListener();
189     am.registerListener(listener);
190     try {
191       final String TABLE_NAME_STR = "testtb";
192       final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
193       final byte[] FAMILY = Bytes.toBytes("cf");
194 
195       // Create a new table, with a single region
196       LOG.info("Create Table");
197       TEST_UTIL.createTable(TABLE_NAME, FAMILY);
198       listener.awaitModifications(1);
199       assertEquals(1, listener.getLoadCount());
200       assertEquals(0, listener.getCloseCount());
201 
202       // Add some data
203       HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
204       try {
205         for (int i = 0; i < 10; ++i) {
206           byte[] key = Bytes.toBytes("row-" + i);
207           Put put = new Put(key);
208           put.add(FAMILY, null, key);
209           table.put(put);
210         }
211       } finally {
212         table.close();
213       }
214 
215       // Split the table in two
216       LOG.info("Split Table");
217       listener.reset();
218       admin.split(TABLE_NAME_STR, "row-3");
219       listener.awaitModifications(3);
220       assertEquals(2, listener.getLoadCount());     // daughters added
221       assertEquals(1, listener.getCloseCount());    // parent removed
222 
223       // Wait for the Regions to be mergeable
224       MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
225       int mergeable = 0;
226       while (mergeable < 2) {
227         Thread.sleep(100);
228         admin.majorCompact(TABLE_NAME_STR);
229         mergeable = 0;
230         for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
231           for (HRegion region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) {
232             mergeable += region.isMergeable() ? 1 : 0;
233           }
234         }
235       }
236 
237       // Merge the two regions
238       LOG.info("Merge Regions");
239       listener.reset();
240       List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
241       assertEquals(2, regions.size());
242       admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
243         regions.get(1).getEncodedNameAsBytes(), true);
244       listener.awaitModifications(3);
245       assertEquals(1, admin.getTableRegions(TABLE_NAME).size());
246       assertEquals(1, listener.getLoadCount());     // new merged region added
247       assertEquals(2, listener.getCloseCount());    // daughters removed
248 
249       // Delete the table
250       LOG.info("Drop Table");
251       listener.reset();
252       TEST_UTIL.deleteTable(TABLE_NAME);
253       listener.awaitModifications(1);
254       assertEquals(0, listener.getLoadCount());
255       assertEquals(1, listener.getCloseCount());
256     } finally {
257       am.unregisterListener(listener);
258     }
259   }
260 }