1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.BindException;
24 import java.net.InetSocketAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.NavigableMap;
34 import java.util.Set;
35 import java.util.TreeSet;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.atomic.AtomicLong;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.CellScannable;
44 import org.apache.hadoop.hbase.CellScanner;
45 import org.apache.hadoop.hbase.CellUtil;
46 import org.apache.hadoop.hbase.DoNotRetryIOException;
47 import org.apache.hadoop.hbase.DroppedSnapshotException;
48 import org.apache.hadoop.hbase.HBaseIOException;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.HRegionInfo;
51 import org.apache.hadoop.hbase.HTableDescriptor;
52 import org.apache.hadoop.hbase.MetaTableAccessor;
53 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
54 import org.apache.hadoop.hbase.NotServingRegionException;
55 import org.apache.hadoop.hbase.ServerName;
56 import org.apache.hadoop.hbase.TableName;
57 import org.apache.hadoop.hbase.UnknownScannerException;
58 import org.apache.hadoop.hbase.classification.InterfaceAudience;
59 import org.apache.hadoop.hbase.client.Append;
60 import org.apache.hadoop.hbase.client.ConnectionUtils;
61 import org.apache.hadoop.hbase.client.Delete;
62 import org.apache.hadoop.hbase.client.Durability;
63 import org.apache.hadoop.hbase.client.Get;
64 import org.apache.hadoop.hbase.client.Increment;
65 import org.apache.hadoop.hbase.client.Mutation;
66 import org.apache.hadoop.hbase.client.Put;
67 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68 import org.apache.hadoop.hbase.client.Result;
69 import org.apache.hadoop.hbase.client.RowMutations;
70 import org.apache.hadoop.hbase.client.Scan;
71 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
72 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
73 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
74 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
75 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
76 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
77 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
78 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
79 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
80 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
81 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
82 import org.apache.hadoop.hbase.ipc.PriorityFunction;
83 import org.apache.hadoop.hbase.ipc.QosPriority;
84 import org.apache.hadoop.hbase.ipc.RpcCallContext;
85 import org.apache.hadoop.hbase.ipc.RpcServer;
86 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
87 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
88 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
89 import org.apache.hadoop.hbase.ipc.ServerRpcController;
90 import org.apache.hadoop.hbase.master.MasterRpcServices;
91 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
92 import org.apache.hadoop.hbase.protobuf.RequestConverter;
93 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
94 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
95 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
96 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
97 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
98 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
99 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
131 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
132 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
152 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
153 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
154 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
156 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
157 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
158 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
159 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
160 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
162 import org.apache.hadoop.hbase.quotas.OperationQuota;
163 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
164 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
165 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
166 import org.apache.hadoop.hbase.regionserver.Region.Operation;
167 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
168 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
169 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
170 import org.apache.hadoop.hbase.wal.WAL;
171 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
172 import org.apache.hadoop.hbase.security.User;
173 import org.apache.hadoop.hbase.util.Bytes;
174 import org.apache.hadoop.hbase.util.Counter;
175 import org.apache.hadoop.hbase.util.DNS;
176 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
177 import org.apache.hadoop.hbase.util.Pair;
178 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
179 import org.apache.hadoop.hbase.util.Strings;
180 import org.apache.hadoop.hbase.wal.WALKey;
181 import org.apache.hadoop.hbase.wal.WALSplitter;
182 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
183 import org.apache.zookeeper.KeeperException;
184
185 import com.google.common.annotations.VisibleForTesting;
186 import com.google.protobuf.ByteString;
187 import com.google.protobuf.Message;
188 import com.google.protobuf.RpcController;
189 import com.google.protobuf.ServiceException;
190 import com.google.protobuf.TextFormat;
191
192
193
194
195 @InterfaceAudience.Private
196 @SuppressWarnings("deprecation")
197 public class RSRpcServices implements HBaseRPCErrorHandler,
198 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
199 ConfigurationObserver {
/** Logger for this class. */
protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);

/**
 * Configuration key naming the RpcSchedulerFactory implementation to instantiate. The
 * constructor falls back to SimpleRpcSchedulerFactory when the key is not set.
 */
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

/**
 * Configuration key for the minimum scan time limit delta.
 * NOTE(review): the code that reads this key is outside this chunk; presumably it bounds how
 * small the time limit of a scan RPC may become -- confirm against the scan implementation.
 */
private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";

/**
 * Default value paired with REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA.
 * NOTE(review): the unit is not visible here (presumably milliseconds) -- confirm.
 */
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

/** Request counter; doReplayBatchOp adds the number of replayed mutations to it. */
final Counter requestCount = new Counter();

/** RPC server built by the constructor from the bind address, services and scheduler. */
final RpcServerInterface rpcServer;
/** NOTE(review): assigned outside this chunk; presumably the bound server address -- confirm. */
final InetSocketAddress isa;

/** The region server passed to the constructor. */
private final HRegionServer regionServer;
/**
 * Read from HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY. Combined with the available
 * read quota it caps the accumulated response size in doNonAtomicRegionMutation.
 */
private final long maxScannerResultSize;

/** Priority function obtained from createPriority() during construction. */
private final PriorityFunction priority;

/** NOTE(review): used outside this chunk; presumably the source of new scanner ids. */
private final AtomicLong scannerIdGen = new AtomicLong(0L);
/** Open scanners keyed by scanner name; ScannerListener removes entries on lease expiry. */
private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
    new ConcurrentHashMap<String, RegionScannerHolder>();

/** Read from HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD in the constructor. */
private final int scannerLeaseTimeoutPeriod;

/** Read from the configuration in the constructor (the key is cut off in this chunk). */
private final int rpcTimeout;

/**
 * NOTE(review): assigned outside this chunk; presumably loaded from
 * REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA -- confirm.
 */
