View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.TreeMap;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.KeyValue.KVComparator;
40  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ConcatenatedLists;
43  import org.apache.hadoop.util.StringUtils;
44  
45  import com.google.common.collect.ImmutableCollection;
46  import com.google.common.collect.ImmutableList;
47  import com.google.common.collect.Lists;
48  
49  
50  /**
51   * Stripe implementation of StoreFileManager.
52   * Not thread safe - relies on external locking (in HStore). Collections that this class
53   * returns are immutable or unique to the call, so they should be safe.
54   * Stripe store splits the key space of the region into non-overlapping stripes, as well as
55   * some recent files that have all the keys (level 0). Each stripe contains a set of files.
56   * When L0 is compacted, it's split into the files corresponding to existing stripe boundaries,
57   * that can thus be added to stripes.
58   * When scan or get happens, it only has to read the files from the corresponding stripes.
59   * See StripeCompactionPolicy on how the stripes are determined; this class doesn't care.
60   *
61   * This class should work together with StripeCompactionPolicy and StripeCompactor.
62   * With regard to how they work, we make at least the following (reasonable) assumptions:
63   *  - Compaction produces one file per new stripe (if any); that is easy to change.
64   *  - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
65   */
66  @InterfaceAudience.Private
67  public class StripeStoreFileManager
68    implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
69    static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
70  
71    /**
72     * The file metadata fields that contain the stripe information.
73     */
74    public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
75    public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
76  
77    private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
78  
79    /**
80     * The key value used for range boundary, indicating that the boundary is open (i.e. +-inf).
81     */
82    public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
83    final static byte[] INVALID_KEY = null;
84  
85    /**
86     * The state class. Used solely to replace results atomically during
87     * compactions and avoid complicated error handling.
88     */
89    private static class State {
90      /**
91       * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
92       * here. It is invariant that the start row of the stripe is the end row of the previous one
93       * (and is an open boundary for the first one).
94       */
95      public byte[][] stripeEndRows = new byte[0][];
96  
97      /**
98       * Files by stripe. Each element of the list corresponds to stripeEndRow element with the
99       * same index, except the last one. Inside each list, the files are in reverse order by
100      * seqNum. Note that the length of this is one higher than that of stripeEndKeys.
101      */
102     public ArrayList<ImmutableList<StoreFile>> stripeFiles
103       = new ArrayList<ImmutableList<StoreFile>>();
104     /** Level 0. The files are in reverse order by seqNum. */
105     public ImmutableList<StoreFile> level0Files = ImmutableList.<StoreFile>of();
106 
107     /** Cached list of all files in the structure, to return from some calls */
108     public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
109   }
110   private State state = null;
111 
112   /** Cached file metadata (or overrides as the case may be) */
113   private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
114   private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
115   /** Normally invalid key is null, but in the map null is the result for "no key"; so use
116    * the following constant value in these maps instead. Note that this is a constant and
117    * we use it to compare by reference when we read from the map. */
118   private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
119 
120   private final KVComparator kvComparator;
121   private StripeStoreConfig config;
122 
123   private final int blockingFileCount;
124 
125   public StripeStoreFileManager(
126       KVComparator kvComparator, Configuration conf, StripeStoreConfig config) {
127     this.kvComparator = kvComparator;
128     this.config = config;
129     this.blockingFileCount = conf.getInt(
130         HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
131   }
132 
133   @Override
134   public void loadFiles(List<StoreFile> storeFiles) {
135     loadUnclassifiedStoreFiles(storeFiles);
136   }
137 
138   @Override
139   public Collection<StoreFile> getStorefiles() {
140     return state.allFilesCached;
141   }
142 
143   @Override
144   public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
145     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
146     cmc.mergeResults(null, sfs);
147     debugDumpState("Added new files");
148   }
149 
150   @Override
151   public ImmutableCollection<StoreFile> clearFiles() {
152     ImmutableCollection<StoreFile> result = state.allFilesCached;
153     this.state = new State();
154     this.fileStarts.clear();
155     this.fileEnds.clear();
156     return result;
157   }
158 
159   @Override
160   public int getStorefileCount() {
161     return state.allFilesCached.size();
162   }
163 
164   /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)}
165    * for details on this methods. */
166   @Override
167   public Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
168     KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
169     // Order matters for this call.
170     result.addSublist(state.level0Files);
171     if (!state.stripeFiles.isEmpty()) {
172       int lastStripeIndex = findStripeForRow(targetKey.getRow(), false);
173       for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
174         result.addSublist(state.stripeFiles.get(stripeIndex));
175       }
176     }
177     return result.iterator();
178   }
179 
180   /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
181    * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, KeyValue)}
182    * for details on this methods. */
183   @Override
184   public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
185       Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final KeyValue candidate) {
186     KeyBeforeConcatenatedLists.Iterator original =
187         (KeyBeforeConcatenatedLists.Iterator)candidateFiles;
188     assert original != null;
189     ArrayList<List<StoreFile>> components = original.getComponents();
190     for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
191       StoreFile sf = components.get(firstIrrelevant).get(0);
192       byte[] endKey = endOf(sf);
193       // Entries are ordered as such: L0, then stripes in reverse order. We never remove
194       // level 0; we remove the stripe, and all subsequent ones, as soon as we find the
195       // first one that cannot possibly have better candidates.
196       if (!isInvalid(endKey) && !isOpen(endKey)
197           && (nonOpenRowCompare(endKey, targetKey.getRow()) <= 0)) {
198         original.removeComponents(firstIrrelevant);
199         break;
200       }
201     }
202     return original;
203   }
204 
205   @Override
206   /**
207    * Override of getSplitPoint that determines the split point as the boundary between two
208    * stripes, unless it causes significant imbalance between split sides' sizes. In that
209    * case, the split boundary will be chosen from the middle of one of the stripes to
210    * minimize imbalance.
211    * @return The split point, or null if no split is possible.
212    */
213   public byte[] getSplitPoint() throws IOException {
214     if (this.getStorefileCount() == 0) return null;
215     if (state.stripeFiles.size() <= 1) {
216       return getSplitPointFromAllFiles();
217     }
218     int leftIndex = -1, rightIndex = state.stripeFiles.size();
219     long leftSize = 0, rightSize = 0;
220     long lastLeftSize = 0, lastRightSize = 0;
221     while (rightIndex - 1 != leftIndex) {
222       if (leftSize >= rightSize) {
223         --rightIndex;
224         lastRightSize = getStripeFilesSize(rightIndex);
225         rightSize += lastRightSize;
226       } else {
227         ++leftIndex;
228         lastLeftSize = getStripeFilesSize(leftIndex);
229         leftSize += lastLeftSize;
230       }
231     }
232     if (leftSize == 0 || rightSize == 0) {
233       String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
234           + "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
235       debugDumpState(errMsg);
236       LOG.warn(errMsg);
237       return getSplitPointFromAllFiles();
238     }
239     double ratio = (double)rightSize / leftSize;
240     if (ratio < 1) {
241       ratio = 1 / ratio;
242     }
243     if (config.getMaxSplitImbalance() > ratio) return state.stripeEndRows[leftIndex];
244 
245     // If the difference between the sides is too large, we could get the proportional key on
246     // the a stripe to equalize the difference, but there's no proportional key method at the
247     // moment, and it's not extremely important.
248     // See if we can achieve better ratio if we split the bigger side in half.
249     boolean isRightLarger = rightSize >= leftSize;
250     double newRatio = isRightLarger
251         ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
252         : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
253     if (newRatio < 1) {
254       newRatio = 1 / newRatio;
255     }
256     if (newRatio >= ratio)  return state.stripeEndRows[leftIndex];
257     LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
258         + newRatio + " configured ratio " + config.getMaxSplitImbalance());
259     // Ok, we may get better ratio, get it.
260     return StoreUtils.getLargestFile(state.stripeFiles.get(
261         isRightLarger ? rightIndex : leftIndex)).getFileSplitPoint(this.kvComparator);
262   }
263 
264   private byte[] getSplitPointFromAllFiles() throws IOException {
265     ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
266     sfs.addSublist(state.level0Files);
267     sfs.addAllSublists(state.stripeFiles);
268     if (sfs.isEmpty()) return null;
269     return StoreUtils.getLargestFile(sfs).getFileSplitPoint(this.kvComparator);
270   }
271 
272   private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
273     return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
274   }
275 
276   @Override
277   public Collection<StoreFile> getFilesForScanOrGet(
278       boolean isGet, byte[] startRow, byte[] stopRow) {
279     if (state.stripeFiles.isEmpty()) {
280       return state.level0Files; // There's just L0.
281     }
282 
283     int firstStripe = findStripeForRow(startRow, true);
284     int lastStripe = findStripeForRow(stopRow, false);
285     assert firstStripe <= lastStripe;
286     if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
287       return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
288     }
289     if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
290       return state.allFilesCached; // We need to read all files.
291     }
292 
293     ConcatenatedLists<StoreFile> result = new ConcatenatedLists<StoreFile>();
294     result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
295     result.addSublist(state.level0Files);
296     return result;
297   }
298 
299   @Override
300   public void addCompactionResults(
301     Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException {
302     // See class comment for the assumptions we make here.
303     LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
304         + " files replaced by " + results.size());
305     // In order to be able to fail in the middle of the operation, we'll operate on lazy
306     // copies and apply the result at the end.
307     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
308     cmc.mergeResults(compactedFiles, results);
309     debugDumpState("Merged compaction results");
310   }
311 
312   @Override
313   public int getStoreCompactionPriority() {
314     // If there's only L0, do what the default store does.
315     // If we are in critical priority, do the same - we don't want to trump all stores all
316     // the time due to how many files we have.
317     int fc = getStorefileCount();
318     if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
319       return this.blockingFileCount - fc;
320     }
321     // If we are in good shape, we don't want to be trumped by all other stores due to how
322     // many files we have, so do an approximate mapping to normal priority range; L0 counts
323     // for all stripes.
324     int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
325     int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
326     return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
327   }
328 
329   /**
330    * Gets the total size of all files in the stripe.
331    * @param stripeIndex Stripe index.
332    * @return Size.
333    */
334   private long getStripeFilesSize(int stripeIndex) {
335     long result = 0;
336     for (StoreFile sf : state.stripeFiles.get(stripeIndex)) {
337       result += sf.getReader().length();
338     }
339     return result;
340   }
341 
342   /**
343    * Loads initial store files that were picked up from some physical location pertaining to
344    * this store (presumably). Unlike adding files after compaction, assumes empty initial
345    * sets, and is forgiving with regard to stripe constraints - at worst, many/all files will
346    * go to level 0.
347    * @param storeFiles Store files to add.
348    */
349   private void loadUnclassifiedStoreFiles(List<StoreFile> storeFiles) {
350     LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
351     TreeMap<byte[], ArrayList<StoreFile>> candidateStripes =
352         new TreeMap<byte[], ArrayList<StoreFile>>(MAP_COMPARATOR);
353     ArrayList<StoreFile> level0Files = new ArrayList<StoreFile>();
354     // Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
355     // If needed, we could dynamically determine the stripes in future.
356     for (StoreFile sf : storeFiles) {
357       byte[] startRow = startOf(sf), endRow = endOf(sf);
358       // Validate the range and put the files into place.
359       if (isInvalid(startRow) || isInvalid(endRow)) {
360         insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
361         ensureLevel0Metadata(sf);
362       } else if (!isOpen(startRow) && !isOpen(endRow) &&
363           nonOpenRowCompare(startRow, endRow) >= 0) {
364         LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
365           + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
366         insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
367         ensureLevel0Metadata(sf);
368       } else {
369         ArrayList<StoreFile> stripe = candidateStripes.get(endRow);
370         if (stripe == null) {
371           stripe = new ArrayList<StoreFile>();
372           candidateStripes.put(endRow, stripe);
373         }
374         insertFileIntoStripe(stripe, sf);
375       }
376     }
377     // Possible improvement - for variable-count stripes, if all the files are in L0, we can
378     // instead create single, open-ended stripe with all files.
379 
380     boolean hasOverlaps = false;
381     byte[] expectedStartRow = null; // first stripe can start wherever
382     Iterator<Map.Entry<byte[], ArrayList<StoreFile>>> entryIter =
383         candidateStripes.entrySet().iterator();
384     while (entryIter.hasNext()) {
385       Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
386       ArrayList<StoreFile> files = entry.getValue();
387       // Validate the file start rows, and remove the bad ones to level 0.
388       for (int i = 0; i < files.size(); ++i) {
389         StoreFile sf = files.get(i);
390         byte[] startRow = startOf(sf);
391         if (expectedStartRow == null) {
392           expectedStartRow = startRow; // ensure that first stripe is still consistent
393         } else if (!rowEquals(expectedStartRow, startRow)) {
394           hasOverlaps = true;
395           LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
396               + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
397               + "], to L0 it goes");
398           StoreFile badSf = files.remove(i);
399           insertFileIntoStripe(level0Files, badSf);
400           ensureLevel0Metadata(badSf);
401           --i;
402         }
403       }
404       // Check if any files from the candidate stripe are valid. If so, add a stripe.
405       byte[] endRow = entry.getKey();
406       if (!files.isEmpty()) {
407         expectedStartRow = endRow; // Next stripe must start exactly at that key.
408       } else {
409         entryIter.remove();
410       }
411     }
412 
413     // In the end, there must be open ends on two sides. If not, and there were no errors i.e.
414     // files are consistent, they might be coming from a split. We will treat the boundaries
415     // as open keys anyway, and log the message.
416     // If there were errors, we'll play it safe and dump everything into L0.
417     if (!candidateStripes.isEmpty()) {
418       StoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
419       boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
420       if (!isOpen) {
421         LOG.warn("The range of the loaded files does not cover full key space: from ["
422             + Bytes.toString(startOf(firstFile)) + "], to ["
423             + Bytes.toString(candidateStripes.lastKey()) + "]");
424         if (!hasOverlaps) {
425           ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
426           ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
427         } else {
428           LOG.warn("Inconsistent files, everything goes to L0.");
429           for (ArrayList<StoreFile> files : candidateStripes.values()) {
430             for (StoreFile sf : files) {
431               insertFileIntoStripe(level0Files, sf);
432               ensureLevel0Metadata(sf);
433             }
434           }
435           candidateStripes.clear();
436         }
437       }
438     }
439 
440     // Copy the results into the fields.
441     State state = new State();
442     state.level0Files = ImmutableList.copyOf(level0Files);
443     state.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(candidateStripes.size());
444     state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
445     ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(level0Files);
446     int i = candidateStripes.size() - 1;
447     for (Map.Entry<byte[], ArrayList<StoreFile>> entry : candidateStripes.entrySet()) {
448       state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
449       newAllFiles.addAll(entry.getValue());
450       if (i > 0) {
451         state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
452       }
453       --i;
454     }
455     state.allFilesCached = ImmutableList.copyOf(newAllFiles);
456     this.state = state;
457     debugDumpState("Files loaded");
458   }
459 
460   private void ensureEdgeStripeMetadata(ArrayList<StoreFile> stripe, boolean isFirst) {
461     HashMap<StoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
462     for (StoreFile sf : stripe) {
463       targetMap.put(sf, OPEN_KEY);
464     }
465   }
466 
467   private void ensureLevel0Metadata(StoreFile sf) {
468     if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
469     if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
470   }
471 
472   private void debugDumpState(String string) {
473     if (!LOG.isDebugEnabled()) return;
474     StringBuilder sb = new StringBuilder();
475     sb.append("\n" + string + "; current stripe state is as such:");
476     sb.append("\n level 0 with ").append(state.level0Files.size())
477         .append(
478           " files: "
479               + StringUtils.humanReadableInt(StripeCompactionPolicy
480                   .getTotalFileSize(state.level0Files)) + ";");
481     for (int i = 0; i < state.stripeFiles.size(); ++i) {
482       String endRow = (i == state.stripeEndRows.length)
483           ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
484       sb.append("\n stripe ending in ").append(endRow).append(" with ")
485           .append(state.stripeFiles.get(i).size())
486           .append(
487             " files: "
488                 + StringUtils.humanReadableInt(StripeCompactionPolicy
489                     .getTotalFileSize(state.stripeFiles.get(i))) + ";");
490     }
491     sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
492     sb.append("\n").append(getStorefileCount()).append(" files total.");
493     LOG.debug(sb.toString());
494   }
495 
496   /**
497    * Checks whether the key indicates an open interval boundary (i.e. infinity).
498    */
499   private static final boolean isOpen(byte[] key) {
500     return key != null && key.length == 0;
501   }
502 
503   /**
504    * Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files).
505    */
506   private static final boolean isInvalid(byte[] key) {
507     return key == INVALID_KEY;
508   }
509 
510   /**
511    * Compare two keys for equality.
512    */
513   private final boolean rowEquals(byte[] k1, byte[] k2) {
514     return kvComparator.matchingRows(k1, 0, k1.length, k2, 0, k2.length);
515   }
516 
517   /**
518    * Compare two keys. Keys must not be open (isOpen(row) == false).
519    */
520   private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
521     assert !isOpen(k1) && !isOpen(k2);
522     return kvComparator.compareRows(k1, 0, k1.length, k2, 0, k2.length);
523   }
524 
525   /**
526    * Finds the stripe index by end row.
527    */
528   private final int findStripeIndexByEndRow(byte[] endRow) {
529     assert !isInvalid(endRow);
530     if (isOpen(endRow)) return state.stripeEndRows.length;
531     return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
532   }
533 
534   /**
535    * Finds the stripe index for the stripe containing a row provided externally for get/scan.
536    */
537   private final int findStripeForRow(byte[] row, boolean isStart) {
538     if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
539     if (!isStart && row == HConstants.EMPTY_END_ROW) return state.stripeFiles.size() - 1;
540     // If there's an exact match below, a stripe ends at "row". Stripe right boundary is
541     // exclusive, so that means the row is in the next stripe; thus, we need to add one to index.
542     // If there's no match, the return value of binarySearch is (-(insertion point) - 1), where
543     // insertion point is the index of the next greater element, or list size if none. The
544     // insertion point happens to be exactly what we need, so we need to add one to the result.
545     return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
546   }
547 
548   @Override
549   public final byte[] getStartRow(int stripeIndex) {
550     return (stripeIndex == 0  ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
551   }
552 
553   @Override
554   public final byte[] getEndRow(int stripeIndex) {
555     return (stripeIndex == state.stripeEndRows.length
556         ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
557   }
558 
559 
560   private byte[] startOf(StoreFile sf) {
561     byte[] result = this.fileStarts.get(sf);
562     return result == null ? sf.getMetadataValue(STRIPE_START_KEY)
563         : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
564   }
565 
566   private byte[] endOf(StoreFile sf) {
567     byte[] result = this.fileEnds.get(sf);
568     return result == null ? sf.getMetadataValue(STRIPE_END_KEY)
569         : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
570   }
571 
572   /**
573    * Inserts a file in the correct place (by seqnum) in a stripe copy.
574    * @param stripe Stripe copy to insert into.
575    * @param sf File to insert.
576    */
577   private static void insertFileIntoStripe(ArrayList<StoreFile> stripe, StoreFile sf) {
578     // The only operation for which sorting of the files matters is KeyBefore. Therefore,
579     // we will store the file in reverse order by seqNum from the outset.
580     for (int insertBefore = 0; ; ++insertBefore) {
581       if (insertBefore == stripe.size()
582           || (StoreFile.Comparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
583         stripe.add(insertBefore, sf);
584         break;
585       }
586     }
587   }
588 
589   /**
590    * An extension of ConcatenatedLists that has several peculiar properties.
591    * First, one can cut the tail of the logical list by removing last several sub-lists.
592    * Second, items can be removed thru iterator.
593    * Third, if the sub-lists are immutable, they are replaced with mutable copies when needed.
594    * On average KeyBefore operation will contain half the stripes as potential candidates,
595    * but will quickly cut down on them as it finds something in the more likely ones; thus,
596    * the above allow us to avoid unnecessary copying of a bunch of lists.
597    */
598   private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<StoreFile> {
599     @Override
600     public java.util.Iterator<StoreFile> iterator() {
601       return new Iterator();
602     }
603 
604     public class Iterator extends ConcatenatedLists<StoreFile>.Iterator {
605       public ArrayList<List<StoreFile>> getComponents() {
606         return components;
607       }
608 
609       public void removeComponents(int startIndex) {
610         List<List<StoreFile>> subList = components.subList(startIndex, components.size());
611         for (List<StoreFile> entry : subList) {
612           size -= entry.size();
613         }
614         assert size >= 0;
615         subList.clear();
616       }
617 
618       @Override
619       public void remove() {
620         if (!this.nextWasCalled) {
621           throw new IllegalStateException("No element to remove");
622         }
623         this.nextWasCalled = false;
624         List<StoreFile> src = components.get(currentComponent);
625         if (src instanceof ImmutableList<?>) {
626           src = new ArrayList<StoreFile>(src);
627           components.set(currentComponent, src);
628         }
629         src.remove(indexWithinComponent);
630         --size;
631         --indexWithinComponent;
632         if (src.isEmpty()) {
633           components.remove(currentComponent); // indexWithinComponent is already -1 here.
634         }
635       }
636     }
637   }
638 
639   /**
640    * Non-static helper class for merging compaction or flush results.
641    * Since we want to merge them atomically (more or less), it operates on lazy copies,
642    * then creates a new state object and puts it in place.
643    */
644   private class CompactionOrFlushMergeCopy {
645     private ArrayList<List<StoreFile>> stripeFiles = null;
646     private ArrayList<StoreFile> level0Files = null;
647     private ArrayList<byte[]> stripeEndRows = null;
648 
649     private Collection<StoreFile> compactedFiles = null;
650     private Collection<StoreFile> results = null;
651 
652     private List<StoreFile> l0Results = new ArrayList<StoreFile>();
653     private final boolean isFlush;
654 
655     public CompactionOrFlushMergeCopy(boolean isFlush) {
656       // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
657       this.stripeFiles = new ArrayList<List<StoreFile>>(
658           StripeStoreFileManager.this.state.stripeFiles);
659       this.isFlush = isFlush;
660     }
661 
662     public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
663         throws IOException {
664       assert this.compactedFiles == null && this.results == null;
665       this.compactedFiles = compactedFiles;
666       this.results = results;
667       // Do logical processing.
668       if (!isFlush) removeCompactedFiles();
669       TreeMap<byte[], StoreFile> newStripes = processResults();
670       if (newStripes != null) {
671         processNewCandidateStripes(newStripes);
672       }
673       // Create new state and update parent.
674       State state = createNewState();
675       StripeStoreFileManager.this.state = state;
676       updateMetadataMaps();
677     }
678 
679     private State createNewState() {
680       State oldState = StripeStoreFileManager.this.state;
681       // Stripe count should be the same unless the end rows changed.
682       assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
683       State newState = new State();
684       newState.level0Files = (this.level0Files == null) ? oldState.level0Files
685           : ImmutableList.copyOf(this.level0Files);
686       newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
687           : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
688       newState.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(this.stripeFiles.size());
689       for (List<StoreFile> newStripe : this.stripeFiles) {
690         newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
691             ? (ImmutableList<StoreFile>)newStripe : ImmutableList.copyOf(newStripe));
692       }
693 
694       List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
695       if (!isFlush) newAllFiles.removeAll(compactedFiles);
696       newAllFiles.addAll(results);
697       newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
698       return newState;
699     }
700 
701     private void updateMetadataMaps() {
702       StripeStoreFileManager parent = StripeStoreFileManager.this;
703       if (!isFlush) {
704         for (StoreFile sf : this.compactedFiles) {
705           parent.fileStarts.remove(sf);
706           parent.fileEnds.remove(sf);
707         }
708       }
709       if (this.l0Results != null) {
710         for (StoreFile sf : this.l0Results) {
711           parent.ensureLevel0Metadata(sf);
712         }
713       }
714     }
715 
716     /**
717      * @param index Index of the stripe we need.
718      * @return A lazy stripe copy from current stripes.
719      */
720     private final ArrayList<StoreFile> getStripeCopy(int index) {
721       List<StoreFile> stripeCopy = this.stripeFiles.get(index);
722       ArrayList<StoreFile> result = null;
723       if (stripeCopy instanceof ImmutableList<?>) {
724         result = new ArrayList<StoreFile>(stripeCopy);
725         this.stripeFiles.set(index, result);
726       } else {
727         result = (ArrayList<StoreFile>)stripeCopy;
728       }
729       return result;
730     }
731 
732     /**
733      * @return A lazy L0 copy from current state.
734      */
735     private final ArrayList<StoreFile> getLevel0Copy() {
736       if (this.level0Files == null) {
737         this.level0Files = new ArrayList<StoreFile>(StripeStoreFileManager.this.state.level0Files);
738       }
739       return this.level0Files;
740     }
741 
742     /**
743      * Process new files, and add them either to the structure of existing stripes,
744      * or to the list of new candidate stripes.
745      * @return New candidate stripes.
746      */
747     private TreeMap<byte[], StoreFile> processResults() throws IOException {
748       TreeMap<byte[], StoreFile> newStripes = null;
749       for (StoreFile sf : this.results) {
750         byte[] startRow = startOf(sf), endRow = endOf(sf);
751         if (isInvalid(endRow) || isInvalid(startRow)) {
752           if (!isFlush) {
753             LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
754           }
755           insertFileIntoStripe(getLevel0Copy(), sf);
756           this.l0Results.add(sf);
757           continue;
758         }
759         if (!this.stripeFiles.isEmpty()) {
760           int stripeIndex = findStripeIndexByEndRow(endRow);
761           if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
762             // Simple/common case - add file to an existing stripe.
763             insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
764             continue;
765           }
766         }
767 
768         // Make a new candidate stripe.
769         if (newStripes == null) {
770           newStripes = new TreeMap<byte[], StoreFile>(MAP_COMPARATOR);
771         }
772         StoreFile oldSf = newStripes.put(endRow, sf);
773         if (oldSf != null) {
774           throw new IOException("Compactor has produced multiple files for the stripe ending in ["
775               + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
776         }
777       }
778       return newStripes;
779     }
780 
781     /**
782      * Remove compacted files.
783      * @param compactedFiles Compacted files.
784      */
785     private void removeCompactedFiles() throws IOException {
786       for (StoreFile oldFile : this.compactedFiles) {
787         byte[] oldEndRow = endOf(oldFile);
788         List<StoreFile> source = null;
789         if (isInvalid(oldEndRow)) {
790           source = getLevel0Copy();
791         } else {
792           int stripeIndex = findStripeIndexByEndRow(oldEndRow);
793           if (stripeIndex < 0) {
794             throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
795                 + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
796           }
797           source = getStripeCopy(stripeIndex);
798         }
799         if (!source.remove(oldFile)) {
800           throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
801         }
802       }
803     }
804 
805     /**
806      * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
807      * new candidate stripes/removes old stripes; produces new set of stripe end rows.
808      * @param newStripes  New stripes - files by end row.
809      */
810     private void processNewCandidateStripes(
811         TreeMap<byte[], StoreFile> newStripes) throws IOException {
812       // Validate that the removed and added aggregate ranges still make for a full key space.
813       boolean hasStripes = !this.stripeFiles.isEmpty();
814       this.stripeEndRows = new ArrayList<byte[]>(
815           Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
816       int removeFrom = 0;
817       byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
818       byte[] lastEndRow = newStripes.lastKey();
819       if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
820         throw new IOException("Newly created stripes do not cover the entire key space.");
821       }
822 
823       boolean canAddNewStripes = true;
824       Collection<StoreFile> filesForL0 = null;
825       if (hasStripes) {
826         // Determine which stripes will need to be removed because they conflict with new stripes.
827         // The new boundaries should match old stripe boundaries, so we should get exact matches.
828         if (isOpen(firstStartRow)) {
829           removeFrom = 0;
830         } else {
831           removeFrom = findStripeIndexByEndRow(firstStartRow);
832           if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
833           ++removeFrom;
834         }
835         int removeTo = findStripeIndexByEndRow(lastEndRow);
836         if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
837         // See if there are files in the stripes we are trying to replace.
838         ArrayList<StoreFile> conflictingFiles = new ArrayList<StoreFile>();
839         for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
840           conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
841         }
842         if (!conflictingFiles.isEmpty()) {
843           // This can be caused by two things - concurrent flush into stripes, or a bug.
844           // Unfortunately, we cannot tell them apart without looking at timing or something
845           // like that. We will assume we are dealing with a flush and dump it into L0.
846           if (isFlush) {
847             long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
848             LOG.warn("Stripes were created by a flush, but results of size " + newSize
849                 + " cannot be added because the stripes have changed");
850             canAddNewStripes = false;
851             filesForL0 = newStripes.values();
852           } else {
853             long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
854             LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
855                 + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
856             filesForL0 = conflictingFiles;
857           }
858           if (filesForL0 != null) {
859             for (StoreFile sf : filesForL0) {
860               insertFileIntoStripe(getLevel0Copy(), sf);
861             }
862             l0Results.addAll(filesForL0);
863           }
864         }
865 
866         if (canAddNewStripes) {
867           // Remove old empty stripes.
868           int originalCount = this.stripeFiles.size();
869           for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
870             if (removeIndex != originalCount - 1) {
871               this.stripeEndRows.remove(removeIndex);
872             }
873             this.stripeFiles.remove(removeIndex);
874           }
875         }
876       }
877 
878       if (!canAddNewStripes) return; // Files were already put into L0.
879 
880       // Now, insert new stripes. The total ranges match, so we can insert where we removed.
881       byte[] previousEndRow = null;
882       int insertAt = removeFrom;
883       for (Map.Entry<byte[], StoreFile> newStripe : newStripes.entrySet()) {
884         if (previousEndRow != null) {
885           // Validate that the ranges are contiguous.
886           assert !isOpen(previousEndRow);
887           byte[] startRow = startOf(newStripe.getValue());
888           if (!rowEquals(previousEndRow, startRow)) {
889             throw new IOException("The new stripes produced by "
890                 + (isFlush ? "flush" : "compaction") + " are not contiguous");
891           }
892         }
893         // Add the new stripe.
894         ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
895         tmp.add(newStripe.getValue());
896         stripeFiles.add(insertAt, tmp);
897         previousEndRow = newStripe.getKey();
898         if (!isOpen(previousEndRow)) {
899           stripeEndRows.add(insertAt, previousEndRow);
900         }
901         ++insertAt;
902       }
903     }
904   }
905 
906   @Override
907   public List<StoreFile> getLevel0Files() {
908     return this.state.level0Files;
909   }
910 
911   @Override
912   public List<byte[]> getStripeBoundaries() {
913     if (this.state.stripeFiles.isEmpty()) return new ArrayList<byte[]>();
914     ArrayList<byte[]> result = new ArrayList<byte[]>(this.state.stripeEndRows.length + 2);
915     result.add(OPEN_KEY);
916     for (int i = 0; i < this.state.stripeEndRows.length; ++i) {
917       result.add(this.state.stripeEndRows[i]);
918     }
919     result.add(OPEN_KEY);
920     return result;
921   }
922 
923   @Override
924   public ArrayList<ImmutableList<StoreFile>> getStripes() {
925     return this.state.stripeFiles;
926   }
927 
928   @Override
929   public int getStripeCount() {
930     return this.state.stripeFiles.size();
931   }
932 }