1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.InetSocketAddress;
24 import java.security.Key;
25 import java.security.KeyException;
26 import java.security.PrivilegedExceptionAction;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Iterator;
33 import java.util.List;
34 import java.util.NavigableSet;
35 import java.util.Set;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.CompletionService;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.ExecutorCompletionService;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.ThreadPoolExecutor;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.locks.ReentrantReadWriteLock;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.Cell;
52 import org.apache.hadoop.hbase.CellComparator;
53 import org.apache.hadoop.hbase.CellUtil;
54 import org.apache.hadoop.hbase.CompoundConfiguration;
55 import org.apache.hadoop.hbase.HColumnDescriptor;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionInfo;
58 import org.apache.hadoop.hbase.KeyValue;
59 import org.apache.hadoop.hbase.RemoteExceptionHandler;
60 import org.apache.hadoop.hbase.TableName;
61 import org.apache.hadoop.hbase.Tag;
62 import org.apache.hadoop.hbase.TagType;
63 import org.apache.hadoop.hbase.classification.InterfaceAudience;
64 import org.apache.hadoop.hbase.client.Scan;
65 import org.apache.hadoop.hbase.conf.ConfigurationManager;
66 import org.apache.hadoop.hbase.io.compress.Compression;
67 import org.apache.hadoop.hbase.io.crypto.Cipher;
68 import org.apache.hadoop.hbase.io.crypto.Encryption;
69 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
70 import org.apache.hadoop.hbase.io.hfile.HFile;
71 import org.apache.hadoop.hbase.io.hfile.HFileContext;
72 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
73 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
74 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
75 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
76 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
77 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
78 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
79 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
80 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
81 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
82 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
83 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
84 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
85 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
86 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
87 import org.apache.hadoop.hbase.security.EncryptionUtil;
88 import org.apache.hadoop.hbase.security.User;
89 import org.apache.hadoop.hbase.util.Bytes;
90 import org.apache.hadoop.hbase.util.ChecksumType;
91 import org.apache.hadoop.hbase.util.ClassSize;
92 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
93 import org.apache.hadoop.hbase.util.ReflectionUtils;
94 import org.apache.hadoop.util.StringUtils;
95 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
96
97 import com.google.common.annotations.VisibleForTesting;
98 import com.google.common.base.Preconditions;
99 import com.google.common.collect.ImmutableCollection;
100 import com.google.common.collect.ImmutableList;
101 import com.google.common.collect.Lists;
102 import com.google.common.collect.Sets;
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 @InterfaceAudience.Private
118 public class HStore implements Store {
119 private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
120 public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
121 "hbase.server.compactchecker.interval.multiplier";
122 public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
123 public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
124 public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
125
126 private static final Log LOG = LogFactory.getLog(HStore.class);
127
128 protected final MemStore memstore;
129
130 private final HRegion region;
131 private final HColumnDescriptor family;
132 private final HRegionFileSystem fs;
133 private Configuration conf;
134 private final CacheConfig cacheConf;
135 private long lastCompactSize = 0;
136 volatile boolean forceMajor = false;
137
138 static int closeCheckInterval = 0;
139 private volatile long storeSize = 0L;
140 private volatile long totalUncompressedBytes = 0L;
141
142
143
144
145
146
147
148
149
150
151 final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
152 private final boolean verifyBulkLoads;
153
154 private ScanInfo scanInfo;
155
156
157 final List<StoreFile> filesCompacting = Lists.newArrayList();
158
159
160 private final Set<ChangedReadersObserver> changedReaderObservers =
161 Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
162
163 private final int blocksize;
164 private HFileDataBlockEncoder dataBlockEncoder;
165
166
167 private ChecksumType checksumType;
168 private int bytesPerChecksum;
169
170
171 private final KeyValue.KVComparator comparator;
172
173 final StoreEngine<?, ?, ?, ?> storeEngine;
174
175 private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
176 private volatile OffPeakHours offPeakHours;
177
178 private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
179 private int flushRetriesNumber;
180 private int pauseTime;
181
182 private long blockingFileCount;
183 private int compactionCheckMultiplier;
184
185 private Encryption.Context cryptoContext = Encryption.Context.NONE;
186
187 private volatile long flushedCellsCount = 0;
188 private volatile long compactedCellsCount = 0;
189 private volatile long majorCompactedCellsCount = 0;
190 private volatile long flushedCellsSize = 0;
191 private volatile long compactedCellsSize = 0;
192 private volatile long majorCompactedCellsSize = 0;
193
194
195
196
197
198
199
200
201
202 protected HStore(final HRegion region, final HColumnDescriptor family,
203 final Configuration confParam) throws IOException {
204
205 HRegionInfo info = region.getRegionInfo();
206 this.fs = region.getRegionFileSystem();
207
208
209 fs.createStoreDir(family.getNameAsString());
210 this.region = region;
211 this.family = family;
212
213
214
215 this.conf = new CompoundConfiguration()
216 .add(confParam)
217 .addStringMap(region.getTableDesc().getConfiguration())
218 .addStringMap(family.getConfiguration())
219 .addWritableMap(family.getValues());
220 this.blocksize = family.getBlocksize();
221
222 this.dataBlockEncoder =
223 new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
224
225 this.comparator = info.getComparator();
226
227 long timeToPurgeDeletes =
228 Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
229 LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
230 "ms in store " + this);
231
232 long ttl = determineTTLFromFamily(family);
233
234
235 scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
236 String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
237 this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
238 Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
239 this.offPeakHours = OffPeakHours.getInstance(conf);
240
241
242 this.cacheConf = new CacheConfig(conf, family);
243
244 this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
245
246 this.blockingFileCount =
247 conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
248 this.compactionCheckMultiplier = conf.getInt(
249 COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
250 if (this.compactionCheckMultiplier <= 0) {
251 LOG.error("Compaction check period multiplier must be positive, setting default: "
252 + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
253 this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
254 }
255
256 if (HStore.closeCheckInterval == 0) {
257 HStore.closeCheckInterval = conf.getInt(
258 "hbase.hstore.close.check.interval", 10*1000*1000
259 }
260
261 this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
262 this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
263
264
265 this.checksumType = getChecksumType(conf);
266
267 this.bytesPerChecksum = getBytesPerChecksum(conf);
268 flushRetriesNumber = conf.getInt(
269 "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
270 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
271 if (flushRetriesNumber <= 0) {
272 throw new IllegalArgumentException(
273 "hbase.hstore.flush.retries.number must be > 0, not "
274 + flushRetriesNumber);
275 }
276
277
278 String cipherName = family.getEncryptionType();
279 if (cipherName != null) {
280 Cipher cipher;
281 Key key;
282 byte[] keyBytes = family.getEncryptionKey();
283 if (keyBytes != null) {
284
285 String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
286 User.getCurrent().getShortName());
287 try {
288
289 key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
290 } catch (KeyException e) {
291
292
293 if (LOG.isDebugEnabled()) {
294 LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
295 }
296 String alternateKeyName =
297 conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
298 if (alternateKeyName != null) {
299 try {
300 key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
301 } catch (KeyException ex) {
302 throw new IOException(ex);
303 }
304 } else {
305 throw new IOException(e);
306 }
307 }
308
309 cipher = Encryption.getCipher(conf, key.getAlgorithm());
310 if (cipher == null) {
311 throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
312 }
313
314
315
316 if (!cipher.getName().equalsIgnoreCase(cipherName)) {
317 throw new RuntimeException("Encryption for family '" + family.getNameAsString() +
318 "' configured with type '" + cipherName +
319 "' but key specifies algorithm '" + cipher.getName() + "'");
320 }
321 } else {
322
323 cipher = Encryption.getCipher(conf, cipherName);
324 if (cipher == null) {
325 throw new RuntimeException("Cipher '" + cipherName + "' is not available");
326 }
327 key = cipher.getRandomKey();
328 }
329 cryptoContext = Encryption.newContext(conf);
330 cryptoContext.setCipher(cipher);
331 cryptoContext.setKey(key);
332 }
333 }
334
335
336
337
338
339 private static long determineTTLFromFamily(final HColumnDescriptor family) {
340
341 long ttl = family.getTimeToLive();
342 if (ttl == HConstants.FOREVER) {
343
344 ttl = Long.MAX_VALUE;
345 } else if (ttl == -1) {
346 ttl = Long.MAX_VALUE;
347 } else {
348
349 ttl *= 1000;
350 }
351 return ttl;
352 }
353
354 @Override
355 public String getColumnFamilyName() {
356 return this.family.getNameAsString();
357 }
358
359 @Override
360 public TableName getTableName() {
361 return this.getRegionInfo().getTable();
362 }
363
364 @Override
365 public FileSystem getFileSystem() {
366 return this.fs.getFileSystem();
367 }
368
369 public HRegionFileSystem getRegionFileSystem() {
370 return this.fs;
371 }
372
373
374 @Override
375 public long getStoreFileTtl() {
376
377 return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
378 }
379
  @Override
  public long getMemstoreFlushSize() {
    // Flush size is a region-level setting; return the region's value.
    return this.region.memstoreFlushSize;
  }
385
  /** Delegates to the memstore for the size that would be freed by a flush. */
  @Override
  public long getFlushableSize() {
    return this.memstore.getFlushableSize();
  }
390
  /** Delegates to the memstore for the size of its current snapshot. */
  @Override
  public long getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }
395
  /** @return multiplier applied to the compaction-check interval (see ctor validation). */
  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }
400
  /** @return number of store files at which updates are blocked (hbase.hstore.blockingStoreFiles). */
  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
405
406
407
408
409
410
411
  /**
   * Returns the configured bytes-per-checksum for HFiles, falling back to the
   * HFile default when not set.
   * @param conf The configuration
   * @return The bytes per checksum
   */
  public static int getBytesPerChecksum(Configuration conf) {
    return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
                       HFile.DEFAULT_BYTES_PER_CHECKSUM);
  }
