1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.client.Table;
38 import org.apache.hadoop.hbase.util.ByteStringer;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.CellUtil;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.KeyValue;
45 import org.apache.hadoop.hbase.testclassification.MediumTests;
46 import org.apache.hadoop.hbase.client.Delete;
47 import org.apache.hadoop.hbase.client.Get;
48 import org.apache.hadoop.hbase.client.IsolationLevel;
49 import org.apache.hadoop.hbase.client.Mutation;
50 import org.apache.hadoop.hbase.client.Put;
51 import org.apache.hadoop.hbase.client.Scan;
52 import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
53 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
54 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
55 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
56 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
57 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
58 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
59 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
60 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
61 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
62 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
63 import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
64 import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
65 import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
66 import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
67 import org.apache.hadoop.hbase.regionserver.HRegion;
68 import org.apache.hadoop.hbase.regionserver.InternalScanner;
69 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
70 import org.apache.hadoop.hbase.util.Bytes;
71 import org.junit.AfterClass;
72 import org.junit.BeforeClass;
73 import org.junit.Test;
74 import org.junit.experimental.categories.Category;
75
76 import com.google.protobuf.Message;
77 import org.apache.commons.logging.Log;
78 import org.apache.commons.logging.LogFactory;
79
80
81
82
83
84 @Category(MediumTests.class)
85 public class TestRowProcessorEndpoint {
86
private static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class);

// Table, row keys and the single column family used by every test case.
private static final TableName TABLE = TableName.valueOf("testtable");
private static final byte[] ROW = Bytes.toBytes("testrow");
private static final byte[] ROW2 = Bytes.toBytes("testrow2");
private static final byte[] FAM = Bytes.toBytes("friendlist");

// Single-letter byte strings used as column qualifiers and as the bytes
// that make up the stored values (see prepareTestData()).
private static final byte[] A = Bytes.toBytes("a");
private static final byte[] B = Bytes.toBytes("b");
private static final byte[] C = Bytes.toBytes("c");
private static final byte[] D = Bytes.toBytes("d");
private static final byte[] E = Bytes.toBytes("e");
private static final byte[] F = Bytes.toBytes("f");
private static final byte[] G = Bytes.toBytes("g");
// Qualifier of the integer column read and written by IncrementCounterProcessor.
private static final byte[] COUNTER = Bytes.toBytes("counter");
// Supplies the timestamps RowSwapProcessor puts on the cells it writes.
private static final AtomicLong myTimer = new AtomicLong(0);
// Count of failed worker runs; the concurrent tests assert that it stays at zero.
private final AtomicInteger failures = new AtomicInteger(0);

private static HBaseTestingUtility util = new HBaseTestingUtility();
// Value IncrementCounterProcessor expects to read before it increments the counter.
private static volatile int expectedCounter = 0;
// Put.size() of the puts written to ROW and ROW2 by prepareTestData().
private static int rowSize, row2Size;

