1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.coprocessor;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.Cell;
27 import org.apache.hadoop.hbase.CellUtil;
28 import org.apache.hadoop.hbase.Coprocessor;
29 import org.apache.hadoop.hbase.CoprocessorEnvironment;
30 import org.apache.hadoop.hbase.DoNotRetryIOException;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
34 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.SumRequest;
35 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.SumResponse;
36 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37 import org.apache.hadoop.hbase.regionserver.InternalScanner;
38 import org.apache.hadoop.hbase.regionserver.Region;
39 import org.apache.hadoop.hbase.util.Bytes;
40
41 import com.google.protobuf.RpcCallback;
42 import com.google.protobuf.RpcController;
43 import com.google.protobuf.Service;
44
45
46
47
48
49
50 public class ColumnAggregationEndpointWithErrors
51 extends
52 ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
53 implements Coprocessor, CoprocessorService {
54 private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointWithErrors.class);
55 private RegionCoprocessorEnvironment env = null;
56 @Override
57 public Service getService() {
58 return this;
59 }
60
61 @Override
62 public void start(CoprocessorEnvironment env) throws IOException {
63 if (env instanceof RegionCoprocessorEnvironment) {
64 this.env = (RegionCoprocessorEnvironment)env;
65 return;
66 }
67 throw new CoprocessorException("Must be loaded on a table region!");
68 }
69
70 @Override
71 public void stop(CoprocessorEnvironment env) throws IOException {
72
73 }
74
75 @Override
76 public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
77
78 Scan scan = new Scan();
79
80 byte[] family = request.getFamily().toByteArray();
81 byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
82 if (request.hasQualifier()) {
83 scan.addColumn(family, qualifier);
84 } else {
85 scan.addFamily(family);
86 }
87 int sumResult = 0;
88 InternalScanner scanner = null;
89 try {
90 Region region = this.env.getRegion();
91
92 if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
93 throw new DoNotRetryIOException("An expected exception");
94 }
95 scanner = region.getScanner(scan);
96 List<Cell> curVals = new ArrayList<Cell>();
97 boolean hasMore = false;
98 do {
99 curVals.clear();
100 hasMore = scanner.next(curVals);
101 for (Cell kv : curVals) {
102 if (CellUtil.matchingQualifier(kv, qualifier)) {
103 sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
104 }
105 }
106 } while (hasMore);
107 } catch (IOException e) {
108 ResponseConverter.setControllerException(controller, e);
109
110 sumResult = -1;
111 LOG.info("Setting sum result to -1 to indicate error", e);
112 } finally {
113 if (scanner != null) {
114 try {
115 scanner.close();
116 } catch (IOException e) {
117 ResponseConverter.setControllerException(controller, e);
118 sumResult = -1;
119 LOG.info("Setting sum result to -1 to indicate error", e);
120 }
121 }
122 }
123 done.run(SumResponse.newBuilder().setSum(sumResult).build());
124 }
125 }