View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import com.google.common.collect.ImmutableList;
21  import com.google.common.collect.ImmutableMap;
22  import com.google.common.collect.ImmutableSet;
23  import com.google.common.collect.Maps;
24  import com.google.protobuf.Message;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.HBaseConfiguration;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.SmallTests;
29  import org.apache.hadoop.hbase.ipc.RpcServer.Call;
30  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
31  import org.junit.Before;
32  import org.junit.Test;
33  import org.junit.experimental.categories.Category;
34  import org.mockito.invocation.InvocationOnMock;
35  import org.mockito.stubbing.Answer;
36  
37  import java.io.IOException;
38  import java.net.InetSocketAddress;
39  import java.util.List;
40  import java.util.Map;
41  import java.util.concurrent.CountDownLatch;
42  
43  import static org.junit.Assert.assertEquals;
44  import static org.mockito.Matchers.anyObject;
45  import static org.mockito.Mockito.doAnswer;
46  import static org.mockito.Mockito.mock;
47  import static org.mockito.Mockito.timeout;
48  import static org.mockito.Mockito.verify;
49  import static org.mockito.Mockito.when;
50  
51  @Category(SmallTests.class)
52  public class TestSimpleRpcScheduler {
53  
54    private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
55      @Override
56      public InetSocketAddress getListenerAddress() {
57        return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
58      }
59    };
60    private Configuration conf;
61  
62    @Before
63    public void setUp() {
64      conf = HBaseConfiguration.create();
65    }
66  
67    @Test
68    public void testBasic() throws IOException, InterruptedException {
69      PriorityFunction qosFunction = mock(PriorityFunction.class);
70      RpcScheduler scheduler = new SimpleRpcScheduler(
71          conf, 10, 0, 0, qosFunction, 0);
72      scheduler.init(CONTEXT);
73      scheduler.start();
74      CallRunner task = createMockTask();
75      scheduler.dispatch(task);
76      verify(task, timeout(1000)).run();
77      scheduler.stop();
78    }
79  
80    @Test
81    public void testHandlerIsolation() throws IOException, InterruptedException {
82      CallRunner generalTask = createMockTask();
83      CallRunner priorityTask = createMockTask();
84      CallRunner replicationTask = createMockTask();
85      List<CallRunner> tasks = ImmutableList.of(
86          generalTask,
87          priorityTask,
88          replicationTask);
89      Map<CallRunner, Integer> qos = ImmutableMap.of(
90          generalTask, 0,
91          priorityTask, HConstants.HIGH_QOS + 1,
92          replicationTask, HConstants.REPLICATION_QOS);
93      PriorityFunction qosFunction = mock(PriorityFunction.class);
94      final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
95      final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
96      Answer<Void> answerToRun = new Answer<Void>() {
97        @Override
98        public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
99          synchronized (handlerThreads) {
100           handlerThreads.put(
101               (CallRunner) invocationOnMock.getMock(),
102               Thread.currentThread());
103         }
104         countDownLatch.countDown();
105         return null;
106       }
107     };
108     for (CallRunner task : tasks) {
109       doAnswer(answerToRun).when(task).run();
110     }
111 
112     RpcScheduler scheduler = new SimpleRpcScheduler(
113         conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
114     scheduler.init(CONTEXT);
115     scheduler.start();
116     for (CallRunner task : tasks) {
117       when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject()))
118           .thenReturn(qos.get(task));
119       scheduler.dispatch(task);
120     }
121     for (CallRunner task : tasks) {
122       verify(task, timeout(1000)).run();
123     }
124     scheduler.stop();
125 
126     // Tests that these requests are handled by three distinct threads.
127     countDownLatch.await();
128     assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
129   }
130 
131   private CallRunner createMockTask() {
132     Call call = mock(Call.class);
133     CallRunner task = mock(CallRunner.class);
134     when(task.getCall()).thenReturn(call);
135     return task;
136   }
137 }