1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.Comparator;
23 import java.util.List;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Coprocessor;
27 import org.apache.hadoop.hbase.CoprocessorEnvironment;
28 import org.apache.hadoop.hbase.MetaMutationAnnotation;
29 import org.apache.hadoop.hbase.client.Mutation;
30 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
31 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
32 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
33 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
34
35 public class RegionServerCoprocessorHost extends
36 CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
37
38 private RegionServerServices rsServices;
39
40 public RegionServerCoprocessorHost(RegionServerServices rsServices,
41 Configuration conf) {
42 super(rsServices);
43 this.rsServices = rsServices;
44 this.conf = conf;
45
46 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
47 }
48
49 @Override
50 public RegionServerEnvironment createEnvironment(Class<?> implClass,
51 Coprocessor instance, int priority, int sequence, Configuration conf) {
52 return new RegionServerEnvironment(implClass, instance, priority,
53 sequence, conf, this.rsServices);
54 }
55
56 public void preStop(String message) throws IOException {
57 ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
58 for (RegionServerEnvironment env : coprocessors) {
59 if (env.getInstance() instanceof RegionServerObserver) {
60 ctx = ObserverContext.createAndPrepare(env, ctx);
61 Thread currentThread = Thread.currentThread();
62 ClassLoader cl = currentThread.getContextClassLoader();
63 try {
64 currentThread.setContextClassLoader(env.getClassLoader());
65 ((RegionServerObserver) env.getInstance()).preStopRegionServer(ctx);
66 } catch (Throwable e) {
67 handleCoprocessorThrowable(env, e);
68 } finally {
69 currentThread.setContextClassLoader(cl);
70 }
71 if (ctx.shouldComplete()) {
72 break;
73 }
74 }
75
76 shutdown(env);
77 }
78 }
79
80 public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
81 boolean bypass = false;
82 ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
83 for (RegionServerEnvironment env : coprocessors) {
84 if (env.getInstance() instanceof RegionServerObserver) {
85 ctx = ObserverContext.createAndPrepare(env, ctx);
86 Thread currentThread = Thread.currentThread();
87 ClassLoader cl = currentThread.getContextClassLoader();
88 try {
89 currentThread.setContextClassLoader(env.getClassLoader());
90 ((RegionServerObserver) env.getInstance()).preMerge(ctx, regionA, regionB);
91 } catch (Throwable e) {
92 handleCoprocessorThrowable(env, e);
93 } finally {
94 currentThread.setContextClassLoader(cl);
95 }
96 bypass |= ctx.shouldBypass();
97 if (ctx.shouldComplete()) {
98 break;
99 }
100 }
101 }
102 return bypass;
103 }
104
105 public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
106 throws IOException {
107 ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
108 for (RegionServerEnvironment env : coprocessors) {
109 if (env.getInstance() instanceof RegionServerObserver) {
110 ctx = ObserverContext.createAndPrepare(env, ctx);
111 Thread currentThread = Thread.currentThread();
112 ClassLoader cl = currentThread.getContextClassLoader();
113 try {
114 currentThread.setContextClassLoader(env.getClassLoader());
115 ((RegionServerObserver) env.getInstance()).postMerge(ctx, regionA, regionB, mergedRegion);
116 } catch (Throwable e) {
117 handleCoprocessorThrowable(env, e);
118 } finally {
119 currentThread.setContextClassLoader(cl);
120 }
121 if (ctx.shouldComplete()) {
122 break;
123 }
124 }
125 }
126 }
127
128 public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
129 final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
130 boolean bypass = false;
131 ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
132 for (RegionServerEnvironment env : coprocessors) {
133 if (env.getInstance() instanceof RegionServerObserver) {
134 ctx = ObserverContext.createAndPrepare(env, ctx);
135 Thread currentThread = Thread.currentThread();
136 ClassLoader cl = currentThread.getContextClassLoader();
137 try {
138 currentThread.setContextClassLoader(env.getClassLoader());
139 ((RegionServerObserver) env.getInstance()).preMergeCommit(ctx, regionA, regionB,
140 metaEntries);
141 } catch (Throwable e) {
142 handleCoprocessorThrowable(env, e);
143 } finally {
144 currentThread.setContextClassLoader(cl);
145 }
146 bypass |= ctx.shouldBypass();
147 if (ctx.shouldComplete()) {
148 break;
149 }
150 }
151 }
152 return bypass;
153 }
154
155 public void postMergeCommit(final HRegion regionA, final HRegion regionB,
156 final HRegion mergedRegion) throws IOException {
157 ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
158 for (RegionServerEnvironment env : coprocessors) {
159 if (env.getInstance() instanceof RegionServerObserver) {
160 ctx = ObserverContext.createAndPrepare(env, ctx);
161 Thread currentThread = Thread.currentThread();
162 ClassLoader cl = currentThread.getContextClassLoader();
163 try {
164 currentThread.setContextClassLoader(env.getClassLoader());
165 ((RegionServerObserver) env.getInstance()).postMergeCommit(ctx, regionA, regionB,
166 mergedRegion);
167 } catch (Throwable e) {
168 handleCoprocessorThrowable(env, e);
169 } finally {
170 currentThread.setContextClassLoader(cl);
171 }
172 if (ctx.shouldComplete()) {
173 break;
174 }
175 }
176 }
177 }
178
179 public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
180 ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
181 for (RegionServerEnvironment env : coprocessors) {
182 if (env.getInstance() instanceof RegionServerObserver) {
183 ctx = ObserverContext.createAndPrepare(env, ctx);
184 Thread currentThread = Thread.currentThread();
185 ClassLoader cl = currentThread.getContextClassLoader();
186 try {
187 currentThread.setContextClassLoader(env.getClassLoader());
188 ((RegionServerObserver) env.getInstance()).preRollBackMerge(ctx, regionA, regionB);
189 } catch (Throwable e) {
190 handleCoprocessorThrowable(env, e);
191 } finally {
192 currentThread.setContextClassLoader(cl);
193 }
194 if (ctx.shouldComplete()) {
195 break;
196 }
197 }
198 }
199 }
200
201 public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
202 ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
203 for (RegionServerEnvironment env : coprocessors) {
204 if (env.getInstance() instanceof RegionServerObserver) {
205 ctx = ObserverContext.createAndPrepare(env, ctx);
206 Thread currentThread = Thread.currentThread();
207 ClassLoader cl = currentThread.getContextClassLoader();
208 try {
209 currentThread.setContextClassLoader(env.getClassLoader());
210 ((RegionServerObserver) env.getInstance()).postRollBackMerge(ctx, regionA, regionB);
211 } catch (Throwable e) {
212 handleCoprocessorThrowable(env, e);
213 } finally {
214 currentThread.setContextClassLoader(cl);
215 }
216 if (ctx.shouldComplete()) {
217 break;
218 }
219 }
220 }
221 }
222
223
224
225
226
227 static class RegionServerEnvironment extends CoprocessorHost.Environment
228 implements RegionServerCoprocessorEnvironment {
229
230 private RegionServerServices regionServerServices;
231
232 public RegionServerEnvironment(final Class<?> implClass,
233 final Coprocessor impl, final int priority, final int seq,
234 final Configuration conf, final RegionServerServices services) {
235 super(impl, priority, seq, conf);
236 this.regionServerServices = services;
237 }
238
239 @Override
240 public RegionServerServices getRegionServerServices() {
241 return regionServerServices;
242 }
243 }
244
245
246
247
248
249 static class EnvironmentPriorityComparator implements
250 Comparator<CoprocessorEnvironment> {
251 public int compare(final CoprocessorEnvironment env1,
252 final CoprocessorEnvironment env2) {
253 if (env1.getPriority() < env2.getPriority()) {
254 return -1;
255 } else if (env1.getPriority() > env2.getPriority()) {
256 return 1;
257 }
258 if (env1.getLoadSequence() < env2.getLoadSequence()) {
259 return -1;
260 } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
261 return 1;
262 }
263 return 0;
264 }
265 }
266 }