View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.SortedSet;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
33  import org.apache.hadoop.hbase.util.CollectionBackedScanner;
34  import org.apache.hadoop.util.StringUtils;
35  
36  /**
37   * Default implementation of StoreFlusher.
38   */
39  public class DefaultStoreFlusher extends StoreFlusher {
40    private static final Log LOG = LogFactory.getLog(DefaultStoreFlusher.class);
41    private final Object flushLock = new Object();
42  
43    public DefaultStoreFlusher(Configuration conf, Store store) {
44      super(conf, store);
45    }
46  
47    @Override
48    public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
49        TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize,
50        MonitoredTask status) throws IOException {
51      ArrayList<Path> result = new ArrayList<Path>();
52      if (snapshot.size() == 0) return result; // don't flush if there are no entries
53  
54      // Use a store scanner to find which rows to flush.
55      long smallestReadPoint = store.getSmallestReadPoint();
56      InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
57      if (scanner == null) {
58        return result; // NULL scanner returned from coprocessor hooks means skip normal processing
59      }
60  
61      StoreFile.Writer writer;
62      long flushed = 0;
63      try {
64        // TODO:  We can fail in the below block before we complete adding this flush to
65        //        list of store files.  Add cleanup of anything put on filesystem if we fail.
66        synchronized (flushLock) {
67          status.setStatus("Flushing " + store + ": creating writer");
68          // Write the map out to the disk
69          writer = store.createWriterInTmp(
70              snapshot.size(), store.getFamily().getCompression(), false, true, true);
71          writer.setTimeRangeTracker(snapshotTimeRangeTracker);
72          try {
73            flushed = performFlush(scanner, writer, smallestReadPoint);
74          } finally {
75            finalizeWriter(writer, cacheFlushId, status);
76          }
77        }
78      } finally {
79        flushedSize.set(flushed);
80        scanner.close();
81      }
82      LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
83          + StringUtils.humanReadableInt(flushed) +
84          ", hasBloomFilter=" + writer.hasGeneralBloom() +
85          ", into tmp file " + writer.getPath());
86      result.add(writer.getPath());
87      return result;
88    }
89  }