1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.RejectedExecutionException;
27 import java.util.concurrent.SynchronousQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.DaemonThreadFactory;
35 import org.apache.hadoop.hbase.errorhandling.ForeignException;
36
37 import com.google.common.collect.MapMaker;
38
39
40
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class ProcedureMember implements Closeable {
50 private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
51
52 final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
53
54 private final SubprocedureFactory builder;
55 private final ProcedureMemberRpcs rpcs;
56
57 private final ConcurrentMap<String,Subprocedure> subprocs =
58 new MapMaker().concurrencyLevel(4).weakValues().makeMap();
59 private final ExecutorService pool;
60
61
62
63
64
65
66
67
68 public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
69 SubprocedureFactory factory) {
70 this.pool = pool;
71 this.rpcs = rpcs;
72 this.builder = factory;
73 }
74
75
76
77
78
79
80
81 public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
82 return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
83 }
84
85
86
87
88
89
90
91
92 public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
93 long keepAliveMillis) {
94 return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
95 new SynchronousQueue<Runnable>(),
96 new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
97 }
98
99
100
101
102
103
104 ProcedureMemberRpcs getRpcs() {
105 return rpcs;
106 }
107
108
109
110
111
112
113
114
115
116
117 public Subprocedure createSubprocedure(String opName, byte[] data) {
118 return builder.buildSubprocedure(opName, data);
119 }
120
121
122
123
124
125
126
127
128 public boolean submitSubprocedure(Subprocedure subproc) {
129
130 if (subproc == null) {
131 LOG.warn("Submitted null subprocedure, nothing to run here.");
132 return false;
133 }
134
135 String procName = subproc.getName();
136 if (procName == null || procName.length() == 0) {
137 LOG.error("Subproc name cannot be null or the empty string");
138 return false;
139 }
140
141
142 Subprocedure rsub;
143 synchronized (subprocs) {
144 rsub = subprocs.get(procName);
145 }
146 if (rsub != null) {
147 if (!rsub.isComplete()) {
148 LOG.error("Subproc '" + procName + "' is already running. Bailing out");
149 return false;
150 }
151 LOG.warn("A completed old subproc " + procName + " is still present, removing");
152 subprocs.remove(procName);
153 }
154
155 LOG.debug("Submitting new Subprocedure:" + procName);
156
157
158 Future<Void> future = null;
159 try {
160 synchronized (subprocs) {
161 subprocs.put(procName, subproc);
162 }
163 future = this.pool.submit(subproc);
164 return true;
165 } catch (RejectedExecutionException e) {
166 synchronized (subprocs) {
167 subprocs.remove(procName);
168 }
169
170 String msg = "Subprocedure pool is full!";
171 subproc.cancel(msg, e.getCause());
172
173
174 if (future != null) {
175 future.cancel(true);
176 }
177 }
178
179 LOG.error("Failed to start subprocedure '" + procName + "'");
180 return false;
181 }
182
183
184
185
186
187 public void receivedReachedGlobalBarrier(String procName) {
188 Subprocedure subproc = subprocs.get(procName);
189 if (subproc == null) {
190 LOG.warn("Unexpected reached glabal barrier message for Sub-Procedure '" + procName + "'");
191 return;
192 }
193 subproc.receiveReachedGlobalBarrier();
194 }
195
196
197
198
199 @Override
200 public void close() throws IOException {
201
202 pool.shutdownNow();
203 }
204
205
206
207
208
209
210
211 boolean closeAndWait(long timeoutMs) throws InterruptedException {
212 pool.shutdown();
213 return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
214 }
215
216
217
218
219
220
221
222
223
224
225
226 public void controllerConnectionFailure(final String message, final IOException cause) {
227 Collection<Subprocedure> toNotify = subprocs.values();
228 LOG.error(message, cause);
229 for (Subprocedure sub : toNotify) {
230
231 sub.cancel(message, cause);
232 }
233 }
234
235
236
237
238
239
240 public void receiveAbortProcedure(String procName, ForeignException ee) {
241 LOG.debug("Request received to abort procedure " + procName, ee);
242
243 Subprocedure sub = subprocs.get(procName);
244 if (sub == null) {
245 LOG.info("Received abort on procedure with no local subprocedure " + procName +
246 ", ignoring it.", ee);
247 return;
248 }
249 String msg = "Propagating foreign exception to subprocedure " + sub.getName();
250 LOG.error(msg, ee);
251 sub.cancel(msg, ee);
252 }
253 }