416
417
418
419
420
421
422 public static ChecksumType getChecksumType(Configuration conf) {
423 String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
424 if (checksumName == null) {
425 return ChecksumType.getDefaultChecksumType();
426 } else {
427 return ChecksumType.nameToType(checksumName);
428 }
429 }
430
431
432
433
  /** @return how many bytes to write between status checks (set lazily in the ctor). */
  public static int getCloseCheckInterval() {
    return closeCheckInterval;
  }
437
  /** @return the column family descriptor this store was constructed with. */
  @Override
  public HColumnDescriptor getFamily() {
    return this.family;
  }
442
443
444
445
446 @Override
447 public long getMaxSequenceId() {
448 return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
449 }
450
451 @Override
452 public long getMaxMemstoreTS() {
453 return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
454 }
455
456
457
458
459
460
461
  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param hri {@link HRegionInfo} for the region.
   * @param family {@link HColumnDescriptor} describing the column family
   * @return Path to family/Store home directory.
   * @deprecated use the region-filesystem APIs instead
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final HRegionInfo hri, final byte[] family) {
    return getStoreHomedir(tabledir, hri.getEncodedName(), family);
  }
467
468
469
470
471
472
473
  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param encodedName Encoded region name.
   * @param family {@link HColumnDescriptor} describing the column family
   * @return Path to family/Store home directory.
   * @deprecated use the region-filesystem APIs instead
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final String encodedName, final byte[] family) {
    return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
  }
479
  /** @return the data block encoder used when writing HFiles for this store. */
  @Override
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }
484
485
486
487
488
  /**
   * Should be used only in tests.
   * @param blockEncoder the block delta encoder to use
   */
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }
492
493
494
495
496
497
498 private List<StoreFile> loadStoreFiles() throws IOException {
499 Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
500 return openStoreFiles(files);
501 }
502
  /**
   * Opens the given store files in parallel on the region's opener thread pool,
   * accumulating this store's size and uncompressed-bytes counters as each file
   * opens. If any open fails, every reader opened so far is closed and the
   * first failure is rethrown.
   * @param files descriptors to open; may be null or empty
   * @return the opened store files (empty list when there is nothing to open)
   * @throws IOException when any open fails or the wait is interrupted
   */
  private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
    if (files == null || files.size() == 0) {
      return new ArrayList<StoreFile>();
    }
    // Initialize the thread pool for opening store files in parallel.
    ThreadPoolExecutor storeFileOpenerThreadPool =
      this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
          this.getColumnFamilyName());
    CompletionService<StoreFile> completionService =
      new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (final StoreFileInfo storeFileInfo: files) {
      // open each store file in parallel
      completionService.submit(new Callable<StoreFile>() {
        @Override
        public StoreFile call() throws IOException {
          StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
          return storeFile;
        }
      });
      totalValidStoreFile++;
    }

    ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
    IOException ioe = null;
    try {
      // Drain every submitted task, even after a failure, so no reader leaks.
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          Future<StoreFile> future = completionService.take();
          StoreFile storeFile = future.get();
          long length = storeFile.getReader().length();
          this.storeSize += length;
          this.totalUncompressedBytes +=
              storeFile.getReader().getTotalUncompressedBytes();
          if (LOG.isDebugEnabled()) {
            LOG.debug("loaded " + storeFile.toStringDetailed());
          }
          results.add(storeFile);
        } catch (InterruptedException e) {
          // Remember only the first failure; keep draining.
          if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          if (ioe == null) ioe = new IOException(e.getCause());
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers opened so far before rethrowing
      boolean evictOnClose =
          cacheConf != null? cacheConf.shouldEvictOnClose(): true;
      for (StoreFile file : results) {
        try {
          if (file != null) file.closeReader(evictOnClose);
        } catch (IOException e) {
          LOG.warn(e.getMessage());
        }
      }
      throw ioe;
    }

    return results;
  }
567
568
569
570
571
572
573
574
575 @Override
576 public void refreshStoreFiles() throws IOException {
577 Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
578 refreshStoreFilesInternal(newFiles);
579 }
580
581 @Override
582 public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
583 List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
584 for (String file : newFiles) {
585 storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
586 }
587 refreshStoreFilesInternal(storeFiles);
588 }
589
590
591
592
593
594
595
596
  /**
   * Reconciles the in-memory store file set with the given target set: opens
   * files that are new, drops files that disappeared, then advances the
   * region's MVCC so readers see data in the newly added files. Used mainly by
   * secondary region replicas to keep up with the primary's flushes/compactions.
   * @param newFiles the target set of store files; may be null (treated as empty)
   * @throws IOException if newly discovered files cannot be opened
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    StoreFileManager sfm = storeEngine.getStoreFileManager();
    Collection<StoreFile> currentFiles = sfm.getStorefiles();
    if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);

    if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);

    // Index current files by their StoreFileInfo so set-difference can map
    // removed infos back to open StoreFile instances.
    HashMap<StoreFileInfo, StoreFile> currentFilesSet =
        new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
    for (StoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);

    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
      + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);

    Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // Try to open the files before swapping them in, so a bad file fails here.
    List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);

    // propogate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles);

    // Advance the memstore read point to be at least the new store files'
    // sequence ids so readers can see the data in newly added files.
    if (!toBeAddedFiles.isEmpty()) {
      region.getMVCC().advanceTo(this.getMaxSequenceId());
    }

    // notify scanners, close file readers, and recompute store size
    completeCompaction(toBeRemovedStoreFiles, false);
  }
642
643 private StoreFile createStoreFileAndReader(final Path p) throws IOException {
644 StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
645 return createStoreFileAndReader(info);
646 }
647
648 private StoreFile createStoreFileAndReader(final StoreFileInfo info)
649 throws IOException {
650 info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
651 StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
652 this.family.getBloomFilterType());
653 StoreFile.Reader r = storeFile.createReader();
654 r.setReplicaStoreFile(isPrimaryReplicaStore());
655 return storeFile;
656 }
657
658 @Override
659 public long add(final Cell cell) {
660 lock.readLock().lock();
661 try {
662 return this.memstore.add(cell);
663 } finally {
664 lock.readLock().unlock();
665 }
666 }
667
668 @Override
669 public long timeOfOldestEdit() {
670 return memstore.timeOfOldestEdit();
671 }
672
673
674
675
676
677
678
679 protected long delete(final KeyValue kv) {
680 lock.readLock().lock();
681 try {
682 return this.memstore.delete(kv);
683 } finally {
684 lock.readLock().unlock();
685 }
686 }
687
688 @Override
689 public void rollback(final Cell cell) {
690 lock.readLock().lock();
691 try {
692 this.memstore.rollback(cell);
693 } finally {
694 lock.readLock().unlock();
695 }
696 }
697
698
699
700
701 @Override
702 public Collection<StoreFile> getStorefiles() {
703 return this.storeEngine.getStoreFileManager().getStorefiles();
704 }
705
  /**
   * Validates that the HFile at srcPath may be bulk-loaded into this store:
   * checks that its key range fits inside this region, warns on oversized
   * files, and (when hbase.hstore.bulkload.verify is set) scans every cell to
   * confirm sort order and family consistency.
   * @param srcPath path of the candidate HFile
   * @throws WrongRegionException if the file's key range does not fit this region
   * @throws InvalidHFileException if full verification finds out-of-order or
   *         wrong-family cells
   */
  @Override
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader  = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
          + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
      reader = HFile.createReader(srcPath.getFileSystem(conf),
          srcPath, cacheConf, conf);
      reader.loadFileInfo();

      byte[] firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey != null, "First key can not be null");
      byte[] lk = reader.getLastKey();
      Preconditions.checkState(lk != null, "Last key can not be null");
      // Last key is a full KeyValue key; extract just the row portion.
      byte[] lastKey =  KeyValue.createKeyValueFromKey(lk).getRow();

      LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
          " last=" + Bytes.toStringBinary(lastKey));
      LOG.debug("Region bounds: first=" +
          Bytes.toStringBinary(getRegionInfo().getStartKey()) +
          " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));

      if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
        throw new WrongRegionException(
            "Bulk load file " + srcPath.toString() + " does not fit inside region "
            + this.getRegionInfo().getRegionNameAsString());
      }

      // Oversized files are allowed but warned about: they may trigger splits.
      if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
        LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
            reader.length() + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getKeyValue();
          if (prevCell != null) {
            // Cells must be strictly sorted by row...
            if (CellComparator.compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than"
                  + " current row: path=" + srcPath + " previous="
                  + CellUtil.getCellKeyAsString(prevCell) + " current="
                  + CellUtil.getCellKeyAsString(cell));
            }
            // ...and every cell must belong to the same family.
            if (CellComparator.compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                  + " family compared to current key: path=" + srcPath
                  + " previous="
                  + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                      prevCell.getFamilyLength())
                  + " current="
                  + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
                      cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
      LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
         + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
         + " ms");
      }
    } finally {
      // Always release the reader, even when validation throws.
      if (reader != null) reader.close();
    }
  }
