1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.hadoop.classification.InterfaceAudience;
23 import org.apache.hadoop.classification.InterfaceStability;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
26 import org.apache.hadoop.hbase.HConstants;
27
28
29
30
31
32 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
33 @InterfaceStability.Evolving
34 public class SimpleRpcScheduler extends RpcScheduler {
35 public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
36
37 public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
38 "hbase.ipc.server.callqueue.read.share";
39 public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
40 "hbase.ipc.server.callqueue.handler.factor";
41 public static final String CALL_QUEUE_MAX_LENGTH_CONF_KEY =
42 "hbase.ipc.server.max.callqueue.length";
43
44 private int port;
45 private final PriorityFunction priority;
46 private final RpcExecutor callExecutor;
47 private final RpcExecutor priorityExecutor;
48 private final RpcExecutor replicationExecutor;
49
50
51 private final int highPriorityLevel;
52
53
54
55
56
57
58
59
60
61 public SimpleRpcScheduler(
62 Configuration conf,
63 int handlerCount,
64 int priorityHandlerCount,
65 int replicationHandlerCount,
66 PriorityFunction priority,
67 int highPriorityLevel) {
68 int maxQueueLength = conf.getInt(CALL_QUEUE_MAX_LENGTH_CONF_KEY,
69 conf.getInt("ipc.server.max.callqueue.length",
70 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER));
71 this.priority = priority;
72 this.highPriorityLevel = highPriorityLevel;
73
74 float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY,
75 conf.getFloat("ipc.server.callqueue.read.share", 0));
76
77 float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY,
78 conf.getFloat("ipc.server.callqueue.handler.factor", 0));
79 int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
80
81 LOG.info("Using default user call queue, count=" + numCallQueues);
82
83 if (numCallQueues > 1 && callqReadShare > 0) {
84
85 callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
86 callqReadShare, maxQueueLength);
87 } else {
88
89 callExecutor = new BalancedQueueRpcExecutor("default", handlerCount,
90 numCallQueues, maxQueueLength);
91 }
92
93 this.priorityExecutor =
94 priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
95 1, maxQueueLength) : null;
96 this.replicationExecutor =
97 replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
98 replicationHandlerCount, 1, maxQueueLength) : null;
99 }
100
101 @Override
102 public void init(Context context) {
103 this.port = context.getListenerAddress().getPort();
104 }
105
106 @Override
107 public void start() {
108 callExecutor.start(port);
109 if (priorityExecutor != null) priorityExecutor.start(port);
110 if (replicationExecutor != null) replicationExecutor.start(port);
111 }
112
113 @Override
114 public void stop() {
115 callExecutor.stop();
116 if (priorityExecutor != null) priorityExecutor.stop();
117 if (replicationExecutor != null) replicationExecutor.stop();
118 }
119
120 @Override
121 public void dispatch(CallRunner callTask) throws InterruptedException {
122 RpcServer.Call call = callTask.getCall();
123 int level = priority.getPriority(call.getHeader(), call.param);
124 if (priorityExecutor != null && level > highPriorityLevel) {
125 priorityExecutor.dispatch(callTask);
126 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
127 replicationExecutor.dispatch(callTask);
128 } else {
129 callExecutor.dispatch(callTask);
130 }
131 }
132
133 @Override
134 public int getGeneralQueueLength() {
135 return callExecutor.getQueueLength();
136 }
137
138 @Override
139 public int getPriorityQueueLength() {
140 return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
141 }
142
143 @Override
144 public int getReplicationQueueLength() {
145 return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
146 }
147
148 @Override
149 public int getActiveRpcHandlerCount() {
150 return callExecutor.getActiveHandlerCount() +
151 (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
152 (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
153 }
154 }
155