View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import static org.junit.Assert.assertEquals;
22  
23  import java.io.IOException;
24  import java.util.List;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.HBaseTestingUtility;
30  import org.apache.hadoop.hbase.HRegionInfo;
31  import org.apache.hadoop.hbase.testclassification.MediumTests;
32  import org.apache.hadoop.hbase.MiniHBaseCluster;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.client.Admin;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.hadoop.hbase.regionserver.HRegion;
40  import org.apache.hadoop.hbase.regionserver.Region;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.JVMClusterUtil;
43  import org.junit.AfterClass;
44  import org.junit.BeforeClass;
45  import org.junit.Test;
46  import org.junit.experimental.categories.Category;
47  
48  @Category(MediumTests.class)
49  public class TestAssignmentListener {
50    private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class);
51  
52    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
53  
54    static class DummyListener {
55      protected AtomicInteger modified = new AtomicInteger(0);
56  
57      public void awaitModifications(int count) throws InterruptedException {
58        while (!modified.compareAndSet(count, 0)) {
59          Thread.sleep(100);
60        }
61      }
62    }
63  
64    static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
65      private AtomicInteger closeCount = new AtomicInteger(0);
66      private AtomicInteger openCount = new AtomicInteger(0);
67  
68      public DummyAssignmentListener() {
69      }
70  
71      @Override
72      public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
73        LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
74        openCount.incrementAndGet();
75        modified.incrementAndGet();
76      }
77  
78      @Override
79      public void regionClosed(final HRegionInfo regionInfo) {
80        LOG.info("Assignment close region=" + regionInfo);
81        closeCount.incrementAndGet();
82        modified.incrementAndGet();
83      }
84  
85      public void reset() {
86        openCount.set(0);
87        closeCount.set(0);
88      }
89  
90      public int getLoadCount() {
91        return openCount.get();
92      }
93  
94      public int getCloseCount() {
95        return closeCount.get();
96      }
97    }
98  
99    static class DummyServerListener extends DummyListener implements ServerListener {
100     private AtomicInteger removedCount = new AtomicInteger(0);
101     private AtomicInteger addedCount = new AtomicInteger(0);
102 
103     public DummyServerListener() {
104     }
105 
106     @Override
107     public void serverAdded(final ServerName serverName) {
108       LOG.info("Server added " + serverName);
109       addedCount.incrementAndGet();
110       modified.incrementAndGet();
111     }
112 
113     @Override
114     public void serverRemoved(final ServerName serverName) {
115       LOG.info("Server removed " + serverName);
116       removedCount.incrementAndGet();
117       modified.incrementAndGet();
118     }
119 
120     public void reset() {
121       addedCount.set(0);
122       removedCount.set(0);
123     }
124 
125     public int getAddedCount() {
126       return addedCount.get();
127     }
128 
129     public int getRemovedCount() {
130       return removedCount.get();
131     }
132   }
133 
134   @BeforeClass
135   public static void beforeAllTests() throws Exception {
136     TEST_UTIL.startMiniCluster(2);
137   }
138 
139   @AfterClass
140   public static void afterAllTests() throws Exception {
141     TEST_UTIL.shutdownMiniCluster();
142   }
143 
144   @Test(timeout=60000)
145   public void testServerListener() throws IOException, InterruptedException {
146     ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
147 
148     DummyServerListener listener = new DummyServerListener();
149     serverManager.registerListener(listener);
150     try {
151       MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
152 
153       // Start a new Region Server
154       miniCluster.startRegionServer();
155       listener.awaitModifications(1);
156       assertEquals(1, listener.getAddedCount());
157       assertEquals(0, listener.getRemovedCount());
158 
159       // Start another Region Server
160       listener.reset();
161       miniCluster.startRegionServer();
162       listener.awaitModifications(1);
163       assertEquals(1, listener.getAddedCount());
164       assertEquals(0, listener.getRemovedCount());
165 
166       int nrs = miniCluster.getRegionServerThreads().size();
167 
168       // Stop a Region Server
169       listener.reset();
170       miniCluster.stopRegionServer(nrs - 1);
171       listener.awaitModifications(1);
172       assertEquals(0, listener.getAddedCount());
173       assertEquals(1, listener.getRemovedCount());
174 
175       // Stop another Region Server
176       listener.reset();
177       miniCluster.stopRegionServer(nrs - 2);
178       listener.awaitModifications(1);
179       assertEquals(0, listener.getAddedCount());
180       assertEquals(1, listener.getRemovedCount());
181     } finally {
182       serverManager.unregisterListener(listener);
183     }
184   }
185 
186   @Test(timeout=60000)
187   public void testAssignmentListener() throws IOException, InterruptedException {
188     AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
189     Admin admin = TEST_UTIL.getHBaseAdmin();
190 
191     DummyAssignmentListener listener = new DummyAssignmentListener();
192     am.registerListener(listener);
193     try {
194       final String TABLE_NAME_STR = "testtb";
195       final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
196       final byte[] FAMILY = Bytes.toBytes("cf");
197 
198       // Create a new table, with a single region
199       LOG.info("Create Table");
200       TEST_UTIL.createTable(TABLE_NAME, FAMILY);
201       listener.awaitModifications(1);
202       assertEquals(1, listener.getLoadCount());
203       assertEquals(0, listener.getCloseCount());
204 
205       // Add some data
206       Table table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
207       try {
208         for (int i = 0; i < 10; ++i) {
209           byte[] key = Bytes.toBytes("row-" + i);
210           Put put = new Put(key);
211           put.add(FAMILY, null, key);
212           table.put(put);
213         }
214       } finally {
215         table.close();
216       }
217 
218       // Split the table in two
219       LOG.info("Split Table");
220       listener.reset();
221       admin.split(TABLE_NAME, Bytes.toBytes("row-3"));
222       listener.awaitModifications(3);
223       assertEquals(2, listener.getLoadCount());     // daughters added
224       assertEquals(1, listener.getCloseCount());    // parent removed
225 
226       // Wait for the Regions to be mergeable
227       MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
228       int mergeable = 0;
229       while (mergeable < 2) {
230         Thread.sleep(100);
231         admin.majorCompact(TABLE_NAME);
232         mergeable = 0;
233         for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
234           for (Region region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) {
235             mergeable += ((HRegion)region).isMergeable() ? 1 : 0;
236           }
237         }
238       }
239 
240       // Merge the two regions
241       LOG.info("Merge Regions");
242       listener.reset();
243       List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
244       assertEquals(2, regions.size());
245       admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
246         regions.get(1).getEncodedNameAsBytes(), true);
247       listener.awaitModifications(3);
248       assertEquals(1, admin.getTableRegions(TABLE_NAME).size());
249       assertEquals(1, listener.getLoadCount());     // new merged region added
250       assertEquals(2, listener.getCloseCount());    // daughters removed
251 
252       // Delete the table
253       LOG.info("Drop Table");
254       listener.reset();
255       TEST_UTIL.deleteTable(TABLE_NAME);
256       listener.awaitModifications(1);
257       assertEquals(0, listener.getLoadCount());
258       assertEquals(1, listener.getCloseCount());
259     } finally {
260       am.unregisterListener(listener);
261     }
262   }
263 }