1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25
26 import org.apache.commons.lang.ArrayUtils;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Abortable;
31 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceStability;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
35 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
36 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
37 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
39 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
40 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
41 import org.apache.hadoop.hbase.util.ReflectionUtils;
42
43 import com.google.protobuf.Message;
44
45
46
47
48
49
50 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
51 @InterfaceStability.Evolving
52 public class RWQueueRpcExecutor extends RpcExecutor {
53 private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
54
55 private final List<BlockingQueue<CallRunner>> queues;
56 private final QueueBalancer writeBalancer;
57 private final QueueBalancer readBalancer;
58 private final QueueBalancer scanBalancer;
59 private final int writeHandlersCount;
60 private final int readHandlersCount;
61 private final int scanHandlersCount;
62 private final int numWriteQueues;
63 private final int numReadQueues;
64 private final int numScanQueues;
65
66 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
67 final float readShare, final int maxQueueLength,
68 final Configuration conf, final Abortable abortable) {
69 this(name, handlerCount, numQueues, readShare, maxQueueLength, 0,
70 conf, abortable, LinkedBlockingQueue.class);
71 }
72
73 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
74 final float readShare, final float scanShare, final int maxQueueLength) {
75 this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
76 }
77
78 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
79 final float readShare, final float scanShare, final int maxQueueLength,
80 final Configuration conf, final Abortable abortable) {
81 this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
82 conf, abortable, LinkedBlockingQueue.class);
83 }
84
85 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
86 final float readShare, final int maxQueueLength,
87 final Configuration conf, final Abortable abortable,
88 final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
89 this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
90 readQueueClass, readQueueInitArgs);
91 }
92
93 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
94 final float readShare, final float scanShare, final int maxQueueLength,
95 final Configuration conf, final Abortable abortable,
96 final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
97 this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
98 calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
99 LinkedBlockingQueue.class, new Object[] {maxQueueLength},
100 readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
101 }
102
103 public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
104 final int numWriteQueues, final int numReadQueues,
105 final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
106 final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
107 this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0,
108 writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
109 }
110
111 public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
112 int numWriteQueues, int numReadQueues, float scanShare,
113 final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
114 final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
115 super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
116
117 int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
118 int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
119 if ((numReadQueues - numScanQueues) > 0) {
120 numReadQueues -= numScanQueues;
121 readHandlers -= scanHandlers;
122 } else {
123 numScanQueues = 0;
124 scanHandlers = 0;
125 }
126
127 this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
128 this.readHandlersCount = Math.max(readHandlers, numReadQueues);
129 this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
130 this.numWriteQueues = numWriteQueues;
131 this.numReadQueues = numReadQueues;
132 this.numScanQueues = numScanQueues;
133 this.writeBalancer = getBalancer(numWriteQueues);
134 this.readBalancer = getBalancer(numReadQueues);
135 this.scanBalancer = getBalancer(numScanQueues);
136
137 queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
138 LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
139 " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
140 ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
141 " scanHandlers=" + scanHandlersCount));
142
143 for (int i = 0; i < numWriteQueues; ++i) {
144 queues.add((BlockingQueue<CallRunner>)
145 ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
146 }
147
148 for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
149 queues.add((BlockingQueue<CallRunner>)
150 ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
151 }
152 }
153
154 @Override
155 protected void startHandlers(final int port) {
156 startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
157 startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
158 startHandlers(".scan", scanHandlersCount, queues,
159 numWriteQueues + numReadQueues, numScanQueues, port);
160 }
161
162 @Override
163 public boolean dispatch(final CallRunner callTask) throws InterruptedException {
164 RpcServer.Call call = callTask.getCall();
165 int queueIndex;
166 if (isWriteRequest(call.getHeader(), call.param)) {
167 queueIndex = writeBalancer.getNextQueue();
168 } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
169 queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
170 } else {
171 queueIndex = numWriteQueues + readBalancer.getNextQueue();
172 }
173 return queues.get(queueIndex).offer(callTask);
174 }
175
176 private boolean isWriteRequest(final RequestHeader header, final Message param) {
177
178 if (param instanceof MultiRequest) {
179 MultiRequest multi = (MultiRequest)param;
180 for (RegionAction regionAction : multi.getRegionActionList()) {
181 for (Action action: regionAction.getActionList()) {
182 if (action.hasMutation()) {
183 return true;
184 }
185 }
186 }
187 }
188 if (param instanceof MutateRequest) {
189 return true;
190 }
191
192
193
194
195
196
197 if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) {
198 return true;
199 }
200 if (param instanceof RegionServerStatusProtos.RegionServerStartupRequest) {
201 return true;
202 }
203 if (param instanceof RegionServerStatusProtos.RegionServerReportRequest) {
204 return true;
205 }
206 return false;
207 }
208
209 private boolean isScanRequest(final RequestHeader header, final Message param) {
210 if (param instanceof ScanRequest) {
211
212 ScanRequest request = (ScanRequest)param;
213 return request.hasScannerId();
214 }
215 return false;
216 }
217
218 @Override
219 public int getQueueLength() {
220 int length = 0;
221 for (final BlockingQueue<CallRunner> queue: queues) {
222 length += queue.size();
223 }
224 return length;
225 }
226
227 @Override
228 protected List<BlockingQueue<CallRunner>> getQueues() {
229 return queues;
230 }
231
232
233
234
235
236 private static int calcNumWriters(final int count, final float readShare) {
237 return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
238 }
239
240
241
242
243
244 private static int calcNumReaders(final int count, final float readShare) {
245 return count - calcNumWriters(count, readShare);
246 }
247 }