View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
22  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
23  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
24  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
25  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
26  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
27  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
28  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
29  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
30  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
31  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
32  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
33  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
34  import static org.junit.Assert.assertEquals;
35  import static org.junit.Assert.assertFalse;
36  import static org.junit.Assert.assertTrue;
37  
38  import java.io.IOException;
39  import java.util.List;
40  import java.util.Map;
41  import java.util.UUID;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.ChoreService;
50  import org.apache.hadoop.hbase.CoordinatedStateManager;
51  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
52  import org.apache.hadoop.hbase.HBaseTestingUtility;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.testclassification.MediumTests;
56  import org.apache.hadoop.hbase.Server;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.SplitLogCounters;
59  import org.apache.hadoop.hbase.SplitLogTask;
60  import org.apache.hadoop.hbase.Stoppable;
61  import org.apache.hadoop.hbase.Waiter;
62  import org.apache.hadoop.hbase.client.ClusterConnection;
63  import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
64  import org.apache.hadoop.hbase.master.SplitLogManager.Task;
65  import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
66  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
67  import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
68  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
69  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
70  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
72  import org.apache.log4j.Level;
73  import org.apache.log4j.Logger;
74  import org.apache.zookeeper.CreateMode;
75  import org.apache.zookeeper.KeeperException;
76  import org.apache.zookeeper.ZooDefs.Ids;
77  import org.junit.After;
78  import org.junit.Assert;
79  import org.junit.Before;
80  import org.junit.Ignore;
81  import org.junit.Test;
82  import org.junit.experimental.categories.Category;
83  import org.mockito.Mockito;
84  
85  @Category(MediumTests.class)
86  public class TestSplitLogManager {
87    private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
88    private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1");
89    private final ServerManager sm = Mockito.mock(ServerManager.class);
90    private final MasterServices master = Mockito.mock(MasterServices.class);
91  
92    static {
93      Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
94    }
95  
96    private ZooKeeperWatcher zkw;
97    private DummyServer ds;
98    private static boolean stopped = false;
99    private SplitLogManager slm;
100   private Configuration conf;
101   private int to;
102   private RecoveryMode mode;
103 
104   private static HBaseTestingUtility TEST_UTIL;
105 
106   class DummyServer implements Server {
107     private ZooKeeperWatcher zkw;
108     private Configuration conf;
109     private CoordinatedStateManager cm;
110 
111     public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
112       this.zkw = zkw;
113       this.conf = conf;
114       cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
115       cm.initialize(this);
116     }
117 
118     @Override
119     public void abort(String why, Throwable e) {
120     }
121 
122     @Override
123     public boolean isAborted() {
124       return false;
125     }
126 
127     @Override
128     public void stop(String why) {
129     }
130 
131     @Override
132     public boolean isStopped() {
133       return false;
134     }
135 
136     @Override
137     public Configuration getConfiguration() {
138       return conf;
139     }
140 
141     @Override
142     public ZooKeeperWatcher getZooKeeper() {
143       return zkw;
144     }
145 
146     @Override
147     public ServerName getServerName() {
148       return null;
149     }
150 
151     @Override
152     public CoordinatedStateManager getCoordinatedStateManager() {
153       return cm;
154     }
155 
156     @Override
157     public ClusterConnection getConnection() {
158       return null;
159     }
160 
161     @Override
162     public MetaTableLocator getMetaTableLocator() {
163       return null;
164     }
165 
166     @Override
167     public ChoreService getChoreService() {
168       return null;
169     }
170   }
171 
172   static Stoppable stopper = new Stoppable() {
173     @Override
174     public void stop(String why) {
175       stopped = true;
176     }
177 
178     @Override
179     public boolean isStopped() {
180       return stopped;
181     }
182   };
183 
184   @Before
185   public void setup() throws Exception {
186     TEST_UTIL = new HBaseTestingUtility();
187     TEST_UTIL.startMiniZKCluster();
188     conf = TEST_UTIL.getConfiguration();
189     // Use a different ZK wrapper instance for each tests.
190     zkw =
191         new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
192     ds = new DummyServer(zkw, conf);
193 
194     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
195     ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
196     assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
197     LOG.debug(zkw.baseZNode + " created");
198     ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
199     assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
200     LOG.debug(zkw.splitLogZNode + " created");
201 
202     stopped = false;
203     resetCounters();
204 
205     // By default, we let the test manage the error as before, so the server
206     // does not appear as dead from the master point of view, only from the split log pov.
207     Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
208     Mockito.when(master.getServerManager()).thenReturn(sm);
209 
210     to = 12000;
211     conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
212     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
213 
214     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
215     to = to + 4 * 100;
216 
217     this.mode =
218         (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY
219             : RecoveryMode.LOG_SPLITTING);
220   }
221 
222   @After
223   public void teardown() throws IOException, KeeperException {
224     stopper.stop("");
225     if (slm != null) slm.stop();
226     TEST_UTIL.shutdownMiniZKCluster();
227   }
228 
229   private interface Expr {
230     long eval();
231   }
232 
233   private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
234       throws Exception {
235     Expr e = new Expr() {
236       @Override
237       public long eval() {
238         return ctr.get();
239       }
240     };
241     waitForCounter(e, oldval, newval, timems);
242     return;
243   }
244 
245   private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
246       throws Exception {
247 
248     TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
249       @Override
250       public boolean evaluate() throws Exception {
251         return (e.eval() != oldval);
252       }
253     });
254 
255     assertEquals(newval, e.eval());
256   }
257 
258   private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
259       InterruptedException {
260     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
261     NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
262     zkw.registerListener(listener);
263     ZKUtil.watchAndCheckExists(zkw, tasknode);
264 
265     slm.enqueueSplitTask(name, batch);
266     assertEquals(1, batch.installed);
267     assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
268     assertEquals(1L, tot_mgr_node_create_queued.get());
269 
270     LOG.debug("waiting for task node creation");
271     listener.waitForCreation();
272     LOG.debug("task created");
273     return tasknode;
274   }
275 
276   /**
277    * Test whether the splitlog correctly creates a task in zookeeper
278    * @throws Exception
279    */
280   @Test (timeout=180000)
281   public void testTaskCreation() throws Exception {
282 
283     LOG.info("TestTaskCreation - test the creation of a task in zk");
284     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
285     TaskBatch batch = new TaskBatch();
286 
287     String tasknode = submitTaskAndWait(batch, "foo/1");
288 
289     byte[] data = ZKUtil.getData(zkw, tasknode);
290     SplitLogTask slt = SplitLogTask.parseFrom(data);
291     LOG.info("Task node created " + slt.toString());
292     assertTrue(slt.isUnassigned(DUMMY_MASTER));
293   }
294 
295   @Test (timeout=180000)
296   public void testOrphanTaskAcquisition() throws Exception {
297     LOG.info("TestOrphanTaskAcquisition");
298 
299     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
300     SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
301     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
302         CreateMode.PERSISTENT);
303 
304     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
305     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
306     Task task = slm.findOrCreateOrphanTask(tasknode);
307     assertTrue(task.isOrphan());
308     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
309     assertFalse(task.isUnassigned());
310     long curt = System.currentTimeMillis();
311     assertTrue((task.last_update <= curt) &&
312         (task.last_update > (curt - 1000)));
313     LOG.info("waiting for manager to resubmit the orphan task");
314     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
315     assertTrue(task.isUnassigned());
316     waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
317   }
318 
319   @Test (timeout=180000)
320   public void testUnassignedOrphan() throws Exception {
321     LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
322         " startup");
323     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
324     //create an unassigned orphan task
325     SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
326     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
327         CreateMode.PERSISTENT);
328     int version = ZKUtil.checkExists(zkw, tasknode);
329 
330     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
331     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
332     Task task = slm.findOrCreateOrphanTask(tasknode);
333     assertTrue(task.isOrphan());
334     assertTrue(task.isUnassigned());
335     // wait for RESCAN node to be created
336     waitForCounter(tot_mgr_rescan, 0, 1, to/2);
337     Task task2 = slm.findOrCreateOrphanTask(tasknode);
338     assertTrue(task == task2);
339     LOG.debug("task = " + task);
340     assertEquals(1L, tot_mgr_resubmit.get());
341     assertEquals(1, task.incarnation.get());
342     assertEquals(0, task.unforcedResubmits.get());
343     assertTrue(task.isOrphan());
344     assertTrue(task.isUnassigned());
345     assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
346   }
347 
348   @Test (timeout=180000)
349   public void testMultipleResubmits() throws Exception {
350     LOG.info("TestMultipleResbmits - no indefinite resubmissions");
351     conf.setInt("hbase.splitlog.max.resubmit", 2);
352     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
353     TaskBatch batch = new TaskBatch();
354 
355     String tasknode = submitTaskAndWait(batch, "foo/1");
356     int version = ZKUtil.checkExists(zkw, tasknode);
357     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
358     final ServerName worker2 = ServerName.valueOf("worker2,1,1");
359     final ServerName worker3 = ServerName.valueOf("worker3,1,1");
360     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
361     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
362     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
363     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
364     int version1 = ZKUtil.checkExists(zkw, tasknode);
365     assertTrue(version1 > version);
366     slt = new SplitLogTask.Owned(worker2, this.mode);
367     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
368     waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
369     waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
370     int version2 = ZKUtil.checkExists(zkw, tasknode);
371     assertTrue(version2 > version1);
372     slt = new SplitLogTask.Owned(worker3, this.mode);
373     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
374     waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
375     waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
376     Thread.sleep(to + to/2);
377     assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get());
378   }
379 
380   @Test (timeout=180000)
381   public void testRescanCleanup() throws Exception {
382     LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
383 
384     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
385     TaskBatch batch = new TaskBatch();
386 
387     String tasknode = submitTaskAndWait(batch, "foo/1");
388     int version = ZKUtil.checkExists(zkw, tasknode);
389     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
390     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
391     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
392     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
393     waitForCounter(new Expr() {
394       @Override
395       public long eval() {
396         return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
397       }
398     }, 0, 1, 5*60000); // wait long enough
399     Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get());
400     int version1 = ZKUtil.checkExists(zkw, tasknode);
401     assertTrue(version1 > version);
402     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
403     slt = SplitLogTask.parseFrom(taskstate);
404     assertTrue(slt.isUnassigned(DUMMY_MASTER));
405 
406     waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
407   }
408 
409   @Test (timeout=180000)
410   public void testTaskDone() throws Exception {
411     LOG.info("TestTaskDone - cleanup task node once in DONE state");
412 
413     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
414     TaskBatch batch = new TaskBatch();
415     String tasknode = submitTaskAndWait(batch, "foo/1");
416     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
417     SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
418     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
419     synchronized (batch) {
420       while (batch.installed != batch.done) {
421         batch.wait();
422       }
423     }
424     waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
425     assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
426   }
427 
428   @Test (timeout=180000)
429   public void testTaskErr() throws Exception {
430     LOG.info("TestTaskErr - cleanup task node once in ERR state");
431 
432     conf.setInt("hbase.splitlog.max.resubmit", 0);
433     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
434     TaskBatch batch = new TaskBatch();
435 
436     String tasknode = submitTaskAndWait(batch, "foo/1");
437     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
438     SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
439     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
440 
441     synchronized (batch) {
442       while (batch.installed != batch.error) {
443         batch.wait();
444       }
445     }
446     waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
447     assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
448     conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
449   }
450 
451   @Test (timeout=180000)
452   public void testTaskResigned() throws Exception {
453     LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
454     assertEquals(tot_mgr_resubmit.get(), 0);
455     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
456     assertEquals(tot_mgr_resubmit.get(), 0);
457     TaskBatch batch = new TaskBatch();
458     String tasknode = submitTaskAndWait(batch, "foo/1");
459     assertEquals(tot_mgr_resubmit.get(), 0);
460     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
461     assertEquals(tot_mgr_resubmit.get(), 0);
462     SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
463     assertEquals(tot_mgr_resubmit.get(), 0);
464     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
465     int version = ZKUtil.checkExists(zkw, tasknode);
466     // Could be small race here.
467     if (tot_mgr_resubmit.get() == 0) {
468       waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
469     }
470     assertEquals(tot_mgr_resubmit.get(), 1);
471 
472     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
473     slt = SplitLogTask.parseFrom(taskstate);
474     assertTrue(slt.isUnassigned(DUMMY_MASTER));
475   }
476 
477   @Test (timeout=180000)
478   public void testUnassignedTimeout() throws Exception {
479     LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
480         " resubmit");
481 
482     // create an orphan task in OWNED state
483     String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
484     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
485     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
486     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
487         CreateMode.PERSISTENT);
488 
489     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
490     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
491 
492     // submit another task which will stay in unassigned mode
493     TaskBatch batch = new TaskBatch();
494     submitTaskAndWait(batch, "foo/1");
495 
496     // keep updating the orphan owned node every to/2 seconds
497     for (int i = 0; i < (3 * to)/100; i++) {
498       Thread.sleep(100);
499       final ServerName worker2 = ServerName.valueOf("worker1,1,1");
500       slt = new SplitLogTask.Owned(worker2, this.mode);
501       ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
502     }
503 
504     // since we have stopped heartbeating the owned node therefore it should
505     // get resubmitted
506     LOG.info("waiting for manager to resubmit the orphan task");
507     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
508 
509     // now all the nodes are unassigned. manager should post another rescan
510     waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
511   }
512 
513   @Test (timeout=180000)
514   public void testDeadWorker() throws Exception {
515     LOG.info("testDeadWorker");
516 
517     conf.setLong("hbase.splitlog.max.resubmit", 0);
518     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
519     TaskBatch batch = new TaskBatch();
520 
521     String tasknode = submitTaskAndWait(batch, "foo/1");
522     int version = ZKUtil.checkExists(zkw, tasknode);
523     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
524     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
525     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
526     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
527     slm.handleDeadWorker(worker1);
528     if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
529     if (tot_mgr_resubmit_dead_server_task.get() == 0) {
530       waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
531     }
532 
533     int version1 = ZKUtil.checkExists(zkw, tasknode);
534     assertTrue(version1 > version);
535     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
536     slt = SplitLogTask.parseFrom(taskstate);
537     assertTrue(slt.isUnassigned(DUMMY_MASTER));
538     return;
539   }
540 
541   @Test (timeout=180000)
542   public void testWorkerCrash() throws Exception {
543     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
544     TaskBatch batch = new TaskBatch();
545 
546     String tasknode = submitTaskAndWait(batch, "foo/1");
547     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
548 
549     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
550     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
551     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
552 
553     // Not yet resubmitted.
554     Assert.assertEquals(0, tot_mgr_resubmit.get());
555 
556     // This server becomes dead
557     Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
558 
559     Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
560 
561     // It has been resubmitted
562     Assert.assertEquals(1, tot_mgr_resubmit.get());
563   }
564 
565   @Test (timeout=180000)
566   public void testEmptyLogDir() throws Exception {
567     LOG.info("testEmptyLogDir");
568     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
569     FileSystem fs = TEST_UTIL.getTestFileSystem();
570     Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
571         UUID.randomUUID().toString());
572     fs.mkdirs(emptyLogDirPath);
573     slm.splitLogDistributed(emptyLogDirPath);
574     assertFalse(fs.exists(emptyLogDirPath));
575   }
576 
577   @Test (timeout = 60000)
578   public void testLogFilesAreArchived() throws Exception {
579     LOG.info("testLogFilesAreArchived");
580     final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
581     FileSystem fs = TEST_UTIL.getTestFileSystem();
582     Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
583     conf.set(HConstants.HBASE_DIR, dir.toString());
584     Path logDirPath = new Path(dir, UUID.randomUUID().toString());
585     fs.mkdirs(logDirPath);
586     // create an empty log file
587     String logFile = ServerName.valueOf("foo", 1, 1).toString();
588     fs.create(new Path(logDirPath, logFile)).close();
589 
590     // spin up a thread mocking split done.
591     new Thread() {
592       @Override
593       public void run() {
594         boolean done = false;
595         while (!done) {
596           for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
597             final ServerName worker1 = ServerName.valueOf("worker1,1,1");
598             SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING);
599             boolean encounteredZKException = false;
600             try {
601               ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
602             } catch (KeeperException e) {
603               LOG.warn(e);
604               encounteredZKException = true;
605             }
606             if (!encounteredZKException) {
607               done = true;
608             }
609           }
610         }
611       };
612     }.start();
613 
614     slm.splitLogDistributed(logDirPath);
615 
616     assertFalse(fs.exists(logDirPath));
617   }
618 
619   /**
620    * The following test case is aiming to test the situation when distributedLogReplay is turned off
621    * and restart a cluster there should no recovery regions in ZK left.
622    * @throws Exception
623    */
624   @Test(timeout = 300000)
625   public void testRecoveryRegionRemovedFromZK() throws Exception {
626     LOG.info("testRecoveryRegionRemovedFromZK");
627     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
628     String nodePath =
629         ZKUtil.joinZNode(zkw.recoveringRegionsZNode,
630           HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
631     ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
632 
633     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
634     slm.removeStaleRecoveringRegions(null);
635 
636     List<String> recoveringRegions =
637         zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
638 
639     assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
640   }
641 
642   @Ignore("DLR is broken by HBASE-12751") @Test(timeout=60000)
643   public void testGetPreviousRecoveryMode() throws Exception {
644     LOG.info("testGetPreviousRecoveryMode");
645     SplitLogCounters.resetCounters();
646     // Not actually enabling DLR for the cluster, just for the ZkCoordinatedStateManager to use.
647     // The test is just manipulating ZK manually anyways.
648     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
649 
650     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
651       new SplitLogTask.Unassigned(
652         ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
653         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
654 
655     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
656     LOG.info("Mode1=" + slm.getRecoveryMode());
657     assertTrue(slm.isLogSplitting());
658     zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
659     LOG.info("Mode2=" + slm.getRecoveryMode());
660     slm.setRecoveryMode(false);
661     LOG.info("Mode3=" + slm.getRecoveryMode());
662     assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying());
663   }
664 }