1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.handler;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 import org.apache.commons.lang.mutable.MutableInt;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.Server;
30 import org.apache.hadoop.hbase.ServerName;
31 import org.apache.hadoop.hbase.SplitLogCounters;
32 import org.apache.hadoop.hbase.SplitLogTask;
33 import org.apache.hadoop.hbase.executor.EventHandler;
34 import org.apache.hadoop.hbase.executor.EventType;
35 import org.apache.hadoop.hbase.master.SplitLogManager;
36 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
37 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
38 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
39 import org.apache.hadoop.hbase.util.CancelableProgressable;
40 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
41 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
42 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
43 import org.apache.zookeeper.KeeperException;
44
45
46
47
48 @InterfaceAudience.Private
49 public class HLogSplitterHandler extends EventHandler {
50 private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
51 private final ServerName serverName;
52 private final String curTask;
53 private final String wal;
54 private final ZooKeeperWatcher zkw;
55 private final CancelableProgressable reporter;
56 private final AtomicInteger inProgressTasks;
57 private final MutableInt curTaskZKVersion;
58 private final TaskExecutor splitTaskExecutor;
59 private final RecoveryMode mode;
60
61 public HLogSplitterHandler(final Server server, String curTask,
62 final MutableInt curTaskZKVersion,
63 CancelableProgressable reporter,
64 AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
65 super(server, EventType.RS_LOG_REPLAY);
66 this.curTask = curTask;
67 this.wal = ZKSplitLog.getFileName(curTask);
68 this.reporter = reporter;
69 this.inProgressTasks = inProgressTasks;
70 this.inProgressTasks.incrementAndGet();
71 this.serverName = server.getServerName();
72 this.zkw = server.getZooKeeper();
73 this.curTaskZKVersion = curTaskZKVersion;
74 this.splitTaskExecutor = splitTaskExecutor;
75 this.mode = mode;
76 }
77
78 @Override
79 public void process() throws IOException {
80 long startTime = System.currentTimeMillis();
81 try {
82 Status status = this.splitTaskExecutor.exec(wal, mode, reporter);
83 switch (status) {
84 case DONE:
85 endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode),
86 SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
87 break;
88 case PREEMPTED:
89 SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
90 LOG.warn("task execution prempted " + wal);
91 break;
92 case ERR:
93 if (server != null && !server.isStopped()) {
94 endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode),
95 SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
96 break;
97 }
98
99
100
101 case RESIGNED:
102 if (server != null && server.isStopped()) {
103 LOG.info("task execution interrupted because worker is exiting " + curTask);
104 }
105 endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode),
106 SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
107 break;
108 }
109 } finally {
110 LOG.info("worker " + serverName + " done with task " + curTask + " in "
111 + (System.currentTimeMillis() - startTime) + "ms");
112 this.inProgressTasks.decrementAndGet();
113 }
114 }
115
116
117
118
119
120
121
122 public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task,
123 int taskZKVersion) {
124 try {
125 if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) {
126 LOG.info("successfully transitioned task " + task + " to final state " + slt);
127 ctr.incrementAndGet();
128 return;
129 }
130 LOG.warn("failed to transistion task " + task + " to end state " + slt
131 + " because of version mismatch ");
132 } catch (KeeperException.BadVersionException bve) {
133 LOG.warn("transisition task " + task + " to " + slt
134 + " failed because of version mismatch", bve);
135 } catch (KeeperException.NoNodeException e) {
136 LOG.fatal(
137 "logic error - end task " + task + " " + slt
138 + " failed because task doesn't exist", e);
139 } catch (KeeperException e) {
140 LOG.warn("failed to end task, " + task + " " + slt, e);
141 }
142 SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
143 }
144 }