View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.io.IOException;
27  import java.net.InetSocketAddress;
28  import java.util.ArrayList;
29  import java.util.List;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.MediumTests;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
39  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
40  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
41  import org.apache.hadoop.hbase.security.User;
42  import org.apache.log4j.AppenderSkeleton;
43  import org.apache.log4j.Level;
44  import org.apache.log4j.Logger;
45  import org.apache.log4j.spi.LoggingEvent;
46  import org.junit.Test;
47  import org.junit.experimental.categories.Category;
48  
49  import com.google.common.collect.Lists;
50  import com.google.protobuf.BlockingRpcChannel;
51  import com.google.protobuf.BlockingService;
52  import com.google.protobuf.RpcController;
53  import com.google.protobuf.ServiceException;
54  
55  /**
56   * Test that delayed RPCs work. Fire up three calls, the first of which should
57   * be delayed. Check that the last two, which are undelayed, return before the
58   * first one.
59   */
60  @Category(MediumTests.class) // Fails sometimes with small tests
61  public class TestDelayedRpc {
62    private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
63    public static RpcServerInterface rpcServer;
64    public static final int UNDELAYED = 0;
65    public static final int DELAYED = 1;
66    private static final int RPC_CLIENT_TIMEOUT = 30000;
67  
68    @Test (timeout=60000)
69    public void testDelayedRpcImmediateReturnValue() throws Exception {
70      testDelayedRpc(false);
71    }
72  
73    @Test (timeout=60000)
74    public void testDelayedRpcDelayedReturnValue() throws Exception {
75      testDelayedRpc(true);
76    }
77  
78    private void testDelayedRpc(boolean delayReturnValue) throws Exception {
79      LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
80      Configuration conf = HBaseConfiguration.create();
81      InetSocketAddress isa = new InetSocketAddress("localhost", 0);
82      TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
83      BlockingService service =
84        TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
85      rpcServer = new RpcServer(null, "testDelayedRpc",
86          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
87          isa,
88          conf,
89          new FifoRpcScheduler(conf, 1));
90      rpcServer.start();
91      RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
92      try {
93        BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
94            ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
95                rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
96            User.getCurrent(), RPC_CLIENT_TIMEOUT);
97        TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
98          TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
99        List<Integer> results = new ArrayList<Integer>();
100       // Setting true sets 'delayed' on the client.
101       TestThread th1 = new TestThread(stub, true, results);
102       // Setting 'false' means we will return UNDELAYED as response immediately.
103       TestThread th2 = new TestThread(stub, false, results);
104       TestThread th3 = new TestThread(stub, false, results);
105       th1.start();
106       Thread.sleep(100);
107       th2.start();
108       Thread.sleep(200);
109       th3.start();
110 
111       th1.join();
112       th2.join();
113       th3.join();
114 
115       // We should get the two undelayed responses first.
116       assertEquals(UNDELAYED, results.get(0).intValue());
117       assertEquals(UNDELAYED, results.get(1).intValue());
118       assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :  0xDEADBEEF);
119     } finally {
120       rpcClient.stop();
121     }
122   }
123 
124   private static class ListAppender extends AppenderSkeleton {
125     private final List<String> messages = new ArrayList<String>();
126 
127     @Override
128     protected void append(LoggingEvent event) {
129       messages.add(event.getMessage().toString());
130     }
131 
132     @Override
133     public void close() {
134     }
135 
136     @Override
137     public boolean requiresLayout() {
138       return false;
139     }
140 
141     public List<String> getMessages() {
142       return messages;
143     }
144   }
145 
146   /**
147    * Tests that we see a WARN message in the logs.
148    * @throws Exception
149    */
150   @Test (timeout=60000)
151   public void testTooManyDelayedRpcs() throws Exception {
152     Configuration conf = HBaseConfiguration.create();
153     final int MAX_DELAYED_RPC = 10;
154     conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
155     // Set up an appender to catch the "Too many delayed calls" that we expect.
156     ListAppender listAppender = new ListAppender();
157     Logger log = Logger.getLogger("org.apache.hadoop.ipc.RpcServer");
158     log.addAppender(listAppender);
159     log.setLevel(Level.WARN);
160 
161 
162     InetSocketAddress isa = new InetSocketAddress("localhost", 0);
163     TestDelayedImplementation instance = new TestDelayedImplementation(true);
164     BlockingService service =
165       TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
166     rpcServer = new RpcServer(null, "testTooManyDelayedRpcs",
167       Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
168         isa,
169         conf,
170         new FifoRpcScheduler(conf, 1));
171     rpcServer.start();
172     RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
173     try {
174       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
175           ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
176               rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
177           User.getCurrent(), RPC_CLIENT_TIMEOUT);
178       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
179         TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
180       Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
181       for (int i = 0; i < MAX_DELAYED_RPC; i++) {
182         threads[i] = new TestThread(stub, true, null);
183         threads[i].start();
184       }
185 
186       /* No warnings till here. */
187       assertTrue(listAppender.getMessages().isEmpty());
188 
189       /* This should give a warning. */
190       threads[MAX_DELAYED_RPC] = new TestThread(stub, true, null);
191       threads[MAX_DELAYED_RPC].start();
192 
193       for (int i = 0; i < MAX_DELAYED_RPC; i++) {
194         threads[i].join();
195       }
196 
197       assertFalse(listAppender.getMessages().isEmpty());
198       assertTrue(listAppender.getMessages().get(0).startsWith("Too many delayed calls"));
199 
200       log.removeAppender(listAppender);
201     } finally {
202       rpcClient.stop();
203     }
204   }
205 
206   public static class TestDelayedImplementation
207   implements TestDelayedRpcProtos.TestDelayedService.BlockingInterface {
208     /**
209      * Should the return value of delayed call be set at the end of the delay
210      * or at call return.
211      */
212     private final boolean delayReturnValue;
213 
214     /**
215      * @param delayReturnValue Should the response to the delayed call be set
216      * at the start or the end of the delay.
217      */
218     public TestDelayedImplementation(boolean delayReturnValue) {
219       this.delayReturnValue = delayReturnValue;
220     }
221 
222     @Override
223     public TestResponse test(final RpcController rpcController, final TestArg testArg)
224     throws ServiceException {
225       boolean delay = testArg.getDelay();
226       TestResponse.Builder responseBuilder = TestResponse.newBuilder();
227       if (!delay) {
228         responseBuilder.setResponse(UNDELAYED);
229         return responseBuilder.build();
230       }
231       final Delayable call = RpcServer.getCurrentCall();
232       call.startDelay(delayReturnValue);
233       new Thread() {
234         @Override
235         public void run() {
236           try {
237             Thread.sleep(500);
238             TestResponse.Builder responseBuilder = TestResponse.newBuilder();
239             call.endDelay(delayReturnValue ?
240                 responseBuilder.setResponse(DELAYED).build() : null);
241           } catch (Exception e) {
242             e.printStackTrace();
243           }
244         }
245       }.start();
246       // This value should go back to client only if the response is set
247       // immediately at delay time.
248       responseBuilder.setResponse(0xDEADBEEF);
249       return responseBuilder.build();
250     }
251   }
252 
253   public static class TestThread extends Thread {
254     private final TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub;
255     private final boolean delay;
256     private final List<Integer> results;
257 
258     public TestThread(TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub,
259         boolean delay, List<Integer> results) {
260       this.stub = stub;
261       this.delay = delay;
262       this.results = results;
263     }
264 
265     @Override
266     public void run() {
267       Integer result;
268       try {
269         result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).build()).
270           getResponse());
271       } catch (ServiceException e) {
272         throw new RuntimeException(e);
273       }
274       if (results != null) {
275         synchronized (results) {
276           results.add(result);
277         }
278       }
279     }
280   }
281 
282   @Test
283   public void testEndDelayThrowing() throws IOException {
284     Configuration conf = HBaseConfiguration.create();
285     InetSocketAddress isa = new InetSocketAddress("localhost", 0);
286     FaultyTestDelayedImplementation instance = new FaultyTestDelayedImplementation();
287     BlockingService service =
288       TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
289     rpcServer = new RpcServer(null, "testEndDelayThrowing",
290         Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
291         isa,
292         conf,
293         new FifoRpcScheduler(conf, 1));
294     rpcServer.start();
295     RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
296     try {
297       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
298           ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
299               rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
300         User.getCurrent(), 1000);
301       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
302         TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
303 
304       int result = 0xDEADBEEF;
305 
306       try {
307         result = stub.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
308       } catch (Exception e) {
309         fail("No exception should have been thrown.");
310       }
311       assertEquals(result, UNDELAYED);
312 
313       boolean caughtException = false;
314       try {
315         result = stub.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
316       } catch(Exception e) {
317         // Exception thrown by server is enclosed in a RemoteException.
318         if (e.getCause().getMessage().contains("java.lang.Exception: Something went wrong")) {
319           caughtException = true;
320         }
321         LOG.warn("Caught exception, expected=" + caughtException);
322       }
323       assertTrue(caughtException);
324     } finally {
325       rpcClient.stop();
326     }
327   }
328 
329   /**
330    * Delayed calls to this class throw an exception.
331    */
332   private static class FaultyTestDelayedImplementation extends TestDelayedImplementation {
333     public FaultyTestDelayedImplementation() {
334       super(false);
335     }
336 
337     @Override
338     public TestResponse test(RpcController rpcController, TestArg arg)
339     throws ServiceException {
340       LOG.info("In faulty test, delay=" + arg.getDelay());
341       if (!arg.getDelay()) return TestResponse.newBuilder().setResponse(UNDELAYED).build();
342       Delayable call = RpcServer.getCurrentCall();
343       call.startDelay(true);
344       LOG.info("In faulty test, delaying");
345       try {
346         call.endDelayThrowing(new Exception("Something went wrong"));
347       } catch (IOException e) {
348         e.printStackTrace();
349       }
350       // Client will receive the Exception, not this value.
351       return TestResponse.newBuilder().setResponse(DELAYED).build();
352     }
353   }
354 }