776
777 @Override
778 public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
779 Path srcPath = new Path(srcPathStr);
780 Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
781
782 LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
783 + dstPath + " - updating store file list.");
784
785 StoreFile sf = createStoreFileAndReader(dstPath);
786 bulkLoadHFile(sf);
787
788 LOG.info("Successfully loaded store file " + srcPath + " into store " + this
789 + " (new location: " + dstPath + ")");
790
791 return dstPath;
792 }
793
794 @Override
795 public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
796 StoreFile sf = createStoreFileAndReader(fileInfo);
797 bulkLoadHFile(sf);
798 }
799
  /**
   * Inserts an opened bulk-load store file into the store file manager,
   * updates size counters, and notifies scanners of the new file set.
   */
  private void bulkLoadHFile(StoreFile sf) throws IOException {
    StoreFile.Reader r = sf.getReader();
    this.storeSize += r.length();
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();

    // Append the new storefile into the list under the store write lock.
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }
    notifyChangedReadersObservers();
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }
826
  /**
   * Closes this store: clears the store file set and closes every file's
   * reader in parallel on a dedicated pool, all under the store write lock so
   * no scanner can race with the teardown.
   * @return the store files that were tracked at close time
   * @throws IOException the first failure encountered while closing readers
   */
  @Override
  public ImmutableCollection<StoreFile> close() throws IOException {
    this.lock.writeLock().lock();
    try {
      // Clear so metrics doesn't find them.
      ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();

      if (!result.isEmpty()) {
        // initialize the thread pool for closing store files in parallel.
        ThreadPoolExecutor storeFileCloserThreadPool = this.region
            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
                + this.getColumnFamilyName());

        // close each store file in parallel
        CompletionService<Void> completionService =
          new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
        for (final StoreFile f : result) {
          completionService.submit(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
              boolean evictOnClose =
                  cacheConf != null? cacheConf.shouldEvictOnClose(): true;
              f.closeReader(evictOnClose);
              return null;
            }
          });
        }

        IOException ioe = null;
        try {
          // Drain every close task; remember only the first failure.
          for (int i = 0; i < result.size(); i++) {
            try {
              Future<Void> future = completionService.take();
              future.get();
            } catch (InterruptedException e) {
              if (ioe == null) {
                ioe = new InterruptedIOException();
                ioe.initCause(e);
              }
            } catch (ExecutionException e) {
              if (ioe == null) ioe = new IOException(e.getCause());
            }
          }
        } finally {
          storeFileCloserThreadPool.shutdownNow();
        }
        if (ioe != null) throw ioe;
      }
      LOG.info("Closed " + this);
      return result;
    } finally {
      this.lock.writeLock().unlock();
    }
  }
881
882
883
884
885
886
  /**
   * Snapshot this store's memstore. Call before running a flush. The write
   * lock prevents new edits from sliding between snapshot and flush.
   */
  void snapshot() {
    this.lock.writeLock().lock();
    try {
      this.memstore.snapshot();
    } finally {
      this.lock.writeLock().unlock();
    }
  }
895
896
897
898
899
900
901
902
903
  /**
   * Writes the given memstore snapshot out as store files, retrying up to
   * flushRetriesNumber times with a pause between attempts. Each produced file
   * is validated before the attempt is considered successful.
   * @param logCacheFlushId flush sequence number for the output files
   * @param snapshot the memstore snapshot to persist
   * @param status monitored task for progress reporting
   * @return paths of the flushed (and validated) files
   * @throws IOException the last flush/validation failure once retries are
   *         exhausted, or InterruptedIOException if the retry sleep is interrupted
   */
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
      MonitoredTask status) throws IOException {
    // If an exception happens flushing, we let it out without clearing
    // the memstore snapshot.  The old snapshot will be returned when we say
    // 'snapshot', the next time flush comes around.
    // Retry after catching exception when flushing, otherwise server will abort
    // itself
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
        Path lastPathName = null;
        try {
          // Validate every produced file before declaring success.
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            validateStoreFile(pathName);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file, retrying num=" + i, e);
        lastException = e;
      }
      // Pause before the next attempt, except after the final one.
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }
947
948
949
950
951
952
953
954
  /**
   * Moves a flushed file from the tmp directory into the store directory and
   * reopens it as a StoreFile, updating this store's size counters.
   * @param path tmp path of the flushed file
   * @param logCacheFlushId flush sequence number (for logging)
   * @param status monitored task for progress reporting
   * @return the committed, reopened store file
   */
  private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
      throws IOException {
    // Write-out finished successfully, move into the right spot
    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);

    status.setStatus("Flushing " + this + ": reopening flushed file");
    StoreFile sf = createStoreFileAndReader(dstPath);

    StoreFile.Reader r = sf.getReader();
    this.storeSize += r.length();
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();

    if (LOG.isInfoEnabled()) {
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
        ", sequenceid=" + logCacheFlushId +
        ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
    }
    return sf;
  }
974
975 @Override
976 public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
977 boolean isCompaction, boolean includeMVCCReadpoint,
978 boolean includesTag)
979 throws IOException {
980 return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
981 includesTag, false);
982 }
983
984
985
986
987
988
989
990
991
  /**
   * Creates a StoreFile writer targeting this region's tmp directory.
   * @param maxKeyCount estimated number of keys (sizes the bloom filter)
   * @param compression compression algorithm for the output file
   * @param isCompaction whether the writer serves a compaction (disables
   *        cache-on-write so compaction output does not churn the block cache)
   * @param includeMVCCReadpoint whether MVCC readpoints are written
   * @param includesTag whether cell tags are written
   * @param shouldDropBehind whether to hint the OS to drop the page cache
   * @return a configured writer on a fresh tmp path
   */
  @Override
  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
      boolean shouldDropBehind)
  throws IOException {
    final CacheConfig writerCacheConf;
    if (isCompaction) {
      // Don't cache data on write on compactions.
      writerCacheConf = new CacheConfig(cacheConf);
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      writerCacheConf = cacheConf;
    }
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }
    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
      cryptoContext);
    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())
            .withFilePath(fs.createTempName())
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .withShouldDropCacheBehind(shouldDropBehind)
            .build();
    return w;
  }
