1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.TreeMap;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.classification.InterfaceAudience;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.KeyValue;
39 import org.apache.hadoop.hbase.KeyValue.KVComparator;
40 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.ConcatenatedLists;
43 import org.apache.hadoop.util.StringUtils;
44
45 import com.google.common.collect.ImmutableCollection;
46 import com.google.common.collect.ImmutableList;
47 import com.google.common.collect.Lists;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 @InterfaceAudience.Private
67 public class StripeStoreFileManager
68 implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
69 static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
70
71
72
73
74 public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
75 public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
76
77 private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
78
79
80
81
82 public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
83 final static byte[] INVALID_KEY = null;
84
85
86
87
88
89 private static class State {
90
91
92
93
94
95 public byte[][] stripeEndRows = new byte[0][];
96
97
98
99
100
101
102 public ArrayList<ImmutableList<StoreFile>> stripeFiles
103 = new ArrayList<ImmutableList<StoreFile>>();
104
105 public ImmutableList<StoreFile> level0Files = ImmutableList.<StoreFile>of();
106
107
108 public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
109 }
110 private State state = null;
111
112
113 private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
114 private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
115
116
117
118 private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
119
120 private final KVComparator kvComparator;
121 private StripeStoreConfig config;
122
123 private final int blockingFileCount;
124
/**
 * Creates a manager that has no files loaded yet.
 * @param kvComparator comparator used for row comparisons and split point calculation
 * @param conf configuration the blocking store file count is read from
 * @param config stripe-specific settings
 */
