1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.when;
26
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.HBaseConfiguration;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.MediumTests;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.SplitLogCounters;
39 import org.apache.hadoop.hbase.SplitLogTask;
40 import org.apache.hadoop.hbase.Waiter;
41 import org.apache.hadoop.hbase.executor.ExecutorService;
42 import org.apache.hadoop.hbase.executor.ExecutorType;
43 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
44 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
45 import org.apache.hadoop.hbase.util.CancelableProgressable;
46 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
47 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
48 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
49 import org.apache.log4j.Level;
50 import org.apache.log4j.Logger;
51 import org.apache.zookeeper.CreateMode;
52 import org.apache.zookeeper.ZooDefs.Ids;
53 import org.junit.After;
54 import org.junit.Before;
55 import org.junit.Test;
56 import org.junit.experimental.categories.Category;
57
58 @Category(MediumTests.class)
59 public class TestSplitLogWorker {
60 private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
61 private static final int WAIT_TIME = 15000;
62 private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
63 static {
64 Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
65 }
66 private final static HBaseTestingUtility TEST_UTIL =
67 new HBaseTestingUtility();
68 private ZooKeeperWatcher zkw;
69 private SplitLogWorker slw;
70 private ExecutorService executorService;
71 private RecoveryMode mode;
72
73 private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
74 throws Exception {
75 assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
76 waitForCounterBoolean(ctr, oldval, newval, timems));
77 }
78
79 private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
80 long timems) throws Exception {
81
82 return waitForCounterBoolean(ctr, oldval, newval, timems, true);
83 }
84
85 private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval,
86 long timems, boolean failIfTimeout) throws Exception {
87
88 long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
89 new Waiter.Predicate<Exception>() {
90 @Override
91 public boolean evaluate() throws Exception {
92 return (ctr.get() >= newval);
93 }
94 });
95
96 if( timeWaited > 0) {
97
98 assertEquals(newval, ctr.get());
99 }
100 return true;
101 }
102
103 @Before
104 public void setup() throws Exception {
105 TEST_UTIL.startMiniZKCluster();
106 Configuration conf = TEST_UTIL.getConfiguration();
107 zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
108 "split-log-worker-tests", null);
109 ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
110 ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
111 assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
112 LOG.debug(zkw.baseZNode + " created");
113 ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
114 assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
115 LOG.debug(zkw.splitLogZNode + " created");
116 ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
117 assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1);
118 SplitLogCounters.resetCounters();
119 executorService = new ExecutorService("TestSplitLogWorker");
120 executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
121 this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
122 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
123 }
124
125 @After
126 public void teardown() throws Exception {
127 if (executorService != null) {
128 executorService.shutdown();
129 }
130 TEST_UTIL.shutdownMiniZKCluster();
131 }
132
133 SplitLogWorker.TaskExecutor neverEndingTask =
134 new SplitLogWorker.TaskExecutor() {
135
136 @Override
137 public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
138 while (true) {
139 try {
140 Thread.sleep(1000);
141 } catch (InterruptedException e) {
142 return Status.PREEMPTED;
143 }
144 if (!p.progress()) {
145 return Status.PREEMPTED;
146 }
147 }
148 }
149
150 };
151
152 @Test(timeout=60000)
153 public void testAcquireTaskAtStartup() throws Exception {
154 LOG.info("testAcquireTaskAtStartup");
155 SplitLogCounters.resetCounters();
156 final String TATAS = "tatas";
157 final ServerName RS = ServerName.valueOf("rs,1,1");
158 RegionServerServices mockedRS = getRegionServer(RS);
159 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
160 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
161 Ids.OPEN_ACL_UNSAFE,
162 CreateMode.PERSISTENT);
163
164 SplitLogWorker slw =
165 new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
166 slw.start();
167 try {
168 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
169 byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
170 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
171 assertTrue(slt.isOwned(RS));
172 } finally {
173 stopSplitLogWorker(slw);
174 }
175 }
176
177 private void stopSplitLogWorker(final SplitLogWorker slw)
178 throws InterruptedException {
179 if (slw != null) {
180 slw.stop();
181 slw.worker.join(WAIT_TIME);
182 if (slw.worker.isAlive()) {
183 assertTrue(("Could not stop the worker thread slw=" + slw) == null);
184 }
185 }
186 }
187
188 @Test(timeout=60000)
189 public void testRaceForTask() throws Exception {
190 LOG.info("testRaceForTask");
191 SplitLogCounters.resetCounters();
192 final String TRFT = "trft";
193 final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
194 final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
195 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
196 new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
197 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
198 RegionServerServices mockedRS1 = getRegionServer(SVR1);
199 RegionServerServices mockedRS2 = getRegionServer(SVR2);
200 SplitLogWorker slw1 =
201 new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
202 SplitLogWorker slw2 =
203 new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
204 slw1.start();
205 slw2.start();
206 try {
207 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
208
209
210 assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
211 WAIT_TIME, false) ||
212 SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
213 byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
214 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
215 assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
216 } finally {
217 stopSplitLogWorker(slw1);
218 stopSplitLogWorker(slw2);
219 }
220 }
221
222 @Test(timeout=60000)
223 public void testPreemptTask() throws Exception {
224 LOG.info("testPreemptTask");
225 SplitLogCounters.resetCounters();
226 final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
227 final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
228 RegionServerServices mockedRS = getRegionServer(SRV);
229 SplitLogWorker slw =
230 new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
231 slw.start();
232 try {
233 Thread.yield();
234 Thread.sleep(1000);
235 waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
236
237
238 zkw.getRecoverableZooKeeper().create(PATH,
239 new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
240 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
241
242 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
243 assertEquals(1, slw.taskReadySeq);
244 byte [] bytes = ZKUtil.getData(zkw, PATH);
245 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
246 assertTrue(slt.isOwned(SRV));
247 slt = new SplitLogTask.Owned(MANAGER, this.mode);
248 ZKUtil.setData(zkw, PATH, slt.toByteArray());
249 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
250 } finally {
251 stopSplitLogWorker(slw);
252 }
253 }
254
255 @Test(timeout=60000)
256 public void testMultipleTasks() throws Exception {
257 LOG.info("testMultipleTasks");
258 SplitLogCounters.resetCounters();
259 final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
260 final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
261 RegionServerServices mockedRS = getRegionServer(SRV);
262 SplitLogWorker slw =
263 new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
264 slw.start();
265 try {
266 Thread.yield();
267 Thread.sleep(100);
268 waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
269
270 SplitLogTask unassignedManager =
271 new SplitLogTask.Unassigned(MANAGER, this.mode);
272 zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
273 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
274
275 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
276
277
278
279 final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
280 zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
281 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
282
283
284 final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
285 SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
286 ZKUtil.setData(zkw, PATH1, slt.toByteArray());
287 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
288
289 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
290 assertEquals(2, slw.taskReadySeq);
291 byte [] bytes = ZKUtil.getData(zkw, PATH2);
292 slt = SplitLogTask.parseFrom(bytes);
293 assertTrue(slt.isOwned(SRV));
294 } finally {
295 stopSplitLogWorker(slw);
296 }
297 }
298
299 @Test(timeout=60000)
300 public void testRescan() throws Exception {
301 LOG.info("testRescan");
302 SplitLogCounters.resetCounters();
303 final ServerName SRV = ServerName.valueOf("svr,1,1");
304 RegionServerServices mockedRS = getRegionServer(SRV);
305 slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
306 slw.start();
307 Thread.yield();
308 Thread.sleep(100);
309
310 String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
311 SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
312 zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
313 CreateMode.PERSISTENT);
314
315 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
316
317
318
319 ZKUtil.setData(zkw, task, slt.toByteArray());
320 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
321
322
323 String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
324 rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
325 CreateMode.PERSISTENT_SEQUENTIAL);
326
327 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
328
329
330
331 ZKUtil.setData(zkw, task, slt.toByteArray());
332 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
333 waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
334
335 List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
336 LOG.debug(nodes);
337 int num = 0;
338 for (String node : nodes) {
339 num++;
340 if (node.startsWith("RESCAN")) {
341 String name = ZKSplitLog.getEncodedNodeName(zkw, node);
342 String fn = ZKSplitLog.getFileName(name);
343 byte [] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(zkw.splitLogZNode, fn));
344 slt = SplitLogTask.parseFrom(data);
345 assertTrue(slt.toString(), slt.isDone(SRV));
346 }
347 }
348 assertEquals(2, num);
349 }
350
351 @Test(timeout=60000)
352 public void testAcquireMultiTasks() throws Exception {
353 LOG.info("testAcquireMultiTasks");
354 SplitLogCounters.resetCounters();
355 final String TATAS = "tatas";
356 final ServerName RS = ServerName.valueOf("rs,1,1");
357 final int maxTasks = 3;
358 Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
359 testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
360 RegionServerServices mockedRS = getRegionServer(RS);
361
362 for (int i = 0; i < maxTasks; i++) {
363 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
364 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
365 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
366 }
367
368 SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
369 slw.start();
370 try {
371 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
372 for (int i = 0; i < maxTasks; i++) {
373 byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
374 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
375 assertTrue(slt.isOwned(RS));
376 }
377 } finally {
378 stopSplitLogWorker(slw);
379 }
380 }
381
382
383
384
385
386
387 @Test(timeout=60000)
388 public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
389 LOG.info("testAcquireMultiTasks");
390 SplitLogCounters.resetCounters();
391 final String TATAS = "tatas";
392 final ServerName RS = ServerName.valueOf("rs,1,1");
393 final ServerName RS2 = ServerName.valueOf("rs,1,2");
394 final int maxTasks = 3;
395 Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
396 testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
397 RegionServerServices mockedRS = getRegionServer(RS);
398
399
400 String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
401 zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
402 rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
403 zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
404
405 for (int i = 0; i < maxTasks; i++) {
406 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
407 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
408 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
409 }
410
411 SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
412 slw.start();
413 try {
414 int acquiredTasks = 0;
415 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
416 for (int i = 0; i < maxTasks; i++) {
417 byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
418 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
419 if (slt.isOwned(RS)) {
420 acquiredTasks++;
421 }
422 }
423 assertEquals(2, acquiredTasks);
424 } finally {
425 stopSplitLogWorker(slw);
426 }
427 }
428
429
430
431
432
433
434 private RegionServerServices getRegionServer(ServerName name) {
435
436 RegionServerServices mockedServer = mock(RegionServerServices.class);
437 when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
438 when(mockedServer.getServerName()).thenReturn(name);
439 when(mockedServer.getZooKeeper()).thenReturn(zkw);
440 when(mockedServer.isStopped()).thenReturn(false);
441 when(mockedServer.getExecutorService()).thenReturn(executorService);
442
443 return mockedServer;
444 }
445
446 }