View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.Coprocessor;
28  import org.apache.hadoop.hbase.CoprocessorEnvironment;
29  import org.apache.hadoop.hbase.HBaseTestingUtility;
30  import org.apache.hadoop.hbase.MediumTests;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.Append;
33  import org.apache.hadoop.hbase.client.Delete;
34  import org.apache.hadoop.hbase.client.Durability;
35  import org.apache.hadoop.hbase.client.Get;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.HTableInterface;
38  import org.apache.hadoop.hbase.client.Increment;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.ResultScanner;
42  import org.apache.hadoop.hbase.client.Row;
43  import org.apache.hadoop.hbase.client.RowMutations;
44  import org.apache.hadoop.hbase.client.Scan;
45  import org.apache.hadoop.hbase.client.coprocessor.Batch;
46  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
47  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.VersionInfo;
50  import org.junit.After;
51  import org.junit.AfterClass;
52  import org.junit.Before;
53  import org.junit.BeforeClass;
54  import org.junit.Test;
55  import org.junit.experimental.categories.Category;
56  
57  import static org.junit.Assert.*;
58  
59  /**
60   * Tests class {@link org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment.HTableWrapper}
61   * by invoking its methods and briefly asserting the result is reasonable.
62   */
63  @Category(MediumTests.class)
64  public class TestHTableWrapper {
65  
66    private static final HBaseTestingUtility util = new HBaseTestingUtility();
67  
68    private static final byte[] TEST_TABLE = Bytes.toBytes("test");
69    private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
70  
71    private static final byte[] ROW_A = Bytes.toBytes("aaa");
72    private static final byte[] ROW_B = Bytes.toBytes("bbb");
73    private static final byte[] ROW_C = Bytes.toBytes("ccc");
74    private static final byte[] ROW_D = Bytes.toBytes("ddd");
75    private static final byte[] ROW_E = Bytes.toBytes("eee");
76  
77    private static final byte[] qualifierCol1 = Bytes.toBytes("col1");
78  
79    private static final byte[] bytes1 = Bytes.toBytes(1);
80    private static final byte[] bytes2 = Bytes.toBytes(2);
81    private static final byte[] bytes3 = Bytes.toBytes(3);
82    private static final byte[] bytes4 = Bytes.toBytes(4);
83    private static final byte[] bytes5 = Bytes.toBytes(5);
84  
85    static class DummyRegionObserver extends BaseRegionObserver {
86    }
87  
88    private HTableInterface hTableInterface;
89    private HTable table;
90  
91    @BeforeClass
92    public static void setupBeforeClass() throws Exception {
93      util.startMiniCluster();
94    }
95  
96    @AfterClass
97    public static void tearDownAfterClass() throws Exception {
98      util.shutdownMiniCluster();
99    }
100 
101   @Before
102   public void before() throws Exception {
103     table = util.createTable(TEST_TABLE, TEST_FAMILY);
104 
105     Put puta = new Put(ROW_A);
106     puta.add(TEST_FAMILY, qualifierCol1, bytes1);
107     table.put(puta);
108 
109     Put putb = new Put(ROW_B);
110     putb.add(TEST_FAMILY, qualifierCol1, bytes2);
111     table.put(putb);
112 
113     Put putc = new Put(ROW_C);
114     putc.add(TEST_FAMILY, qualifierCol1, bytes3);
115     table.put(putc);
116   }
117 
118   @After
119   public void after() throws Exception {
120     try {
121       if (table != null) {
122         table.close();
123       }
124     } finally {
125       util.deleteTable(TEST_TABLE);
126     }
127   }
128 
129   @Test
130   public void testHTableInterfaceMethods() throws Exception {
131     Configuration conf = util.getConfiguration();
132     MasterCoprocessorHost cpHost = util.getMiniHBaseCluster().getMaster().getCoprocessorHost();
133     Class<?> implClazz = DummyRegionObserver.class;
134     cpHost.load(implClazz, Coprocessor.PRIORITY_HIGHEST, conf);
135     CoprocessorEnvironment env = cpHost.findCoprocessorEnvironment(implClazz.getName());
136     assertEquals(Coprocessor.VERSION, env.getVersion());
137     assertEquals(VersionInfo.getVersion(), env.getHBaseVersion());
138     hTableInterface = env.getTable(TableName.valueOf(TEST_TABLE));
139     checkHTableInterfaceMethods();
140     cpHost.shutdown(env);
141   }
142 
143   private void checkHTableInterfaceMethods() throws Exception {
144     checkConf();
145     checkNameAndDescriptor();
146     checkAutoFlush();
147     checkBufferSize();
148     checkExists();
149     checkGetRowOrBefore();
150     checkAppend();
151     checkPutsAndDeletes();
152     checkCheckAndPut();
153     checkCheckAndDelete();
154     checkIncrementColumnValue();
155     checkIncrement();
156     checkBatch();
157     checkCoprocessorService();
158     checkMutateRow();
159     checkResultScanner();
160 
161     hTableInterface.flushCommits();
162     hTableInterface.close();
163   }
164 
165   private void checkConf() {
166     Configuration confExpected = util.getConfiguration();
167     Configuration confActual = hTableInterface.getConfiguration();
168     assertTrue(confExpected == confActual);
169   }
170 
171   private void checkNameAndDescriptor() throws IOException {
172     assertArrayEquals(TEST_TABLE, hTableInterface.getTableName());
173     assertEquals(table.getTableDescriptor(), hTableInterface.getTableDescriptor());
174   }
175 
176   private void checkAutoFlush() {
177     boolean initialAutoFlush = hTableInterface.isAutoFlush();
178     hTableInterface.setAutoFlushTo(false);
179     assertFalse(hTableInterface.isAutoFlush());
180     hTableInterface.setAutoFlush(true, true);
181     assertTrue(hTableInterface.isAutoFlush());
182     hTableInterface.setAutoFlushTo(initialAutoFlush);
183   }
184 
185   private void checkBufferSize() throws IOException {
186     long initialWriteBufferSize = hTableInterface.getWriteBufferSize();
187     hTableInterface.setWriteBufferSize(12345L);
188     assertEquals(12345L, hTableInterface.getWriteBufferSize());
189     hTableInterface.setWriteBufferSize(initialWriteBufferSize);
190   }
191 
192   private void checkExists() throws IOException {
193     boolean ex = hTableInterface.exists(new Get(ROW_A).addColumn(TEST_FAMILY, qualifierCol1));
194     assertTrue(ex);
195 
196     Boolean[] exArray = hTableInterface.exists(Arrays.asList(new Get[] {
197         new Get(ROW_A).addColumn(TEST_FAMILY, qualifierCol1),
198         new Get(ROW_B).addColumn(TEST_FAMILY, qualifierCol1),
199         new Get(ROW_C).addColumn(TEST_FAMILY, qualifierCol1),
200         new Get(Bytes.toBytes("does not exist")).addColumn(TEST_FAMILY, qualifierCol1), }));
201     assertArrayEquals(new Boolean[] { Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, Boolean.FALSE },
202         exArray);
203   }
204 
205   @SuppressWarnings("deprecation")
206   private void checkGetRowOrBefore() throws IOException {
207     Result rowOrBeforeResult = hTableInterface.getRowOrBefore(ROW_A, TEST_FAMILY);
208     assertArrayEquals(ROW_A, rowOrBeforeResult.getRow());
209   }
210 
211   private void checkAppend() throws IOException {
212     final byte[] appendValue = Bytes.toBytes("append");
213     Append append = new Append(qualifierCol1).add(TEST_FAMILY, qualifierCol1, appendValue);
214     Result appendResult = hTableInterface.append(append);
215     byte[] appendedRow = appendResult.getRow();
216     checkRowValue(appendedRow, appendValue);
217   }
218 
219   private void checkPutsAndDeletes() throws IOException {
220     // put:
221     Put putD = new Put(ROW_D).add(TEST_FAMILY, qualifierCol1, bytes2);
222     hTableInterface.put(putD);
223     checkRowValue(ROW_D, bytes2);
224 
225     // delete:
226     Delete delete = new Delete(ROW_D);
227     hTableInterface.delete(delete);
228     checkRowValue(ROW_D, null);
229 
230     // multiple puts:
231     Put[] puts = new Put[] { new Put(ROW_D).add(TEST_FAMILY, qualifierCol1, bytes2),
232         new Put(ROW_E).add(TEST_FAMILY, qualifierCol1, bytes3) };
233     hTableInterface.put(Arrays.asList(puts));
234     checkRowsValues(new byte[][] { ROW_D, ROW_E }, new byte[][] { bytes2, bytes3 });
235 
236     // multiple deletes:
237     Delete[] deletes = new Delete[] { new Delete(ROW_D), new Delete(ROW_E) };
238     hTableInterface.delete(new ArrayList<Delete>(Arrays.asList(deletes)));
239     checkRowsValues(new byte[][] { ROW_D, ROW_E }, new byte[][] { null, null });
240   }
241 
242   private void checkCheckAndPut() throws IOException {
243     Put putC = new Put(ROW_C).add(TEST_FAMILY, qualifierCol1, bytes5);
244     assertFalse(hTableInterface.checkAndPut(ROW_C, TEST_FAMILY, qualifierCol1, /* expect */bytes4,
245       putC/* newValue */));
246     assertTrue(hTableInterface.checkAndPut(ROW_C, TEST_FAMILY, qualifierCol1, /* expect */bytes3,
247       putC/* newValue */));
248     checkRowValue(ROW_C, bytes5);
249   }
250 
251   private void checkCheckAndDelete() throws IOException {
252     Delete delete = new Delete(ROW_C);
253     assertFalse(hTableInterface.checkAndDelete(ROW_C, TEST_FAMILY, qualifierCol1, bytes4, delete));
254     assertTrue(hTableInterface.checkAndDelete(ROW_C, TEST_FAMILY, qualifierCol1, bytes5, delete));
255     checkRowValue(ROW_C, null);
256   }
257 
258   private void checkIncrementColumnValue() throws IOException {
259     hTableInterface.put(new Put(ROW_A).add(TEST_FAMILY, qualifierCol1, Bytes.toBytes(1L)));
260     checkRowValue(ROW_A, Bytes.toBytes(1L));
261 
262     final long newVal = hTableInterface
263         .incrementColumnValue(ROW_A, TEST_FAMILY, qualifierCol1, 10L);
264     assertEquals(11L, newVal);
265     checkRowValue(ROW_A, Bytes.toBytes(11L));
266 
267     final long newVal2 = hTableInterface.incrementColumnValue(ROW_A, TEST_FAMILY, qualifierCol1,
268         -10L, Durability.SYNC_WAL);
269     assertEquals(1L, newVal2);
270     checkRowValue(ROW_A, Bytes.toBytes(1L));
271   }
272 
273   private void checkIncrement() throws IOException {
274     hTableInterface.increment(new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, -5L));
275     checkRowValue(ROW_A, Bytes.toBytes(-4L));
276   }
277 
278   private void checkBatch() throws IOException, InterruptedException {
279     Object[] results1 = hTableInterface.batch(Arrays.asList(new Row[] {
280         new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L),
281         new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) }));
282     assertEquals(2, results1.length);
283     for (Object r2 : results1) {
284       assertTrue(r2 instanceof Result);
285     }
286     checkRowValue(ROW_A, Bytes.toBytes(0L));
287     Object[] results2 = new Result[2];
288     hTableInterface.batch(
289         Arrays.asList(new Row[] { new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L),
290             new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) }), results2);
291     for (Object r2 : results2) {
292       assertTrue(r2 instanceof Result);
293     }
294     checkRowValue(ROW_A, Bytes.toBytes(4L));
295 
296     // with callbacks:
297     final long[] updateCounter = new long[] { 0L };
298     Object[] results3 = hTableInterface.batchCallback(
299         Arrays.asList(new Row[] { new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L),
300             new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) }),
301         new Batch.Callback<Result>() {
302           @Override
303           public void update(byte[] region, byte[] row, Result result) {
304             updateCounter[0]++;
305           }
306         });
307     assertEquals(2, updateCounter[0]);
308     assertEquals(2, results3.length);
309     for (Object r3 : results3) {
310       assertTrue(r3 instanceof Result);
311     }
312     checkRowValue(ROW_A, Bytes.toBytes(8L));
313 
314     Object[] results4 = new Result[2];
315     updateCounter[0] = 0L;
316     hTableInterface.batchCallback(
317         Arrays.asList(new Row[] { new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L),
318             new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) }), results4,
319         new Batch.Callback<Result>() {
320           @Override
321           public void update(byte[] region, byte[] row, Result result) {
322             updateCounter[0]++;
323           }
324         });
325     assertEquals(2, updateCounter[0]);
326     for (Object r2 : results4) {
327       assertTrue(r2 instanceof Result);
328     }
329     checkRowValue(ROW_A, Bytes.toBytes(12L));
330   }
331 
332   private void checkCoprocessorService() {
333     CoprocessorRpcChannel crc = hTableInterface.coprocessorService(ROW_A);
334     assertNotNull(crc);
335   }
336 
337   private void checkMutateRow() throws IOException {
338     Put put = new Put(ROW_A).add(TEST_FAMILY, qualifierCol1, bytes1);
339     RowMutations rowMutations = new RowMutations(ROW_A);
340     rowMutations.add(put);
341     hTableInterface.mutateRow(rowMutations);
342     checkRowValue(ROW_A, bytes1);
343   }
344 
345   private void checkResultScanner() throws IOException {
346     ResultScanner resultScanner = hTableInterface.getScanner(TEST_FAMILY);
347     Result[] results = resultScanner.next(10);
348     assertEquals(3, results.length);
349 
350     resultScanner = hTableInterface.getScanner(TEST_FAMILY, qualifierCol1);
351     results = resultScanner.next(10);
352     assertEquals(3, results.length);
353 
354     resultScanner = hTableInterface.getScanner(new Scan(ROW_A, ROW_C));
355     results = resultScanner.next(10);
356     assertEquals(2, results.length);
357   }
358 
359   private void checkRowValue(byte[] row, byte[] expectedValue) throws IOException {
360     Get get = new Get(row).addColumn(TEST_FAMILY, qualifierCol1);
361     Result result = hTableInterface.get(get);
362     byte[] actualValue = result.getValue(TEST_FAMILY, qualifierCol1);
363     assertArrayEquals(expectedValue, actualValue);
364   }
365 
366   private void checkRowsValues(byte[][] rows, byte[][] expectedValues) throws IOException {
367     if (rows.length != expectedValues.length) {
368       throw new IllegalArgumentException();
369     }
370     Get[] gets = new Get[rows.length];
371     for (int i = 0; i < gets.length; i++) {
372       gets[i] = new Get(rows[i]).addColumn(TEST_FAMILY, qualifierCol1);
373     }
374     Result[] results = hTableInterface.get(Arrays.asList(gets));
375     for (int i = 0; i < expectedValues.length; i++) {
376       byte[] actualValue = results[i].getValue(TEST_FAMILY, qualifierCol1);
377       assertArrayEquals(expectedValues[i], actualValue);
378     }
379   }
380 
381 }