/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.master;
19
20 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26
27 import java.io.IOException;
28 import java.io.InterruptedIOException;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentMap;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.classification.InterfaceAudience;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.fs.PathFilter;
48 import org.apache.hadoop.hbase.Chore;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.HRegionInfo;
51 import org.apache.hadoop.hbase.ServerName;
52 import org.apache.hadoop.hbase.SplitLogCounters;
53 import org.apache.hadoop.hbase.SplitLogTask;
54 import org.apache.hadoop.hbase.Stoppable;
55 import org.apache.hadoop.hbase.exceptions.DeserializationException;
56 import org.apache.hadoop.hbase.io.hfile.HFile;
57 import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
58 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
59 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
60 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
61 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
62 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
63 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
64 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
65 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
66 import org.apache.hadoop.hbase.util.FSUtils;
67 import org.apache.hadoop.hbase.util.Pair;
68 import org.apache.hadoop.hbase.util.Threads;
69 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
70 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
72 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
73 import org.apache.hadoop.util.StringUtils;
74 import org.apache.zookeeper.AsyncCallback;
75 import org.apache.zookeeper.CreateMode;
76 import org.apache.zookeeper.KeeperException;
77 import org.apache.zookeeper.KeeperException.NoNodeException;
78 import org.apache.zookeeper.ZooDefs.Ids;
79 import org.apache.zookeeper.data.Stat;
80
/**
 * Distributes the task of log splitting to the available region servers.
 * Coordination happens via ZooKeeper. For every log file that has to be split, a znode is created
 * under the hbase splitlog znode and {@link SplitLogWorker}s race to grab the task.
 *
 * <p>SplitLogManager monitors the task znodes that it creates using the timeoutMonitor thread.
 * If a task's progress is slow then {@link #resubmit(String, Task, ResubmitDirective)} takes the
 * task away from the owning {@link SplitLogWorker} and the task is up for grabs again. When a
 * task is done, its znode is deleted by the SplitLogManager.
 *
 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's log files. The
 * caller thread waits in that method until all the log files have been split.
 *
 * <p>All the ZooKeeper calls made by this class are asynchronous. This is mainly to help reduce
 * the response time seen by the callers.
 *
 * <p>There is a race in this design between the SplitLogManager and the SplitLogWorker: the
 * manager might re-queue a task that has in reality already been completed by a worker. We rely
 * on the idempotency of the log splitting task for correctness.
 */
109 @InterfaceAudience.Private
110 public class SplitLogManager extends ZooKeeperListener {
111 private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
112
113 public static final int DEFAULT_TIMEOUT = 120000;
114 public static final int DEFAULT_ZK_RETRIES = 3;
115 public static final int DEFAULT_MAX_RESUBMIT = 3;
116 public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000);
117
118 private final Stoppable stopper;
119 private final MasterServices master;
120 private final ServerName serverName;
121 private final TaskFinisher taskFinisher;
122 private FileSystem fs;
123 private Configuration conf;
124
125 private long zkretries;
126 private long resubmit_threshold;
127 private long timeout;
128 private long unassignedTimeout;
129 private long lastTaskCreateTime = Long.MAX_VALUE;
130 public boolean ignoreZKDeleteForTesting = false;
131 private volatile long lastRecoveringNodeCreationTime = 0;
132
  // If no recovering-region znode has been created for this long (ms) and no split log task is
  // outstanding, the TimeoutMonitor garbage collects leftover recovering-region znodes.
134 private long checkRecoveringTimeThreshold = 15000;
135 private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
136 .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
  /**
   * In distributedLogReplay mode the splitlog and recovering-regions znodes have to be updated
   * together in several places; this lock guards those compound operations.
   */
142 protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
143
144 private volatile RecoveryMode recoveryMode;
145 private volatile boolean isDrainingDone = false;
146
147 private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
148 private TimeoutMonitor timeoutMonitor;
149
150 private volatile Set<ServerName> deadWorkers = null;
151 private final Object deadWorkersLock = new Object();
152
153 private Set<String> failedDeletions = null;
154
  /**
   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration, Stoppable,
   * MasterServices, ServerName, boolean)} with {@code masterRecovery} set to false.
   *
   * @param zkw the ZK watcher
   * @param conf the HBase configuration
   * @param stopper the stoppable in case anything is wrong
   * @param master the master services
   * @param serverName the master server name
   * @throws InterruptedIOException
   * @throws KeeperException
   */
169 public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
170 Stoppable stopper, MasterServices master, ServerName serverName)
171 throws InterruptedIOException, KeeperException {
172 this(zkw, conf, stopper, master, serverName, false, null);
173 }
174
  /**
   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration, Stoppable,
   * MasterServices, ServerName, boolean, TaskFinisher)} that supplies a default
   * {@link TaskFinisher} which moves the recovered edits of a split log file into place via
   * {@link HLogSplitter#finishSplitLogFile(String, Configuration)}. The task finisher has to be
   * robust because it can be arbitrarily restarted or called multiple times.
   *
   * @param zkw the ZK watcher
   * @param conf the HBase configuration
   * @param stopper the stoppable in case anything is wrong
   * @param master the master services
   * @param serverName the master server name
   * @param masterRecovery an indication if the master is in recovery
   * @throws InterruptedIOException
   * @throws KeeperException
   */
192 public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
193 Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery)
194 throws InterruptedIOException, KeeperException {
195 this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
196 @Override
197 public Status finish(ServerName workerName, String logfile) {
198 try {
199 HLogSplitter.finishSplitLogFile(logfile, conf);
200 } catch (IOException e) {
201 LOG.warn("Could not finish splitting of log file " + logfile, e);
202 return Status.ERR;
203 }
204 return Status.DONE;
205 }
206 });
207 }
208
  /**
   * It is OK to construct this object even when region servers are not online. It does look up
   * the orphan tasks in ZooKeeper but it does not block waiting for them to be done.
   *
   * @param zkw the ZK watcher
   * @param conf the HBase configuration
   * @param stopper the stoppable in case anything is wrong
   * @param master the master services
   * @param serverName the master server name
   * @param masterRecovery an indication if the master is in recovery
   * @param tf task finisher
   * @throws InterruptedIOException
   * @throws KeeperException
   */
224 public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
225 Stoppable stopper, MasterServices master,
226 ServerName serverName, boolean masterRecovery, TaskFinisher tf)
227 throws InterruptedIOException, KeeperException {
228 super(zkw);
229 this.taskFinisher = tf;
230 this.conf = conf;
231 this.stopper = stopper;
232 this.master = master;
233 this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
234 this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
235 this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
236 this.unassignedTimeout =
237 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
    // Determine the recovery mode (log splitting vs. log replay) from any outstanding work in ZK
    // and from the configuration, before this manager starts creating tasks.
240 setRecoveryMode(true);
241
242 LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
243 ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY));
244
245 this.serverName = serverName;
246 this.timeoutMonitor = new TimeoutMonitor(
247 conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
248
249 this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
250
251 if (!masterRecovery) {
252 Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
253 + ".splitLogManagerTimeoutMonitor");
254 }
255
256 if (this.watcher != null) {
257 this.watcher.registerListener(this);
258 lookForOrphans();
259 }
260 }
261
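  /**
   * Gathers the WAL files to split under the given log directories, skipping directories that do
   * not exist and logging directories that are empty.
   */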
262 private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
263 List<FileStatus> fileStatus = new ArrayList<FileStatus>();
264 for (Path hLogDir : logDirs) {
265 this.fs = hLogDir.getFileSystem(conf);
266 if (!fs.exists(hLogDir)) {
267 LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
268 continue;
269 }
270 FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
271 if (logfiles == null || logfiles.length == 0) {
272 LOG.info(hLogDir + " is empty dir, no logs to split");
273 } else {
274 for (FileStatus status : logfiles)
275 fileStatus.add(status);
276 }
277 }
278 FileStatus[] a = new FileStatus[fileStatus.size()];
279 return fileStatus.toArray(a);
280 }
281
  /**
   * @param logDir one region server's WAL directory under the HBase log directory
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
290 public long splitLogDistributed(final Path logDir) throws IOException {
291 List<Path> logDirs = new ArrayList<Path>();
292 logDirs.add(logDir);
293 return splitLogDistributed(logDirs);
294 }
295
  /**
   * The caller blocks until all the log files of the given region server(s) have been processed -
   * successfully split or an error is encountered - by an available worker region server. This
   * method must only be called after the region servers have been brought online.
   *
   * @param logDirs list of log dirs to split
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
306 public long splitLogDistributed(final List<Path> logDirs) throws IOException {
307 if (logDirs.isEmpty()) {
308 return 0;
309 }
310 Set<ServerName> serverNames = new HashSet<ServerName>();
311 for (Path logDir : logDirs) {
312 try {
313 ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
314 if (serverName != null) {
315 serverNames.add(serverName);
316 }
317 } catch (IllegalArgumentException e) {
318
319 LOG.warn("Cannot parse server name from " + logDir);
320 }
321 }
322 return splitLogDistributed(serverNames, logDirs, null);
323 }
324
  /**
   * Like {@link #splitLogDistributed(List)}, but restricts the files considered to those matching
   * the given filter and records which servers are being recovered so their recovering-region
   * znodes can be cleaned up afterwards.
   *
   * @param serverNames region servers whose WALs are being split
   * @param logDirs list of log dirs to split
   * @param filter the Path filter used to select specific files for splitting
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
336 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
337 PathFilter filter) throws IOException {
338 MonitoredTask status = TaskMonitor.get().createStatus(
339 "Doing distributed log split in " + logDirs);
340 FileStatus[] logfiles = getFileList(logDirs, filter);
341 status.setStatus("Checking directory contents...");
342 LOG.debug("Scheduling batch of logs to split");
343 SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
344 LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
345 long t = EnvironmentEdgeManager.currentTimeMillis();
346 long totalSize = 0;
347 TaskBatch batch = new TaskBatch();
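    // isMetaRecovery is null when no filter is given (clean up recovering znodes for both meta
    // and user regions); otherwise it starts as false and is flipped to true below when the
    // batch turns out to be a meta-only recovery.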
348 Boolean isMetaRecovery = (filter == null) ? null : false;
349 for (FileStatus lf : logfiles) {
350
351
352
353
354
355 totalSize += lf.getLen();
356 String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
357 if (!enqueueSplitTask(pathToLog, batch)) {
358 throw new IOException("duplicate log split scheduled for " + lf.getPath());
359 }
360 }
361 waitForSplittingCompletion(batch, status);
362
    if (filter == MasterFileSystem.META_FILTER) {
      // Meta regions and user regions are split separately, so the logfiles in a batch are either
      // all for meta or all for user regions; flagging the whole batch is therefore safe.
      isMetaRecovery = true;
    }
368 this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
369
370 if (batch.done != batch.installed) {
371 batch.isDead = true;
372 SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
373 LOG.warn("error while splitting logs in " + logDirs +
374 " installed = " + batch.installed + " but only " + batch.done + " done");
375 String msg = "error or interrupted while splitting logs in "
376 + logDirs + " Task = " + batch;
377 status.abort(msg);
378 throw new IOException(msg);
379 }
    for (Path logDir : logDirs) {
381 status.setStatus("Cleaning up log directory...");
382 try {
383 if (fs.exists(logDir) && !fs.delete(logDir, false)) {
384 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
385 }
386 } catch (IOException ioe) {
387 FileStatus[] files = fs.listStatus(logDir);
388 if (files != null && files.length > 0) {
389 LOG.warn("returning success without actually splitting and " +
390 "deleting all the log files in path " + logDir);
391 } else {
392 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
393 }
394 }
395 SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
396 }
397 String msg = "finished splitting (more than or equal to) " + totalSize +
398 " bytes in " + batch.installed + " log files in " + logDirs + " in " +
399 (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
400 status.markComplete(msg);
401 LOG.info(msg);
402 return totalSize;
403 }
404
  /**
   * Add a task entry to the splitlog znode if it is not already there.
   *
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there
   */
412 boolean enqueueSplitTask(String taskname, TaskBatch batch) {
413 SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
414
415
416 String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
417 lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
418 Task oldtask = createTaskIfAbsent(path, batch);
419 if (oldtask == null) {
420
421 createNode(path, zkretries);
422 return true;
423 }
424 return false;
425 }
426
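  /**
   * Blocks until every task in the batch is done or in error, cross-checking the in-memory task
   * map against the task znodes remaining in ZooKeeper.
   */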
427 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
428 synchronized (batch) {
429 while ((batch.done + batch.error) != batch.installed) {
430 try {
431 status.setStatus("Waiting for distributed tasks to finish. "
432 + " scheduled=" + batch.installed
433 + " done=" + batch.done
434 + " error=" + batch.error);
435 int remaining = batch.installed - (batch.done + batch.error);
436 int actual = activeTasks(batch);
437 if (remaining != actual) {
438 LOG.warn("Expected " + remaining
439 + " active tasks, but actually there are " + actual);
440 }
441 int remainingInZK = remainingTasksInZK();
442 if (remainingInZK >= 0 && actual > remainingInZK) {
            LOG.warn("Expected at least " + actual
444 + " tasks in ZK, but actually there are " + remainingInZK);
445 }
446 if (remainingInZK == 0 || actual == 0) {
447 LOG.warn("No more task remaining (ZK or task map), splitting "
448 + "should have completed. Remaining tasks in ZK " + remainingInZK
449 + ", active tasks in map " + actual);
450 if (remainingInZK == 0 && actual == 0) {
451 return;
452 }
453 }
454 batch.wait(100);
455 if (stopper.isStopped()) {
456 LOG.warn("Stopped while waiting for log splits to be completed");
457 return;
458 }
459 } catch (InterruptedException e) {
460 LOG.warn("Interrupted while waiting for log splits to be completed");
461 Thread.currentThread().interrupt();
462 return;
463 }
464 }
465 }
466 }
467
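  /**
   * @return the number of tasks in the given batch that are still IN_PROGRESS
   */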
468 private int activeTasks(final TaskBatch batch) {
469 int count = 0;
470 for (Task t: tasks.values()) {
471 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
472 count++;
473 }
474 }
475 return count;
476 }
477
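  /**
   * @return the number of non-rescan task znodes still present under the splitlog znode, or -1 if
   *         ZooKeeper could not be read
   */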
478 private int remainingTasksInZK() {
479 int count = 0;
480 try {
481 List<String> tasks =
482 ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
483 if (tasks != null) {
484 for (String t: tasks) {
485 if (!ZKSplitLog.isRescanNode(watcher, t)) {
486 count++;
487 }
488 }
489 }
490 } catch (KeeperException ke) {
491 LOG.warn("Failed to check remaining tasks", ke);
492 count = -1;
493 }
494 return count;
495 }
496
  /**
   * Removes the recovering-region znodes under /hbase/recovering-regions once recovery has
   * completed, so that region servers hosting those regions can serve reads again.
   *
   * @param serverNames servers which have just been recovered
   * @param isMetaRecovery whether the current recovery is for the meta region only
   */
504 private void
505 removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
506 if (this.recoveryMode != RecoveryMode.LOG_REPLAY) {
      // recovering-region znodes are only created in LOG_REPLAY mode; nothing to clean up here
508 return;
509 }
510
511 final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
512 int count = 0;
513 Set<String> recoveredServerNameSet = new HashSet<String>();
514 if (serverNames != null) {
515 for (ServerName tmpServerName : serverNames) {
516 recoveredServerNameSet.add(tmpServerName.getServerName());
517 }
518 }
519
520 try {
521 this.recoveringRegionLock.lock();
522
523 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
524 if (tasks != null) {
525 for (String t : tasks) {
526 if (!ZKSplitLog.isRescanNode(watcher, t)) {
527 count++;
528 }
529 }
530 }
531 if (count == 0 && this.master.isInitialized()
532 && !this.master.getServerManager().areDeadServersInProgress()) {
533
534 deleteRecoveringRegionZNodes(watcher, null);
535
536
537 lastRecoveringNodeCreationTime = Long.MAX_VALUE;
538 } else if (!recoveredServerNameSet.isEmpty()) {
539
540 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
541 if (regions != null) {
542 for (String region : regions) {
543 if(isMetaRecovery != null) {
544 if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
545 || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
546
547
548 continue;
549 }
550 }
551 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
552 List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
553 if (failedServers == null || failedServers.isEmpty()) {
554 ZKUtil.deleteNode(watcher, nodePath);
555 continue;
556 }
557 if (recoveredServerNameSet.containsAll(failedServers)) {
558 ZKUtil.deleteNodeRecursively(watcher, nodePath);
559 } else {
560 for (String failedServer : failedServers) {
561 if (recoveredServerNameSet.contains(failedServer)) {
562 String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
563 ZKUtil.deleteNode(watcher, tmpPath);
564 }
565 }
566 }
567 }
568 }
569 }
570 } catch (KeeperException ke) {
571 LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
572 if (serverNames != null && !serverNames.isEmpty()) {
573 this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
574 isMetaRecovery));
575 }
576 } finally {
577 this.recoveringRegionLock.unlock();
578 }
579 }
580
  /**
   * Removes recovering-region znodes that are no longer backed by an outstanding split log task
   * or a known failed server. Used when a master fails over while recovery is in flight.
   *
   * @param failedServers set of servers believed to be dead
   * @throws KeeperException
   */
587 void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
588 throws KeeperException {
589
590 Set<String> knownFailedServers = new HashSet<String>();
591 if (failedServers != null) {
592 for (ServerName tmpServerName : failedServers) {
593 knownFailedServers.add(tmpServerName.getServerName());
594 }
595 }
596
597 this.recoveringRegionLock.lock();
598 try {
599 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
600 if (tasks != null) {
601 for (String t : tasks) {
602 byte[] data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
603 if (data != null) {
604 SplitLogTask slt = null;
605 try {
606 slt = SplitLogTask.parseFrom(data);
607 } catch (DeserializationException e) {
608 LOG.warn("Failed parse data for znode " + t, e);
609 }
610 if (slt != null && slt.isDone()) {
611 continue;
612 }
613 }
614
615 t = ZKSplitLog.getFileName(t);
616 ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
617 if (serverName != null) {
618 knownFailedServers.add(serverName.getServerName());
619 } else {
620 LOG.warn("Found invalid WAL log file name:" + t);
621 }
622 }
623 }
624
625
626 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
627 if (regions != null) {
628 for (String region : regions) {
629 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
630 List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
631 if (regionFailedServers == null || regionFailedServers.isEmpty()) {
632 ZKUtil.deleteNode(watcher, nodePath);
633 continue;
634 }
635 boolean needMoreRecovery = false;
636 for (String tmpFailedServer : regionFailedServers) {
637 if (knownFailedServers.contains(tmpFailedServer)) {
638 needMoreRecovery = true;
639 break;
640 }
641 }
642 if (!needMoreRecovery) {
643 ZKUtil.deleteNodeRecursively(watcher, nodePath);
644 }
645 }
646 }
647 } finally {
648 this.recoveringRegionLock.unlock();
649 }
650 }
651
652 public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
653 try {
654 if (regions == null) {
655
656 LOG.info("Garbage collecting all recovering regions.");
657 ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
658 } else {
659 for (String curRegion : regions) {
660 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
661 ZKUtil.deleteNodeRecursively(watcher, nodePath);
662 }
663 }
664 } catch (KeeperException e) {
665 LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
666 }
667 }
668
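  /**
   * Marks the task as finished with the given status, updates its batch counters and schedules
   * the asynchronous deletion of the task znode.
   */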
669 private void setDone(String path, TerminationStatus status) {
670 Task task = tasks.get(path);
671 if (task == null) {
672 if (!ZKSplitLog.isRescanNode(watcher, path)) {
673 SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
674 LOG.debug("unacquired orphan task is done " + path);
675 }
676 } else {
677 synchronized (task) {
678 if (task.status == IN_PROGRESS) {
679 if (status == SUCCESS) {
680 SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
681 LOG.info("Done splitting " + path);
682 } else {
683 SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
684 LOG.warn("Error splitting " + path);
685 }
686 task.status = status;
687 if (task.batch != null) {
688 synchronized (task.batch) {
689 if (status == SUCCESS) {
690 task.batch.done++;
691 } else {
692 task.batch.error++;
693 }
694 task.batch.notify();
695 }
696 }
697 }
698 }
699 }

    // Delete the task znode asynchronously. Nobody blocks waiting for the deletion; if it fails,
    // the path ends up in failedDeletions and the TimeoutMonitor retries it later.
705 deleteNode(path, zkretries);
706 return;
707 }
708
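  /**
   * Asynchronously creates an UNASSIGNED task znode at the given path.
   */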
709 private void createNode(String path, Long retry_count) {
710 SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode);
711 ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
712 SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
713 return;
714 }
715
716 private void createNodeSuccess(String path) {
717 LOG.debug("put up splitlog task at znode " + path);
718 getDataSetWatch(path, zkretries);
719 }
720
721 private void createNodeFailure(String path) {
722
    LOG.warn("failed to create task node " + path);
724 setDone(path, FAILURE);
725 }
726
727
728 private void getDataSetWatch(String path, Long retry_count) {
729 this.watcher.getRecoverableZooKeeper().getZooKeeper().
730 getData(path, this.watcher,
731 new GetDataAsyncCallback(true), retry_count);
732 SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
733 }
734
  private void tryGetDataSetWatch(String path) {
    // A negative retry count will lead to ignoring all error processing.
    this.watcher.getRecoverableZooKeeper().getZooKeeper().
        getData(path, this.watcher,
        new GetDataAsyncCallback(false), Long.valueOf(-1) /* retry count */);
    SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
  }
742
743 private void getDataSetWatchSuccess(String path, byte[] data, int version)
744 throws DeserializationException {
745 if (data == null) {
746 if (version == Integer.MIN_VALUE) {
        // The task znode has vanished; treat the task as done.
748 setDone(path, SUCCESS);
749 return;
750 }
751 SplitLogCounters.tot_mgr_null_data.incrementAndGet();
752 LOG.fatal("logic error - got null data " + path);
753 setDone(path, FAILURE);
754 return;
755 }
756 data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
757 SplitLogTask slt = SplitLogTask.parseFrom(data);
758 if (slt.isUnassigned()) {
759 LOG.debug("task not yet acquired " + path + " ver = " + version);
760 handleUnassignedTask(path);
761 } else if (slt.isOwned()) {
762 heartbeat(path, version, slt.getServerName());
763 } else if (slt.isResigned()) {
764 LOG.info("task " + path + " entered state: " + slt.toString());
765 resubmitOrFail(path, FORCE);
766 } else if (slt.isDone()) {
767 LOG.info("task " + path + " entered state: " + slt.toString());
768 if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
769 if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
770 setDone(path, SUCCESS);
771 } else {
772 resubmitOrFail(path, CHECK);
773 }
774 } else {
775 setDone(path, SUCCESS);
776 }
777 } else if (slt.isErr()) {
778 LOG.info("task " + path + " entered state: " + slt.toString());
779 resubmitOrFail(path, CHECK);
780 } else {
781 LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
782 setDone(path, FAILURE);
783 }
784 }
785
786 private void getDataSetWatchFailure(String path) {
787 LOG.warn("failed to set data watch " + path);
788 setDone(path, FAILURE);
789 }
790
  /**
   * It is possible for a task to stay in the UNASSIGNED state indefinitely - say the
   * SplitLogManager wants to resubmit a task: it forces the task to UNASSIGNED state but dies
   * before it can create the RESCAN task node to signal the SplitLogWorkers to pick it up. To
   * prevent this scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at
   * startup.
   *
   * @param path
   */
800 private void handleUnassignedTask(String path) {
801 if (ZKSplitLog.isRescanNode(watcher, path)) {
802 return;
803 }
804 Task task = findOrCreateOrphanTask(path);
805 if (task.isOrphan() && (task.incarnation == 0)) {
806 LOG.info("resubmitting unassigned orphan task " + path);
807
808
809 resubmit(path, task, FORCE);
810 }
811 }
812
  /**
   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions.
   *
   * @param statusCode integer value of a ZooKeeper exception code
   * @param action description message about the retried action
   * @return true when retries need to be abandoned, otherwise false
   */
819 private boolean needAbandonRetries(int statusCode, String action) {
820 if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
821 LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
822 + "action=" + action);
823 return true;
824 }
825 return false;
826 }
827
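  /**
   * Records a heartbeat for the task whenever its znode version changes, i.e. whenever the owning
   * worker makes progress.
   */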
828 private void heartbeat(String path, int new_version, ServerName workerName) {
829 Task task = findOrCreateOrphanTask(path);
830 if (new_version != task.last_version) {
831 if (task.isUnassigned()) {
832 LOG.info("task " + path + " acquired by " + workerName);
833 }
834 task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
835 SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
836 } else {
      // Duplicate heartbeats - heartbeats without a change in the znode version - are possible.
      // The timeout thread does getDataSetWatch() just to check whether the node still exists.
841 }
842 return;
843 }
844
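  /**
   * Tries to put the task back into the UNASSIGNED state so another worker can pick it up. CHECK
   * resubmits honor the worker timeout and the resubmit threshold; FORCE resubmits do not.
   *
   * @return true if the task was resubmitted
   */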
845 private boolean resubmit(String path, Task task, ResubmitDirective directive) {
846
847 if (task.status != IN_PROGRESS) {
848 return false;
849 }
850 int version;
851 if (directive != FORCE) {
      // We resubmit:
      //   1) immediately if the worker server is now marked as dead
      //   2) only after a timeout otherwise
857 final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
858 final boolean alive = master.getServerManager() != null ?
859 master.getServerManager().isServerOnline(task.cur_worker_name) : true;
860 if (alive && time < timeout) {
861 LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " +
862 task.cur_worker_name + " is not marked as dead, we waited for " + time +
863 " while the timeout is " + timeout);
864 return false;
865 }
866 if (task.unforcedResubmits.get() >= resubmit_threshold) {
867 if (!task.resubmitThresholdReached) {
868 task.resubmitThresholdReached = true;
869 SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
870 LOG.info("Skipping resubmissions of task " + path +
871 " because threshold " + resubmit_threshold + " reached");
872 }
873 return false;
874 }
875
876 version = task.last_version;
877 } else {
878 SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
879 version = -1;
880 }
881 LOG.info("resubmitting task " + path);
882 task.incarnation++;
883 try {
884
885 SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode);
886 if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
887 LOG.debug("failed to resubmit task " + path +
888 " version changed");
889 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
890 return false;
891 }
892 } catch (NoNodeException e) {
893 LOG.warn("failed to resubmit because znode doesn't exist " + path +
894 " task done (or forced done by removing the znode)");
895 try {
896 getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
897 } catch (DeserializationException e1) {
898 LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
899 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
900 return false;
901 }
902 return false;
903 } catch (KeeperException.BadVersionException e) {
904 LOG.debug("failed to resubmit task " + path + " version changed");
905 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
906 return false;
907 } catch (KeeperException e) {
908 SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
909 LOG.warn("failed to resubmit " + path, e);
910 return false;
911 }
912
913 if (directive != FORCE) {
914 task.unforcedResubmits.incrementAndGet();
915 }
916 task.setUnassigned();
917 createRescanNode(Long.MAX_VALUE);
918 SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
919 return true;
920 }
921
922 private void resubmitOrFail(String path, ResubmitDirective directive) {
923 if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) {
924 setDone(path, FAILURE);
925 }
926 }
927
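  /**
   * Asynchronously deletes the task znode; failed deletions are remembered in failedDeletions and
   * retried by the TimeoutMonitor.
   */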
928 private void deleteNode(String path, Long retries) {
929 SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
930
931
932
933 this.watcher.getRecoverableZooKeeper().getZooKeeper().
934 delete(path, -1, new DeleteAsyncCallback(),
935 retries);
936 }
937
938 private void deleteNodeSuccess(String path) {
939 if (ignoreZKDeleteForTesting) {
940 return;
941 }
942 Task task;
943 task = tasks.remove(path);
944 if (task == null) {
945 if (ZKSplitLog.isRescanNode(watcher, path)) {
946 SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
947 }
948 SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
949 LOG.debug("deleted task without in memory state " + path);
950 return;
951 }
952 synchronized (task) {
953 task.status = DELETED;
954 task.notify();
955 }
956 SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
957 }
958
959 private void deleteNodeFailure(String path) {
960 LOG.info("Failed to delete node " + path + " and will retry soon.");
961 return;
962 }
963
  /**
   * Signals the {@link SplitLogWorker}s that a task was resubmitted by creating a RESCAN node.
   */
969 private void createRescanNode(long retries) {
970
971
972
973
974
975
976
977 lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
978 SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode);
979 this.watcher.getRecoverableZooKeeper().getZooKeeper().
980 create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
981 Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
982 new CreateRescanAsyncCallback(), Long.valueOf(retries));
983 }
984
985 private void createRescanSuccess(String path) {
986 SplitLogCounters.tot_mgr_rescan.incrementAndGet();
987 getDataSetWatch(path, zkretries);
988 }
989
990 private void createRescanFailure() {
991 LOG.fatal("logic failure, rescan failure must not happen");
992 }
993
  /**
   * @param path
   * @param batch
   * @return null on success, the existing task on error
   */
999 private Task createTaskIfAbsent(String path, TaskBatch batch) {
1000 Task oldtask;
1001
1002
1003 Task newtask = new Task();
1004 newtask.batch = batch;
1005 oldtask = tasks.putIfAbsent(path, newtask);
1006 if (oldtask == null) {
1007 batch.installed++;
1008 return null;
1009 }
1010
1011 synchronized (oldtask) {
1012 if (oldtask.isOrphan()) {
1013 if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this task because it might be
          // too late for setDone() to update batch.done. There is no need for the batch creator
          // to wait for this task to complete.
1018 return (null);
1019 }
1020 if (oldtask.status == IN_PROGRESS) {
1021 oldtask.batch = batch;
1022 batch.installed++;
1023 LOG.debug("Previously orphan task " + path + " is now being waited upon");
1024 return null;
1025 }
1026 while (oldtask.status == FAILURE) {
1027 LOG.debug("wait for status of task " + path + " to change to DELETED");
1028 SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
1029 try {
1030 oldtask.wait();
1031 } catch (InterruptedException e) {
1032 Thread.currentThread().interrupt();
1033 LOG.warn("Interrupted when waiting for znode delete callback");
1034
1035 break;
1036 }
1037 }
1038 if (oldtask.status != DELETED) {
1039 LOG.warn("Failure because previously failed task" +
1040 " state still present. Waiting for znode delete callback" +
1041 " path=" + path);
1042 return oldtask;
1043 }
1044
1045 Task t = tasks.putIfAbsent(path, newtask);
1046 if (t == null) {
1047 batch.installed++;
1048 return null;
1049 }
1050 LOG.fatal("Logic error. Deleted task still present in tasks map");
1051 assert false : "Deleted task still present in tasks map";
1052 return t;
1053 }
1054 LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
1055 return oldtask;
1056 }
1057 }
1058
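  /**
   * Returns the task for the given path, installing a new orphan Task (one without a batch) if
   * none is present yet.
   */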
1059 Task findOrCreateOrphanTask(String path) {
1060 Task orphanTask = new Task();
1061 Task task;
1062 task = tasks.putIfAbsent(path, orphanTask);
1063 if (task == null) {
1064 LOG.info("creating orphan task " + path);
1065 SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
1066 task = orphanTask;
1067 }
1068 return task;
1069 }
1070
1071 @Override
1072 public void nodeDataChanged(String path) {
1073 Task task;
1074 task = tasks.get(path);
1075 if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
1076 if (task != null) {
1077 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
1078 }
1079 getDataSetWatch(path, zkretries);
1080 }
1081 }
1082
1083 public void stop() {
1084 if (timeoutMonitor != null) {
1085 timeoutMonitor.interrupt();
1086 }
1087 }
1088
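  /**
   * Scans the splitlog znode for tasks left over by a previous master and sets watches on them.
   */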
1089 private void lookForOrphans() {
1090 List<String> orphans;
1091 try {
1092 orphans = ZKUtil.listChildrenNoWatch(this.watcher,
1093 this.watcher.splitLogZNode);
1094 if (orphans == null) {
1095 LOG.warn("could not get children of " + this.watcher.splitLogZNode);
1096 return;
1097 }
1098 } catch (KeeperException e) {
1099 LOG.warn("could not get children of " + this.watcher.splitLogZNode +
1100 " " + StringUtils.stringifyException(e));
1101 return;
1102 }
1103 int rescan_nodes = 0;
1104 for (String path : orphans) {
1105 String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
1106 if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
1107 rescan_nodes++;
1108 LOG.debug("found orphan rescan node " + path);
1109 } else {
1110 LOG.info("found orphan task " + path);
1111 }
1112 getDataSetWatch(nodepath, zkretries);
1113 }
1114 LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
1115 rescan_nodes + " rescan nodes");
1116 }
1117
  /**
   * Creates znodes /hbase/recovering-regions/[region_encoded_name]/[failed_server] for all
   * passed-in regions, recording the last flushed sequence id so replayed edits older than it can
   * be skipped.
   *
   * @param serverName the name of the failed region server
   * @param userRegions user regions assigned on the region server
   */
1124 void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
1125 throws KeeperException {
1126 if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
1127 return;
1128 }
1129
1130 try {
1131 this.recoveringRegionLock.lock();
1132
1133 this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
1134
1135 for (HRegionInfo region : userRegions) {
1136 String regionEncodeName = region.getEncodedName();
1137 long retries = this.zkretries;
1138
1139 do {
1140 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
1141 long lastRecordedFlushedSequenceId = -1;
1142 try {
1143 long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
1144 regionEncodeName.getBytes());
          /*
           * znode layout: .../region_encoded_name[last known flushed sequence id of the region]
           *   /failed_server_name[last known flushed sequence ids reported by that server]
           */
1150 byte[] data = ZKUtil.getData(this.watcher, nodePath);
1151 if (data == null) {
1152 ZKUtil.createSetData(this.watcher, nodePath,
1153 ZKUtil.positionToByteArray(lastSequenceId));
1154 } else {
1155 lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1156 if (lastRecordedFlushedSequenceId < lastSequenceId) {
1157
1158 ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
1159 }
1160 }
1161
1162 nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
1163 if (lastSequenceId <= lastRecordedFlushedSequenceId) {
1164
1165 lastSequenceId = lastRecordedFlushedSequenceId;
1166 }
1167 ZKUtil.createSetData(this.watcher, nodePath,
1168 ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
1169 LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
1170 + serverName);
1171
1172
1173 break;
1174 } catch (KeeperException e) {
1175
1176 if (retries <= 1) {
1177 throw e;
1178 }
1179
1180 try {
1181 Thread.sleep(20);
1182 } catch (Exception ignoreE) {
1183
1184 }
1185 }
1186 } while ((--retries) > 0 && (!this.stopper.isStopped()));
1187 }
1188 } finally {
1189 this.recoveringRegionLock.unlock();
1190 }
1191 }
1192
  /**
   * @param bytes content of a recovering-region znode (or one of its failed-server children)
   * @return the last flushed sequence id parsed from the znode data, or -1 if it cannot be parsed
   */
1197 public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
1198 long lastRecordedFlushedSequenceId = -1l;
1199 try {
1200 lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
1201 } catch (DeserializationException e) {
1202 lastRecordedFlushedSequenceId = -1l;
1203 LOG.warn("Can't parse last flushed sequence Id", e);
1204 }
1205 return lastRecordedFlushedSequenceId;
1206 }
1207
  /**
   * Checks whether /hbase/recovering-regions/[region_encoded_name] exists, setting a watch on the
   * znode as a side effect.
   *
   * @param zkw
   * @param regionEncodedName region encoded name
   * @return true when the znode exists
   * @throws KeeperException
   */
1216 public static boolean
1217 isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
1218 throws KeeperException {
1219 boolean result = false;
1220 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
1221
1222 byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
1223 if (node != null) {
1224 result = true;
1225 }
1226 return result;
1227 }
1228
  /**
   * Used in distributedLogReplay to fetch the last flushed sequence ids recorded in ZooKeeper for
   * a region on the given failed server.
   *
   * @param zkw
   * @param serverName the failed region server
   * @param encodedRegionName the encoded region name
   * @return the last flushed sequence ids, or null if none are recorded
   * @throws IOException
   */
1237 public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
1238 String serverName, String encodedRegionName) throws IOException {
1239
1240
1241
1242
1243
1244
1245
1246 RegionStoreSequenceIds result = null;
1247 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
1248 nodePath = ZKUtil.joinZNode(nodePath, serverName);
1249 try {
1250 byte[] data = ZKUtil.getData(zkw, nodePath);
1251 if (data != null) {
1252 result = ZKUtil.parseRegionStoreSequenceIds(data);
1253 }
1254 } catch (KeeperException e) {
1255 throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
1256 + serverName + "; region=" + encodedRegionName, e);
1257 } catch (DeserializationException e) {
1258 LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
1259 }
1260 return result;
1261 }
1262
  /**
   * Determines the recovery mode (LOG_SPLITTING vs LOG_REPLAY) from any outstanding split log
   * tasks and recovering-region znodes left from before, falling back to the current
   * configuration setting.
   *
   * @param isForInitialization true when called during master initialization
   * @throws KeeperException
   */
1270 public void setRecoveryMode(boolean isForInitialization) throws KeeperException {
1271 if(this.isDrainingDone) {
1272
1273
1274 return;
1275 }
1276 if(this.watcher == null) {
1277
1278 this.isDrainingDone = true;
1279 this.recoveryMode = RecoveryMode.LOG_SPLITTING;
1280 return;
1281 }
1282 boolean hasSplitLogTask = false;
1283 boolean hasRecoveringRegions = false;
1284 RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
1285 RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ?
1286 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
1287
1288
1289 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
1290 if (regions != null && !regions.isEmpty()) {
1291 hasRecoveringRegions = true;
1292 previousRecoveryMode = RecoveryMode.LOG_REPLAY;
1293 }
1294 if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
1295
1296 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
1297 if (tasks != null && !tasks.isEmpty()) {
1298 hasSplitLogTask = true;
1299 if (isForInitialization) {
1300
1301 for (String task : tasks) {
1302 try {
1303 byte[] data = ZKUtil.getData(this.watcher,
1304 ZKUtil.joinZNode(watcher.splitLogZNode, task));
1305 if (data == null) continue;
1306 SplitLogTask slt = SplitLogTask.parseFrom(data);
1307 previousRecoveryMode = slt.getMode();
1308 if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
1309
1310
1311
1312 previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
1313 }
1314 break;
1315 } catch (DeserializationException e) {
1316 LOG.warn("Failed parse data for znode " + task, e);
1317 }
1318 }
1319 }
1320 }
1321 }
1322
1323 synchronized(this) {
1324 if(this.isDrainingDone) {
1325 return;
1326 }
1327 if (!hasSplitLogTask && !hasRecoveringRegions) {
1328 this.isDrainingDone = true;
1329 this.recoveryMode = recoveryModeInConfig;
1330 return;
1331 } else if (!isForInitialization) {
1332
1333 return;
1334 }
1335
1336 if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
1337 this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
1338 this.recoveryMode = previousRecoveryMode;
1339 } else {
1340 this.recoveryMode = recoveryModeInConfig;
1341 }
1342 }
1343 }
1344
1345 public RecoveryMode getRecoveryMode() {
1346 return this.recoveryMode;
1347 }
1348
  /**
   * Returns whether distributed log replay is enabled; it additionally requires HFile format
   * version 3 or above.
   *
   * @param conf the HBase configuration
   * @return true if distributed log replay should be used
   */
1354 private boolean isDistributedLogReplay(Configuration conf) {
1355 boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
1356 HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
1357 int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
1358 if (LOG.isDebugEnabled()) {
1359 LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
1360 }
1361
1362 return dlr && (version >= 3);
1363 }
1364
  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */
1371 static class TaskBatch {
1372 int installed = 0;
1373 int done = 0;
1374 int error = 0;
1375 volatile boolean isDead = false;
1376
1377 @Override
1378 public String toString() {
1379 return ("installed = " + installed + " done = " + done + " error = " + error);
1380 }
1381 }
1382
  /**
   * In-memory state of an active task.
   */
1386 static class Task {
1387 volatile long last_update;
1388 volatile int last_version;
1389 volatile ServerName cur_worker_name;
1390 volatile TaskBatch batch;
1391 volatile TerminationStatus status;
1392 volatile int incarnation;
1393 final AtomicInteger unforcedResubmits = new AtomicInteger();
1394 volatile boolean resubmitThresholdReached;
1395
1396 @Override
1397 public String toString() {
1398 return ("last_update = " + last_update +
1399 " last_version = " + last_version +
1400 " cur_worker_name = " + cur_worker_name +
1401 " status = " + status +
1402 " incarnation = " + incarnation +
1403 " resubmits = " + unforcedResubmits.get() +
1404 " batch = " + batch);
1405 }
1406
1407 Task() {
1408 incarnation = 0;
1409 last_version = -1;
1410 status = IN_PROGRESS;
1411 setUnassigned();
1412 }
1413
1414 public boolean isOrphan() {
1415 return (batch == null || batch.isDead);
1416 }
1417
1418 public boolean isUnassigned() {
1419 return (cur_worker_name == null);
1420 }
1421
1422 public void heartbeatNoDetails(long time) {
1423 last_update = time;
1424 }
1425
1426 public void heartbeat(long time, int version, ServerName worker) {
1427 last_version = version;
1428 last_update = time;
1429 cur_worker_name = worker;
1430 }
1431
1432 public void setUnassigned() {
1433 cur_worker_name = null;
1434 last_update = -1;
1435 }
1436 }
1437
1438 void handleDeadWorker(ServerName workerName) {
    // The actual resubmission happens on the TimeoutMonitor thread; this method only records the
    // dead worker. That keeps the concurrency simpler and makes retries easier.
1441 synchronized (deadWorkersLock) {
1442 if (deadWorkers == null) {
1443 deadWorkers = new HashSet<ServerName>(100);
1444 }
1445 deadWorkers.add(workerName);
1446 }
1447 LOG.info("dead splitlog worker " + workerName);
1448 }
1449
1450 void handleDeadWorkers(Set<ServerName> serverNames) {
1451 synchronized (deadWorkersLock) {
1452 if (deadWorkers == null) {
1453 deadWorkers = new HashSet<ServerName>(100);
1454 }
1455 deadWorkers.addAll(serverNames);
1456 }
1457 LOG.info("dead splitlog workers " + serverNames);
1458 }
1459
  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out.
   */
1464 private class TimeoutMonitor extends Chore {
1465 private long lastLog = 0;
1466
1467 public TimeoutMonitor(final int period, Stoppable stopper) {
1468 super("SplitLogManager Timeout Monitor", period, stopper);
1469 }
1470
1471 @Override
1472 protected void chore() {
1473 int resubmitted = 0;
1474 int unassigned = 0;
1475 int tot = 0;
1476 boolean found_assigned_task = false;
1477 Set<ServerName> localDeadWorkers;
1478
1479 synchronized (deadWorkersLock) {
1480 localDeadWorkers = deadWorkers;
1481 deadWorkers = null;
1482 }
1483
1484 for (Map.Entry<String, Task> e : tasks.entrySet()) {
1485 String path = e.getKey();
1486 Task task = e.getValue();
1487 ServerName cur_worker = task.cur_worker_name;
1488 tot++;
1489
1490
1491
1492
1493
1494 if (task.isUnassigned()) {
1495 unassigned++;
1496 continue;
1497 }
1498 found_assigned_task = true;
1499 if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
1500 SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
1501 if (resubmit(path, task, FORCE)) {
1502 resubmitted++;
1503 } else {
1504 handleDeadWorker(cur_worker);
1505 LOG.warn("Failed to resubmit task " + path + " owned by dead " +
1506 cur_worker + ", will retry.");
1507 }
1508 } else if (resubmit(path, task, CHECK)) {
1509 resubmitted++;
1510 }
1511 }
1512 if (tot > 0) {
1513 long now = EnvironmentEdgeManager.currentTimeMillis();
1514 if (now > lastLog + 5000) {
1515 lastLog = now;
1516 LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
1517 }
1518 }
1519 if (resubmitted > 0) {
1520 LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
1521 }
      // If there are pending tasks but none of them has been picked up by a worker for longer
      // than unassignedTimeout, re-read the task znodes (to repair any lost watches) and create a
      // RESCAN node to nudge the SplitLogWorkers into looking for work again.
1531 if (tot > 0 && !found_assigned_task &&
1532 ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) >
1533 unassignedTimeout)) {
1534 for (Map.Entry<String, Task> e : tasks.entrySet()) {
1535 String path = e.getKey();
1536 Task task = e.getValue();
1537
1538
1539
1540
1541 if (task.isUnassigned() && (task.status != FAILURE)) {
1542
1543 tryGetDataSetWatch(path);
1544 }
1545 }
1546 createRescanNode(Long.MAX_VALUE);
1547 SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
1548 LOG.debug("resubmitting unassigned task(s) after timeout");
1549 }
1550
1551
1552 if (failedDeletions.size() > 0) {
1553 List<String> tmpPaths = new ArrayList<String>(failedDeletions);
1554 for (String tmpPath : tmpPaths) {
1555
1556 deleteNode(tmpPath, zkretries);
1557 }
1558 failedDeletions.removeAll(tmpPaths);
1559 }
1560
1561
1562 long timeInterval = EnvironmentEdgeManager.currentTimeMillis()
1563 - lastRecoveringNodeCreationTime;
1564 if (!failedRecoveringRegionDeletions.isEmpty()
1565 || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
1566
1567 if (!failedRecoveringRegionDeletions.isEmpty()) {
1568 List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
1569 new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
1570 failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
1571 for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
1572 removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond());
1573 }
1574 } else {
1575 removeRecoveringRegionsFromZK(null, null);
1576 }
1577 }
1578 }
1579 }
1580
  /**
   * Asynchronous handler for zk create node results. Retries on failures.
   */
1585 class CreateAsyncCallback implements AsyncCallback.StringCallback {
1586 private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
1587
1588 @Override
1589 public void processResult(int rc, String path, Object ctx, String name) {
1590 SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
1591 if (rc != 0) {
1592 if (needAbandonRetries(rc, "Create znode " + path)) {
1593 createNodeFailure(path);
1594 return;
1595 }
1596 if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          // A pre-existing znode here is typically a task left over from a previous master run,
          // or one with a pending delete of a DONE task. All code paths that read or delete task
          // znodes tolerate a znode that disappears underneath them, so it is safe to reuse it.
1603 LOG.debug("found pre-existing znode " + path);
1604 SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
1605 } else {
1606 Long retry_count = (Long)ctx;
1607 LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
1608 path + " remaining retries=" + retry_count);
1609 if (retry_count == 0) {
1610 SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
1611 createNodeFailure(path);
1612 } else {
1613 SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
1614 createNode(path, retry_count - 1);
1615 }
1616 return;
1617 }
1618 }
1619 createNodeSuccess(path);
1620 }
1621 }
1622
  /**
   * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
   */
1627 class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1628 private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1629 private boolean completeTaskOnNoNode;
    /**
     * @param completeTaskOnNoNode when true, a missing task znode is treated as task completion
     *          (the normal case for getDataSetWatch); when false the missing node is only logged
     *          (used by tryGetDataSetWatch).
     */
1637 public GetDataAsyncCallback(boolean completeTaskOnNoNode) {
1638 this.completeTaskOnNoNode = completeTaskOnNoNode;
1639 }
1640
1641 @Override
1642 public void processResult(int rc, String path, Object ctx, byte[] data,
1643 Stat stat) {
1644 SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1645 if (rc != 0) {
1646 if (needAbandonRetries(rc, "GetData from znode " + path)) {
1647 return;
1648 }
1649 if (rc == KeeperException.Code.NONODE.intValue()) {
1650 SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1651 LOG.warn("task znode " + path + " vanished.");
1652 if (completeTaskOnNoNode) {
1653
1654
1655
1656 try {
1657 getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
1658 } catch (DeserializationException e) {
1659 LOG.warn("Deserialization problem", e);
1660 }
1661 }
1662 return;
1663 }
1664 Long retry_count = (Long) ctx;
1665
1666 if (retry_count < 0) {
1667 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1668 path + ". Ignoring error. No error handling. No retrying.");
1669 return;
1670 }
1671 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1672 path + " remaining retries=" + retry_count);
1673 if (retry_count == 0) {
1674 SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1675 getDataSetWatchFailure(path);
1676 } else {
1677 SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1678 getDataSetWatch(path, retry_count - 1);
1679 }
1680 return;
1681 }
1682 try {
1683 getDataSetWatchSuccess(path, data, stat.getVersion());
1684 } catch (DeserializationException e) {
1685 LOG.warn("Deserialization problem", e);
1686 }
1687 return;
1688 }
1689 }
1690
  /**
   * Asynchronous handler for zk delete node results. Retries on failures.
   */
1695 class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1696 private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1697
1698 @Override
1699 public void processResult(int rc, String path, Object ctx) {
1700 SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1701 if (rc != 0) {
1702 if (needAbandonRetries(rc, "Delete znode " + path)) {
1703 failedDeletions.add(path);
1704 return;
1705 }
1706 if (rc != KeeperException.Code.NONODE.intValue()) {
1707 SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1708 Long retry_count = (Long) ctx;
1709 LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
1710 path + " remaining retries=" + retry_count);
1711 if (retry_count == 0) {
1712 LOG.warn("delete failed " + path);
1713 failedDeletions.add(path);
1714 deleteNodeFailure(path);
1715 } else {
1716 deleteNode(path, retry_count - 1);
1717 }
1718 return;
1719 } else {
1720 LOG.info(path +
1721 " does not exist. Either was created but deleted behind our" +
1722 " back by another pending delete OR was deleted" +
1723 " in earlier retry rounds. zkretries = " + (Long) ctx);
1724 }
1725 } else {
1726 LOG.debug("deleted " + path);
1727 }
1728 deleteNodeSuccess(path);
1729 }
1730 }
1731
  /**
   * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
   * <p>
   * A RESCAN node is created using the ephemeral sequential flag. It is a signal for all the
   * {@link SplitLogWorker}s to rescan for new tasks.
   */
1739 class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1740 private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1741
1742 @Override
1743 public void processResult(int rc, String path, Object ctx, String name) {
1744 if (rc != 0) {
1745 if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1746 return;
1747 }
1748 Long retry_count = (Long)ctx;
1749 LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
1750 " remaining retries=" + retry_count);
1751 if (retry_count == 0) {
1752 createRescanFailure();
1753 } else {
1754 createRescanNode(retry_count - 1);
1755 }
1756 return;
1757 }
      // path is the original argument, name is the actual znode name that was created
1759 createRescanSuccess(name);
1760 }
1761 }
1762
  /**
   * {@link SplitLogManager} can use objects implementing this interface to finish off a partially
   * done task by a {@link SplitLogWorker}. This provides a serialization point at the end of task
   * processing. Implementations must be restartable and idempotent.
   */
1769 public interface TaskFinisher {
    /**
     * Status that can be returned by finish().
     */
    enum Status {
      /**
       * task completed with success
       */
      DONE(),
      /**
       * task completed with error
       */
      ERR();
1782 }
1783
    /**
     * Finish the partially done task. The worker name provides a clue to where the partial
     * results of the partially done task are present. The taskname is the name of the task that
     * was put up in ZooKeeper.
     *
     * @param workerName
     * @param taskname
     * @return DONE if the task completed successfully, ERR otherwise
     */
1792 Status finish(ServerName workerName, String taskname);
1793 }
1794
1795 enum ResubmitDirective {
1796 CHECK(),
1797 FORCE();
1798 }
1799
1800 enum TerminationStatus {
1801 IN_PROGRESS("in_progress"),
1802 SUCCESS("success"),
1803 FAILURE("failure"),
1804 DELETED("deleted");
1805
1806 String statusMsg;
1807 TerminationStatus(String msg) {
1808 statusMsg = msg;
1809 }
1810
1811 @Override
1812 public String toString() {
1813 return statusMsg;
1814 }
1815 }
1816 }