View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import java.io.IOException;
22  
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.concurrent.BlockingQueue;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.classification.InterfaceStability;
32  import org.apache.hadoop.conf.Configuration;
33  
34  import com.google.common.base.Strings;
35  import com.google.common.collect.Lists;
36  
37  @InterfaceAudience.Private
38  @InterfaceStability.Evolving
39  public abstract class RpcExecutor {
40    private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
41  
42    private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
43    private final List<Thread> handlers;
44    private final int handlerCount;
45    private final String name;
46  
47    private boolean running;
48  
49    public RpcExecutor(final String name, final int handlerCount) {
50      this.handlers = new ArrayList<Thread>(handlerCount);
51      this.handlerCount = handlerCount;
52      this.name = Strings.nullToEmpty(name);
53    }
54  
55    public void start(final int port) {
56      running = true;
57      startHandlers(port);
58    }
59  
60    public void stop() {
61      running = false;
62      for (Thread handler : handlers) {
63        handler.interrupt();
64      }
65    }
66  
67    public int getActiveHandlerCount() {
68      return activeHandlerCount.get();
69    }
70  
71    /** Returns the length of the pending queue */
72    public abstract int getQueueLength();
73  
74    /** Add the request to the executor queue */
75    public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
76  
77    /** Returns the list of request queues */
78    protected abstract List<BlockingQueue<CallRunner>> getQueues();
79  
80    protected void startHandlers(final int port) {
81      List<BlockingQueue<CallRunner>> callQueues = getQueues();
82      startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
83    }
84  
85    protected void startHandlers(final String nameSuffix, final int numHandlers,
86        final List<BlockingQueue<CallRunner>> callQueues,
87        final int qindex, final int qsize, final int port) {
88      final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
89      for (int i = 0; i < numHandlers; i++) {
90        final int index = qindex + (i % qsize);
91        Thread t = new Thread(new Runnable() {
92          @Override
93          public void run() {
94            consumerLoop(callQueues.get(index));
95          }
96        });
97        t.setDaemon(true);
98        t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
99            ",queue=" + index + ",port=" + port);
100       t.start();
101       LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
102       handlers.add(t);
103     }
104   }
105 
106   protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
107     boolean interrupted = false;
108     try {
109       while (running) {
110         try {
111           CallRunner task = myQueue.take();
112           try {
113             activeHandlerCount.incrementAndGet();
114             task.run();
115           } finally {
116             activeHandlerCount.decrementAndGet();
117           }
118         } catch (InterruptedException e) {
119           interrupted = true;
120         }
121       }
122     } finally {
123       if (interrupted) {
124         Thread.currentThread().interrupt();
125       }
126     }
127   }
128 }