View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.lang.management.ManagementFactory;
23  import java.lang.management.RuntimeMXBean;
24  import java.rmi.UnexpectedException;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.NavigableSet;
30  import java.util.SortedSet;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.KeyValueUtil;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.io.HeapSize;
44  import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  
49  /**
50   * The MemStore holds in-memory modifications to the Store.  Modifications
51   * are {@link KeyValue}s.  When asked to flush, current memstore is moved
52   * to snapshot and is cleared.  We continue to serve edits out of new memstore
53   * and backing snapshot until flusher reports in that the flush succeeded. At
54   * this point we let the snapshot go.
55   *  <p>
56   * The MemStore functions should not be called in parallel. Callers should hold
57   *  write and read locks. This is done in {@link HStore}.
58   *  </p>
59   *
60   * TODO: Adjust size of the memstore when we remove items because they have
61   * been deleted.
62   * TODO: With the new KeyValueSkipListSet (KVSLS), make sure we update HeapSize
63   * with the difference in KV size.
64   */
65  @InterfaceAudience.Private
66  public class MemStore implements HeapSize {
67    private static final Log LOG = LogFactory.getLog(MemStore.class);
68  
69    static final String USEMSLAB_KEY =
70      "hbase.hregion.memstore.mslab.enabled";
71    private static final boolean USEMSLAB_DEFAULT = true;
72  
73    private Configuration conf;
74  
75    // MemStore.  Use a KeyValueSkipListSet rather than SkipListSet because of the
76    // better semantics.  The Map will overwrite if passed a key it already had
77    // whereas the Set will not add new KV if key is same though value might be
78    // different.  Value is not important -- just make sure always same
79    // reference passed.
80    volatile KeyValueSkipListSet kvset;
81  
82    // Snapshot of memstore.  Made for flusher.
83    volatile KeyValueSkipListSet snapshot;
84  
85    final KeyValue.KVComparator comparator;
86  
87    // Used to track own heapSize
88    final AtomicLong size;
89    private volatile long snapshotSize;
90  
91    // Used to track when to flush
92    volatile long timeOfOldestEdit = Long.MAX_VALUE;
93  
94    TimeRangeTracker timeRangeTracker;
95    TimeRangeTracker snapshotTimeRangeTracker;
96  
97    MemStoreChunkPool chunkPool;
98    volatile MemStoreLAB allocator;
99    volatile MemStoreLAB snapshotAllocator;
100 
101   /**
102    * Default constructor. Used for tests.
103    */
104   public MemStore() {
105     this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
106   }
107 
108   /**
109    * Constructor.
110    * @param c Comparator
111    */
112   public MemStore(final Configuration conf,
113                   final KeyValue.KVComparator c) {
114     this.conf = conf;
115     this.comparator = c;
116     this.kvset = new KeyValueSkipListSet(c);
117     this.snapshot = new KeyValueSkipListSet(c);
118     timeRangeTracker = new TimeRangeTracker();
119     snapshotTimeRangeTracker = new TimeRangeTracker();
120     this.size = new AtomicLong(DEEP_OVERHEAD);
121     this.snapshotSize = 0;
122     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
123       this.chunkPool = MemStoreChunkPool.getPool(conf);
124       this.allocator = new MemStoreLAB(conf, chunkPool);
125     } else {
126       this.allocator = null;
127       this.chunkPool = null;
128     }
129   }
130 
131   void dump() {
132     for (KeyValue kv: this.kvset) {
133       LOG.info(kv);
134     }
135     for (KeyValue kv: this.snapshot) {
136       LOG.info(kv);
137     }
138   }
139 
140   /**
141    * Creates a snapshot of the current memstore.
142    * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet)}
143    * To get the snapshot made by this method, use {@link #getSnapshot()}
144    */
145   void snapshot() {
146     // If snapshot currently has entries, then flusher failed or didn't call
147     // cleanup.  Log a warning.
148     if (!this.snapshot.isEmpty()) {
149       LOG.warn("Snapshot called again without clearing previous. " +
150           "Doing nothing. Another ongoing flush or did we fail last attempt?");
151     } else {
152       if (!this.kvset.isEmpty()) {
153         this.snapshotSize = keySize();
154         this.snapshot = this.kvset;
155         this.kvset = new KeyValueSkipListSet(this.comparator);
156         this.snapshotTimeRangeTracker = this.timeRangeTracker;
157         this.timeRangeTracker = new TimeRangeTracker();
158         // Reset heap to not include any keys
159         this.size.set(DEEP_OVERHEAD);
160         this.snapshotAllocator = this.allocator;
161         // Reset allocator so we get a fresh buffer for the new memstore
162         if (allocator != null) {
163           this.allocator = new MemStoreLAB(conf, chunkPool);
164         }
165         timeOfOldestEdit = Long.MAX_VALUE;
166       }
167     }
168   }
169 
170   /**
171    * Return the current snapshot.
172    * Called by flusher to get current snapshot made by a previous
173    * call to {@link #snapshot()}
174    * @return Return snapshot.
175    * @see #snapshot()
176    * @see #clearSnapshot(SortedSet)
177    */
178   KeyValueSkipListSet getSnapshot() {
179     return this.snapshot;
180   }
181 
182   /**
183    * On flush, how much memory we will clear.
184    * Flush will first clear out the data in snapshot if any (It will take a second flush
185    * invocation to clear the current Cell set). If snapshot is empty, current
186    * Cell set will be flushed.
187    *
188    * @return size of data that is going to be flushed
189    */
190   long getFlushableSize() {
191     return this.snapshotSize > 0 ? this.snapshotSize : keySize();
192   }
193 
194   /**
195    * The passed snapshot was successfully persisted; it can be let go.
196    * @param ss The snapshot to clean out.
197    * @throws UnexpectedException
198    * @see #snapshot()
199    */
200   void clearSnapshot(final SortedSet<KeyValue> ss)
201   throws UnexpectedException {
202     MemStoreLAB tmpAllocator = null;
203     if (this.snapshot != ss) {
204       throw new UnexpectedException("Current snapshot is " +
205           this.snapshot + ", was passed " + ss);
206     }
207     // OK. Passed in snapshot is same as current snapshot.  If not-empty,
208     // create a new snapshot and let the old one go.
209     if (!ss.isEmpty()) {
210       this.snapshot = new KeyValueSkipListSet(this.comparator);
211       this.snapshotTimeRangeTracker = new TimeRangeTracker();
212     }
213     this.snapshotSize = 0;
214     if (this.snapshotAllocator != null) {
215       tmpAllocator = this.snapshotAllocator;
216       this.snapshotAllocator = null;
217     }
218     if (tmpAllocator != null) {
219       tmpAllocator.close();
220     }
221   }
222 
223   /**
224    * Write an update
225    * @param kv
226    * @return approximate size of the passed key and value.
227    */
228   long add(final KeyValue kv) {
229     KeyValue toAdd = maybeCloneWithAllocator(kv);
230     return internalAdd(toAdd);
231   }
232 
233   long timeOfOldestEdit() {
234     return timeOfOldestEdit;
235   }
236 
237   private boolean addToKVSet(KeyValue e) {
238     boolean b = this.kvset.add(e);
239     setOldestEditTimeToNow();
240     return b;
241   }
242 
243   private boolean removeFromKVSet(KeyValue e) {
244     boolean b = this.kvset.remove(e);
245     setOldestEditTimeToNow();
246     return b;
247   }
248 
249   void setOldestEditTimeToNow() {
250     if (timeOfOldestEdit == Long.MAX_VALUE) {
251       timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
252     }
253   }
254 
255   /**
256    * Internal version of add() that doesn't clone KVs with the
257    * allocator, and doesn't take the lock.
258    *
259    * Callers should ensure they already have the read lock taken
260    */
261   private long internalAdd(final KeyValue toAdd) {
262     long s = heapSizeChange(toAdd, addToKVSet(toAdd));
263     timeRangeTracker.includeTimestamp(toAdd);
264     this.size.addAndGet(s);
265     return s;
266   }
267 
268   private KeyValue maybeCloneWithAllocator(KeyValue kv) {
269     if (allocator == null) {
270       return kv;
271     }
272 
273     int len = kv.getLength();
274     Allocation alloc = allocator.allocateBytes(len);
275     if (alloc == null) {
276       // The allocation was too large, allocator decided
277       // not to do anything with it.
278       return kv;
279     }
280     assert alloc.getData() != null;
281     System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
282     KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
283     newKv.setMvccVersion(kv.getMvccVersion());
284     return newKv;
285   }
286 
287   /**
288    * Remove n key from the memstore. Only kvs that have the same key and the
289    * same memstoreTS are removed.  It is ok to not update timeRangeTracker
290    * in this call. It is possible that we can optimize this method by using
291    * tailMap/iterator, but since this method is called rarely (only for
292    * error recovery), we can leave those optimization for the future.
293    * @param kv
294    */
295   void rollback(final KeyValue kv) {
296     // If the key is in the snapshot, delete it. We should not update
297     // this.size, because that tracks the size of only the memstore and
298     // not the snapshot. The flush of this snapshot to disk has not
299     // yet started because Store.flush() waits for all rwcc transactions to
300     // commit before starting the flush to disk.
301     KeyValue found = this.snapshot.get(kv);
302     if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
303       this.snapshot.remove(kv);
304       long sz = heapSizeChange(kv, true);
305       this.snapshotSize -= sz;
306     }
307     // If the key is in the memstore, delete it. Update this.size.
308     found = this.kvset.get(kv);
309     if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
310       removeFromKVSet(kv);
311       long s = heapSizeChange(kv, true);
312       this.size.addAndGet(-s);
313     }
314   }
315 
316   /**
317    * Write a delete
318    * @param delete
319    * @return approximate size of the passed key and value.
320    */
321   long delete(final KeyValue delete) {
322     long s = 0;
323     KeyValue toAdd = maybeCloneWithAllocator(delete);
324     s += heapSizeChange(toAdd, addToKVSet(toAdd));
325     timeRangeTracker.includeTimestamp(toAdd);
326     this.size.addAndGet(s);
327     return s;
328   }
329 
330   /**
331    * @param kv Find the row that comes after this one.  If null, we return the
332    * first.
333    * @return Next row or null if none found.
334    */
335   KeyValue getNextRow(final KeyValue kv) {
336     return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
337   }
338 
339   /*
340    * @param a
341    * @param b
342    * @return Return lowest of a or b or null if both a and b are null
343    */
344   private KeyValue getLowest(final KeyValue a, final KeyValue b) {
345     if (a == null) {
346       return b;
347     }
348     if (b == null) {
349       return a;
350     }
351     return comparator.compareRows(a, b) <= 0? a: b;
352   }
353 
354   /*
355    * @param key Find row that follows this one.  If null, return first.
356    * @param map Set to look in for a row beyond <code>row</code>.
357    * @return Next row or null if none found.  If one found, will be a new
358    * KeyValue -- can be destroyed by subsequent calls to this method.
359    */
360   private KeyValue getNextRow(final KeyValue key,
361       final NavigableSet<KeyValue> set) {
362     KeyValue result = null;
363     SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
364     // Iterate until we fall into the next row; i.e. move off current row
365     for (KeyValue kv: tail) {
366       if (comparator.compareRows(kv, key) <= 0)
367         continue;
368       // Note: Not suppressing deletes or expired cells.  Needs to be handled
369       // by higher up functions.
370       result = kv;
371       break;
372     }
373     return result;
374   }
375 
376   /**
377    * @param state column/delete tracking state
378    */
379   void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
380     getRowKeyAtOrBefore(kvset, state);
381     getRowKeyAtOrBefore(snapshot, state);
382   }
383 
384   /*
385    * @param set
386    * @param state Accumulates deletes and candidates.
387    */
388   private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
389       final GetClosestRowBeforeTracker state) {
390     if (set.isEmpty()) {
391       return;
392     }
393     if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
394       // Found nothing in row.  Try backing up.
395       getRowKeyBefore(set, state);
396     }
397   }
398 
399   /*
400    * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
401    * we have been passed the first possible key on a row.  As we walk forward
402    * we accumulate deletes until we hit a candidate on the row at which point
403    * we return.
404    * @param set
405    * @param firstOnRow First possible key on this row.
406    * @param state
407    * @return True if we found a candidate walking this row.
408    */
409   private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
410       final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
411     boolean foundCandidate = false;
412     SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
413     if (tail.isEmpty()) return foundCandidate;
414     for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
415       KeyValue kv = i.next();
416       // Did we go beyond the target row? If so break.
417       if (state.isTooFar(kv, firstOnRow)) break;
418       if (state.isExpired(kv)) {
419         i.remove();
420         continue;
421       }
422       // If we added something, this row is a contender. break.
423       if (state.handle(kv)) {
424         foundCandidate = true;
425         break;
426       }
427     }
428     return foundCandidate;
429   }
430 
431   /*
432    * Walk backwards through the passed set a row at a time until we run out of
433    * set or until we get a candidate.
434    * @param set
435    * @param state
436    */
437   private void getRowKeyBefore(NavigableSet<KeyValue> set,
438       final GetClosestRowBeforeTracker state) {
439     KeyValue firstOnRow = state.getTargetKey();
440     for (Member p = memberOfPreviousRow(set, state, firstOnRow);
441         p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
442       // Make sure we don't fall out of our table.
443       if (!state.isTargetTable(p.kv)) break;
444       // Stop looking if we've exited the better candidate range.
445       if (!state.isBetterCandidate(p.kv)) break;
446       // Make into firstOnRow
447       firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(),
448           HConstants.LATEST_TIMESTAMP);
449       // If we find something, break;
450       if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
451     }
452   }
453 
454   /**
455    * Only used by tests. TODO: Remove
456    *
457    * Given the specs of a column, update it, first by inserting a new record,
458    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
459    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
460    * store will ensure that the insert/delete each are atomic. A scanner/reader will either
461    * get the new value, or the old value and all readers will eventually only see the new
462    * value after the old was removed.
463    *
464    * @param row
465    * @param family
466    * @param qualifier
467    * @param newValue
468    * @param now
469    * @return  Timestamp
470    */
471   long updateColumnValue(byte[] row,
472                                 byte[] family,
473                                 byte[] qualifier,
474                                 long newValue,
475                                 long now) {
476     KeyValue firstKv = KeyValue.createFirstOnRow(
477         row, family, qualifier);
478     // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
479     SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
480     if (!snSs.isEmpty()) {
481       KeyValue snKv = snSs.first();
482       // is there a matching KV in the snapshot?
483       if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
484         if (snKv.getTimestamp() == now) {
485           // poop,
486           now += 1;
487         }
488       }
489     }
490 
491     // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
492     // But the timestamp should also be max(now, mostRecentTsInMemstore)
493 
494     // so we cant add the new KV w/o knowing what's there already, but we also
495     // want to take this chance to delete some kvs. So two loops (sad)
496 
497     SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
498     for (KeyValue kv : ss) {
499       // if this isnt the row we are interested in, then bail:
500       if (!kv.matchingColumn(family, qualifier) || !kv.matchingRow(firstKv)) {
501         break; // rows dont match, bail.
502       }
503 
504       // if the qualifier matches and it's a put, just RM it out of the kvset.
505       if (kv.getTypeByte() == KeyValue.Type.Put.getCode() &&
506           kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
507         now = kv.getTimestamp();
508       }
509     }
510 
511     // create or update (upsert) a new KeyValue with
512     // 'now' and a 0 memstoreTS == immediately visible
513     List<Cell> cells = new ArrayList<Cell>(1);
514     cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
515     return upsert(cells, 1L);
516   }
517 
518   /**
519    * Update or insert the specified KeyValues.
520    * <p>
521    * For each KeyValue, insert into MemStore.  This will atomically upsert the
522    * value for that row/family/qualifier.  If a KeyValue did already exist,
523    * it will then be removed.
524    * <p>
525    * Currently the memstoreTS is kept at 0 so as each insert happens, it will
526    * be immediately visible.  May want to change this so it is atomic across
527    * all KeyValues.
528    * <p>
529    * This is called under row lock, so Get operations will still see updates
530    * atomically.  Scans will only see each KeyValue update as atomic.
531    *
532    * @param cells
533    * @param readpoint readpoint below which we can safely remove duplicate KVs 
534    * @return change in memstore size
535    */
536   public long upsert(Iterable<Cell> cells, long readpoint) {
537     long size = 0;
538     for (Cell cell : cells) {
539       size += upsert(cell, readpoint);
540     }
541     return size;
542   }
543 
544   /**
545    * Inserts the specified KeyValue into MemStore and deletes any existing
546    * versions of the same row/family/qualifier as the specified KeyValue.
547    * <p>
548    * First, the specified KeyValue is inserted into the Memstore.
549    * <p>
550    * If there are any existing KeyValues in this MemStore with the same row,
551    * family, and qualifier, they are removed.
552    * <p>
553    * Callers must hold the read lock.
554    *
555    * @param cell
556    * @return change in size of MemStore
557    */
558   private long upsert(Cell cell, long readpoint) {
559     // Add the KeyValue to the MemStore
560     // Use the internalAdd method here since we (a) already have a lock
561     // and (b) cannot safely use the MSLAB here without potentially
562     // hitting OOME - see TestMemStore.testUpsertMSLAB for a
563     // test that triggers the pathological case if we don't avoid MSLAB
564     // here.
565     KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
566     long addedSize = internalAdd(kv);
567 
568     // Get the KeyValues for the row/family/qualifier regardless of timestamp.
569     // For this case we want to clean up any other puts
570     KeyValue firstKv = KeyValue.createFirstOnRow(
571         kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
572         kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
573         kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
574     SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
575     Iterator<KeyValue> it = ss.iterator();
576     // versions visible to oldest scanner
577     int versionsVisible = 0;
578     while ( it.hasNext() ) {
579       KeyValue cur = it.next();
580 
581       if (kv == cur) {
582         // ignore the one just put in
583         continue;
584       }
585       // check that this is the row and column we are interested in, otherwise bail
586       if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
587         // only remove Puts that concurrent scanners cannot possibly see
588         if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
589             cur.getMvccVersion() <= readpoint) {
590           if (versionsVisible > 1) {
591             // if we get here we have seen at least one version visible to the oldest scanner,
592             // which means we can prove that no scanner will see this version
593 
594             // false means there was a change, so give us the size.
595             long delta = heapSizeChange(cur, true);
596             addedSize -= delta;
597             this.size.addAndGet(-delta);
598             it.remove();
599             setOldestEditTimeToNow();
600           } else {
601             versionsVisible++;
602           }
603         }
604       } else {
605         // past the row or column, done
606         break;
607       }
608     }
609     return addedSize;
610   }
611 
612   /*
613    * Immutable data structure to hold member found in set and the set it was
614    * found in.  Include set because it is carrying context.
615    */
616   private static class Member {
617     final KeyValue kv;
618     final NavigableSet<KeyValue> set;
619     Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
620       this.kv = kv;
621       this.set = s;
622     }
623   }
624 
625   /*
626    * @param set Set to walk back in.  Pass a first in row or we'll return
627    * same row (loop).
628    * @param state Utility and context.
629    * @param firstOnRow First item on the row after the one we want to find a
630    * member in.
631    * @return Null or member of row previous to <code>firstOnRow</code>
632    */
633   private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
634       final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
635     NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
636     if (head.isEmpty()) return null;
637     for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
638       KeyValue found = i.next();
639       if (state.isExpired(found)) {
640         i.remove();
641         continue;
642       }
643       return new Member(head, found);
644     }
645     return null;
646   }
647 
648   /**
649    * @return scanner on memstore and snapshot in this order.
650    */
651   List<KeyValueScanner> getScanners(long readPt) {
652     return Collections.<KeyValueScanner>singletonList(
653         new MemStoreScanner(readPt));
654   }
655 
656   /**
657    * Check if this memstore may contain the required keys
658    * @param scan
659    * @return False if the key definitely does not exist in this Memstore
660    */
661   public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
662     return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
663         snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
664         && (Math.max(timeRangeTracker.getMaximumTimestamp(),
665                      snapshotTimeRangeTracker.getMaximumTimestamp()) >=
666             oldestUnexpiredTS);
667   }
668 
669   public TimeRangeTracker getSnapshotTimeRangeTracker() {
670     return this.snapshotTimeRangeTracker;
671   }
672 
673   /*
674    * MemStoreScanner implements the KeyValueScanner.
675    * It lets the caller scan the contents of a memstore -- both current
676    * map and snapshot.
677    * This behaves as if it were a real scanner but does not maintain position.
678    */
679   protected class MemStoreScanner extends NonLazyKeyValueScanner {
680     // Next row information for either kvset or snapshot
681     private KeyValue kvsetNextRow = null;
682     private KeyValue snapshotNextRow = null;
683 
684     // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
685     private KeyValue kvsetItRow = null;
686     private KeyValue snapshotItRow = null;
687     
688     // iterator based scanning.
689     private Iterator<KeyValue> kvsetIt;
690     private Iterator<KeyValue> snapshotIt;
691 
692     // The kvset and snapshot at the time of creating this scanner
693     private KeyValueSkipListSet kvsetAtCreation;
694     private KeyValueSkipListSet snapshotAtCreation;
695 
696     // the pre-calculated KeyValue to be returned by peek() or next()
697     private KeyValue theNext;
698 
699     // The allocator and snapshot allocator at the time of creating this scanner
700     volatile MemStoreLAB allocatorAtCreation;
701     volatile MemStoreLAB snapshotAllocatorAtCreation;
702     
703     // A flag represents whether could stop skipping KeyValues for MVCC
704     // if have encountered the next row. Only used for reversed scan
705     private boolean stopSkippingKVsIfNextRow = false;
706 
707     private long readPoint;
708 
709     /*
710     Some notes...
711 
712      So memstorescanner is fixed at creation time. this includes pointers/iterators into
713     existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
714     snapshot is moved.  since kvset is null there is no point on reseeking on both,
715       we can save us the trouble. During the snapshot->hfile transition, the memstore
716       scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
717       potentially do something smarter by adjusting the existing memstore scanner.
718 
719       But there is a greater problem here, that being once a scanner has progressed
720       during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
721       if a scan lasts a little while, there is a chance for new entries in kvset to
722       become available but we will never see them.  This needs to be handled at the
723       StoreScanner level with coordination with MemStoreScanner.
724 
725       Currently, this problem is only partly managed: during the small amount of time
726       when the StoreScanner has not yet created a new MemStoreScanner, we will miss
727       the adds to kvset in the MemStoreScanner.
728     */
729 
730     MemStoreScanner(long readPoint) {
731       super();
732 
733       this.readPoint = readPoint;
734       kvsetAtCreation = kvset;
735       snapshotAtCreation = snapshot;
736       if (allocator != null) {
737         this.allocatorAtCreation = allocator;
738         this.allocatorAtCreation.incScannerCount();
739       }
740       if (snapshotAllocator != null) {
741         this.snapshotAllocatorAtCreation = snapshotAllocator;
742         this.snapshotAllocatorAtCreation.incScannerCount();
743       }
744     }
745 
746     private KeyValue getNext(Iterator<KeyValue> it) {
747       KeyValue startKV = theNext;
748       KeyValue v = null;
749       try {
750         while (it.hasNext()) {
751           v = it.next();
752           if (v.getMvccVersion() <= this.readPoint) {
753             return v;
754           }
755           if (stopSkippingKVsIfNextRow && startKV != null
756               && comparator.compareRows(v, startKV) > 0) {
757             return null;
758           }
759         }
760 
761         return null;
762       } finally {
763         if (v != null) {
764           // in all cases, remember the last KV iterated to
765           if (it == snapshotIt) {
766             snapshotItRow = v;
767           } else {
768             kvsetItRow = v;
769           }
770         }
771       }
772     }
773 
774     /**
775      *  Set the scanner at the seek key.
776      *  Must be called only once: there is no thread safety between the scanner
777      *   and the memStore.
778      * @param key seek value
779      * @return false if the key is null or if there is no data
780      */
781     @Override
782     public synchronized boolean seek(KeyValue key) {
783       if (key == null) {
784         close();
785         return false;
786       }
787 
788       // kvset and snapshot will never be null.
789       // if tailSet can't find anything, SortedSet is empty (not null).
790       kvsetIt = kvsetAtCreation.tailSet(key).iterator();
791       snapshotIt = snapshotAtCreation.tailSet(key).iterator();
792       kvsetItRow = null;
793       snapshotItRow = null;
794 
795       return seekInSubLists(key);
796     }
797 
798 
799     /**
800      * (Re)initialize the iterators after a seek or a reseek.
801      */
802     private synchronized boolean seekInSubLists(KeyValue key){
803       kvsetNextRow = getNext(kvsetIt);
804       snapshotNextRow = getNext(snapshotIt);
805 
806       // Calculate the next value
807       theNext = getLowest(kvsetNextRow, snapshotNextRow);
808 
809       // has data
810       return (theNext != null);
811     }
812 
813 
814     /**
815      * Move forward on the sub-lists set previously by seek.
816      * @param key seek value (should be non-null)
817      * @return true if there is at least one KV to read, false otherwise
818      */
819     @Override
820     public synchronized boolean reseek(KeyValue key) {
821       /*
822       See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
823       This code is executed concurrently with flush and puts, without locks.
824       Two points must be known when working on this code:
825       1) It's not possible to use the 'kvTail' and 'snapshot'
826        variables, as they are modified during a flush.
827       2) The ideal implementation for performance would use the sub skip list
828        implicitly pointed by the iterators 'kvsetIt' and
829        'snapshotIt'. Unfortunately the Java API does not offer a method to
830        get it. So we remember the last keys we iterated to and restore
831        the reseeked set to at least that point.
832        */
833 
834       kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
835       snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
836 
837       return seekInSubLists(key);
838     }
839 
840 
841     @Override
842     public synchronized KeyValue peek() {
843       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
844       return theNext;
845     }
846 
847     @Override
848     public synchronized KeyValue next() {
849       if (theNext == null) {
850           return null;
851       }
852 
853       final KeyValue ret = theNext;
854 
855       // Advance one of the iterators
856       if (theNext == kvsetNextRow) {
857         kvsetNextRow = getNext(kvsetIt);
858       } else {
859         snapshotNextRow = getNext(snapshotIt);
860       }
861 
862       // Calculate the next value
863       theNext = getLowest(kvsetNextRow, snapshotNextRow);
864 
865       //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
866       //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
867       //    getLowest() + " threadpoint=" + readpoint);
868       return ret;
869     }
870 
871     /*
872      * Returns the lower of the two key values, or null if they are both null.
873      * This uses comparator.compare() to compare the KeyValue using the memstore
874      * comparator.
875      */
876     private KeyValue getLowest(KeyValue first, KeyValue second) {
877       if (first == null && second == null) {
878         return null;
879       }
880       if (first != null && second != null) {
881         int compare = comparator.compare(first, second);
882         return (compare <= 0 ? first : second);
883       }
884       return (first != null ? first : second);
885     }
886 
887     /*
888      * Returns the higher of the two key values, or null if they are both null.
889      * This uses comparator.compare() to compare the KeyValue using the memstore
890      * comparator.
891      */
892     private KeyValue getHighest(KeyValue first, KeyValue second) {
893       if (first == null && second == null) {
894         return null;
895       }
896       if (first != null && second != null) {
897         int compare = comparator.compare(first, second);
898         return (compare > 0 ? first : second);
899       }
900       return (first != null ? first : second);
901     }
902 
903     public synchronized void close() {
904       this.kvsetNextRow = null;
905       this.snapshotNextRow = null;
906 
907       this.kvsetIt = null;
908       this.snapshotIt = null;
909       
910       if (allocatorAtCreation != null) {
911         this.allocatorAtCreation.decScannerCount();
912         this.allocatorAtCreation = null;
913       }
914       if (snapshotAllocatorAtCreation != null) {
915         this.snapshotAllocatorAtCreation.decScannerCount();
916         this.snapshotAllocatorAtCreation = null;
917       }
918 
919       this.kvsetItRow = null;
920       this.snapshotItRow = null;
921     }
922 
923     /**
924      * MemStoreScanner returns max value as sequence id because it will
925      * always have the latest data among all files.
926      */
927     @Override
928     public long getSequenceID() {
929       return Long.MAX_VALUE;
930     }
931 
932     @Override
933     public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
934         long oldestUnexpiredTS) {
935       return shouldSeek(scan, oldestUnexpiredTS);
936     }
937 
938     /**
939      * Seek scanner to the given key first. If it returns false(means
940      * peek()==null) or scanner's peek row is bigger than row of given key, seek
941      * the scanner to the previous row of given key
942      */
943     @Override
944     public synchronized boolean backwardSeek(KeyValue key) {
945       seek(key);
946       if (peek() == null || comparator.compareRows(peek(), key) > 0) {
947         return seekToPreviousRow(key);
948       }
949       return true;
950     }
951 
952     /**
953      * Separately get the KeyValue before the specified key from kvset and
954      * snapshotset, and use the row of higher one as the previous row of
955      * specified key, then seek to the first KeyValue of previous row
956      */
957     @Override
958     public synchronized boolean seekToPreviousRow(KeyValue key) {
959       KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow());
960       SortedSet<KeyValue> kvHead = kvsetAtCreation.headSet(firstKeyOnRow);
961       KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last();
962       SortedSet<KeyValue> snapshotHead = snapshotAtCreation
963           .headSet(firstKeyOnRow);
964       KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
965           .last();
966       KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow);
967       if (lastKVBeforeRow == null) {
968         theNext = null;
969         return false;
970       }
971       KeyValue firstKeyOnPreviousRow = KeyValue
972           .createFirstOnRow(lastKVBeforeRow.getRow());
973       this.stopSkippingKVsIfNextRow = true;
974       seek(firstKeyOnPreviousRow);
975       this.stopSkippingKVsIfNextRow = false;
976       if (peek() == null
977           || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
978         return seekToPreviousRow(lastKVBeforeRow);
979       }
980       return true;
981     }
982 
983     @Override
984     public synchronized boolean seekToLastRow() {
985       KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation
986           .last();
987       KeyValue second = snapshotAtCreation.isEmpty() ? null
988           : snapshotAtCreation.last();
989       KeyValue higherKv = getHighest(first, second);
990       if (higherKv == null) {
991         return false;
992       }
993       KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow());
994       if (seek(firstKvOnLastRow)) {
995         return true;
996       } else {
997         return seekToPreviousRow(higherKv);
998       }
999 
1000     }
1001   }
1002 
1003   public final static long FIXED_OVERHEAD = ClassSize.align(
1004       ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
1005 
1006   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
1007       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
1008       (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
1009 
1010   /*
1011    * Calculate how the MemStore size has changed.  Includes overhead of the
1012    * backing Map.
1013    * @param kv
1014    * @param notpresent True if the kv was NOT present in the set.
1015    * @return Size
1016    */
1017   static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
1018     return notpresent ?
1019         ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
1020         0;
1021   }
1022 
1023   /**
1024    * Get the entire heap usage for this MemStore not including keys in the
1025    * snapshot.
1026    */
1027   @Override
1028   public long heapSize() {
1029     return size.get();
1030   }
1031 
1032   /**
1033    * Get the heap usage of KVs in this MemStore.
1034    */
1035   public long keySize() {
1036     return heapSize() - DEEP_OVERHEAD;
1037   }
1038 
1039   /**
1040    * Code to help figure if our approximation of object heap sizes is close
1041    * enough.  See hbase-900.  Fills memstores then waits so user can heap
1042    * dump and bring up resultant hprof in something like jprofiler which
1043    * allows you get 'deep size' on objects.
1044    * @param args main args
1045    */
1046   public static void main(String [] args) {
1047     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
1048     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
1049       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
1050     LOG.info("vmInputArguments=" + runtime.getInputArguments());
1051     MemStore memstore1 = new MemStore();
1052     // TODO: x32 vs x64
1053     long size = 0;
1054     final int count = 10000;
1055     byte [] fam = Bytes.toBytes("col");
1056     byte [] qf = Bytes.toBytes("umn");
1057     byte [] empty = new byte[0];
1058     for (int i = 0; i < count; i++) {
1059       // Give each its own ts
1060       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1061     }
1062     LOG.info("memstore1 estimated size=" + size);
1063     for (int i = 0; i < count; i++) {
1064       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1065     }
1066     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
1067     // Make a variably sized memstore.
1068     MemStore memstore2 = new MemStore();
1069     for (int i = 0; i < count; i++) {
1070       size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
1071         new byte[i]));
1072     }
1073     LOG.info("memstore2 estimated size=" + size);
1074     final int seconds = 30;
1075     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
1076     for (int i = 0; i < seconds; i++) {
1077       // Thread.sleep(1000);
1078     }
1079     LOG.info("Exiting.");
1080   }
1081 }