View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.List;
25  import java.util.SortedSet;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
39  import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
40  import org.apache.hadoop.hbase.util.CollectionBackedScanner;
41  
42  /**
43   * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
44   * Custom implementation can be provided.
45   */
46  @InterfaceAudience.Private
47  abstract class StoreFlusher {
48    protected Configuration conf;
49    protected Store store;
50  
51    public StoreFlusher(Configuration conf, Store store) {
52      this.conf = conf;
53      this.store = store;
54    }
55  
56    /**
57     * Turns a snapshot of memstore into a set of store files.
58     * @param snapshot Memstore snapshot.
59     * @param cacheFlushSeqNum Log cache flush sequence number.
60     * @param snapshotTimeRangeTracker Time range tracker from the memstore
61     *                                 pertaining to the snapshot.
62     * @param flushedSize Out parameter for the size of the KVs flushed.
63     * @param status Task that represents the flush operation and may be updated with status.
64     * @return List of files written. Can be empty; must not be null.
65     */
66    public abstract List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
67        TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
68        throws IOException;
69  
70    protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
71        MonitoredTask status) throws IOException {
72      // Write out the log sequence number that corresponds to this output
73      // hfile. Also write current time in metadata as minFlushTime.
74      // The hfile is current up to and including cacheFlushSeqNum.
75      status.setStatus("Flushing " + store + ": appending metadata");
76      writer.appendMetadata(cacheFlushSeqNum, false);
77      status.setStatus("Flushing " + store + ": closing flushed file");
78      writer.close();
79    }
80  
81  
82    /**
83     * Creates the scanner for flushing snapshot. Also calls coprocessors.
84     * @return The scanner; null if coprocessor is canceling the flush.
85     */
86    protected InternalScanner createScanner(SortedSet<KeyValue> snapshot,
87        long smallestReadPoint) throws IOException {
88      KeyValueScanner memstoreScanner =
89          new CollectionBackedScanner(snapshot, store.getComparator());
90      InternalScanner scanner = null;
91      if (store.getCoprocessorHost() != null) {
92        scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
93      }
94      if (scanner == null) {
95        Scan scan = new Scan();
96        scan.setMaxVersions(store.getScanInfo().getMaxVersions());
97        scanner = new StoreScanner(store, store.getScanInfo(), scan,
98            Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
99            smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
100     }
101     assert scanner != null;
102     if (store.getCoprocessorHost() != null) {
103       try {
104         return store.getCoprocessorHost().preFlush(store, scanner);
105       } catch (IOException ioe) {
106         scanner.close();
107         throw ioe;
108       }
109     }
110     return scanner;
111   }
112 
113   /**
114    * Performs memstore flush, writing data from scanner into sink.
115    * @param scanner Scanner to get data from.
116    * @param sink Sink to write data to. Could be StoreFile.Writer.
117    * @param smallestReadPoint Smallest read point used for the flush.
118    * @return Bytes flushed.
119    */
120   protected long performFlush(InternalScanner scanner,
121       Compactor.CellSink sink, long smallestReadPoint) throws IOException {
122     int compactionKVMax =
123       conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
124     List<Cell> kvs = new ArrayList<Cell>();
125     boolean hasMore;
126     long flushed = 0;
127     do {
128       hasMore = scanner.next(kvs, compactionKVMax);
129       if (!kvs.isEmpty()) {
130         for (Cell c : kvs) {
131           // If we know that this KV is going to be included always, then let us
132           // set its memstoreTS to 0. This will help us save space when writing to
133           // disk.
134           KeyValue kv = KeyValueUtil.ensureKeyValue(c);
135           if (kv.getMvccVersion() <= smallestReadPoint) {
136             // let us not change the original KV. It could be in the memstore
137             // changing its memstoreTS could affect other threads/scanners.
138             kv = kv.shallowCopy();
139             kv.setMvccVersion(0);
140           }
141           sink.append(kv);
142           flushed += MemStore.heapSizeChange(kv, true);
143         }
144         kvs.clear();
145       }
146     } while (hasMore);
147     return flushed;
148   }
149 }