View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import com.google.common.collect.ImmutableList;
21  import com.google.common.collect.ImmutableMap;
22  import com.google.common.collect.ImmutableSet;
23  import com.google.common.collect.Maps;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import com.google.protobuf.Message;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.client.Put;
31  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
32  import org.apache.hadoop.hbase.security.User;
33  import org.apache.hadoop.hbase.testclassification.SmallTests;
34  import org.apache.hadoop.hbase.ipc.RpcServer.Call;
35  import org.apache.hadoop.hbase.protobuf.RequestConverter;
36  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
37  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
38  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.Threads;
41  import org.junit.Before;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  import org.mockito.invocation.InvocationOnMock;
45  import org.mockito.stubbing.Answer;
46  
47  import java.io.IOException;
48  import java.net.InetSocketAddress;
49  import java.util.ArrayList;
50  import java.util.List;
51  import java.util.Map;
52  import java.util.concurrent.CountDownLatch;
53  
54  import static org.junit.Assert.assertEquals;
55  import static org.junit.Assert.assertNotEquals;
56  import static org.mockito.Matchers.any;
57  import static org.mockito.Matchers.anyObject;
58  import static org.mockito.Matchers.eq;
59  import static org.mockito.Mockito.doAnswer;
60  import static org.mockito.Mockito.mock;
61  import static org.mockito.Mockito.timeout;
62  import static org.mockito.Mockito.verify;
63  import static org.mockito.Mockito.when;
64  
65  @Category(SmallTests.class)
66  public class TestSimpleRpcScheduler {
67    private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
68  
69    private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
70      @Override
71      public InetSocketAddress getListenerAddress() {
72        return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
73      }
74    };
75    private Configuration conf;
76  
77    @Before
78    public void setUp() {
79      conf = HBaseConfiguration.create();
80    }
81  
82    @Test
83    public void testBasic() throws IOException, InterruptedException {
84      PriorityFunction qosFunction = mock(PriorityFunction.class);
85      RpcScheduler scheduler = new SimpleRpcScheduler(
86          conf, 10, 0, 0, qosFunction, 0);
87      scheduler.init(CONTEXT);
88      scheduler.start();
89      CallRunner task = createMockTask();
90      task.setStatus(new MonitoredRPCHandlerImpl());
91      scheduler.dispatch(task);
92      verify(task, timeout(1000)).run();
93      scheduler.stop();
94    }
95  
96    @Test
97    public void testHandlerIsolation() throws IOException, InterruptedException {
98      CallRunner generalTask = createMockTask();
99      CallRunner priorityTask = createMockTask();
100     CallRunner replicationTask = createMockTask();
101     List<CallRunner> tasks = ImmutableList.of(
102         generalTask,
103         priorityTask,
104         replicationTask);
105     Map<CallRunner, Integer> qos = ImmutableMap.of(
106         generalTask, 0,
107         priorityTask, HConstants.HIGH_QOS + 1,
108         replicationTask, HConstants.REPLICATION_QOS);
109     PriorityFunction qosFunction = mock(PriorityFunction.class);
110     final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
111     final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
112     Answer<Void> answerToRun = new Answer<Void>() {
113       @Override
114       public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
115         synchronized (handlerThreads) {
116           handlerThreads.put(
117               (CallRunner) invocationOnMock.getMock(),
118               Thread.currentThread());
119         }
120         countDownLatch.countDown();
121         return null;
122       }
123     };
124     for (CallRunner task : tasks) {
125       task.setStatus(new MonitoredRPCHandlerImpl());
126       doAnswer(answerToRun).when(task).run();
127     }
128 
129     RpcScheduler scheduler = new SimpleRpcScheduler(
130         conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
131     scheduler.init(CONTEXT);
132     scheduler.start();
133     for (CallRunner task : tasks) {
134       when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(),
135         (Message) anyObject(), (User) anyObject()))
136           .thenReturn(qos.get(task));
137       scheduler.dispatch(task);
138     }
139     for (CallRunner task : tasks) {
140       verify(task, timeout(1000)).run();
141     }
142     scheduler.stop();
143 
144     // Tests that these requests are handled by three distinct threads.
145     countDownLatch.await();
146     assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
147   }
148 
149   private CallRunner createMockTask() {
150     Call call = mock(Call.class);
151     CallRunner task = mock(CallRunner.class);
152     when(task.getCall()).thenReturn(call);
153     return task;
154   }
155 
156   @Test
157   public void testRpcScheduler() throws Exception {
158     testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
159     testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
160   }
161 
162   private void testRpcScheduler(final String queueType) throws Exception {
163     Configuration schedConf = HBaseConfiguration.create();
164     schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType);
165 
166     PriorityFunction priority = mock(PriorityFunction.class);
167     when(priority.getPriority(any(RequestHeader.class),
168       any(Message.class), any(User.class)))
169       .thenReturn(HConstants.NORMAL_QOS);
170 
171     RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
172                                                     HConstants.QOS_THRESHOLD);
173     try {
174       scheduler.start();
175 
176       CallRunner smallCallTask = mock(CallRunner.class);
177       RpcServer.Call smallCall = mock(RpcServer.Call.class);
178       RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
179       when(smallCallTask.getCall()).thenReturn(smallCall);
180       when(smallCall.getHeader()).thenReturn(smallHead);
181 
182       CallRunner largeCallTask = mock(CallRunner.class);
183       RpcServer.Call largeCall = mock(RpcServer.Call.class);
184       RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
185       when(largeCallTask.getCall()).thenReturn(largeCall);
186       when(largeCall.getHeader()).thenReturn(largeHead);
187 
188       CallRunner hugeCallTask = mock(CallRunner.class);
189       RpcServer.Call hugeCall = mock(RpcServer.Call.class);
190       RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
191       when(hugeCallTask.getCall()).thenReturn(hugeCall);
192       when(hugeCall.getHeader()).thenReturn(hugeHead);
193 
194       when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L);
195       when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L);
196       when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
197 
198       final ArrayList<Integer> work = new ArrayList<Integer>();
199       doAnswerTaskExecution(smallCallTask, work, 10, 250);
200       doAnswerTaskExecution(largeCallTask, work, 50, 250);
201       doAnswerTaskExecution(hugeCallTask, work, 100, 250);
202 
203       scheduler.dispatch(smallCallTask);
204       scheduler.dispatch(smallCallTask);
205       scheduler.dispatch(smallCallTask);
206       scheduler.dispatch(hugeCallTask);
207       scheduler.dispatch(smallCallTask);
208       scheduler.dispatch(largeCallTask);
209       scheduler.dispatch(smallCallTask);
210       scheduler.dispatch(smallCallTask);
211 
212       while (work.size() < 8) {
213         Threads.sleepWithoutInterrupt(100);
214       }
215 
216       int seqSum = 0;
217       int totalTime = 0;
218       for (int i = 0; i < work.size(); ++i) {
219         LOG.debug("Request i=" + i + " value=" + work.get(i));
220         seqSum += work.get(i);
221         totalTime += seqSum;
222       }
223       LOG.debug("Total Time: " + totalTime);
224 
225       // -> [small small small huge small large small small]
226       // -> NO REORDER   [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
227       // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
228       if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
229         assertEquals(530, totalTime);
230       } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ {
231         assertEquals(930, totalTime);
232       }
233     } finally {
234       scheduler.stop();
235     }
236   }
237 
238   @Test
239   public void testScanQueues() throws Exception {
240     Configuration schedConf = HBaseConfiguration.create();
241     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
242     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
243     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
244 
245     PriorityFunction priority = mock(PriorityFunction.class);
246     when(priority.getPriority(any(RequestHeader.class), any(Message.class),
247       any(User.class))).thenReturn(HConstants.NORMAL_QOS);
248 
249     RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
250                                                     HConstants.QOS_THRESHOLD);
251     try {
252       scheduler.start();
253 
254       CallRunner putCallTask = mock(CallRunner.class);
255       RpcServer.Call putCall = mock(RpcServer.Call.class);
256       putCall.param = RequestConverter.buildMutateRequest(
257           Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
258       RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
259       when(putCallTask.getCall()).thenReturn(putCall);
260       when(putCall.getHeader()).thenReturn(putHead);
261 
262       CallRunner getCallTask = mock(CallRunner.class);
263       RpcServer.Call getCall = mock(RpcServer.Call.class);
264       RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
265       when(getCallTask.getCall()).thenReturn(getCall);
266       when(getCall.getHeader()).thenReturn(getHead);
267 
268       CallRunner scanCallTask = mock(CallRunner.class);
269       RpcServer.Call scanCall = mock(RpcServer.Call.class);
270       scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
271       RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
272       when(scanCallTask.getCall()).thenReturn(scanCall);
273       when(scanCall.getHeader()).thenReturn(scanHead);
274 
275       ArrayList<Integer> work = new ArrayList<Integer>();
276       doAnswerTaskExecution(putCallTask, work, 1, 1000);
277       doAnswerTaskExecution(getCallTask, work, 2, 1000);
278       doAnswerTaskExecution(scanCallTask, work, 3, 1000);
279 
280       // There are 3 queues: [puts], [gets], [scans]
281       // so the calls will be interleaved
282       scheduler.dispatch(putCallTask);
283       scheduler.dispatch(putCallTask);
284       scheduler.dispatch(putCallTask);
285       scheduler.dispatch(getCallTask);
286       scheduler.dispatch(getCallTask);
287       scheduler.dispatch(getCallTask);
288       scheduler.dispatch(scanCallTask);
289       scheduler.dispatch(scanCallTask);
290       scheduler.dispatch(scanCallTask);
291 
292       while (work.size() < 6) {
293         Threads.sleepWithoutInterrupt(100);
294       }
295 
296       for (int i = 0; i < work.size() - 2; i += 3) {
297         assertNotEquals(work.get(i + 0), work.get(i + 1));
298         assertNotEquals(work.get(i + 0), work.get(i + 2));
299         assertNotEquals(work.get(i + 1), work.get(i + 2));
300       }
301     } finally {
302       scheduler.stop();
303     }
304   }
305 
306   private void doAnswerTaskExecution(final CallRunner callTask,
307       final ArrayList<Integer> results, final int value, final int sleepInterval) {
308     callTask.setStatus(new MonitoredRPCHandlerImpl());
309     doAnswer(new Answer<Object>() {
310       @Override
311       public Object answer(InvocationOnMock invocation) {
312         synchronized (results) {
313           results.add(value);
314         }
315         Threads.sleepWithoutInterrupt(sleepInterval);
316         return null;
317       }
318     }).when(callTask).run();
319   }
320 }