View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.SortedSet;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
35  import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
36  import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
37  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
38  import org.apache.hadoop.hbase.util.CollectionBackedScanner;
39  
40  import com.google.common.annotations.VisibleForTesting;
41  
42  /**
43   * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
44   * into separate striped files, avoiding L0.
45   */
46  public class StripeStoreFlusher extends StoreFlusher {
47    private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
48    private final Object flushLock = new Object();
49    private final StripeCompactionPolicy policy;
50    private final StripeCompactionPolicy.StripeInformationProvider stripes;
51  
52    public StripeStoreFlusher(Configuration conf, Store store,
53        StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
54      super(conf, store);
55      this.policy = policy;
56      this.stripes = stripes;
57    }
58  
59    @Override
60    public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
61        final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status)
62            throws IOException {
63      List<Path> result = null;
64      int kvCount = snapshot.size();
65      if (kvCount == 0) return result; // don't flush if there are no entries
66  
67      long smallestReadPoint = store.getSmallestReadPoint();
68      InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
69      if (scanner == null) {
70        return result; // NULL scanner returned from coprocessor hooks means skip normal processing
71      }
72  
73      // Let policy select flush method.
74      StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
75  
76      long flushedBytes = 0;
77      boolean success = false;
78      StripeMultiFileWriter mw = null;
79      try {
80        mw = req.createWriter(); // Writer according to the policy.
81        StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount);
82        StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
83        mw.init(storeScanner, factory, store.getComparator());
84  
85        synchronized (flushLock) {
86          flushedBytes = performFlush(scanner, mw, smallestReadPoint);
87          result = mw.commitWriters(cacheFlushSeqNum, false);
88          success = true;
89        }
90      } finally {
91        if (!success && (mw != null)) {
92          if (result != null) {
93            result.clear();
94          }
95          for (Path leftoverFile : mw.abortWriters()) {
96            try {
97              store.getFileSystem().delete(leftoverFile, false);
98            } catch (Exception e) {
99              LOG.error("Failed to delete a file after failed flush: " + e);
100           }
101         }
102       }
103       flushedSize.set(flushedBytes);
104       try {
105         scanner.close();
106       } catch (IOException ex) {
107         LOG.warn("Failed to close flush scanner, ignoring", ex);
108       }
109     }
110     return result;
111   }
112 
113   private StripeMultiFileWriter.WriterFactory createWriterFactory(
114       final TimeRangeTracker tracker, final long kvCount) {
115     return new StripeMultiFileWriter.WriterFactory() {
116       @Override
117       public Writer createWriter() throws IOException {
118         StoreFile.Writer writer = store.createWriterInTmp(
119             kvCount, store.getFamily().getCompression(), false, true, true);
120         writer.setTimeRangeTracker(tracker);
121         return writer;
122       }
123     };
124   }
125 
126   /** Stripe flush request wrapper that writes a non-striped file. */
127   public static class StripeFlushRequest {
128     @VisibleForTesting
129     public StripeMultiFileWriter createWriter() throws IOException {
130       StripeMultiFileWriter writer =
131           new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
132       writer.setNoStripeMetadata();
133       return writer;
134     }
135   }
136 
137   /** Stripe flush request wrapper based on boundaries. */
138   public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
139     private final List<byte[]> targetBoundaries;
140 
141     /** @param targetBoundaries New files should be written with these boundaries. */
142     public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
143       this.targetBoundaries = targetBoundaries;
144     }
145 
146     @Override
147     public StripeMultiFileWriter createWriter() throws IOException {
148       return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
149     }
150   }
151 
152   /** Stripe flush request wrapper based on size. */
153   public static class SizeStripeFlushRequest extends StripeFlushRequest {
154     private final int targetCount;
155     private final long targetKvs;
156 
157     /**
158      * @param targetCount The maximum number of stripes to flush into.
159      * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
160      *                  total number of kvs, all the overflow data goes into the last stripe.
161      */
162     public SizeStripeFlushRequest(int targetCount, long targetKvs) {
163       this.targetCount = targetCount;
164       this.targetKvs = targetKvs;
165     }
166 
167     @Override
168     public StripeMultiFileWriter createWriter() throws IOException {
169       return new StripeMultiFileWriter.SizeMultiWriter(
170           this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
171     }
172   }
173 }