1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import java.io.IOException;
22
23 import org.apache.hadoop.hbase.util.ByteStringer;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.TableName;
28 import org.apache.hadoop.hbase.client.HConnection;
29 import org.apache.hadoop.hbase.client.RegionServerCallable;
30 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
31 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
32 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
33 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
34 import org.apache.hadoop.hbase.util.Bytes;
35
36 import com.google.protobuf.Descriptors;
37 import com.google.protobuf.Message;
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
49 private static Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class);
50
51 private final HConnection connection;
52 private final TableName table;
53 private final byte[] row;
54 private byte[] lastRegion;
55
56 private RpcRetryingCallerFactory rpcFactory;
57
58 private RpcControllerFactory rpcController;
59
60 public RegionCoprocessorRpcChannel(HConnection conn, TableName table, byte[] row,
61 RpcRetryingCallerFactory rpcFactory, RpcControllerFactory rpcControllerFactory) {
62 this.connection = conn;
63 this.table = table;
64 this.row = row;
65 this.rpcFactory = rpcFactory;
66 this.rpcController = rpcControllerFactory;
67 }
68
69 @Override
70 protected Message callExecService(Descriptors.MethodDescriptor method,
71 Message request, Message responsePrototype)
72 throws IOException {
73 if (LOG.isTraceEnabled()) {
74 LOG.trace("Call: "+method.getName()+", "+request.toString());
75 }
76
77 if (row == null) {
78 throw new IllegalArgumentException("Missing row property for remote region location");
79 }
80
81 final ClientProtos.CoprocessorServiceCall call =
82 ClientProtos.CoprocessorServiceCall.newBuilder()
83 .setRow(ByteStringer.wrap(row))
84 .setServiceName(method.getService().getFullName())
85 .setMethodName(method.getName())
86 .setRequest(request.toByteString()).build();
87 final PayloadCarryingRpcController controller = rpcController.newController();
88 controller.setPriority(table);
89 RegionServerCallable<CoprocessorServiceResponse> callable =
90 new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
91 public CoprocessorServiceResponse call() throws Exception {
92 byte[] regionName = getLocation().getRegionInfo().getRegionName();
93 return ProtobufUtil.execService(getStub(), call, regionName, controller);
94 }
95 };
96 CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller()
97 .callWithRetries(callable);
98 Message response = null;
99 if (result.getValue().hasValue()) {
100 response = responsePrototype.newBuilderForType()
101 .mergeFrom(result.getValue().getValue()).build();
102 } else {
103 response = responsePrototype.getDefaultInstanceForType();
104 }
105 lastRegion = result.getRegion().getValue().toByteArray();
106 if (LOG.isTraceEnabled()) {
107 LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
108 }
109 return response;
110 }
111
112 public byte[] getLastRegion() {
113 return lastRegion;
114 }
115 }