1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.FileNotFoundException;
23 import java.io.InterruptedIOException;
24 import java.net.InetSocketAddress;
25 import java.security.Key;
26 import java.security.KeyException;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.NavigableSet;
33 import java.util.Set;
34 import java.util.SortedSet;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.CompletionService;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.ExecutorCompletionService;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.ThreadPoolExecutor;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.atomic.AtomicLong;
44 import java.util.concurrent.locks.ReentrantReadWriteLock;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.classification.InterfaceAudience;
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.fs.FileSystem;
51 import org.apache.hadoop.fs.Path;
52 import org.apache.hadoop.hbase.Cell;
53 import org.apache.hadoop.hbase.CompoundConfiguration;
54 import org.apache.hadoop.hbase.HColumnDescriptor;
55 import org.apache.hadoop.hbase.HConstants;
56 import org.apache.hadoop.hbase.HRegionInfo;
57 import org.apache.hadoop.hbase.KeyValue;
58 import org.apache.hadoop.hbase.RemoteExceptionHandler;
59 import org.apache.hadoop.hbase.TableName;
60 import org.apache.hadoop.hbase.client.Scan;
61 import org.apache.hadoop.hbase.io.compress.Compression;
62 import org.apache.hadoop.hbase.io.crypto.Cipher;
63 import org.apache.hadoop.hbase.io.crypto.Encryption;
64 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
65 import org.apache.hadoop.hbase.io.hfile.HFile;
66 import org.apache.hadoop.hbase.io.hfile.HFileContext;
67 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
68 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
69 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
70 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
71 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
72 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
73 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
74 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
75 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
76 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
77 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
78 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
79 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
80 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
81 import org.apache.hadoop.hbase.security.EncryptionUtil;
82 import org.apache.hadoop.hbase.security.User;
83 import org.apache.hadoop.hbase.util.Bytes;
84 import org.apache.hadoop.hbase.util.ChecksumType;
85 import org.apache.hadoop.hbase.util.ClassSize;
86 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
87 import org.apache.hadoop.util.StringUtils;
88
89 import com.google.common.annotations.VisibleForTesting;
90 import com.google.common.base.Preconditions;
91 import com.google.common.collect.ImmutableCollection;
92 import com.google.common.collect.ImmutableList;
93 import com.google.common.collect.Lists;
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 @InterfaceAudience.Private
119 public class HStore implements Store {
120 public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
121 "hbase.server.compactchecker.interval.multiplier";
122 public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
123 public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
124 public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
125
126 static final Log LOG = LogFactory.getLog(HStore.class);
127
128 protected final MemStore memstore;
129
130 private final HRegion region;
131 private final HColumnDescriptor family;
132 private final HRegionFileSystem fs;
133 private final Configuration conf;
134 private final CacheConfig cacheConf;
135 private long lastCompactSize = 0;
136 volatile boolean forceMajor = false;
137
138 static int closeCheckInterval = 0;
139 private volatile long storeSize = 0L;
140 private volatile long totalUncompressedBytes = 0L;
141
142
143
144
145
146
147
148
149
150
151 final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
152 private final boolean verifyBulkLoads;
153
154 private ScanInfo scanInfo;
155
156 final List<StoreFile> filesCompacting = Lists.newArrayList();
157
158
159 private final Set<ChangedReadersObserver> changedReaderObservers =
160 Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
161
162 private final int blocksize;
163 private HFileDataBlockEncoder dataBlockEncoder;
164
165
166 private ChecksumType checksumType;
167 private int bytesPerChecksum;
168
169
170 private final KeyValue.KVComparator comparator;
171
172 final StoreEngine<?, ?, ?, ?> storeEngine;
173
174 private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
175 private final OffPeakHours offPeakHours;
176
177 private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
178 private int flushRetriesNumber;
179 private int pauseTime;
180
181 private long blockingFileCount;
182 private int compactionCheckMultiplier;
183
184 private Encryption.Context cryptoContext = Encryption.Context.NONE;
185
186
187
188
189
190
191
192
193
194 protected HStore(final HRegion region, final HColumnDescriptor family,
195 final Configuration confParam) throws IOException {
196
197 HRegionInfo info = region.getRegionInfo();
198 this.fs = region.getRegionFileSystem();
199
200
201 fs.createStoreDir(family.getNameAsString());
202 this.region = region;
203 this.family = family;
204
205
206
207 this.conf = new CompoundConfiguration()
208 .add(confParam)
209 .addStringMap(region.getTableDesc().getConfiguration())
210 .addStringMap(family.getConfiguration())
211 .addWritableMap(family.getValues());
212 this.blocksize = family.getBlocksize();
213
214 this.dataBlockEncoder =
215 new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
216
217 this.comparator = info.getComparator();
218
219 long timeToPurgeDeletes =
220 Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
221 LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
222 "ms in store " + this);
223
224 long ttl = determineTTLFromFamily(family);
225
226
227 scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
228 this.memstore = new MemStore(conf, this.comparator);
229 this.offPeakHours = OffPeakHours.getInstance(conf);
230
231
232 this.cacheConf = new CacheConfig(conf, family);
233
234 this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
235
236 this.blockingFileCount =
237 conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
238 this.compactionCheckMultiplier = conf.getInt(
239 COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
240 if (this.compactionCheckMultiplier <= 0) {
241 LOG.error("Compaction check period multiplier must be positive, setting default: "
242 + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
243 this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
244 }
245
246 if (HStore.closeCheckInterval == 0) {
247 HStore.closeCheckInterval = conf.getInt(
248 "hbase.hstore.close.check.interval", 10*1000*1000
249 }
250
251 this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
252 this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
253
254
255 this.checksumType = getChecksumType(conf);
256
257 this.bytesPerChecksum = getBytesPerChecksum(conf);
258 flushRetriesNumber = conf.getInt(
259 "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
260 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
261 if (flushRetriesNumber <= 0) {
262 throw new IllegalArgumentException(
263 "hbase.hstore.flush.retries.number must be > 0, not "
264 + flushRetriesNumber);
265 }
266
267
268 String cipherName = family.getEncryptionType();
269 if (cipherName != null) {
270 Cipher cipher;
271 Key key;
272 byte[] keyBytes = family.getEncryptionKey();
273 if (keyBytes != null) {
274
275 String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
276 User.getCurrent().getShortName());
277 try {
278
279 key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
280 } catch (KeyException e) {
281
282
283 if (LOG.isDebugEnabled()) {
284 LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
285 }
286 String alternateKeyName =
287 conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
288 if (alternateKeyName != null) {
289 try {
290 key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
291 } catch (KeyException ex) {
292 throw new IOException(ex);
293 }
294 } else {
295 throw new IOException(e);
296 }
297 }
298
299 cipher = Encryption.getCipher(conf, key.getAlgorithm());
300 if (cipher == null) {
301 throw new RuntimeException("Cipher '" + cipher + "' is not available");
302 }
303
304
305
306 if (!cipher.getName().equalsIgnoreCase(cipherName)) {
307 throw new RuntimeException("Encryption for family '" + family.getNameAsString() +
308 "' configured with type '" + cipherName +
309 "' but key specifies algorithm '" + cipher.getName() + "'");
310 }
311 } else {
312
313 cipher = Encryption.getCipher(conf, cipherName);
314 if (cipher == null) {
315 throw new RuntimeException("Cipher '" + cipher + "' is not available");
316 }
317 key = cipher.getRandomKey();
318 }
319 cryptoContext = Encryption.newContext(conf);
320 cryptoContext.setCipher(cipher);
321 cryptoContext.setKey(key);
322 }
323 }
324
325
326
327
328
329 private static long determineTTLFromFamily(final HColumnDescriptor family) {
330
331 long ttl = family.getTimeToLive();
332 if (ttl == HConstants.FOREVER) {
333
334 ttl = Long.MAX_VALUE;
335 } else if (ttl == -1) {
336 ttl = Long.MAX_VALUE;
337 } else {
338
339 ttl *= 1000;
340 }
341 return ttl;
342 }
343
344 @Override
345 public String getColumnFamilyName() {
346 return this.family.getNameAsString();
347 }
348
349 @Override
350 public TableName getTableName() {
351 return this.getRegionInfo().getTable();
352 }
353
354 @Override
355 public FileSystem getFileSystem() {
356 return this.fs.getFileSystem();
357 }
358
359 public HRegionFileSystem getRegionFileSystem() {
360 return this.fs;
361 }
362
363
364 @Override
365 public long getStoreFileTtl() {
366
367 return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
368 }
369
370 @Override
371 public long getMemstoreFlushSize() {
372
373 return this.region.memstoreFlushSize;
374 }
375
376 @Override
377 public long getFlushableSize() {
378 return this.memstore.getFlushableSize();
379 }
380
381 @Override
382 public long getCompactionCheckMultiplier() {
383 return this.compactionCheckMultiplier;
384 }
385
386 @Override
387 public long getBlockingFileCount() {
388 return blockingFileCount;
389 }
390
391
392
393
394
395
396
397 public static int getBytesPerChecksum(Configuration conf) {
398 return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
399 HFile.DEFAULT_BYTES_PER_CHECKSUM);
400 }
401
402
403
404
405
406
407 public static ChecksumType getChecksumType(Configuration conf) {
408 String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
409 if (checksumName == null) {
410 return HFile.DEFAULT_CHECKSUM_TYPE;
411 } else {
412 return ChecksumType.nameToType(checksumName);
413 }
414 }
415
416
417
418
419 public static int getCloseCheckInterval() {
420 return closeCheckInterval;
421 }
422
423 @Override
424 public HColumnDescriptor getFamily() {
425 return this.family;
426 }
427
428
429
430
431 long getMaxSequenceId() {
432 return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
433 }
434
435 @Override
436 public long getMaxMemstoreTS() {
437 return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
438 }
439
440
441
442
443
444
445
446 @Deprecated
447 public static Path getStoreHomedir(final Path tabledir,
448 final HRegionInfo hri, final byte[] family) {
449 return getStoreHomedir(tabledir, hri.getEncodedName(), family);
450 }
451
452
453
454
455
456
457
458 @Deprecated
459 public static Path getStoreHomedir(final Path tabledir,
460 final String encodedName, final byte[] family) {
461 return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
462 }
463
464 @Override
465 public HFileDataBlockEncoder getDataBlockEncoder() {
466 return dataBlockEncoder;
467 }
468
469
470
471
472
473 void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
474 this.dataBlockEncoder = blockEncoder;
475 }
476
477
478
479
480
481
482 private List<StoreFile> loadStoreFiles() throws IOException {
483 Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
484 if (files == null || files.size() == 0) {
485 return new ArrayList<StoreFile>();
486 }
487
488
489 ThreadPoolExecutor storeFileOpenerThreadPool =
490 this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
491 this.getColumnFamilyName());
492 CompletionService<StoreFile> completionService =
493 new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
494
495 int totalValidStoreFile = 0;
496 for (final StoreFileInfo storeFileInfo: files) {
497
498 completionService.submit(new Callable<StoreFile>() {
499 @Override
500 public StoreFile call() throws IOException {
501 StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
502 return storeFile;
503 }
504 });
505 totalValidStoreFile++;
506 }
507
508 ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
509 IOException ioe = null;
510 try {
511 for (int i = 0; i < totalValidStoreFile; i++) {
512 try {
513 Future<StoreFile> future = completionService.take();
514 StoreFile storeFile = future.get();
515 long length = storeFile.getReader().length();
516 this.storeSize += length;
517 this.totalUncompressedBytes +=
518 storeFile.getReader().getTotalUncompressedBytes();
519 if (LOG.isDebugEnabled()) {
520 LOG.debug("loaded " + storeFile.toStringDetailed());
521 }
522 results.add(storeFile);
523 } catch (InterruptedException e) {
524 if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
525 } catch (ExecutionException e) {
526 if (ioe == null) ioe = new IOException(e.getCause());
527 }
528 }
529 } finally {
530 storeFileOpenerThreadPool.shutdownNow();
531 }
532 if (ioe != null) {
533
534 for (StoreFile file : results) {
535 try {
536 if (file != null) file.closeReader(true);
537 } catch (IOException e) {
538 LOG.warn(e.getMessage());
539 }
540 }
541 throw ioe;
542 }
543
544 return results;
545 }
546
547 private StoreFile createStoreFileAndReader(final Path p) throws IOException {
548 StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
549 return createStoreFileAndReader(info);
550 }
551
552 private StoreFile createStoreFileAndReader(final StoreFileInfo info)
553 throws IOException {
554 info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
555 StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
556 this.family.getBloomFilterType());
557 storeFile.createReader();
558 return storeFile;
559 }
560
561 @Override
562 public long add(final KeyValue kv) {
563 lock.readLock().lock();
564 try {
565 return this.memstore.add(kv);
566 } finally {
567 lock.readLock().unlock();
568 }
569 }
570
571 @Override
572 public long timeOfOldestEdit() {
573 return memstore.timeOfOldestEdit();
574 }
575
576
577
578
579
580
581
582 protected long delete(final KeyValue kv) {
583 lock.readLock().lock();
584 try {
585 return this.memstore.delete(kv);
586 } finally {
587 lock.readLock().unlock();
588 }
589 }
590
591 @Override
592 public void rollback(final KeyValue kv) {
593 lock.readLock().lock();
594 try {
595 this.memstore.rollback(kv);
596 } finally {
597 lock.readLock().unlock();
598 }
599 }
600
601
602
603
604 @Override
605 public Collection<StoreFile> getStorefiles() {
606 return this.storeEngine.getStoreFileManager().getStorefiles();
607 }
608
609 @Override
610 public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
611 HFile.Reader reader = null;
612 try {
613 LOG.info("Validating hfile at " + srcPath + " for inclusion in "
614 + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
615 reader = HFile.createReader(srcPath.getFileSystem(conf),
616 srcPath, cacheConf, conf);
617 reader.loadFileInfo();
618
619 byte[] firstKey = reader.getFirstRowKey();
620 Preconditions.checkState(firstKey != null, "First key can not be null");
621 byte[] lk = reader.getLastKey();
622 Preconditions.checkState(lk != null, "Last key can not be null");
623 byte[] lastKey = KeyValue.createKeyValueFromKey(lk).getRow();
624
625 LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
626 " last=" + Bytes.toStringBinary(lastKey));
627 LOG.debug("Region bounds: first=" +
628 Bytes.toStringBinary(getRegionInfo().getStartKey()) +
629 " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
630
631 if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
632 throw new WrongRegionException(
633 "Bulk load file " + srcPath.toString() + " does not fit inside region "
634 + this.getRegionInfo().getRegionNameAsString());
635 }
636
637 if (verifyBulkLoads) {
638 KeyValue prevKV = null;
639 HFileScanner scanner = reader.getScanner(false, false, false);
640 scanner.seekTo();
641 do {
642 KeyValue kv = scanner.getKeyValue();
643 if (prevKV != null) {
644 if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getRowOffset(),
645 prevKV.getRowLength(), kv.getBuffer(), kv.getRowOffset(),
646 kv.getRowLength()) > 0) {
647 throw new InvalidHFileException("Previous row is greater than"
648 + " current row: path=" + srcPath + " previous="
649 + Bytes.toStringBinary(prevKV.getKey()) + " current="
650 + Bytes.toStringBinary(kv.getKey()));
651 }
652 if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getFamilyOffset(),
653 prevKV.getFamilyLength(), kv.getBuffer(), kv.getFamilyOffset(),
654 kv.getFamilyLength()) != 0) {
655 throw new InvalidHFileException("Previous key had different"
656 + " family compared to current key: path=" + srcPath
657 + " previous=" + Bytes.toStringBinary(prevKV.getFamily())
658 + " current=" + Bytes.toStringBinary(kv.getFamily()));
659 }
660 }
661 prevKV = kv;
662 } while (scanner.next());
663 }
664 } finally {
665 if (reader != null) reader.close();
666 }
667 }
668
669 @Override
670 public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
671 Path srcPath = new Path(srcPathStr);
672 Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
673
674 StoreFile sf = createStoreFileAndReader(dstPath);
675
676 StoreFile.Reader r = sf.getReader();
677 this.storeSize += r.length();
678 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
679
680 LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
681 "' as " + dstPath + " - updating store file list.");
682
683
684 this.lock.writeLock().lock();
685 try {
686 this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
687 } finally {
688
689
690
691
692
693 this.lock.writeLock().unlock();
694 }
695 notifyChangedReadersObservers();
696 LOG.info("Successfully loaded store file " + srcPath
697 + " into store " + this + " (new location: " + dstPath + ")");
698 if (LOG.isTraceEnabled()) {
699 String traceMessage = "BULK LOAD time,size,store size,store files ["
700 + EnvironmentEdgeManager.currentTimeMillis() + "," + r.length() + "," + storeSize
701 + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
702 LOG.trace(traceMessage);
703 }
704 }
705
706 @Override
707 public ImmutableCollection<StoreFile> close() throws IOException {
708 this.lock.writeLock().lock();
709 try {
710
711 ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
712
713 if (!result.isEmpty()) {
714
715 ThreadPoolExecutor storeFileCloserThreadPool = this.region
716 .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
717 + this.getColumnFamilyName());
718
719
720 CompletionService<Void> completionService =
721 new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
722 for (final StoreFile f : result) {
723 completionService.submit(new Callable<Void>() {
724 @Override
725 public Void call() throws IOException {
726 f.closeReader(true);
727 return null;
728 }
729 });
730 }
731
732 IOException ioe = null;
733 try {
734 for (int i = 0; i < result.size(); i++) {
735 try {
736 Future<Void> future = completionService.take();
737 future.get();
738 } catch (InterruptedException e) {
739 if (ioe == null) {
740 ioe = new InterruptedIOException();
741 ioe.initCause(e);
742 }
743 } catch (ExecutionException e) {
744 if (ioe == null) ioe = new IOException(e.getCause());
745 }
746 }
747 } finally {
748 storeFileCloserThreadPool.shutdownNow();
749 }
750 if (ioe != null) throw ioe;
751 }
752 LOG.info("Closed " + this);
753 return result;
754 } finally {
755 this.lock.writeLock().unlock();
756 }
757 }
758
759
760
761
762
763
764 void snapshot() {
765 this.lock.writeLock().lock();
766 try {
767 this.memstore.snapshot();
768 } finally {
769 this.lock.writeLock().unlock();
770 }
771 }
772
773
774
775
776
777
778
779
780
781
782
783
784 protected List<Path> flushCache(final long logCacheFlushId,
785 SortedSet<KeyValue> snapshot,
786 TimeRangeTracker snapshotTimeRangeTracker,
787 AtomicLong flushedSize,
788 MonitoredTask status) throws IOException {
789
790
791
792
793
794 StoreFlusher flusher = storeEngine.getStoreFlusher();
795 IOException lastException = null;
796 for (int i = 0; i < flushRetriesNumber; i++) {
797 try {
798 List<Path> pathNames = flusher.flushSnapshot(
799 snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
800 Path lastPathName = null;
801 try {
802 for (Path pathName : pathNames) {
803 lastPathName = pathName;
804 validateStoreFile(pathName);
805 }
806 return pathNames;
807 } catch (Exception e) {
808 LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
809 if (e instanceof IOException) {
810 lastException = (IOException) e;
811 } else {
812 lastException = new IOException(e);
813 }
814 }
815 } catch (IOException e) {
816 LOG.warn("Failed flushing store file, retrying num=" + i, e);
817 lastException = e;
818 }
819 if (lastException != null && i < (flushRetriesNumber - 1)) {
820 try {
821 Thread.sleep(pauseTime);
822 } catch (InterruptedException e) {
823 IOException iie = new InterruptedIOException();
824 iie.initCause(e);
825 throw iie;
826 }
827 }
828 }
829 throw lastException;
830 }
831
832
833
834
835
836
837
838 private StoreFile commitFile(final Path path,
839 final long logCacheFlushId,
840 TimeRangeTracker snapshotTimeRangeTracker,
841 AtomicLong flushedSize,
842 MonitoredTask status)
843 throws IOException {
844
845 Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
846
847 status.setStatus("Flushing " + this + ": reopening flushed file");
848 StoreFile sf = createStoreFileAndReader(dstPath);
849
850 StoreFile.Reader r = sf.getReader();
851 this.storeSize += r.length();
852 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
853
854 if (LOG.isInfoEnabled()) {
855 LOG.info("Added " + sf + ", entries=" + r.getEntries() +
856 ", sequenceid=" + logCacheFlushId +
857 ", filesize=" + StringUtils.humanReadableInt(r.length()));
858 }
859 return sf;
860 }
861
862
863
864
865
866
867
868
869
870 @Override
871 public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
872 boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
873 throws IOException {
874 final CacheConfig writerCacheConf;
875 if (isCompaction) {
876
877 writerCacheConf = new CacheConfig(cacheConf);
878 writerCacheConf.setCacheDataOnWrite(false);
879 } else {
880 writerCacheConf = cacheConf;
881 }
882 InetSocketAddress[] favoredNodes = null;
883 if (region.getRegionServerServices() != null) {
884 favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
885 region.getRegionInfo().getEncodedName());
886 }
887 HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
888 cryptoContext);
889 StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
890 this.getFileSystem())
891 .withFilePath(fs.createTempName())
892 .withComparator(comparator)
893 .withBloomType(family.getBloomFilterType())
894 .withMaxKeyCount(maxKeyCount)
895 .withFavoredNodes(favoredNodes)
896 .withFileContext(hFileContext)
897 .build();
898 return w;
899 }
900
901 private HFileContext createFileContext(Compression.Algorithm compression,
902 boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
903 if (compression == null) {
904 compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
905 }
906 HFileContext hFileContext = new HFileContextBuilder()
907 .withIncludesMvcc(includeMVCCReadpoint)
908 .withIncludesTags(includesTag)
909 .withCompression(compression)
910 .withCompressTags(family.shouldCompressTags())
911 .withChecksumType(checksumType)
912 .withBytesPerCheckSum(bytesPerChecksum)
913 .withBlockSize(blocksize)
914 .withHBaseCheckSum(true)
915 .withDataBlockEncoding(family.getDataBlockEncoding())
916 .withEncryptionContext(cryptoContext)
917 .build();
918 return hFileContext;
919 }
920
921
922
923
924
925
926
927
928
929 private boolean updateStorefiles(
930 final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
931 this.lock.writeLock().lock();
932 try {
933 this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
934 this.memstore.clearSnapshot(set);
935 } finally {
936
937
938
939
940
941 this.lock.writeLock().unlock();
942 }
943
944
945 notifyChangedReadersObservers();
946
947 if (LOG.isTraceEnabled()) {
948 long totalSize = 0;
949 for (StoreFile sf : sfs) {
950 totalSize += sf.getReader().length();
951 }
952 String traceMessage = "FLUSH time,count,size,store size,store files ["
953 + EnvironmentEdgeManager.currentTimeMillis() + "," + sfs.size() + "," + totalSize
954 + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
955 LOG.trace(traceMessage);
956 }
957 return needsCompaction();
958 }
959
960
961
962
963
964 private void notifyChangedReadersObservers() throws IOException {
965 for (ChangedReadersObserver o: this.changedReaderObservers) {
966 o.updateReaders();
967 }
968 }
969
970
971
972
973
974
975 @Override
976 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
977 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
978 byte[] stopRow, long readPt) throws IOException {
979 Collection<StoreFile> storeFilesToScan;
980 List<KeyValueScanner> memStoreScanners;
981 this.lock.readLock().lock();
982 try {
983 storeFilesToScan =
984 this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
985 memStoreScanners = this.memstore.getScanners(readPt);
986 } finally {
987 this.lock.readLock().unlock();
988 }
989
990
991
992
993
994
995 List<StoreFileScanner> sfScanners = StoreFileScanner
996 .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
997 readPt);
998 List<KeyValueScanner> scanners =
999 new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1000 scanners.addAll(sfScanners);
1001
1002 scanners.addAll(memStoreScanners);
1003 return scanners;
1004 }
1005
1006 @Override
1007 public void addChangedReaderObserver(ChangedReadersObserver o) {
1008 this.changedReaderObservers.add(o);
1009 }
1010
1011 @Override
1012 public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1013
1014 this.changedReaderObservers.remove(o);
1015 }
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064 @Override
1065 public List<StoreFile> compact(CompactionContext compaction) throws IOException {
1066 assert compaction != null && compaction.hasSelection();
1067 CompactionRequest cr = compaction.getRequest();
1068 Collection<StoreFile> filesToCompact = cr.getFiles();
1069 assert !filesToCompact.isEmpty();
1070 synchronized (filesCompacting) {
1071
1072
1073 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1074 }
1075
1076
1077 LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1078 + this + " of " + this.getRegionInfo().getRegionNameAsString()
1079 + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1080 + StringUtils.humanReadableInt(cr.getSize()));
1081
1082 long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
1083 List<StoreFile> sfs = null;
1084 try {
1085
1086 List<Path> newFiles = compaction.compact();
1087
1088
1089 if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1090 LOG.warn("hbase.hstore.compaction.complete is set to false");
1091 sfs = new ArrayList<StoreFile>(newFiles.size());
1092 for (Path newFile : newFiles) {
1093
1094 StoreFile sf = createStoreFileAndReader(newFile);
1095 sf.closeReader(true);
1096 sfs.add(sf);
1097 }
1098 return sfs;
1099 }
1100
1101 sfs = moveCompatedFilesIntoPlace(cr, newFiles);
1102 writeCompactionWalRecord(filesToCompact, sfs);
1103 replaceStoreFiles(filesToCompact, sfs);
1104
1105 completeCompaction(filesToCompact);
1106 } finally {
1107 finishCompactionRequest(cr);
1108 }
1109 logCompactionEndMessage(cr, sfs, compactionStartTime);
1110 return sfs;
1111 }
1112
1113 private List<StoreFile> moveCompatedFilesIntoPlace(
1114 CompactionRequest cr, List<Path> newFiles) throws IOException {
1115 List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
1116 for (Path newFile : newFiles) {
1117 assert newFile != null;
1118 StoreFile sf = moveFileIntoPlace(newFile);
1119 if (this.getCoprocessorHost() != null) {
1120 this.getCoprocessorHost().postCompact(this, sf, cr);
1121 }
1122 assert sf != null;
1123 sfs.add(sf);
1124 }
1125 return sfs;
1126 }
1127
1128
1129 StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1130 validateStoreFile(newFile);
1131
1132 Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1133 return createStoreFileAndReader(destPath);
1134 }
1135
1136
1137
1138
1139
1140
1141 private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1142 Collection<StoreFile> newFiles) throws IOException {
1143 if (region.getLog() == null) return;
1144 List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1145 for (StoreFile f : filesCompacted) {
1146 inputPaths.add(f.getPath());
1147 }
1148 List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1149 for (StoreFile f : newFiles) {
1150 outputPaths.add(f.getPath());
1151 }
1152 HRegionInfo info = this.region.getRegionInfo();
1153 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1154 family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1155 HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
1156 this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
1157 }
1158
1159 private void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1160 final Collection<StoreFile> result) throws IOException {
1161 this.lock.writeLock().lock();
1162 try {
1163 this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1164 filesCompacting.removeAll(compactedFiles);
1165 } finally {
1166 this.lock.writeLock().unlock();
1167 }
1168 }
1169
1170
1171
1172
1173
1174
1175
1176 private void logCompactionEndMessage(
1177 CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
1178 long now = EnvironmentEdgeManager.currentTimeMillis();
1179 StringBuilder message = new StringBuilder(
1180 "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
1181 + cr.getFiles().size() + " file(s) in " + this + " of "
1182 + this.getRegionInfo().getRegionNameAsString()
1183 + " into ");
1184 if (sfs.isEmpty()) {
1185 message.append("none, ");
1186 } else {
1187 for (StoreFile sf: sfs) {
1188 message.append(sf.getPath().getName());
1189 message.append("(size=");
1190 message.append(StringUtils.humanReadableInt(sf.getReader().length()));
1191 message.append("), ");
1192 }
1193 }
1194 message.append("total size for store is ")
1195 .append(StringUtils.humanReadableInt(storeSize))
1196 .append(". This selection was in queue for ")
1197 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1198 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1199 .append(" to execute.");
1200 LOG.info(message.toString());
1201 if (LOG.isTraceEnabled()) {
1202 int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1203 long resultSize = 0;
1204 for (StoreFile sf : sfs) {
1205 resultSize += sf.getReader().length();
1206 }
1207 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1208 + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1209 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1210 LOG.trace(traceMessage);
1211 }
1212 }
1213
1214
1215
1216
1217
1218
1219
1220 @Override
1221 public void completeCompactionMarker(CompactionDescriptor compaction)
1222 throws IOException {
1223 LOG.debug("Completing compaction from the WAL marker");
1224 List<String> compactionInputs = compaction.getCompactionInputList();
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240 String familyName = this.getColumnFamilyName();
1241 List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
1242 for (String compactionInput : compactionInputs) {
1243 Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1244 inputPaths.add(inputPath);
1245 }
1246
1247
1248 List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1249 for (StoreFile sf : this.getStorefiles()) {
1250 if (inputPaths.contains(sf.getQualifiedPath())) {
1251 inputStoreFiles.add(sf);
1252 }
1253 }
1254
1255 this.replaceStoreFiles(inputStoreFiles, Collections.EMPTY_LIST);
1256 this.completeCompaction(inputStoreFiles);
1257 }
1258
1259
1260
1261
1262
1263
1264
1265
1266 public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1267 List<StoreFile> filesToCompact;
1268 boolean isMajor;
1269
1270 this.lock.readLock().lock();
1271 try {
1272 synchronized (filesCompacting) {
1273 filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1274 if (!filesCompacting.isEmpty()) {
1275
1276
1277 StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1278 int idx = filesToCompact.indexOf(last);
1279 Preconditions.checkArgument(idx != -1);
1280 filesToCompact.subList(0, idx + 1).clear();
1281 }
1282 int count = filesToCompact.size();
1283 if (N > count) {
1284 throw new RuntimeException("Not enough files");
1285 }
1286
1287 filesToCompact = filesToCompact.subList(count - N, count);
1288 isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1289 filesCompacting.addAll(filesToCompact);
1290 Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1291 }
1292 } finally {
1293 this.lock.readLock().unlock();
1294 }
1295
1296 try {
1297
1298 List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1299 .compactForTesting(filesToCompact, isMajor);
1300 for (Path newFile: newFiles) {
1301
1302 StoreFile sf = moveFileIntoPlace(newFile);
1303 if (this.getCoprocessorHost() != null) {
1304 this.getCoprocessorHost().postCompact(this, sf, null);
1305 }
1306 replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1307 completeCompaction(filesToCompact);
1308 }
1309 } finally {
1310 synchronized (filesCompacting) {
1311 filesCompacting.removeAll(filesToCompact);
1312 }
1313 }
1314 }
1315
1316 @Override
1317 public boolean hasReferences() {
1318 return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1319 }
1320
1321 @Override
1322 public CompactionProgress getCompactionProgress() {
1323 return this.storeEngine.getCompactor().getProgress();
1324 }
1325
1326 @Override
1327 public boolean isMajorCompaction() throws IOException {
1328 for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1329
1330 if (sf.getReader() == null) {
1331 LOG.debug("StoreFile " + sf + " has null Reader");
1332 return false;
1333 }
1334 }
1335 return storeEngine.getCompactionPolicy().isMajorCompaction(
1336 this.storeEngine.getStoreFileManager().getStorefiles());
1337 }
1338
1339 @Override
1340 public CompactionContext requestCompaction() throws IOException {
1341 return requestCompaction(Store.NO_PRIORITY, null);
1342 }
1343
1344 @Override
1345 public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1346 throws IOException {
1347
1348 if (!this.areWritesEnabled()) {
1349 return null;
1350 }
1351
1352 CompactionContext compaction = storeEngine.createCompaction();
1353 this.lock.readLock().lock();
1354 try {
1355 synchronized (filesCompacting) {
1356
1357 if (this.getCoprocessorHost() != null) {
1358 List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1359 boolean override = this.getCoprocessorHost().preCompactSelection(
1360 this, candidatesForCoproc, baseRequest);
1361 if (override) {
1362
1363 compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1364 }
1365 }
1366
1367
1368 if (!compaction.hasSelection()) {
1369 boolean isUserCompaction = priority == Store.PRIORITY_USER;
1370 boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1371 offPeakCompactionTracker.compareAndSet(false, true);
1372 try {
1373 compaction.select(this.filesCompacting, isUserCompaction,
1374 mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1375 } catch (IOException e) {
1376 if (mayUseOffPeak) {
1377 offPeakCompactionTracker.set(false);
1378 }
1379 throw e;
1380 }
1381 assert compaction.hasSelection();
1382 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1383
1384 offPeakCompactionTracker.set(false);
1385 }
1386 }
1387 if (this.getCoprocessorHost() != null) {
1388 this.getCoprocessorHost().postCompactSelection(
1389 this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1390 }
1391
1392
1393 if (baseRequest != null) {
1394
1395
1396 compaction.forceSelect(
1397 baseRequest.combineWith(compaction.getRequest()));
1398 }
1399
1400
1401 final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
1402 if (selectedFiles.isEmpty()) {
1403 return null;
1404 }
1405
1406
1407 if (!Collections.disjoint(filesCompacting, selectedFiles)) {
1408 Preconditions.checkArgument(false, "%s overlaps with %s",
1409 selectedFiles, filesCompacting);
1410 }
1411 filesCompacting.addAll(selectedFiles);
1412 Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1413
1414
1415 boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
1416 this.forceMajor = this.forceMajor && !isMajor;
1417
1418
1419
1420 compaction.getRequest().setPriority(
1421 (priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1422 compaction.getRequest().setIsMajor(isMajor);
1423 compaction.getRequest().setDescription(
1424 getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1425 }
1426 } finally {
1427 this.lock.readLock().unlock();
1428 }
1429
1430 LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
1431 + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
1432 this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
1433 return compaction;
1434 }
1435
1436 @Override
1437 public void cancelRequestedCompaction(CompactionContext compaction) {
1438 finishCompactionRequest(compaction.getRequest());
1439 }
1440
1441 private void finishCompactionRequest(CompactionRequest cr) {
1442 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1443 if (cr.isOffPeak()) {
1444 offPeakCompactionTracker.set(false);
1445 cr.setOffPeak(false);
1446 }
1447 synchronized (filesCompacting) {
1448 filesCompacting.removeAll(cr.getFiles());
1449 }
1450 }
1451
1452
1453
1454
1455
1456
1457
1458 private void validateStoreFile(Path path)
1459 throws IOException {
1460 StoreFile storeFile = null;
1461 try {
1462 storeFile = createStoreFileAndReader(path);
1463 } catch (IOException e) {
1464 LOG.error("Failed to open store file : " + path
1465 + ", keeping it in tmp location", e);
1466 throw e;
1467 } finally {
1468 if (storeFile != null) {
1469 storeFile.closeReader(false);
1470 }
1471 }
1472 }
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489 @VisibleForTesting
1490 protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1491 throws IOException {
1492 try {
1493
1494
1495
1496
1497 notifyChangedReadersObservers();
1498
1499
1500
1501 LOG.debug("Removing store files after compaction...");
1502 for (StoreFile compactedFile : compactedFiles) {
1503 compactedFile.closeReader(true);
1504 }
1505 this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
1506 } catch (IOException e) {
1507 e = RemoteExceptionHandler.checkIOException(e);
1508 LOG.error("Failed removing compacted files in " + this +
1509 ". Files we were trying to remove are " + compactedFiles.toString() +
1510 "; some of them may have been already removed", e);
1511 }
1512
1513
1514 this.storeSize = 0L;
1515 this.totalUncompressedBytes = 0L;
1516 for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1517 StoreFile.Reader r = hsf.getReader();
1518 if (r == null) {
1519 LOG.warn("StoreFile " + hsf + " has a null Reader");
1520 continue;
1521 }
1522 this.storeSize += r.length();
1523 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1524 }
1525 }
1526
1527
1528
1529
1530
1531 int versionsToReturn(final int wantedVersions) {
1532 if (wantedVersions <= 0) {
1533 throw new IllegalArgumentException("Number of versions must be > 0");
1534 }
1535
1536 int maxVersions = this.family.getMaxVersions();
1537 return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1538 }
1539
1540 static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
1541 return key.getTimestamp() < oldestTimestamp;
1542 }
1543
1544 @Override
1545 public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
1546
1547
1548
1549
1550
1551
1552 long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();
1553
1554 KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1555
1556 GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
1557 this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
1558 this.lock.readLock().lock();
1559 try {
1560
1561 this.memstore.getRowKeyAtOrBefore(state);
1562
1563
1564 Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
1565 .getCandidateFilesForRowKeyBefore(state.getTargetKey());
1566 while (sfIterator.hasNext()) {
1567 StoreFile sf = sfIterator.next();
1568 sfIterator.remove();
1569 boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
1570 KeyValue keyv = state.getCandidate();
1571
1572 if (keyv != null && keyv.matchingRow(row)) return state.getCandidate();
1573 if (haveNewCandidate) {
1574 sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
1575 sfIterator, state.getTargetKey(), state.getCandidate());
1576 }
1577 }
1578 return state.getCandidate();
1579 } finally {
1580 this.lock.readLock().unlock();
1581 }
1582 }
1583
1584
1585
1586
1587
1588
1589
1590
1591 private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
1592 final GetClosestRowBeforeTracker state)
1593 throws IOException {
1594 StoreFile.Reader r = f.getReader();
1595 if (r == null) {
1596 LOG.warn("StoreFile " + f + " has a null Reader");
1597 return false;
1598 }
1599 if (r.getEntries() == 0) {
1600 LOG.warn("StoreFile " + f + " is a empty store file");
1601 return false;
1602 }
1603
1604 byte [] fk = r.getFirstKey();
1605 if (fk == null) return false;
1606 KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1607 byte [] lk = r.getLastKey();
1608 KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1609 KeyValue firstOnRow = state.getTargetKey();
1610 if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1611
1612
1613 if (!state.isTargetTable(lastKV)) return false;
1614
1615
1616 firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1617 }
1618
1619 HFileScanner scanner = r.getScanner(true, true, false);
1620
1621 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1622
1623
1624 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1625
1626 while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1627 firstOnRow.getKeyLength())) {
1628 KeyValue kv = scanner.getKeyValue();
1629 if (!state.isTargetTable(kv)) break;
1630 if (!state.isBetterCandidate(kv)) break;
1631
1632 firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
1633
1634 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1635
1636 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1637 }
1638 return false;
1639 }
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649 private boolean seekToScanner(final HFileScanner scanner,
1650 final KeyValue firstOnRow,
1651 final KeyValue firstKV)
1652 throws IOException {
1653 KeyValue kv = firstOnRow;
1654
1655 if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
1656 int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
1657 kv.getKeyLength());
1658 return result != -1;
1659 }
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671 private boolean walkForwardInSingleRow(final HFileScanner scanner,
1672 final KeyValue firstOnRow,
1673 final GetClosestRowBeforeTracker state)
1674 throws IOException {
1675 boolean foundCandidate = false;
1676 do {
1677 KeyValue kv = scanner.getKeyValue();
1678
1679 if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
1680
1681 if (state.isTooFar(kv, firstOnRow)) break;
1682 if (state.isExpired(kv)) {
1683 continue;
1684 }
1685
1686 if (state.handle(kv)) {
1687 foundCandidate = true;
1688 break;
1689 }
1690 } while(scanner.next());
1691 return foundCandidate;
1692 }
1693
1694 @Override
1695 public boolean canSplit() {
1696 this.lock.readLock().lock();
1697 try {
1698
1699 boolean result = !hasReferences();
1700 if (!result && LOG.isDebugEnabled()) {
1701 LOG.debug("Cannot split region due to reference files being there");
1702 }
1703 return result;
1704 } finally {
1705 this.lock.readLock().unlock();
1706 }
1707 }
1708
1709 @Override
1710 public byte[] getSplitPoint() {
1711 this.lock.readLock().lock();
1712 try {
1713
1714 assert !this.getRegionInfo().isMetaRegion();
1715
1716 if (hasReferences()) {
1717 return null;
1718 }
1719 return this.storeEngine.getStoreFileManager().getSplitPoint();
1720 } catch(IOException e) {
1721 LOG.warn("Failed getting store size for " + this, e);
1722 } finally {
1723 this.lock.readLock().unlock();
1724 }
1725 return null;
1726 }
1727
1728 @Override
1729 public long getLastCompactSize() {
1730 return this.lastCompactSize;
1731 }
1732
1733 @Override
1734 public long getSize() {
1735 return storeSize;
1736 }
1737
1738 @Override
1739 public void triggerMajorCompaction() {
1740 this.forceMajor = true;
1741 }
1742
1743 boolean getForceMajorCompaction() {
1744 return this.forceMajor;
1745 }
1746
1747
1748
1749
1750
1751 @Override
1752 public KeyValueScanner getScanner(Scan scan,
1753 final NavigableSet<byte []> targetCols, long readPt) throws IOException {
1754 lock.readLock().lock();
1755 try {
1756 KeyValueScanner scanner = null;
1757 if (this.getCoprocessorHost() != null) {
1758 scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
1759 }
1760 if (scanner == null) {
1761 scanner = scan.isReversed() ? new ReversedStoreScanner(this,
1762 getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
1763 getScanInfo(), scan, targetCols, readPt);
1764 }
1765 return scanner;
1766 } finally {
1767 lock.readLock().unlock();
1768 }
1769 }
1770
1771 @Override
1772 public String toString() {
1773 return this.getColumnFamilyName();
1774 }
1775
1776 @Override
1777
1778 public int getStorefilesCount() {
1779 return this.storeEngine.getStoreFileManager().getStorefileCount();
1780 }
1781
1782 @Override
1783 public long getStoreSizeUncompressed() {
1784 return this.totalUncompressedBytes;
1785 }
1786
1787 @Override
1788 public long getStorefilesSize() {
1789 long size = 0;
1790 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1791 StoreFile.Reader r = s.getReader();
1792 if (r == null) {
1793 LOG.warn("StoreFile " + s + " has a null Reader");
1794 continue;
1795 }
1796 size += r.length();
1797 }
1798 return size;
1799 }
1800
1801 @Override
1802 public long getStorefilesIndexSize() {
1803 long size = 0;
1804 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
1805 StoreFile.Reader r = s.getReader();
1806 if (r == null) {
1807 LOG.warn("StoreFile " + s + " has a null Reader");
1808 continue;
1809 }
1810 size += r.indexSize();
1811 }
1812 return size;
1813 }
1814
1815 @Override
1816 public long getTotalStaticIndexSize() {
1817 long size = 0;
1818 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1819 size += s.getReader().getUncompressedDataIndexSize();
1820 }
1821 return size;
1822 }
1823
1824 @Override
1825 public long getTotalStaticBloomSize() {
1826 long size = 0;
1827 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
1828 StoreFile.Reader r = s.getReader();
1829 size += r.getTotalBloomSize();
1830 }
1831 return size;
1832 }
1833
1834 @Override
1835 public long getMemStoreSize() {
1836 return this.memstore.heapSize();
1837 }
1838
1839 @Override
1840 public int getCompactPriority() {
1841 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1842 if (priority == PRIORITY_USER) {
1843 LOG.warn("Compaction priority is USER despite there being no user compaction");
1844 }
1845 return priority;
1846 }
1847
1848 @Override
1849 public boolean throttleCompaction(long compactionSize) {
1850 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1851 }
1852
1853 public HRegion getHRegion() {
1854 return this.region;
1855 }
1856
1857 @Override
1858 public RegionCoprocessorHost getCoprocessorHost() {
1859 return this.region.getCoprocessorHost();
1860 }
1861
1862 @Override
1863 public HRegionInfo getRegionInfo() {
1864 return this.fs.getRegionInfo();
1865 }
1866
1867 @Override
1868 public boolean areWritesEnabled() {
1869 return this.region.areWritesEnabled();
1870 }
1871
1872 @Override
1873 public long getSmallestReadPoint() {
1874 return this.region.getSmallestReadPoint();
1875 }
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890 public long updateColumnValue(byte [] row, byte [] f,
1891 byte [] qualifier, long newValue)
1892 throws IOException {
1893
1894 this.lock.readLock().lock();
1895 try {
1896 long now = EnvironmentEdgeManager.currentTimeMillis();
1897
1898 return this.memstore.updateColumnValue(row,
1899 f,
1900 qualifier,
1901 newValue,
1902 now);
1903
1904 } finally {
1905 this.lock.readLock().unlock();
1906 }
1907 }
1908
1909 @Override
1910 public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
1911 this.lock.readLock().lock();
1912 try {
1913 return this.memstore.upsert(cells, readpoint);
1914 } finally {
1915 this.lock.readLock().unlock();
1916 }
1917 }
1918
1919 @Override
1920 public StoreFlushContext createFlushContext(long cacheFlushId) {
1921 return new StoreFlusherImpl(cacheFlushId);
1922 }
1923
1924 private class StoreFlusherImpl implements StoreFlushContext {
1925
1926 private long cacheFlushSeqNum;
1927 private SortedSet<KeyValue> snapshot;
1928 private List<Path> tempFiles;
1929 private TimeRangeTracker snapshotTimeRangeTracker;
1930 private final AtomicLong flushedSize = new AtomicLong();
1931
1932 private StoreFlusherImpl(long cacheFlushSeqNum) {
1933 this.cacheFlushSeqNum = cacheFlushSeqNum;
1934 }
1935
1936
1937
1938
1939
1940 @Override
1941 public void prepare() {
1942 memstore.snapshot();
1943 this.snapshot = memstore.getSnapshot();
1944 this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
1945 }
1946
1947 @Override
1948 public void flushCache(MonitoredTask status) throws IOException {
1949 tempFiles = HStore.this.flushCache(
1950 cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
1951 }
1952
1953 @Override
1954 public boolean commit(MonitoredTask status) throws IOException {
1955 if (this.tempFiles == null || this.tempFiles.isEmpty()) {
1956 return false;
1957 }
1958 List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
1959 for (Path storeFilePath : tempFiles) {
1960 try {
1961 storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
1962 snapshotTimeRangeTracker, flushedSize, status));
1963 } catch (IOException ex) {
1964 LOG.error("Failed to commit store file " + storeFilePath, ex);
1965
1966 for (StoreFile sf : storeFiles) {
1967 Path pathToDelete = sf.getPath();
1968 try {
1969 sf.deleteReader();
1970 } catch (IOException deleteEx) {
1971 LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
1972 Runtime.getRuntime().halt(1);
1973 }
1974 }
1975 throw new IOException("Failed to commit the flush", ex);
1976 }
1977 }
1978
1979 if (HStore.this.getCoprocessorHost() != null) {
1980 for (StoreFile sf : storeFiles) {
1981 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
1982 }
1983 }
1984
1985 return HStore.this.updateStorefiles(storeFiles, snapshot);
1986 }
1987 }
1988
1989 @Override
1990 public boolean needsCompaction() {
1991 return this.storeEngine.needsCompaction(this.filesCompacting);
1992 }
1993
1994 @Override
1995 public CacheConfig getCacheConfig() {
1996 return this.cacheConf;
1997 }
1998
1999 public static final long FIXED_OVERHEAD =
2000 ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
2001 + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2002
2003 public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2004 + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2005 + ClassSize.CONCURRENT_SKIPLISTMAP
2006 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2007 + ScanInfo.FIXED_OVERHEAD);
2008
2009 @Override
2010 public long heapSize() {
2011 return DEEP_OVERHEAD + this.memstore.heapSize();
2012 }
2013
2014 @Override
2015 public KeyValue.KVComparator getComparator() {
2016 return comparator;
2017 }
2018
2019 @Override
2020 public ScanInfo getScanInfo() {
2021 return scanInfo;
2022 }
2023
2024
2025
2026
2027
2028 void setScanInfo(ScanInfo scanInfo) {
2029 this.scanInfo = scanInfo;
2030 }
2031
2032 @Override
2033 public boolean hasTooManyStoreFiles() {
2034 return getStorefilesCount() > this.blockingFileCount;
2035 }
2036 }