1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase;
19
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
25 import org.apache.hadoop.hbase.executor.EventType;
26 import org.apache.hadoop.hbase.executor.ExecutorService;
27 import org.apache.hadoop.hbase.executor.ExecutorType;
28 import org.apache.hadoop.hbase.master.AssignmentManager;
29 import org.apache.hadoop.hbase.master.HMaster;
30 import org.apache.hadoop.hbase.master.LoadBalancer;
31 import org.apache.hadoop.hbase.master.MasterServices;
32 import org.apache.hadoop.hbase.master.RegionPlan;
33 import org.apache.hadoop.hbase.master.RegionState;
34 import org.apache.hadoop.hbase.master.ServerManager;
35 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
36 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
37 import org.apache.hadoop.hbase.testclassification.MediumTests;
38 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
39 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
40 import org.junit.AfterClass;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 import org.mockito.Mockito;
45
46 import java.util.ArrayList;
47 import java.util.HashMap;
48 import java.util.HashSet;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.Map.Entry;
52 import java.util.Set;
53
54 import static org.junit.Assert.assertFalse;
55 import static org.junit.Assert.assertNotEquals;
56 import static org.junit.Assert.assertTrue;
57
58
59
60
61
62 @Category(MediumTests.class)
63 public class TestDrainingServer {
64 private static final Log LOG = LogFactory.getLog(TestDrainingServer.class);
65 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
66 private Abortable abortable = new Abortable() {
67 @Override
68 public boolean isAborted() {
69 return false;
70 }
71
72 @Override
73 public void abort(String why, Throwable e) {
74 }
75 };
76
77 @AfterClass
78 public static void afterClass() throws Exception {
79 TEST_UTIL.shutdownMiniZKCluster();
80 }
81
82 @BeforeClass
83 public static void beforeClass() throws Exception {
84 TEST_UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
85 TEST_UTIL.startMiniZKCluster();
86 }
87
88 @Test
89 public void testAssignmentManagerDoesntUseDrainingServer() throws Exception {
90 AssignmentManager am;
91 Configuration conf = TEST_UTIL.getConfiguration();
92 final HMaster master = Mockito.mock(HMaster.class);
93 final MasterServices server = Mockito.mock(MasterServices.class);
94 final ServerManager serverManager = Mockito.mock(ServerManager.class);
95 final ServerName SERVERNAME_A = ServerName.valueOf("mockserver_a.org", 1000, 8000);
96 final ServerName SERVERNAME_B = ServerName.valueOf("mockserver_b.org", 1001, 8000);
97 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
98 final HRegionInfo REGIONINFO = new HRegionInfo(TableName.valueOf("table_test"),
99 HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
100
101 ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
102 "zkWatcher-Test", abortable, true);
103
104 Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
105
106 onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
107 onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
108
109 Mockito.when(server.getConfiguration()).thenReturn(conf);
110 Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("masterMock,1,1"));
111 Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
112
113 CoordinatedStateManager cp = new ZkCoordinatedStateManager();
114 cp.initialize(server);
115 cp.start();
116
117 Mockito.when(server.getCoordinatedStateManager()).thenReturn(cp);
118
119 Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
120 Mockito.when(serverManager.getOnlineServersList())
121 .thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
122
123 Mockito.when(serverManager.createDestinationServersList())
124 .thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
125 Mockito.when(serverManager.createDestinationServersList(null))
126 .thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
127
128 for (ServerName sn : onlineServers.keySet()) {
129 Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true);
130 Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1)).thenReturn(true);
131 Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1, null, false)).thenReturn(true);
132 Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, new ArrayList<ServerName>()))
133 .thenReturn(RegionOpeningState.OPENED);
134 Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, null))
135 .thenReturn(RegionOpeningState.OPENED);
136 Mockito.when(serverManager.addServerToDrainList(sn)).thenReturn(true);
137 }
138
139 Mockito.when(master.getServerManager()).thenReturn(serverManager);
140
141 am = new AssignmentManager(server, serverManager,
142 balancer, startupMasterExecutor("mockExecutorService"), null, null);
143
144 Mockito.when(master.getAssignmentManager()).thenReturn(am);
145 Mockito.when(master.getZooKeeper()).thenReturn(zkWatcher);
146
147 am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_A));
148
149 zkWatcher.registerListenerFirst(am);
150
151 addServerToDrainedList(SERVERNAME_A, onlineServers, serverManager);
152
153 am.assign(REGIONINFO, true);
154
155 setRegionOpenedOnZK(zkWatcher, SERVERNAME_A, REGIONINFO);
156 setRegionOpenedOnZK(zkWatcher, SERVERNAME_B, REGIONINFO);
157
158 am.waitForAssignment(REGIONINFO);
159
160 assertTrue(am.getRegionStates().isRegionOnline(REGIONINFO));
161 assertNotEquals(am.getRegionStates().getRegionServerOfRegion(REGIONINFO), SERVERNAME_A);
162 }
163
164 @Test
165 public void testAssignmentManagerDoesntUseDrainedServerWithBulkAssign() throws Exception {
166 Configuration conf = TEST_UTIL.getConfiguration();
167 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
168 AssignmentManager am;
169 final HMaster master = Mockito.mock(HMaster.class);
170 final MasterServices server = Mockito.mock(MasterServices.class);
171 final ServerManager serverManager = Mockito.mock(ServerManager.class);
172 final ServerName SERVERNAME_A = ServerName.valueOf("mockserverbulk_a.org", 1000, 8000);
173 final ServerName SERVERNAME_B = ServerName.valueOf("mockserverbulk_b.org", 1001, 8000);
174 final ServerName SERVERNAME_C = ServerName.valueOf("mockserverbulk_c.org", 1002, 8000);
175 final ServerName SERVERNAME_D = ServerName.valueOf("mockserverbulk_d.org", 1003, 8000);
176 final ServerName SERVERNAME_E = ServerName.valueOf("mockserverbulk_e.org", 1004, 8000);
177 final Map<HRegionInfo, ServerName> bulk = new HashMap<HRegionInfo, ServerName>();
178
179 Set<ServerName> bunchServersAssigned = new HashSet<ServerName>();
180
181 HRegionInfo REGIONINFO_A = new HRegionInfo(TableName.valueOf("table_A"),
182 HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
183 HRegionInfo REGIONINFO_B = new HRegionInfo(TableName.valueOf("table_B"),
184 HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
185 HRegionInfo REGIONINFO_C = new HRegionInfo(TableName.valueOf("table_C"),
186 HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
187 HRegionInfo REGIONINFO_D = new HRegionInfo(TableName.valueOf("table_D"),
188 HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
189 HRegionInfo REGIONINFO_E = new HRegionInfo(TableName.valueOf("table_E"),
190 HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
191
192 Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
193 List<ServerName> drainedServers = new ArrayList<ServerName>();
194
195 onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
196 onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
197 onlineServers.put(SERVERNAME_C, ServerLoad.EMPTY_SERVERLOAD);
198 onlineServers.put(SERVERNAME_D, ServerLoad.EMPTY_SERVERLOAD);
199 onlineServers.put(SERVERNAME_E, ServerLoad.EMPTY_SERVERLOAD);
200
201 bulk.put(REGIONINFO_A, SERVERNAME_A);
202 bulk.put(REGIONINFO_B, SERVERNAME_B);
203 bulk.put(REGIONINFO_C, SERVERNAME_C);
204 bulk.put(REGIONINFO_D, SERVERNAME_D);
205 bulk.put(REGIONINFO_E, SERVERNAME_E);
206
207 ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
208 "zkWatcher-BulkAssignTest", abortable, true);
209
210 Mockito.when(server.getConfiguration()).thenReturn(conf);
211 Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("masterMock,1,1"));
212 Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
213
214 CoordinatedStateManager cp = new ZkCoordinatedStateManager();
215 cp.initialize(server);
216 cp.start();
217
218 Mockito.when(server.getCoordinatedStateManager()).thenReturn(cp);
219
220 Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
221 Mockito.when(serverManager.getOnlineServersList()).thenReturn(
222 new ArrayList<ServerName>(onlineServers.keySet()));
223
224 Mockito.when(serverManager.createDestinationServersList()).thenReturn(
225 new ArrayList<ServerName>(onlineServers.keySet()));
226 Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(
227 new ArrayList<ServerName>(onlineServers.keySet()));
228
229 for (Entry<HRegionInfo, ServerName> entry : bulk.entrySet()) {
230 Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true);
231 Mockito.when(serverManager.sendRegionClose(entry.getValue(),
232 entry.getKey(), -1)).thenReturn(true);
233 Mockito.when(serverManager.sendRegionOpen(entry.getValue(),
234 entry.getKey(), -1, null)).thenReturn(RegionOpeningState.OPENED);
235 Mockito.when(serverManager.addServerToDrainList(entry.getValue())).thenReturn(true);
236 }
237
238 Mockito.when(master.getServerManager()).thenReturn(serverManager);
239
240 drainedServers.add(SERVERNAME_A);
241 drainedServers.add(SERVERNAME_B);
242 drainedServers.add(SERVERNAME_C);
243 drainedServers.add(SERVERNAME_D);
244
245 am = new AssignmentManager(server, serverManager,
246 balancer, startupMasterExecutor("mockExecutorServiceBulk"), null, null);
247
248 Mockito.when(master.getAssignmentManager()).thenReturn(am);
249
250 zkWatcher.registerListener(am);
251
252 for (ServerName drained : drainedServers) {
253 addServerToDrainedList(drained, onlineServers, serverManager);
254 }
255
256 am.assign(bulk);
257
258 Map<String, RegionState> regionsInTransition = am.getRegionStates().getRegionsInTransition();
259 for (Entry<String, RegionState> entry : regionsInTransition.entrySet()) {
260 setRegionOpenedOnZK(zkWatcher, entry.getValue().getServerName(),
261 entry.getValue().getRegion());
262 }
263
264 am.waitForAssignment(REGIONINFO_A);
265 am.waitForAssignment(REGIONINFO_B);
266 am.waitForAssignment(REGIONINFO_C);
267 am.waitForAssignment(REGIONINFO_D);
268 am.waitForAssignment(REGIONINFO_E);
269
270 Map<HRegionInfo, ServerName> regionAssignments = am.getRegionStates().getRegionAssignments();
271 for (Entry<HRegionInfo, ServerName> entry : regionAssignments.entrySet()) {
272 LOG.info("Region Assignment: "
273 + entry.getKey().getRegionNameAsString() + " Server: " + entry.getValue());
274 bunchServersAssigned.add(entry.getValue());
275 }
276
277 for (ServerName sn : drainedServers) {
278 assertFalse(bunchServersAssigned.contains(sn));
279 }
280 }
281
282 private void addServerToDrainedList(ServerName serverName,
283 Map<ServerName, ServerLoad> onlineServers, ServerManager serverManager) {
284 onlineServers.remove(serverName);
285 List<ServerName> availableServers = new ArrayList<ServerName>(onlineServers.keySet());
286 Mockito.when(serverManager.createDestinationServersList()).thenReturn(availableServers);
287 Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(availableServers);
288 }
289
290 private void setRegionOpenedOnZK(final ZooKeeperWatcher zkWatcher, final ServerName serverName,
291 HRegionInfo hregionInfo) throws Exception {
292 int version = ZKAssign.getVersion(zkWatcher, hregionInfo);
293 int versionTransition = ZKAssign.transitionNode(zkWatcher,
294 hregionInfo, serverName, EventType.M_ZK_REGION_OFFLINE,
295 EventType.RS_ZK_REGION_OPENING, version);
296 ZKAssign.transitionNodeOpened(zkWatcher, hregionInfo, serverName, versionTransition);
297 }
298
299 private ExecutorService startupMasterExecutor(final String name) {
300 ExecutorService executor = new ExecutorService(name);
301 executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3);
302 executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3);
303 executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3);
304 executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3);
305 return executor;
306 }
307 }