View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNull;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.io.IOException;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.Coprocessor;
32  import org.apache.hadoop.hbase.CoprocessorEnvironment;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.HRegionInfo;
36  import org.apache.hadoop.hbase.HRegionLocation;
37  import org.apache.hadoop.hbase.MediumTests;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.client.HTable;
40  import org.apache.hadoop.hbase.client.Put;
41  import org.apache.hadoop.hbase.client.coprocessor.Batch;
42  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
43  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
44  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
45  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
46  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos;
47  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountRequest;
48  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountResponse;
49  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloRequest;
50  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloResponse;
51  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountRequest;
52  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountResponse;
53  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopRequest;
54  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopResponse;
55  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingRequest;
56  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingResponse;
57  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.junit.After;
60  import org.junit.AfterClass;
61  import org.junit.Before;
62  import org.junit.BeforeClass;
63  import org.junit.Test;
64  import org.junit.experimental.categories.Category;
65  
66  import com.google.protobuf.RpcCallback;
67  import com.google.protobuf.RpcController;
68  import com.google.protobuf.Service;
69  import com.google.protobuf.ServiceException;
70  
71  @Category(MediumTests.class)
72  public class TestServerCustomProtocol {
73    private static final Log LOG = LogFactory.getLog(TestServerCustomProtocol.class);
74    static final String WHOAREYOU = "Who are you?";
75    static final String NOBODY = "nobody";
76    static final String HELLO = "Hello, ";
77  
78    /* Test protocol implementation */
79    public static class PingHandler extends PingProtos.PingService
80    implements Coprocessor, CoprocessorService {
81      private int counter = 0;
82  
83      @Override
84      public void start(CoprocessorEnvironment env) throws IOException {
85        if (env instanceof RegionCoprocessorEnvironment) return;
86        throw new CoprocessorException("Must be loaded on a table region!");
87      }
88  
89      @Override
90      public void stop(CoprocessorEnvironment env) throws IOException {
91        // Nothing to do.
92      }
93  
94      @Override
95      public void ping(RpcController controller, PingRequest request,
96          RpcCallback<PingResponse> done) {
97        this.counter++;
98        done.run(PingResponse.newBuilder().setPong("pong").build());
99      }
100 
101     @Override
102     public void count(RpcController controller, CountRequest request,
103         RpcCallback<CountResponse> done) {
104       done.run(CountResponse.newBuilder().setCount(this.counter).build());
105     }
106 
107     @Override
108     public void increment(RpcController controller,
109         IncrementCountRequest request, RpcCallback<IncrementCountResponse> done) {
110       this.counter += request.getDiff();
111       done.run(IncrementCountResponse.newBuilder().setCount(this.counter).build());
112     }
113 
114     @Override
115     public void hello(RpcController controller, HelloRequest request,
116         RpcCallback<HelloResponse> done) {
117       if (!request.hasName()) done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
118       else if (request.getName().equals(NOBODY)) done.run(HelloResponse.newBuilder().build());
119       else done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
120     }
121 
122     @Override
123     public void noop(RpcController controller, NoopRequest request,
124         RpcCallback<NoopResponse> done) {
125       done.run(NoopResponse.newBuilder().build());
126     }
127 
128     @Override
129     public Service getService() {
130       return this;
131     }
132   }
133 
134   private static final byte[] TEST_TABLE = Bytes.toBytes("test");
135   private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
136 
137   private static final byte[] ROW_A = Bytes.toBytes("aaa");
138   private static final byte[] ROW_B = Bytes.toBytes("bbb");
139   private static final byte[] ROW_C = Bytes.toBytes("ccc");
140 
141   private static final byte[] ROW_AB = Bytes.toBytes("abb");
142   private static final byte[] ROW_BC = Bytes.toBytes("bcc");
143 
144   private static HBaseTestingUtility util = new HBaseTestingUtility();
145 
146   @BeforeClass
147   public static void setupBeforeClass() throws Exception {
148     util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
149       PingHandler.class.getName());
150     util.startMiniCluster();
151   }
152 
153   @Before
154   public void before()  throws Exception {
155     HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
156     util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
157       new byte[][]{ HConstants.EMPTY_BYTE_ARRAY, ROW_B, ROW_C});
158 
159     Put puta = new Put( ROW_A );
160     puta.add(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
161     table.put(puta);
162 
163     Put putb = new Put( ROW_B );
164     putb.add(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
165     table.put(putb);
166 
167     Put putc = new Put( ROW_C );
168     putc.add(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
169     table.put(putc);
170   }
171 
172   @After
173   public void after() throws Exception {
174     util.deleteTable(TEST_TABLE);
175   }
176 
177   @AfterClass
178   public static void tearDownAfterClass() throws Exception {
179     util.shutdownMiniCluster();
180   }
181 
182   @Test
183   public void testSingleProxy() throws Throwable {
184     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
185     Map<byte [], String> results = ping(table, null, null);
186     // There are three regions so should get back three results.
187     assertEquals(3, results.size());
188     for (Map.Entry<byte [], String> e: results.entrySet()) {
189       assertEquals("Invalid custom protocol response", "pong", e.getValue());
190     }
191     hello(table, "George", HELLO + "George");
192     LOG.info("Did george");
193     hello(table, null, "Who are you?");
194     LOG.info("Who are you");
195     hello(table, NOBODY, null);
196     LOG.info(NOBODY);
197     Map<byte [], Integer> intResults = table.coprocessorService(PingProtos.PingService.class,
198       null, null,
199       new Batch.Call<PingProtos.PingService, Integer>() {
200         @Override
201         public Integer call(PingProtos.PingService instance) throws IOException {
202           BlockingRpcCallback<PingProtos.CountResponse> rpcCallback =
203             new BlockingRpcCallback<PingProtos.CountResponse>();
204           instance.count(null, PingProtos.CountRequest.newBuilder().build(), rpcCallback);
205           return rpcCallback.get().getCount();
206         }
207       });
208     int count = -1;
209     for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
210       assertTrue(e.getValue() > 0);
211       count = e.getValue();
212     }
213     final int diff = 5;
214     intResults = table.coprocessorService(PingProtos.PingService.class,
215       null, null,
216       new Batch.Call<PingProtos.PingService, Integer>() {
217         @Override
218         public Integer call(PingProtos.PingService instance) throws IOException {
219           BlockingRpcCallback<PingProtos.IncrementCountResponse> rpcCallback =
220             new BlockingRpcCallback<PingProtos.IncrementCountResponse>();
221           instance.increment(null, PingProtos.IncrementCountRequest.newBuilder().setDiff(diff).build(),
222             rpcCallback);
223           return rpcCallback.get().getCount();
224         }
225       });
226     // There are three regions so should get back three results.
227     assertEquals(3, results.size());
228     for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
229       assertEquals(e.getValue().intValue(), count + diff);
230     }
231     table.close();
232   }
233 
234   private Map<byte [], String> hello(final HTable table, final String send, final String response)
235   throws ServiceException, Throwable {
236     Map<byte [], String> results = hello(table, send);
237     for (Map.Entry<byte [], String> e: results.entrySet()) {
238       assertEquals("Invalid custom protocol response", response, e.getValue());
239     }
240     return results;
241   }
242 
243   private Map<byte [], String> hello(final HTable table, final String send)
244   throws ServiceException, Throwable {
245     return hello(table, send, null, null);
246   }
247 
248   private Map<byte [], String> hello(final HTable table, final String send, final byte [] start,
249       final byte [] end)
250   throws ServiceException, Throwable {
251     return table.coprocessorService(PingProtos.PingService.class,
252         start, end,
253         new Batch.Call<PingProtos.PingService, String>() {
254           @Override
255           public String call(PingProtos.PingService instance) throws IOException {
256             BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
257               new BlockingRpcCallback<PingProtos.HelloResponse>();
258             PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
259             if (send != null) builder.setName(send);
260             instance.hello(null, builder.build(), rpcCallback);
261             PingProtos.HelloResponse r = rpcCallback.get();
262             return r != null && r.hasResponse()? r.getResponse(): null;
263           }
264         });
265   }
266 
267   private Map<byte [], String> compoundOfHelloAndPing(final HTable table, final byte [] start,
268       final byte [] end)
269   throws ServiceException, Throwable {
270     return table.coprocessorService(PingProtos.PingService.class,
271         start, end,
272         new Batch.Call<PingProtos.PingService, String>() {
273           @Override
274           public String call(PingProtos.PingService instance) throws IOException {
275             BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
276               new BlockingRpcCallback<PingProtos.HelloResponse>();
277             PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
278             // Call ping on same instance.  Use result calling hello on same instance.
279             builder.setName(doPing(instance));
280             instance.hello(null, builder.build(), rpcCallback);
281             PingProtos.HelloResponse r = rpcCallback.get();
282             return r != null && r.hasResponse()? r.getResponse(): null;
283           }
284         });
285   }
286 
287   private Map<byte [], String> noop(final HTable table, final byte [] start,
288       final byte [] end)
289   throws ServiceException, Throwable {
290     return table.coprocessorService(PingProtos.PingService.class, start, end,
291         new Batch.Call<PingProtos.PingService, String>() {
292           @Override
293           public String call(PingProtos.PingService instance) throws IOException {
294             BlockingRpcCallback<PingProtos.NoopResponse> rpcCallback =
295               new BlockingRpcCallback<PingProtos.NoopResponse>();
296             PingProtos.NoopRequest.Builder builder = PingProtos.NoopRequest.newBuilder();
297             instance.noop(null, builder.build(), rpcCallback);
298             rpcCallback.get();
299             // Looks like null is expected when void.  That is what the test below is looking for
300             return null;
301           }
302         });
303   }
304 
305   @Test
306   public void testSingleMethod() throws Throwable {
307     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
308     Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class,
309       null, ROW_A,
310       new Batch.Call<PingProtos.PingService, String>() {
311         @Override
312         public String call(PingProtos.PingService instance) throws IOException {
313           BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
314             new BlockingRpcCallback<PingProtos.PingResponse>();
315           instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
316           return rpcCallback.get().getPong();
317         }
318       });
319     // Should have gotten results for 1 of the three regions only since we specified
320     // rows from 1 region
321     assertEquals(1, results.size());
322     verifyRegionResults(table, results, ROW_A);
323 
324     final String name = "NAME";
325     results = hello(table, name, null, ROW_A);
326     // Should have gotten results for 1 of the three regions only since we specified
327     // rows from 1 region
328     assertEquals(1, results.size());
329     verifyRegionResults(table, results, "Hello, NAME", ROW_A);
330     table.close();
331   }
332 
333   @Test
334   public void testRowRange() throws Throwable {
335     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
336     for (Entry<HRegionInfo, ServerName> e: table.getRegionLocations().entrySet()) {
337       LOG.info("Region " + e.getKey().getRegionNameAsString() + ", servername=" + e.getValue());
338     }
339     // Here are what regions looked like on a run:
340     //
341     // test,,1355943549657.c65d4822d8bdecc033a96451f3a0f55d.
342     // test,bbb,1355943549661.110393b070dd1ed93441e0bc9b3ffb7e.
343     // test,ccc,1355943549665.c3d6d125141359cbbd2a43eaff3cdf74.
344 
345     Map<byte [], String> results = ping(table, null, ROW_A);
346     // Should contain first region only.
347     assertEquals(1, results.size());
348     verifyRegionResults(table, results, ROW_A);
349 
350     // Test start row + empty end
351     results = ping(table, ROW_BC, null);
352     assertEquals(2, results.size());
353     // should contain last 2 regions
354     HRegionLocation loc = table.getRegionLocation(ROW_A, true);
355     assertNull("Should be missing region for row aaa (prior to start row)",
356       results.get(loc.getRegionInfo().getRegionName()));
357     verifyRegionResults(table, results, ROW_B);
358     verifyRegionResults(table, results, ROW_C);
359 
360     // test empty start + end
361     results = ping(table, null, ROW_BC);
362     // should contain the first 2 regions
363     assertEquals(2, results.size());
364     verifyRegionResults(table, results, ROW_A);
365     verifyRegionResults(table, results, ROW_B);
366     loc = table.getRegionLocation(ROW_C, true);
367     assertNull("Should be missing region for row ccc (past stop row)",
368         results.get(loc.getRegionInfo().getRegionName()));
369 
370     // test explicit start + end
371     results = ping(table, ROW_AB, ROW_BC);
372     // should contain first 2 regions
373     assertEquals(2, results.size());
374     verifyRegionResults(table, results, ROW_A);
375     verifyRegionResults(table, results, ROW_B);
376     loc = table.getRegionLocation(ROW_C, true);
377     assertNull("Should be missing region for row ccc (past stop row)",
378         results.get(loc.getRegionInfo().getRegionName()));
379 
380     // test single region
381     results = ping(table, ROW_B, ROW_BC);
382     // should only contain region bbb
383     assertEquals(1, results.size());
384     verifyRegionResults(table, results, ROW_B);
385     loc = table.getRegionLocation(ROW_A, true);
386     assertNull("Should be missing region for row aaa (prior to start)",
387         results.get(loc.getRegionInfo().getRegionName()));
388     loc = table.getRegionLocation(ROW_C, true);
389     assertNull("Should be missing region for row ccc (past stop row)",
390         results.get(loc.getRegionInfo().getRegionName()));
391     table.close();
392   }
393 
394   private Map<byte [], String> ping(final HTable table, final byte [] start, final byte [] end)
395   throws ServiceException, Throwable {
396     return table.coprocessorService(PingProtos.PingService.class, start, end,
397       new Batch.Call<PingProtos.PingService, String>() {
398         @Override
399         public String call(PingProtos.PingService instance) throws IOException {
400           return doPing(instance);
401         }
402       });
403   }
404 
405   private static String doPing(PingProtos.PingService instance) throws IOException {
406     BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
407         new BlockingRpcCallback<PingProtos.PingResponse>();
408       instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
409       return rpcCallback.get().getPong();
410   }
411 
412   @Test
413   public void testCompoundCall() throws Throwable {
414     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
415     Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C);
416     verifyRegionResults(table, results, "Hello, pong", ROW_A);
417     verifyRegionResults(table, results, "Hello, pong", ROW_B);
418     verifyRegionResults(table, results, "Hello, pong", ROW_C);
419     table.close();
420   }
421 
422   @Test
423   public void testNullCall() throws Throwable {
424     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
425     Map<byte[],String> results = hello(table, null, ROW_A, ROW_C);
426     verifyRegionResults(table, results, "Who are you?", ROW_A);
427     verifyRegionResults(table, results, "Who are you?", ROW_B);
428     verifyRegionResults(table, results, "Who are you?", ROW_C);
429   }
430 
431   @Test
432   public void testNullReturn() throws Throwable {
433     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
434     Map<byte[],String> results = hello(table, "nobody", ROW_A, ROW_C);
435     verifyRegionResults(table, results, null, ROW_A);
436     verifyRegionResults(table, results, null, ROW_B);
437     verifyRegionResults(table, results, null, ROW_C);
438   }
439 
440   @Test
441   public void testEmptyReturnType() throws Throwable {
442     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
443     Map<byte[],String> results = noop(table, ROW_A, ROW_C);
444     assertEquals("Should have results from three regions", 3, results.size());
445     // all results should be null
446     for (Object v : results.values()) {
447       assertNull(v);
448     }
449   }
450 
451   private void verifyRegionResults(HTable table,
452       Map<byte[],String> results, byte[] row) throws Exception {
453     verifyRegionResults(table, results, "pong", row);
454   }
455 
456   private void verifyRegionResults(HTable table,
457       Map<byte[], String> results, String expected, byte[] row)
458   throws Exception {
459     for (Map.Entry<byte [], String> e: results.entrySet()) {
460       LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected +
461        ", result key=" + Bytes.toString(e.getKey()) +
462        ", value=" + e.getValue());
463     }
464     HRegionLocation loc = table.getRegionLocation(row, true);
465     byte[] region = loc.getRegionInfo().getRegionName();
466     assertTrue("Results should contain region " +
467       Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'",
468       results.containsKey(region));
469     assertEquals("Invalid result for row '"+Bytes.toStringBinary(row)+"'",
470       expected, results.get(region));
471   }
472 }