1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
22 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode;
23 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
24 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success;
25 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
26 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result;
27 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
28 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
29 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
30 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
31 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
32 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
33 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
34 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
35 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
36 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
37 import static org.junit.Assert.assertEquals;
38 import static org.junit.Assert.assertFalse;
39 import static org.junit.Assert.assertTrue;
40
41 import java.io.IOException;
42 import java.util.List;
43 import java.util.UUID;
44 import java.util.concurrent.atomic.AtomicLong;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.HBaseConfiguration;
52 import org.apache.hadoop.hbase.HBaseTestingUtility;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.MediumTests;
56 import org.apache.hadoop.hbase.ServerName;
57 import org.apache.hadoop.hbase.SplitLogCounters;
58 import org.apache.hadoop.hbase.SplitLogTask;
59 import org.apache.hadoop.hbase.Stoppable;
60 import org.apache.hadoop.hbase.Waiter;
61 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
62 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
63 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
64 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
65 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
66 import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
67 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
68 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
69 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
70 import org.apache.log4j.Level;
71 import org.apache.log4j.Logger;
72 import org.apache.zookeeper.CreateMode;
73 import org.apache.zookeeper.KeeperException;
74 import org.apache.zookeeper.ZooDefs.Ids;
75 import org.junit.After;
76 import org.junit.Assert;
77 import org.junit.Before;
78 import org.junit.Test;
79 import org.junit.experimental.categories.Category;
80 import org.mockito.Mockito;
81
82 @Category(MediumTests.class)
83 public class TestSplitLogManager {
84 private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
85 private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1");
86 private final ServerManager sm = Mockito.mock(ServerManager.class);
87 private final MasterServices master = Mockito.mock(MasterServices.class);
88
89 static {
90 Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
91 }
92
93 private ZooKeeperWatcher zkw;
94 private static boolean stopped = false;
95 private SplitLogManager slm;
96 private Configuration conf;
97 private int to;
98 private RecoveryMode mode;
99
100 private static HBaseTestingUtility TEST_UTIL;
101
102 static Stoppable stopper = new Stoppable() {
103 @Override
104 public void stop(String why) {
105 stopped = true;
106 }
107
108 @Override
109 public boolean isStopped() {
110 return stopped;
111 }
112
113 };
114
115 @Before
116 public void setup() throws Exception {
117 TEST_UTIL = new HBaseTestingUtility();
118 TEST_UTIL.startMiniZKCluster();
119 conf = TEST_UTIL.getConfiguration();
120
121 zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
122 ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
123 ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
124 assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
125 LOG.debug(zkw.baseZNode + " created");
126 ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
127 assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
128 LOG.debug(zkw.splitLogZNode + " created");
129
130 stopped = false;
131 resetCounters();
132
133
134
135 Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
136 Mockito.when(master.getServerManager()).thenReturn(sm);
137
138 to = 6000;
139 conf.setInt("hbase.splitlog.manager.timeout", to);
140 conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
141 conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
142 conf.setInt("hfile.format.version", 3);
143 to = to + 4 * 100;
144
145 this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
146 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
147 }
148
149 @After
150 public void teardown() throws IOException, KeeperException {
151 stopper.stop("");
152 slm.stop();
153 TEST_UTIL.shutdownMiniZKCluster();
154 }
155
156 private interface Expr {
157 long eval();
158 }
159
160 private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
161 throws Exception {
162 Expr e = new Expr() {
163 public long eval() {
164 return ctr.get();
165 }
166 };
167 waitForCounter(e, oldval, newval, timems);
168 return;
169 }
170
171 private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
172 throws Exception {
173
174 TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
175 @Override
176 public boolean evaluate() throws Exception {
177 return (e.eval() != oldval);
178 }
179 });
180
181 assertEquals(newval, e.eval());
182 }
183
184 private String submitTaskAndWait(TaskBatch batch, String name)
185 throws KeeperException, InterruptedException {
186 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
187 NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
188 zkw.registerListener(listener);
189 ZKUtil.watchAndCheckExists(zkw, tasknode);
190
191 slm.enqueueSplitTask(name, batch);
192 assertEquals(1, batch.installed);
193 assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
194 assertEquals(1L, tot_mgr_node_create_queued.get());
195
196 LOG.debug("waiting for task node creation");
197 listener.waitForCreation();
198 LOG.debug("task created");
199 return tasknode;
200 }
201
202
203
204
205
206 @Test
207 public void testTaskCreation() throws Exception {
208
209 LOG.info("TestTaskCreation - test the creation of a task in zk");
210 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
211 TaskBatch batch = new TaskBatch();
212
213 String tasknode = submitTaskAndWait(batch, "foo/1");
214
215 byte[] data = ZKUtil.getData(zkw, tasknode);
216 SplitLogTask slt = SplitLogTask.parseFrom(data);
217 LOG.info("Task node created " + slt.toString());
218 assertTrue(slt.isUnassigned(DUMMY_MASTER));
219 }
220
221 @Test
222 public void testOrphanTaskAcquisition() throws Exception {
223 LOG.info("TestOrphanTaskAcquisition");
224
225 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
226 SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
227 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
228 CreateMode.PERSISTENT);
229
230 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
231 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
232 Task task = slm.findOrCreateOrphanTask(tasknode);
233 assertTrue(task.isOrphan());
234 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
235 assertFalse(task.isUnassigned());
236 long curt = System.currentTimeMillis();
237 assertTrue((task.last_update <= curt) &&
238 (task.last_update > (curt - 1000)));
239 LOG.info("waiting for manager to resubmit the orphan task");
240 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
241 assertTrue(task.isUnassigned());
242 waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
243 }
244
245 @Test
246 public void testUnassignedOrphan() throws Exception {
247 LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
248 " startup");
249 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
250
251 SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
252 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
253 CreateMode.PERSISTENT);
254 int version = ZKUtil.checkExists(zkw, tasknode);
255
256 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
257 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
258 Task task = slm.findOrCreateOrphanTask(tasknode);
259 assertTrue(task.isOrphan());
260 assertTrue(task.isUnassigned());
261
262 waitForCounter(tot_mgr_rescan, 0, 1, to/2);
263 Task task2 = slm.findOrCreateOrphanTask(tasknode);
264 assertTrue(task == task2);
265 LOG.debug("task = " + task);
266 assertEquals(1L, tot_mgr_resubmit.get());
267 assertEquals(1, task.incarnation);
268 assertEquals(0, task.unforcedResubmits.get());
269 assertTrue(task.isOrphan());
270 assertTrue(task.isUnassigned());
271 assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
272 }
273
274 @Test
275 public void testMultipleResubmits() throws Exception {
276 LOG.info("TestMultipleResbmits - no indefinite resubmissions");
277
278 conf.setInt("hbase.splitlog.max.resubmit", 2);
279 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
280 TaskBatch batch = new TaskBatch();
281
282 String tasknode = submitTaskAndWait(batch, "foo/1");
283 int version = ZKUtil.checkExists(zkw, tasknode);
284 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
285 final ServerName worker2 = ServerName.valueOf("worker2,1,1");
286 final ServerName worker3 = ServerName.valueOf("worker3,1,1");
287 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
288 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
289 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
290 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
291 int version1 = ZKUtil.checkExists(zkw, tasknode);
292 assertTrue(version1 > version);
293 slt = new SplitLogTask.Owned(worker2, this.mode);
294 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
295 waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
296 waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
297 int version2 = ZKUtil.checkExists(zkw, tasknode);
298 assertTrue(version2 > version1);
299 slt = new SplitLogTask.Owned(worker3, this.mode);
300 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
301 waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
302 waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
303 Thread.sleep(to + to/2);
304 assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get());
305 }
306
307 @Test
308 public void testRescanCleanup() throws Exception {
309 LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
310
311 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
312 TaskBatch batch = new TaskBatch();
313
314 String tasknode = submitTaskAndWait(batch, "foo/1");
315 int version = ZKUtil.checkExists(zkw, tasknode);
316 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
317 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
318 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
319 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
320 waitForCounter(new Expr() {
321 @Override
322 public long eval() {
323 return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
324 }
325 }, 0, 1, 5*60000);
326 Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get());
327 int version1 = ZKUtil.checkExists(zkw, tasknode);
328 assertTrue(version1 > version);
329 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
330 slt = SplitLogTask.parseFrom(taskstate);
331 assertTrue(slt.isUnassigned(DUMMY_MASTER));
332
333 waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
334 }
335
336 @Test
337 public void testTaskDone() throws Exception {
338 LOG.info("TestTaskDone - cleanup task node once in DONE state");
339
340 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
341 TaskBatch batch = new TaskBatch();
342 String tasknode = submitTaskAndWait(batch, "foo/1");
343 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
344 SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
345 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
346 synchronized (batch) {
347 while (batch.installed != batch.done) {
348 batch.wait();
349 }
350 }
351 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
352 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
353 }
354
355 @Test
356 public void testTaskErr() throws Exception {
357 LOG.info("TestTaskErr - cleanup task node once in ERR state");
358
359 conf.setInt("hbase.splitlog.max.resubmit", 0);
360 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
361 TaskBatch batch = new TaskBatch();
362
363 String tasknode = submitTaskAndWait(batch, "foo/1");
364 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
365 SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
366 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
367
368 synchronized (batch) {
369 while (batch.installed != batch.error) {
370 batch.wait();
371 }
372 }
373 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
374 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
375 conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT);
376 }
377
378 @Test
379 public void testTaskResigned() throws Exception {
380 LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
381 assertEquals(tot_mgr_resubmit.get(), 0);
382 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
383 assertEquals(tot_mgr_resubmit.get(), 0);
384 TaskBatch batch = new TaskBatch();
385 String tasknode = submitTaskAndWait(batch, "foo/1");
386 assertEquals(tot_mgr_resubmit.get(), 0);
387 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
388 assertEquals(tot_mgr_resubmit.get(), 0);
389 SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
390 assertEquals(tot_mgr_resubmit.get(), 0);
391 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
392 int version = ZKUtil.checkExists(zkw, tasknode);
393
394 if (tot_mgr_resubmit.get() == 0) {
395 waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
396 }
397 assertEquals(tot_mgr_resubmit.get(), 1);
398
399 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
400 slt = SplitLogTask.parseFrom(taskstate);
401 assertTrue(slt.isUnassigned(DUMMY_MASTER));
402 }
403
404 @Test
405 public void testUnassignedTimeout() throws Exception {
406 LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
407 " resubmit");
408
409
410 String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
411 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
412 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
413 zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
414 CreateMode.PERSISTENT);
415
416 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
417 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
418
419
420 TaskBatch batch = new TaskBatch();
421 submitTaskAndWait(batch, "foo/1");
422
423
424 for (int i = 0; i < (3 * to)/100; i++) {
425 Thread.sleep(100);
426 final ServerName worker2 = ServerName.valueOf("worker1,1,1");
427 slt = new SplitLogTask.Owned(worker2, this.mode);
428 ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
429 }
430
431
432
433 LOG.info("waiting for manager to resubmit the orphan task");
434 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
435
436
437 waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
438 }
439
440 @Test
441 public void testDeadWorker() throws Exception {
442 LOG.info("testDeadWorker");
443
444 conf.setLong("hbase.splitlog.max.resubmit", 0);
445 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
446 TaskBatch batch = new TaskBatch();
447
448 String tasknode = submitTaskAndWait(batch, "foo/1");
449 int version = ZKUtil.checkExists(zkw, tasknode);
450 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
451 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
452 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
453 if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
454 slm.handleDeadWorker(worker1);
455 if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
456 if (tot_mgr_resubmit_dead_server_task.get() == 0) {
457 waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
458 }
459
460 int version1 = ZKUtil.checkExists(zkw, tasknode);
461 assertTrue(version1 > version);
462 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
463 slt = SplitLogTask.parseFrom(taskstate);
464 assertTrue(slt.isUnassigned(DUMMY_MASTER));
465 return;
466 }
467
468 @Test
469 public void testWorkerCrash() throws Exception {
470 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
471 TaskBatch batch = new TaskBatch();
472
473 String tasknode = submitTaskAndWait(batch, "foo/1");
474 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
475
476 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
477 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
478 if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
479
480
481 Assert.assertEquals(0, tot_mgr_resubmit.get());
482
483
484 Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
485
486 Thread.sleep(1300);
487
488
489 Assert.assertEquals(1, tot_mgr_resubmit.get());
490 }
491
492 @Test
493 public void testEmptyLogDir() throws Exception {
494 LOG.info("testEmptyLogDir");
495 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
496 FileSystem fs = TEST_UTIL.getTestFileSystem();
497 Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
498 UUID.randomUUID().toString());
499 fs.mkdirs(emptyLogDirPath);
500 slm.splitLogDistributed(emptyLogDirPath);
501 assertFalse(fs.exists(emptyLogDirPath));
502 }
503
504
505
506
507
508
509 @Test(timeout = 300000)
510 public void testRecoveryRegionRemovedFromZK() throws Exception {
511 LOG.info("testRecoveryRegionRemovedFromZK");
512 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
513 String nodePath =
514 ZKUtil.joinZNode(zkw.recoveringRegionsZNode,
515 HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
516 ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
517
518 slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
519 slm.removeStaleRecoveringRegionsFromZK(null);
520
521 List<String> recoveringRegions =
522 zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
523
524 assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
525 }
526
527 @Test(timeout=60000)
528 public void testGetPreviousRecoveryMode() throws Exception {
529 LOG.info("testGetPreviousRecoveryMode");
530 SplitLogCounters.resetCounters();
531 Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
532 testConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
533
534 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
535 new SplitLogTask.Unassigned(
536 ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
537 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
538
539 slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER);
540 assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING);
541
542 zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
543 slm.setRecoveryMode(false);
544 assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
545 }
546
547 }