1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.concurrent.ExecutorService;
23
24 import org.apache.hadoop.classification.InterfaceAudience;
25 import org.apache.hadoop.classification.InterfaceStability;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.CoprocessorEnvironment;
28 import org.apache.hadoop.hbase.HRegionLocation;
29 import org.apache.hadoop.hbase.HTableDescriptor;
30 import org.apache.hadoop.hbase.MasterNotRunningException;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
34 import org.apache.hadoop.hbase.client.HConnection;
35 import org.apache.hadoop.hbase.client.HConnectionManager;
36 import org.apache.hadoop.hbase.client.HTableInterface;
37 import org.apache.hadoop.hbase.client.Row;
38 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
39 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
40 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
41 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
42 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
43 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
44 import org.apache.hadoop.hbase.regionserver.HRegionServer;
45 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
46 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47
48 import com.google.protobuf.BlockingRpcChannel;
49 import com.google.protobuf.BlockingService;
50 import com.google.protobuf.Descriptors.MethodDescriptor;
51 import com.google.protobuf.Message;
52 import com.google.protobuf.RpcController;
53 import com.google.protobuf.ServiceException;
54
55
56
57
58
59
60
61
62
63 @InterfaceAudience.Private
64 @InterfaceStability.Evolving
65 public class CoprocessorHConnection implements HConnection {
66
67
68
69
70
71
72
73
74
75 public static HConnection getConnectionForEnvironment(CoprocessorEnvironment env)
76 throws IOException {
77 HConnection connection = HConnectionManager.createConnection(env.getConfiguration());
78
79 if (env instanceof RegionCoprocessorEnvironment) {
80 RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
81 RegionServerServices services = e.getRegionServerServices();
82 if (services instanceof HRegionServer) {
83 return new CoprocessorHConnection(connection, (HRegionServer) services);
84 }
85 }
86 return connection;
87 }
88
89 private HConnection delegate;
90 private ServerName serverName;
91 private HRegionServer server;
92
93 public CoprocessorHConnection(HConnection delegate, HRegionServer server) {
94 this.server = server;
95 this.serverName = server.getServerName();
96 this.delegate = delegate;
97 }
98
99 public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface
100 getClient(ServerName serverName) throws IOException {
101
102 if (!this.serverName.equals(serverName)) {
103 return delegate.getClient(serverName);
104 }
105
106
107 final BlockingService blocking = ClientService.newReflectiveBlockingService(this.server);
108 final RpcServerInterface rpc = this.server.getRpcServer();
109
110 final MonitoredRPCHandler status =
111 TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
112 status.pause("Setting up server-local call");
113
114 final long timestamp = EnvironmentEdgeManager.currentTimeMillis();
115 BlockingRpcChannel channel = new BlockingRpcChannel() {
116
117 @Override
118 public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
119 Message request, Message responsePrototype) throws ServiceException {
120 try {
121
122 return rpc.call(blocking, method, request, null, timestamp, status).getFirst();
123 } catch (IOException e) {
124 throw new ServiceException(e);
125 }
126 }
127 };
128 return ClientService.newBlockingStub(channel);
129 }
130
131 public void abort(String why, Throwable e) {
132 delegate.abort(why, e);
133 }
134
135 public boolean isAborted() {
136 return delegate.isAborted();
137 }
138
139 public Configuration getConfiguration() {
140 return delegate.getConfiguration();
141 }
142
143 public HTableInterface getTable(String tableName) throws IOException {
144 return delegate.getTable(tableName);
145 }
146
147 public HTableInterface getTable(byte[] tableName) throws IOException {
148 return delegate.getTable(tableName);
149 }
150
151 public HTableInterface getTable(TableName tableName) throws IOException {
152 return delegate.getTable(tableName);
153 }
154
155 public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
156 return delegate.getTable(tableName, pool);
157 }
158
159 public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
160 return delegate.getTable(tableName, pool);
161 }
162
163 public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
164 return delegate.getTable(tableName, pool);
165 }
166
167 public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
168 return delegate.isMasterRunning();
169 }
170
171 public boolean isTableEnabled(TableName tableName) throws IOException {
172 return delegate.isTableEnabled(tableName);
173 }
174
175 public boolean isTableEnabled(byte[] tableName) throws IOException {
176 return delegate.isTableEnabled(tableName);
177 }
178
179 public boolean isTableDisabled(TableName tableName) throws IOException {
180 return delegate.isTableDisabled(tableName);
181 }
182
183 public boolean isTableDisabled(byte[] tableName) throws IOException {
184 return delegate.isTableDisabled(tableName);
185 }
186
187 public boolean isTableAvailable(TableName tableName) throws IOException {
188 return delegate.isTableAvailable(tableName);
189 }
190
191 public boolean isTableAvailable(byte[] tableName) throws IOException {
192 return delegate.isTableAvailable(tableName);
193 }
194
195 public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
196 return delegate.isTableAvailable(tableName, splitKeys);
197 }
198
199 public boolean isTableAvailable(byte[] tableName, byte[][] splitKeys) throws IOException {
200 return delegate.isTableAvailable(tableName, splitKeys);
201 }
202
203 public HTableDescriptor[] listTables() throws IOException {
204 return delegate.listTables();
205 }
206
207 public String[] getTableNames() throws IOException {
208 return delegate.getTableNames();
209 }
210
211 public TableName[] listTableNames() throws IOException {
212 return delegate.listTableNames();
213 }
214
215 public HTableDescriptor getHTableDescriptor(TableName tableName) throws IOException {
216 return delegate.getHTableDescriptor(tableName);
217 }
218
219 public HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException {
220 return delegate.getHTableDescriptor(tableName);
221 }
222
223 public HRegionLocation locateRegion(TableName tableName, byte[] row) throws IOException {
224 return delegate.locateRegion(tableName, row);
225 }
226
227 public HRegionLocation locateRegion(byte[] tableName, byte[] row) throws IOException {
228 return delegate.locateRegion(tableName, row);
229 }
230
231 public void clearRegionCache() {
232 delegate.clearRegionCache();
233 }
234
235 public void clearRegionCache(TableName tableName) {
236 delegate.clearRegionCache(tableName);
237 }
238
239 public void clearRegionCache(byte[] tableName) {
240 delegate.clearRegionCache(tableName);
241 }
242
243 public HRegionLocation relocateRegion(TableName tableName, byte[] row) throws IOException {
244 return delegate.relocateRegion(tableName, row);
245 }
246
247 public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
248 return delegate.relocateRegion(tableName, row);
249 }
250
251 public void updateCachedLocations(TableName tableName, byte[] rowkey, Object exception,
252 HRegionLocation source) {
253 delegate.updateCachedLocations(tableName, rowkey, exception, source);
254 }
255
256 public void updateCachedLocations(byte[] tableName, byte[] rowkey, Object exception,
257 HRegionLocation source) {
258 delegate.updateCachedLocations(tableName, rowkey, exception, source);
259 }
260
261 public HRegionLocation locateRegion(byte[] regionName) throws IOException {
262 return delegate.locateRegion(regionName);
263 }
264
265 public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
266 return delegate.locateRegions(tableName);
267 }
268
269 public List<HRegionLocation> locateRegions(byte[] tableName) throws IOException {
270 return delegate.locateRegions(tableName);
271 }
272
273 public List<HRegionLocation>
274 locateRegions(TableName tableName, boolean useCache, boolean offlined) throws IOException {
275 return delegate.locateRegions(tableName, useCache, offlined);
276 }
277
278 public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
279 throws IOException {
280 return delegate.locateRegions(tableName, useCache, offlined);
281 }
282
283 public org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService.BlockingInterface getMaster()
284 throws IOException {
285 return delegate.getMaster();
286 }
287
288 public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
289 getAdmin(ServerName serverName) throws IOException {
290 return delegate.getAdmin(serverName);
291 }
292
293 public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
294 getAdmin(ServerName serverName, boolean getMaster) throws IOException {
295 return delegate.getAdmin(serverName, getMaster);
296 }
297
298 public HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload)
299 throws IOException {
300 return delegate.getRegionLocation(tableName, row, reload);
301 }
302
303 public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, boolean reload)
304 throws IOException {
305 return delegate.getRegionLocation(tableName, row, reload);
306 }
307
308 public void processBatch(List<? extends Row> actions, TableName tableName, ExecutorService pool,
309 Object[] results) throws IOException, InterruptedException {
310 delegate.processBatch(actions, tableName, pool, results);
311 }
312
313 public void processBatch(List<? extends Row> actions, byte[] tableName, ExecutorService pool,
314 Object[] results) throws IOException, InterruptedException {
315 delegate.processBatch(actions, tableName, pool, results);
316 }
317
318 public <R> void processBatchCallback(List<? extends Row> list, TableName tableName,
319 ExecutorService pool, Object[] results, Callback<R> callback) throws IOException,
320 InterruptedException {
321 delegate.processBatchCallback(list, tableName, pool, results, callback);
322 }
323
324 public <R> void processBatchCallback(List<? extends Row> list, byte[] tableName,
325 ExecutorService pool, Object[] results, Callback<R> callback) throws IOException,
326 InterruptedException {
327 delegate.processBatchCallback(list, tableName, pool, results, callback);
328 }
329
330 public void setRegionCachePrefetch(TableName tableName, boolean enable) {
331 delegate.setRegionCachePrefetch(tableName, enable);
332 }
333
334 public void setRegionCachePrefetch(byte[] tableName, boolean enable) {
335 delegate.setRegionCachePrefetch(tableName, enable);
336 }
337
338 public boolean getRegionCachePrefetch(TableName tableName) {
339 return delegate.getRegionCachePrefetch(tableName);
340 }
341
342 public boolean getRegionCachePrefetch(byte[] tableName) {
343 return delegate.getRegionCachePrefetch(tableName);
344 }
345
346 public int getCurrentNrHRS() throws IOException {
347 return delegate.getCurrentNrHRS();
348 }
349
350 public HTableDescriptor[] getHTableDescriptorsByTableName(List<TableName> tableNames)
351 throws IOException {
352 return delegate.getHTableDescriptorsByTableName(tableNames);
353 }
354
355 public HTableDescriptor[] getHTableDescriptors(List<String> tableNames) throws IOException {
356 return delegate.getHTableDescriptors(tableNames);
357 }
358
359 public boolean isClosed() {
360 return delegate.isClosed();
361 }
362
363 public void clearCaches(ServerName sn) {
364 delegate.clearCaches(sn);
365 }
366
367 public void close() throws IOException {
368 delegate.close();
369 }
370
371 public void deleteCachedRegionLocation(HRegionLocation location) {
372 delegate.deleteCachedRegionLocation(location);
373 }
374
375 public MasterKeepAliveConnection getKeepAliveMasterService()
376 throws MasterNotRunningException {
377 return delegate.getKeepAliveMasterService();
378 }
379
380 public boolean isDeadServer(ServerName serverName) {
381 return delegate.isDeadServer(serverName);
382 }
383
384 @Override
385 public NonceGenerator getNonceGenerator() {
386 return null;
387 }
388 }