View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertNull;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.io.IOException;
28  import java.util.Collections;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.TreeMap;
32  
33  import org.apache.hadoop.hbase.util.ByteStringer;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.HBaseTestingUtility;
38  import org.apache.hadoop.hbase.HColumnDescriptor;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.MediumTests;
43  import org.apache.hadoop.hbase.ServerName;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.client.HBaseAdmin;
46  import org.apache.hadoop.hbase.client.HTable;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.client.coprocessor.Batch;
49  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
50  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
51  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
52  import org.apache.hadoop.hbase.ipc.ServerRpcController;
53  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
54  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
55  import org.apache.hadoop.hbase.util.Bytes;
56  import org.junit.AfterClass;
57  import org.junit.BeforeClass;
58  import org.junit.Test;
59  import org.junit.experimental.categories.Category;
60  
61  import com.google.protobuf.RpcController;
62  import com.google.protobuf.ServiceException;
63  
64  /**
65   * TestEndpoint: test cases to verify coprocessor Endpoint
66   */
67  @Category(MediumTests.class)
68  public class TestCoprocessorEndpoint {
69    private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
70  
71    private static final TableName TEST_TABLE =
72        TableName.valueOf("TestTable");
73    private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
74    private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
75    private static byte[] ROW = Bytes.toBytes("testRow");
76  
77    private static final int ROWSIZE = 20;
78    private static final int rowSeperator1 = 5;
79    private static final int rowSeperator2 = 12;
80    private static byte[][] ROWS = makeN(ROW, ROWSIZE);
81  
82    private static HBaseTestingUtility util = new HBaseTestingUtility();
83  
84    @BeforeClass
85    public static void setupBeforeClass() throws Exception {
86      // set configure to indicate which cp should be loaded
87      Configuration conf = util.getConfiguration();
88      conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
89          org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
90          ProtobufCoprocessorService.class.getName());
91      conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
92          ProtobufCoprocessorService.class.getName());
93      util.startMiniCluster(2);
94      HBaseAdmin admin = new HBaseAdmin(conf);
95      HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
96      desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
97      admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
98      util.waitUntilAllRegionsAssigned(TEST_TABLE);
99      admin.close();
100 
101     HTable table = new HTable(conf, TEST_TABLE);
102     for (int i = 0; i < ROWSIZE; i++) {
103       Put put = new Put(ROWS[i]);
104       put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
105       table.put(put);
106     }
107     table.close();
108   }
109 
110   @AfterClass
111   public static void tearDownAfterClass() throws Exception {
112     util.shutdownMiniCluster();
113   }
114 
115   private Map<byte [], Long> sum(final HTable table, final byte [] family,
116       final byte [] qualifier, final byte [] start, final byte [] end)
117   throws ServiceException, Throwable {
118     return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
119         start, end,
120       new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
121         @Override
122         public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
123         throws IOException {
124           BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
125               new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
126           ColumnAggregationProtos.SumRequest.Builder builder =
127             ColumnAggregationProtos.SumRequest.newBuilder();
128           builder.setFamily(ByteStringer.wrap(family));
129           if (qualifier != null && qualifier.length > 0) {
130             builder.setQualifier(ByteStringer.wrap(qualifier));
131           }
132           instance.sum(null, builder.build(), rpcCallback);
133           return rpcCallback.get().getSum();
134         }
135       });
136   }
137 
138   @Test
139   public void testAggregation() throws Throwable {
140     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
141     Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
142       ROWS[0], ROWS[ROWS.length-1]);
143     int sumResult = 0;
144     int expectedResult = 0;
145     for (Map.Entry<byte[], Long> e : results.entrySet()) {
146       LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
147       sumResult += e.getValue();
148     }
149     for (int i = 0; i < ROWSIZE; i++) {
150       expectedResult += i;
151     }
152     assertEquals("Invalid result", expectedResult, sumResult);
153 
154     results.clear();
155 
156     // scan: for region 2 and region 3
157     results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
158       ROWS[rowSeperator1], ROWS[ROWS.length-1]);
159     sumResult = 0;
160     expectedResult = 0;
161     for (Map.Entry<byte[], Long> e : results.entrySet()) {
162       LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
163       sumResult += e.getValue();
164     }
165     for (int i = rowSeperator1; i < ROWSIZE; i++) {
166       expectedResult += i;
167     }
168     assertEquals("Invalid result", expectedResult, sumResult);
169     table.close();
170   }
171 
172   @Test
173   public void testCoprocessorService() throws Throwable {
174     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
175     NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
176 
177     final TestProtos.EchoRequestProto request =
178         TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
179     final Map<byte[], String> results = Collections.synchronizedMap(
180         new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
181     try {
182       // scan: for all regions
183       final RpcController controller = new ServerRpcController();
184       table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
185           ROWS[0], ROWS[ROWS.length - 1],
186           new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
187             public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
188                 throws IOException {
189               LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
190               BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
191               instance.echo(controller, request, callback);
192               TestProtos.EchoResponseProto response = callback.get();
193               LOG.debug("Batch.Call returning result " + response);
194               return response;
195             }
196           },
197           new Batch.Callback<TestProtos.EchoResponseProto>() {
198             public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
199               assertNotNull(result);
200               assertEquals("hello", result.getMessage());
201               results.put(region, result.getMessage());
202             }
203           }
204       );
205       for (Map.Entry<byte[], String> e : results.entrySet()) {
206         LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
207       }
208       assertEquals(3, results.size());
209       for (HRegionInfo info : regions.navigableKeySet()) {
210         LOG.info("Region info is "+info.getRegionNameAsString());
211         assertTrue(results.containsKey(info.getRegionName()));
212       }
213       results.clear();
214 
215       // scan: for region 2 and region 3
216       table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
217           ROWS[rowSeperator1], ROWS[ROWS.length - 1],
218           new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
219             public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
220                 throws IOException {
221               LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
222               BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
223               instance.echo(controller, request, callback);
224               TestProtos.EchoResponseProto response = callback.get();
225               LOG.debug("Batch.Call returning result " + response);
226               return response;
227             }
228           },
229           new Batch.Callback<TestProtos.EchoResponseProto>() {
230             public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
231               assertNotNull(result);
232               assertEquals("hello", result.getMessage());
233               results.put(region, result.getMessage());
234             }
235           }
236       );
237       for (Map.Entry<byte[], String> e : results.entrySet()) {
238         LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
239       }
240       assertEquals(2, results.size());
241     } finally {
242       table.close();
243     }
244   }
245 
246   @Test
247   public void testCoprocessorServiceNullResponse() throws Throwable {
248     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
249     NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
250 
251     final TestProtos.EchoRequestProto request =
252         TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
253     try {
254       // scan: for all regions
255       final RpcController controller = new ServerRpcController();
256       // test that null results are supported
257       Map<byte[], String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
258           ROWS[0], ROWS[ROWS.length - 1],
259           new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
260             public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
261                 throws IOException {
262               BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
263               instance.echo(controller, request, callback);
264               TestProtos.EchoResponseProto response = callback.get();
265               LOG.debug("Batch.Call got result " + response);
266               return null;
267             }
268           }
269       );
270       for (Map.Entry<byte[], String> e : results.entrySet()) {
271         LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
272       }
273       assertEquals(3, results.size());
274       for (HRegionInfo info : regions.navigableKeySet()) {
275         LOG.info("Region info is "+info.getRegionNameAsString());
276         assertTrue(results.containsKey(info.getRegionName()));
277         assertNull(results.get(info.getRegionName()));
278       }
279     } finally {
280       table.close();
281     }
282   }
283 
284   @Test
285   public void testMasterCoprocessorService() throws Throwable {
286     HBaseAdmin admin = util.getHBaseAdmin();
287     final TestProtos.EchoRequestProto request =
288         TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
289     TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
290         TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
291     assertEquals("hello", service.echo(null, request).getMessage());
292   }
293 
294   @Test
295   public void testCoprocessorError() throws Exception {
296     Configuration configuration = new Configuration(util.getConfiguration());
297     // Make it not retry forever
298     configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
299     HTable table = new HTable(configuration, TEST_TABLE);
300 
301     try {
302       CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
303 
304       TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
305           TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
306 
307       service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
308       fail("Should have thrown an exception");
309     } catch (ServiceException e) {
310     } finally {
311       table.close();
312     }
313   }
314 
315   @Test
316   public void testMasterCoprocessorError() throws Throwable {
317     HBaseAdmin admin = util.getHBaseAdmin();
318     TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
319         TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
320     try {
321       service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
322       fail("Should have thrown an exception");
323     } catch (ServiceException e) {
324     }
325   }
326 
327   private static byte[][] makeN(byte[] base, int n) {
328     byte[][] ret = new byte[n][];
329     for (int i = 0; i < n; i++) {
330       ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
331     }
332     return ret;
333   }
334 
335 }
336