1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.CellScannable;
28 import org.apache.hadoop.hbase.DoNotRetryIOException;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.HRegionLocation;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
33 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
34 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35 import org.apache.hadoop.hbase.protobuf.RequestConverter;
36 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
39 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
40 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
41 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
42
43 import com.google.protobuf.ServiceException;
44
45
46
47
48
49
50
51 class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
52 private final MultiAction<R> multiAction;
53 private final boolean cellBlock;
54 private RpcControllerFactory rpcFactory;
55
56 MultiServerCallable(final HConnection connection, final TableName tableName,
57 final HRegionLocation location, final RpcControllerFactory rpcFactory,
58 final MultiAction<R> multi) {
59 super(connection, tableName, null);
60 this.multiAction = multi;
61 this.rpcFactory = rpcFactory;
62 setLocation(location);
63 this.cellBlock = isCellBlock();
64 }
65
66 MultiAction<R> getMulti() {
67 return this.multiAction;
68 }
69
70 @Override
71 public MultiResponse call() throws IOException {
72 int countOfActions = this.multiAction.size();
73 if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
74 MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
75 RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
76 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
77 MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
78 List<CellScannable> cells = null;
79
80 long nonceGroup = multiAction.getNonceGroup();
81 if (nonceGroup != HConstants.NO_NONCE) {
82 multiRequestBuilder.setNonceGroup(nonceGroup);
83 }
84 for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
85 final byte [] regionName = e.getKey();
86 final List<Action<R>> actions = e.getValue();
87 regionActionBuilder.clear();
88 regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
89 HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
90
91
92 if (this.cellBlock) {
93
94 if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
95
96
97 regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
98 regionActionBuilder, actionBuilder, mutationBuilder);
99 } else {
100 regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
101 regionActionBuilder, actionBuilder, mutationBuilder);
102 }
103 multiRequestBuilder.addRegionAction(regionActionBuilder.build());
104 }
105
106
107
108 PayloadCarryingRpcController controller = rpcFactory.newController(cells);
109 controller.setPriority(getTableName());
110 ClientProtos.MultiResponse responseProto;
111 ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
112 try {
113 responseProto = getStub().multi(controller, requestProto);
114 } catch (ServiceException e) {
115 throw ProtobufUtil.getRemoteException(e);
116 }
117 return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
118 }
119
120
121
122
123
124
125
126 private boolean isCellBlock() {
127
128
129 HConnection connection = getConnection();
130 if (connection == null) return true;
131 Configuration configuration = connection.getConfiguration();
132 if (configuration == null) return true;
133 String codec = configuration.get(HConstants.RPC_CODEC_CONF_KEY, "");
134 return codec != null && codec.length() > 0;
135 }
136
137 @Override
138 public void prepare(boolean reload) throws IOException {
139
140 setStub(getConnection().getClient(getLocation().getServerName()));
141 }
142 }