1024
  /**
   * Builds the HFileContext describing how new HFiles for this store are
   * written (compression, checksums, block size, encoding, encryption).
   * A null compression falls back to the HFile default algorithm.
   */
  private HFileContext createFileContext(Compression.Algorithm compression,
      boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    HFileContext hFileContext = new HFileContextBuilder()
                                .withIncludesMvcc(includeMVCCReadpoint)
                                .withIncludesTags(includesTag)
                                .withCompression(compression)
                                .withCompressTags(family.isCompressTags())
                                .withChecksumType(checksumType)
                                .withBytesPerCheckSum(bytesPerChecksum)
                                .withBlockSize(blocksize)
                                .withHBaseCheckSum(true)
                                .withDataBlockEncoding(family.getDataBlockEncoding())
                                .withEncryptionContext(cryptoContext)
                                .withCreateTime(EnvironmentEdgeManager.currentTime())
                                .build();
    return hFileContext;
  }
1045
1046
1047
1048
1049
1050
1051
1052
1053
  /**
   * Swaps freshly flushed files into the store file set and clears the
   * corresponding memstore snapshot, then notifies scanners outside the lock.
   * @param sfs the newly flushed store files
   * @param snapshotId id of the memstore snapshot to clear; non-positive means
   *        no snapshot to clear
   * @return whether this store now needs compaction
   */
  private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
      throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
      if (snapshotId > 0) {
        this.memstore.clearSnapshot(snapshotId);
      }
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }

    // Tell listeners of the change in readers.
    notifyChangedReadersObservers();

    if (LOG.isTraceEnabled()) {
      long totalSize = 0;
      for (StoreFile sf : sfs) {
        totalSize += sf.getReader().length();
      }
      String traceMessage = "FLUSH time,count,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }
1086
1087
1088
1089
1090
1091 private void notifyChangedReadersObservers() throws IOException {
1092 for (ChangedReadersObserver o: this.changedReaderObservers) {
1093 o.updateReaders();
1094 }
1095 }
1096
1097
1098
1099
1100
1101
  /**
   * Gets scanners over the selected store files plus the memstore for the
   * given key range and read point. File scanners come first in the returned
   * list; memstore scanners are appended last.
   * @param readPt the read point of the current scan
   * @return scanners over the store files and the memstore
   */
  @Override
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
      byte[] stopRow, long readPt) throws IOException {
    Collection<StoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
      // Snapshot the relevant file set and the memstore scanners under the
      // read lock so they form a consistent view...
      storeFilesToScan =
          this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
      memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
      this.lock.readLock().unlock();
    }

    // ...but open the file scanners outside the lock, keeping any reader I/O
    // out of the critical section. Preserve this split when refactoring.
    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
      cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
    List<KeyValueScanner> scanners =
        new ArrayList<KeyValueScanner>(sfScanners.size()+1);
    scanners.addAll(sfScanners);
    // Memstore scanners are added after the file scanners.
    scanners.addAll(memStoreScanners);
    return scanners;
  }
1131
  /** Registers an observer to be notified whenever this store's readers change. */
  @Override
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }
1136
  /** Unregisters a previously added reader observer. */
  @Override
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // No-op when the observer was never registered.
    this.changedReaderObservers.remove(o);
  }
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
  /**
   * Convenience overload of
   * {@link #compact(CompactionContext, CompactionThroughputController, User)}
   * that runs without a user context.
   */
  @Override
  public List<StoreFile> compact(CompactionContext compaction,
      CompactionThroughputController throughputController) throws IOException {
    return compact(compaction, throughputController, null);
  }
1195
  /**
   * Executes an already-selected compaction: runs the compactor, commits the
   * output files into place, records the swap in the WAL, updates in-memory
   * file sets and metrics, and closes/removes the compacted files.
   * @param compaction a context that already has a selection (asserted below)
   * @param throughputController limiter applied to compaction I/O
   * @param user if non-null, coprocessor hooks run as this user
   * @return the resulting store files, or the unplaced outputs when the
   *         "hbase.hstore.compaction.complete" test hook is disabled
   */
  @Override
  public List<StoreFile> compact(CompactionContext compaction,
      CompactionThroughputController throughputController, User user) throws IOException {
    assert compaction != null;
    List<StoreFile> sfs = null;
    CompactionRequest cr = compaction.getRequest();
    try {
      long compactionStartTime = EnvironmentEdgeManager.currentTime();
      assert compaction.hasSelection();
      Collection<StoreFile> filesToCompact = cr.getFiles();
      assert !filesToCompact.isEmpty();
      synchronized (filesCompacting) {
        // Sanity check: every file we are about to compact must already be
        // tracked in filesCompacting (added at selection time).
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
          + this + " of " + this.getRegionInfo().getRegionNameAsString()
          + " into tmpdir=" + fs.getTempDir() + ", totalSize="
          + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));

      // Run the actual compaction; the outputs land in the tmp directory first.
      List<Path> newFiles = compaction.compact(throughputController, user);

      // Test-only escape hatch: leave outputs in tmp and do not swap file sets.
      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
        LOG.warn("hbase.hstore.compaction.complete is set to false");
        sfs = new ArrayList<StoreFile>(newFiles.size());
        final boolean evictOnClose =
            cacheConf != null? cacheConf.shouldEvictOnClose(): true;
        for (Path newFile : newFiles) {
          // Open then close a reader: verifies each output file is readable.
          StoreFile sf = createStoreFileAndReader(newFile);
          sf.closeReader(evictOnClose);
          sfs.add(sf);
        }
        return sfs;
      }

      // Commit outputs, write the WAL marker, then swap the in-memory file set.
      sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
      writeCompactionWalRecord(filesToCompact, sfs);
      replaceStoreFiles(filesToCompact, sfs);
      if (cr.isMajor()) {
        majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
        majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
      } else {
        compactedCellsCount += getCompactionProgress().totalCompactingKVs;
        compactedCellsSize += getCompactionProgress().totalCompactedSize;
      }
      // Close old readers, delete old files and recompute store size metrics.
      completeCompaction(filesToCompact, true);

      logCompactionEndMessage(cr, sfs, compactionStartTime);
      return sfs;
    } finally {
      finishCompactionRequest(cr);
    }
  }
1259
  /**
   * Commits the compaction's output files from tmp into the store directory
   * and invokes the postCompact coprocessor hook for each one.
   * NOTE(review): method name has a typo ("Compated" for "Compacted");
   * renaming would require updating callers, so it is left as-is here.
   * NOTE(review): on InterruptedException the thread's interrupt status is not
   * restored before wrapping — confirm whether callers rely on the flag.
   * @param cr the request these files were produced for
   * @param newFiles output paths, still in the tmp directory
   * @param user if non-null, the coprocessor hook runs as this user
   * @return the committed store files, each with an open reader
   */
  private List<StoreFile> moveCompatedFilesIntoPlace(
      final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
    List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
    for (Path newFile : newFiles) {
      assert newFile != null;
      final StoreFile sf = moveFileIntoPlace(newFile);
      if (this.getCoprocessorHost() != null) {
        final Store thisStore = this;
        if (user == null) {
          getCoprocessorHost().postCompact(thisStore, sf, cr);
        } else {
          // Run the hook with the requesting user's credentials.
          try {
            user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
              @Override
              public Void run() throws Exception {
                getCoprocessorHost().postCompact(thisStore, sf, cr);
                return null;
              }
            });
          } catch (InterruptedException ie) {
            InterruptedIOException iioe = new InterruptedIOException();
            iioe.initCause(ie);
            throw iioe;
          }
        }
      }
      assert sf != null;
      sfs.add(sf);
    }
    return sfs;
  }
