1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.UnknownHostException;
24 import java.util.Map;
25 import java.util.Map.Entry;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellScanner;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.DoNotRetryIOException;
35 import org.apache.hadoop.hbase.HBaseIOException;
36 import org.apache.hadoop.hbase.HRegionInfo;
37 import org.apache.hadoop.hbase.HRegionLocation;
38 import org.apache.hadoop.hbase.NotServingRegionException;
39 import org.apache.hadoop.hbase.RegionLocations;
40 import org.apache.hadoop.hbase.RemoteExceptionHandler;
41 import org.apache.hadoop.hbase.ServerName;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.UnknownScannerException;
44 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
45 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
46 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
47 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
48 import org.apache.hadoop.hbase.protobuf.RequestConverter;
49 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
50 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
51 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
52 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
53 import org.apache.hadoop.ipc.RemoteException;
54 import org.apache.hadoop.net.DNS;
55
56 import com.google.protobuf.ServiceException;
57 import com.google.protobuf.TextFormat;
58
59
60
61
62
63
64 @InterfaceAudience.Private
65 public class ScannerCallable extends RegionServerCallable<Result[]> {
/** Configuration key for the latency cutoff (compared against milliseconds) used when logging slow fetches. */
public static final String LOG_SCANNER_LATENCY_CUTOFF =
    "hbase.client.log.scanner.latency.cutoff";

/** Configuration key that switches informational scanner-activity logging on or off. */
public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

public static final Log LOG = LogFactory.getLog(ScannerCallable.class);

/** Server-side scanner id; {@code -1} means no scanner is currently open. */
protected long scannerId = -1L;

/** Set once {@link #prepare(boolean)} has resolved a location and checked its locality. */
protected boolean instantiated = false;

/** When set, the next {@code call} closes the scanner instead of fetching. */
protected boolean closed = false;

/** Forwarded as the last argument of each fetch request built in {@code call}. */
protected boolean renew = false;

/** The scan being executed; its start row is the row handed to the superclass. */
private Scan scan;

/** Forwarded to every fetch request (presumably the number of rows per RPC -- confirm). */
private int caching = 1;

/** Connection handed back by {@link #getConnection()}. */
protected final ClusterConnection cConnection;

/** Client-side metrics sink; may be {@code null}, in which case nothing is recorded. */
protected ScanMetrics scanMetrics;

/** Value of {@link #LOG_SCANNER_ACTIVITY} read from the connection configuration. */
private boolean logScannerActivity = false;

/** Value of {@link #LOG_SCANNER_LATENCY_CUTOFF} read from the connection configuration. */
private int logCutOffLatency = 1000;

/** Default host name of this process; stays {@code null} when it cannot be determined. */
private static String myAddress;

/** Replica id used to pick the region location in {@link #prepare(boolean)}. */
protected final int id;

/** Whether the last response carried a "more results in region" value. */
protected boolean serverHasMoreResultsContext;

/** The "more results in region" value from the last response that carried one. */
protected boolean serverHasMoreResults;

/** Whether the last fetch response was flagged as a heartbeat message. */
protected boolean heartbeatMessage = false;

static {
  String resolvedHost = null;
  try {
    resolvedHost = DNS.getDefaultHost("default", "default");
  } catch (UnknownHostException uhe) {
    // Best effort: with no local address every region server is treated as remote.
    LOG.error("cannot determine my address", uhe);
  }
  myAddress = resolvedHost;
}

/** {@code false} only when the located server's host name equals {@link #myAddress}. */
protected boolean isRegionServerRemote = true;

/** Sequence number sent with each fetch request and bumped after every fetch RPC that returns. */
private long nextCallSeq = 0;

/** Factory for the per-RPC controllers created in {@code call}. */
protected RpcControllerFactory controllerFactory;

/** Controller of the most recent fetch RPC; {@code null} until the first fetch. */
protected PayloadCarryingRpcController controller;
105
106
107
108
109
110
111
112
113
114
/**
 * Creates a callable for replica id {@code 0} by delegating to the six-argument constructor.
 *
 * @param connection connection used to locate regions and reach region servers
 * @param tableName table to scan
 * @param scan the scan to execute
 * @param scanMetrics metrics sink, or {@code null} to record nothing
 * @param rpcControllerFactory factory for the RPC controllers used when fetching
 */
public ScannerCallable(
    ClusterConnection connection,
    TableName tableName,
    Scan scan,
    ScanMetrics scanMetrics,
    RpcControllerFactory rpcControllerFactory) {
  this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
}
119
120
121
122
123
124
125
126
127 public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
128 ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
129 super(connection, tableName, scan.getStartRow());
130 this.id = id;
131 this.cConnection = connection;
132 this.scan = scan;
133 this.scanMetrics = scanMetrics;
134 Configuration conf = connection.getConfiguration();
135 logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
136 logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
137 this.controllerFactory = rpcControllerFactory;
138 }
139
140 PayloadCarryingRpcController getController() {
141 return controller;
142 }
143
144
145
146
147
148 @Override
149 public void prepare(boolean reload) throws IOException {
150 if (Thread.interrupted()) {
151 throw new InterruptedIOException();
152 }
153 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
154 id, getConnection(), getTableName(), getRow());
155 location = id < rl.size() ? rl.getRegionLocation(id) : null;
156 if (location == null || location.getServerName() == null) {
157
158
159 throw new HBaseIOException("There is no location for replica id #" + id);
160 }
161 ServerName dest = location.getServerName();
162 setStub(super.getConnection().getClient(dest));
163 if (!instantiated || reload) {
164 checkIfRegionServerIsRemote();
165 instantiated = true;
166 }
167
168
169
170
171 if (reload && this.scanMetrics != null) {
172 this.scanMetrics.countOfRPCRetries.incrementAndGet();
173 if (isRegionServerRemote) {
174 this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
175 }
176 }
177 }
178
179
180
181
182
183 protected void checkIfRegionServerIsRemote() {
184 if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
185 isRegionServerRemote = false;
186 } else {
187 isRegionServerRemote = true;
188 }
189 }
190
191
192 @Override
193 public Result [] call(int callTimeout) throws IOException {
194 if (Thread.interrupted()) {
195 throw new InterruptedIOException();
196 }
197 if (closed) {
198 if (scannerId != -1) {
199 close();
200 }
201 } else {
202 if (scannerId == -1L) {
203 this.scannerId = openScanner();
204 } else {
205 Result [] rrs = null;
206 ScanRequest request = null;
207
208 setHeartbeatMessage(false);
209 try {
210 incRPCcallsMetrics();
211 request =
212 RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
213 this.scanMetrics != null, renew);
214 ScanResponse response = null;
215 controller = controllerFactory.newController();
216 controller.setPriority(getTableName());
217 controller.setCallTimeout(callTimeout);
218 try {
219 response = getStub().scan(controller, request);
220
221
222
223
224
225
226
227
228
229 nextCallSeq++;
230 long timestamp = System.currentTimeMillis();
231 setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
232
233 CellScanner cellScanner = controller.cellScanner();
234 rrs = ResponseConverter.getResults(cellScanner, response);
235 if (logScannerActivity) {
236 long now = System.currentTimeMillis();
237 if (now - timestamp > logCutOffLatency) {
238 int rows = rrs == null ? 0 : rrs.length;
239 LOG.info("Took " + (now-timestamp) + "ms to fetch "
240 + rows + " rows from scanner=" + scannerId);
241 }
242 }
243 updateServerSideMetrics(response);
244
245 if (response.hasMoreResults() && !response.getMoreResults()) {
246 scannerId = -1L;
247 closed = true;
248
249 return null;
250 }
251
252
253 if (response.hasMoreResultsInRegion()) {
254
255 setHasMoreResultsContext(true);
256 setServerHasMoreResults(response.getMoreResultsInRegion());
257 } else {
258
259 setHasMoreResultsContext(false);
260 }
261 } catch (ServiceException se) {
262 throw ProtobufUtil.getRemoteException(se);
263 }
264 updateResultsMetrics(rrs);
265 } catch (IOException e) {
266 if (logScannerActivity) {
267 LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
268 + " to " + getLocation(), e);
269 }
270 IOException ioe = e;
271 if (e instanceof RemoteException) {
272 ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
273 }
274 if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
275 try {
276 HRegionLocation location =
277 getConnection().relocateRegion(getTableName(), scan.getStartRow());
278 LOG.info("Scanner=" + scannerId
279 + " expired, current region location is " + location.toString());
280 } catch (Throwable t) {
281 LOG.info("Failed to relocate region", t);
282 }
283 }
284
285
286
287
288
289
290 if (ioe instanceof NotServingRegionException) {
291
292
293
294 if (this.scanMetrics != null) {
295 this.scanMetrics.countOfNSRE.incrementAndGet();
296 }
297 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
298 } else if (ioe instanceof RegionServerStoppedException) {
299
300
301 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
302 } else {
303
304 throw ioe;
305 }
306 }
307 return rrs;
308 }
309 }
310 return null;
311 }
312
313
314
315
316
317
318
319 protected boolean isHeartbeatMessage() {
320 return heartbeatMessage;
321 }
322
323 protected void setHeartbeatMessage(boolean heartbeatMessage) {
324 this.heartbeatMessage = heartbeatMessage;
325 }
326
327 private void incRPCcallsMetrics() {
328 if (this.scanMetrics == null) {
329 return;
330 }
331 this.scanMetrics.countOfRPCcalls.incrementAndGet();
332 if (isRegionServerRemote) {
333 this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
334 }
335 }
336
337 protected void updateResultsMetrics(Result[] rrs) {
338 if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
339 return;
340 }
341 long resultSize = 0;
342 for (Result rr : rrs) {
343 for (Cell cell : rr.rawCells()) {
344 resultSize += CellUtil.estimatedSerializedSizeOf(cell);
345 }
346 }
347 this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
348 if (isRegionServerRemote) {
349 this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
350 }
351 }
352
353
354
355
356
357
358
359 private void updateServerSideMetrics(ScanResponse response) {
360 if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
361
362 Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
363 for (Entry<String, Long> entry : serverMetrics.entrySet()) {
364 this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
365 }
366 }
367
368 private void close() {
369 if (this.scannerId == -1L) {
370 return;
371 }
372 try {
373 incRPCcallsMetrics();
374 ScanRequest request =
375 RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
376 try {
377 getStub().scan(null, request);
378 } catch (ServiceException se) {
379 throw ProtobufUtil.getRemoteException(se);
380 }
381 } catch (IOException e) {
382 LOG.warn("Ignore, probably already closed", e);
383 }
384 this.scannerId = -1L;
385 }
386
387 protected long openScanner() throws IOException {
388 incRPCcallsMetrics();
389 ScanRequest request =
390 RequestConverter.buildScanRequest(
391 getLocation().getRegionInfo().getRegionName(),
392 this.scan, 0, false);
393 try {
394 ScanResponse response = getStub().scan(null, request);
395 long id = response.getScannerId();
396 if (logScannerActivity) {
397 LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
398 + " on region " + getLocation().toString());
399 }
400 return id;
401 } catch (ServiceException se) {
402 throw ProtobufUtil.getRemoteException(se);
403 }
404 }
405
406 protected Scan getScan() {
407 return scan;
408 }
409
410
411
412
413 public void setClose() {
414 this.closed = true;
415 }
416
417
418
419
420
421
422 public void setRenew(boolean val) {
423 this.renew = val;
424 }
425
426
427
428
429 @Override
430 public HRegionInfo getHRegionInfo() {
431 if (!instantiated) {
432 return null;
433 }
434 return getLocation().getRegionInfo();
435 }
436
437
438
439
440
441 public int getCaching() {
442 return caching;
443 }
444
445 @Override
446 public ClusterConnection getConnection() {
447 return cConnection;
448 }
449
450
451
452
453
454 public void setCaching(int caching) {
455 this.caching = caching;
456 }
457
458 public ScannerCallable getScannerCallableForReplica(int id) {
459 ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
460 this.getScan(), this.scanMetrics, controllerFactory, id);
461 s.setCaching(this.caching);
462 return s;
463 }
464
465
466
467
468
469 protected boolean getServerHasMoreResults() {
470 assert serverHasMoreResultsContext;
471 return this.serverHasMoreResults;
472 }
473
474 protected void setServerHasMoreResults(boolean serverHasMoreResults) {
475 this.serverHasMoreResults = serverHasMoreResults;
476 }
477
478
479
480
481
482
483 protected boolean hasMoreResultsContext() {
484 return serverHasMoreResultsContext;
485 }
486
487 protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
488 this.serverHasMoreResultsContext = serverHasMoreResultsContext;
489 }
490 }