1   /**
2     * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.PathFilter;
48  import org.apache.hadoop.hbase.Chore;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.SplitLogCounters;
53  import org.apache.hadoop.hbase.SplitLogTask;
54  import org.apache.hadoop.hbase.Stoppable;
55  import org.apache.hadoop.hbase.exceptions.DeserializationException;
56  import org.apache.hadoop.hbase.io.hfile.HFile;
57  import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
58  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
59  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
60  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
61  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
62  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
63  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
64  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
65  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
66  import org.apache.hadoop.hbase.util.FSUtils;
67  import org.apache.hadoop.hbase.util.Pair;
68  import org.apache.hadoop.hbase.util.Threads;
69  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
70  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
72  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
73  import org.apache.hadoop.util.StringUtils;
74  import org.apache.zookeeper.AsyncCallback;
75  import org.apache.zookeeper.CreateMode;
76  import org.apache.zookeeper.KeeperException;
77  import org.apache.zookeeper.KeeperException.NoNodeException;
78  import org.apache.zookeeper.ZooDefs.Ids;
79  import org.apache.zookeeper.data.Stat;
80  
81  /**
82   * Distributes the task of log splitting to the available region servers.
83   * Coordination happens via ZooKeeper. For every log file that has to be split, a
84   * znode is created under <code>/hbase/splitlog</code>. SplitLogWorkers race to grab a task.
85   *
86   * <p>SplitLogManager monitors the task znodes that it creates using the
87   * timeoutMonitor thread. If a task's progress is slow then
88   * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner
89   * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the
90   * task's znode is deleted by SplitLogManager.
91   *
92   * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
93   * log files. The caller thread waits in this method until all the log files
94   * have been split.
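     *
     * <p>A minimal usage sketch, assuming the usual master collaborators (zkw, conf, stopper,
     * master, serverName) are in scope; the log directory path below is illustrative only:
     * <pre>{@code
     *   SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, master, serverName);
     *   // Blocks until every log file under the directory has been split by some worker.
     *   long bytesSplit = slm.splitLogDistributed(new Path("/hbase/.logs/host1,60020,1388888888888"));
     * }</pre>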
95   *
96   * <p>All the zookeeper calls made by this class are asynchronous. This is mainly
97   * to help reduce response time seen by the callers.
98   *
99   * <p>There is a race in this design between the SplitLogManager and the
100  * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
101  * already been completed by a SplitLogWorker. We rely on the idempotency of
102  * the log splitting task for correctness.
103  *
104  * <p>It is also assumed that every log splitting task is unique and once
105  * completed (either with success or with error) it will not be submitted
106  * again. If a task is resubmitted then there is a risk that the old "delete task"
107  * can delete the re-submission.
108  */
109 @InterfaceAudience.Private
110 public class SplitLogManager extends ZooKeeperListener {
111   private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
112 
113   public static final int DEFAULT_TIMEOUT = 120000;
114   public static final int DEFAULT_ZK_RETRIES = 3;
115   public static final int DEFAULT_MAX_RESUBMIT = 3;
116   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
117 
118   private final Stoppable stopper;
119   private final MasterServices master;
120   private final ServerName serverName;
121   private final TaskFinisher taskFinisher;
122   private FileSystem fs;
123   private Configuration conf;
124 
125   private long zkretries;
126   private long resubmit_threshold;
127   private long timeout;
128   private long unassignedTimeout;
129   private long lastTaskCreateTime = Long.MAX_VALUE;
130   public boolean ignoreZKDeleteForTesting = false;
131   private volatile long lastRecoveringNodeCreationTime = 0;
132   // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
133   // whether to GC stale recovering znodes
134   private long checkRecoveringTimeThreshold = 15000; // 15 seconds
135   private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
136       .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
137 
138   /**
139    * In distributedLogReplay mode, we need to touch both splitlog and recovering-regions znodes in one
140    * operation. So the lock is used to guard such cases.
141    */
142   protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
143 
144   private volatile RecoveryMode recoveryMode;
145   private volatile boolean isDrainingDone = false;
146 
147   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
148   private TimeoutMonitor timeoutMonitor;
149 
150   private volatile Set<ServerName> deadWorkers = null;
151   private final Object deadWorkersLock = new Object();
152 
153   private Set<String> failedDeletions = null;
154 
155   /**
156    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
157    *   Stoppable stopper, MasterServices master, ServerName serverName,
158    *   boolean masterRecovery, TaskFinisher tf)}
159    * with masterRecovery = false, and tf = null.  Used in unit tests.
160    *
161    * @param zkw the ZK watcher
162    * @param conf the HBase configuration
163    * @param stopper the stoppable in case anything is wrong
164    * @param master the master services
165    * @param serverName the master server name
166    * @throws KeeperException
167    * @throws InterruptedIOException
168    */
169   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
170       Stoppable stopper, MasterServices master, ServerName serverName)
171       throws InterruptedIOException, KeeperException {
172     this(zkw, conf, stopper, master, serverName, false, null);
173   }
174 
175   /**
176    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
177    *   Stoppable stopper, MasterServices master, ServerName serverName,
178    *   boolean masterRecovery, TaskFinisher tf)}
179    * that provides a task finisher for copying recovered edits to their final destination.
180    * The task finisher has to be robust because it can be arbitrarily restarted or called
181    * multiple times.
182    *
183    * @param zkw the ZK watcher
184    * @param conf the HBase configuration
185    * @param stopper the stoppable in case anything is wrong
186    * @param master the master services
187    * @param serverName the master server name
188    * @param masterRecovery an indication if the master is in recovery
189    * @throws KeeperException
190    * @throws InterruptedIOException
191    */
192   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
193       Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery) 
194       throws InterruptedIOException, KeeperException {
195     this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
196       @Override
197       public Status finish(ServerName workerName, String logfile) {
198         try {
199           HLogSplitter.finishSplitLogFile(logfile, conf);
200         } catch (IOException e) {
201           LOG.warn("Could not finish splitting of log file " + logfile, e);
202           return Status.ERR;
203         }
204         return Status.DONE;
205       }
206     });
207   }
208 
209   /**
210    * It's OK to construct this object even when region-servers are not online. It
211    * does look up the orphan tasks in ZK but it doesn't block waiting for them
212    * to be done.
213    *
214    * @param zkw the ZK watcher
215    * @param conf the HBase configuration
216    * @param stopper the stoppable in case anything is wrong
217    * @param master the master services
218    * @param serverName the master server name
219    * @param masterRecovery an indication if the master is in recovery
220    * @param tf task finisher
221    * @throws KeeperException
222    * @throws InterruptedIOException
223    */
224   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
225         Stoppable stopper, MasterServices master,
226         ServerName serverName, boolean masterRecovery, TaskFinisher tf) 
227       throws InterruptedIOException, KeeperException {
228     super(zkw);
229     this.taskFinisher = tf;
230     this.conf = conf;
231     this.stopper = stopper;
232     this.master = master;
233     this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
234     this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
235     this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
236     this.unassignedTimeout =
237       conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
238 
239     // Determine recovery mode  
240     setRecoveryMode(true);
241 
242     LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
243       ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY));
244 
245     this.serverName = serverName;
246     this.timeoutMonitor = new TimeoutMonitor(
247       conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
248 
249     this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
250 
251     if (!masterRecovery) {
252       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
253           + ".splitLogManagerTimeoutMonitor");
254     }
255     // Watcher can be null during tests with Mock'd servers.
256     if (this.watcher != null) {
257       this.watcher.registerListener(this);
258       lookForOrphans();
259     }
260   }
261 
262   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
263     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
264     for (Path hLogDir : logDirs) {
265       this.fs = hLogDir.getFileSystem(conf);
266       if (!fs.exists(hLogDir)) {
267         LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
268         continue;
269       }
270       FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
271       if (logfiles == null || logfiles.length == 0) {
272         LOG.info(hLogDir + " is empty dir, no logs to split");
273       } else {
274         for (FileStatus status : logfiles)
275           fileStatus.add(status);
276       }
277     }
278     FileStatus[] a = new FileStatus[fileStatus.size()];
279     return fileStatus.toArray(a);
280   }
281 
282   /**
283    * @param logDir
284    *            one region server hlog dir path in .logs
285    * @throws IOException
286    *             if there was an error while splitting any log file
287    * @return cumulative size of the logfiles split
289    */
290   public long splitLogDistributed(final Path logDir) throws IOException {
291     List<Path> logDirs = new ArrayList<Path>();
292     logDirs.add(logDir);
293     return splitLogDistributed(logDirs);
294   }
295 
296   /**
297    * The caller will block until all the log files of the given region server
298    * have been processed - successfully split or an error is encountered - by an
299    * available worker region server. This method must only be called after the
300    * region servers have been brought online.
301    *
302    * @param logDirs List of log dirs to split
303    * @throws IOException If there was an error while splitting any log file
304    * @return cumulative size of the logfiles split
305    */
306   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
307     if (logDirs.isEmpty()) {
308       return 0;
309     }
310     Set<ServerName> serverNames = new HashSet<ServerName>();
311     for (Path logDir : logDirs) {
312       try {
313         ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
314         if (serverName != null) {
315           serverNames.add(serverName);
316         }
317       } catch (IllegalArgumentException e) {
318         // ignore invalid format error.
319         LOG.warn("Cannot parse server name from " + logDir);
320       }
321     }
322     return splitLogDistributed(serverNames, logDirs, null);
323   }
324 
325   /**
326    * The caller will block until all the specified log files of the given region servers
327    * have been processed - successfully split or an error is encountered - by an
328    * available worker region server. This method must only be called after the
329    * region servers have been brought online.
330    * @param serverNames region servers whose logs are being split
331    * @param logDirs List of log dirs to split
332    * @param filter the Path filter to select specific files for considering
333    * @throws IOException If there was an error while splitting any log file
334    * @return cumulative size of the logfiles split
335    */
336   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
337       PathFilter filter) throws IOException {
338     MonitoredTask status = TaskMonitor.get().createStatus(
339           "Doing distributed log split in " + logDirs);
340     FileStatus[] logfiles = getFileList(logDirs, filter);
341     status.setStatus("Checking directory contents...");
342     LOG.debug("Scheduling batch of logs to split");
343     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
344     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
345     long t = EnvironmentEdgeManager.currentTimeMillis();
346     long totalSize = 0;
347     TaskBatch batch = new TaskBatch();
348     Boolean isMetaRecovery = (filter == null) ? null : false;
349     for (FileStatus lf : logfiles) {
350       // TODO If the log file is still being written to - which is most likely
351       // the case for the last log file - then its length will show up here
352       // as zero. The size of such a file can only be retrieved after
353       // recover-lease is done. totalSize will be under in most cases and the
354       // metrics that it drives will also be under-reported.
355       totalSize += lf.getLen();
356       String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
357       if (!enqueueSplitTask(pathToLog, batch)) {
358         throw new IOException("duplicate log split scheduled for " + lf.getPath());
359       }
360     }
361     waitForSplittingCompletion(batch, status);
362     // remove recovering regions from ZK
363     if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
364       // we split meta regions and user regions separately, therefore logfiles are either all for
365       // meta or user regions but never for both (we could have mixed situations in tests)
366       isMetaRecovery = true;
367     }
368     this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
369 
370     if (batch.done != batch.installed) {
371       batch.isDead = true;
372       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
373       LOG.warn("error while splitting logs in " + logDirs +
374       " installed = " + batch.installed + " but only " + batch.done + " done");
375       String msg = "error or interrupted while splitting logs in "
376         + logDirs + " Task = " + batch;
377       status.abort(msg);
378       throw new IOException(msg);
379     }
380     for (Path logDir : logDirs) {
381       status.setStatus("Cleaning up log directory...");
382       try {
383         if (fs.exists(logDir) && !fs.delete(logDir, false)) {
384           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
385         }
386       } catch (IOException ioe) {
387         FileStatus[] files = fs.listStatus(logDir);
388         if (files != null && files.length > 0) {
389           LOG.warn("returning success without actually splitting and " +
390               "deleting all the log files in path " + logDir);
391         } else {
392           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
393         }
394       }
395       SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
396     }
397     String msg = "finished splitting (more than or equal to) " + totalSize +
398         " bytes in " + batch.installed + " log files in " + logDirs + " in " +
399         (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
400     status.markComplete(msg);
401     LOG.info(msg);
402     return totalSize;
403   }
404 
405   /**
406    * Add a task entry to splitlog znode if it is not already there.
407    *
408    * @param taskname the path of the log to be split
409    * @param batch the batch this task belongs to
410    * @return true if a new entry is created, false if it is already there.
411    */
412   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
413     SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
414     // This is a znode path under the splitlog dir with the rest of the path made up of a
415     // URL encoding of the passed in log to split.
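        // For illustration only (the server and file names are invented): a log under
        // ".logs/host1,60020,1388888888888/" becomes a single URL-encoded child znode of
        // /hbase/splitlog, so the original file path can be recovered from the znode name alone.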
416     String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
417     lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
418     Task oldtask = createTaskIfAbsent(path, batch);
419     if (oldtask == null) {
420       // publish the task in zk
421       createNode(path, zkretries);
422       return true;
423     }
424     return false;
425   }
426 
427   private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
428     synchronized (batch) {
429       while ((batch.done + batch.error) != batch.installed) {
430         try {
431           status.setStatus("Waiting for distributed tasks to finish. "
432               + " scheduled=" + batch.installed
433               + " done=" + batch.done
434               + " error=" + batch.error);
435           int remaining = batch.installed - (batch.done + batch.error);
436           int actual = activeTasks(batch);
437           if (remaining != actual) {
438             LOG.warn("Expected " + remaining
439               + " active tasks, but actually there are " + actual);
440           }
441           int remainingInZK = remainingTasksInZK();
442           if (remainingInZK >= 0 && actual > remainingInZK) {
443             LOG.warn("Expected at least" + actual
444               + " tasks in ZK, but actually there are " + remainingInZK);
445           }
446           if (remainingInZK == 0 || actual == 0) {
447             LOG.warn("No more task remaining (ZK or task map), splitting "
448               + "should have completed. Remaining tasks in ZK " + remainingInZK
449               + ", active tasks in map " + actual);
450             if (remainingInZK == 0 && actual == 0) {
451               return;
452             }
453           }
454           batch.wait(100);
455           if (stopper.isStopped()) {
456             LOG.warn("Stopped while waiting for log splits to be completed");
457             return;
458           }
459         } catch (InterruptedException e) {
460           LOG.warn("Interrupted while waiting for log splits to be completed");
461           Thread.currentThread().interrupt();
462           return;
463         }
464       }
465     }
466   }
467 
468   private int activeTasks(final TaskBatch batch) {
469     int count = 0;
470     for (Task t: tasks.values()) {
471       if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
472         count++;
473       }
474     }
475     return count;
476   }
477 
478   private int remainingTasksInZK() {
479     int count = 0;
480     try {
481       List<String> tasks =
482         ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
483       if (tasks != null) {
484         for (String t: tasks) {
485           if (!ZKSplitLog.isRescanNode(watcher, t)) {
486             count++;
487           }
488         }
489       }
490     } catch (KeeperException ke) {
491       LOG.warn("Failed to check remaining tasks", ke);
492       count = -1;
493     }
494     return count;
495   }
496 
497   /**
498    * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
499    * region server hosting the region can allow reads to the recovered region
500    * @param serverNames servers which are just recovered
501    * @param isMetaRecovery whether current recovery is for the meta region on
502    *          <code>serverNames</code>
503    */
504   private void
505       removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
506     if (this.recoveryMode != RecoveryMode.LOG_REPLAY) {
507       // the function is only used in WALEdit direct replay mode
508       return;
509     }
510 
511     final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
512     int count = 0;
513     Set<String> recoveredServerNameSet = new HashSet<String>();
514     if (serverNames != null) {
515       for (ServerName tmpServerName : serverNames) {
516         recoveredServerNameSet.add(tmpServerName.getServerName());
517       }
518     }
519 
520     try {
521       this.recoveringRegionLock.lock();
522 
523       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
524       if (tasks != null) {
525         for (String t : tasks) {
526           if (!ZKSplitLog.isRescanNode(watcher, t)) {
527             count++;
528           }
529         }
530       }
531       if (count == 0 && this.master.isInitialized()
532           && !this.master.getServerManager().areDeadServersInProgress()) {
533         // no splitting work items left
534         deleteRecoveringRegionZNodes(watcher, null);
535         // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
536         // this point.
537         lastRecoveringNodeCreationTime = Long.MAX_VALUE;
538       } else if (!recoveredServerNameSet.isEmpty()) {
539         // remove recovering regions that don't have any RS associated with them
540         List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
541         if (regions != null) {
542           for (String region : regions) {
543             if (isMetaRecovery != null) {
544               if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
545                   || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
546                 // skip non-meta regions when recovering the meta region or
547                 // skip the meta region when recovering user regions
548                 continue;
549               }
550             }
551             String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
552             List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
553             if (failedServers == null || failedServers.isEmpty()) {
554               ZKUtil.deleteNode(watcher, nodePath);
555               continue;
556             }
557             if (recoveredServerNameSet.containsAll(failedServers)) {
558               ZKUtil.deleteNodeRecursively(watcher, nodePath);
559             } else {
560               for (String failedServer : failedServers) {
561                 if (recoveredServerNameSet.contains(failedServer)) {
562                   String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
563                   ZKUtil.deleteNode(watcher, tmpPath);
564                 }
565               }
566             }
567           }
568         }
569       }
570     } catch (KeeperException ke) {
571       LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
572       if (serverNames != null && !serverNames.isEmpty()) {
573         this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
574             isMetaRecovery));
575       }
576     } finally {
577       this.recoveringRegionLock.unlock();
578     }
579   }
580 
581   /**
582    * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
583    * during the master initialization phase.
584    * @param failedServers A set of known failed servers
585    * @throws KeeperException
586    */
587   void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
588       throws KeeperException {
589 
590     Set<String> knownFailedServers = new HashSet<String>();
591     if (failedServers != null) {
592       for (ServerName tmpServerName : failedServers) {
593         knownFailedServers.add(tmpServerName.getServerName());
594       }
595     }
596 
597     this.recoveringRegionLock.lock();
598     try {
599       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
600       if (tasks != null) {
601         for (String t : tasks) {
602           byte[] data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
603           if (data != null) {
604             SplitLogTask slt = null;
605             try {
606               slt = SplitLogTask.parseFrom(data);
607             } catch (DeserializationException e) {
608               LOG.warn("Failed parse data for znode " + t, e);
609             }
610             if (slt != null && slt.isDone()) {
611               continue;
612             }
613           }
614           // decode the file name
615           t = ZKSplitLog.getFileName(t);
616           ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
617           if (serverName != null) {
618             knownFailedServers.add(serverName.getServerName());
619           } else {
620             LOG.warn("Found invalid WAL log file name:" + t);
621           }
622         }
623       }
624 
625       // remove recovering regions that don't have any RS associated with them
626       List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
627       if (regions != null) {
628         for (String region : regions) {
629           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
630           List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
631           if (regionFailedServers == null || regionFailedServers.isEmpty()) {
632             ZKUtil.deleteNode(watcher, nodePath);
633             continue;
634           }
635           boolean needMoreRecovery = false;
636           for (String tmpFailedServer : regionFailedServers) {
637             if (knownFailedServers.contains(tmpFailedServer)) {
638               needMoreRecovery = true;
639               break;
640             }
641           }
642           if (!needMoreRecovery) {
643             ZKUtil.deleteNodeRecursively(watcher, nodePath);
644           }
645         }
646       }
647     } finally {
648       this.recoveringRegionLock.unlock();
649     }
650   }
651 
652   public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
653     try {
654       if (regions == null) {
655         // remove all children under /hbase/recovering-regions
656         LOG.info("Garbage collecting all recovering regions.");
657         ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
658       } else {
659         for (String curRegion : regions) {
660           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
661           ZKUtil.deleteNodeRecursively(watcher, nodePath);
662         }
663       }
664     } catch (KeeperException e) {
665       LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
666     }
667   }
668 
669   private void setDone(String path, TerminationStatus status) {
670     Task task = tasks.get(path);
671     if (task == null) {
672       if (!ZKSplitLog.isRescanNode(watcher, path)) {
673         SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
674         LOG.debug("unacquired orphan task is done " + path);
675       }
676     } else {
677       synchronized (task) {
678         if (task.status == IN_PROGRESS) {
679           if (status == SUCCESS) {
680             SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
681             LOG.info("Done splitting " + path);
682           } else {
683             SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
684             LOG.warn("Error splitting " + path);
685           }
686           task.status = status;
687           if (task.batch != null) {
688             synchronized (task.batch) {
689               if (status == SUCCESS) {
690                 task.batch.done++;
691               } else {
692                 task.batch.error++;
693               }
694               task.batch.notify();
695             }
696           }
697         }
698       }
699     }
700     // delete the task node in zk. It's an async
701     // call and no one is blocked waiting for this node to be deleted. All
702     // task names are unique (log.<timestamp>) there is no risk of deleting
703     // a future task.
704     // if a deletion fails, TimeoutMonitor will retry the same deletion later
705     deleteNode(path, zkretries);
706     return;
707   }
708 
709   private void createNode(String path, Long retry_count) {
710     SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode);
711     ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
712     SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
713     return;
714   }
715 
716   private void createNodeSuccess(String path) {
717     LOG.debug("put up splitlog task at znode " + path);
718     getDataSetWatch(path, zkretries);
719   }
720 
721   private void createNodeFailure(String path) {
722     // TODO the Manager should split the log locally instead of giving up
723     LOG.warn("failed to create task node" + path);
724     setDone(path, FAILURE);
725   }
726 
727 
728   private void getDataSetWatch(String path, Long retry_count) {
729     this.watcher.getRecoverableZooKeeper().getZooKeeper().
730         getData(path, this.watcher,
731         new GetDataAsyncCallback(true), retry_count);
732     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
733   }
734 
735   private void tryGetDataSetWatch(String path) {
736     // A negative retry count will lead to ignoring all error processing.
737     this.watcher.getRecoverableZooKeeper().getZooKeeper().
738         getData(path, this.watcher,
739         new GetDataAsyncCallback(false), Long.valueOf(-1) /* retry count */);
740     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
741   }
742 
743   private void getDataSetWatchSuccess(String path, byte[] data, int version)
744   throws DeserializationException {
745     if (data == null) {
746       if (version == Integer.MIN_VALUE) {
747         // assume all done. The task znode suddenly disappeared.
748         setDone(path, SUCCESS);
749         return;
750       }
751       SplitLogCounters.tot_mgr_null_data.incrementAndGet();
752       LOG.fatal("logic error - got null data " + path);
753       setDone(path, FAILURE);
754       return;
755     }
756     data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
757     SplitLogTask slt = SplitLogTask.parseFrom(data);
758     if (slt.isUnassigned()) {
759       LOG.debug("task not yet acquired " + path + " ver = " + version);
760       handleUnassignedTask(path);
761     } else if (slt.isOwned()) {
762       heartbeat(path, version, slt.getServerName());
763     } else if (slt.isResigned()) {
764       LOG.info("task " + path + " entered state: " + slt.toString());
765       resubmitOrFail(path, FORCE);
766     } else if (slt.isDone()) {
767       LOG.info("task " + path + " entered state: " + slt.toString());
768       if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
769         if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
770           setDone(path, SUCCESS);
771         } else {
772           resubmitOrFail(path, CHECK);
773         }
774       } else {
775         setDone(path, SUCCESS);
776       }
777     } else if (slt.isErr()) {
778       LOG.info("task " + path + " entered state: " + slt.toString());
779       resubmitOrFail(path, CHECK);
780     } else {
781       LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
782       setDone(path, FAILURE);
783     }
784   }
785 
786   private void getDataSetWatchFailure(String path) {
787     LOG.warn("failed to set data watch " + path);
788     setDone(path, FAILURE);
789   }
790 
791   /**
792    * It is possible for a task to stay in UNASSIGNED state indefinitely - say
793    * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED
794    * state but it dies before it could create the RESCAN task node to signal
795    * the SplitLogWorkers to pick up the task. To prevent this scenario the
796    * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
797    *
798    * @param path
799    */
800   private void handleUnassignedTask(String path) {
801     if (ZKSplitLog.isRescanNode(watcher, path)) {
802       return;
803     }
804     Task task = findOrCreateOrphanTask(path);
805     if (task.isOrphan() && (task.incarnation == 0)) {
806       LOG.info("resubmitting unassigned orphan task " + path);
807       // ignore failure to resubmit. The timeout-monitor will handle it later
808       // albeit in a more crude fashion
809       resubmit(path, task, FORCE);
810     }
811   }
812 
813   /**
814    * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
815    * @param statusCode integer value of a ZooKeeper exception code
816    * @param action description message about the retried action
817    * @return true when need to abandon retries otherwise false
818    */
819   private boolean needAbandonRetries(int statusCode, String action) {
820     if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
821       LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
822           + "action=" + action);
823       return true;
824     }
825     return false;
826   }
827 
828   private void heartbeat(String path, int new_version, ServerName workerName) {
829     Task task = findOrCreateOrphanTask(path);
830     if (new_version != task.last_version) {
831       if (task.isUnassigned()) {
832         LOG.info("task " + path + " acquired by " + workerName);
833       }
834       task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
835       SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
836     } else {
837       // duplicate heartbeats - heartbeats w/o zk node version
838       // changing - are possible. The timeout thread does
839       // getDataSetWatch() just to check whether a node still
840       // exists or not
841     }
842     return;
843   }
844 
845   private boolean resubmit(String path, Task task, ResubmitDirective directive) {
846     // it's OK if this thread misses the update to task.deleted. It will fail later
847     if (task.status != IN_PROGRESS) {
848       return false;
849     }
850     int version;
851     if (directive != FORCE) {
852       // We're going to resubmit:
853       //  1) immediately if the worker server is now marked as dead
854       //  2) after a configurable timeout if the server is not marked as dead but has still not
855       //       finished the task. This allows us to continue if the worker cannot actually handle it,
856       //       for any reason.
857       final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
858       final boolean alive = master.getServerManager() != null ?
859           master.getServerManager().isServerOnline(task.cur_worker_name) : true;
860       if (alive && time < timeout) {
861         LOG.trace("Skipping the resubmit of " + task.toString() + "  because the server " +
862             task.cur_worker_name + " is not marked as dead, we waited for " + time +
863             " while the timeout is " + timeout);
864         return false;
865       }
866       if (task.unforcedResubmits.get() >= resubmit_threshold) {
867         if (!task.resubmitThresholdReached) {
868           task.resubmitThresholdReached = true;
869           SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
870           LOG.info("Skipping resubmissions of task " + path +
871               " because threshold " + resubmit_threshold + " reached");
872         }
873         return false;
874       }
875       // race with heartbeat() that might be changing last_version
876       version = task.last_version;
877     } else {
878       SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
879       version = -1;
880     }
881     LOG.info("resubmitting task " + path);
882     task.incarnation++;
883     try {
884       // blocking zk call but this is done from the timeout thread
885       SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode);
886       if (!ZKUtil.setData(this.watcher, path, slt.toByteArray(), version)) {
887         LOG.debug("failed to resubmit task " + path +
888             " version changed");
889         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
890         return false;
891       }
892     } catch (NoNodeException e) {
893       LOG.warn("failed to resubmit because znode doesn't exist " + path +
894           " task done (or forced done by removing the znode)");
895       try {
896         getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
897       } catch (DeserializationException e1) {
898         LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
899         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
900         return false;
901       }
902       return false;
903     } catch (KeeperException.BadVersionException e) {
904       LOG.debug("failed to resubmit task " + path + " version changed");
905       task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
906       return false;
907     } catch (KeeperException e) {
908       SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
909       LOG.warn("failed to resubmit " + path, e);
910       return false;
911     }
912     // don't count forced resubmits
913     if (directive != FORCE) {
914       task.unforcedResubmits.incrementAndGet();
915     }
916     task.setUnassigned();
917     createRescanNode(Long.MAX_VALUE);
918     SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
919     return true;
920   }
921 
922   private void resubmitOrFail(String path, ResubmitDirective directive) {
923     if (!resubmit(path, findOrCreateOrphanTask(path), directive)) {
924       setDone(path, FAILURE);
925     }
926   }
927 
928   private void deleteNode(String path, Long retries) {
929     SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
930     // Once a task znode is ready for deletion, that is, it is in the TASK_DONE
931     // state, no one should be writing to it anymore. That is, no one
932     // will be updating the znode version any more.
933     this.watcher.getRecoverableZooKeeper().getZooKeeper().
934       delete(path, -1, new DeleteAsyncCallback(),
935         retries);
936   }
937 
938   private void deleteNodeSuccess(String path) {
939     if (ignoreZKDeleteForTesting) {
940       return;
941     }
942     Task task;
943     task = tasks.remove(path);
944     if (task == null) {
945       if (ZKSplitLog.isRescanNode(watcher, path)) {
946         SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
947       }
948       SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
949       LOG.debug("deleted task without in memory state " + path);
950       return;
951     }
952     synchronized (task) {
953       task.status = DELETED;
954       task.notify();
955     }
956     SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
957   }
958 
959   private void deleteNodeFailure(String path) {
960     LOG.info("Failed to delete node " + path + " and will retry soon.");
961     return;
962   }
963 
964   /**
965    * Signal the workers that a task was resubmitted by creating the
966    * RESCAN node.
967    * @throws KeeperException
968    */
969   private void createRescanNode(long retries) {
970     // The RESCAN node will be deleted almost immediately by the
971     // SplitLogManager as soon as it is created because it is being
972     // created in the DONE state. This behavior prevents a buildup
973     // of RESCAN nodes. But there is also a chance that a SplitLogWorker
974     // might miss the watch-trigger that creation of RESCAN node provides.
975     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
976     // therefore this behavior is safe.
977     lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
978     SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode);
979     this.watcher.getRecoverableZooKeeper().getZooKeeper().
980       create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
981         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
982         new CreateRescanAsyncCallback(), Long.valueOf(retries));
983   }
984 
985   private void createRescanSuccess(String path) {
986     SplitLogCounters.tot_mgr_rescan.incrementAndGet();
987     getDataSetWatch(path, zkretries);
988   }
989 
990   private void createRescanFailure() {
991     LOG.fatal("logic failure, rescan failure must not happen");
992   }
993 
994   /**
995    * @param path
996    * @param batch
997    * @return null on success, existing task on error
998    */
999   private Task createTaskIfAbsent(String path, TaskBatch batch) {
1000     Task oldtask;
1001     // batch.installed is only changed via this function and
1002     // a single thread touches batch.installed.
1003     Task newtask = new Task();
1004     newtask.batch = batch;
1005     oldtask = tasks.putIfAbsent(path, newtask);
1006     if (oldtask == null) {
1007       batch.installed++;
1008       return  null;
1009     }
1010     // new task was not used.
1011     synchronized (oldtask) {
1012       if (oldtask.isOrphan()) {
1013         if (oldtask.status == SUCCESS) {
1014           // The task is already done. Do not install the batch for this
1015           // task because it might be too late for setDone() to update
1016           // batch.done. There is no need for the batch creator to wait for
1017           // this task to complete.
1018           return (null);
1019         }
1020         if (oldtask.status == IN_PROGRESS) {
1021           oldtask.batch = batch;
1022           batch.installed++;
1023           LOG.debug("Previously orphan task " + path + " is now being waited upon");
1024           return null;
1025         }
1026         while (oldtask.status == FAILURE) {
1027           LOG.debug("wait for status of task " + path + " to change to DELETED");
1028           SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
1029           try {
1030             oldtask.wait();
1031           } catch (InterruptedException e) {
1032             Thread.currentThread().interrupt();
1033             LOG.warn("Interrupted when waiting for znode delete callback");
1034             // fall through to return failure
1035             break;
1036           }
1037         }
1038         if (oldtask.status != DELETED) {
1039           LOG.warn("Failure because previously failed task" +
1040               " state still present. Waiting for znode delete callback" +
1041               " path=" + path);
1042           return oldtask;
1043         }
1044         // reinsert the newTask and it must succeed this time
1045         Task t = tasks.putIfAbsent(path, newtask);
1046         if (t == null) {
1047           batch.installed++;
1048           return  null;
1049         }
1050         LOG.fatal("Logic error. Deleted task still present in tasks map");
1051         assert false : "Deleted task still present in tasks map";
1052         return t;
1053       }
1054       LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
1055       return oldtask;
1056     }
1057   }
1058 
1059   Task findOrCreateOrphanTask(String path) {
1060     Task orphanTask = new Task();
1061     Task task;
1062     task = tasks.putIfAbsent(path, orphanTask);
1063     if (task == null) {
1064       LOG.info("creating orphan task " + path);
1065       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
1066       task = orphanTask;
1067     }
1068     return task;
1069   }
1070 
1071   @Override
1072   public void nodeDataChanged(String path) {
1073     Task task;
1074     task = tasks.get(path);
1075     if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
1076       if (task != null) {
1077         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
1078       }
1079       getDataSetWatch(path, zkretries);
1080     }
1081   }
1082 
1083   public void stop() {
1084     if (timeoutMonitor != null) {
1085       timeoutMonitor.interrupt();
1086     }
1087   }
1088 
1089   private void lookForOrphans() {
1090     List<String> orphans;
1091     try {
1092        orphans = ZKUtil.listChildrenNoWatch(this.watcher,
1093           this.watcher.splitLogZNode);
1094       if (orphans == null) {
1095         LOG.warn("could not get children of " + this.watcher.splitLogZNode);
1096         return;
1097       }
1098     } catch (KeeperException e) {
1099       LOG.warn("could not get children of " + this.watcher.splitLogZNode +
1100           " " + StringUtils.stringifyException(e));
1101       return;
1102     }
1103     int rescan_nodes = 0;
1104     for (String path : orphans) {
1105       String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
1106       if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
1107         rescan_nodes++;
1108         LOG.debug("found orphan rescan node " + path);
1109       } else {
1110         LOG.info("found orphan task " + path);
1111       }
1112       getDataSetWatch(nodepath, zkretries);
1113     }
1114     LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
1115         rescan_nodes + " rescan nodes");
1116   }
1117 
1118   /**
1119    * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
1120    * all regions of the passed-in region server
1121    * @param serverName the name of a region server
1122    * @param userRegions user regions assigned on the region server
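        * <p>Illustrative layout (the placeholders stand for an encoded region name and a failed
        * region server name). As described in the znode layout note in the method body, the region
        * level znode carries the last known flushed sequence id for the region and each server
        * level child carries the last flushed sequence ids recorded for that failed server:
        * <pre>
        *   /hbase/recovering-regions/&lt;encoded region name&gt;/&lt;failed server name&gt;
        * </pre>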
1123    */
1124   void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
1125       throws KeeperException {
1126     if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
1127       return;
1128     }
1129 
1130     try {
1131       this.recoveringRegionLock.lock();
1132       // mark that we're creating recovering znodes
1133       this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
1134 
1135       for (HRegionInfo region : userRegions) {
1136         String regionEncodeName = region.getEncodedName();
1137         long retries = this.zkretries;
1138 
1139         do {
1140           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
1141           long lastRecordedFlushedSequenceId = -1;
1142           try {
1143             long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
1144               regionEncodeName.getBytes());
1145 
1146             /*
1147              * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
1148              * flushed sequence id for the server]
1149              */
1150             byte[] data = ZKUtil.getData(this.watcher, nodePath);
1151             if (data == null) {
1152               ZKUtil.createSetData(this.watcher, nodePath,
1153                 ZKUtil.positionToByteArray(lastSequenceId));
1154             } else {
1155               lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1156               if (lastRecordedFlushedSequenceId < lastSequenceId) {
1157                 // update last flushed sequence id in the region level
1158                 ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
1159               }
1160             }
1161             // go one level deeper with server name
1162             nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
1163             if (lastSequenceId <= lastRecordedFlushedSequenceId) {
1164               // the newly assigned RS failed even before any flush to the region
1165               lastSequenceId = lastRecordedFlushedSequenceId;
1166             }
1167             ZKUtil.createSetData(this.watcher, nodePath,
1168               ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
1169             LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
1170                 + serverName);
1171 
1172             // break retry loop
1173             break;
1174           } catch (KeeperException e) {
1175             // ignore ZooKeeper exceptions inside retry loop
1176             if (retries <= 1) {
1177               throw e;
1178             }
1179             // wait a little bit for retry
1180             try {
1181               Thread.sleep(20);
1182             } catch (Exception ignoreE) {
1183               // ignore
1184             }
1185           }
1186         } while ((--retries) > 0 && (!this.stopper.isStopped()));
1187       }
1188     } finally {
1189       this.recoveringRegionLock.unlock();
1190     }
1191   }
1192 
1193   /**
1194    * @param bytes - Content of a failed region server or recovering region znode.
1195    * @return long - The last flushed sequence Id for the region server
1196    */
1197   public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
1198     long lastRecordedFlushedSequenceId = -1L;
1199     try {
1200       lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
1201     } catch (DeserializationException e) {
1202       lastRecordedFlushedSequenceId = -1L;
1203       LOG.warn("Can't parse last flushed sequence Id", e);
1204     }
1205     return lastRecordedFlushedSequenceId;
1206   }
1207 
1208   /**
1209    * Check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if it exists
1210    * and sets a watch as well.
1211    * @param zkw
1212    * @param regionEncodedName region encode name
1213    * @return true when /hbase/recovering-regions/<current region encoded name> exists
1214    * @throws KeeperException
1215    */
1216   public static boolean
1217       isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
1218           throws KeeperException {
1219     boolean result = false;
1220     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
1221 
1222     byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
1223     if (node != null) {
1224       result = true;
1225     }
1226     return result;
1227   }
1228 
1229   /**
1230    * This function is used in distributedLogReplay to fetch the last flushed sequence id from ZK
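        * <p>A minimal usage sketch; the server name and encoded region name below are
        * illustrative only:
        * <pre>{@code
        *   RegionStoreSequenceIds ids =
        *       SplitLogManager.getRegionFlushedSequenceId(zkw, "host1,60020,1388888888888", "1588230740");
        *   long lastFlushedId = (ids == null) ? -1 : ids.getLastFlushedSequenceId();
        * }</pre>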
1231    * @param zkw
1232    * @param serverName
1233    * @param encodedRegionName
1234    * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName</code>
1235    * @throws IOException
1236    */
1237   public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
1238       String serverName, String encodedRegionName) throws IOException {
1239     // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
1240     // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
1241     // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
1242     // sequence Id name space (sequence Id only valid for a particular RS instance), changes
1243     // when different newly assigned RS flushes the region.
1244     // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
1245     // last flushed sequence Id for each failed RS instance.
1246     RegionStoreSequenceIds result = null;
1247     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
1248     nodePath = ZKUtil.joinZNode(nodePath, serverName);
1249     try {
1250       byte[] data = ZKUtil.getData(zkw, nodePath);
1251       if (data != null) {
1252         result = ZKUtil.parseRegionStoreSequenceIds(data);
1253       }
1254     } catch (KeeperException e) {
1255       throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
1256           + serverName + "; region=" + encodedRegionName, e);
1257     } catch (DeserializationException e) {
1258       LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
1259     }
1260     return result;
1261   }
1262   
1263   /**
1264    * This function sets the recovery mode from outstanding split log tasks left over from
1265    * before, or from the current configuration setting
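        * <p>For reference, the mode requested by configuration is read from
        * {@link HConstants#DISTRIBUTED_LOG_REPLAY_KEY}. A minimal hbase-site.xml sketch, assuming
        * that key resolves to "hbase.master.distributed.log.replay":
        * <pre>{@code
        *   <property>
        *     <name>hbase.master.distributed.log.replay</name>
        *     <value>true</value>
        *   </property>
        * }</pre>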
1266    * @param isForInitialization
1267    * @throws KeeperException
1268    * @throws InterruptedIOException
1269    */
1270   public void setRecoveryMode(boolean isForInitialization) throws KeeperException {
1271     if (this.isDrainingDone) {
1272       // when there are no outstanding splitlogtasks after master start up, we already have the
1273       // up-to-date recovery mode
1274       return;
1275     }
1276     if (this.watcher == null) {
1277       // when the watcher is null (testing code), recovery mode can only be LOG_SPLITTING
1278       this.isDrainingDone = true;
1279       this.recoveryMode = RecoveryMode.LOG_SPLITTING;
1280       return;
1281     }
1282     boolean hasSplitLogTask = false;
1283     boolean hasRecoveringRegions = false;
1284     RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
1285     RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? 
1286       RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
1287 
1288     // First, check whether there are outstanding recovering regions
1289     List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
1290     if (regions != null && !regions.isEmpty()) {
1291       hasRecoveringRegions = true;
1292       previousRecoveryMode = RecoveryMode.LOG_REPLAY;
1293     }
1294     if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
1295       // Second, check whether there are outstanding split log tasks
1296       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
1297       if (tasks != null && !tasks.isEmpty()) {
1298         hasSplitLogTask = true;
1299         if (isForInitialization) {
1300           // during initialization, try to get recovery mode from splitlogtask
1301           for (String task : tasks) {
1302             try {
1303               byte[] data = ZKUtil.getData(this.watcher,
1304                 ZKUtil.joinZNode(watcher.splitLogZNode, task));
1305               if (data == null) continue;
1306               SplitLogTask slt = SplitLogTask.parseFrom(data);
1307               previousRecoveryMode = slt.getMode();
1308               if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
1309                 // created by an old code base that doesn't set the recovery mode in splitlogtask;
1310                 // we can safely set it to LOG_SPLITTING because we're in master initialization code
1311                 // before SSH is enabled and there are no outstanding recovering regions
1312                 previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
1313               }
1314               break;
1315             } catch (DeserializationException e) {
1316               LOG.warn("Failed to parse data for znode " + task, e);
1317             }
1318           }
1319         }
1320       }
1321     }
1322 
1323     synchronized(this) {
1324       if(this.isDrainingDone) {
1325         return;
1326       }
1327       if (!hasSplitLogTask && !hasRecoveringRegions) {
1328         this.isDrainingDone = true;
1329         this.recoveryMode = recoveryModeInConfig;
1330         return;
1331       } else if (!isForInitialization) {
1332         // split log tasks haven't drained yet; keep the existing recovery mode
1333         return;
1334       }
1335   
1336       if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
1337         this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
1338         this.recoveryMode = previousRecoveryMode;
1339       } else {
1340         this.recoveryMode = recoveryModeInConfig;
1341       }
1342     }
1343   }
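
       // Summary of the resolution above, derived from the code for readability (not authoritative):
       //
       //   recovering regions | split log tasks | isForInitialization | resulting mode
       //   -------------------+-----------------+---------------------+-----------------------------------
       //   none               | none            | either              | mode from config; draining done
       //   present            | (ignored)       | true                | LOG_REPLAY, the previous run's mode
       //   none               | present         | true                | mode read from an existing task
       //   any outstanding    | any outstanding | false               | keep current mode until drained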
1344 
1345   public RecoveryMode getRecoveryMode() {
1346     return this.recoveryMode;
1347   }
1348   
1349   /**
1350    * Returns whether distributed log replay is turned on or not.
1351    * @param conf configuration to check
1352    * @return true when distributed log replay is turned on
1353    */
1354   private boolean isDistributedLogReplay(Configuration conf) {
1355     boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
1356       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
1357     int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
1358     if (LOG.isDebugEnabled()) {
1359       LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
1360     }
1361     // For distributed log replay, the hfile version must be at least 3; we need tag support.
1362     return dlr && (version >= 3);
1363   }
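
       // Illustrative configuration sketch (an assumption for the example, not part of this class):
       // enabling distributed log replay only takes effect together with HFile format version 3,
       // which provides the tag support noted above.
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
       //   conf.setInt(HFile.FORMAT_VERSION_KEY, 3);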
1364 
1365   /**
1366    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
1367    * Client threads use this object to wait for all their tasks to be done.
1368    * <p>
1369    * All access is synchronized.
1370    */
1371   static class TaskBatch {
1372     int installed = 0;
1373     int done = 0;
1374     int error = 0;
1375     volatile boolean isDead = false;
1376 
1377     @Override
1378     public String toString() {
1379       return ("installed = " + installed + " done = " + done + " error = " + error);
1380     }
1381   }
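
       // Illustrative wait pattern (a sketch, not the original code): a caller that installed tasks
       // against a batch would typically block on the batch object until every task is accounted for.
       //
       //   synchronized (batch) {
       //     while ((batch.done + batch.error) != batch.installed) {
       //       batch.wait(100); // handle or propagate InterruptedException as appropriate
       //     }
       //   }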
1382 
1383   /**
1384    * In-memory state of an active task.
1385    */
1386   static class Task {
1387     volatile long last_update;
1388     volatile int last_version;
1389     volatile ServerName cur_worker_name;
1390     volatile TaskBatch batch;
1391     volatile TerminationStatus status;
1392     volatile int incarnation;
1393     final AtomicInteger unforcedResubmits = new AtomicInteger();
1394     volatile boolean resubmitThresholdReached;
1395 
1396     @Override
1397     public String toString() {
1398       return ("last_update = " + last_update +
1399           " last_version = " + last_version +
1400           " cur_worker_name = " + cur_worker_name +
1401           " status = " + status +
1402           " incarnation = " + incarnation +
1403           " resubmits = " + unforcedResubmits.get() +
1404           " batch = " + batch);
1405     }
1406 
1407     Task() {
1408       incarnation = 0;
1409       last_version = -1;
1410       status = IN_PROGRESS;
1411       setUnassigned();
1412     }
1413 
1414     public boolean isOrphan() {
1415       return (batch == null || batch.isDead);
1416     }
1417 
1418     public boolean isUnassigned() {
1419       return (cur_worker_name == null);
1420     }
1421 
1422     public void heartbeatNoDetails(long time) {
1423       last_update = time;
1424     }
1425 
1426     public void heartbeat(long time, int version, ServerName worker) {
1427       last_version = version;
1428       last_update = time;
1429       cur_worker_name = worker;
1430     }
1431 
1432     public void setUnassigned() {
1433       cur_worker_name = null;
1434       last_update = -1;
1435     }
1436   }
1437 
1438   void handleDeadWorker(ServerName workerName) {
1439     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
1440     // to reason about concurrency. Makes it easier to retry.
1441     synchronized (deadWorkersLock) {
1442       if (deadWorkers == null) {
1443         deadWorkers = new HashSet<ServerName>(100);
1444       }
1445       deadWorkers.add(workerName);
1446     }
1447     LOG.info("dead splitlog worker " + workerName);
1448   }
1449 
1450   void handleDeadWorkers(Set<ServerName> serverNames) {
1451     synchronized (deadWorkersLock) {
1452       if (deadWorkers == null) {
1453         deadWorkers = new HashSet<ServerName>(100);
1454       }
1455       deadWorkers.addAll(serverNames);
1456     }
1457     LOG.info("dead splitlog workers " + serverNames);
1458   }
1459 
1460   /**
1461    * Periodically checks all active tasks and resubmits the ones that have timed
1462    * out
1463    */
1464   private class TimeoutMonitor extends Chore {
1465     private long lastLog = 0;
1466 
1467     public TimeoutMonitor(final int period, Stoppable stopper) {
1468       super("SplitLogManager Timeout Monitor", period, stopper);
1469     }
1470 
1471     @Override
1472     protected void chore() {
1473       int resubmitted = 0;
1474       int unassigned = 0;
1475       int tot = 0;
1476       boolean found_assigned_task = false;
1477       Set<ServerName> localDeadWorkers;
1478 
1479       synchronized (deadWorkersLock) {
1480         localDeadWorkers = deadWorkers;
1481         deadWorkers = null;
1482       }
1483 
1484       for (Map.Entry<String, Task> e : tasks.entrySet()) {
1485         String path = e.getKey();
1486         Task task = e.getValue();
1487         ServerName cur_worker = task.cur_worker_name;
1488         tot++;
1489         // don't easily resubmit a task which hasn't been picked up yet. It
1490         // might be a long while before a SplitLogWorker is free to pick up a
1491         // task. This is because a SplitLogWorker picks up a task one at a
1492         // time. If we want progress when there are no region servers then we
1493         // will have to run a SplitLogWorker thread in the Master.
1494         if (task.isUnassigned()) {
1495           unassigned++;
1496           continue;
1497         }
1498         found_assigned_task = true;
1499         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
1500           SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
1501           if (resubmit(path, task, FORCE)) {
1502             resubmitted++;
1503           } else {
1504             handleDeadWorker(cur_worker);
1505             LOG.warn("Failed to resubmit task " + path + " owned by dead " +
1506                 cur_worker + ", will retry.");
1507           }
1508         } else if (resubmit(path, task, CHECK)) {
1509           resubmitted++;
1510         }
1511       }
1512       if (tot > 0) {
1513         long now = EnvironmentEdgeManager.currentTimeMillis();
1514         if (now > lastLog + 5000) {
1515           lastLog = now;
1516           LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
1517         }
1518       }
1519       if (resubmitted > 0) {
1520         LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
1521       }
1522       // If there are pending tasks and all of them have been unassigned for
1523       // some time then put up a RESCAN node to ping the workers.
1524       // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
1525       // because (a) it is very unlikely that every worker had a
1526       // transient error when trying to grab the task and (b) if there are no
1527       // workers then all tasks will stay unassigned indefinitely and the
1528       // manager will keep creating RESCAN nodes indefinitely. TODO maybe the
1529       // master should spawn both a manager and a worker thread to guarantee
1530       // that there is always at least one worker in the system
1531       if (tot > 0 && !found_assigned_task &&
1532           ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) >
1533           unassignedTimeout)) {
1534         for (Map.Entry<String, Task> e : tasks.entrySet()) {
1535           String path = e.getKey();
1536           Task task = e.getValue();
1537           // we have to do task.isUnassigned() check again because tasks might
1538           // have been asynchronously assigned. There is no locking required
1539           // for these checks ... it is OK even if tryGetDataSetWatch() is
1540           // called unnecessarily for a task
1541           if (task.isUnassigned() && (task.status != FAILURE)) {
1542             // We just touch the znode to make sure its still there
1543             tryGetDataSetWatch(path);
1544           }
1545         }
1546         createRescanNode(Long.MAX_VALUE);
1547         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
1548         LOG.debug("resubmitting unassigned task(s) after timeout");
1549       }
1550 
1551       // Retry previously failed deletes
1552       if (failedDeletions.size() > 0) {
1553         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
1554         for (String tmpPath : tmpPaths) {
1555           // deleteNode is an async call
1556           deleteNode(tmpPath, zkretries);
1557         }
1558         failedDeletions.removeAll(tmpPaths);
1559       }
1560 
1561       // Garbage collect left-over /hbase/recovering-regions/... znode
1562       long timeInterval = EnvironmentEdgeManager.currentTimeMillis()
1563           - lastRecoveringNodeCreationTime;
1564       if (!failedRecoveringRegionDeletions.isEmpty()
1565           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
1566         // the function below performs more checks before actually GC'ing anything
1567         if (!failedRecoveringRegionDeletions.isEmpty()) {
1568           List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
1569               new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
1570           failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
1571           for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
1572             removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond());
1573           }
1574         } else {
1575           removeRecoveringRegionsFromZK(null, null);
1576         }
1577       }
1578     }
1579   }
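
       // Illustrative scheduling sketch (an assumption, not the original wiring): a Chore such as the
       // TimeoutMonitor above normally runs on its own daemon thread, along these lines; the period
       // and thread name are made up for the example.
       //
       //   TimeoutMonitor timeoutMonitor = new TimeoutMonitor(1000, stopper); // stopper: a Stoppable
       //   Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), "SplitLogManagerTimeoutMonitor");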
1580 
1581   /**
1582    * Asynchronous handler for zk create node results.
1583    * Retries on failures.
1584    */
1585   class CreateAsyncCallback implements AsyncCallback.StringCallback {
1586     private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
1587 
1588     @Override
1589     public void processResult(int rc, String path, Object ctx, String name) {
1590       SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
1591       if (rc != 0) {
1592         if (needAbandonRetries(rc, "Create znode " + path)) {
1593           createNodeFailure(path);
1594           return;
1595         }
1596         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
1597           // What if there is a delete pending against this pre-existing
1598           // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
1599           // state. Only operations that will be carried out on this node by
1600           // this manager are get-znode-data, task-finisher and delete-znode.
1601           // And all code pieces correctly handle the case of suddenly
1602           // disappearing task-znode.
1603           LOG.debug("found pre-existing znode " + path);
1604           SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
1605         } else {
1606           Long retry_count = (Long)ctx;
1607           LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
1608               path + " remaining retries=" + retry_count);
1609           if (retry_count == 0) {
1610             SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
1611             createNodeFailure(path);
1612           } else {
1613             SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
1614             createNode(path, retry_count - 1);
1615           }
1616           return;
1617         }
1618       }
1619       createNodeSuccess(path);
1620     }
1621   }
1622 
1623   /**
1624    * Asynchronous handler for zk get-data-set-watch on node results.
1625    * Retries on failures.
1626    */
1627   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1628     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1629     private boolean completeTaskOnNoNode;
1630 
1631     /**
1632      * @param completeTaskOnNoNode Complete the task if the znode cannot be found.
1633      * Since in-memory task creation and znode creation are not atomic, there might be
1634      * a race where there is a task in memory but the znode is not created yet (TimeoutMonitor).
1635      * In this case completeTaskOnNoNode should be set to false. See HBASE-11217.
1636      */
1637     public GetDataAsyncCallback(boolean completeTaskOnNoNode) {
1638       this.completeTaskOnNoNode = completeTaskOnNoNode;
1639     }
1640 
1641     @Override
1642     public void processResult(int rc, String path, Object ctx, byte[] data,
1643         Stat stat) {
1644       SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1645       if (rc != 0) {
1646         if (needAbandonRetries(rc, "GetData from znode " + path)) {
1647           return;
1648         }
1649         if (rc == KeeperException.Code.NONODE.intValue()) {
1650           SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1651           LOG.warn("task znode " + path + " vanished.");
1652           if (completeTaskOnNoNode) {
1653             // The task znode has been deleted. Must be some pending delete
1654             // that deleted the task. Assume success because a task-znode is
1655             // only deleted after TaskFinisher is successful.
1656             try {
1657               getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
1658             } catch (DeserializationException e) {
1659               LOG.warn("Deserialization problem", e);
1660             }
1661           }
1662           return;
1663         }
1664         Long retry_count = (Long) ctx;
1665 
1666         if (retry_count < 0) {
1667           LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1668               path + ". Ignoring error. No error handling. No retrying.");
1669           return;
1670         }
1671         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1672             path + " remaining retries=" + retry_count);
1673         if (retry_count == 0) {
1674           SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1675           getDataSetWatchFailure(path);
1676         } else {
1677           SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1678           getDataSetWatch(path, retry_count - 1);
1679         }
1680         return;
1681       }
1682       try {
1683         getDataSetWatchSuccess(path, data, stat.getVersion());
1684       } catch (DeserializationException e) {
1685         LOG.warn("Deserialization problem", e);
1686       }
1687       return;
1688     }
1689   }
1690 
1691   /**
1692    * Asynchronous handler for zk delete node results.
1693    * Retries on failures.
1694    */
1695   class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1696     private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1697 
1698     @Override
1699     public void processResult(int rc, String path, Object ctx) {
1700       SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1701       if (rc != 0) {
1702         if (needAbandonRetries(rc, "Delete znode " + path)) {
1703           failedDeletions.add(path);
1704           return;
1705         }
1706         if (rc != KeeperException.Code.NONODE.intValue()) {
1707           SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1708           Long retry_count = (Long) ctx;
1709           LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
1710               path + " remaining retries=" + retry_count);
1711           if (retry_count == 0) {
1712             LOG.warn("delete failed " + path);
1713             failedDeletions.add(path);
1714             deleteNodeFailure(path);
1715           } else {
1716             deleteNode(path, retry_count - 1);
1717           }
1718           return;
1719         } else {
1720           LOG.info(path +
1721             " does not exist. Either was created but deleted behind our" +
1722             " back by another pending delete OR was deleted" +
1723             " in earlier retry rounds. zkretries = " + (Long) ctx);
1724         }
1725       } else {
1726         LOG.debug("deleted " + path);
1727       }
1728       deleteNodeSuccess(path);
1729     }
1730   }
1731 
1732   /**
1733    * Asynchronous handler for zk create RESCAN-node results.
1734    * Retries on failures.
1735    * <p>
1736    * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal
1737    * for all the {@link SplitLogWorker}s to rescan for new tasks.
1738    */
1739   class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1740     private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1741 
1742     @Override
1743     public void processResult(int rc, String path, Object ctx, String name) {
1744       if (rc != 0) {
1745         if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1746           return;
1747         }
1748         Long retry_count = (Long)ctx;
1749         LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
1750             " remaining retries=" + retry_count);
1751         if (retry_count == 0) {
1752           createRescanFailure();
1753         } else {
1754           createRescanNode(retry_count - 1);
1755         }
1756         return;
1757       }
1758       // path is the original arg, name is the actual name that was created
1759       createRescanSuccess(name);
1760     }
1761   }
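
       // Illustrative sketch of what the RESCAN signal amounts to at the ZooKeeper level (a generic
       // ZooKeeper example, not the manager's actual call path): a persistent-sequential child is
       // created under the split log znode so that every SplitLogWorker's watch on it fires.
       //
       //   String rescanPath = zooKeeper.create(splitLogZNode + "/RESCAN", new byte[0],
       //       ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);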
1762 
1763   /**
1764    * {@link SplitLogManager} can use objects implementing this interface to
1765    * finish off a task left partially done by a {@link SplitLogWorker}. This provides
1766    * a serialization point at the end of the task processing. Must be
1767    * restartable and idempotent.
1768    */
1769   public interface TaskFinisher {
1770     /**
1771      * Status that can be returned by finish()
1772      */
1773     enum Status {
1774       /**
1775        * task completed successfully
1776        */
1777       DONE(),
1778       /**
1779        * task completed with error
1780        */
1781       ERR();
1782     }
1783     /**
1784      * Finish the partially done task. The worker name provides a clue to where the
1785      * partial results of the partially done task are present. The task name is the
1786      * name of the task that was put up in zookeeper.
1787      * <p>
1788      * @param workerName name of the worker the task was last assigned to
1789      * @param taskname name of the task znode in zookeeper
1790      * @return DONE if task completed successfully, ERR otherwise
1791      */
1792     Status finish(ServerName workerName, String taskname);
1793   }
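
       // Illustrative TaskFinisher sketch (an assumption for the example, not the manager's actual
       // finisher): the only hard requirements are that finish() be restartable and idempotent.
       //
       //   TaskFinisher noopFinisher = new TaskFinisher() {
       //     @Override
       //     public Status finish(ServerName workerName, String taskname) {
       //       try {
       //         // verify or clean up whatever partial output workerName left for taskname;
       //         // the body must be safe to run any number of times
       //         return Status.DONE;
       //       } catch (Exception e) {
       //         return Status.ERR;
       //       }
       //     }
       //   };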
1794 
1795   enum ResubmitDirective {
1796     CHECK(),
1797     FORCE();
1798   }
1799 
1800   enum TerminationStatus {
1801     IN_PROGRESS("in_progress"),
1802     SUCCESS("success"),
1803     FAILURE("failure"),
1804     DELETED("deleted");
1805 
1806     String statusMsg;
1807     TerminationStatus(String msg) {
1808       statusMsg = msg;
1809     }
1810 
1811     @Override
1812     public String toString() {
1813       return statusMsg;
1814     }
1815   }
1816 }