1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.SynchronousQueue;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.DaemonThreadFactory;
37 import org.apache.hadoop.hbase.errorhandling.ForeignException;
38 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
39
40 import com.google.common.collect.MapMaker;
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class ProcedureCoordinator {
50 private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
51
52 final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
53 final static long TIMEOUT_MILLIS_DEFAULT = 60000;
54 final static long WAKE_MILLIS_DEFAULT = 500;
55
56 private final ProcedureCoordinatorRpcs rpcs;
57 private final ExecutorService pool;
58 private final long wakeTimeMillis;
59 private final long timeoutMillis;
60
61
62 private final ConcurrentMap<String, Procedure> procedures =
63 new MapMaker().concurrencyLevel(4).weakValues().makeMap();
64
65
66
67
68
69
70
71
72
73
74 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
75 this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
76 }
77
78
79
80
81
82
83
84
85
86
87
88 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
89 long timeoutMillis, long wakeTimeMillis) {
90 this.timeoutMillis = timeoutMillis;
91 this.wakeTimeMillis = wakeTimeMillis;
92 this.rpcs = rpcs;
93 this.pool = pool;
94 this.rpcs.start(this);
95 }
96
97
98
99
100
101
102
103 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
104 return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
105 }
106
107
108
109
110
111
112
113
114 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
115 long keepAliveMillis) {
116 return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
117 new SynchronousQueue<Runnable>(),
118 new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
119 }
120
121
122
123
124
125 public void close() throws IOException {
126
127 pool.shutdownNow();
128 rpcs.close();
129 }
130
131
132
133
134
135
136
137
138
139
140 boolean submitProcedure(Procedure proc) {
141
142 if (proc == null) {
143 return false;
144 }
145 String procName = proc.getName();
146
147
148 synchronized (procedures) {
149 Procedure oldProc = procedures.get(procName);
150 if (oldProc != null) {
151
152 if (oldProc.completedLatch.getCount() != 0) {
153 LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
154 return false;
155 }
156 LOG.debug("Procedure " + procName + " was in running list but was completed. Accepting new attempt.");
157 procedures.remove(procName);
158 }
159 }
160
161
162 Future<Void> f = null;
163 try {
164 synchronized (procedures) {
165 this.procedures.put(procName, proc);
166 f = this.pool.submit(proc);
167 }
168 return true;
169 } catch (RejectedExecutionException e) {
170 LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error and " +
171 "cancelling operation.", e);
172
173 this.procedures.remove(procName);
174
175 proc.receive(new ForeignException(procName, e));
176
177
178 if (f != null) {
179 f.cancel(true);
180 }
181 }
182 return false;
183 }
184
185
186
187
188
189
190
191
192 void rpcConnectionFailure(final String message, final IOException cause) {
193 Collection<Procedure> toNotify = procedures.values();
194
195 for (Procedure proc : toNotify) {
196 if (proc == null) {
197 continue;
198 }
199
200 proc.receive(new ForeignException(proc.getName(), cause));
201 }
202 }
203
204
205
206
207
208
209 public void abortProcedure(String procName, ForeignException reason) {
210
211 synchronized(procedures) {
212 Procedure proc = procedures.get(procName);
213 if (proc == null) {
214 return;
215 }
216 proc.receive(reason);
217 }
218 }
219
220
221
222
223
224
225
226
227 Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
228 List<String> expectedMembers) {
229
230 return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
231 procName, procArgs, expectedMembers);
232 }
233
234
235
236
237
238
239
240
241
242 public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
243 List<String> expectedMembers) throws RejectedExecutionException {
244 Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
245 if (!this.submitProcedure(proc)) {
246 LOG.error("Failed to submit procedure '" + procName + "'");
247 return null;
248 }
249 return proc;
250 }
251
252
253
254
255
256
257
258 void memberAcquiredBarrier(String procName, final String member) {
259 Procedure proc = procedures.get(procName);
260 if (proc == null) {
261 LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'");
262 return;
263 }
264
265 proc.barrierAcquiredByMember(member);
266 }
267
268
269
270
271
272
273
274 void memberFinishedBarrier(String procName, final String member) {
275 Procedure proc = procedures.get(procName);
276 if (proc == null) {
277 LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'");
278 return;
279 }
280 proc.barrierReleasedByMember(member);
281 }
282
283
284
285
286 ProcedureCoordinatorRpcs getRpcs() {
287 return rpcs;
288 }
289
290
291
292
293
294
295
296 public Procedure getProcedure(String name) {
297 return procedures.get(name);
298 }
299
300
301
302
303 public Set<String> getProcedureNames() {
304 return new HashSet<String>(procedures.keySet());
305 }
306 }