private static volatile Table table = null;
// Flipped by RowSwapProcessor on every swap so it knows which row holds which data.
private static volatile boolean swapped = false;
// Latches used by concurrentExec() to line up the worker threads and to wait for them.
private volatile CountDownLatch startSignal;
private volatile CountDownLatch doneSignal;
114
115 @BeforeClass
116 public static void setupBeforeClass() throws Exception {
117 Configuration conf = util.getConfiguration();
118 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
119 RowProcessorEndpoint.class.getName());
120 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
121 conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
122 util.startMiniCluster();
123 }
124
125 @AfterClass
126 public static void tearDownAfterClass() throws Exception {
127 util.shutdownMiniCluster();
128 }
129
130 public void prepareTestData() throws Exception {
131 try {
132 util.getHBaseAdmin().disableTable(TABLE);
133 util.getHBaseAdmin().deleteTable(TABLE);
134 } catch (Exception e) {
135
136 }
137 table = util.createTable(TABLE, FAM);
138 {
139 Put put = new Put(ROW);
140 put.add(FAM, A, Bytes.add(B, C));
141 put.add(FAM, B, Bytes.add(D, E, F));
142 put.add(FAM, C, G);
143 table.put(put);
144 rowSize = put.size();
145 }
146 Put put = new Put(ROW2);
147 put.add(FAM, D, E);
148 put.add(FAM, F, G);
149 table.put(put);
150 row2Size = put.size();
151 }
152
153 @Test
154 public void testDoubleScan() throws Throwable {
155 prepareTestData();
156
157 CoprocessorRpcChannel channel = table.coprocessorService(ROW);
158 RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
159 new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
160 RowProcessorService.BlockingInterface service =
161 RowProcessorService.newBlockingStub(channel);
162 ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
163 ProcessResponse protoResult = service.process(null, request);
164 FriendsOfFriendsProcessorResponse response =
165 FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
166 Set<String> result = new HashSet<String>();
167 result.addAll(response.getResultList());
168 Set<String> expected =
169 new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
170 Get get = new Get(ROW);
171 LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
172 assertEquals(expected, result);
173 }
174
175 @Test
176 public void testReadModifyWrite() throws Throwable {
177 prepareTestData();
178 failures.set(0);
179 int numThreads = 100;
180 concurrentExec(new IncrementRunner(), numThreads);
181 Get get = new Get(ROW);
182 LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
183 int finalCounter = incrementCounter(table);
184 assertEquals(numThreads + 1, finalCounter);
185 assertEquals(0, failures.get());
186 }
187
188 class IncrementRunner implements Runnable {
189 @Override
190 public void run() {
191 try {
192 incrementCounter(table);
193 } catch (Throwable e) {
194 e.printStackTrace();
195 }
196 }
197 }
198
199 private int incrementCounter(Table table) throws Throwable {
200 CoprocessorRpcChannel channel = table.coprocessorService(ROW);
201 RowProcessorEndpoint.IncrementCounterProcessor processor =
202 new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
203 RowProcessorService.BlockingInterface service =
204 RowProcessorService.newBlockingStub(channel);
205 ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
206 ProcessResponse protoResult = service.process(null, request);
207 IncCounterProcessorResponse response = IncCounterProcessorResponse
208 .parseFrom(protoResult.getRowProcessorResult());
209 Integer result = response.getResponse();
210 return result;
211 }
212
213 private void concurrentExec(
214 final Runnable task, final int numThreads) throws Throwable {
215 startSignal = new CountDownLatch(numThreads);
216 doneSignal = new CountDownLatch(numThreads);
217 for (int i = 0; i < numThreads; ++i) {
218 new Thread(new Runnable() {
219 @Override
220 public void run() {
221 try {
222 startSignal.countDown();
223 startSignal.await();
224 task.run();
225 } catch (Throwable e) {
226 failures.incrementAndGet();
227 e.printStackTrace();
228 }
229 doneSignal.countDown();
230 }
231 }).start();
232 }
233 doneSignal.await();
234 }
235
236 @Test
237 public void testMultipleRows() throws Throwable {
238 prepareTestData();
239 failures.set(0);
240 int numThreads = 100;
241 concurrentExec(new SwapRowsRunner(), numThreads);
242 LOG.debug("row keyvalues:" +
243 stringifyKvs(table.get(new Get(ROW)).listCells()));
244 LOG.debug("row2 keyvalues:" +
245 stringifyKvs(table.get(new Get(ROW2)).listCells()));
246 assertEquals(rowSize, table.get(new Get(ROW)).listCells().size());
247 assertEquals(row2Size, table.get(new Get(ROW2)).listCells().size());
248 assertEquals(0, failures.get());
249 }
250
251 class SwapRowsRunner implements Runnable {
252 @Override
253 public void run() {
254 try {
255 swapRows(table);
256 } catch (Throwable e) {
257 e.printStackTrace();
258 }
259 }
260 }
261
262 private void swapRows(Table table) throws Throwable {
263 CoprocessorRpcChannel channel = table.coprocessorService(ROW);
264 RowProcessorEndpoint.RowSwapProcessor processor =
265 new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
266 RowProcessorService.BlockingInterface service =
267 RowProcessorService.newBlockingStub(channel);
268 ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
269 service.process(null, request);
270 }
271
272 @Test
273 public void testTimeout() throws Throwable {
274 prepareTestData();
275 CoprocessorRpcChannel channel = table.coprocessorService(ROW);
276 RowProcessorEndpoint.TimeoutProcessor processor =
277 new RowProcessorEndpoint.TimeoutProcessor(ROW);
278 RowProcessorService.BlockingInterface service =
279 RowProcessorService.newBlockingStub(channel);
280 ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
281 boolean exceptionCaught = false;
282 try {
283 service.process(null, request);
284 } catch (Exception e) {
285 exceptionCaught = true;
286 }
287 assertTrue(exceptionCaught);
288 }
289
290
291
292
293
294
295
296
297 public static class RowProcessorEndpoint<S extends Message,T extends Message>
298 extends BaseRowProcessorEndpoint<S,T> implements CoprocessorService {
299 public static class IncrementCounterProcessor extends
300 BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
301 IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
302 int counter = 0;
303 byte[] row = new byte[0];
304
305
306
307
308 IncrementCounterProcessor() {
309 }
310
311 IncrementCounterProcessor(byte[] row) {
312 this.row = row;
313 }
314
315 @Override
316 public Collection<byte[]> getRowsToLock() {
317 return Collections.singleton(row);
318 }
319
320 @Override
321 public IncCounterProcessorResponse getResult() {
322 IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
323 i.setResponse(counter);
324 return i.build();
325 }
326
327 @Override
328 public boolean readOnly() {
329 return false;
330 }
331
332 @Override
333 public void process(long now, HRegion region,
334 List<Mutation> mutations, WALEdit walEdit) throws IOException {
335
336 List<Cell> kvs = new ArrayList<Cell>();
337 Scan scan = new Scan(row, row);
338 scan.addColumn(FAM, COUNTER);
339 doScan(region, scan, kvs);
340 counter = kvs.size() == 0 ? 0 :
341 Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next()));
342
343
344 assertEquals(expectedCounter, counter);
345
346
347 counter += 1;
348 expectedCounter += 1;
349
350
351 Put p = new Put(row);
352 KeyValue kv =
353 new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
354 p.add(kv);
355 mutations.add(p);
356 walEdit.add(kv);
357
358
359 KeyValue metaKv = new KeyValue(
360 row, WALEdit.METAFAMILY,
361 Bytes.toBytes("I just increment counter"),
362 Bytes.toBytes(counter));
363 walEdit.add(metaKv);
364 }
365
366 @Override
367 public IncCounterProcessorRequest getRequestData() throws IOException {
368 IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
369 builder.setCounter(counter);
370 builder.setRow(ByteStringer.wrap(row));
371 return builder.build();
372 }
373
374 @Override
375 public void initialize(IncCounterProcessorRequest msg) {
376 this.row = msg.getRow().toByteArray();
377 this.counter = msg.getCounter();
378 }
379 }
380
381 public static class FriendsOfFriendsProcessor extends
382 BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
383 byte[] row = null;
384 byte[] person = null;
385 final Set<String> result = new HashSet<String>();
386
387
388
389
390 FriendsOfFriendsProcessor() {
391 }
392
393 FriendsOfFriendsProcessor(byte[] row, byte[] person) {
394 this.row = row;
395 this.person = person;
396 }
397
398 @Override
399 public Collection<byte[]> getRowsToLock() {
400 return Collections.singleton(row);
401 }
402
403 @Override
404 public FriendsOfFriendsProcessorResponse getResult() {
405 FriendsOfFriendsProcessorResponse.Builder builder =
406 FriendsOfFriendsProcessorResponse.newBuilder();
407 builder.addAllResult(result);
408 return builder.build();
409 }
410
411 @Override
412 public boolean readOnly() {
413 return true;
414 }
415
416 @Override
417 public void process(long now, HRegion region,
418 List<Mutation> mutations, WALEdit walEdit) throws IOException {
419 List<Cell> kvs = new ArrayList<Cell>();
420 {
421 Scan scan = new Scan(row, row);
422 scan.addColumn(FAM, person);
423 doScan(region, scan, kvs);
424 }
425
426
427 Scan scan = new Scan(row, row);
428 for (Cell kv : kvs) {
429 byte[] friends = CellUtil.cloneValue(kv);
430 for (byte f : friends) {
431 scan.addColumn(FAM, new byte[]{f});
432 }
433 }
434 doScan(region, scan, kvs);
435
436
437 result.clear();
438 for (Cell kv : kvs) {
439 for (byte b : CellUtil.cloneValue(kv)) {
440 result.add((char)b + "");
441 }
442 }
443 }
444
445 @Override
446 public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
447 FriendsOfFriendsProcessorRequest.Builder builder =
448 FriendsOfFriendsProcessorRequest.newBuilder();
449 builder.setPerson(ByteStringer.wrap(person));
450 builder.setRow(ByteStringer.wrap(row));
451 builder.addAllResult(result);
452 FriendsOfFriendsProcessorRequest f = builder.build();
453 return f;
454 }
455
456 @Override
457 public void initialize(FriendsOfFriendsProcessorRequest request)
458 throws IOException {
459 this.person = request.getPerson().toByteArray();
460 this.row = request.getRow().toByteArray();
461 result.clear();
462 result.addAll(request.getResultList());
463 }
464 }
465
466 public static class RowSwapProcessor extends
467 BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
468 byte[] row1 = new byte[0];
469 byte[] row2 = new byte[0];
470
471
472
473
474 RowSwapProcessor() {
475 }
476
477 RowSwapProcessor(byte[] row1, byte[] row2) {
478 this.row1 = row1;
479 this.row2 = row2;
480 }
481
482 @Override
483 public Collection<byte[]> getRowsToLock() {
484 List<byte[]> rows = new ArrayList<byte[]>();
485 rows.add(row1);
486 rows.add(row2);
487 return rows;
488 }
489
490 @Override
491 public boolean readOnly() {
492 return false;
493 }
494
495 @Override
496 public RowSwapProcessorResponse getResult() {
497 return RowSwapProcessorResponse.getDefaultInstance();
498 }
499
500 @Override
501 public void process(long now, HRegion region,
502 List<Mutation> mutations, WALEdit walEdit) throws IOException {
503
504
505
506 now = myTimer.getAndIncrement();
507
508
509 List<Cell> kvs1 = new ArrayList<Cell>();
510 List<Cell> kvs2 = new ArrayList<Cell>();
511 doScan(region, new Scan(row1, row1), kvs1);
512 doScan(region, new Scan(row2, row2), kvs2);
513
514
515 if (swapped) {
516 assertEquals(rowSize, kvs2.size());
517 assertEquals(row2Size, kvs1.size());
518 } else {
519 assertEquals(rowSize, kvs1.size());
520 assertEquals(row2Size, kvs2.size());
521 }
522 swapped = !swapped;
523
524
525 List<List<Cell>> kvs = new ArrayList<List<Cell>>();
526 kvs.add(kvs1);
527 kvs.add(kvs2);
528 byte[][] rows = new byte[][]{row1, row2};
529 for (int i = 0; i < kvs.size(); ++i) {
530 for (Cell kv : kvs.get(i)) {
531
532 Delete d = new Delete(rows[i]);
533 KeyValue kvDelete =
534 new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
535 kv.getTimestamp(), KeyValue.Type.Delete);
536 d.addDeleteMarker(kvDelete);
537 Put p = new Put(rows[1 - i]);
538 KeyValue kvAdd =
539 new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
540 now, CellUtil.cloneValue(kv));
541 p.add(kvAdd);
542 mutations.add(d);
543 walEdit.add(kvDelete);
544 mutations.add(p);
545 walEdit.add(kvAdd);
546 }
547 }
548 }
549
550 @Override
551 public String getName() {
552 return "swap";
553 }
554
555 @Override
556 public RowSwapProcessorRequest getRequestData() throws IOException {
557 RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
558 builder.setRow1(ByteStringer.wrap(row1));
559 builder.setRow2(ByteStringer.wrap(row2));
560 return builder.build();
561 }
562
563 @Override
564 public void initialize(RowSwapProcessorRequest msg) {
565 this.row1 = msg.getRow1().toByteArray();
566 this.row2 = msg.getRow2().toByteArray();
567 }
568 }
569
570 public static class TimeoutProcessor extends
571 BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
572
573 byte[] row = new byte[0];
574
575
576
577
578 public TimeoutProcessor() {
579 }
580
581 public TimeoutProcessor(byte[] row) {
582 this.row = row;
583 }
584
585 public Collection<byte[]> getRowsToLock() {
586 return Collections.singleton(row);
587 }
588
589 @Override
590 public TimeoutProcessorResponse getResult() {
591 return TimeoutProcessorResponse.getDefaultInstance();
592 }
593
594 @Override
595 public void process(long now, HRegion region,
596 List<Mutation> mutations, WALEdit walEdit) throws IOException {
597 try {
598
599 Thread.sleep(100 * 1000L);
600 } catch (Exception e) {
601 throw new IOException(e);
602 }
603 }
604
605 @Override
606 public boolean readOnly() {
607 return true;
608 }
609
610 @Override
611 public String getName() {
612 return "timeout";
613 }
614
615 @Override
616 public TimeoutProcessorRequest getRequestData() throws IOException {
617 TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
618 builder.setRow(ByteStringer.wrap(row));
619 return builder.build();
620 }
621
622 @Override
623 public void initialize(TimeoutProcessorRequest msg) throws IOException {
624 this.row = msg.getRow().toByteArray();
625 }
626 }
627
628 public static void doScan(
629 HRegion region, Scan scan, List<Cell> result) throws IOException {
630 InternalScanner scanner = null;
631 try {
632 scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
633 scanner = region.getScanner(scan);
634 result.clear();
635 scanner.next(result);
636 } finally {
637 if (scanner != null) scanner.close();
638 }
639 }
640 }
641
642 static String stringifyKvs(Collection<Cell> kvs) {
643 StringBuilder out = new StringBuilder();
644 out.append("[");
645 if (kvs != null) {
646 for (Cell kv : kvs) {
647 byte[] col = CellUtil.cloneQualifier(kv);
648 byte[] val = CellUtil.cloneValue(kv);
649 if (Bytes.equals(col, COUNTER)) {
650 out.append(Bytes.toStringBinary(col) + ":" +
651 Bytes.toInt(val) + " ");
652 } else {
653 out.append(Bytes.toStringBinary(col) + ":" +
654 Bytes.toStringBinary(val) + " ");
655 }
656 }
657 }
658 out.append("]");
659 return out.toString();
660 }
661
662 }