1291
1292
1293 StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1294 validateStoreFile(newFile);
1295
1296 Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1297 return createStoreFileAndReader(destPath);
1298 }
1299
1300
1301
1302
1303
1304
1305 private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1306 Collection<StoreFile> newFiles) throws IOException {
1307 if (region.getWAL() == null) return;
1308 List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1309 for (StoreFile f : filesCompacted) {
1310 inputPaths.add(f.getPath());
1311 }
1312 List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1313 for (StoreFile f : newFiles) {
1314 outputPaths.add(f.getPath());
1315 }
1316 HRegionInfo info = this.region.getRegionInfo();
1317 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1318 family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1319 WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
1320 this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
1321 }
1322
  /**
   * Atomically (under the store write lock) swaps the compacted files for the
   * compaction results in the store file manager and drops the compacted files
   * from {@code filesCompacting}.
   * @param compactedFiles files being replaced
   * @param result files produced by the compaction
   */
  @VisibleForTesting
  void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
      final Collection<StoreFile> result) throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
      filesCompacting.removeAll(compactedFiles);
    } finally {
      this.lock.writeLock().unlock();
    }
  }
1334
1335
1336
1337
1338
1339
1340
1341 private void logCompactionEndMessage(
1342 CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
1343 long now = EnvironmentEdgeManager.currentTime();
1344 StringBuilder message = new StringBuilder(
1345 "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1346 + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1347 + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1348 if (sfs.isEmpty()) {
1349 message.append("none, ");
1350 } else {
1351 for (StoreFile sf: sfs) {
1352 message.append(sf.getPath().getName());
1353 message.append("(size=");
1354 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1355 message.append("), ");
1356 }
1357 }
1358 message.append("total size for store is ")
1359 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
1360 .append(". This selection was in queue for ")
1361 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1362 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1363 .append(" to execute.");
1364 LOG.info(message.toString());
1365 if (LOG.isTraceEnabled()) {
1366 int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1367 long resultSize = 0;
1368 for (StoreFile sf : sfs) {
1369 resultSize += sf.getReader().length();
1370 }
1371 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1372 + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1373 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1374 LOG.trace(traceMessage);
1375 }
1376 }
1377
1378
1379
1380
1381
1382
1383
  /**
   * Replays a compaction marker read from the WAL, bringing this store's file
   * set in line with what the marker records.
   * @param compaction descriptor holding the compaction's input/output file names
   * @param pickCompactionFiles whether to open output files that are not yet
   *          part of this store
   * @param removeFiles whether to physically remove the replaced input files
   */
  @Override
  public void replayCompactionMarker(CompactionDescriptor compaction,
      boolean pickCompactionFiles, boolean removeFiles)
      throws IOException {
    LOG.debug("Completing compaction from the WAL marker");
    List<String> compactionInputs = compaction.getCompactionInputList();
    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());

    // Resolve the marker's input names against the family directory, keeping
    // just the file names so they can be matched against open store files.
    String familyName = this.getColumnFamilyName();
    List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
    for (String compactionInput : compactionInputs) {
      Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
      inputFiles.add(inputPath.getName());
    }

    // Some inputs may already be gone; only files still served by this store
    // are treated as inputs to replace.
    List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
    for (StoreFile sf : this.getStorefiles()) {
      if (inputFiles.contains(sf.getPath().getName())) {
        inputStoreFiles.add(sf);
      }
    }

    // Output files this store is not already serving.
    List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());

    if (pickCompactionFiles) {
      // Drop outputs we already serve; open readers for the rest.
      for (StoreFile sf : this.getStorefiles()) {
        compactionOutputs.remove(sf.getPath().getName());
      }
      for (String compactionOutput : compactionOutputs) {
        StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
        StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
        outputStoreFiles.add(storeFile);
      }
    }

    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
      LOG.info("Replaying compaction marker, replacing input files: " +
          inputStoreFiles + " with output files : " + outputStoreFiles);
      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
      this.completeCompaction(inputStoreFiles, removeFiles);
    }
  }
1442
1443
1444
1445
1446
1447
1448
1449
  /**
   * Test helper: compacts the N most recent store files, assuming the default
   * compactor/file layout. Not for production use.
   * @param N number of most-recent files to compact
   * @throws RuntimeException if fewer than N eligible files are available
   */
  public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
    List<StoreFile> filesToCompact;
    boolean isMajor;

    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
        if (!filesCompacting.isEmpty()) {
          // Drop every file up to and including the newest file already being
          // compacted, so only strictly newer files are considered.
          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
          int idx = filesToCompact.indexOf(last);
          Preconditions.checkArgument(idx != -1);
          filesToCompact.subList(0, idx + 1).clear();
        }
        int count = filesToCompact.size();
        if (N > count) {
          throw new RuntimeException("Not enough files");
        }
        // Keep only the N newest remaining files; it's a major compaction when
        // that covers every file in the store.
        filesToCompact = filesToCompact.subList(count - N, count);
        isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
        filesCompacting.addAll(filesToCompact);
        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
      }
    } finally {
      this.lock.readLock().unlock();
    }

    try {
      // Run the compaction, then commit each output and swap the file sets.
      List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
          .compactForTesting(filesToCompact, isMajor);
      for (Path newFile: newFiles) {
        // Move the compaction output into place.
        StoreFile sf = moveFileIntoPlace(newFile);
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompact(this, sf, null);
        }
        replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
        completeCompaction(filesToCompact, true);
      }
    } finally {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(filesToCompact);
      }
    }
  }
1499
  /** @return true if any of this store's files is a reference file */
  @Override
  public boolean hasReferences() {
    return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
  }
1504
  /** @return progress tracker of the store engine's compactor */
  @Override
  public CompactionProgress getCompactionProgress() {
    return this.storeEngine.getCompactor().getProgress();
  }
1509
  /**
   * @return whether the compaction policy would choose a major compaction for
   *         this store's current files; false if any file lacks an open reader
   */
  @Override
  public boolean isMajorCompaction() throws IOException {
    for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      // A file without an open reader cannot be assessed; err on the side of "no".
      if (sf.getReader() == null) {
        LOG.debug("StoreFile " + sf + " has null Reader");
        return false;
      }
    }
    return storeEngine.getCompactionPolicy().isMajorCompaction(
        this.storeEngine.getStoreFileManager().getStorefiles());
  }
1522
  /** Requests a compaction with no explicit priority and no base request. */
  @Override
  public CompactionContext requestCompaction() throws IOException {
    return requestCompaction(Store.NO_PRIORITY, null);
  }
1527
  /** Requests a compaction without a user context. */
  @Override
  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
      throws IOException {
    return requestCompaction(priority, baseRequest, null);
  }
  /**
   * Selects files for compaction and builds the compaction context, giving
   * coprocessors the chance to override or observe the selection.
   * @param priority requested priority, or {@link Store#NO_PRIORITY} to derive it
   * @param baseRequest an optional request to combine the selection with
   * @param user if non-null, coprocessor hooks run as this user
   * @return the prepared compaction, or null when writes are disabled or
   *         nothing was selected
   */
  @Override
  public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
      User user) throws IOException {
    // Don't even select for compaction if writes are disabled.
    if (!this.areWritesEnabled()) {
      return null;
    }

    // Before selecting, try to get rid of unneeded (expired) files.
    removeUnneededFiles();

    final CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequest request = null;
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        final Store thisStore = this;
        // First, see if a coprocessor wants to override the file selection.
        if (this.getCoprocessorHost() != null) {
          final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override = false;
          if (user == null) {
            override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
              baseRequest);
          } else {
            // Run the hook with the requesting user's credentials.
            try {
              override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws Exception {
                  return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
                    baseRequest);
                }
              });
            } catch (InterruptedException ie) {
              InterruptedIOException iioe = new InterruptedIOException();
              iioe.initCause(ie);
              throw iioe;
            }
          }
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
          }
        }

        // Normal case: no coprocessor override, run policy selection.
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          // Only one compaction at a time may claim the off-peak slot.
          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
              offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              // Release the off-peak slot on selection failure.
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // The policy declined the off-peak slot; give it back.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          if (user == null) {
            this.getCoprocessorHost().postCompactSelection(
              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
          } else {
            try {
              user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                  getCoprocessorHost().postCompactSelection(
                    thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);
                  return null;
                }
              });
            } catch (InterruptedException ie) {
              InterruptedIOException iioe = new InterruptedIOException();
              iioe.initCause(ie);
              throw iioe;
            }
          }
        }

        // Fold the caller-supplied base request into the selection, if any.
        if (baseRequest != null) {
          compaction.forceSelect(
              baseRequest.combineWith(compaction.getRequest()));
        }

        // Nothing selected means nothing to do.
        request = compaction.getRequest();
        final Collection<StoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {
          return null;
        }

        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major compaction, clear the force-major flag.
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties: priority (caller override or derived
        // from the store) and a human-readable description.
        request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
      }
    } finally {
      this.lock.readLock().unlock();
    }

    LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
        + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
        + (request.isAllFiles() ? " (all files)" : ""));
    this.region.reportCompactionRequestStart(request.isMajor());
    return compaction;
  }
