1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.CellUtil;
29 import org.apache.hadoop.hbase.Coprocessor;
30 import org.apache.hadoop.hbase.CoprocessorEnvironment;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
33 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
34 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
35 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
36 import org.apache.hadoop.hbase.regionserver.InternalScanner;
37 import org.apache.hadoop.hbase.util.Bytes;
38
39 import com.google.protobuf.RpcCallback;
40 import com.google.protobuf.RpcController;
41 import com.google.protobuf.Service;
42
43
44
45
46
47 public class ColumnAggregationEndpoint extends ColumnAggregationService
48 implements Coprocessor, CoprocessorService {
49 private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpoint.class);
50 private RegionCoprocessorEnvironment env = null;
51
52 @Override
53 public Service getService() {
54 return this;
55 }
56
57 @Override
58 public void start(CoprocessorEnvironment env) throws IOException {
59 if (env instanceof RegionCoprocessorEnvironment) {
60 this.env = (RegionCoprocessorEnvironment)env;
61 return;
62 }
63 throw new CoprocessorException("Must be loaded on a table region!");
64 }
65
66 @Override
67 public void stop(CoprocessorEnvironment env) throws IOException {
68
69 }
70
71 @Override
72 public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
73
74 Scan scan = new Scan();
75
76 byte [] family = request.getFamily().toByteArray();
77 byte [] qualifier = request.hasQualifier()? request.getQualifier().toByteArray(): null;
78 if (request.hasQualifier()) {
79 scan.addColumn(family, qualifier);
80 } else {
81 scan.addFamily(family);
82 }
83 int sumResult = 0;
84 InternalScanner scanner = null;
85 try {
86 scanner = this.env.getRegion().getScanner(scan);
87 List<Cell> curVals = new ArrayList<Cell>();
88 boolean hasMore = false;
89 do {
90 curVals.clear();
91 hasMore = scanner.next(curVals);
92 for (Cell kv : curVals) {
93 if (CellUtil.matchingQualifier(kv, qualifier)) {
94 sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
95 }
96 }
97 } while (hasMore);
98 } catch (IOException e) {
99 ResponseConverter.setControllerException(controller, e);
100
101 sumResult = -1;
102 LOG.info("Setting sum result to -1 to indicate error", e);
103 } finally {
104 if (scanner != null) {
105 try {
106 scanner.close();
107 } catch (IOException e) {
108 ResponseConverter.setControllerException(controller, e);
109 sumResult = -1;
110 LOG.info("Setting sum result to -1 to indicate error", e);
111 }
112 }
113 }
114 LOG.info("Returning result " + sumResult);
115 done.run(SumResponse.newBuilder().setSum(sumResult).build());
116 }
117 }