1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver.wal;
22
23 import java.io.IOException;
24
25 import org.apache.hadoop.hbase.Coprocessor;
26 import org.apache.hadoop.hbase.HRegionInfo;
27 import org.apache.hadoop.hbase.coprocessor.*;
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30
31
32
33
34
35 @InterfaceAudience.Private
36 public class WALCoprocessorHost
37 extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
38
39
40
41
42 static class WALEnvironment extends CoprocessorHost.Environment
43 implements WALCoprocessorEnvironment {
44
45 private FSHLog wal;
46
47 @Override
48 public FSHLog getWAL() {
49 return wal;
50 }
51
52
53
54
55
56
57
58
59
60
61 public WALEnvironment(Class<?> implClass, final Coprocessor impl,
62 final int priority, final int seq, final Configuration conf,
63 final FSHLog hlog) {
64 super(impl, priority, seq, conf);
65 this.wal = hlog;
66 }
67 }
68
69 FSHLog wal;
70
71
72
73
74
75 public WALCoprocessorHost(final FSHLog log, final Configuration conf) {
76
77
78
79
80
81
82 super(null);
83 this.wal = log;
84
85 loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
86 }
87
88 @Override
89 public WALEnvironment createEnvironment(final Class<?> implClass,
90 final Coprocessor instance, final int priority, final int seq,
91 final Configuration conf) {
92 return new WALEnvironment(implClass, instance, priority, seq, conf,
93 this.wal);
94 }
95
96
97
98
99
100
101
102
103 public boolean preWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
104 throws IOException {
105 boolean bypass = false;
106 ObserverContext<WALCoprocessorEnvironment> ctx = null;
107 for (WALEnvironment env: coprocessors) {
108 if (env.getInstance() instanceof
109 org.apache.hadoop.hbase.coprocessor.WALObserver) {
110 ctx = ObserverContext.createAndPrepare(env, ctx);
111 Thread currentThread = Thread.currentThread();
112 ClassLoader cl = currentThread.getContextClassLoader();
113 try {
114 currentThread.setContextClassLoader(env.getClassLoader());
115 ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
116 preWALWrite(ctx, info, logKey, logEdit);
117 } catch (Throwable e) {
118 handleCoprocessorThrowable(env, e);
119 } finally {
120 currentThread.setContextClassLoader(cl);
121 }
122 bypass |= ctx.shouldBypass();
123 if (ctx.shouldComplete()) {
124 break;
125 }
126 }
127 }
128 return bypass;
129 }
130
131
132
133
134
135
136
137 public void postWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
138 throws IOException {
139 ObserverContext<WALCoprocessorEnvironment> ctx = null;
140 for (WALEnvironment env: coprocessors) {
141 if (env.getInstance() instanceof
142 org.apache.hadoop.hbase.coprocessor.WALObserver) {
143 ctx = ObserverContext.createAndPrepare(env, ctx);
144 Thread currentThread = Thread.currentThread();
145 ClassLoader cl = currentThread.getContextClassLoader();
146 try {
147 currentThread.setContextClassLoader(env.getClassLoader());
148 ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
149 postWALWrite(ctx, info, logKey, logEdit);
150 } catch (Throwable e) {
151 handleCoprocessorThrowable(env, e);
152 } finally {
153 currentThread.setContextClassLoader(cl);
154 }
155 if (ctx.shouldComplete()) {
156 break;
157 }
158 }
159 }
160 }
161 }