1654
1655
1656 private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
1657 if (filesToAdd == null) return;
1658
1659 if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1660 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1661 }
1662 filesCompacting.addAll(filesToAdd);
1663 Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
1664 }
1665
  /**
   * Removes store files whose contents have entirely expired per the store
   * file TTL, treating the removal like a compaction with no output files.
   * Skipped when disabled by config or when the family keeps min versions.
   */
  private void removeUnneededFiles() throws IOException {
    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
    if (getFamily().getMinVersions() > 0) {
      // With min versions set, expired cells may still need to be served.
      LOG.debug("Skipping expired store file removal due to min version being " +
          getFamily().getMinVersions());
      return;
    }
    this.lock.readLock().lock();
    Collection<StoreFile> delSfs = null;
    try {
      synchronized (filesCompacting) {
        long cfTtl = getStoreFileTtl();
        if (cfTtl != Long.MAX_VALUE) {
          delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
              EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
          // Mark them as compacting so nothing else selects them concurrently.
          addToCompactingFiles(delSfs);
        }
      }
    } finally {
      this.lock.readLock().unlock();
    }
    if (delSfs == null || delSfs.isEmpty()) return;

    // Record and apply the removal exactly like a compaction with empty output.
    Collection<StoreFile> newFiles = new ArrayList<StoreFile>();
    writeCompactionWalRecord(delSfs, newFiles);
    replaceStoreFiles(delSfs, newFiles);
    completeCompaction(delSfs);
    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
        + this + " of " + this.getRegionInfo().getRegionNameAsString()
        + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
  }
1697
  /** Cancels a previously requested compaction, releasing its bookkeeping. */
  @Override
  public void cancelRequestedCompaction(CompactionContext compaction) {
    finishCompactionRequest(compaction.getRequest());
  }
1702
1703 private void finishCompactionRequest(CompactionRequest cr) {
1704 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1705 if (cr.isOffPeak()) {
1706 offPeakCompactionTracker.set(false);
1707 cr.setOffPeak(false);
1708 }
1709 synchronized (filesCompacting) {
1710 filesCompacting.removeAll(cr.getFiles());
1711 }
1712 }
1713
1714
1715
1716
1717
1718
1719
1720 private void validateStoreFile(Path path)
1721 throws IOException {
1722 StoreFile storeFile = null;
1723 try {
1724 storeFile = createStoreFileAndReader(path);
1725 } catch (IOException e) {
1726 LOG.error("Failed to open store file : " + path
1727 + ", keeping it in tmp location", e);
1728 throw e;
1729 } finally {
1730 if (storeFile != null) {
1731 storeFile.closeReader(false);
1732 }
1733 }
1734 }
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
  /**
   * Convenience overload of {@link #completeCompaction(Collection, boolean)}
   * that also removes the compacted files from the filesystem.
   * @param compactedFiles files that were compacted away
   */
  @VisibleForTesting
  protected void completeCompaction(final Collection<StoreFile> compactedFiles)
    throws IOException {
    completeCompaction(compactedFiles, true);
  }
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
  /**
   * Performs post-compaction cleanup: lets observers refresh their readers,
   * closes (and optionally deletes) the compacted files, and recomputes this
   * store's size metrics from the remaining files.
   * @param compactedFiles files that were compacted away
   * @param removeFiles whether to delete the compacted files from the filesystem
   */
  @VisibleForTesting
  protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
    throws IOException {
    try {
      // Tell observers the readers changed so scanners move off the old files.
      notifyChangedReadersObservers();

      // Close readers on the old files before (optionally) deleting them.
      LOG.debug("Removing store files after compaction...");
      boolean evictOnClose =
          cacheConf != null? cacheConf.shouldEvictOnClose(): true;
      for (StoreFile compactedFile : compactedFiles) {
        compactedFile.closeReader(evictOnClose);
      }
      if (removeFiles) {
        this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
      }
    } catch (IOException e) {
      // Deliberately best-effort: failure to clean up old files is logged,
      // not rethrown — the compaction itself has already succeeded.
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.error("Failed removing compacted files in " + this +
        ". Files we were trying to remove are " + compactedFiles.toString() +
        "; some of them may have been already removed", e);
    }

    // Recompute store size metrics from the files now in the store.
    this.storeSize = 0L;
    this.totalUncompressedBytes = 0L;
    for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = hsf.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + hsf + " has a null Reader");
        continue;
      }
      this.storeSize += r.length();
      this.totalUncompressedBytes += r.getTotalUncompressedBytes();
    }
  }
1812
1813
1814
1815
1816
1817 int versionsToReturn(final int wantedVersions) {
1818 if (wantedVersions <= 0) {
1819 throw new IllegalArgumentException("Number of versions must be > 0");
1820 }
1821
1822 int maxVersions = this.family.getMaxVersions();
1823 return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1824 }
1825
1826
1827
1828
1829
1830
1831 static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
1832
1833
1834 if (cell.getTagsLength() > 0) {
1835
1836
1837
1838 Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
1839 cell.getTagsLength());
1840 while (i.hasNext()) {
1841 Tag t = i.next();
1842 if (TagType.TTL_TAG_TYPE == t.getType()) {
1843
1844
1845 long ts = cell.getTimestamp();
1846 assert t.getTagLength() == Bytes.SIZEOF_LONG;
1847 long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
1848 if (ts + ttl < now) {
1849 return true;
1850 }
1851
1852
1853 break;
1854 }
1855 }
1856 }
1857 return false;
1858 }
1859
  /**
   * Finds the cell whose row key is at or nearest before {@code row}.
   * The memstore is consulted first, then candidate store files are walked,
   * narrowing the candidate file set as better candidates are found.
   * @param row target row key
   * @return the best candidate cell, or null if none was found
   */
  @Override
  public Cell getRowKeyAtOrBefore(final byte[] row) throws IOException {
    // With minVersions > 0 expired cells may still need to be served, so
    // disable TTL-based filtering by using MAX_VALUE in that case.
    long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();

    KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);

    GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
      this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
    this.lock.readLock().lock();
    try {
      // First, check the memstore for a candidate.
      this.memstore.getRowKeyAtOrBefore(state);

      // Walk candidate store files in order, consuming each from the iterator.
      Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
          .getCandidateFilesForRowKeyBefore(state.getTargetKey());
      while (sfIterator.hasNext()) {
        StoreFile sf = sfIterator.next();
        sfIterator.remove(); // Consume this file from the candidate iterator.
        boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
        Cell candidate = state.getCandidate();
        // An exact row match cannot be beaten; return it immediately.
        if (candidate != null && CellUtil.matchingRow(candidate, row)) {
          return candidate;
        }
        if (haveNewCandidate) {
          // Re-filter the remaining files against the improved candidate.
          sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
              sfIterator, state.getTargetKey(), candidate);
        }
      }
      return state.getCandidate();
    } finally {
      this.lock.readLock().unlock();
    }
  }
