View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.util.Collections;
25  import java.util.Map;
26  import java.util.TreeMap;
27  
28  import org.apache.hadoop.hbase.util.ByteStringer;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.HBaseTestingUtility;
33  import org.apache.hadoop.hbase.HColumnDescriptor;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.MediumTests;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.client.HBaseAdmin;
38  import org.apache.hadoop.hbase.client.HTable;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.coprocessor.Batch;
41  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
42  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
43  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
44  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos;
45  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.junit.AfterClass;
48  import org.junit.BeforeClass;
49  import org.junit.Test;
50  import org.junit.experimental.categories.Category;
51  
52  import com.google.protobuf.ServiceException;
53  
54  /**
55   * TestEndpoint: test cases to verify the batch execution of coprocessor Endpoint
56   */
57  @Category(MediumTests.class)
58  public class TestBatchCoprocessorEndpoint {
59    private static final Log LOG = LogFactory.getLog(TestBatchCoprocessorEndpoint.class);
60  
61    private static final TableName TEST_TABLE =
62        TableName.valueOf("TestTable");
63    private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
64    private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
65    private static byte[] ROW = Bytes.toBytes("testRow");
66  
67    private static final int ROWSIZE = 20;
68    private static final int rowSeperator1 = 5;
69    private static final int rowSeperator2 = 12;
70    private static byte[][] ROWS = makeN(ROW, ROWSIZE);
71  
72    private static HBaseTestingUtility util = new HBaseTestingUtility();
73  
74    @BeforeClass
75    public static void setupBeforeClass() throws Exception {
76      // set configure to indicate which cp should be loaded
77      Configuration conf = util.getConfiguration();
78      conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
79          org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
80          ProtobufCoprocessorService.class.getName(),
81          ColumnAggregationEndpointWithErrors.class.getName(),
82          ColumnAggregationEndpointNullResponse.class.getName());
83      conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
84          ProtobufCoprocessorService.class.getName());
85      util.startMiniCluster(2);
86      HBaseAdmin admin = new HBaseAdmin(conf);
87      HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
88      desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
89      admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
90      util.waitUntilAllRegionsAssigned(TEST_TABLE);
91      admin.close();
92  
93      HTable table = new HTable(conf, TEST_TABLE);
94      for (int i = 0; i < ROWSIZE; i++) {
95        Put put = new Put(ROWS[i]);
96        put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
97        table.put(put);
98      }
99      table.close();
100   }
101 
102   @AfterClass
103   public static void tearDownAfterClass() throws Exception {
104     util.shutdownMiniCluster();
105   }
106 
107   @Test
108   public void testAggregationNullResponse() throws Throwable {
109     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
110     ColumnAggregationWithNullResponseProtos.SumRequest.Builder builder =
111         ColumnAggregationWithNullResponseProtos.SumRequest
112         .newBuilder();
113     builder.setFamily(ByteStringer.wrap(TEST_FAMILY));
114     if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
115       builder.setQualifier(ByteStringer.wrap(TEST_QUALIFIER));
116     }
117     Map<byte[], ColumnAggregationWithNullResponseProtos.SumResponse> results =
118         table.batchCoprocessorService(
119             ColumnAggregationServiceNullResponse.getDescriptor().findMethodByName("sum"),
120             builder.build(), ROWS[0], ROWS[ROWS.length - 1],
121             ColumnAggregationWithNullResponseProtos.SumResponse.getDefaultInstance());
122 
123     int sumResult = 0;
124     int expectedResult = 0;
125     for (Map.Entry<byte[], ColumnAggregationWithNullResponseProtos.SumResponse> e :
126         results.entrySet()) {
127       LOG.info("Got value " + e.getValue().getSum() + " for region "
128           + Bytes.toStringBinary(e.getKey()));
129       sumResult += e.getValue().getSum();
130     }
131     for (int i = 0; i < rowSeperator2; i++) {
132       expectedResult += i;
133     }
134     assertEquals("Invalid result", expectedResult, sumResult);
135     table.close();
136   }
137 
138   private static byte[][] makeN(byte[] base, int n) {
139     byte[][] ret = new byte[n][];
140     for (int i = 0; i < n; i++) {
141       ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
142     }
143     return ret;
144   }
145 
146   private Map<byte[], SumResponse> sum(final HTable table, final byte[] family,
147       final byte[] qualifier, final byte[] start, final byte[] end) throws ServiceException,
148       Throwable {
149     ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest
150         .newBuilder();
151     builder.setFamily(ByteStringer.wrap(family));
152     if (qualifier != null && qualifier.length > 0) {
153       builder.setQualifier(ByteStringer.wrap(qualifier));
154     }
155     return table.batchCoprocessorService(
156         ColumnAggregationProtos.ColumnAggregationService.getDescriptor().findMethodByName("sum"),
157         builder.build(), start, end, ColumnAggregationProtos.SumResponse.getDefaultInstance());
158   }
159 
160   @Test
161   public void testAggregationWithReturnValue() throws Throwable {
162     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
163     Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
164         ROWS[ROWS.length - 1]);
165     int sumResult = 0;
166     int expectedResult = 0;
167     for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
168       LOG.info("Got value " + e.getValue().getSum() + " for region "
169           + Bytes.toStringBinary(e.getKey()));
170       sumResult += e.getValue().getSum();
171     }
172     for (int i = 0; i < ROWSIZE; i++) {
173       expectedResult += i;
174     }
175     assertEquals("Invalid result", expectedResult, sumResult);
176 
177     results.clear();
178 
179     // scan: for region 2 and region 3
180     results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1],
181         ROWS[ROWS.length - 1]);
182     sumResult = 0;
183     expectedResult = 0;
184     for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
185       LOG.info("Got value " + e.getValue().getSum() + " for region "
186           + Bytes.toStringBinary(e.getKey()));
187       sumResult += e.getValue().getSum();
188     }
189     for (int i = rowSeperator1; i < ROWSIZE; i++) {
190       expectedResult += i;
191     }
192     assertEquals("Invalid result", expectedResult, sumResult);
193     table.close();
194   }
195 
196   @Test
197   public void testAggregation() throws Throwable {
198     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
199     Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
200         ROWS[0], ROWS[ROWS.length - 1]);
201     int sumResult = 0;
202     int expectedResult = 0;
203     for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
204       LOG.info("Got value " + e.getValue().getSum() + " for region "
205           + Bytes.toStringBinary(e.getKey()));
206       sumResult += e.getValue().getSum();
207     }
208     for (int i = 0; i < ROWSIZE; i++) {
209       expectedResult += i;
210     }
211     assertEquals("Invalid result", expectedResult, sumResult);
212 
213     // scan: for region 2 and region 3
214     results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length - 1]);
215     sumResult = 0;
216     expectedResult = 0;
217     for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
218       LOG.info("Got value " + e.getValue().getSum() + " for region "
219           + Bytes.toStringBinary(e.getKey()));
220       sumResult += e.getValue().getSum();
221     }
222     for (int i = rowSeperator1; i < ROWSIZE; i++) {
223       expectedResult += i;
224     }
225     assertEquals("Invalid result", expectedResult, sumResult);
226     table.close();
227   }
228 
229   @Test
230   public void testAggregationWithErrors() throws Throwable {
231     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
232     final Map<byte[], ColumnAggregationWithErrorsProtos.SumResponse> results =
233         Collections.synchronizedMap(
234             new TreeMap<byte[], ColumnAggregationWithErrorsProtos.SumResponse>(
235                 Bytes.BYTES_COMPARATOR
236             ));
237     ColumnAggregationWithErrorsProtos.SumRequest.Builder builder =
238         ColumnAggregationWithErrorsProtos.SumRequest
239         .newBuilder();
240     builder.setFamily(ByteStringer.wrap(TEST_FAMILY));
241     if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
242       builder.setQualifier(ByteStringer.wrap(TEST_QUALIFIER));
243     }
244 
245     boolean hasError = false;
246     try {
247       table.batchCoprocessorService(
248           ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors.getDescriptor()
249               .findMethodByName("sum"),
250           builder.build(), ROWS[0], ROWS[ROWS.length - 1],
251           ColumnAggregationWithErrorsProtos.SumResponse.getDefaultInstance(),
252           new Batch.Callback<ColumnAggregationWithErrorsProtos.SumResponse>() {
253 
254             @Override
255             public void update(byte[] region, byte[] row,
256                 ColumnAggregationWithErrorsProtos.SumResponse result) {
257               results.put(region, result);
258             }
259           });
260     } catch (Throwable t) {
261       LOG.info("Exceptions in coprocessor service", t);
262       hasError = true;
263     }
264 
265     int sumResult = 0;
266     int expectedResult = 0;
267     for (Map.Entry<byte[], ColumnAggregationWithErrorsProtos.SumResponse> e : results.entrySet()) {
268       LOG.info("Got value " + e.getValue().getSum() + " for region "
269           + Bytes.toStringBinary(e.getKey()));
270       sumResult += e.getValue().getSum();
271     }
272     for (int i = 0; i < rowSeperator2; i++) {
273       expectedResult += i;
274     }
275     assertEquals("Invalid result", expectedResult, sumResult);
276     assertTrue(hasError);
277     table.close();
278   }
279 }