1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.MediumTests;
32 import org.apache.hadoop.hbase.MiniHBaseCluster;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.client.HBaseAdmin;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.regionserver.HRegion;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.JVMClusterUtil;
41
42 import org.junit.AfterClass;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45
46 import com.google.common.base.Joiner;
47 import org.junit.experimental.categories.Category;
48
49 @Category(MediumTests.class)
50 public class TestAssignmentListener {
51 private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class);
52
53 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
54
55 static class DummyListener {
56 protected AtomicInteger modified = new AtomicInteger(0);
57
58 public void awaitModifications(int count) throws InterruptedException {
59 while (!modified.compareAndSet(count, 0)) {
60 Thread.sleep(100);
61 }
62 }
63 }
64
65 static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
66 private AtomicInteger closeCount = new AtomicInteger(0);
67 private AtomicInteger openCount = new AtomicInteger(0);
68
69 public DummyAssignmentListener() {
70 }
71
72 public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
73 LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
74 openCount.incrementAndGet();
75 modified.incrementAndGet();
76 }
77
78 public void regionClosed(final HRegionInfo regionInfo) {
79 LOG.info("Assignment close region=" + regionInfo);
80 closeCount.incrementAndGet();
81 modified.incrementAndGet();
82 }
83
84 public void reset() {
85 openCount.set(0);
86 closeCount.set(0);
87 }
88
89 public int getLoadCount() {
90 return openCount.get();
91 }
92
93 public int getCloseCount() {
94 return closeCount.get();
95 }
96 }
97
98 static class DummyServerListener extends DummyListener implements ServerListener {
99 private AtomicInteger removedCount = new AtomicInteger(0);
100 private AtomicInteger addedCount = new AtomicInteger(0);
101
102 public DummyServerListener() {
103 }
104
105 public void serverAdded(final ServerName serverName) {
106 LOG.info("Server added " + serverName);
107 addedCount.incrementAndGet();
108 modified.incrementAndGet();
109 }
110
111 public void serverRemoved(final ServerName serverName) {
112 LOG.info("Server removed " + serverName);
113 removedCount.incrementAndGet();
114 modified.incrementAndGet();
115 }
116
117 public void reset() {
118 addedCount.set(0);
119 removedCount.set(0);
120 }
121
122 public int getAddedCount() {
123 return addedCount.get();
124 }
125
126 public int getRemovedCount() {
127 return removedCount.get();
128 }
129 }
130
131 @BeforeClass
132 public static void beforeAllTests() throws Exception {
133 TEST_UTIL.startMiniCluster(2);
134 }
135
136 @AfterClass
137 public static void afterAllTests() throws Exception {
138 TEST_UTIL.shutdownMiniCluster();
139 }
140
141 @Test(timeout=60000)
142 public void testServerListener() throws IOException, InterruptedException {
143 ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
144
145 DummyServerListener listener = new DummyServerListener();
146 serverManager.registerListener(listener);
147 try {
148 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
149
150
151 miniCluster.startRegionServer();
152 listener.awaitModifications(1);
153 assertEquals(1, listener.getAddedCount());
154 assertEquals(0, listener.getRemovedCount());
155
156
157 listener.reset();
158 miniCluster.startRegionServer();
159 listener.awaitModifications(1);
160 assertEquals(1, listener.getAddedCount());
161 assertEquals(0, listener.getRemovedCount());
162
163 int nrs = miniCluster.getRegionServerThreads().size();
164
165
166 listener.reset();
167 miniCluster.stopRegionServer(nrs - 1);
168 listener.awaitModifications(1);
169 assertEquals(0, listener.getAddedCount());
170 assertEquals(1, listener.getRemovedCount());
171
172
173 listener.reset();
174 miniCluster.stopRegionServer(nrs - 2);
175 listener.awaitModifications(1);
176 assertEquals(0, listener.getAddedCount());
177 assertEquals(1, listener.getRemovedCount());
178 } finally {
179 serverManager.unregisterListener(listener);
180 }
181 }
182
183 @Test(timeout=60000)
184 public void testAssignmentListener() throws IOException, InterruptedException {
185 AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
186 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
187
188 DummyAssignmentListener listener = new DummyAssignmentListener();
189 am.registerListener(listener);
190 try {
191 final String TABLE_NAME_STR = "testtb";
192 final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
193 final byte[] FAMILY = Bytes.toBytes("cf");
194
195
196 LOG.info("Create Table");
197 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
198 listener.awaitModifications(1);
199 assertEquals(1, listener.getLoadCount());
200 assertEquals(0, listener.getCloseCount());
201
202
203 HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
204 try {
205 for (int i = 0; i < 10; ++i) {
206 byte[] key = Bytes.toBytes("row-" + i);
207 Put put = new Put(key);
208 put.add(FAMILY, null, key);
209 table.put(put);
210 }
211 } finally {
212 table.close();
213 }
214
215
216 LOG.info("Split Table");
217 listener.reset();
218 admin.split(TABLE_NAME_STR, "row-3");
219 listener.awaitModifications(3);
220 assertEquals(2, listener.getLoadCount());
221 assertEquals(1, listener.getCloseCount());
222
223
224 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
225 int mergeable = 0;
226 while (mergeable < 2) {
227 Thread.sleep(100);
228 admin.majorCompact(TABLE_NAME_STR);
229 mergeable = 0;
230 for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
231 for (HRegion region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) {
232 mergeable += region.isMergeable() ? 1 : 0;
233 }
234 }
235 }
236
237
238 LOG.info("Merge Regions");
239 listener.reset();
240 List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
241 assertEquals(2, regions.size());
242 admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
243 regions.get(1).getEncodedNameAsBytes(), true);
244 listener.awaitModifications(3);
245 assertEquals(1, admin.getTableRegions(TABLE_NAME).size());
246 assertEquals(1, listener.getLoadCount());
247 assertEquals(2, listener.getCloseCount());
248
249
250 LOG.info("Drop Table");
251 listener.reset();
252 TEST_UTIL.deleteTable(TABLE_NAME);
253 listener.awaitModifications(1);
254 assertEquals(0, listener.getLoadCount());
255 assertEquals(1, listener.getCloseCount());
256 } finally {
257 am.unregisterListener(listener);
258 }
259 }
260 }