1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValueUtil;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.io.TimeRange;
39 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
40 import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
41
42
43
44
45
46 @InterfaceAudience.LimitedPrivate("Coprocessor")
47 public class StoreFileScanner implements KeyValueScanner {
48 private static final Log LOG = LogFactory.getLog(HStore.class);
49
50
51 private final StoreFile.Reader reader;
52 private final HFileScanner hfs;
53 private Cell cur = null;
54
55 private boolean realSeekDone;
56 private boolean delayedReseek;
57 private Cell delayedSeekKV;
58
59 private boolean enforceMVCC = false;
60 private boolean hasMVCCInfo = false;
61
62
63 private boolean stopSkippingKVsIfNextRow = false;
64
65 private static AtomicLong seekCount;
66
67 private ScanQueryMatcher matcher;
68
69 private long readPt;
70
71
72
73
74
/**
 * Creates a scanner over the given HFile scanner.
 *
 * @param reader  store file reader the scanner belongs to
 * @param hfs     HFile scanner used for positioning
 * @param useMVCC whether cells newer than {@code readPt} must be skipped
 * @param hasMVCC whether read-point filtering is applied after each move
 * @param readPt  read point that cell MVCC versions are compared against
 */
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
    boolean hasMVCC, long readPt) {
  // File access first ...
  this.reader = reader;
  this.hfs = hfs;
  // ... then the MVCC related settings.
  this.enforceMVCC = useMVCC;
  this.hasMVCCInfo = hasMVCC;
  this.readPt = readPt;
}
83
84 boolean isPrimaryReplica() {
85 return reader.isPrimaryReplicaReader();
86 }
87
88
89
90
91
92 public static List<StoreFileScanner> getScannersForStoreFiles(
93 Collection<StoreFile> files,
94 boolean cacheBlocks,
95 boolean usePread, long readPt) throws IOException {
96 return getScannersForStoreFiles(files, cacheBlocks,
97 usePread, false, false, readPt);
98 }
99
100
101
102
103 public static List<StoreFileScanner> getScannersForStoreFiles(
104 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
105 boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
106 return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
107 useDropBehind, null, readPt);
108 }
109
110
111
112
113
114
115 public static List<StoreFileScanner> getScannersForStoreFiles(
116 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
117 boolean isCompaction, boolean canUseDrop,
118 ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
119 List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
120 files.size());
121 for (StoreFile file : files) {
122 StoreFile.Reader r = file.createReader(canUseDrop);
123 r.setReplicaStoreFile(isPrimaryReplica);
124 StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
125 isCompaction, readPt);
126 scanner.setScanQueryMatcher(matcher);
127 scanners.add(scanner);
128 }
129 return scanners;
130 }
131
132 public static List<StoreFileScanner> getScannersForStoreFiles(
133 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
134 boolean isCompaction, boolean canUseDrop,
135 ScanQueryMatcher matcher, long readPt) throws IOException {
136 return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
137 matcher, readPt, true);
138 }
139
140 public String toString() {
141 return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
142 }
143
144 public Cell peek() {
145 return cur;
146 }
147
148 public Cell next() throws IOException {
149 Cell retKey = cur;
150
151 try {
152
153 if (cur != null) {
154 hfs.next();
155 setCurrentCell(hfs.getKeyValue());
156 if (hasMVCCInfo || this.reader.isBulkLoaded()) {
157 skipKVsNewerThanReadpoint();
158 }
159 }
160 } catch (FileNotFoundException e) {
161 throw e;
162 } catch(IOException e) {
163 throw new IOException("Could not iterate " + this, e);
164 }
165 return retKey;
166 }
167
168 public boolean seek(Cell key) throws IOException {
169 if (seekCount != null) seekCount.incrementAndGet();
170
171 try {
172 try {
173 if(!seekAtOrAfter(hfs, key)) {
174 close();
175 return false;
176 }
177
178 setCurrentCell(hfs.getKeyValue());
179
180 if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
181 return skipKVsNewerThanReadpoint();
182 } else {
183 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
184 }
185 } finally {
186 realSeekDone = true;
187 }
188 } catch (FileNotFoundException e) {
189 throw e;
190 } catch (IOException ioe) {
191 throw new IOException("Could not seek " + this + " to key " + key, ioe);
192 }
193 }
194
195 public boolean reseek(Cell key) throws IOException {
196 if (seekCount != null) seekCount.incrementAndGet();
197
198 try {
199 try {
200 if (!reseekAtOrAfter(hfs, key)) {
201 close();
202 return false;
203 }
204 setCurrentCell(hfs.getKeyValue());
205
206 if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
207 return skipKVsNewerThanReadpoint();
208 } else {
209 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
210 }
211 } finally {
212 realSeekDone = true;
213 }
214 } catch (FileNotFoundException e) {
215 throw e;
216 } catch (IOException ioe) {
217 throw new IOException("Could not reseek " + this + " to key " + key,
218 ioe);
219 }
220 }
221
222 protected void setCurrentCell(Cell newVal) throws IOException {
223 this.cur = newVal;
224 if (this.cur != null && this.reader.isBulkLoaded()) {
225 CellUtil.setSequenceId(cur, this.reader.getSequenceID());
226 }
227 }
228
229 protected boolean skipKVsNewerThanReadpoint() throws IOException {
230
231
232 Cell startKV = cur;
233 while(enforceMVCC
234 && cur != null
235 && (cur.getMvccVersion() > readPt)) {
236 hfs.next();
237 setCurrentCell(hfs.getKeyValue());
238 if (this.stopSkippingKVsIfNextRow
239 && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
240 cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
241 startKV.getRowLength()) > 0) {
242 return false;
243 }
244 }
245
246 if (cur == null) {
247 close();
248 return false;
249 }
250
251 return true;
252 }
253
254 public void close() {
255
256 cur = null;
257 }
258
259
260
261
262
263
264
265
266 public static boolean seekAtOrAfter(HFileScanner s, Cell k)
267 throws IOException {
268 int result = s.seekTo(k);
269 if(result < 0) {
270 if (result == HConstants.INDEX_KEY_MAGIC) {
271
272 return true;
273 }
274
275 return s.seekTo();
276 } else if(result > 0) {
277
278
279 return s.next();
280 }
281
282 return true;
283 }
284
285 static boolean reseekAtOrAfter(HFileScanner s, Cell k)
286 throws IOException {
287
288 int result = s.reseekTo(k);
289 if (result <= 0) {
290 if (result == HConstants.INDEX_KEY_MAGIC) {
291
292 return true;
293 }
294
295
296
297 if (!s.isSeeked()) {
298 return s.seekTo();
299 }
300 return true;
301 }
302
303
304 return s.next();
305 }
306
307 @Override
308 public long getSequenceID() {
309 return reader.getSequenceID();
310 }
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326 @Override
327 public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
328 throws IOException {
329 if (kv.getFamilyLength() == 0) {
330 useBloom = false;
331 }
332
333 boolean haveToSeek = true;
334 if (useBloom) {
335
336 if (reader.getBloomFilterType() == BloomType.ROWCOL) {
337 haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
338 kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
339 kv.getQualifierOffset(), kv.getQualifierLength());
340 } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
341 ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
342
343
344 haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
345 kv.getRowOffset(), kv.getRowLength());
346 }
347 }
348
349 delayedReseek = forward;
350 delayedSeekKV = kv;
351
352 if (haveToSeek) {
353
354
355 realSeekDone = false;
356 long maxTimestampInFile = reader.getMaxTimestamp();
357 long seekTimestamp = kv.getTimestamp();
358 if (seekTimestamp > maxTimestampInFile) {
359
360
361
362
363
364
365 setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
366 } else {
367
368
369
370
371 enforceSeek();
372 }
373 return cur != null;
374 }
375
376
377
378
379
380
381
382
383 setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
384
385 realSeekDone = true;
386 return true;
387 }
388
389 Reader getReader() {
390 return reader;
391 }
392
393 KeyValue.KVComparator getComparator() {
394 return reader.getComparator();
395 }
396
397 @Override
398 public boolean realSeekDone() {
399 return realSeekDone;
400 }
401
402 @Override
403 public void enforceSeek() throws IOException {
404 if (realSeekDone)
405 return;
406
407 if (delayedReseek) {
408 reseek(delayedSeekKV);
409 } else {
410 seek(delayedSeekKV);
411 }
412 }
413
414 public void setScanQueryMatcher(ScanQueryMatcher matcher) {
415 this.matcher = matcher;
416 }
417
418 @Override
419 public boolean isFileScanner() {
420 return true;
421 }
422
423
424
425 static final long getSeekCount() {
426 return seekCount.get();
427 }
428 static final void instrument() {
429 seekCount = new AtomicLong();
430 }
431
432 @Override
433 public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
434
435 byte[] cf = store.getFamily().getName();
436 TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
437 if (timeRange == null) {
438 timeRange = scan.getTimeRange();
439 }
440 return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
441 .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
442 }
443
444 @Override
445 @SuppressWarnings("deprecation")
446 public boolean seekToPreviousRow(Cell key) throws IOException {
447 try {
448 try {
449 KeyValue seekKey = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
450 key.getRowLength());
451 if (seekCount != null) seekCount.incrementAndGet();
452 if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
453 seekKey.getKeyLength())) {
454 close();
455 return false;
456 }
457 KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
458 .getRowArray(), hfs.getKeyValue().getRowOffset(), hfs.getKeyValue().getRowLength());
459
460 if (seekCount != null) seekCount.incrementAndGet();
461 if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
462 close();
463 return false;
464 }
465
466 setCurrentCell(hfs.getKeyValue());
467 this.stopSkippingKVsIfNextRow = true;
468 boolean resultOfSkipKVs;
469 try {
470 resultOfSkipKVs = skipKVsNewerThanReadpoint();
471 } finally {
472 this.stopSkippingKVsIfNextRow = false;
473 }
474 if (!resultOfSkipKVs
475 || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
476 return seekToPreviousRow(firstKeyOfPreviousRow);
477 }
478
479 return true;
480 } finally {
481 realSeekDone = true;
482 }
483 } catch (FileNotFoundException e) {
484 throw e;
485 } catch (IOException ioe) {
486 throw new IOException("Could not seekToPreviousRow " + this + " to key "
487 + key, ioe);
488 }
489 }
490
491 @Override
492 public boolean seekToLastRow() throws IOException {
493 byte[] lastRow = reader.getLastRowKey();
494 if (lastRow == null) {
495 return false;
496 }
497 KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
498 if (seek(seekKey)) {
499 return true;
500 } else {
501 return seekToPreviousRow(seekKey);
502 }
503 }
504
505 @Override
506 public boolean backwardSeek(Cell key) throws IOException {
507 seek(key);
508 if (cur == null
509 || getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
510 cur.getRowLength(), key.getRowArray(), key.getRowOffset(),
511 key.getRowLength()) > 0) {
512 return seekToPreviousRow(key);
513 }
514 return true;
515 }
516
517 @Override
518 public Cell getNextIndexedKey() {
519 return hfs.getNextIndexedKey();
520 }
521 }