1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.collect.Lists;
22 import com.google.protobuf.BlockingService;
23 import com.google.protobuf.Descriptors.MethodDescriptor;
24 import com.google.protobuf.Message;
25 import com.google.protobuf.RpcController;
26 import com.google.protobuf.ServiceException;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Abortable;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellScanner;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HBaseConfiguration;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.client.MetricsConnection;
38 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
39 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
40 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
41 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
42 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
43 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
44 import org.apache.hadoop.hbase.security.User;
45 import org.apache.hadoop.hbase.testclassification.SmallTests;
46 import org.apache.hadoop.hbase.util.Pair;
47 import org.junit.Ignore;
48 import org.junit.Test;
49 import org.junit.experimental.categories.Category;
50 import org.mockito.Mockito;
51
52 import java.io.IOException;
53 import java.net.InetSocketAddress;
54 import java.util.ArrayList;
55 import java.util.List;
56
57 import static org.mockito.Mockito.mock;
58
59 @Category({SmallTests.class})
60 public class TestRpcHandlerException {
61 private static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
62 static String example = "xyz";
63 static byte[] CELL_BYTES = example.getBytes();
64 static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
65
66 private final static Configuration CONF = HBaseConfiguration.create();
67 RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
68
69
70
71
72
73 private static final BlockingService SERVICE =
74 TestRpcServiceProtos.TestProtobufRpcProto
75 .newReflectiveBlockingService(new TestRpcServiceProtos
76 .TestProtobufRpcProto.BlockingInterface() {
77
78 @Override
79 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
80 throws ServiceException {
81 return null;
82 }
83
84 @Override
85 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
86 throws ServiceException {
87 return null;
88 }
89
90 @Override
91 public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
92 throws Error, RuntimeException {
93 if (controller instanceof PayloadCarryingRpcController) {
94 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
95
96
97
98
99
100 CellScanner cellScanner = pcrc.cellScanner();
101 List<Cell> list = null;
102 if (cellScanner != null) {
103 list = new ArrayList<Cell>();
104 try {
105 while (cellScanner.advance()) {
106 list.add(cellScanner.current());
107 throw new StackOverflowError();
108 }
109 } catch (StackOverflowError e) {
110 throw e;
111 } catch (IOException e) {
112 throw new RuntimeException(e);
113 }
114 }
115 cellScanner = CellUtil.createCellScanner(list);
116 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
117 }
118 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
119 }
120 });
121
122
123
124
125
126 private static class TestRpcServer extends RpcServer {
127
128 TestRpcServer() throws IOException {
129 this(new FifoRpcScheduler(CONF, 1));
130 }
131
132 TestRpcServer(RpcScheduler scheduler) throws IOException {
133 super(null, "testRpcServer",
134 Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
135 new InetSocketAddress("localhost", 0), CONF, scheduler);
136 }
137
138 @Override
139 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
140 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
141 throws IOException {
142 return super.call(service, md, param, cellScanner, receiveTime, status);
143 }
144 }
145
146
147
148
149
150
151 private class AbortServer implements Abortable {
152 private boolean aborted = false;
153
154 @Override
155 public void abort(String why, Throwable e) {
156 aborted = true;
157 }
158
159 @Override
160 public boolean isAborted() {
161 return aborted;
162 }
163 }
164
165
166
167
168 @Ignore
169 @Test
170 public void testRpcScheduler() throws IOException, InterruptedException {
171 PriorityFunction qosFunction = mock(PriorityFunction.class);
172 Abortable abortable = new AbortServer();
173 RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
174 RpcServer rpcServer = new TestRpcServer(scheduler);
175 RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
176 try {
177 rpcServer.start();
178 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
179 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
180 PayloadCarryingRpcController controller =
181 new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
182 InetSocketAddress address = rpcServer.getListenerAddress();
183 if (address == null) {
184 throw new IOException("Listener channel is closed");
185 }
186 client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
187 address, new MetricsConnection.CallStats());
188 } catch (Throwable e) {
189 assert(abortable.isAborted() == true);
190 } finally {
191 rpcServer.stop();
192 }
193 }
194
195 }