private final long minimumScanTimeLimitDelta;
248
249
250
251
252 private static class RegionScannerHolder {
253 private AtomicLong nextCallSeq = new AtomicLong(0);
254 private RegionScanner s;
255 private Region r;
256
257 public RegionScannerHolder(RegionScanner s, Region r) {
258 this.s = s;
259 this.r = r;
260 }
261
262 private long getNextCallSeq() {
263 return nextCallSeq.get();
264 }
265
266 private void incNextCallSeq() {
267 nextCallSeq.incrementAndGet();
268 }
269
270 private void rollbackNextCallSeq() {
271 nextCallSeq.decrementAndGet();
272 }
273 }
274
275
276
277
278
279 private class ScannerListener implements LeaseListener {
280 private final String scannerName;
281
282 ScannerListener(final String n) {
283 this.scannerName = n;
284 }
285
286 @Override
287 public void leaseExpired() {
288 RegionScannerHolder rsh = scanners.remove(this.scannerName);
289 if (rsh != null) {
290 RegionScanner s = rsh.s;
291 LOG.info("Scanner " + this.scannerName + " lease expired on region "
292 + s.getRegionInfo().getRegionNameAsString());
293 try {
294 Region region = regionServer.getRegion(s.getRegionInfo().getRegionName());
295 if (region != null && region.getCoprocessorHost() != null) {
296 region.getCoprocessorHost().preScannerClose(s);
297 }
298
299 s.close();
300 if (region != null && region.getCoprocessorHost() != null) {
301 region.getCoprocessorHost().postScannerClose(s);
302 }
303 } catch (IOException e) {
304 LOG.error("Closing scanner for "
305 + s.getRegionInfo().getRegionNameAsString(), e);
306 }
307 } else {
308 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
309 " scanner found, hence no chance to close that related scanner!");
310 }
311 }
312 }
313
314 private static ResultOrException getResultOrException(
315 final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
316 return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
317 }
318
319 private static ResultOrException getResultOrException(final Exception e, final int index) {
320 return getResultOrException(ResponseConverter.buildActionResult(e), index);
321 }
322
323 private static ResultOrException getResultOrException(
324 final ResultOrException.Builder builder, final int index) {
325 return builder.setIndex(index).build();
326 }
327
328
329
330
331
332
333
334 private long startNonceOperation(final MutationProto mutation, long nonceGroup)
335 throws IOException, OperationConflictException {
336 if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
337 boolean canProceed = false;
338 try {
339 canProceed = regionServer.nonceManager.startOperation(
340 nonceGroup, mutation.getNonce(), regionServer);
341 } catch (InterruptedException ex) {
342 throw new InterruptedIOException("Nonce start operation interrupted");
343 }
344 if (!canProceed) {
345
346 String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
347 + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
348 + "] may have already completed";
349 throw new OperationConflictException(message);
350 }
351 return mutation.getNonce();
352 }
353
354
355
356
357
358
359
360 private void endNonceOperation(final MutationProto mutation,
361 long nonceGroup, boolean success) {
362 if (regionServer.nonceManager != null && mutation.hasNonce()) {
363 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
364 }
365 }
366
367
368
369
370 private boolean isClientCellBlockSupport() {
371 RpcCallContext context = RpcServer.getCurrentCall();
372 return context != null && context.isClientCellBlockSupported();
373 }
374
375 private void addResult(final MutateResponse.Builder builder,
376 final Result result, final PayloadCarryingRpcController rpcc) {
377 if (result == null) return;
378 if (isClientCellBlockSupport()) {
379 builder.setResult(ProtobufUtil.toResultNoData(result));
380 rpcc.setCellScanner(result.cellScanner());
381 } else {
382 ClientProtos.Result pbr = ProtobufUtil.toResult(result);
383 builder.setResult(pbr);
384 }
385 }
386
387 private void addResults(final ScanResponse.Builder builder, final List<Result> results,
388 final RpcController controller, boolean isDefaultRegion) {
389 builder.setStale(!isDefaultRegion);
390 if (results == null || results.isEmpty()) return;
391 if (isClientCellBlockSupport()) {
392 for (Result res : results) {
393 builder.addCellsPerResult(res.size());
394 builder.addPartialFlagPerResult(res.isPartial());
395 }
396 ((PayloadCarryingRpcController)controller).
397 setCellScanner(CellUtil.createCellScanner(results));
398 } else {
399 for (Result res: results) {
400 ClientProtos.Result pbr = ProtobufUtil.toResult(res);
401 builder.addResults(pbr);
402 }
403 }
404 }
405
406
407
408
409
410
411
412
413
414 private ClientProtos.RegionLoadStats mutateRows(final Region region,
415 final List<ClientProtos.Action> actions,
416 final CellScanner cellScanner) throws IOException {
417 if (!region.getRegionInfo().isMetaTable()) {
418 regionServer.cacheFlusher.reclaimMemStoreMemory();
419 }
420 RowMutations rm = null;
421 for (ClientProtos.Action action: actions) {
422 if (action.hasGet()) {
423 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
424 action.getGet());
425 }
426 MutationType type = action.getMutation().getMutateType();
427 if (rm == null) {
428 rm = new RowMutations(action.getMutation().getRow().toByteArray());
429 }
430 switch (type) {
431 case PUT:
432 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
433 break;
434 case DELETE:
435 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
436 break;
437 default:
438 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
439 }
440 }
441 region.mutateRow(rm);
442 return ((HRegion)region).getRegionStats();
443 }
444
445
446
447
448
449
450
451
452
453
454
455
456
457 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
458 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
459 CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
460 if (!region.getRegionInfo().isMetaTable()) {
461 regionServer.cacheFlusher.reclaimMemStoreMemory();
462 }
463 RowMutations rm = null;
464 for (ClientProtos.Action action: actions) {
465 if (action.hasGet()) {
466 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
467 action.getGet());
468 }
469 MutationType type = action.getMutation().getMutateType();
470 if (rm == null) {
471 rm = new RowMutations(action.getMutation().getRow().toByteArray());
472 }
473 switch (type) {
474 case PUT:
475 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
476 break;
477 case DELETE:
478 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
479 break;
480 default:
481 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
482 }
483 }
484 return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
485 }
486
487
488
489
490
491
492
493
494
495
496
497 private Result append(final Region region, final OperationQuota quota, final MutationProto m,
498 final CellScanner cellScanner, long nonceGroup) throws IOException {
499 long before = EnvironmentEdgeManager.currentTime();
500 Append append = ProtobufUtil.toAppend(m, cellScanner);
501 quota.addMutation(append);
502 Result r = null;
503 if (region.getCoprocessorHost() != null) {
504 r = region.getCoprocessorHost().preAppend(append);
505 }
506 if (r == null) {
507 long nonce = startNonceOperation(m, nonceGroup);
508 boolean success = false;
509 try {
510 r = region.append(append, nonceGroup, nonce);
511 success = true;
512 } finally {
513 endNonceOperation(m, nonceGroup, success);
514 }
515 if (region.getCoprocessorHost() != null) {
516 region.getCoprocessorHost().postAppend(append, r);
517 }
518 }
519 if (regionServer.metricsRegionServer != null) {
520 regionServer.metricsRegionServer.updateAppend(
521 EnvironmentEdgeManager.currentTime() - before);
522 }
523 return r;
524 }
525
526
527
528
529
530
531
532
533
534 private Result increment(final Region region, final OperationQuota quota,
535 final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
536 long before = EnvironmentEdgeManager.currentTime();
537 Increment increment = ProtobufUtil.toIncrement(mutation, cells);
538 quota.addMutation(increment);
539 Result r = null;
540 if (region.getCoprocessorHost() != null) {
541 r = region.getCoprocessorHost().preIncrement(increment);
542 }
543 if (r == null) {
544 long nonce = startNonceOperation(mutation, nonceGroup);
545 boolean success = false;
546 try {
547 r = region.increment(increment, nonceGroup, nonce);
548 success = true;
549 } finally {
550 endNonceOperation(mutation, nonceGroup, success);
551 }
552 if (region.getCoprocessorHost() != null) {
553 r = region.getCoprocessorHost().postIncrement(increment, r);
554 }
555 }
556 if (regionServer.metricsRegionServer != null) {
557 regionServer.metricsRegionServer.updateIncrement(
558 EnvironmentEdgeManager.currentTime() - before);
559 }
560 return r;
561 }
562
563
564
565
566
567
568
569
570
571
572
573
574 private List<CellScannable> doNonAtomicRegionMutation(final Region region,
575 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
576 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
577
578
579
580
581 List<ClientProtos.Action> mutations = null;
582 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
583 RpcCallContext context = RpcServer.getCurrentCall();
584 IOException sizeIOE = null;
585 Object lastBlock = null;
586 for (ClientProtos.Action action : actions.getActionList()) {
587 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
588 try {
589 Result r = null;
590
591 if (context != null
592 && context.isRetryImmediatelySupported()
593 && (context.getResponseCellSize() > maxQuotaResultSize
594 || context.getResponseBlockSize() > maxQuotaResultSize)) {
595
596
597
598 if (sizeIOE == null ) {
599
600
601
602
603 sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
604 + " CellSize: " + context.getResponseCellSize()
605 + " BlockSize: " + context.getResponseBlockSize());
606
607
608
609 rpcServer.getMetrics().exception(sizeIOE);
610 }
611
612
613
614
615
616 resultOrExceptionBuilder = ResultOrException.newBuilder().
617 setException(ResponseConverter.buildException(sizeIOE));
618 resultOrExceptionBuilder.setIndex(action.getIndex());
619 builder.addResultOrException(resultOrExceptionBuilder.build());
620 if (cellScanner != null) {
621 skipCellsForMutation(action, cellScanner);
622 }
623 continue;
624 }
625 if (action.hasGet()) {
626 long before = EnvironmentEdgeManager.currentTime();
627 try {
628 Get get = ProtobufUtil.toGet(action.getGet());
629 r = region.get(get);
630 } finally {
631 if (regionServer.metricsRegionServer != null) {
632 regionServer.metricsRegionServer.updateGet(
633 EnvironmentEdgeManager.currentTime() - before);
634 }
635 }
636 } else if (action.hasServiceCall()) {
637 resultOrExceptionBuilder = ResultOrException.newBuilder();
638 try {
639 Message result = execServiceOnRegion(region, action.getServiceCall());
640 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
641 ClientProtos.CoprocessorServiceResult.newBuilder();
642 resultOrExceptionBuilder.setServiceResult(
643 serviceResultBuilder.setValue(
644 serviceResultBuilder.getValueBuilder()
645 .setName(result.getClass().getName())
646 .setValue(result.toByteString())));
647 } catch (IOException ioe) {
648 rpcServer.getMetrics().exception(ioe);
649 resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
650 }
651 } else if (action.hasMutation()) {
652 MutationType type = action.getMutation().getMutateType();
653 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
654 !mutations.isEmpty()) {
655
656 doBatchOp(builder, region, quota, mutations, cellScanner);
657 mutations.clear();
658 }
659 switch (type) {
660 case APPEND:
661 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
662 break;
663 case INCREMENT:
664 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
665 break;
666 case PUT:
667 case DELETE:
668
669 if (mutations == null) {
670 mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
671 }
672 mutations.add(action);
673 break;
674 default:
675 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
676 }
677 } else {
678 throw new HBaseIOException("Unexpected Action type");
679 }
680 if (r != null) {
681 ClientProtos.Result pbResult = null;
682 if (isClientCellBlockSupport()) {
683 pbResult = ProtobufUtil.toResultNoData(r);
684
685 if (cellsToReturn == null) {
686 cellsToReturn = new ArrayList<CellScannable>();
687 }
688 cellsToReturn.add(r);
689 } else {
690 pbResult = ProtobufUtil.toResult(r);
691 }
692 lastBlock = addSize(context, r, lastBlock);
693 resultOrExceptionBuilder =
694 ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
695 }
696
697
698
699
700 } catch (IOException ie) {
701 rpcServer.getMetrics().exception(ie);
702 resultOrExceptionBuilder = ResultOrException.newBuilder().
703 setException(ResponseConverter.buildException(ie));
704 }
705 if (resultOrExceptionBuilder != null) {
706
707 resultOrExceptionBuilder.setIndex(action.getIndex());
708 builder.addResultOrException(resultOrExceptionBuilder.build());
709 }
710 }
711
712 if (mutations != null && !mutations.isEmpty()) {
713 doBatchOp(builder, region, quota, mutations, cellScanner);
714 }
715 return cellsToReturn;
716 }
717
718
719
720
721
722
723
724
725 private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
726 final OperationQuota quota,
727 final List<ClientProtos.Action> mutations, final CellScanner cells) {
728 Mutation[] mArray = new Mutation[mutations.size()];
729 long before = EnvironmentEdgeManager.currentTime();
730 boolean batchContainsPuts = false, batchContainsDelete = false;
731 try {
732 int i = 0;
733 for (ClientProtos.Action action: mutations) {
734 MutationProto m = action.getMutation();
735 Mutation mutation;
736 if (m.getMutateType() == MutationType.PUT) {
737 mutation = ProtobufUtil.toPut(m, cells);
738 batchContainsPuts = true;
739 } else {
740 mutation = ProtobufUtil.toDelete(m, cells);
741 batchContainsDelete = true;
742 }
743 mArray[i++] = mutation;
744 quota.addMutation(mutation);
745 }
746
747 if (!region.getRegionInfo().isMetaTable()) {
748 regionServer.cacheFlusher.reclaimMemStoreMemory();
749 }
750
751 OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
752 HConstants.NO_NONCE);
753 for (i = 0; i < codes.length; i++) {
754 int index = mutations.get(i).getIndex();
755 Exception e = null;
756 switch (codes[i].getOperationStatusCode()) {
757 case BAD_FAMILY:
758 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
759 builder.addResultOrException(getResultOrException(e, index));
760 break;
761
762 case SANITY_CHECK_FAILURE:
763 e = new FailedSanityCheckException(codes[i].getExceptionMsg());
764 builder.addResultOrException(getResultOrException(e, index));
765 break;
766
767 default:
768 e = new DoNotRetryIOException(codes[i].getExceptionMsg());
769 builder.addResultOrException(getResultOrException(e, index));
770 break;
771
772 case SUCCESS:
773 builder.addResultOrException(getResultOrException(
774 ClientProtos.Result.getDefaultInstance(), index,
775 ((HRegion) region).getRegionStats()));
776 break;
777 }
778 }
779 } catch (IOException ie) {
780 for (int i = 0; i < mutations.size(); i++) {
781 builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
782 }
783 }
784 if (regionServer.metricsRegionServer != null) {
785 long after = EnvironmentEdgeManager.currentTime();
786 if (batchContainsPuts) {
787 regionServer.metricsRegionServer.updatePut(after - before);
788 }
789 if (batchContainsDelete) {
790 regionServer.metricsRegionServer.updateDelete(after - before);
791 }
792 }
793 }
794
795
796
797
798
799
800
801
802
803
804
805 private OperationStatus [] doReplayBatchOp(final Region region,
806 final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
807 long before = EnvironmentEdgeManager.currentTime();
808 boolean batchContainsPuts = false, batchContainsDelete = false;
809 try {
810 for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
811 WALSplitter.MutationReplay m = it.next();
812
813 if (m.type == MutationType.PUT) {
814 batchContainsPuts = true;
815 } else {
816 batchContainsDelete = true;
817 }
818
819 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
820 List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
821 if (metaCells != null && !metaCells.isEmpty()) {
822 for (Cell metaCell : metaCells) {
823 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
824 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
825 HRegion hRegion = (HRegion)region;
826 if (compactionDesc != null) {
827
828
829 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
830 replaySeqId);
831 continue;
832 }
833 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
834 if (flushDesc != null && !isDefaultReplica) {
835 hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
836 continue;
837 }
838 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
839 if (regionEvent != null && !isDefaultReplica) {
840 hRegion.replayWALRegionEventMarker(regionEvent);
841 continue;
842 }
843 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
844 if (bulkLoadEvent != null) {
845 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
846 continue;
847 }
848 }
849 it.remove();
850 }
851 }
852 requestCount.add(mutations.size());
853 if (!region.getRegionInfo().isMetaTable()) {
854 regionServer.cacheFlusher.reclaimMemStoreMemory();
855 }
856 return region.batchReplay(mutations.toArray(
857 new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
858 } finally {
859 if (regionServer.metricsRegionServer != null) {
860 long after = EnvironmentEdgeManager.currentTime();
861 if (batchContainsPuts) {
862 regionServer.metricsRegionServer.updatePut(after - before);
863 }
864 if (batchContainsDelete) {
865 regionServer.metricsRegionServer.updateDelete(after - before);
866 }
867 }
868 }
869 }
870
871 private void closeAllScanners() {
872
873
874 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
875 try {
876 e.getValue().s.close();
877 } catch (IOException ioe) {
878 LOG.warn("Closing scanner " + e.getKey(), ioe);
879 }
880 }
881 }
882
883 public RSRpcServices(HRegionServer rs) throws IOException {
884 regionServer = rs;
885
886 RpcSchedulerFactory rpcSchedulerFactory;
887 try {
888 Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
889 REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
890 SimpleRpcSchedulerFactory.class);
891 rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
892 } catch (InstantiationException e) {
893 throw new IllegalArgumentException(e);
894 } catch (IllegalAccessException e) {
895 throw new IllegalArgumentException(e);
896 }
897
898 InetSocketAddress initialIsa;
899 InetSocketAddress bindAddress;
900 if(this instanceof MasterRpcServices) {
901 String hostname = getHostname(rs.conf, true);
902 int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
903
904 initialIsa = new InetSocketAddress(hostname, port);
905 bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
906 } else {
907 String hostname = getHostname(rs.conf, false);
908 int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
909 HConstants.DEFAULT_REGIONSERVER_PORT);
910
911 initialIsa = new InetSocketAddress(hostname, port);
912 bindAddress = new InetSocketAddress(
913 rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
914 }
915 if (initialIsa.getAddress() == null) {
916 throw new IllegalArgumentException("Failed resolve of " + initialIsa);
917 }
918 priority = createPriority();
919 String name = rs.getProcessName() + "/" + initialIsa.toString();
920
921 ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
922 try {
923 rpcServer = new RpcServer(rs, name, getServices(),
924 bindAddress,
925 rs.conf,
926 rpcSchedulerFactory.create(rs.conf, this, rs));
927 } catch (BindException be) {
928 String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
929 HConstants.REGIONSERVER_PORT;
930 throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
931 "' configuration property.", be.getCause() != null ? be.getCause() : be);
932 }
933
934 scannerLeaseTimeoutPeriod = rs.conf.getInt(
935 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
936 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
937 maxScannerResultSize = rs.conf.getLong(
938 HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
939 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
940 rpcTimeout = rs.conf.getInt(
941 HConstants.HBASE_RPC_TIMEOUT_KEY,
942 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
943 minimumScanTimeLimitDelta = rs.conf.getLong(
944 REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
945 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
946
947 InetSocketAddress address = rpcServer.getListenerAddress();
948 if (address == null) {
949 throw new IOException("Listener channel is closed");
950 }
951
952 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
953 rpcServer.setErrorHandler(this);
954 rs.setName(name);
955 }
956
957 @Override
958 public void onConfigurationChange(Configuration newConf) {
959 if (rpcServer instanceof ConfigurationObserver) {
960 ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
961 }
962 }
963
964 protected PriorityFunction createPriority() {
965 return new AnnotationReadingPriorityFunction(this);
966 }
967
968 public static String getHostname(Configuration conf, boolean isMaster)
969 throws UnknownHostException {
970 String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
971 HRegionServer.RS_HOSTNAME_KEY);
972 if (hostname == null || hostname.isEmpty()) {
973 String masterOrRS = isMaster ? "master" : "regionserver";
974 return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
975 conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
976 conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
977 } else {
978 LOG.info("hostname is configured to be " + hostname);
979 return hostname;
980 }
981 }
982
983 RegionScanner getScanner(long scannerId) {
984 String scannerIdString = Long.toString(scannerId);
985 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
986 if (scannerHolder != null) {
987 return scannerHolder.s;
988 }
989 return null;
990 }
991
992
993
994
995
996 long getScannerVirtualTime(long scannerId) {
997 String scannerIdString = Long.toString(scannerId);
998 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
999 if (scannerHolder != null) {
1000 return scannerHolder.getNextCallSeq();
1001 }
1002 return 0L;
1003 }
1004
1005 long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
1006 long scannerId = this.scannerIdGen.incrementAndGet();
1007 String scannerName = String.valueOf(scannerId);
1008
1009 RegionScannerHolder existing =
1010 scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
1011 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1012
1013 regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1014 new ScannerListener(scannerName));
1015 return scannerId;
1016 }
1017
1018
1019
1020
1021
1022 Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1023 if (context != null && !r.isEmpty()) {
1024 for (Cell c : r.rawCells()) {
1025 context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
1026
1027
1028
1029
1030
1031
1032 byte[] valueArray = c.getValueArray();
1033 if (valueArray != lastBlock) {
1034 context.incrementResponseBlockSize(valueArray.length);
1035 lastBlock = valueArray;
1036 }
1037 }
1038 }
1039 return lastBlock;
1040 }
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051 Region getRegion(
1052 final RegionSpecifier regionSpecifier) throws IOException {
1053 return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
1054 ProtobufUtil.getRegionEncodedName(regionSpecifier));
1055 }
1056
1057 @VisibleForTesting
1058 public PriorityFunction getPriority() {
1059 return priority;
1060 }
1061
1062 @VisibleForTesting
1063 public Configuration getConfiguration() {
1064 return regionServer.getConfiguration();
1065 }
1066
1067 private RegionServerQuotaManager getQuotaManager() {
1068 return regionServer.getRegionServerQuotaManager();
1069 }
1070
1071 void start() {
1072 rpcServer.start();
1073 }
1074
1075 void stop() {
1076 closeAllScanners();
1077 rpcServer.stop();
1078 }
1079
1080
1081
1082
1083
1084
1085 protected void checkOpen() throws IOException {
1086 if (regionServer.isAborted()) {
1087 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1088 }
1089 if (regionServer.isStopped()) {
1090 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1091 }
1092 if (!regionServer.fsOk) {
1093 throw new RegionServerStoppedException("File system not available");
1094 }
1095 if (!regionServer.isOnline()) {
1096 throw new ServerNotRunningYetException("Server is not running yet");
1097 }
1098 }
1099
1100
1101
1102
1103 protected List<BlockingServiceAndInterface> getServices() {
1104 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1105 bssi.add(new BlockingServiceAndInterface(
1106 ClientService.newReflectiveBlockingService(this),
1107 ClientService.BlockingInterface.class));
1108 bssi.add(new BlockingServiceAndInterface(
1109 AdminService.newReflectiveBlockingService(this),
1110 AdminService.BlockingInterface.class));
1111 return bssi;
1112 }
1113
1114 public InetSocketAddress getSocketAddress() {
1115 return isa;
1116 }
1117
1118 @Override
1119 public int getPriority(RequestHeader header, Message param, User user) {
1120 return priority.getPriority(header, param, user);
1121 }
1122
1123 @Override
1124 public long getDeadline(RequestHeader header, Message param) {
1125 return priority.getDeadline(header, param);
1126 }
1127
1128
1129
1130
1131
1132
1133
1134
1135 @Override
1136 public boolean checkOOME(final Throwable e) {
1137 boolean stop = false;
1138 try {
1139 if (e instanceof OutOfMemoryError
1140 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1141 || (e.getMessage() != null && e.getMessage().contains(
1142 "java.lang.OutOfMemoryError"))) {
1143 stop = true;
1144 LOG.fatal("Run out of memory; " + getClass().getSimpleName()
1145 + " will abort itself immediately", e);
1146 }
1147 } finally {
1148 if (stop) {
1149 Runtime.getRuntime().halt(1);
1150 }
1151 }
1152 return stop;
1153 }
1154
1155
1156
1157
1158
1159
1160
1161
1162 @Override
1163 @QosPriority(priority=HConstants.ADMIN_QOS)
1164 public CloseRegionResponse closeRegion(final RpcController controller,
1165 final CloseRegionRequest request) throws ServiceException {
1166 final ServerName sn = (request.hasDestinationServer() ?
1167 ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1168
1169 try {
1170 checkOpen();
1171 if (request.hasServerStartCode()) {
1172
1173 long serverStartCode = request.getServerStartCode();
1174 if (regionServer.serverName.getStartcode() != serverStartCode) {
1175 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1176 "different server with startCode: " + serverStartCode + ", this server is: "
1177 + regionServer.serverName));
1178 }
1179 }
1180 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1181
1182
1183 final Region region = regionServer.getFromOnlineRegions(encodedRegionName);
1184 if ((region != null) && (region .getCoprocessorHost() != null)) {
1185 region.getCoprocessorHost().preClose(false);
1186 }
1187
1188 requestCount.increment();
1189 LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1190 CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1191 .getCloseRegionCoordination().parseFromProtoRequest(request);
1192
1193 boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1194 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1195 return builder.build();
1196 } catch (IOException ie) {
1197 throw new ServiceException(ie);
1198 }
1199 }
1200
1201
1202
1203
1204
1205
1206
1207
1208 @Override
1209 @QosPriority(priority=HConstants.ADMIN_QOS)
1210 public CompactRegionResponse compactRegion(final RpcController controller,
1211 final CompactRegionRequest request) throws ServiceException {
1212 try {
1213 checkOpen();
1214 requestCount.increment();
1215 Region region = getRegion(request.getRegion());
1216 region.startRegionOperation(Operation.COMPACT_REGION);
1217 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1218 boolean major = false;
1219 byte [] family = null;
1220 Store store = null;
1221 if (request.hasFamily()) {
1222 family = request.getFamily().toByteArray();
1223 store = region.getStore(family);
1224 if (store == null) {
1225 throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1226 + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1227 }
1228 }
1229 if (request.hasMajor()) {
1230 major = request.getMajor();
1231 }
1232 if (major) {
1233 if (family != null) {
1234 store.triggerMajorCompaction();
1235 } else {
1236 region.triggerMajorCompaction();
1237 }
1238 }
1239
1240 String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1241 if (LOG.isTraceEnabled()) {
1242 LOG.trace("User-triggered compaction requested for region "
1243 + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1244 }
1245 String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1246 if(family != null) {
1247 regionServer.compactSplitThread.requestCompaction(region, store, log,
1248 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1249 } else {
1250 regionServer.compactSplitThread.requestCompaction(region, log,
1251 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1252 }
1253 return CompactRegionResponse.newBuilder().build();
1254 } catch (IOException ie) {
1255 throw new ServiceException(ie);
1256 }
1257 }
1258
1259
1260
1261
1262
1263
1264
1265
1266 @Override
1267 @QosPriority(priority=HConstants.ADMIN_QOS)
1268 public FlushRegionResponse flushRegion(final RpcController controller,
1269 final FlushRegionRequest request) throws ServiceException {
1270 try {
1271 checkOpen();
1272 requestCount.increment();
1273 Region region = getRegion(request.getRegion());
1274 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1275 boolean shouldFlush = true;
1276 if (request.hasIfOlderThanTs()) {
1277 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1278 }
1279 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1280 if (shouldFlush) {
1281 boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
1282 request.getWriteFlushWalMarker() : false;
1283 long startTime = EnvironmentEdgeManager.currentTime();
1284
1285 HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1286 ((HRegion)region).flushcache(true, writeFlushWalMarker);
1287 if (flushResult.isFlushSucceeded()) {
1288 long endTime = EnvironmentEdgeManager.currentTime();
1289 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1290 }
1291 boolean compactionNeeded = flushResult.isCompactionNeeded();
1292 if (compactionNeeded) {
1293 regionServer.compactSplitThread.requestSystemCompaction(region,
1294 "Compaction through user triggered flush");
1295 }
1296 builder.setFlushed(flushResult.isFlushSucceeded());
1297 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1298 }
1299 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1300 return builder.build();
1301 } catch (DroppedSnapshotException ex) {
1302
1303
1304
1305
1306 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1307 throw new ServiceException(ex);
1308 } catch (IOException ie) {
1309 throw new ServiceException(ie);
1310 }
1311 }
1312
1313 @Override
1314 @QosPriority(priority=HConstants.ADMIN_QOS)
1315 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1316 final GetOnlineRegionRequest request) throws ServiceException {
1317 try {
1318 checkOpen();
1319 requestCount.increment();
1320 Map<String, Region> onlineRegions = regionServer.onlineRegions;
1321 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1322 for (Region region: onlineRegions.values()) {
1323 list.add(region.getRegionInfo());
1324 }
1325 Collections.sort(list);
1326 return ResponseConverter.buildGetOnlineRegionResponse(list);
1327 } catch (IOException ie) {
1328 throw new ServiceException(ie);
1329 }
1330 }
1331
1332 @Override
1333 @QosPriority(priority=HConstants.ADMIN_QOS)
1334 public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1335 final GetRegionInfoRequest request) throws ServiceException {
1336 try {
1337 checkOpen();
1338 requestCount.increment();
1339 Region region = getRegion(request.getRegion());
1340 HRegionInfo info = region.getRegionInfo();
1341 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1342 builder.setRegionInfo(HRegionInfo.convert(info));
1343 if (request.hasCompactionState() && request.getCompactionState()) {
1344 builder.setCompactionState(region.getCompactionState());
1345 }
1346 builder.setIsRecovering(region.isRecovering());
1347 return builder.build();
1348 } catch (IOException ie) {
1349 throw new ServiceException(ie);
1350 }
1351 }
1352
1353
1354
1355
1356
1357
1358
1359
1360 @Override
1361 @QosPriority(priority=HConstants.ADMIN_QOS)
1362 public GetServerInfoResponse getServerInfo(final RpcController controller,
1363 final GetServerInfoRequest request) throws ServiceException {
1364 try {
1365 checkOpen();
1366 } catch (IOException ie) {
1367 throw new ServiceException(ie);
1368 }
1369 requestCount.increment();
1370 int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1371 return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1372 }
1373
1374 @Override
1375 @QosPriority(priority=HConstants.ADMIN_QOS)
1376 public GetStoreFileResponse getStoreFile(final RpcController controller,
1377 final GetStoreFileRequest request) throws ServiceException {
1378 try {
1379 checkOpen();
1380 Region region = getRegion(request.getRegion());
1381 requestCount.increment();
1382 Set<byte[]> columnFamilies;
1383 if (request.getFamilyCount() == 0) {
1384 columnFamilies = region.getTableDesc().getFamiliesKeys();
1385 } else {
1386 columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1387 for (ByteString cf: request.getFamilyList()) {
1388 columnFamilies.add(cf.toByteArray());
1389 }
1390 }
1391 int nCF = columnFamilies.size();
1392 List<String> fileList = region.getStoreFileList(
1393 columnFamilies.toArray(new byte[nCF][]));
1394 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1395 builder.addAllStoreFile(fileList);
1396 return builder.build();
1397 } catch (IOException ie) {
1398 throw new ServiceException(ie);
1399 }
1400 }
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410 @Override
1411 @QosPriority(priority = HConstants.ADMIN_QOS)
1412 public MergeRegionsResponse mergeRegions(final RpcController controller,
1413 final MergeRegionsRequest request) throws ServiceException {
1414 try {
1415 checkOpen();
1416 requestCount.increment();
1417 Region regionA = getRegion(request.getRegionA());
1418 Region regionB = getRegion(request.getRegionB());
1419 boolean forcible = request.getForcible();
1420 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1421 regionA.startRegionOperation(Operation.MERGE_REGION);
1422 regionB.startRegionOperation(Operation.MERGE_REGION);
1423 if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1424 regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1425 throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1426 }
1427 LOG.info("Receiving merging request for " + regionA + ", " + regionB
1428 + ",forcible=" + forcible);
1429 long startTime = EnvironmentEdgeManager.currentTime();
1430 FlushResult flushResult = regionA.flush(true);
1431 if (flushResult.isFlushSucceeded()) {
1432 long endTime = EnvironmentEdgeManager.currentTime();
1433 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1434 }
1435 startTime = EnvironmentEdgeManager.currentTime();
1436 flushResult = regionB.flush(true);
1437 if (flushResult.isFlushSucceeded()) {
1438 long endTime = EnvironmentEdgeManager.currentTime();
1439 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1440 }
1441 regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1442 masterSystemTime, RpcServer.getRequestUser());
1443 return MergeRegionsResponse.newBuilder().build();
1444 } catch (DroppedSnapshotException ex) {
1445 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1446 throw new ServiceException(ex);
1447 } catch (IOException ie) {
1448 throw new ServiceException(ie);
1449 }
1450 }
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475 @Override
1476 @QosPriority(priority=HConstants.ADMIN_QOS)
1477 public OpenRegionResponse openRegion(final RpcController controller,
1478 final OpenRegionRequest request) throws ServiceException {
1479 requestCount.increment();
1480 if (request.hasServerStartCode()) {
1481
1482 long serverStartCode = request.getServerStartCode();
1483 if (regionServer.serverName.getStartcode() != serverStartCode) {
1484 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1485 "different server with startCode: " + serverStartCode + ", this server is: "
1486 + regionServer.serverName));
1487 }
1488 }
1489
1490 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1491 final int regionCount = request.getOpenInfoCount();
1492 final Map<TableName, HTableDescriptor> htds =
1493 new HashMap<TableName, HTableDescriptor>(regionCount);
1494 final boolean isBulkAssign = regionCount > 1;
1495 try {
1496 checkOpen();
1497 } catch (IOException ie) {
1498 TableName tableName = null;
1499 if (regionCount == 1) {
1500 RegionInfo ri = request.getOpenInfo(0).getRegion();
1501 if (ri != null) {
1502 tableName = ProtobufUtil.toTableName(ri.getTableName());
1503 }
1504 }
1505 if (!TableName.META_TABLE_NAME.equals(tableName)) {
1506 throw new ServiceException(ie);
1507 }
1508
1509 int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1510 HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1511 long endTime = System.currentTimeMillis() + timeout;
1512 synchronized (regionServer.online) {
1513 try {
1514 while (System.currentTimeMillis() <= endTime
1515 && !regionServer.isStopped() && !regionServer.isOnline()) {
1516 regionServer.online.wait(regionServer.msgInterval);
1517 }
1518 checkOpen();
1519 } catch (InterruptedException t) {
1520 Thread.currentThread().interrupt();
1521 throw new ServiceException(t);
1522 } catch (IOException e) {
1523 throw new ServiceException(e);
1524 }
1525 }
1526 }
1527
1528 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1529
1530 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1531 final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
1532 OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
1533 getOpenRegionCoordination();
1534 OpenRegionCoordination.OpenRegionDetails ord =
1535 coordination.parseFromProtoRequest(regionOpenInfo);
1536
1537 HTableDescriptor htd;
1538 try {
1539 final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
1540 if (onlineRegion != null) {
1541
1542 if (onlineRegion.getCoprocessorHost() != null) {
1543 onlineRegion.getCoprocessorHost().preOpen();
1544 }
1545
1546
1547 Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1548 regionServer.getConnection(), region.getRegionName());
1549 if (regionServer.serverName.equals(p.getSecond())) {
1550 Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
1551
1552
1553
1554
1555
1556 if (!Boolean.FALSE.equals(closing)
1557 && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
1558 LOG.warn("Attempted open of " + region.getEncodedName()
1559 + " but already online on this server");
1560 builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
1561 continue;
1562 }
1563 } else {
1564 LOG.warn("The region " + region.getEncodedName() + " is online on this server"
1565 + " but hbase:meta does not have this server - continue opening.");
1566 regionServer.removeFromOnlineRegions(onlineRegion, null);
1567 }
1568 }
1569 LOG.info("Open " + region.getRegionNameAsString());
1570 htd = htds.get(region.getTable());
1571 if (htd == null) {
1572 htd = regionServer.tableDescriptors.get(region.getTable());
1573 htds.put(region.getTable(), htd);
1574 }
1575
1576 final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
1577 region.getEncodedNameAsBytes(), Boolean.TRUE);
1578
1579 if (Boolean.FALSE.equals(previous)) {
1580
1581
1582 coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
1583
1584 throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
1585 + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
1586 }
1587
1588 if (Boolean.TRUE.equals(previous)) {
1589
1590 LOG.info("Receiving OPEN for the region:" +
1591 region.getRegionNameAsString() + " , which we are already trying to OPEN"
1592 + " - ignoring this new request for this region.");
1593 }
1594
1595
1596
1597 regionServer.removeFromMovedRegions(region.getEncodedName());
1598
1599 if (previous == null) {
1600
1601 if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
1602 region.getEncodedName())) {
1603
1604
1605 if (!regionOpenInfo.hasOpenForDistributedLogReplay()
1606 || regionOpenInfo.getOpenForDistributedLogReplay()) {
1607 regionServer.recoveringRegions.put(region.getEncodedName(), null);
1608 } else {
1609
1610
1611 List<String> tmpRegions = new ArrayList<String>();
1612 tmpRegions.add(region.getEncodedName());
1613 ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
1614 tmpRegions);
1615 }
1616 }
1617
1618
1619 if (region.isMetaRegion()) {
1620 regionServer.service.submit(new OpenMetaHandler(
1621 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1622 } else {
1623 regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
1624 regionOpenInfo.getFavoredNodesList());
1625 regionServer.service.submit(new OpenRegionHandler(
1626 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1627 }
1628 }
1629
1630 builder.addOpeningState(RegionOpeningState.OPENED);
1631
1632 } catch (KeeperException zooKeeperEx) {
1633 LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
1634 throw new ServiceException(zooKeeperEx);
1635 } catch (IOException ie) {
1636 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1637 if (isBulkAssign) {
1638 builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1639 } else {
1640 throw new ServiceException(ie);
1641 }
1642 }
1643 }
1644 return builder.build();
1645 }
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658 @Override
1659 public WarmupRegionResponse warmupRegion(final RpcController controller,
1660 final WarmupRegionRequest request) throws ServiceException {
1661
1662 RegionInfo regionInfo = request.getRegionInfo();
1663 final HRegionInfo region = HRegionInfo.convert(regionInfo);
1664 HTableDescriptor htd;
1665 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1666
1667 try {
1668 checkOpen();
1669 String encodedName = region.getEncodedName();
1670 byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1671 final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1672
1673 if (onlineRegion != null) {
1674 LOG.info("Region already online. Skipping warming up " + region);
1675 return response;
1676 }
1677
1678 if (LOG.isDebugEnabled()) {
1679 LOG.debug("Warming up Region " + region.getRegionNameAsString());
1680 }
1681
1682 htd = regionServer.tableDescriptors.get(region.getTable());
1683
1684 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1685 LOG.info("Region is in transition. Skipping warmup " + region);
1686 return response;
1687 }
1688
1689 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1690 regionServer.getConfiguration(), regionServer, null);
1691
1692 } catch (IOException ie) {
1693 LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1694 throw new ServiceException(ie);
1695 }
1696
1697 return response;
1698 }
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708 @Override
1709 @QosPriority(priority = HConstants.REPLAY_QOS)
1710 public ReplicateWALEntryResponse replay(final RpcController controller,
1711 final ReplicateWALEntryRequest request) throws ServiceException {
1712 long before = EnvironmentEdgeManager.currentTime();
1713 CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1714 try {
1715 checkOpen();
1716 List<WALEntry> entries = request.getEntryList();
1717 if (entries == null || entries.isEmpty()) {
1718
1719 return ReplicateWALEntryResponse.newBuilder().build();
1720 }
1721 ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1722 Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1723 RegionCoprocessorHost coprocessorHost =
1724 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1725 ? region.getCoprocessorHost()
1726 : null;
1727 List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1728
1729
1730 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1731 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1732
1733 for (WALEntry entry : entries) {
1734 if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1735 throw new NotServingRegionException("Replay request contains entries from multiple " +
1736 "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1737 + entry.getKey().getEncodedRegionName());
1738 }
1739 if (regionServer.nonceManager != null && isPrimary) {
1740 long nonceGroup = entry.getKey().hasNonceGroup()
1741 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1742 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1743 regionServer.nonceManager.reportOperationFromWal(
1744 nonceGroup,
1745 nonce,
1746 entry.getKey().getWriteTime());
1747 }
1748 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1749 new Pair<WALKey, WALEdit>();
1750 List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1751 cells, walEntry, durability);
1752 if (coprocessorHost != null) {
1753
1754
1755 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1756 walEntry.getSecond())) {
1757
1758 continue;
1759 }
1760 walEntries.add(walEntry);
1761 }
1762 if(edits!=null && !edits.isEmpty()) {
1763 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1764 entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1765 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1766
1767 for (int i = 0; result != null && i < result.length; i++) {
1768 if (result[i] != OperationStatus.SUCCESS) {
1769 throw new IOException(result[i].getExceptionMsg());
1770 }
1771 }
1772 }
1773 }
1774
1775
1776 WAL wal = getWAL(region);
1777 if (wal != null) {
1778 wal.sync();
1779 }
1780
1781 if (coprocessorHost != null) {
1782 for (Pair<WALKey, WALEdit> entry : walEntries) {
1783 coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1784 entry.getSecond());
1785 }
1786 }
1787 return ReplicateWALEntryResponse.newBuilder().build();
1788 } catch (IOException ie) {
1789 throw new ServiceException(ie);
1790 } finally {
1791 if (regionServer.metricsRegionServer != null) {
1792 regionServer.metricsRegionServer.updateReplay(
1793 EnvironmentEdgeManager.currentTime() - before);
1794 }
1795 }
1796 }
1797
1798 WAL getWAL(Region region) {
1799 return ((HRegion)region).getWAL();
1800 }
1801
1802
1803
1804
1805
1806
1807
1808
1809 @Override
1810 @QosPriority(priority=HConstants.REPLICATION_QOS)
1811 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1812 final ReplicateWALEntryRequest request) throws ServiceException {
1813 try {
1814 checkOpen();
1815 if (regionServer.replicationSinkHandler != null) {
1816 requestCount.increment();
1817 List<WALEntry> entries = request.getEntryList();
1818 CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1819 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1820 regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
1821 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1822 return ReplicateWALEntryResponse.newBuilder().build();
1823 } else {
1824 throw new ServiceException("Replication services are not initialized yet");
1825 }
1826 } catch (IOException ie) {
1827 throw new ServiceException(ie);
1828 }
1829 }
1830
1831
1832
1833
1834
1835
1836
1837 @Override
1838 public RollWALWriterResponse rollWALWriter(final RpcController controller,
1839 final RollWALWriterRequest request) throws ServiceException {
1840 try {
1841 checkOpen();
1842 requestCount.increment();
1843 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1844 regionServer.walRoller.requestRollAll();
1845 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1846 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1847 return builder.build();
1848 } catch (IOException ie) {
1849 throw new ServiceException(ie);
1850 }
1851 }
1852
1853
1854
1855
1856
1857
1858
1859
1860 @Override
1861 @QosPriority(priority=HConstants.ADMIN_QOS)
1862 public SplitRegionResponse splitRegion(final RpcController controller,
1863 final SplitRegionRequest request) throws ServiceException {
1864 try {
1865 checkOpen();
1866 requestCount.increment();
1867 Region region = getRegion(request.getRegion());
1868 region.startRegionOperation(Operation.SPLIT_REGION);
1869 if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1870 throw new IOException("Can't split replicas directly. "
1871 + "Replicas are auto-split when their primary is split.");
1872 }
1873 LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1874 long startTime = EnvironmentEdgeManager.currentTime();
1875 FlushResult flushResult = region.flush(true);
1876 if (flushResult.isFlushSucceeded()) {
1877 long endTime = EnvironmentEdgeManager.currentTime();
1878 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1879 }
1880 byte[] splitPoint = null;
1881 if (request.hasSplitPoint()) {
1882 splitPoint = request.getSplitPoint().toByteArray();
1883 }
1884 ((HRegion)region).forceSplit(splitPoint);
1885 regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1886 RpcServer.getRequestUser());
1887 return SplitRegionResponse.newBuilder().build();
1888 } catch (DroppedSnapshotException ex) {
1889 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1890 throw new ServiceException(ex);
1891 } catch (IOException ie) {
1892 throw new ServiceException(ie);
1893 }
1894 }
1895
1896
1897
1898
1899
1900
1901
1902
1903 @Override
1904 @QosPriority(priority=HConstants.ADMIN_QOS)
1905 public StopServerResponse stopServer(final RpcController controller,
1906 final StopServerRequest request) throws ServiceException {
1907 requestCount.increment();
1908 String reason = request.getReason();
1909 regionServer.stop(reason);
1910 return StopServerResponse.newBuilder().build();
1911 }
1912
1913 @Override
1914 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1915 UpdateFavoredNodesRequest request) throws ServiceException {
1916 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1917 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1918 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1919 HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1920 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1921 regionUpdateInfo.getFavoredNodesList());
1922 }
1923 respBuilder.setResponse(openInfoList.size());
1924 return respBuilder.build();
1925 }
1926
1927
1928
1929
1930
1931
1932 @Override
1933 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1934 final BulkLoadHFileRequest request) throws ServiceException {
1935 try {
1936 checkOpen();
1937 requestCount.increment();
1938 Region region = getRegion(request.getRegion());
1939 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1940 for (FamilyPath familyPath: request.getFamilyPathList()) {
1941 familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1942 familyPath.getPath()));
1943 }
1944 boolean bypass = false;
1945 if (region.getCoprocessorHost() != null) {
1946 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1947 }
1948 boolean loaded = false;
1949 if (!bypass) {
1950 loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
1951 }
1952 if (region.getCoprocessorHost() != null) {
1953 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
1954 }
1955 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
1956 builder.setLoaded(loaded);
1957 return builder.build();
1958 } catch (IOException ie) {
1959 throw new ServiceException(ie);
1960 }
1961 }
1962
1963 @Override
1964 public CoprocessorServiceResponse execService(final RpcController controller,
1965 final CoprocessorServiceRequest request) throws ServiceException {
1966 try {
1967 checkOpen();
1968 requestCount.increment();
1969 Region region = getRegion(request.getRegion());
1970 Message result = execServiceOnRegion(region, request.getCall());
1971 CoprocessorServiceResponse.Builder builder =
1972 CoprocessorServiceResponse.newBuilder();
1973 builder.setRegion(RequestConverter.buildRegionSpecifier(
1974 RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
1975 builder.setValue(
1976 builder.getValueBuilder().setName(result.getClass().getName())
1977 .setValue(result.toByteString()));
1978 return builder.build();
1979 } catch (IOException ie) {
1980 throw new ServiceException(ie);
1981 }
1982 }
1983
1984 private Message execServiceOnRegion(Region region,
1985 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
1986
1987 ServerRpcController execController = new ServerRpcController();
1988 return region.execService(execController, serviceCall);
1989 }
1990
1991
1992
1993
1994
1995
1996
1997
1998 @Override
1999 public GetResponse get(final RpcController controller,
2000 final GetRequest request) throws ServiceException {
2001 long before = EnvironmentEdgeManager.currentTime();
2002 OperationQuota quota = null;
2003 try {
2004 checkOpen();
2005 requestCount.increment();
2006 Region region = getRegion(request.getRegion());
2007
2008 GetResponse.Builder builder = GetResponse.newBuilder();
2009 ClientProtos.Get get = request.getGet();
2010 Boolean existence = null;
2011 Result r = null;
2012 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2013
2014 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2015 if (get.getColumnCount() != 1) {
2016 throw new DoNotRetryIOException(
2017 "get ClosestRowBefore supports one and only one family now, not "
2018 + get.getColumnCount() + " families");
2019 }
2020 byte[] row = get.getRow().toByteArray();
2021 byte[] family = get.getColumn(0).getFamily().toByteArray();
2022 r = region.getClosestRowBefore(row, family);
2023 } else {
2024 Get clientGet = ProtobufUtil.toGet(get);
2025 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2026 existence = region.getCoprocessorHost().preExists(clientGet);
2027 }
2028 if (existence == null) {
2029 r = region.get(clientGet);
2030 if (get.getExistenceOnly()) {
2031 boolean exists = r.getExists();
2032 if (region.getCoprocessorHost() != null) {
2033 exists = region.getCoprocessorHost().postExists(clientGet, exists);
2034 }
2035 existence = exists;
2036 }
2037 }
2038 }
2039 if (existence != null){
2040 ClientProtos.Result pbr =
2041 ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2042 builder.setResult(pbr);
2043 } else if (r != null) {
2044 ClientProtos.Result pbr = ProtobufUtil.toResult(r);
2045 builder.setResult(pbr);
2046 }
2047 if (r != null) {
2048 quota.addGetResult(r);
2049 }
2050 return builder.build();
2051 } catch (IOException ie) {
2052 throw new ServiceException(ie);
2053 } finally {
2054 if (regionServer.metricsRegionServer != null) {
2055 regionServer.metricsRegionServer.updateGet(
2056 EnvironmentEdgeManager.currentTime() - before);
2057 }
2058 if (quota != null) {
2059 quota.close();
2060 }
2061 }
2062 }
2063
2064
2065
2066
2067
2068
2069
2070
2071 @Override
2072 public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2073 throws ServiceException {
2074 try {
2075 checkOpen();
2076 } catch (IOException ie) {
2077 throw new ServiceException(ie);
2078 }
2079
2080
2081
2082 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2083 CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2084 if (controller != null) {
2085 controller.setCellScanner(null);
2086 }
2087
2088 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2089
2090
2091 List<CellScannable> cellsToReturn = null;
2092 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2093 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2094 Boolean processed = null;
2095
2096 for (RegionAction regionAction : request.getRegionActionList()) {
2097 this.requestCount.add(regionAction.getActionCount());
2098 OperationQuota quota;
2099 Region region;
2100 regionActionResultBuilder.clear();
2101 try {
2102 region = getRegion(regionAction.getRegion());
2103 quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
2104 } catch (IOException e) {
2105 rpcServer.getMetrics().exception(e);
2106 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2107 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2108
2109
2110
2111 if (cellScanner != null) {
2112 skipCellsForMutations(regionAction.getActionList(), cellScanner);
2113 }
2114 continue;
2115 }
2116
2117 if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2118
2119
2120 try {
2121 if (request.hasCondition()) {
2122 Condition condition = request.getCondition();
2123 byte[] row = condition.getRow().toByteArray();
2124 byte[] family = condition.getFamily().toByteArray();
2125 byte[] qualifier = condition.getQualifier().toByteArray();
2126 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2127 ByteArrayComparable comparator =
2128 ProtobufUtil.toComparator(condition.getComparator());
2129 processed = checkAndRowMutate(region, regionAction.getActionList(),
2130 cellScanner, row, family, qualifier, compareOp, comparator);
2131 } else {
2132 ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2133 cellScanner);
2134
2135 if(stats != null) {
2136 responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2137 .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2138 }
2139 processed = Boolean.TRUE;
2140 }
2141 } catch (IOException e) {
2142 rpcServer.getMetrics().exception(e);
2143
2144 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2145 }
2146 } else {
2147
2148 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2149 regionActionResultBuilder, cellsToReturn, nonceGroup);
2150 }
2151 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2152 quota.close();
2153 }
2154
2155 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2156 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2157 }
2158 if (processed != null) {
2159 responseBuilder.setProcessed(processed);
2160 }
2161 return responseBuilder.build();
2162 }
2163
2164 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2165 for (Action action : actions) {
2166 skipCellsForMutation(action, cellScanner);
2167 }
2168 }
2169
2170 private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2171 try {
2172 if (action.hasMutation()) {
2173 MutationProto m = action.getMutation();
2174 if (m.hasAssociatedCellCount()) {
2175 for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2176 cellScanner.advance();
2177 }
2178 }
2179 }
2180 } catch (IOException e) {
2181
2182
2183
2184 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2185 }
2186 }
2187
2188
2189
2190
2191
2192
2193
2194
2195 @Override
2196 public MutateResponse mutate(final RpcController rpcc,
2197 final MutateRequest request) throws ServiceException {
2198
2199
2200 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2201 CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2202 OperationQuota quota = null;
2203
2204 if (controller != null) {
2205 controller.setCellScanner(null);
2206 }
2207 try {
2208 checkOpen();
2209 requestCount.increment();
2210 Region region = getRegion(request.getRegion());
2211 MutateResponse.Builder builder = MutateResponse.newBuilder();
2212 MutationProto mutation = request.getMutation();
2213 if (!region.getRegionInfo().isMetaTable()) {
2214 regionServer.cacheFlusher.reclaimMemStoreMemory();
2215 }
2216 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2217 Result r = null;
2218 Boolean processed = null;
2219 MutationType type = mutation.getMutateType();
2220 long mutationSize = 0;
2221 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2222 switch (type) {
2223 case APPEND:
2224
2225 r = append(region, quota, mutation, cellScanner, nonceGroup);
2226 break;
2227 case INCREMENT:
2228
2229 r = increment(region, quota, mutation, cellScanner, nonceGroup);
2230 break;
2231 case PUT:
2232 Put put = ProtobufUtil.toPut(mutation, cellScanner);
2233 quota.addMutation(put);
2234 if (request.hasCondition()) {
2235 Condition condition = request.getCondition();
2236 byte[] row = condition.getRow().toByteArray();
2237 byte[] family = condition.getFamily().toByteArray();
2238 byte[] qualifier = condition.getQualifier().toByteArray();
2239 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2240 ByteArrayComparable comparator =
2241 ProtobufUtil.toComparator(condition.getComparator());
2242 if (region.getCoprocessorHost() != null) {
2243 processed = region.getCoprocessorHost().preCheckAndPut(
2244 row, family, qualifier, compareOp, comparator, put);
2245 }
2246 if (processed == null) {
2247 boolean result = region.checkAndMutate(row, family,
2248 qualifier, compareOp, comparator, put, true);
2249 if (region.getCoprocessorHost() != null) {
2250 result = region.getCoprocessorHost().postCheckAndPut(row, family,
2251 qualifier, compareOp, comparator, put, result);
2252 }
2253 processed = result;
2254 }
2255 } else {
2256 region.put(put);
2257 processed = Boolean.TRUE;
2258 }
2259 break;
2260 case DELETE:
2261 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2262 quota.addMutation(delete);
2263 if (request.hasCondition()) {
2264 Condition condition = request.getCondition();
2265 byte[] row = condition.getRow().toByteArray();
2266 byte[] family = condition.getFamily().toByteArray();
2267 byte[] qualifier = condition.getQualifier().toByteArray();
2268 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2269 ByteArrayComparable comparator =
2270 ProtobufUtil.toComparator(condition.getComparator());
2271 if (region.getCoprocessorHost() != null) {
2272 processed = region.getCoprocessorHost().preCheckAndDelete(
2273 row, family, qualifier, compareOp, comparator, delete);
2274 }
2275 if (processed == null) {
2276 boolean result = region.checkAndMutate(row, family,
2277 qualifier, compareOp, comparator, delete, true);
2278 if (region.getCoprocessorHost() != null) {
2279 result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2280 qualifier, compareOp, comparator, delete, result);
2281 }
2282 processed = result;
2283 }
2284 } else {
2285 region.delete(delete);
2286 processed = Boolean.TRUE;
2287 }
2288 break;
2289 default:
2290 throw new DoNotRetryIOException(
2291 "Unsupported mutate type: " + type.name());
2292 }
2293 if (processed != null) builder.setProcessed(processed.booleanValue());
2294 addResult(builder, r, controller);
2295 return builder.build();
2296 } catch (IOException ie) {
2297 regionServer.checkFileSystem();
2298 throw new ServiceException(ie);
2299 } finally {
2300 if (quota != null) {
2301 quota.close();
2302 }
2303 }
2304 }
2305
2306
2307
2308
2309
2310
2311
2312
/**
 * Serves one scan RPC: opens a new scanner, fetches the next batch from an existing one,
 * renews its lease, and/or closes it, depending on the fields set in the request.
 *
 * @param controller the RPC controller; passed on to {@code addResults} together with the
 *          fetched results
 * @param request the scan request; must carry a scanner id or a scan specification
 * @return the scan response (results or result metadata, scanner id, ttl, more-results flags)
 * @throws ServiceException wrapping any {@link IOException} raised while serving the call
 */
