View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.IOException;
21  import java.util.List;
22  import java.util.concurrent.ExecutorService;
23  
24  import org.apache.hadoop.classification.InterfaceAudience;
25  import org.apache.hadoop.classification.InterfaceStability;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.CoprocessorEnvironment;
28  import org.apache.hadoop.hbase.HRegionLocation;
29  import org.apache.hadoop.hbase.HTableDescriptor;
30  import org.apache.hadoop.hbase.MasterNotRunningException;
31  import org.apache.hadoop.hbase.ServerName;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
34  import org.apache.hadoop.hbase.client.HConnection;
35  import org.apache.hadoop.hbase.client.HConnectionManager;
36  import org.apache.hadoop.hbase.client.HTableInterface;
37  import org.apache.hadoop.hbase.client.Row;
38  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
39  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
40  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
41  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
42  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
43  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
44  import org.apache.hadoop.hbase.regionserver.HRegionServer;
45  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  
48  import com.google.protobuf.BlockingRpcChannel;
49  import com.google.protobuf.BlockingService;
50  import com.google.protobuf.Descriptors.MethodDescriptor;
51  import com.google.protobuf.Message;
52  import com.google.protobuf.RpcController;
53  import com.google.protobuf.ServiceException;
54  
55  /**
56   * Connection to an HTable from within a Coprocessor. We can do some nice tricks since we know we
57   * are on a regionserver, for instance skipping the full serialization/deserialization of objects
58   * when talking to the server.
59   * <p>
60   * You should not use this class from any client - its an internal class meant for use by the
61   * coprocessor framework.
62   */
63  @InterfaceAudience.Private
64  @InterfaceStability.Evolving
65  public class CoprocessorHConnection implements HConnection {
66  
67    /**
68     * Create an unmanaged {@link HConnection} based on the environment in which we are running the
69     * coprocessor. The {@link HConnection} must be externally cleaned up (we bypass the usual HTable
70     * cleanup mechanisms since we own everything).
71     * @param env environment hosting the {@link HConnection}
72     * @return an unmanaged {@link HConnection}.
73     * @throws IOException if we cannot create the basic connection
74     */
75    public static HConnection getConnectionForEnvironment(CoprocessorEnvironment env)
76        throws IOException {
77      HConnection connection = HConnectionManager.createConnection(env.getConfiguration());
78      // this bit is a little hacky - just trying to get it going for the moment
79      if (env instanceof RegionCoprocessorEnvironment) {
80        RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
81        RegionServerServices services = e.getRegionServerServices();
82        if (services instanceof HRegionServer) {
83          return new CoprocessorHConnection(connection, (HRegionServer) services);
84        }
85      }
86      return connection;
87    }
88  
89    private HConnection delegate;
90    private ServerName serverName;
91    private HRegionServer server;
92  
93    public CoprocessorHConnection(HConnection delegate, HRegionServer server) {
94      this.server = server;
95      this.serverName = server.getServerName();
96      this.delegate = delegate;
97    }
98  
99    public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface
100       getClient(ServerName serverName) throws IOException {
101     // client is trying to reach off-server, so we can't do anything special
102     if (!this.serverName.equals(serverName)) {
103       return delegate.getClient(serverName);
104     }
105     // the client is attempting to write to the same regionserver, we can short-circuit to our
106     // local regionserver
107     final BlockingService blocking = ClientService.newReflectiveBlockingService(this.server);
108     final RpcServerInterface rpc = this.server.getRpcServer();
109 
110     final MonitoredRPCHandler status =
111         TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
112     status.pause("Setting up server-local call");
113 
114     final long timestamp = EnvironmentEdgeManager.currentTimeMillis();
115     BlockingRpcChannel channel = new BlockingRpcChannel() {
116 
117       @Override
118       public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
119           Message request, Message responsePrototype) throws ServiceException {
120         try {
121           // we never need a cell-scanner - everything is already fully formed
122           return rpc.call(blocking, method, request, null, timestamp, status).getFirst();
123         } catch (IOException e) {
124           throw new ServiceException(e);
125         }
126       }
127     };
128     return ClientService.newBlockingStub(channel);
129   }
130 
131   public void abort(String why, Throwable e) {
132     delegate.abort(why, e);
133   }
134 
135   public boolean isAborted() {
136     return delegate.isAborted();
137   }
138 
139   public Configuration getConfiguration() {
140     return delegate.getConfiguration();
141   }
142 
143   public HTableInterface getTable(String tableName) throws IOException {
144     return delegate.getTable(tableName);
145   }
146 
147   public HTableInterface getTable(byte[] tableName) throws IOException {
148     return delegate.getTable(tableName);
149   }
150 
151   public HTableInterface getTable(TableName tableName) throws IOException {
152     return delegate.getTable(tableName);
153   }
154 
155   public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
156     return delegate.getTable(tableName, pool);
157   }
158 
159   public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
160     return delegate.getTable(tableName, pool);
161   }
162 
163   public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
164     return delegate.getTable(tableName, pool);
165   }
166 
167   public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
168     return delegate.isMasterRunning();
169   }
170 
171   public boolean isTableEnabled(TableName tableName) throws IOException {
172     return delegate.isTableEnabled(tableName);
173   }
174 
175   public boolean isTableEnabled(byte[] tableName) throws IOException {
176     return delegate.isTableEnabled(tableName);
177   }
178 
179   public boolean isTableDisabled(TableName tableName) throws IOException {
180     return delegate.isTableDisabled(tableName);
181   }
182 
183   public boolean isTableDisabled(byte[] tableName) throws IOException {
184     return delegate.isTableDisabled(tableName);
185   }
186 
187   public boolean isTableAvailable(TableName tableName) throws IOException {
188     return delegate.isTableAvailable(tableName);
189   }
190 
191   public boolean isTableAvailable(byte[] tableName) throws IOException {
192     return delegate.isTableAvailable(tableName);
193   }
194 
195   public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
196     return delegate.isTableAvailable(tableName, splitKeys);
197   }
198 
199   public boolean isTableAvailable(byte[] tableName, byte[][] splitKeys) throws IOException {
200     return delegate.isTableAvailable(tableName, splitKeys);
201   }
202 
203   public HTableDescriptor[] listTables() throws IOException {
204     return delegate.listTables();
205   }
206 
207   public String[] getTableNames() throws IOException {
208     return delegate.getTableNames();
209   }
210 
211   public TableName[] listTableNames() throws IOException {
212     return delegate.listTableNames();
213   }
214 
215   public HTableDescriptor getHTableDescriptor(TableName tableName) throws IOException {
216     return delegate.getHTableDescriptor(tableName);
217   }
218 
219   public HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException {
220     return delegate.getHTableDescriptor(tableName);
221   }
222 
223   public HRegionLocation locateRegion(TableName tableName, byte[] row) throws IOException {
224     return delegate.locateRegion(tableName, row);
225   }
226 
227   public HRegionLocation locateRegion(byte[] tableName, byte[] row) throws IOException {
228     return delegate.locateRegion(tableName, row);
229   }
230 
231   public void clearRegionCache() {
232     delegate.clearRegionCache();
233   }
234 
235   public void clearRegionCache(TableName tableName) {
236     delegate.clearRegionCache(tableName);
237   }
238 
239   public void clearRegionCache(byte[] tableName) {
240     delegate.clearRegionCache(tableName);
241   }
242 
243   public HRegionLocation relocateRegion(TableName tableName, byte[] row) throws IOException {
244     return delegate.relocateRegion(tableName, row);
245   }
246 
247   public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
248     return delegate.relocateRegion(tableName, row);
249   }
250 
251   public void updateCachedLocations(TableName tableName, byte[] rowkey, Object exception,
252       HRegionLocation source) {
253     delegate.updateCachedLocations(tableName, rowkey, exception, source);
254   }
255 
256   public void updateCachedLocations(byte[] tableName, byte[] rowkey, Object exception,
257       HRegionLocation source) {
258     delegate.updateCachedLocations(tableName, rowkey, exception, source);
259   }
260 
261   public HRegionLocation locateRegion(byte[] regionName) throws IOException {
262     return delegate.locateRegion(regionName);
263   }
264 
265   public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
266     return delegate.locateRegions(tableName);
267   }
268 
269   public List<HRegionLocation> locateRegions(byte[] tableName) throws IOException {
270     return delegate.locateRegions(tableName);
271   }
272 
273   public List<HRegionLocation>
274       locateRegions(TableName tableName, boolean useCache, boolean offlined) throws IOException {
275     return delegate.locateRegions(tableName, useCache, offlined);
276   }
277 
278   public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
279       throws IOException {
280     return delegate.locateRegions(tableName, useCache, offlined);
281   }
282 
283   public org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService.BlockingInterface getMaster()
284   throws IOException {
285     return delegate.getMaster();
286   }
287 
288   public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
289       getAdmin(ServerName serverName) throws IOException {
290     return delegate.getAdmin(serverName);
291   }
292 
293   public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
294       getAdmin(ServerName serverName, boolean getMaster) throws IOException {
295     return delegate.getAdmin(serverName, getMaster);
296   }
297 
298   public HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload)
299       throws IOException {
300     return delegate.getRegionLocation(tableName, row, reload);
301   }
302 
303   public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, boolean reload)
304       throws IOException {
305     return delegate.getRegionLocation(tableName, row, reload);
306   }
307 
308   public void processBatch(List<? extends Row> actions, TableName tableName, ExecutorService pool,
309       Object[] results) throws IOException, InterruptedException {
310     delegate.processBatch(actions, tableName, pool, results);
311   }
312 
313   public void processBatch(List<? extends Row> actions, byte[] tableName, ExecutorService pool,
314       Object[] results) throws IOException, InterruptedException {
315     delegate.processBatch(actions, tableName, pool, results);
316   }
317 
318   public <R> void processBatchCallback(List<? extends Row> list, TableName tableName,
319       ExecutorService pool, Object[] results, Callback<R> callback) throws IOException,
320       InterruptedException {
321     delegate.processBatchCallback(list, tableName, pool, results, callback);
322   }
323 
324   public <R> void processBatchCallback(List<? extends Row> list, byte[] tableName,
325       ExecutorService pool, Object[] results, Callback<R> callback) throws IOException,
326       InterruptedException {
327     delegate.processBatchCallback(list, tableName, pool, results, callback);
328   }
329 
330   public void setRegionCachePrefetch(TableName tableName, boolean enable) {
331     delegate.setRegionCachePrefetch(tableName, enable);
332   }
333 
334   public void setRegionCachePrefetch(byte[] tableName, boolean enable) {
335     delegate.setRegionCachePrefetch(tableName, enable);
336   }
337 
338   public boolean getRegionCachePrefetch(TableName tableName) {
339     return delegate.getRegionCachePrefetch(tableName);
340   }
341 
342   public boolean getRegionCachePrefetch(byte[] tableName) {
343     return delegate.getRegionCachePrefetch(tableName);
344   }
345 
346   public int getCurrentNrHRS() throws IOException {
347     return delegate.getCurrentNrHRS();
348   }
349 
350   public HTableDescriptor[] getHTableDescriptorsByTableName(List<TableName> tableNames)
351       throws IOException {
352     return delegate.getHTableDescriptorsByTableName(tableNames);
353   }
354 
355   public HTableDescriptor[] getHTableDescriptors(List<String> tableNames) throws IOException {
356     return delegate.getHTableDescriptors(tableNames);
357   }
358 
359   public boolean isClosed() {
360     return delegate.isClosed();
361   }
362 
363   public void clearCaches(ServerName sn) {
364     delegate.clearCaches(sn);
365   }
366 
367   public void close() throws IOException {
368     delegate.close();
369   }
370 
371   public void deleteCachedRegionLocation(HRegionLocation location) {
372     delegate.deleteCachedRegionLocation(location);
373   }
374 
375   public MasterKeepAliveConnection getKeepAliveMasterService()
376       throws MasterNotRunningException {
377     return delegate.getKeepAliveMasterService();
378   }
379 
380   public boolean isDeadServer(ServerName serverName) {
381     return delegate.isDeadServer(serverName);
382   }
383 
384   @Override
385   public NonceGenerator getNonceGenerator() {
386     return null; // don't use nonces for coprocessor connection
387   }
388 }