1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.Coprocessor;
28 import org.apache.hadoop.hbase.CoprocessorEnvironment;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.MediumTests;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.Append;
33 import org.apache.hadoop.hbase.client.Delete;
34 import org.apache.hadoop.hbase.client.Durability;
35 import org.apache.hadoop.hbase.client.Get;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.HTableInterface;
38 import org.apache.hadoop.hbase.client.Increment;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.ResultScanner;
42 import org.apache.hadoop.hbase.client.Row;
43 import org.apache.hadoop.hbase.client.RowMutations;
44 import org.apache.hadoop.hbase.client.Scan;
45 import org.apache.hadoop.hbase.client.coprocessor.Batch;
46 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
47 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.VersionInfo;
50 import org.junit.After;
51 import org.junit.AfterClass;
52 import org.junit.Before;
53 import org.junit.BeforeClass;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56
57 import static org.junit.Assert.*;
58
59
60
61
62
63 @Category(MediumTests.class)
64 public class TestHTableWrapper {
65
66 private static final HBaseTestingUtility util = new HBaseTestingUtility();
67
68 private static final byte[] TEST_TABLE = Bytes.toBytes("test");
69 private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
70
71 private static final byte[] ROW_A = Bytes.toBytes("aaa");
72 private static final byte[] ROW_B = Bytes.toBytes("bbb");
73 private static final byte[] ROW_C = Bytes.toBytes("ccc");
74 private static final byte[] ROW_D = Bytes.toBytes("ddd");
75 private static final byte[] ROW_E = Bytes.toBytes("eee");
76
77 private static final byte[] qualifierCol1 = Bytes.toBytes("col1");
78
79 private static final byte[] bytes1 = Bytes.toBytes(1);
80 private static final byte[] bytes2 = Bytes.toBytes(2);
81 private static final byte[] bytes3 = Bytes.toBytes(3);
82 private static final byte[] bytes4 = Bytes.toBytes(4);
83 private static final byte[] bytes5 = Bytes.toBytes(5);
84
85 static class DummyRegionObserver extends BaseRegionObserver {
86 }
87
88 private HTableInterface hTableInterface;
89 private HTable table;
90
91 @BeforeClass
92 public static void setupBeforeClass() throws Exception {
93 util.startMiniCluster();
94 }
95
96 @AfterClass
97 public static void tearDownAfterClass() throws Exception {
98 util.shutdownMiniCluster();
99 }
100
101 @Before
102 public void before() throws Exception {
103 table = util.createTable(TEST_TABLE, TEST_FAMILY);
104
105 Put puta = new Put(ROW_A);
106 puta.add(TEST_FAMILY, qualifierCol1, bytes1);
107 table.put(puta);
108
109 Put putb = new Put(ROW_B);
110 putb.add(TEST_FAMILY, qualifierCol1, bytes2);
111 table.put(putb);
112
113 Put putc = new Put(ROW_C);
114 putc.add(TEST_FAMILY, qualifierCol1, bytes3);
115 table.put(putc);
116 }
117
118 @After
119 public void after() throws Exception {
120 try {
121 if (table != null) {
122 table.close();
123 }
124 } finally {
125 util.deleteTable(TEST_TABLE);
126 }
127 }
128
129 @Test
130 public void testHTableInterfaceMethods() throws Exception {
131 Configuration conf = util.getConfiguration();
132 MasterCoprocessorHost cpHost = util.getMiniHBaseCluster().getMaster().getCoprocessorHost();
133 Class<?> implClazz = DummyRegionObserver.class;
134 cpHost.load(implClazz, Coprocessor.PRIORITY_HIGHEST, conf);
135 CoprocessorEnvironment env = cpHost.findCoprocessorEnvironment(implClazz.getName());
136 assertEquals(Coprocessor.VERSION, env.getVersion());
137 assertEquals(VersionInfo.getVersion(), env.getHBaseVersion());
138 hTableInterface = env.getTable(TableName.valueOf(TEST_TABLE));
139 checkHTableInterfaceMethods();
140 cpHost.shutdown(env);
141 }
142
143 private void checkHTableInterfaceMethods() throws Exception {
144 checkConf();
145 checkNameAndDescriptor();
146 checkAutoFlush();
147 checkBufferSize();
148 checkExists();
149 checkGetRowOrBefore();
150 checkAppend();
151 checkPutsAndDeletes();
152 checkCheckAndPut();
153 checkCheckAndDelete();
154 checkIncrementColumnValue();
155 checkIncrement();
156 checkBatch();
157 checkCoprocessorService();
158 checkMutateRow();
159 checkResultScanner();
160
161 hTableInterface.flushCommits();
162 hTableInterface.close();
163 }
164
165 private void checkConf() {
166 Configuration confExpected = util.getConfiguration();
167 Configuration confActual = hTableInterface.getConfiguration();
168 assertTrue(confExpected == confActual);
169 }
170
171 private void checkNameAndDescriptor() throws IOException {
172 assertArrayEquals(TEST_TABLE, hTableInterface.getTableName());
173 assertEquals(table.getTableDescriptor(), hTableInterface.getTableDescriptor());
174 }
175
176 private void checkAutoFlush() {
177 boolean initialAutoFlush = hTableInterface.isAutoFlush();
178 hTableInterface.setAutoFlushTo(false);
179 assertFalse(hTableInterface.isAutoFlush());
180 hTableInterface.setAutoFlush(true, true);
181 assertTrue(hTableInterface.isAutoFlush());
182 hTableInterface.setAutoFlushTo(initialAutoFlush);
183 }
184
185 private void checkBufferSize() throws IOException {
186 long initialWriteBufferSize = hTableInterface.getWriteBufferSize();
187 hTableInterface.setWriteBufferSize(12345L);
188 assertEquals(12345L, hTableInterface.getWriteBufferSize());
189 hTableInterface.setWriteBufferSize(initialWriteBufferSize);
190 }
191
192 private void checkExists() throws IOException {
193 boolean ex = hTableInterface.exists(new Get(ROW_A).addColumn(TEST_FAMILY, qualifierCol1));
194 assertTrue(ex);
195
196 Boolean[] exArray = hTableInterface.exists(Arrays.asList(new Get[] {
197 new Get(ROW_A).addColumn(TEST_FAMILY, qualifierCol1),
198 new Get(ROW_B).addColumn(TEST_FAMILY, qualifierCol1),
199 new Get(ROW_C).addColumn(TEST_FAMILY, qualifierCol1),
200 new Get(Bytes.toBytes("does not exist")).addColumn(TEST_FAMILY, qualifierCol1), }));
201 assertArrayEquals(new Boolean[] { Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, Boolean.FALSE },
202 exArray);
203 }
204
205 @SuppressWarnings("deprecation")
206 private void checkGetRowOrBefore() throws IOException {
207 Result rowOrBeforeResult = hTableInterface.getRowOrBefore(ROW_A, TEST_FAMILY);
208 assertArrayEquals(ROW_A, rowOrBeforeResult.getRow());
209 }
210
211 private void checkAppend() throws IOException {
212 final byte[] appendValue = Bytes.toBytes("append");
213 Append append = new Append(qualifierCol1).add(TEST_FAMILY, qualifierCol1, appendValue);
214 Result appendResult = hTableInterface.append(append);
215 byte[] appendedRow = appendResult.getRow();
216 checkRowValue(appendedRow, appendValue);
217 }
218
219 private void checkPutsAndDeletes() throws IOException {
220
221 Put putD = new Put(ROW_D).add(TEST_FAMILY, qualifierCol1, bytes2);
222 hTableInterface.put(putD);
223 checkRowValue(ROW_D, bytes2);
224
225
226 Delete delete = new Delete(ROW_D);
227 hTableInterface.delete(delete);
228 checkRowValue(ROW_D, null);
229
230
231 Put[] puts = new Put[] { new Put(ROW_D).add(TEST_FAMILY, qualifierCol1, bytes2),
232 new Put(ROW_E).add(TEST_FAMILY, qualifierCol1, bytes3) };
233 hTableInterface.put(Arrays.asList(puts));
234 checkRowsValues(new byte[][] { ROW_D, ROW_E }, new byte[][] { bytes2, bytes3 });
235
236
237 Delete[] deletes = new Delete[] { new Delete(ROW_D), new Delete(ROW_E) };
238 hTableInterface.delete(new ArrayList<Delete>(Arrays.asList(deletes)));
239 checkRowsValues(new byte[][] { ROW_D, ROW_E }, new byte[][] { null, null });
240 }
241
242 private void checkCheckAndPut() throws IOException {
243 Put putC = new Put(ROW_C).add(TEST_FAMILY, qualifierCol1, bytes5);
244 assertFalse(hTableInterface.checkAndPut(ROW_C, TEST_FAMILY, qualifierCol1,
245 putC
246 assertTrue(hTableInterface.checkAndPut(ROW_C, TEST_FAMILY, qualifierCol1,
247 putC
248 checkRowValue(ROW_C, bytes5);
249 }
250
251 private void checkCheckAndDelete() throws IOException {
252 Delete delete = new Delete(ROW_C);
253 assertFalse(hTableInterface.checkAndDelete(ROW_C, TEST_FAMILY, qualifierCol1, bytes4, delete));
254 assertTrue(hTableInterface.checkAndDelete(ROW_C, TEST_FAMILY, qualifierCol1, bytes5, delete));
255 checkRowValue(ROW_C, null);
256 }
257
258 private void checkIncrementColumnValue() throws IOException {
259 hTableInterface.put(new Put(ROW_A).add(TEST_FAMILY, qualifierCol1, Bytes.toBytes(1L)));
260 checkRowValue(ROW_A, Bytes.toBytes(1L));
261
262 final long newVal = hTableInterface
263 .incrementColumnValue(ROW_A, TEST_FAMILY, qualifierCol1, 10L);
264 assertEquals(11L, newVal);
265 checkRowValue(ROW_A, Bytes.toBytes(11L));
266
267 final long newVal2 = hTableInterface.incrementColumnValue(ROW_A, TEST_FAMILY, qualifierCol1,
268 -10L, Durability.SYNC_WAL);
269 assertEquals(1L, newVal2);
270 checkRowValue(ROW_A, Bytes.toBytes(1L));
271 }
272
273 private void checkIncrement() throws IOException {
274 hTableInterface.increment(new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, -5L));
275 checkRowValue(ROW_A, Bytes.toBytes(-4L));
276 }
277
278 private void checkBatch() throws IOException, InterruptedException {
279 Object[] results1 = hTableInterface.batch(Arrays.asList(new Row[] {
280 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L),
281 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) }));
282 assertEquals(2, results1.length);
283 for (Object r2 : results1) {
284 assertTrue(r2 instanceof Result);
285 }
286 checkRowValue(ROW_A, Bytes.toBytes(0L));
287 Object[] results2 = new Result[2];
288 hTableInterface.batch(
289 Arrays.asList(new Row[] { new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L),
290 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) }), results2);
291 for (Object r2 : results2) {
292 assertTrue(r2 instanceof Result);
293 }
294 checkRowValue(ROW_A, Bytes.toBytes(4L));
295
296
297 final long[] updateCounter = new long[] { 0L };
298 Object[] results3 = hTableInterface.batchCallback(
299 Arrays.asList(new Row[] { new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L),
300 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) }),
301 new Batch.Callback<Result>() {
302 @Override
303 public void update(byte[] region, byte[] row, Result result) {
304 updateCounter[0]++;
305 }
306 });
307 assertEquals(2, updateCounter[0]);
308 assertEquals(2, results3.length);
309 for (Object r3 : results3) {
310 assertTrue(r3 instanceof Result);
311 }
312 checkRowValue(ROW_A, Bytes.toBytes(8L));
313
314 Object[] results4 = new Result[2];
315 updateCounter[0] = 0L;
316 hTableInterface.batchCallback(
317 Arrays.asList(new Row[] { new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L),
318 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) }), results4,
319 new Batch.Callback<Result>() {
320 @Override
321 public void update(byte[] region, byte[] row, Result result) {
322 updateCounter[0]++;
323 }
324 });
325 assertEquals(2, updateCounter[0]);
326 for (Object r2 : results4) {
327 assertTrue(r2 instanceof Result);
328 }
329 checkRowValue(ROW_A, Bytes.toBytes(12L));
330 }
331
332 private void checkCoprocessorService() {
333 CoprocessorRpcChannel crc = hTableInterface.coprocessorService(ROW_A);
334 assertNotNull(crc);
335 }
336
337 private void checkMutateRow() throws IOException {
338 Put put = new Put(ROW_A).add(TEST_FAMILY, qualifierCol1, bytes1);
339 RowMutations rowMutations = new RowMutations(ROW_A);
340 rowMutations.add(put);
341 hTableInterface.mutateRow(rowMutations);
342 checkRowValue(ROW_A, bytes1);
343 }
344
345 private void checkResultScanner() throws IOException {
346 ResultScanner resultScanner = hTableInterface.getScanner(TEST_FAMILY);
347 Result[] results = resultScanner.next(10);
348 assertEquals(3, results.length);
349
350 resultScanner = hTableInterface.getScanner(TEST_FAMILY, qualifierCol1);
351 results = resultScanner.next(10);
352 assertEquals(3, results.length);
353
354 resultScanner = hTableInterface.getScanner(new Scan(ROW_A, ROW_C));
355 results = resultScanner.next(10);
356 assertEquals(2, results.length);
357 }
358
359 private void checkRowValue(byte[] row, byte[] expectedValue) throws IOException {
360 Get get = new Get(row).addColumn(TEST_FAMILY, qualifierCol1);
361 Result result = hTableInterface.get(get);
362 byte[] actualValue = result.getValue(TEST_FAMILY, qualifierCol1);
363 assertArrayEquals(expectedValue, actualValue);
364 }
365
366 private void checkRowsValues(byte[][] rows, byte[][] expectedValues) throws IOException {
367 if (rows.length != expectedValues.length) {
368 throw new IllegalArgumentException();
369 }
370 Get[] gets = new Get[rows.length];
371 for (int i = 0; i < gets.length; i++) {
372 gets[i] = new Get(rows[i]).addColumn(TEST_FAMILY, qualifierCol1);
373 }
374 Result[] results = hTableInterface.get(Arrays.asList(gets));
375 for (int i = 0; i < expectedValues.length; i++) {
376 byte[] actualValue = results[i].getValue(TEST_FAMILY, qualifierCol1);
377 assertArrayEquals(expectedValues[i], actualValue);
378 }
379 }
380
381 }