View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.IOException;
21  import java.net.InetSocketAddress;
22  
23  import com.google.common.collect.Lists;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.HBaseConfiguration;
26  import org.apache.hadoop.hbase.HConstants;
27  import org.apache.hadoop.hbase.MediumTests;
28  import org.apache.hadoop.hbase.ServerName;
29  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
30  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
31  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
32  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
33  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
34  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
35  import org.apache.hadoop.hbase.security.User;
36  import org.apache.log4j.Level;
37  import org.apache.log4j.Logger;
38  import org.junit.Assert;
39  import org.junit.Test;
40  import org.junit.Before;
41  import org.junit.After;
42  import org.junit.experimental.categories.Category;
43  
44  import com.google.protobuf.BlockingRpcChannel;
45  import com.google.protobuf.BlockingService;
46  import com.google.protobuf.RpcController;
47  import com.google.protobuf.ServiceException;
48  
49  /**
50   * Test for testing protocol buffer based RPC mechanism.
51   * This test depends on test.proto definition of types in <code>src/test/protobuf/test.proto</code>
52   * and protobuf service definition from <code>src/test/protobuf/test_rpc_service.proto</code>
53   */
54  @Category(MediumTests.class)
55  public class TestProtoBufRpc {
56    public final static String ADDRESS = "0.0.0.0";
57    public static int PORT = 0;
58    private InetSocketAddress isa;
59    private Configuration conf;
60    private RpcServerInterface server;
61  
62    /**
63     * Implementation of the test service defined out in TestRpcServiceProtos
64     */
65    static class PBServerImpl
66    implements TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface {
67      @Override
68      public EmptyResponseProto ping(RpcController unused,
69          EmptyRequestProto request) throws ServiceException {
70        return EmptyResponseProto.newBuilder().build();
71      }
72  
73      @Override
74      public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
75          throws ServiceException {
76        return EchoResponseProto.newBuilder().setMessage(request.getMessage())
77            .build();
78      }
79  
80      @Override
81      public EmptyResponseProto error(RpcController unused,
82          EmptyRequestProto request) throws ServiceException {
83        throw new ServiceException("error", new IOException("error"));
84      }
85    }
86  
87    @Before
88    public  void setUp() throws IOException { // Setup server for both protocols
89      this.conf = HBaseConfiguration.create();
90      Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer");
91      log.setLevel(Level.DEBUG);
92      log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer.trace");
93      log.setLevel(Level.TRACE);
94      // Create server side implementation
95      PBServerImpl serverImpl = new PBServerImpl();
96      BlockingService service =
97        TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(serverImpl);
98      // Get RPC server for server side implementation
99      this.server = new RpcServer(null, "testrpc",
100         Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
101         new InetSocketAddress(ADDRESS, PORT), conf,
102         new FifoRpcScheduler(conf, 10));
103     this.isa = server.getListenerAddress();
104     this.server.start();
105   }
106 
107   @After
108   public void tearDown() throws Exception {
109     server.stop();
110   }
111 
112   @Test
113   public void testProtoBufRpc() throws Exception {
114     RpcClient rpcClient = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
115     try {
116       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
117           ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()),
118         User.getCurrent(), 0);
119       TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
120         TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
121       // Test ping method
122       TestProtos.EmptyRequestProto emptyRequest =
123         TestProtos.EmptyRequestProto.newBuilder().build();
124       stub.ping(null, emptyRequest);
125 
126       // Test echo method
127       EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build();
128       EchoResponseProto echoResponse = stub.echo(null, echoRequest);
129       Assert.assertEquals(echoResponse.getMessage(), "hello");
130 
131       // Test error method - error should be thrown as RemoteException
132       try {
133         stub.error(null, emptyRequest);
134         Assert.fail("Expected exception is not thrown");
135       } catch (ServiceException e) {
136       }
137     } finally {
138       rpcClient.stop();
139     }
140   }
141 }