1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import java.io.IOException;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.classification.InterfaceStability;
32 import org.apache.hadoop.conf.Configuration;
33
34 import com.google.common.base.Strings;
35 import com.google.common.collect.Lists;
36
37 @InterfaceAudience.Private
38 @InterfaceStability.Evolving
39 public abstract class RpcExecutor {
40 private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
41
42 private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
43 private final List<Thread> handlers;
44 private final int handlerCount;
45 private final String name;
46
47 private boolean running;
48
49 public RpcExecutor(final String name, final int handlerCount) {
50 this.handlers = new ArrayList<Thread>(handlerCount);
51 this.handlerCount = handlerCount;
52 this.name = Strings.nullToEmpty(name);
53 }
54
55 public void start(final int port) {
56 running = true;
57 startHandlers(port);
58 }
59
60 public void stop() {
61 running = false;
62 for (Thread handler : handlers) {
63 handler.interrupt();
64 }
65 }
66
67 public int getActiveHandlerCount() {
68 return activeHandlerCount.get();
69 }
70
71
72 public abstract int getQueueLength();
73
74
75 public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
76
77
78 protected abstract List<BlockingQueue<CallRunner>> getQueues();
79
80 protected void startHandlers(final int port) {
81 List<BlockingQueue<CallRunner>> callQueues = getQueues();
82 startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
83 }
84
85 protected void startHandlers(final String nameSuffix, final int numHandlers,
86 final List<BlockingQueue<CallRunner>> callQueues,
87 final int qindex, final int qsize, final int port) {
88 final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
89 for (int i = 0; i < numHandlers; i++) {
90 final int index = qindex + (i % qsize);
91 Thread t = new Thread(new Runnable() {
92 @Override
93 public void run() {
94 consumerLoop(callQueues.get(index));
95 }
96 });
97 t.setDaemon(true);
98 t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
99 ",queue=" + index + ",port=" + port);
100 t.start();
101 LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
102 handlers.add(t);
103 }
104 }
105
106 protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
107 boolean interrupted = false;
108 try {
109 while (running) {
110 try {
111 CallRunner task = myQueue.take();
112 try {
113 activeHandlerCount.incrementAndGet();
114 task.run();
115 } finally {
116 activeHandlerCount.decrementAndGet();
117 }
118 } catch (InterruptedException e) {
119 interrupted = true;
120 }
121 }
122 } finally {
123 if (interrupted) {
124 Thread.currentThread().interrupt();
125 }
126 }
127 }
128 }