View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.apache.hadoop.classification.InterfaceAudience;
23  import org.apache.hadoop.classification.InterfaceStability;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
26  import org.apache.hadoop.hbase.HConstants;
27  
28  /**
29   * A scheduler that maintains isolated handler pools for general, high-priority and replication
30   * requests.
31   */
32  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
33  @InterfaceStability.Evolving
34  public class SimpleRpcScheduler extends RpcScheduler {
35    public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
36  
37    public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
38      "hbase.ipc.server.callqueue.read.share";
39    public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
40      "hbase.ipc.server.callqueue.handler.factor";
41    public static final String CALL_QUEUE_MAX_LENGTH_CONF_KEY =
42      "hbase.ipc.server.max.callqueue.length";
43  
44    private int port;
45    private final PriorityFunction priority;
46    private final RpcExecutor callExecutor;
47    private final RpcExecutor priorityExecutor;
48    private final RpcExecutor replicationExecutor;
49  
50    /** What level a high priority call is at. */
51    private final int highPriorityLevel;
52  
53    /**
54     * @param conf
55     * @param handlerCount the number of handler threads that will be used to process calls
56     * @param priorityHandlerCount How many threads for priority handling.
57     * @param replicationHandlerCount How many threads for replication handling.
58     * @param highPriorityLevel
59     * @param priority Function to extract request priority.
60     */
61    public SimpleRpcScheduler(
62        Configuration conf,
63        int handlerCount,
64        int priorityHandlerCount,
65        int replicationHandlerCount,
66        PriorityFunction priority,
67        int highPriorityLevel) {
68      int maxQueueLength = conf.getInt(CALL_QUEUE_MAX_LENGTH_CONF_KEY,
69        conf.getInt("ipc.server.max.callqueue.length",
70          handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER));
71      this.priority = priority;
72      this.highPriorityLevel = highPriorityLevel;
73  
74      float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY,
75        conf.getFloat("ipc.server.callqueue.read.share", 0));
76  
77      float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY,
78        conf.getFloat("ipc.server.callqueue.handler.factor", 0));
79      int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
80  
81      LOG.info("Using default user call queue, count=" + numCallQueues);
82  
83      if (numCallQueues > 1 && callqReadShare > 0) {
84        // multiple read/write queues
85        callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
86            callqReadShare, maxQueueLength);
87      } else {
88        // multiple queues
89        callExecutor = new BalancedQueueRpcExecutor("default", handlerCount,
90            numCallQueues, maxQueueLength);
91      }
92  
93     this.priorityExecutor =
94       priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
95         1, maxQueueLength) : null;
96     this.replicationExecutor =
97       replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
98         replicationHandlerCount, 1, maxQueueLength) : null;
99    }
100 
101   @Override
102   public void init(Context context) {
103     this.port = context.getListenerAddress().getPort();
104   }
105 
106     @Override
107   public void start() {
108     callExecutor.start(port);
109     if (priorityExecutor != null) priorityExecutor.start(port);
110     if (replicationExecutor != null) replicationExecutor.start(port);
111   }
112 
113   @Override
114   public void stop() {
115     callExecutor.stop();
116     if (priorityExecutor != null) priorityExecutor.stop();
117     if (replicationExecutor != null) replicationExecutor.stop();
118   }
119 
120   @Override
121   public void dispatch(CallRunner callTask) throws InterruptedException {
122     RpcServer.Call call = callTask.getCall();
123     int level = priority.getPriority(call.getHeader(), call.param);
124     if (priorityExecutor != null && level > highPriorityLevel) {
125       priorityExecutor.dispatch(callTask);
126     } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
127       replicationExecutor.dispatch(callTask);
128     } else {
129       callExecutor.dispatch(callTask);
130     }
131   }
132 
133   @Override
134   public int getGeneralQueueLength() {
135     return callExecutor.getQueueLength();
136   }
137 
138   @Override
139   public int getPriorityQueueLength() {
140     return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
141   }
142 
143   @Override
144   public int getReplicationQueueLength() {
145     return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
146   }
147 
148   @Override
149   public int getActiveRpcHandlerCount() {
150     return callExecutor.getActiveHandlerCount() +
151            (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
152            (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
153   }
154 }
155