1901
1902
1903
1904
1905
1906
1907
1908
  /**
   * Searches one store file for a candidate row at or before the tracker's
   * target key, updating the tracker when a candidate is found.
   * @param f store file to search
   * @param state tracker holding the target key and the best candidate so far
   * @return true if this file produced a new candidate
   */
  private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
                                   final GetClosestRowBeforeTracker state)
      throws IOException {
    StoreFile.Reader r = f.getReader();
    if (r == null) {
      LOG.warn("StoreFile " + f + " has a null Reader");
      return false;
    }
    if (r.getEntries() == 0) {
      LOG.warn("StoreFile " + f + " is a empty store file");
      return false;
    }
    byte [] fk = r.getFirstKey();
    if (fk == null) return false;
    KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
    // NOTE(review): unlike the first key, the last key is not null-checked —
    // presumably a non-empty file always has one; confirm.
    byte [] lk = r.getLastKey();
    KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
    KeyValue firstOnRow = state.getTargetKey();
    if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
      // The whole file sorts before the target row. If its last key is not in
      // the target table there is nothing useful here.
      if (!state.isTargetTable(lastKV)) return false;
      // Otherwise search from the file's last row instead.
      firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
    }
    // NOTE(review): scanner flags are positional — confirm their meaning
    // against StoreFile.Reader#getScanner before changing.
    HFileScanner scanner = r.getScanner(true, true, false);
    // Position the scanner; if the seek fails there is no candidate here.
    if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
    // Walk forward within the current row looking for a candidate.
    if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
    // Otherwise step backwards one row at a time while rows still qualify.
    while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
        firstOnRow.getKeyLength())) {
      Cell kv = scanner.getKeyValue();
      if (!state.isTargetTable(kv)) break;
      if (!state.isBetterCandidate(kv)) break;
      // Restart the forward walk from the start of this earlier row.
      firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
      if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
      if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
    }
    return false;
  }
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967 private boolean seekToScanner(final HFileScanner scanner,
1968 final KeyValue firstOnRow,
1969 final KeyValue firstKV)
1970 throws IOException {
1971 KeyValue kv = firstOnRow;
1972
1973 if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
1974 int result = scanner.seekTo(kv);
1975 return result != -1;
1976 }
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
  /**
   * From the scanner's current position, walks forward handing cells to the
   * tracker until one is accepted as a candidate, the walk goes too far, or
   * the file ends. Note this is a do-while: the cell at the CURRENT position
   * is examined before the first call to {@code next()} — preserve that.
   * @return true if the tracker accepted a candidate
   */
  private boolean walkForwardInSingleRow(final HFileScanner scanner,
                                         final KeyValue firstOnRow,
                                         final GetClosestRowBeforeTracker state)
      throws IOException {
    boolean foundCandidate = false;
    do {
      Cell kv = scanner.getKeyValue();
      // Still before the target position; keep advancing.
      if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
      // Walked beyond the window of interest; stop.
      if (state.isTooFar(kv, firstOnRow)) break;
      if (state.isExpired(kv)) {
        continue;
      }
      // The tracker accepted this cell as a candidate.
      if (state.handle(kv)) {
        foundCandidate = true;
        break;
      }
    } while(scanner.next());
    return foundCandidate;
  }
2010
2011 @Override
2012 public boolean canSplit() {
2013 this.lock.readLock().lock();
2014 try {
2015
2016 boolean result = !hasReferences();
2017 if (!result && LOG.isDebugEnabled()) {
2018 LOG.debug("Cannot split region due to reference files being there");
2019 }
2020 return result;
2021 } finally {
2022 this.lock.readLock().unlock();
2023 }
2024 }
2025
  /**
   * @return the row to split this store at, or null when no split point is
   *         available (meta region asserted out, references present, or lookup failure)
   */
  @Override
  public byte[] getSplitPoint() {
    this.lock.readLock().lock();
    try {
      // Splitting the meta region is not supported here.
      assert !this.getRegionInfo().isMetaRegion();
      // A store holding reference files cannot provide a split point.
      if (hasReferences()) {
        return null;
      }
      return this.storeEngine.getStoreFileManager().getSplitPoint();
    } catch(IOException e) {
      // Best-effort: a failed lookup degrades to "no split point".
      LOG.warn("Failed getting store size for " + this, e);
    } finally {
      this.lock.readLock().unlock();
    }
    return null;
  }
2044
  /** @return size of the most recent compaction, as tracked in {@code lastCompactSize} */
  @Override
  public long getLastCompactSize() {
    return this.lastCompactSize;
  }
2049
  /** @return aggregate on-disk size of this store's files, as tracked in {@code storeSize} */
  @Override
  public long getSize() {
    return storeSize;
  }
2054
  /** Flags this store so the next compaction selection is forced to be major. */
  @Override
  public void triggerMajorCompaction() {
    this.forceMajor = true;
  }
2059
2060
2061
2062
2063
2064
  /**
   * Returns a scanner over this store for the given scan, letting a registered
   * coprocessor supply its own implementation first.
   * @param scan the scan specification
   * @param targetCols columns to include
   * @param readPt the read point for the scan
   */
  @Override
  public KeyValueScanner getScanner(Scan scan,
      final NavigableSet<byte []> targetCols, long readPt) throws IOException {
    lock.readLock().lock();
    try {
      KeyValueScanner scanner = null;
      // Give a coprocessor first crack at providing the scanner.
      if (this.getCoprocessorHost() != null) {
        scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
      }
      if (scanner == null) {
        // Default: forward or reversed store scanner depending on the scan.
        scanner = scan.isReversed() ? new ReversedStoreScanner(this,
            getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
            getScanInfo(), scan, targetCols, readPt);
      }
      return scanner;
    } finally {
      lock.readLock().unlock();
    }
  }
2084
  /** @return the column family name; identifies this store in log output */
  @Override
  public String toString() {
    return this.getColumnFamilyName();
  }
2089
  /** @return number of store files currently managed by this store */
  @Override
  public int getStorefilesCount() {
    return this.storeEngine.getStoreFileManager().getStorefileCount();
  }
2094
  /** @return total uncompressed bytes, as tracked in {@code totalUncompressedBytes} */
  @Override
  public long getStoreSizeUncompressed() {
    return this.totalUncompressedBytes;
  }
2099
2100 @Override
2101 public long getStorefilesSize() {
2102 long size = 0;
2103 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2104 StoreFile.Reader r = s.getReader();
2105 if (r == null) {
2106 LOG.warn("StoreFile " + s + " has a null Reader");
2107 continue;
2108 }
2109 size += r.length();
2110 }
2111 return size;
2112 }
2113
2114 @Override
2115 public long getStorefilesIndexSize() {
2116 long size = 0;
2117 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2118 StoreFile.Reader r = s.getReader();
2119 if (r == null) {
2120 LOG.warn("StoreFile " + s + " has a null Reader");
2121 continue;
2122 }
2123 size += r.indexSize();
2124 }
2125 return size;
2126 }
2127
2128 @Override
2129 public long getTotalStaticIndexSize() {
2130 long size = 0;
2131 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2132 StoreFile.Reader r = s.getReader();
2133 if (r == null) {
2134 continue;
2135 }
2136 size += r.getUncompressedDataIndexSize();
2137 }
2138 return size;
2139 }
2140
2141 @Override
2142 public long getTotalStaticBloomSize() {
2143 long size = 0;
2144 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2145 StoreFile.Reader r = s.getReader();
2146 if (r == null) {
2147 continue;
2148 }
2149 size += r.getTotalBloomSize();
2150 }
2151 return size;
2152 }
2153
  /** @return current size of this store's memstore */
  @Override
  public long getMemStoreSize() {
    return this.memstore.size();
  }
2158
  /**
   * @return compaction priority reported by the store file manager; logs a
   *         warning when it reports user priority outside a user compaction
   */
  @Override
  public int getCompactPriority() {
    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
    if (priority == PRIORITY_USER) {
      LOG.warn("Compaction priority is USER despite there being no user compaction");
    }
    return priority;
  }
2167
  /** @return whether the compaction policy wants a compaction of this size throttled */
  @Override
  public boolean throttleCompaction(long compactionSize) {
    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
  }
2172
  /** @return the region this store belongs to */
  public HRegion getHRegion() {
    return this.region;
  }
2176
  /** @return the region's coprocessor host (may be null) */
  @Override
  public RegionCoprocessorHost getCoprocessorHost() {
    return this.region.getCoprocessorHost();
  }
2181
  /** @return the region info, as held by the store's filesystem abstraction */
  @Override
  public HRegionInfo getRegionInfo() {
    return this.fs.getRegionInfo();
  }
