1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.SortedSet;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
35 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
36 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
37 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
38 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
39
40 import com.google.common.annotations.VisibleForTesting;
41
42
43
44
45
46 public class StripeStoreFlusher extends StoreFlusher {
47 private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
48 private final Object flushLock = new Object();
49 private final StripeCompactionPolicy policy;
50 private final StripeCompactionPolicy.StripeInformationProvider stripes;
51
52 public StripeStoreFlusher(Configuration conf, Store store,
53 StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
54 super(conf, store);
55 this.policy = policy;
56 this.stripes = stripes;
57 }
58
59 @Override
60 public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
61 final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status)
62 throws IOException {
63 List<Path> result = null;
64 int kvCount = snapshot.size();
65 if (kvCount == 0) return result;
66
67 long smallestReadPoint = store.getSmallestReadPoint();
68 InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
69 if (scanner == null) {
70 return result;
71 }
72
73
74 StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
75
76 long flushedBytes = 0;
77 boolean success = false;
78 StripeMultiFileWriter mw = null;
79 try {
80 mw = req.createWriter();
81 StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount);
82 StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
83 mw.init(storeScanner, factory, store.getComparator());
84
85 synchronized (flushLock) {
86 flushedBytes = performFlush(scanner, mw, smallestReadPoint);
87 result = mw.commitWriters(cacheFlushSeqNum, false);
88 success = true;
89 }
90 } finally {
91 if (!success && (mw != null)) {
92 if (result != null) {
93 result.clear();
94 }
95 for (Path leftoverFile : mw.abortWriters()) {
96 try {
97 store.getFileSystem().delete(leftoverFile, false);
98 } catch (Exception e) {
99 LOG.error("Failed to delete a file after failed flush: " + e);
100 }
101 }
102 }
103 flushedSize.set(flushedBytes);
104 try {
105 scanner.close();
106 } catch (IOException ex) {
107 LOG.warn("Failed to close flush scanner, ignoring", ex);
108 }
109 }
110 return result;
111 }
112
113 private StripeMultiFileWriter.WriterFactory createWriterFactory(
114 final TimeRangeTracker tracker, final long kvCount) {
115 return new StripeMultiFileWriter.WriterFactory() {
116 @Override
117 public Writer createWriter() throws IOException {
118 StoreFile.Writer writer = store.createWriterInTmp(
119 kvCount, store.getFamily().getCompression(), false, true, true);
120 writer.setTimeRangeTracker(tracker);
121 return writer;
122 }
123 };
124 }
125
126
127 public static class StripeFlushRequest {
128 @VisibleForTesting
129 public StripeMultiFileWriter createWriter() throws IOException {
130 StripeMultiFileWriter writer =
131 new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
132 writer.setNoStripeMetadata();
133 return writer;
134 }
135 }
136
137
138 public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
139 private final List<byte[]> targetBoundaries;
140
141
142 public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
143 this.targetBoundaries = targetBoundaries;
144 }
145
146 @Override
147 public StripeMultiFileWriter createWriter() throws IOException {
148 return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
149 }
150 }
151
152
153 public static class SizeStripeFlushRequest extends StripeFlushRequest {
154 private final int targetCount;
155 private final long targetKvs;
156
157
158
159
160
161
162 public SizeStripeFlushRequest(int targetCount, long targetKvs) {
163 this.targetCount = targetCount;
164 this.targetKvs = targetKvs;
165 }
166
167 @Override
168 public StripeMultiFileWriter createWriter() throws IOException {
169 return new StripeMultiFileWriter.SizeMultiWriter(
170 this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
171 }
172 }
173 }