View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
22  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode;
23  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
24  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success;
25  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
26  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result;
27  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
28  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
29  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
30  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
31  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
32  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
33  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
34  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
35  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
36  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
37  import static org.junit.Assert.assertEquals;
38  import static org.junit.Assert.assertFalse;
39  import static org.junit.Assert.assertTrue;
40  
41  import java.io.IOException;
42  import java.util.List;
43  import java.util.UUID;
44  import java.util.concurrent.atomic.AtomicLong;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.HBaseConfiguration;
52  import org.apache.hadoop.hbase.HBaseTestingUtility;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.MediumTests;
56  import org.apache.hadoop.hbase.ServerName;
57  import org.apache.hadoop.hbase.SplitLogCounters;
58  import org.apache.hadoop.hbase.SplitLogTask;
59  import org.apache.hadoop.hbase.Stoppable;
60  import org.apache.hadoop.hbase.Waiter;
61  import org.apache.hadoop.hbase.master.SplitLogManager.Task;
62  import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
63  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
64  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
65  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
66  import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
67  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
68  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
69  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
70  import org.apache.log4j.Level;
71  import org.apache.log4j.Logger;
72  import org.apache.zookeeper.CreateMode;
73  import org.apache.zookeeper.KeeperException;
74  import org.apache.zookeeper.ZooDefs.Ids;
75  import org.junit.After;
76  import org.junit.Assert;
77  import org.junit.Before;
78  import org.junit.Test;
79  import org.junit.experimental.categories.Category;
80  import org.mockito.Mockito;
81  
82  @Category(MediumTests.class)
83  public class TestSplitLogManager {
84    private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
85    private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1");
86    private final ServerManager sm = Mockito.mock(ServerManager.class);
87    private final MasterServices master =  Mockito.mock(MasterServices.class);
88  
89    static {
90      Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
91    }
92  
93    private ZooKeeperWatcher zkw;
94    private static boolean stopped = false;
95    private SplitLogManager slm;
96    private Configuration conf;
97    private int to;
98    private RecoveryMode mode;
99  
100   private static HBaseTestingUtility TEST_UTIL;
101 
102   static Stoppable stopper = new Stoppable() {
103     @Override
104     public void stop(String why) {
105       stopped = true;
106     }
107 
108     @Override
109     public boolean isStopped() {
110       return stopped;
111     }
112 
113   };
114 
115   @Before
116   public void setup() throws Exception {
117     TEST_UTIL = new HBaseTestingUtility();
118     TEST_UTIL.startMiniZKCluster();
119     conf = TEST_UTIL.getConfiguration();
120     // Use a different ZK wrapper instance for each tests.
121     zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
122     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
123     ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
124     assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
125     LOG.debug(zkw.baseZNode + " created");
126     ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
127     assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
128     LOG.debug(zkw.splitLogZNode + " created");
129 
130     stopped = false;
131     resetCounters();
132 
133     // By default, we let the test manage the error as before, so the server
134     //  does not appear as dead from the master point of view, only from the split log pov.
135     Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
136     Mockito.when(master.getServerManager()).thenReturn(sm);
137 
138     to = 6000;
139     conf.setInt("hbase.splitlog.manager.timeout", to);
140     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
141     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
142     conf.setInt("hfile.format.version", 3);
143     to = to + 4 * 100;
144     
145     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
146         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
147   }
148 
149   @After
150   public void teardown() throws IOException, KeeperException {
151     stopper.stop("");
152     slm.stop();
153     TEST_UTIL.shutdownMiniZKCluster();
154   }
155 
156   private interface Expr {
157     long eval();
158   }
159 
160   private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
161       throws Exception {
162     Expr e = new Expr() {
163       public long eval() {
164         return ctr.get();
165       }
166     };
167     waitForCounter(e, oldval, newval, timems);
168     return;
169   }
170 
171   private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
172       throws Exception {
173 
174     TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
175         @Override
176         public boolean evaluate() throws Exception {
177             return (e.eval() != oldval);
178         }
179     });
180 
181     assertEquals(newval, e.eval());
182   }
183 
184   private String submitTaskAndWait(TaskBatch batch, String name)
185   throws KeeperException, InterruptedException {
186     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
187     NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
188     zkw.registerListener(listener);
189     ZKUtil.watchAndCheckExists(zkw, tasknode);
190 
191     slm.enqueueSplitTask(name, batch);
192     assertEquals(1, batch.installed);
193     assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
194     assertEquals(1L, tot_mgr_node_create_queued.get());
195 
196     LOG.debug("waiting for task node creation");
197     listener.waitForCreation();
198     LOG.debug("task created");
199     return tasknode;
200   }
201 
202   /**
203    * Test whether the splitlog correctly creates a task in zookeeper
204    * @throws Exception
205    */
206   @Test
207   public void testTaskCreation() throws Exception {
208 
209     LOG.info("TestTaskCreation - test the creation of a task in zk");
210     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
211     TaskBatch batch = new TaskBatch();
212 
213     String tasknode = submitTaskAndWait(batch, "foo/1");
214 
215     byte[] data = ZKUtil.getData(zkw, tasknode);
216     SplitLogTask slt = SplitLogTask.parseFrom(data);
217     LOG.info("Task node created " + slt.toString());
218     assertTrue(slt.isUnassigned(DUMMY_MASTER));
219   }
220 
221   @Test
222   public void testOrphanTaskAcquisition() throws Exception {
223     LOG.info("TestOrphanTaskAcquisition");
224 
225     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
226     SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
227     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
228         CreateMode.PERSISTENT);
229 
230     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
231     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
232     Task task = slm.findOrCreateOrphanTask(tasknode);
233     assertTrue(task.isOrphan());
234     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
235     assertFalse(task.isUnassigned());
236     long curt = System.currentTimeMillis();
237     assertTrue((task.last_update <= curt) &&
238         (task.last_update > (curt - 1000)));
239     LOG.info("waiting for manager to resubmit the orphan task");
240     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
241     assertTrue(task.isUnassigned());
242     waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
243   }
244 
245   @Test
246   public void testUnassignedOrphan() throws Exception {
247     LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
248         " startup");
249     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
250     //create an unassigned orphan task
251     SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
252     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
253         CreateMode.PERSISTENT);
254     int version = ZKUtil.checkExists(zkw, tasknode);
255 
256     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
257     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
258     Task task = slm.findOrCreateOrphanTask(tasknode);
259     assertTrue(task.isOrphan());
260     assertTrue(task.isUnassigned());
261     // wait for RESCAN node to be created
262     waitForCounter(tot_mgr_rescan, 0, 1, to/2);
263     Task task2 = slm.findOrCreateOrphanTask(tasknode);
264     assertTrue(task == task2);
265     LOG.debug("task = " + task);
266     assertEquals(1L, tot_mgr_resubmit.get());
267     assertEquals(1, task.incarnation);
268     assertEquals(0, task.unforcedResubmits.get());
269     assertTrue(task.isOrphan());
270     assertTrue(task.isUnassigned());
271     assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
272   }
273 
274   @Test
275   public void testMultipleResubmits() throws Exception {
276     LOG.info("TestMultipleResbmits - no indefinite resubmissions");
277 
278     conf.setInt("hbase.splitlog.max.resubmit", 2);
279     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
280     TaskBatch batch = new TaskBatch();
281 
282     String tasknode = submitTaskAndWait(batch, "foo/1");
283     int version = ZKUtil.checkExists(zkw, tasknode);
284     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
285     final ServerName worker2 = ServerName.valueOf("worker2,1,1");
286     final ServerName worker3 = ServerName.valueOf("worker3,1,1");
287     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
288     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
289     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
290     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
291     int version1 = ZKUtil.checkExists(zkw, tasknode);
292     assertTrue(version1 > version);
293     slt = new SplitLogTask.Owned(worker2, this.mode);
294     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
295     waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
296     waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
297     int version2 = ZKUtil.checkExists(zkw, tasknode);
298     assertTrue(version2 > version1);
299     slt = new SplitLogTask.Owned(worker3, this.mode);
300     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
301     waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
302     waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
303     Thread.sleep(to + to/2);
304     assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get());
305   }
306 
307   @Test
308   public void testRescanCleanup() throws Exception {
309     LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
310 
311     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
312     TaskBatch batch = new TaskBatch();
313 
314     String tasknode = submitTaskAndWait(batch, "foo/1");
315     int version = ZKUtil.checkExists(zkw, tasknode);
316     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
317     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
318     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
319     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
320     waitForCounter(new Expr() {
321       @Override
322       public long eval() {
323         return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
324       }
325     }, 0, 1, 5*60000); // wait long enough
326     Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get());
327     int version1 = ZKUtil.checkExists(zkw, tasknode);
328     assertTrue(version1 > version);
329     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
330     slt = SplitLogTask.parseFrom(taskstate);
331     assertTrue(slt.isUnassigned(DUMMY_MASTER));
332 
333     waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
334   }
335 
336   @Test
337   public void testTaskDone() throws Exception {
338     LOG.info("TestTaskDone - cleanup task node once in DONE state");
339 
340     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
341     TaskBatch batch = new TaskBatch();
342     String tasknode = submitTaskAndWait(batch, "foo/1");
343     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
344     SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
345     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
346     synchronized (batch) {
347       while (batch.installed != batch.done) {
348         batch.wait();
349       }
350     }
351     waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
352     assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
353   }
354 
355   @Test
356   public void testTaskErr() throws Exception {
357     LOG.info("TestTaskErr - cleanup task node once in ERR state");
358 
359     conf.setInt("hbase.splitlog.max.resubmit", 0);
360     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
361     TaskBatch batch = new TaskBatch();
362 
363     String tasknode = submitTaskAndWait(batch, "foo/1");
364     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
365     SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
366     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
367 
368     synchronized (batch) {
369       while (batch.installed != batch.error) {
370         batch.wait();
371       }
372     }
373     waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
374     assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
375     conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT);
376   }
377 
378   @Test
379   public void testTaskResigned() throws Exception {
380     LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
381     assertEquals(tot_mgr_resubmit.get(), 0);
382     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
383     assertEquals(tot_mgr_resubmit.get(), 0);
384     TaskBatch batch = new TaskBatch();
385     String tasknode = submitTaskAndWait(batch, "foo/1");
386     assertEquals(tot_mgr_resubmit.get(), 0);
387     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
388     assertEquals(tot_mgr_resubmit.get(), 0);
389     SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
390     assertEquals(tot_mgr_resubmit.get(), 0);
391     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
392     int version = ZKUtil.checkExists(zkw, tasknode);
393     // Could be small race here.
394     if (tot_mgr_resubmit.get() == 0) {
395       waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
396     }
397     assertEquals(tot_mgr_resubmit.get(), 1);
398 
399     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
400     slt = SplitLogTask.parseFrom(taskstate);
401     assertTrue(slt.isUnassigned(DUMMY_MASTER));
402   }
403 
404   @Test
405   public void testUnassignedTimeout() throws Exception {
406     LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
407         " resubmit");
408 
409     // create an orphan task in OWNED state
410     String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
411     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
412     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
413     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
414         CreateMode.PERSISTENT);
415 
416     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
417     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
418 
419     // submit another task which will stay in unassigned mode
420     TaskBatch batch = new TaskBatch();
421     submitTaskAndWait(batch, "foo/1");
422 
423     // keep updating the orphan owned node every to/2 seconds
424     for (int i = 0; i < (3 * to)/100; i++) {
425       Thread.sleep(100);
426       final ServerName worker2 = ServerName.valueOf("worker1,1,1");
427       slt = new SplitLogTask.Owned(worker2, this.mode);
428       ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
429     }
430 
431     // since we have stopped heartbeating the owned node therefore it should
432     // get resubmitted
433     LOG.info("waiting for manager to resubmit the orphan task");
434     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
435 
436     // now all the nodes are unassigned. manager should post another rescan
437     waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
438   }
439 
440   @Test
441   public void testDeadWorker() throws Exception {
442     LOG.info("testDeadWorker");
443 
444     conf.setLong("hbase.splitlog.max.resubmit", 0);
445     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
446     TaskBatch batch = new TaskBatch();
447 
448     String tasknode = submitTaskAndWait(batch, "foo/1");
449     int version = ZKUtil.checkExists(zkw, tasknode);
450     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
451     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
452     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
453     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
454     slm.handleDeadWorker(worker1);
455     if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
456     if (tot_mgr_resubmit_dead_server_task.get() == 0) {
457       waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
458     }
459 
460     int version1 = ZKUtil.checkExists(zkw, tasknode);
461     assertTrue(version1 > version);
462     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
463     slt = SplitLogTask.parseFrom(taskstate);
464     assertTrue(slt.isUnassigned(DUMMY_MASTER));
465     return;
466   }
467 
468   @Test
469   public void testWorkerCrash() throws Exception {
470     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
471     TaskBatch batch = new TaskBatch();
472 
473     String tasknode = submitTaskAndWait(batch, "foo/1");
474     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
475 
476     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
477     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
478     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
479 
480     // Not yet resubmitted.
481     Assert.assertEquals(0, tot_mgr_resubmit.get());
482 
483     // This server becomes dead
484     Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
485 
486     Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
487 
488     // It has been resubmitted
489     Assert.assertEquals(1, tot_mgr_resubmit.get());
490   }
491 
492   @Test
493   public void testEmptyLogDir() throws Exception {
494     LOG.info("testEmptyLogDir");
495     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
496     FileSystem fs = TEST_UTIL.getTestFileSystem();
497     Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
498         UUID.randomUUID().toString());
499     fs.mkdirs(emptyLogDirPath);
500     slm.splitLogDistributed(emptyLogDirPath);
501     assertFalse(fs.exists(emptyLogDirPath));
502   }
503 
504   /**
505    * The following test case is aiming to test the situation when distributedLogReplay is turned off
506    * and restart a cluster there should no recovery regions in ZK left.
507    * @throws Exception
508    */
509   @Test(timeout = 300000)
510   public void testRecoveryRegionRemovedFromZK() throws Exception {
511     LOG.info("testRecoveryRegionRemovedFromZK");
512     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
513     String nodePath =
514         ZKUtil.joinZNode(zkw.recoveringRegionsZNode,
515           HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
516     ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
517 
518     slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
519     slm.removeStaleRecoveringRegionsFromZK(null);
520 
521     List<String> recoveringRegions =
522         zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
523 
524     assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
525   }
526   
527   @Test(timeout=60000)
528   public void testGetPreviousRecoveryMode() throws Exception {
529     LOG.info("testGetPreviousRecoveryMode");
530     SplitLogCounters.resetCounters();
531     Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
532     testConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
533 
534     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
535       new SplitLogTask.Unassigned(
536         ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
537         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
538 
539     slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER);
540     assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING);
541     
542     zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
543     slm.setRecoveryMode(false);
544     assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
545   }
546   
547 }