@Override
public ScanResponse scan(final RpcController controller, final ScanRequest request)
throws ServiceException {
  OperationQuota quota = null;
  Leases.Lease lease = null;
  String scannerName = null;
  try {
    if (!request.hasScannerId() && !request.hasScan()) {
      throw new DoNotRetryIOException(
        "Missing required input: scannerId or scan");
    }
    long scannerId = -1;
    if (request.hasScannerId()) {
      scannerId = request.getScannerId();
      scannerName = String.valueOf(scannerId);
    }
    try {
      checkOpen();
    } catch (IOException e) {
      // The open check failed. If the request named a scanner, try to cancel that
      // scanner's lease before rethrowing the original exception.
      if (scannerName != null) {
        LOG.debug("Server shutting down and client tried to access missing scanner "
          + scannerName);
        if (regionServer.leases != null) {
          try {
            regionServer.leases.cancelLease(scannerName);
          } catch (LeaseException le) {
            // Failure to cancel is only traced; the original exception is still thrown.
            if (LOG.isTraceEnabled()) {
              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
            }
          }
        }
      }
      throw e;
    }
    requestCount.increment();

    int ttl = 0;
    Region region = null;
    RegionScanner scanner = null;
    RegionScannerHolder rsh = null;
    boolean moreResults = true;
    boolean closeScanner = false;
    boolean isSmallScan = false;
    RpcCallContext context = RpcServer.getCurrentCall();
    Object lastBlock = null;

    ScanResponse.Builder builder = ScanResponse.newBuilder();
    if (request.hasCloseScanner()) {
      closeScanner = request.getCloseScanner();
    }
    // Default row count: 0 when the scanner is being closed, otherwise 1; an explicit
    // numberOfRows in the request overrides either.
    int rows = closeScanner ? 0 : 1;
    if (request.hasNumberOfRows()) {
      rows = request.getNumberOfRows();
    }
    if (request.hasScannerId()) {
      // Continue with an already registered scanner.
      rsh = scanners.get(scannerName);
      if (rsh == null) {
        LOG.info("Client tried to access missing scanner " + scannerName);
        throw new UnknownScannerException(
          "Name: " + scannerName + ", already closed?");
      }
      scanner = rsh.s;
      HRegionInfo hri = scanner.getRegionInfo();
      region = regionServer.getRegion(hri.getRegionName());
      // The region currently served under this name must be the very object the scanner
      // was created on.
      if (region != rsh.r) {
        throw new NotServingRegionException("Region was re-opened after the scanner"
          + scannerName + " was created: " + hri.getRegionNameAsString());
      }
    } else {
      // No scanner id: open a new scanner from the scan specification.
      region = getRegion(request.getRegion());
      ClientProtos.Scan protoScan = request.getScan();
      boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
      Scan scan = ProtobufUtil.toScan(protoScan);
      // Fall back to the region's default when the client did not set the on-demand flag.
      if (!isLoadingCfsOnDemandSet) {
        scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
      }

      isSmallScan = scan.isSmall();
      if (!scan.hasFamilies()) {
        // No families requested: add every family of the table to the scan.
        for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
          scan.addFamily(family);
        }
      }

      // The pre hook may supply the scanner; otherwise the region creates it. The post
      // hook then gets the chance to replace it.
      if (region.getCoprocessorHost() != null) {
        scanner = region.getCoprocessorHost().preScannerOpen(scan);
      }
      if (scanner == null) {
        scanner = region.getScanner(scan);
      }
      if (region.getCoprocessorHost() != null) {
        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
      }
      scannerId = addScanner(scanner, region);
      scannerName = String.valueOf(scannerId);
      ttl = this.scannerLeaseTimeoutPeriod;
    }
    if (request.hasRenew() && request.getRenew()) {
      // Renew only: remove and re-add the lease, bump the call sequence, and return the
      // response as built so far without fetching any rows.
      rsh = scanners.get(scannerName);
      lease = regionServer.leases.removeLease(scannerName);
      if (lease != null && rsh != null) {
        regionServer.leases.addLease(lease);
        rsh.incNextCallSeq();
      }
      return builder.build();
    }

    quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
    // The result size is capped by both the configured maximum and the available read quota.
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
    if (rows > 0) {
      // When the client sends a call sequence number it must match the one held for the
      // scanner; on a match the held number is incremented.
      if (request.hasNextCallSeq()) {
        if (rsh == null) {
          rsh = scanners.get(scannerName);
        }
        if (rsh != null) {
          if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
            throw new OutOfOrderScannerNextException(
              "Expected nextCallSeq: " + rsh.getNextCallSeq()
              + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
              "; request=" + TextFormat.shortDebugString(request));
          }
          rsh.incNextCallSeq();
        }
      }
      try {
        // The lease is removed while rows are fetched and re-added in the finally block
        // below if the scanner is still registered.
        lease = regionServer.leases.removeLease(scannerName);
        List<Result> results = new ArrayList<Result>();

        boolean done = false;
        // The pre hook may add results itself and may ask to bypass the region scan.
        if (region != null && region.getCoprocessorHost() != null) {
          Boolean bypass = region.getCoprocessorHost().preScannerNext(
            scanner, results, rows);
          if (!results.isEmpty()) {
            for (Result r : results) {
              lastBlock = addSize(context, r, lastBlock);
            }
          }
          if (bypass != null && bypass.booleanValue()) {
            done = true;
          }
        }

        if (!done) {
          // Use the smaller of the scanner's own limit and the quota-derived limit; a
          // non-positive outcome falls back to the quota-derived limit.
          long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
          if (maxResultSize <= 0) {
            maxResultSize = maxQuotaResultSize;
          }
          List<Cell> values = new ArrayList<Cell>(32);
          region.startRegionOperation(Operation.SCAN);
          try {
            int i = 0;
            synchronized(scanner) {
              // Results are flagged stale when the region's replica id is not 0.
              boolean stale = (region.getRegionInfo().getReplicaId() != 0);
              boolean clientHandlesPartials =
                  request.hasClientHandlesPartials() && request.getClientHandlesPartials();
              boolean clientHandlesHeartbeats =
                  request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
              // Partial results are allowed only if the client handles them, the pre hook
              // added no results of its own, and this is not a small scan.
              boolean serverGuaranteesOrderOfPartials = results.isEmpty();
              boolean allowPartialResults =
                  clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
              boolean moreRows = false;
              // Heartbeats additionally require that partial results are allowed.
              boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
              // -1 is the value used when no time limit is computed below.
              long timeLimit = -1;
              // With heartbeats allowed and at least one positive timeout, the limit is half
              // the smaller positive timeout, but no less than minimumScanTimeLimitDelta.
              if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
                long timeLimitDelta;
                if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
                  timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
                } else {
                  timeLimitDelta =
                      scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
                }
                timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
                // NOTE(review): this reads System.currentTimeMillis() while other timing in
                // this class uses EnvironmentEdgeManager.currentTime(); confirm intended.
                timeLimit = System.currentTimeMillis() + timeLimitDelta;
              }

              // Size and time limits are enforced between cells when partials / heartbeats
              // are allowed, and only between rows otherwise.
              final LimitScope sizeScope =
                  allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
              final LimitScope timeScope =
                  allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

              boolean trackMetrics =
                  request.hasTrackScanMetrics() && request.getTrackScanMetrics();
              // Assemble the context that carries the limits into scanner.nextRaw().
              ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
              contextBuilder.setSizeLimit(sizeScope, maxResultSize);
              contextBuilder.setBatchLimit(scanner.getBatch());
              contextBuilder.setTimeLimit(timeScope, timeLimit);
              contextBuilder.setTrackMetrics(trackMetrics);
              ScannerContext scannerContext = contextBuilder.build();

              boolean limitReached = false;
              while (i < rows) {
                // Batch progress is reset before every nextRaw() call.
                scannerContext.setBatchProgress(0);
                moreRows = scanner.nextRaw(values, scannerContext);

                // Each non-empty batch of cells becomes one Result and counts toward rows.
                if (!values.isEmpty()) {
                  final boolean partial = scannerContext.partialResultFormed();
                  Result r = Result.create(values, null, stale, partial);
                  lastBlock = addSize(context, r, lastBlock);
                  results.add(r);
                  i++;
                }

                boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
                boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
                boolean rowLimitReached = i >= rows;
                limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;

                if (limitReached || !moreRows) {
                  if (LOG.isTraceEnabled()) {
                    LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
                      + moreRows + " scannerContext: " + scannerContext);
                  }
                  // If the scanner still has rows, the response is marked as a heartbeat
                  // exactly when the time limit was what stopped the loop.
                  if (moreRows) {
                    builder.setHeartbeatMessage(timeLimitReached);
                  }
                  break;
                }
                values.clear();
              }

              // More results remain in the region if a limit stopped the loop or the
              // scanner reported more rows.
              if (limitReached || moreRows) {
                builder.setMoreResultsInRegion(true);
              } else {
                builder.setMoreResultsInRegion(false);
              }
              // Copy the tracked scan metrics into the response when tracking was requested.
              if (trackMetrics) {
                Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
                ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
                NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

                for (Entry<String, Long> entry : metrics.entrySet()) {
                  pairBuilder.setName(entry.getKey());
                  pairBuilder.setValue(entry.getValue());
                  metricBuilder.addMetrics(pairBuilder.build());
                }

                builder.setScanMetrics(metricBuilder.build());
              }
            }
            region.updateReadRequestsCount(i);
            long responseCellSize = context != null ? context.getResponseCellSize() : 0;
            region.getMetrics().updateScanNext(responseCellSize);
            if (regionServer.metricsRegionServer != null) {
              regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
            }
          } finally {
            region.closeRegionOperation();
          }
          // The post hook runs only when the region scan itself was performed.
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
          }
        }

        quota.addScanResult(results);
        // With the filter done and nothing fetched, the scan is reported as exhausted;
        // otherwise the results are attached to the response.
        if (scanner.isFilterDone() && results.isEmpty()) {
          moreResults = false;
          results = null;
        } else {
          addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
        }
      } catch (IOException e) {
        // Undo the call sequence increment made above before propagating the failure.
        if (rsh != null && request.hasNextCallSeq()) {
          rsh.rollbackNextCallSeq();
        }
        throw e;
      } finally {
        // Re-add the lease only if the scanner is still registered.
        if (scanners.containsKey(scannerName)) {
          if (lease != null) regionServer.leases.addLease(lease);
          ttl = this.scannerLeaseTimeoutPeriod;
        }
      }
    }

    if (!moreResults || closeScanner) {
      // Close path: the scanner is exhausted or the client asked to close it.
      ttl = 0;
      moreResults = false;
      if (region != null && region.getCoprocessorHost() != null) {
        // A true return from the pre-close hook ends the call here, before the scanner
        // is removed or closed.
        if (region.getCoprocessorHost().preScannerClose(scanner)) {
          return builder.build();
        }
      }
      rsh = scanners.remove(scannerName);
      if (rsh != null) {
        scanner = rsh.s;
        scanner.close();
        regionServer.leases.cancelLease(scannerName);
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().postScannerClose(scanner);
        }
      }
    }

    // The ttl is reported only when positive.
    if (ttl > 0) {
      builder.setTtl(ttl);
    }
    builder.setScannerId(scannerId);
    builder.setMoreResults(moreResults);
    return builder.build();
  } catch (IOException ie) {
    // On NotServingRegionException for a named scanner, unregister and close the scanner
    // and cancel its lease; failures while doing so are only logged.
    if (scannerName != null && ie instanceof NotServingRegionException) {
      RegionScannerHolder rsh = scanners.remove(scannerName);
      if (rsh != null) {
        try {
          RegionScanner scanner = rsh.s;
          LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
          scanner.close();
          regionServer.leases.cancelLease(scannerName);
        } catch (IOException e) {
          LOG.warn("Getting exception closing " + scannerName, e);
        }
      }
    }
    throw new ServiceException(ie);
  } finally {
    // The quota, if one was obtained, is released on every exit path.
    if (quota != null) {
      quota.close();
    }
  }
}
2703
2704 @Override
2705 public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2706 CoprocessorServiceRequest request) throws ServiceException {
2707 return regionServer.execRegionServerService(controller, request);
2708 }
2709
2710 @Override
2711 public UpdateConfigurationResponse updateConfiguration(
2712 RpcController controller, UpdateConfigurationRequest request)
2713 throws ServiceException {
2714 try {
2715 this.regionServer.updateConfiguration();
2716 } catch (Exception e) {
2717 throw new ServiceException(e);
2718 }
2719 return UpdateConfigurationResponse.getDefaultInstance();
2720 }
2721 }