1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.Arrays;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.errorhandling.ForeignException;
29 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
30 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32 import org.apache.zookeeper.KeeperException;
33
34 import com.google.protobuf.InvalidProtocolBufferException;
35
36
37
38
39 @InterfaceAudience.Private
40 public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
41 public static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class);
42 private ZKProcedureUtil zkProc = null;
43 protected ProcedureCoordinator coordinator = null;
44
45 ZooKeeperWatcher watcher;
46 String procedureType;
47 String coordName;
48
49
50
51
52
53
54
55
56 public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
57 String procedureClass, String coordName) throws KeeperException {
58 this.watcher = watcher;
59 this.procedureType = procedureClass;
60 this.coordName = coordName;
61 }
62
63
64
65
66
67
68
69
70
71
72
73 @Override
74 final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
75 throws IOException, IllegalArgumentException {
76 String procName = proc.getName();
77
78 String abortNode = zkProc.getAbortZNode(procName);
79 try {
80
81 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
82 abort(abortNode);
83 }
84
85
86 } catch (KeeperException e) {
87 LOG.error("Failed to watch abort", e);
88 throw new IOException("Failed while watching abort node:" + abortNode, e);
89 }
90
91
92 String acquire = zkProc.getAcquiredBarrierNode(procName);
93 LOG.debug("Creating acquire znode:" + acquire);
94 try {
95
96 byte[] data = ProtobufUtil.prependPBMagic(info);
97 ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
98
99 for (String node : nodeNames) {
100 String znode = ZKUtil.joinZNode(acquire, node);
101 LOG.debug("Watching for acquire node:" + znode);
102 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
103 coordinator.memberAcquiredBarrier(procName, node);
104 }
105 }
106 } catch (KeeperException e) {
107 throw new IOException("Failed while creating acquire node:" + acquire, e);
108 }
109 }
110
111 @Override
112 public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
113 String procName = proc.getName();
114 String reachedNode = zkProc.getReachedBarrierNode(procName);
115 LOG.debug("Creating reached barrier zk node:" + reachedNode);
116 try {
117
118 ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
119
120 for (String node : nodeNames) {
121 String znode = ZKUtil.joinZNode(reachedNode, node);
122 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
123 coordinator.memberFinishedBarrier(procName, node);
124 }
125 }
126 } catch (KeeperException e) {
127 throw new IOException("Failed while creating reached node:" + reachedNode, e);
128 }
129 }
130
131
132
133
134
135 @Override
136 final public void resetMembers(Procedure proc) throws IOException {
137 String procName = proc.getName();
138 boolean stillGettingNotifications = false;
139 do {
140 try {
141 LOG.debug("Attempting to clean out zk node for op:" + procName);
142 zkProc.clearZNodes(procName);
143 stillGettingNotifications = false;
144 } catch (KeeperException.NotEmptyException e) {
145
146
147 stillGettingNotifications = true;
148 } catch (KeeperException e) {
149 throw new IOException("Failed to complete reset procedure " + procName, e);
150 }
151 } while (stillGettingNotifications);
152 }
153
154
155
156
157
158 final public boolean start(final ProcedureCoordinator coordinator) {
159 if (this.coordinator != null) {
160 throw new IllegalStateException(
161 "ZKProcedureCoordinator already started and already has listener installed");
162 }
163 this.coordinator = coordinator;
164
165 try {
166 this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
167 @Override
168 public void nodeCreated(String path) {
169 if (!isInProcedurePath(path)) return;
170 LOG.debug("Node created: " + path);
171 logZKTree(this.baseZNode);
172 if (isAcquiredPathNode(path)) {
173
174 coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
175 ZKUtil.getNodeName(path));
176 } else if (isReachedPathNode(path)) {
177
178
179
180 coordinator.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
181 ZKUtil.getNodeName(path));
182 } else if (isAbortPathNode(path)) {
183 abort(path);
184 } else {
185 LOG.debug("Ignoring created notification for node:" + path);
186 }
187 }
188 };
189 zkProc.clearChildZNodes();
190 } catch (KeeperException e) {
191 LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
192 return false;
193 }
194
195 LOG.debug("Starting the controller for procedure member:" + coordName);
196 return true;
197 }
198
199
200
201
202
203
204
205 @Override
206 final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
207 String procName = proc.getName();
208 LOG.debug("Aborting procedure '" + procName + "' in zk");
209 String procAbortNode = zkProc.getAbortZNode(procName);
210 try {
211 LOG.debug("Creating abort znode:" + procAbortNode);
212 String source = (ee.getSource() == null) ? coordName : ee.getSource();
213 byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
214
215 ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
216 LOG.debug("Finished creating abort node:" + procAbortNode);
217 } catch (KeeperException e) {
218
219
220 zkProc.logZKTree(zkProc.baseZNode);
221 coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
222 + " to abort procedure '" + procName + "'", new IOException(e));
223 }
224 }
225
226
227
228
229
230 protected void abort(String abortNode) {
231 String procName = ZKUtil.getNodeName(abortNode);
232 ForeignException ee = null;
233 try {
234 byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
235 if (!ProtobufUtil.isPBMagicPrefix(data)) {
236 LOG.warn("Got an error notification for op:" + abortNode
237 + " but we can't read the information. Killing the procedure.");
238
239 ee = new ForeignException(coordName, "Data in abort node is illegally formatted. ignoring content.");
240 } else {
241
242 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
243 ee = ForeignException.deserialize(data);
244 }
245 } catch (InvalidProtocolBufferException e) {
246 LOG.warn("Got an error notification for op:" + abortNode
247 + " but we can't read the information. Killing the procedure.");
248
249 ee = new ForeignException(coordName, e);
250 } catch (KeeperException e) {
251 coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
252 + zkProc.getAbortZnode(), new IOException(e));
253 }
254 coordinator.abortProcedure(procName, ee);
255 }
256
257 @Override
258 final public void close() throws IOException {
259 zkProc.close();
260 }
261
262
263
264
265 final ZKProcedureUtil getZkProcedureUtil() {
266 return zkProc;
267 }
268 }