/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

import javax.net.SocketFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.net.NetUtils;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;

@Category({ SmallTests.class })
public class TestIPC extends AbstractTestIPC {

  private static final Log LOG = LogFactory.getLog(TestIPC.class);

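  // Returns a client whose getCodec() is null, so no cellblock codec is set up for requests.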
  @Override
  protected RpcClientImpl createRpcClientNoCodec(Configuration conf) {
    return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
      @Override
      Codec getCodec() {
        return null;
      }
    };
  }

  @Override
  protected RpcClientImpl createRpcClient(Configuration conf) {
    return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
  }

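  // Wraps the default socket factory in a Mockito spy whose sockets throw a RuntimeException
  // from setSoTimeout(), so that connection setup fails partway through.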
  @Override
  protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf)
      throws IOException {
    SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
    Mockito.doAnswer(new Answer<Socket>() {
      @Override
      public Socket answer(InvocationOnMock invocation) throws Throwable {
        Socket s = spy((Socket) invocation.callRealMethod());
        doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt());
        return s;
      }
    }).when(spyFactory).createSocket();

    return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
  }

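  // Standalone micro-benchmark, not run as part of the test suite: cycles <CYCLES> RPCs, each
  // carrying <CELLS_PER_CYCLE> cells, against an in-process TestRpcServer and logs the elapsed time.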
  public static void main(String[] args) throws IOException, SecurityException,
      NoSuchMethodException, InterruptedException {
    if (args.length != 2) {
      System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
      return;
    }
    // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
    // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
    int cycles = Integer.parseInt(args[0]);
    int cellcount = Integer.parseInt(args[1]);
    Configuration conf = HBaseConfiguration.create();
    TestRpcServer rpcServer = new TestRpcServer();
    MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
    EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
    RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
    KeyValue kv = BIG_CELL;
    Put p = new Put(CellUtil.cloneRow(kv));
    for (int i = 0; i < cellcount; i++) {
      p.add(kv);
    }
    RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
    rm.add(p);
    try {
      rpcServer.start();
      long startTime = System.currentTimeMillis();
      User user = User.getCurrent();
      InetSocketAddress address = rpcServer.getListenerAddress();
      if (address == null) {
        throw new IOException("Listener channel is closed");
      }
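      // Each cycle rebuilds the request: the RowMutations is converted to a "no data" RegionAction
      // while its cells are collected into the list so they can be shipped as a cellblock.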
      for (int i = 0; i < cycles; i++) {
        List<CellScannable> cells = new ArrayList<CellScannable>();
        // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
        ClientProtos.RegionAction.Builder builder =
            RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
              RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
              MutationProto.newBuilder());
        builder.setRegion(RegionSpecifier
            .newBuilder()
            .setType(RegionSpecifierType.REGION_NAME)
            .setValue(
              ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
        if (i % 100000 == 0) {
          LOG.info("" + i);
          // Uncomment this for a thread dump every so often.
          // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
          // "Thread dump " + Thread.currentThread().getName());
        }
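        // The cells ride out-of-band in the cellblock attached to the controller rather than
        // inside the protobuf request itself.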
        PayloadCarryingRpcController pcrc =
            new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
        // Pair<Message, CellScanner> response =
        client.call(pcrc, md, builder.build(), param, user, address,
            new MetricsConnection.CallStats());
        /*
         * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
         * count);
         */
      }
      LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
          + (System.currentTimeMillis() - startTime) + "ms");
    } finally {
      client.close();
      rpcServer.stop();
    }
  }

}