1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.Random;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29 import org.apache.hadoop.hbase.util.ReflectionUtils;
30
31 import com.google.common.base.Preconditions;
32
33
34
35
36
37 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
38 @InterfaceStability.Evolving
39 public class BalancedQueueRpcExecutor extends RpcExecutor {
40
41 protected final List<BlockingQueue<CallRunner>> queues;
42 private QueueBalancer balancer;
43
44 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
45 final int maxQueueLength) {
46 this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
47 }
48
49 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
50 final Class<? extends BlockingQueue> queueClass, Object... initargs) {
51 super(name, Math.max(handlerCount, numQueues));
52 queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
53 this.balancer = getBalancer(numQueues);
54 initializeQueues(numQueues, queueClass, initargs);
55 }
56
57 protected void initializeQueues(final int numQueues,
58 final Class<? extends BlockingQueue> queueClass, Object... initargs) {
59 for (int i = 0; i < numQueues; ++i) {
60 queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
61 }
62 }
63
64 @Override
65 public void dispatch(final CallRunner callTask) throws InterruptedException {
66 int queueIndex = balancer.getNextQueue();
67 queues.get(queueIndex).put(callTask);
68 }
69
70 @Override
71 public int getQueueLength() {
72 int length = 0;
73 for (final BlockingQueue<CallRunner> queue : queues) {
74 length += queue.size();
75 }
76 return length;
77 }
78
79 @Override
80 public List<BlockingQueue<CallRunner>> getQueues() {
81 return queues;
82 }
83
84 private static abstract class QueueBalancer {
85
86
87
88 public abstract int getNextQueue();
89 }
90
91 public static QueueBalancer getBalancer(int queueSize) {
92 Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
93 if (queueSize == 1) {
94 return ONE_QUEUE;
95 } else {
96 return new RandomQueueBalancer(queueSize);
97 }
98 }
99
100
101
102
103 private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
104
105 @Override
106 public int getNextQueue() {
107 return 0;
108 }
109 };
110
111
112
113
114 private static class RandomQueueBalancer extends QueueBalancer {
115 private int queueSize;
116 private Random random;
117
118 public RandomQueueBalancer(int queueSize) {
119 this.queueSize = queueSize;
120 this.random = new Random();
121 }
122
123 public int getNextQueue() {
124 return random.nextInt(queueSize);
125 }
126 }
127 }