1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Random;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.LinkedBlockingQueue;
26
27 import org.apache.commons.lang.ArrayUtils;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.classification.InterfaceStability;
32 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
33 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
35 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
36 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
37 import org.apache.hadoop.hbase.util.ReflectionUtils;
38
39 import com.google.protobuf.Message;
40
41
42
43
44
45 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
46 @InterfaceStability.Evolving
47 public class RWQueueRpcExecutor extends RpcExecutor {
48 private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
49
50 private final List<BlockingQueue<CallRunner>> queues;
51 private final Random balancer = new Random();
52 private final int writeHandlersCount;
53 private final int readHandlersCount;
54 private final int numWriteQueues;
55 private final int numReadQueues;
56
57 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
58 final float readShare, final int maxQueueLength) {
59 this(name, handlerCount, numQueues, readShare, maxQueueLength,
60 LinkedBlockingQueue.class);
61 }
62
63 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
64 final float readShare, final int maxQueueLength,
65 final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
66 this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
67 calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
68 LinkedBlockingQueue.class, new Object[] {maxQueueLength},
69 readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
70 }
71
72 public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
73 final int numWriteQueues, final int numReadQueues,
74 final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
75 final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
76 super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
77
78 this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
79 this.readHandlersCount = Math.max(readHandlers, numReadQueues);
80 this.numWriteQueues = numWriteQueues;
81 this.numReadQueues = numReadQueues;
82
83 queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
84 LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
85 " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
86
87 for (int i = 0; i < numWriteQueues; ++i) {
88 queues.add((BlockingQueue<CallRunner>)
89 ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
90 }
91
92 for (int i = 0; i < numReadQueues; ++i) {
93 queues.add((BlockingQueue<CallRunner>)
94 ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
95 }
96 }
97
98 @Override
99 protected void startHandlers(final int port) {
100 startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
101 startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
102 }
103
104 @Override
105 public void dispatch(final CallRunner callTask) throws InterruptedException {
106 RpcServer.Call call = callTask.getCall();
107 int queueIndex;
108 if (isWriteRequest(call.getHeader(), call.param)) {
109 queueIndex = balancer.nextInt(numWriteQueues);
110 } else {
111 queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
112 }
113 queues.get(queueIndex).put(callTask);
114 }
115
116 private boolean isWriteRequest(final RequestHeader header, final Message param) {
117
118 String methodName = header.getMethodName();
119 if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
120 MultiRequest multi = (MultiRequest)param;
121 for (RegionAction regionAction : multi.getRegionActionList()) {
122 for (Action action: regionAction.getActionList()) {
123 if (action.hasMutation()) {
124 return true;
125 }
126 }
127 }
128 }
129 return false;
130 }
131
132 @Override
133 public int getQueueLength() {
134 int length = 0;
135 for (final BlockingQueue<CallRunner> queue: queues) {
136 length += queue.size();
137 }
138 return length;
139 }
140
141 @Override
142 protected List<BlockingQueue<CallRunner>> getQueues() {
143 return queues;
144 }
145
146
147
148
149
150 private static int calcNumWriters(final int count, final float readShare) {
151 return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
152 }
153
154
155
156
157
158 private static int calcNumReaders(final int count, final float readShare) {
159 return count - calcNumWriters(count, readShare);
160 }
161 }