1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.SortedSet;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.KeyValue;
32 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
33 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
34 import org.apache.hadoop.util.StringUtils;
35
36
37
38
39 public class DefaultStoreFlusher extends StoreFlusher {
40 private static final Log LOG = LogFactory.getLog(DefaultStoreFlusher.class);
41 private final Object flushLock = new Object();
42
43 public DefaultStoreFlusher(Configuration conf, Store store) {
44 super(conf, store);
45 }
46
47 @Override
48 public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
49 TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize,
50 MonitoredTask status) throws IOException {
51 ArrayList<Path> result = new ArrayList<Path>();
52 if (snapshot.size() == 0) return result;
53
54
55 long smallestReadPoint = store.getSmallestReadPoint();
56 InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
57 if (scanner == null) {
58 return result;
59 }
60
61 StoreFile.Writer writer;
62 long flushed = 0;
63 try {
64
65
66 synchronized (flushLock) {
67 status.setStatus("Flushing " + store + ": creating writer");
68
69 writer = store.createWriterInTmp(
70 snapshot.size(), store.getFamily().getCompression(), false, true, true);
71 writer.setTimeRangeTracker(snapshotTimeRangeTracker);
72 try {
73 flushed = performFlush(scanner, writer, smallestReadPoint);
74 } finally {
75 finalizeWriter(writer, cacheFlushId, status);
76 }
77 }
78 } finally {
79 flushedSize.set(flushed);
80 scanner.close();
81 }
82 LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
83 + StringUtils.humanReadableInt(flushed) +
84 ", hasBloomFilter=" + writer.hasGeneralBloom() +
85 ", into tmp file " + writer.getPath());
86 result.add(writer.getPath());
87 return result;
88 }
89 }