public StripeStoreFileManager(
    KVComparator kvComparator, Configuration conf, StripeStoreConfig config) {
  int configuredBlockingCount = conf.getInt(
      HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
  this.blockingFileCount = configuredBlockingCount;
  this.config = config;
  this.kvComparator = kvComparator;
}
132
133 @Override
134 public void loadFiles(List<StoreFile> storeFiles) {
135 loadUnclassifiedStoreFiles(storeFiles);
136 }
137
138 @Override
139 public Collection<StoreFile> getStorefiles() {
140 return state.allFilesCached;
141 }
142
143 @Override
144 public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
145 CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
146 cmc.mergeResults(null, sfs);
147 debugDumpState("Added new files");
148 }
149
150 @Override
151 public ImmutableCollection<StoreFile> clearFiles() {
152 ImmutableCollection<StoreFile> result = state.allFilesCached;
153 this.state = new State();
154 this.fileStarts.clear();
155 this.fileEnds.clear();
156 return result;
157 }
158
159 @Override
160 public int getStorefileCount() {
161 return state.allFilesCached.size();
162 }
163
164
165
166 @Override
167 public Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
168 KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
169
170 result.addSublist(state.level0Files);
171 if (!state.stripeFiles.isEmpty()) {
172 int lastStripeIndex = findStripeForRow(targetKey.getRow(), false);
173 for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
174 result.addSublist(state.stripeFiles.get(stripeIndex));
175 }
176 }
177 return result.iterator();
178 }
179
180
181
182
183 @Override
184 public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
185 Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final KeyValue candidate) {
186 KeyBeforeConcatenatedLists.Iterator original =
187 (KeyBeforeConcatenatedLists.Iterator)candidateFiles;
188 assert original != null;
189 ArrayList<List<StoreFile>> components = original.getComponents();
190 for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
191 StoreFile sf = components.get(firstIrrelevant).get(0);
192 byte[] endKey = endOf(sf);
193
194
195
196 if (!isInvalid(endKey) && !isOpen(endKey)
197 && (nonOpenRowCompare(endKey, targetKey.getRow()) <= 0)) {
198 original.removeComponents(firstIrrelevant);
199 break;
200 }
201 }
202 return original;
203 }
204
205 @Override
206
207
208
209
210
211
212
213 public byte[] getSplitPoint() throws IOException {
214 if (this.getStorefileCount() == 0) return null;
215 if (state.stripeFiles.size() <= 1) {
216 return getSplitPointFromAllFiles();
217 }
218 int leftIndex = -1, rightIndex = state.stripeFiles.size();
219 long leftSize = 0, rightSize = 0;
220 long lastLeftSize = 0, lastRightSize = 0;
221 while (rightIndex - 1 != leftIndex) {
222 if (leftSize >= rightSize) {
223 --rightIndex;
224 lastRightSize = getStripeFilesSize(rightIndex);
225 rightSize += lastRightSize;
226 } else {
227 ++leftIndex;
228 lastLeftSize = getStripeFilesSize(leftIndex);
229 leftSize += lastLeftSize;
230 }
231 }
232 if (leftSize == 0 || rightSize == 0) {
233 String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
234 + "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
235 debugDumpState(errMsg);
236 LOG.warn(errMsg);
237 return getSplitPointFromAllFiles();
238 }
239 double ratio = (double)rightSize / leftSize;
240 if (ratio < 1) {
241 ratio = 1 / ratio;
242 }
243 if (config.getMaxSplitImbalance() > ratio) return state.stripeEndRows[leftIndex];
244
245
246
247
248
249 boolean isRightLarger = rightSize >= leftSize;
250 double newRatio = isRightLarger
251 ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
252 : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
253 if (newRatio < 1) {
254 newRatio = 1 / newRatio;
255 }
256 if (newRatio >= ratio) return state.stripeEndRows[leftIndex];
257 LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
258 + newRatio + " configured ratio " + config.getMaxSplitImbalance());
259
260 return StoreUtils.getLargestFile(state.stripeFiles.get(
261 isRightLarger ? rightIndex : leftIndex)).getFileSplitPoint(this.kvComparator);
262 }
263
264 private byte[] getSplitPointFromAllFiles() throws IOException {
265 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
266 sfs.addSublist(state.level0Files);
267 sfs.addAllSublists(state.stripeFiles);
268 if (sfs.isEmpty()) return null;
269 return StoreUtils.getLargestFile(sfs).getFileSplitPoint(this.kvComparator);
270 }
271
272 private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
273 return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
274 }
275
276 @Override
277 public Collection<StoreFile> getFilesForScanOrGet(
278 boolean isGet, byte[] startRow, byte[] stopRow) {
279 if (state.stripeFiles.isEmpty()) {
280 return state.level0Files;
281 }
282
283 int firstStripe = findStripeForRow(startRow, true);
284 int lastStripe = findStripeForRow(stopRow, false);
285 assert firstStripe <= lastStripe;
286 if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
287 return state.stripeFiles.get(firstStripe);
288 }
289 if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
290 return state.allFilesCached;
291 }
292
293 ConcatenatedLists<StoreFile> result = new ConcatenatedLists<StoreFile>();
294 result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
295 result.addSublist(state.level0Files);
296 return result;
297 }
298
299 @Override
300 public void addCompactionResults(
301 Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException {
302
303 LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
304 + " files replaced by " + results.size());
305
306
307 CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
308 cmc.mergeResults(compactedFiles, results);
309 debugDumpState("Merged compaction results");
310 }
311
312 @Override
313 public int getStoreCompactionPriority() {
314
315
316
317 int fc = getStorefileCount();
318 if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
319 return this.blockingFileCount - fc;
320 }
321
322
323
324 int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
325 int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
326 return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
327 }
328
329
330
331
332
333
334 private long getStripeFilesSize(int stripeIndex) {
335 long result = 0;
336 for (StoreFile sf : state.stripeFiles.get(stripeIndex)) {
337 result += sf.getReader().length();
338 }
339 return result;
340 }
341
342
343
344
345
346
347
348
349 private void loadUnclassifiedStoreFiles(List<StoreFile> storeFiles) {
350 LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
351 TreeMap<byte[], ArrayList<StoreFile>> candidateStripes =
352 new TreeMap<byte[], ArrayList<StoreFile>>(MAP_COMPARATOR);
353 ArrayList<StoreFile> level0Files = new ArrayList<StoreFile>();
354
355
356 for (StoreFile sf : storeFiles) {
357 byte[] startRow = startOf(sf), endRow = endOf(sf);
358
359 if (isInvalid(startRow) || isInvalid(endRow)) {
360 insertFileIntoStripe(level0Files, sf);
361 ensureLevel0Metadata(sf);
362 } else if (!isOpen(startRow) && !isOpen(endRow) &&
363 nonOpenRowCompare(startRow, endRow) >= 0) {
364 LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
365 + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
366 insertFileIntoStripe(level0Files, sf);
367 ensureLevel0Metadata(sf);
368 } else {
369 ArrayList<StoreFile> stripe = candidateStripes.get(endRow);
370 if (stripe == null) {
371 stripe = new ArrayList<StoreFile>();
372 candidateStripes.put(endRow, stripe);
373 }
374 insertFileIntoStripe(stripe, sf);
375 }
376 }
377
378
379
380 boolean hasOverlaps = false;
381 byte[] expectedStartRow = null;
382 Iterator<Map.Entry<byte[], ArrayList<StoreFile>>> entryIter =
383 candidateStripes.entrySet().iterator();
384 while (entryIter.hasNext()) {
385 Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
386 ArrayList<StoreFile> files = entry.getValue();
387
388 for (int i = 0; i < files.size(); ++i) {
389 StoreFile sf = files.get(i);
390 byte[] startRow = startOf(sf);
391 if (expectedStartRow == null) {
392 expectedStartRow = startRow;
393 } else if (!rowEquals(expectedStartRow, startRow)) {
394 hasOverlaps = true;
395 LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
396 + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
397 + "], to L0 it goes");
398 StoreFile badSf = files.remove(i);
399 insertFileIntoStripe(level0Files, badSf);
400 ensureLevel0Metadata(badSf);
401 --i;
402 }
403 }
404
405 byte[] endRow = entry.getKey();
406 if (!files.isEmpty()) {
407 expectedStartRow = endRow;
408 } else {
409 entryIter.remove();
410 }
411 }
412
413
414
415
416
417 if (!candidateStripes.isEmpty()) {
418 StoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
419 boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
420 if (!isOpen) {
421 LOG.warn("The range of the loaded files does not cover full key space: from ["
422 + Bytes.toString(startOf(firstFile)) + "], to ["
423 + Bytes.toString(candidateStripes.lastKey()) + "]");
424 if (!hasOverlaps) {
425 ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
426 ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
427 } else {
428 LOG.warn("Inconsistent files, everything goes to L0.");
429 for (ArrayList<StoreFile> files : candidateStripes.values()) {
430 for (StoreFile sf : files) {
431 insertFileIntoStripe(level0Files, sf);
432 ensureLevel0Metadata(sf);
433 }
434 }
435 candidateStripes.clear();
436 }
437 }
438 }
439
440
441 State state = new State();
442 state.level0Files = ImmutableList.copyOf(level0Files);
443 state.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(candidateStripes.size());
444 state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
445 ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(level0Files);
446 int i = candidateStripes.size() - 1;
447 for (Map.Entry<byte[], ArrayList<StoreFile>> entry : candidateStripes.entrySet()) {
448 state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
449 newAllFiles.addAll(entry.getValue());
450 if (i > 0) {
451 state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
452 }
453 --i;
454 }
455 state.allFilesCached = ImmutableList.copyOf(newAllFiles);
456 this.state = state;
457 debugDumpState("Files loaded");
458 }
459
460 private void ensureEdgeStripeMetadata(ArrayList<StoreFile> stripe, boolean isFirst) {
461 HashMap<StoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
462 for (StoreFile sf : stripe) {
463 targetMap.put(sf, OPEN_KEY);
464 }
465 }
466
467 private void ensureLevel0Metadata(StoreFile sf) {
468 if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
469 if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
470 }
471
472 private void debugDumpState(String string) {
473 if (!LOG.isDebugEnabled()) return;
474 StringBuilder sb = new StringBuilder();
475 sb.append("\n" + string + "; current stripe state is as such:");
476 sb.append("\n level 0 with ").append(state.level0Files.size())
477 .append(
478 " files: "
479 + StringUtils.humanReadableInt(StripeCompactionPolicy
480 .getTotalFileSize(state.level0Files)) + ";");
481 for (int i = 0; i < state.stripeFiles.size(); ++i) {
482 String endRow = (i == state.stripeEndRows.length)
483 ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
484 sb.append("\n stripe ending in ").append(endRow).append(" with ")
485 .append(state.stripeFiles.get(i).size())
486 .append(
487 " files: "
488 + StringUtils.humanReadableInt(StripeCompactionPolicy
489 .getTotalFileSize(state.stripeFiles.get(i))) + ";");
490 }
491 sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
492 sb.append("\n").append(getStorefileCount()).append(" files total.");
493 LOG.debug(sb.toString());
494 }
495
496
497
498
499 private static final boolean isOpen(byte[] key) {
500 return key != null && key.length == 0;
501 }
502
503
504
505
506 private static final boolean isInvalid(byte[] key) {
507 return key == INVALID_KEY;
508 }
509
510
511
512
513 private final boolean rowEquals(byte[] k1, byte[] k2) {
514 return kvComparator.matchingRows(k1, 0, k1.length, k2, 0, k2.length);
515 }
516
517
518
519
520 private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
521 assert !isOpen(k1) && !isOpen(k2);
522 return kvComparator.compareRows(k1, 0, k1.length, k2, 0, k2.length);
523 }
524
525
526
527
528 private final int findStripeIndexByEndRow(byte[] endRow) {
529 assert !isInvalid(endRow);
530 if (isOpen(endRow)) return state.stripeEndRows.length;
531 return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
532 }
533
534
535
536
537 private final int findStripeForRow(byte[] row, boolean isStart) {
538 if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
539 if (!isStart && row == HConstants.EMPTY_END_ROW) return state.stripeFiles.size() - 1;
540
541
542
543
544
545 return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
546 }
547
548 @Override
549 public final byte[] getStartRow(int stripeIndex) {
550 return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
551 }
552
553 @Override
554 public final byte[] getEndRow(int stripeIndex) {
555 return (stripeIndex == state.stripeEndRows.length
556 ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
557 }
558
559
560 private byte[] startOf(StoreFile sf) {
561 byte[] result = this.fileStarts.get(sf);
562 return result == null ? sf.getMetadataValue(STRIPE_START_KEY)
563 : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
564 }
565
566 private byte[] endOf(StoreFile sf) {
567 byte[] result = this.fileEnds.get(sf);
568 return result == null ? sf.getMetadataValue(STRIPE_END_KEY)
569 : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
570 }
571
572
573
574
575
576
577 private static void insertFileIntoStripe(ArrayList<StoreFile> stripe, StoreFile sf) {
578
579
580 for (int insertBefore = 0; ; ++insertBefore) {
581 if (insertBefore == stripe.size()
582 || (StoreFile.Comparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
583 stripe.add(insertBefore, sf);
584 break;
585 }
586 }
587 }
588
589
590
591
592
593
594
595
596
597
598 private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<StoreFile> {
599 @Override
600 public java.util.Iterator<StoreFile> iterator() {
601 return new Iterator();
602 }
603
604 public class Iterator extends ConcatenatedLists<StoreFile>.Iterator {
605 public ArrayList<List<StoreFile>> getComponents() {
606 return components;
607 }
608
609 public void removeComponents(int startIndex) {
610 List<List<StoreFile>> subList = components.subList(startIndex, components.size());
611 for (List<StoreFile> entry : subList) {
612 size -= entry.size();
613 }
614 assert size >= 0;
615 subList.clear();
616 }
617
618 @Override
619 public void remove() {
620 if (!this.nextWasCalled) {
621 throw new IllegalStateException("No element to remove");
622 }
623 this.nextWasCalled = false;
624 List<StoreFile> src = components.get(currentComponent);
625 if (src instanceof ImmutableList<?>) {
626 src = new ArrayList<StoreFile>(src);
627 components.set(currentComponent, src);
628 }
629 src.remove(indexWithinComponent);
630 --size;
631 --indexWithinComponent;
632 if (src.isEmpty()) {
633 components.remove(currentComponent);
634 }
635 }
636 }
637 }
638
639
640
641
642
643
644 private class CompactionOrFlushMergeCopy {
645 private ArrayList<List<StoreFile>> stripeFiles = null;
646 private ArrayList<StoreFile> level0Files = null;
647 private ArrayList<byte[]> stripeEndRows = null;
648
649 private Collection<StoreFile> compactedFiles = null;
650 private Collection<StoreFile> results = null;
651
652 private List<StoreFile> l0Results = new ArrayList<StoreFile>();
653 private final boolean isFlush;
654
655 public CompactionOrFlushMergeCopy(boolean isFlush) {
656
657 this.stripeFiles = new ArrayList<List<StoreFile>>(
658 StripeStoreFileManager.this.state.stripeFiles);
659 this.isFlush = isFlush;
660 }
661
662 public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
663 throws IOException {
664 assert this.compactedFiles == null && this.results == null;
665 this.compactedFiles = compactedFiles;
666 this.results = results;
667
668 if (!isFlush) removeCompactedFiles();
669 TreeMap<byte[], StoreFile> newStripes = processResults();
670 if (newStripes != null) {
671 processNewCandidateStripes(newStripes);
672 }
673
674 State state = createNewState();
675 StripeStoreFileManager.this.state = state;
676 updateMetadataMaps();
677 }
678
679 private State createNewState() {
680 State oldState = StripeStoreFileManager.this.state;
681
682 assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
683 State newState = new State();
684 newState.level0Files = (this.level0Files == null) ? oldState.level0Files
685 : ImmutableList.copyOf(this.level0Files);
686 newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
687 : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
688 newState.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(this.stripeFiles.size());
689 for (List<StoreFile> newStripe : this.stripeFiles) {
690 newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
691 ? (ImmutableList<StoreFile>)newStripe : ImmutableList.copyOf(newStripe));
692 }
693
694 List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
695 if (!isFlush) newAllFiles.removeAll(compactedFiles);
696 newAllFiles.addAll(results);
697 newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
698 return newState;
699 }
700
701 private void updateMetadataMaps() {
702 StripeStoreFileManager parent = StripeStoreFileManager.this;
703 if (!isFlush) {
704 for (StoreFile sf : this.compactedFiles) {
705 parent.fileStarts.remove(sf);
706 parent.fileEnds.remove(sf);
707 }
708 }
709 if (this.l0Results != null) {
710 for (StoreFile sf : this.l0Results) {
711 parent.ensureLevel0Metadata(sf);
712 }
713 }
714 }
715
716
717
718
719
720 private final ArrayList<StoreFile> getStripeCopy(int index) {
721 List<StoreFile> stripeCopy = this.stripeFiles.get(index);
722 ArrayList<StoreFile> result = null;
723 if (stripeCopy instanceof ImmutableList<?>) {
724 result = new ArrayList<StoreFile>(stripeCopy);
725 this.stripeFiles.set(index, result);
726 } else {
727 result = (ArrayList<StoreFile>)stripeCopy;
728 }
729 return result;
730 }
731
732
733
734
735 private final ArrayList<StoreFile> getLevel0Copy() {
736 if (this.level0Files == null) {
737 this.level0Files = new ArrayList<StoreFile>(StripeStoreFileManager.this.state.level0Files);
738 }
739 return this.level0Files;
740 }
741
742
743
744
745
746
747 private TreeMap<byte[], StoreFile> processResults() throws IOException {
748 TreeMap<byte[], StoreFile> newStripes = null;
749 for (StoreFile sf : this.results) {
750 byte[] startRow = startOf(sf), endRow = endOf(sf);
751 if (isInvalid(endRow) || isInvalid(startRow)) {
752 if (!isFlush) {
753 LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
754 }
755 insertFileIntoStripe(getLevel0Copy(), sf);
756 this.l0Results.add(sf);
757 continue;
758 }
759 if (!this.stripeFiles.isEmpty()) {
760 int stripeIndex = findStripeIndexByEndRow(endRow);
761 if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
762
763 insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
764 continue;
765 }
766 }
767
768
769 if (newStripes == null) {
770 newStripes = new TreeMap<byte[], StoreFile>(MAP_COMPARATOR);
771 }
772 StoreFile oldSf = newStripes.put(endRow, sf);
773 if (oldSf != null) {
774 throw new IOException("Compactor has produced multiple files for the stripe ending in ["
775 + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
776 }
777 }
778 return newStripes;
779 }
780
781
782
783
784
785 private void removeCompactedFiles() throws IOException {
786 for (StoreFile oldFile : this.compactedFiles) {
787 byte[] oldEndRow = endOf(oldFile);
788 List<StoreFile> source = null;
789 if (isInvalid(oldEndRow)) {
790 source = getLevel0Copy();
791 } else {
792 int stripeIndex = findStripeIndexByEndRow(oldEndRow);
793 if (stripeIndex < 0) {
794 throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
795 + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
796 }
797 source = getStripeCopy(stripeIndex);
798 }
799 if (!source.remove(oldFile)) {
800 throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
801 }
802 }
803 }
804
805
806
807
808
809
810 private void processNewCandidateStripes(
811 TreeMap<byte[], StoreFile> newStripes) throws IOException {
812
813 boolean hasStripes = !this.stripeFiles.isEmpty();
814 this.stripeEndRows = new ArrayList<byte[]>(
815 Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
816 int removeFrom = 0;
817 byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
818 byte[] lastEndRow = newStripes.lastKey();
819 if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
820 throw new IOException("Newly created stripes do not cover the entire key space.");
821 }
822
823 boolean canAddNewStripes = true;
824 Collection<StoreFile> filesForL0 = null;
825 if (hasStripes) {
826
827
828 if (isOpen(firstStartRow)) {
829 removeFrom = 0;
830 } else {
831 removeFrom = findStripeIndexByEndRow(firstStartRow);
832 if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
833 ++removeFrom;
834 }
835 int removeTo = findStripeIndexByEndRow(lastEndRow);
836 if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
837
838 ArrayList<StoreFile> conflictingFiles = new ArrayList<StoreFile>();
839 for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
840 conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
841 }
842 if (!conflictingFiles.isEmpty()) {
843
844
845
846 if (isFlush) {
847 long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
848 LOG.warn("Stripes were created by a flush, but results of size " + newSize
849 + " cannot be added because the stripes have changed");
850 canAddNewStripes = false;
851 filesForL0 = newStripes.values();
852 } else {
853 long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
854 LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
855 + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
856 filesForL0 = conflictingFiles;
857 }
858 if (filesForL0 != null) {
859 for (StoreFile sf : filesForL0) {
860 insertFileIntoStripe(getLevel0Copy(), sf);
861 }
862 l0Results.addAll(filesForL0);
863 }
864 }
865
866 if (canAddNewStripes) {
867
868 int originalCount = this.stripeFiles.size();
869 for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
870 if (removeIndex != originalCount - 1) {
871 this.stripeEndRows.remove(removeIndex);
872 }
873 this.stripeFiles.remove(removeIndex);
874 }
875 }
876 }
877
878 if (!canAddNewStripes) return;
879
880
881 byte[] previousEndRow = null;
882 int insertAt = removeFrom;
883 for (Map.Entry<byte[], StoreFile> newStripe : newStripes.entrySet()) {
884 if (previousEndRow != null) {
885
886 assert !isOpen(previousEndRow);
887 byte[] startRow = startOf(newStripe.getValue());
888 if (!rowEquals(previousEndRow, startRow)) {
889 throw new IOException("The new stripes produced by "
890 + (isFlush ? "flush" : "compaction") + " are not contiguous");
891 }
892 }
893
894 ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
895 tmp.add(newStripe.getValue());
896 stripeFiles.add(insertAt, tmp);
897 previousEndRow = newStripe.getKey();
898 if (!isOpen(previousEndRow)) {
899 stripeEndRows.add(insertAt, previousEndRow);
900 }
901 ++insertAt;
902 }
903 }
904 }
905
906 @Override
907 public List<StoreFile> getLevel0Files() {
908 return this.state.level0Files;
909 }
910
911 @Override
912 public List<byte[]> getStripeBoundaries() {
913 if (this.state.stripeFiles.isEmpty()) return new ArrayList<byte[]>();
914 ArrayList<byte[]> result = new ArrayList<byte[]>(this.state.stripeEndRows.length + 2);
915 result.add(OPEN_KEY);
916 for (int i = 0; i < this.state.stripeEndRows.length; ++i) {
917 result.add(this.state.stripeEndRows[i]);
918 }
919 result.add(OPEN_KEY);
920 return result;
921 }
922
923 @Override
924 public ArrayList<ImmutableList<StoreFile>> getStripes() {
925 return this.state.stripeFiles;
926 }
927
928 @Override
929 public int getStripeCount() {
930 return this.state.stripeFiles.size();
931 }
932 }