View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.util.ArrayList;
21  import java.util.List;
22  import java.util.Random;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.LinkedBlockingQueue;
25  
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.classification.InterfaceStability;
28  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29  import org.apache.hadoop.hbase.util.ReflectionUtils;
30  
31  import com.google.common.base.Preconditions;
32  
33  /**
34   * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
35   * efficient with a single queue via an inlinable queue balancing mechanism.
36   */
37  @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
38  @InterfaceStability.Evolving
39  public class BalancedQueueRpcExecutor extends RpcExecutor {
40  
41    protected final List<BlockingQueue<CallRunner>> queues;
42    private QueueBalancer balancer;
43  
44    public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
45        final int maxQueueLength) {
46      this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
47    }
48  
49    public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
50        final Class<? extends BlockingQueue> queueClass, Object... initargs) {
51      super(name, Math.max(handlerCount, numQueues));
52      queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
53      this.balancer = getBalancer(numQueues);
54      initializeQueues(numQueues, queueClass, initargs);
55    }
56  
57    protected void initializeQueues(final int numQueues,
58        final Class<? extends BlockingQueue> queueClass, Object... initargs) {
59      for (int i = 0; i < numQueues; ++i) {
60        queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
61      }
62    }
63  
64    @Override
65    public void dispatch(final CallRunner callTask) throws InterruptedException {
66      int queueIndex = balancer.getNextQueue();
67      queues.get(queueIndex).put(callTask);
68    }
69  
70    @Override
71    public int getQueueLength() {
72      int length = 0;
73      for (final BlockingQueue<CallRunner> queue : queues) {
74        length += queue.size();
75      }
76      return length;
77    }
78  
79    @Override
80    public List<BlockingQueue<CallRunner>> getQueues() {
81      return queues;
82    }
83  
84    private static abstract class QueueBalancer {
85      /**
86       * @return the index of the next queue to which a request should be inserted
87       */
88      public abstract int getNextQueue();
89    }
90  
91    public static QueueBalancer getBalancer(int queueSize) {
92      Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
93      if (queueSize == 1) {
94        return ONE_QUEUE;
95      } else {
96        return new RandomQueueBalancer(queueSize);
97      }
98    }
99  
100   /**
101    * All requests go to the first queue, at index 0
102    */
103   private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
104 
105     @Override
106     public int getNextQueue() {
107       return 0;
108     }
109   };
110 
111   /**
112    * Queue balancer that just randomly selects a queue in the range [0, num queues).
113    */
114   private static class RandomQueueBalancer extends QueueBalancer {
115     private int queueSize;
116     private Random random;
117 
118     public RandomQueueBalancer(int queueSize) {
119       this.queueSize = queueSize;
120       this.random = new Random();
121     }
122 
123     public int getNextQueue() {
124       return random.nextInt(queueSize);
125     }
126   }
127 }