1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.NavigableSet;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.Coprocessor;
32 import org.apache.hadoop.hbase.CoprocessorEnvironment;
33 import org.apache.hadoop.hbase.client.Scan;
34 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
38 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
39 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
40 import org.apache.hadoop.hbase.regionserver.InternalScanner;
41
42 import com.google.protobuf.ByteString;
43 import com.google.protobuf.Message;
44 import com.google.protobuf.RpcCallback;
45 import com.google.protobuf.RpcController;
46 import com.google.protobuf.Service;
47
48
49
50
51
52
53
54
55
56
57
58
59
60 @InterfaceAudience.Private
61 public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
62 extends AggregateService implements CoprocessorService, Coprocessor {
63 protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
64 private RegionCoprocessorEnvironment env;
65
66
67
68
69
70
71
72
73 @Override
74 public void getMax(RpcController controller, AggregateRequest request,
75 RpcCallback<AggregateResponse> done) {
76 InternalScanner scanner = null;
77 AggregateResponse response = null;
78 T max = null;
79 try {
80 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
81 T temp;
82 Scan scan = ProtobufUtil.toScan(request.getScan());
83 scanner = env.getRegion().getScanner(scan);
84 List<Cell> results = new ArrayList<Cell>();
85 byte[] colFamily = scan.getFamilies()[0];
86 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
87 byte[] qualifier = null;
88 if (qualifiers != null && !qualifiers.isEmpty()) {
89 qualifier = qualifiers.pollFirst();
90 }
91
92 boolean hasMoreRows = false;
93 do {
94 hasMoreRows = scanner.next(results);
95 for (Cell kv : results) {
96 temp = ci.getValue(colFamily, qualifier, kv);
97 max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
98 }
99 results.clear();
100 } while (hasMoreRows);
101 if (max != null) {
102 AggregateResponse.Builder builder = AggregateResponse.newBuilder();
103 builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
104 response = builder.build();
105 }
106 } catch (IOException e) {
107 ResponseConverter.setControllerException(controller, e);
108 } finally {
109 if (scanner != null) {
110 try {
111 scanner.close();
112 } catch (IOException ignored) {}
113 }
114 }
115 log.info("Maximum from this region is "
116 + env.getRegion().getRegionNameAsString() + ": " + max);
117 done.run(response);
118 }
119
120
121
122
123
124
125
126
127 @Override
128 public void getMin(RpcController controller, AggregateRequest request,
129 RpcCallback<AggregateResponse> done) {
130 AggregateResponse response = null;
131 InternalScanner scanner = null;
132 T min = null;
133 try {
134 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
135 T temp;
136 Scan scan = ProtobufUtil.toScan(request.getScan());
137 scanner = env.getRegion().getScanner(scan);
138 List<Cell> results = new ArrayList<Cell>();
139 byte[] colFamily = scan.getFamilies()[0];
140 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
141 byte[] qualifier = null;
142 if (qualifiers != null && !qualifiers.isEmpty()) {
143 qualifier = qualifiers.pollFirst();
144 }
145 boolean hasMoreRows = false;
146 do {
147 hasMoreRows = scanner.next(results);
148 for (Cell kv : results) {
149 temp = ci.getValue(colFamily, qualifier, kv);
150 min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
151 }
152 results.clear();
153 } while (hasMoreRows);
154 if (min != null) {
155 response = AggregateResponse.newBuilder().addFirstPart(
156 ci.getProtoForCellType(min).toByteString()).build();
157 }
158 } catch (IOException e) {
159 ResponseConverter.setControllerException(controller, e);
160 } finally {
161 if (scanner != null) {
162 try {
163 scanner.close();
164 } catch (IOException ignored) {}
165 }
166 }
167 log.info("Minimum from this region is "
168 + env.getRegion().getRegionNameAsString() + ": " + min);
169 done.run(response);
170 }
171
172
173
174
175
176
177
178
179 @Override
180 public void getSum(RpcController controller, AggregateRequest request,
181 RpcCallback<AggregateResponse> done) {
182 AggregateResponse response = null;
183 InternalScanner scanner = null;
184 long sum = 0l;
185 try {
186 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
187 S sumVal = null;
188 T temp;
189 Scan scan = ProtobufUtil.toScan(request.getScan());
190 scanner = env.getRegion().getScanner(scan);
191 byte[] colFamily = scan.getFamilies()[0];
192 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
193 byte[] qualifier = null;
194 if (qualifiers != null && !qualifiers.isEmpty()) {
195 qualifier = qualifiers.pollFirst();
196 }
197 List<Cell> results = new ArrayList<Cell>();
198 boolean hasMoreRows = false;
199 do {
200 hasMoreRows = scanner.next(results);
201 for (Cell kv : results) {
202 temp = ci.getValue(colFamily, qualifier, kv);
203 if (temp != null)
204 sumVal = ci.add(sumVal, ci.castToReturnType(temp));
205 }
206 results.clear();
207 } while (hasMoreRows);
208 if (sumVal != null) {
209 response = AggregateResponse.newBuilder().addFirstPart(
210 ci.getProtoForPromotedType(sumVal).toByteString()).build();
211 }
212 } catch (IOException e) {
213 ResponseConverter.setControllerException(controller, e);
214 } finally {
215 if (scanner != null) {
216 try {
217 scanner.close();
218 } catch (IOException ignored) {}
219 }
220 }
221 log.debug("Sum from this region is "
222 + env.getRegion().getRegionNameAsString() + ": " + sum);
223 done.run(response);
224 }
225
226
227
228
229
230
231 @Override
232 public void getRowNum(RpcController controller, AggregateRequest request,
233 RpcCallback<AggregateResponse> done) {
234 AggregateResponse response = null;
235 long counter = 0l;
236 List<Cell> results = new ArrayList<Cell>();
237 InternalScanner scanner = null;
238 try {
239 Scan scan = ProtobufUtil.toScan(request.getScan());
240 byte[][] colFamilies = scan.getFamilies();
241 byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
242 NavigableSet<byte[]> qualifiers = colFamilies != null ?
243 scan.getFamilyMap().get(colFamily) : null;
244 byte[] qualifier = null;
245 if (qualifiers != null && !qualifiers.isEmpty()) {
246 qualifier = qualifiers.pollFirst();
247 }
248 if (scan.getFilter() == null && qualifier == null)
249 scan.setFilter(new FirstKeyOnlyFilter());
250 scanner = env.getRegion().getScanner(scan);
251 boolean hasMoreRows = false;
252 do {
253 hasMoreRows = scanner.next(results);
254 if (results.size() > 0) {
255 counter++;
256 }
257 results.clear();
258 } while (hasMoreRows);
259 ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
260 bb.rewind();
261 response = AggregateResponse.newBuilder().addFirstPart(
262 ByteString.copyFrom(bb)).build();
263 } catch (IOException e) {
264 ResponseConverter.setControllerException(controller, e);
265 } finally {
266 if (scanner != null) {
267 try {
268 scanner.close();
269 } catch (IOException ignored) {}
270 }
271 }
272 log.info("Row counter from this region is "
273 + env.getRegion().getRegionNameAsString() + ": " + counter);
274 done.run(response);
275 }
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290 @Override
291 public void getAvg(RpcController controller, AggregateRequest request,
292 RpcCallback<AggregateResponse> done) {
293 AggregateResponse response = null;
294 InternalScanner scanner = null;
295 try {
296 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
297 S sumVal = null;
298 Long rowCountVal = 0l;
299 Scan scan = ProtobufUtil.toScan(request.getScan());
300 scanner = env.getRegion().getScanner(scan);
301 byte[] colFamily = scan.getFamilies()[0];
302 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
303 byte[] qualifier = null;
304 if (qualifiers != null && !qualifiers.isEmpty()) {
305 qualifier = qualifiers.pollFirst();
306 }
307 List<Cell> results = new ArrayList<Cell>();
308 boolean hasMoreRows = false;
309
310 do {
311 results.clear();
312 hasMoreRows = scanner.next(results);
313 for (Cell kv : results) {
314 sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
315 qualifier, kv)));
316 }
317 rowCountVal++;
318 } while (hasMoreRows);
319 if (sumVal != null) {
320 ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
321 AggregateResponse.Builder pair = AggregateResponse.newBuilder();
322 pair.addFirstPart(first);
323 ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
324 bb.rewind();
325 pair.setSecondPart(ByteString.copyFrom(bb));
326 response = pair.build();
327 }
328 } catch (IOException e) {
329 ResponseConverter.setControllerException(controller, e);
330 } finally {
331 if (scanner != null) {
332 try {
333 scanner.close();
334 } catch (IOException ignored) {}
335 }
336 }
337 done.run(response);
338 }
339
340
341
342
343
344
345
346
347
348
349 @Override
350 public void getStd(RpcController controller, AggregateRequest request,
351 RpcCallback<AggregateResponse> done) {
352 InternalScanner scanner = null;
353 AggregateResponse response = null;
354 try {
355 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
356 S sumVal = null, sumSqVal = null, tempVal = null;
357 long rowCountVal = 0l;
358 Scan scan = ProtobufUtil.toScan(request.getScan());
359 scanner = env.getRegion().getScanner(scan);
360 byte[] colFamily = scan.getFamilies()[0];
361 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
362 byte[] qualifier = null;
363 if (qualifiers != null && !qualifiers.isEmpty()) {
364 qualifier = qualifiers.pollFirst();
365 }
366 List<Cell> results = new ArrayList<Cell>();
367
368 boolean hasMoreRows = false;
369
370 do {
371 tempVal = null;
372 hasMoreRows = scanner.next(results);
373 for (Cell kv : results) {
374 tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
375 qualifier, kv)));
376 }
377 results.clear();
378 sumVal = ci.add(sumVal, tempVal);
379 sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
380 rowCountVal++;
381 } while (hasMoreRows);
382 if (sumVal != null) {
383 ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
384 ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
385 AggregateResponse.Builder pair = AggregateResponse.newBuilder();
386 pair.addFirstPart(first_sumVal);
387 pair.addFirstPart(first_sumSqVal);
388 ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
389 bb.rewind();
390 pair.setSecondPart(ByteString.copyFrom(bb));
391 response = pair.build();
392 }
393 } catch (IOException e) {
394 ResponseConverter.setControllerException(controller, e);
395 } finally {
396 if (scanner != null) {
397 try {
398 scanner.close();
399 } catch (IOException ignored) {}
400 }
401 }
402 done.run(response);
403 }
404
405
406
407
408
409
410
411
412
413 @Override
414 public void getMedian(RpcController controller, AggregateRequest request,
415 RpcCallback<AggregateResponse> done) {
416 AggregateResponse response = null;
417 InternalScanner scanner = null;
418 try {
419 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
420 S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
421 Scan scan = ProtobufUtil.toScan(request.getScan());
422 scanner = env.getRegion().getScanner(scan);
423 byte[] colFamily = scan.getFamilies()[0];
424 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
425 byte[] valQualifier = null, weightQualifier = null;
426 if (qualifiers != null && !qualifiers.isEmpty()) {
427 valQualifier = qualifiers.pollFirst();
428
429 weightQualifier = qualifiers.pollLast();
430 }
431 List<Cell> results = new ArrayList<Cell>();
432
433 boolean hasMoreRows = false;
434
435 do {
436 tempVal = null;
437 tempWeight = null;
438 hasMoreRows = scanner.next(results);
439 for (Cell kv : results) {
440 tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
441 valQualifier, kv)));
442 if (weightQualifier != null) {
443 tempWeight = ci.add(tempWeight,
444 ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
445 }
446 }
447 results.clear();
448 sumVal = ci.add(sumVal, tempVal);
449 sumWeights = ci.add(sumWeights, tempWeight);
450 } while (hasMoreRows);
451 ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
452 S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
453 ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
454 AggregateResponse.Builder pair = AggregateResponse.newBuilder();
455 pair.addFirstPart(first_sumVal);
456 pair.addFirstPart(first_sumWeights);
457 response = pair.build();
458 } catch (IOException e) {
459 ResponseConverter.setControllerException(controller, e);
460 } finally {
461 if (scanner != null) {
462 try {
463 scanner.close();
464 } catch (IOException ignored) {}
465 }
466 }
467 done.run(response);
468 }
469
470 @SuppressWarnings("unchecked")
471 ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
472 AggregateRequest request) throws IOException {
473 String className = request.getInterpreterClassName();
474 Class<?> cls;
475 try {
476 cls = Class.forName(className);
477 ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
478 if (request.hasInterpreterSpecificBytes()) {
479 ByteString b = request.getInterpreterSpecificBytes();
480 P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
481 ci.initialize(initMsg);
482 }
483 return ci;
484 } catch (ClassNotFoundException e) {
485 throw new IOException(e);
486 } catch (InstantiationException e) {
487 throw new IOException(e);
488 } catch (IllegalAccessException e) {
489 throw new IOException(e);
490 }
491 }
492
493 @Override
494 public Service getService() {
495 return this;
496 }
497
498
499
500
501
502
503
504
505
506
507
508 @Override
509 public void start(CoprocessorEnvironment env) throws IOException {
510 if (env instanceof RegionCoprocessorEnvironment) {
511 this.env = (RegionCoprocessorEnvironment)env;
512 } else {
513 throw new CoprocessorException("Must be loaded on a table region!");
514 }
515 }
516
517 @Override
518 public void stop(CoprocessorEnvironment env) throws IOException {
519
520 }
521
522 }