View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import com.google.common.collect.ImmutableList;
21  import com.google.common.collect.Lists;
22  import com.google.protobuf.BlockingService;
23  import com.google.protobuf.Descriptors.MethodDescriptor;
24  import com.google.protobuf.Message;
25  import com.google.protobuf.RpcController;
26  import com.google.protobuf.ServiceException;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Abortable;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellScanner;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.client.MetricsConnection;
38  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
39  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
40  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
41  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
42  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
43  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
44  import org.apache.hadoop.hbase.security.User;
45  import org.apache.hadoop.hbase.testclassification.SmallTests;
46  import org.apache.hadoop.hbase.util.Pair;
47  import org.junit.Ignore;
48  import org.junit.Test;
49  import org.junit.experimental.categories.Category;
50  import org.mockito.Mockito;
51  
52  import java.io.IOException;
53  import java.net.InetSocketAddress;
54  import java.util.ArrayList;
55  import java.util.List;
56  
57  import static org.mockito.Mockito.mock;
58  
59  @Category({SmallTests.class})
60  public class TestRpcHandlerException {
61    private static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
62    static String example = "xyz";
63    static byte[] CELL_BYTES = example.getBytes();
64    static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
65  
66    private final static Configuration CONF = HBaseConfiguration.create();
67    RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
68  
69    // We are using the test TestRpcServiceProtos generated classes and Service because they are
70    // available and basic with methods like 'echo', and ping. Below we make a blocking service
71    // by passing in implementation of blocking interface. We use this service in all tests that
72    // follow.
73    private static final BlockingService SERVICE =
74        TestRpcServiceProtos.TestProtobufRpcProto
75        .newReflectiveBlockingService(new TestRpcServiceProtos
76  		  .TestProtobufRpcProto.BlockingInterface() {
77  
78          @Override
79          public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
80              throws ServiceException {
81            return null;
82          }
83  
84          @Override
85          public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
86              throws ServiceException {
87            return null;
88          }
89  
90          @Override
91          public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
92              throws Error, RuntimeException {
93            if (controller instanceof PayloadCarryingRpcController) {
94              PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
95              // If cells, scan them to check we are able to iterate what we were given and since
96              // this is
97              // an echo, just put them back on the controller creating a new block. Tests our
98              // block
99              // building.
100             CellScanner cellScanner = pcrc.cellScanner();
101             List<Cell> list = null;
102             if (cellScanner != null) {
103 		list = new ArrayList<Cell>();
104 		try {
105 			while (cellScanner.advance()) {
106 				list.add(cellScanner.current());
107 				throw new StackOverflowError();
108 			}
109 		} catch (StackOverflowError e) {
110 			throw e;
111 		} catch (IOException e) {
112 			throw new RuntimeException(e);
113 		}
114             }
115             cellScanner = CellUtil.createCellScanner(list);
116             ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
117           }
118           return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
119         }
120       });
121 
122   /**
123    * Instance of server. We actually don't do anything speical in here so could just use
124    * HBaseRpcServer directly.
125    */
126   private static class TestRpcServer extends RpcServer {
127 
128     TestRpcServer() throws IOException {
129       this(new FifoRpcScheduler(CONF, 1));
130     }
131 
132     TestRpcServer(RpcScheduler scheduler) throws IOException {
133       super(null, "testRpcServer",
134 		  Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
135 		  new InetSocketAddress("localhost", 0), CONF, scheduler);
136     }
137 
138     @Override
139     public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
140       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
141           throws IOException {
142       return super.call(service, md, param, cellScanner, receiveTime, status);
143     }
144   }
145 
146   /** Tests that the rpc scheduler is called when requests arrive.
147    *  When Rpc handler thread dies, the client will hang and the test will fail.
148    *  The test is meant to be a unit test to test the behavior.
149    *
150    * */
151   private class AbortServer implements Abortable {
152     private boolean aborted = false;
153 
154     @Override
155     public void abort(String why, Throwable e) {
156       aborted = true;
157     }
158 
159     @Override
160     public boolean isAborted() {
161       return aborted;
162     }
163   }
164 
165   /* This is a unit test to make sure to abort region server when the number of Rpc handler thread
166    * caught errors exceeds the threshold. Client will hang when RS aborts.
167    */
168   @Ignore
169   @Test
170   public void testRpcScheduler() throws IOException, InterruptedException {
171     PriorityFunction qosFunction = mock(PriorityFunction.class);
172     Abortable abortable = new AbortServer();
173     RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
174     RpcServer rpcServer = new TestRpcServer(scheduler);
175     RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
176     try {
177       rpcServer.start();
178       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
179       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
180       PayloadCarryingRpcController controller =
181           new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
182       InetSocketAddress address = rpcServer.getListenerAddress();
183       if (address == null) {
184         throw new IOException("Listener channel is closed");
185       }
186       client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
187           address, new MetricsConnection.CallStats());
188     } catch (Throwable e) {
189       assert(abortable.isAborted() == true);
190     } finally {
191       rpcServer.stop();
192     }
193   }
194 
195 }