1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.io.IOException;
21 import java.net.InetSocketAddress;
22
23 import com.google.common.collect.Lists;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.HConstants;
27 import org.apache.hadoop.hbase.MediumTests;
28 import org.apache.hadoop.hbase.ServerName;
29 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
30 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
31 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
32 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
33 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
34 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
35 import org.apache.hadoop.hbase.security.User;
36 import org.apache.log4j.Level;
37 import org.apache.log4j.Logger;
38 import org.junit.Assert;
39 import org.junit.Test;
40 import org.junit.Before;
41 import org.junit.After;
42 import org.junit.experimental.categories.Category;
43
44 import com.google.protobuf.BlockingRpcChannel;
45 import com.google.protobuf.BlockingService;
46 import com.google.protobuf.RpcController;
47 import com.google.protobuf.ServiceException;
48
49
50
51
52
53
54 @Category(MediumTests.class)
55 public class TestProtoBufRpc {
56 public final static String ADDRESS = "0.0.0.0";
57 public static int PORT = 0;
58 private InetSocketAddress isa;
59 private Configuration conf;
60 private RpcServerInterface server;
61
62
63
64
65 static class PBServerImpl
66 implements TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface {
67 @Override
68 public EmptyResponseProto ping(RpcController unused,
69 EmptyRequestProto request) throws ServiceException {
70 return EmptyResponseProto.newBuilder().build();
71 }
72
73 @Override
74 public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
75 throws ServiceException {
76 return EchoResponseProto.newBuilder().setMessage(request.getMessage())
77 .build();
78 }
79
80 @Override
81 public EmptyResponseProto error(RpcController unused,
82 EmptyRequestProto request) throws ServiceException {
83 throw new ServiceException("error", new IOException("error"));
84 }
85 }
86
87 @Before
88 public void setUp() throws IOException {
89 this.conf = HBaseConfiguration.create();
90 Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer");
91 log.setLevel(Level.DEBUG);
92 log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer.trace");
93 log.setLevel(Level.TRACE);
94
95 PBServerImpl serverImpl = new PBServerImpl();
96 BlockingService service =
97 TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(serverImpl);
98
99 this.server = new RpcServer(null, "testrpc",
100 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
101 new InetSocketAddress(ADDRESS, PORT), conf,
102 new FifoRpcScheduler(conf, 10));
103 this.isa = server.getListenerAddress();
104 this.server.start();
105 }
106
107 @After
108 public void tearDown() throws Exception {
109 server.stop();
110 }
111
112 @Test
113 public void testProtoBufRpc() throws Exception {
114 RpcClient rpcClient = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
115 try {
116 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
117 ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()),
118 User.getCurrent(), 0);
119 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
120 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
121
122 TestProtos.EmptyRequestProto emptyRequest =
123 TestProtos.EmptyRequestProto.newBuilder().build();
124 stub.ping(null, emptyRequest);
125
126
127 EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build();
128 EchoResponseProto echoResponse = stub.echo(null, echoRequest);
129 Assert.assertEquals(echoResponse.getMessage(), "hello");
130
131
132 try {
133 stub.error(null, emptyRequest);
134 Assert.fail("Expected exception is not thrown");
135 } catch (ServiceException e) {
136 }
137 } finally {
138 rpcClient.stop();
139 }
140 }
141 }