2186
  /** @return whether writes are currently enabled on the owning region */
  @Override
  public boolean areWritesEnabled() {
    return this.region.areWritesEnabled();
  }
2191
  /** @return the owning region's smallest read point */
  @Override
  public long getSmallestReadPoint() {
    return this.region.getSmallestReadPoint();
  }
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210 public long updateColumnValue(byte [] row, byte [] f,
2211 byte [] qualifier, long newValue)
2212 throws IOException {
2213
2214 this.lock.readLock().lock();
2215 try {
2216 long now = EnvironmentEdgeManager.currentTime();
2217
2218 return this.memstore.updateColumnValue(row,
2219 f,
2220 qualifier,
2221 newValue,
2222 now);
2223
2224 } finally {
2225 this.lock.readLock().unlock();
2226 }
2227 }
2228
  /**
   * Upserts the given cells into the memstore under the store read lock.
   * @param cells cells to upsert
   * @param readpoint read point passed through to the memstore
   * @return the value returned by {@code memstore.upsert} — presumably the
   *         resulting size change; confirm against the MemStore javadoc
   */
  @Override
  public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
    this.lock.readLock().lock();
    try {
      return this.memstore.upsert(cells, readpoint);
    } finally {
      this.lock.readLock().unlock();
    }
  }
2238
  /** @return a flush context bound to the given flush sequence id */
  @Override
  public StoreFlushContext createFlushContext(long cacheFlushId) {
    return new StoreFlusherImpl(cacheFlushId);
  }
2243
2244 private final class StoreFlusherImpl implements StoreFlushContext {
2245
2246 private long cacheFlushSeqNum;
2247 private MemStoreSnapshot snapshot;
2248 private List<Path> tempFiles;
2249 private List<Path> committedFiles;
2250 private long cacheFlushCount;
2251 private long cacheFlushSize;
2252
2253 private StoreFlusherImpl(long cacheFlushSeqNum) {
2254 this.cacheFlushSeqNum = cacheFlushSeqNum;
2255 }
2256
2257
2258
2259
2260
2261 @Override
2262 public void prepare() {
2263 this.snapshot = memstore.snapshot();
2264 this.cacheFlushCount = snapshot.getCellsCount();
2265 this.cacheFlushSize = snapshot.getSize();
2266 committedFiles = new ArrayList<Path>(1);
2267 }
2268
2269 @Override
2270 public void flushCache(MonitoredTask status) throws IOException {
2271 tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
2272 }
2273
2274 @Override
2275 public boolean commit(MonitoredTask status) throws IOException {
2276 if (this.tempFiles == null || this.tempFiles.isEmpty()) {
2277 return false;
2278 }
2279 List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
2280 for (Path storeFilePath : tempFiles) {
2281 try {
2282 storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
2283 } catch (IOException ex) {
2284 LOG.error("Failed to commit store file " + storeFilePath, ex);
2285
2286 for (StoreFile sf : storeFiles) {
2287 Path pathToDelete = sf.getPath();
2288 try {
2289 sf.deleteReader();
2290 } catch (IOException deleteEx) {
2291 LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
2292 Runtime.getRuntime().halt(1);
2293 }
2294 }
2295 throw new IOException("Failed to commit the flush", ex);
2296 }
2297 }
2298
2299 for (StoreFile sf : storeFiles) {
2300 if (HStore.this.getCoprocessorHost() != null) {
2301 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
2302 }
2303 committedFiles.add(sf.getPath());
2304 }
2305
2306 HStore.this.flushedCellsCount += cacheFlushCount;
2307 HStore.this.flushedCellsSize += cacheFlushSize;
2308
2309
2310 return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2311 }
2312
2313 @Override
2314 public List<Path> getCommittedFiles() {
2315 return committedFiles;
2316 }
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326 @Override
2327 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2328 throws IOException {
2329 List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
2330 for (String file : fileNames) {
2331
2332 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
2333 StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
2334 storeFiles.add(storeFile);
2335 HStore.this.storeSize += storeFile.getReader().length();
2336 HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
2337 if (LOG.isInfoEnabled()) {
2338 LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
2339 " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
2340 ", sequenceid=" + + storeFile.getReader().getSequenceID() +
2341 ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
2342 }
2343 }
2344
2345 long snapshotId = -1;
2346 if (dropMemstoreSnapshot && snapshot != null) {
2347 snapshotId = snapshot.getId();
2348 }
2349 HStore.this.updateStorefiles(storeFiles, snapshotId);
2350 }
2351
2352
2353
2354
2355
2356 @Override
2357 public void abort() throws IOException {
2358 if (snapshot == null) {
2359 return;
2360 }
2361 HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
2362 }
2363 }
2364
2365 @Override
2366 public boolean needsCompaction() {
2367 return this.storeEngine.needsCompaction(this.filesCompacting);
2368 }
2369
2370 @Override
2371 public CacheConfig getCacheConfig() {
2372 return this.cacheConf;
2373 }
2374
  // Shallow heap overhead of an HStore instance: object header plus its
  // reference, long, int and boolean fields. NOTE(review): the field counts
  // (16 refs, 10 longs, 5 ints, 2 booleans) must be kept in sync with the
  // class's actual fields — confirm when fields change.
  public static final long FIXED_OVERHEAD =
      ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
          + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));

  // Deep overhead: FIXED_OVERHEAD plus the retained structures this store
  // always owns (lock, skip-list map bookkeeping, scan info). Used by
  // heapSize() together with the live memstore size.
  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
      + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
      + ClassSize.CONCURRENT_SKIPLISTMAP
      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
      + ScanInfo.FIXED_OVERHEAD);
2384
2385 @Override
2386 public long heapSize() {
2387 return DEEP_OVERHEAD + this.memstore.heapSize();
2388 }
2389
2390 @Override
2391 public KeyValue.KVComparator getComparator() {
2392 return comparator;
2393 }
2394
2395 @Override
2396 public ScanInfo getScanInfo() {
2397 return scanInfo;
2398 }
2399
2400
2401
2402
2403
  // Replaces this store's scan configuration; package-private, presumably for
  // tests and configuration updates — confirm callers before widening access.
  void setScanInfo(ScanInfo scanInfo) {
    this.scanInfo = scanInfo;
  }
2407
2408 @Override
2409 public boolean hasTooManyStoreFiles() {
2410 return getStorefilesCount() > this.blockingFileCount;
2411 }
2412
2413 @Override
2414 public long getFlushedCellsCount() {
2415 return flushedCellsCount;
2416 }
2417
2418 @Override
2419 public long getFlushedCellsSize() {
2420 return flushedCellsSize;
2421 }
2422
2423 @Override
2424 public long getCompactedCellsCount() {
2425 return compactedCellsCount;
2426 }
2427
2428 @Override
2429 public long getCompactedCellsSize() {
2430 return compactedCellsSize;
2431 }
2432
2433 @Override
2434 public long getMajorCompactedCellsCount() {
2435 return majorCompactedCellsCount;
2436 }
2437
2438 @Override
2439 public long getMajorCompactedCellsSize() {
2440 return majorCompactedCellsSize;
2441 }
2442
2443
2444
2445
2446
2447 @VisibleForTesting
2448 public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2449 return this.storeEngine;
2450 }
2451
2452 protected OffPeakHours getOffPeakHours() {
2453 return this.offPeakHours;
2454 }
2455
2456
2457
2458
2459 @Override
2460 public void onConfigurationChange(Configuration conf) {
2461 this.conf = new CompoundConfiguration()
2462 .add(conf)
2463 .addWritableMap(family.getValues());
2464 this.storeEngine.compactionPolicy.setConf(conf);
2465 this.offPeakHours = OffPeakHours.getInstance(conf);
2466 }
2467
2468
2469
2470
  /**
   * Registers configuration-observing children with the manager.
   * Intentionally a no-op: this store has no child observers to register.
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {

  }
2475
2476
2477
2478
  /**
   * Deregisters configuration-observing children from the manager.
   * Intentionally a no-op: this store registered no child observers.
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {

  }
2483
2484 @Override
2485 public double getCompactionPressure() {
2486 return storeEngine.getStoreFileManager().getCompactionPressure();
2487 }
2488
2489 @Override
2490 public boolean isPrimaryReplicaStore() {
2491 return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
2492 }
2493 }