1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.KeyValue.KVComparator;
31 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
32 import org.apache.hadoop.hbase.util.Bytes;
33
34
35
36
37 public abstract class StripeMultiFileWriter implements Compactor.CellSink {
38 private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
39
40
41 protected WriterFactory writerFactory;
42 protected KVComparator comparator;
43
44 protected List<StoreFile.Writer> existingWriters;
45 protected List<byte[]> boundaries;
46
47 protected StoreScanner sourceScanner;
48
49
50 private boolean doWriteStripeMetadata = true;
51
52 public interface WriterFactory {
53 public StoreFile.Writer createWriter() throws IOException;
54 }
55
56
57
58
59
60
61
62 public void init(StoreScanner sourceScanner, WriterFactory factory, KVComparator comparator)
63 throws IOException {
64 this.writerFactory = factory;
65 this.sourceScanner = sourceScanner;
66 this.comparator = comparator;
67 }
68
69 public void setNoStripeMetadata() {
70 this.doWriteStripeMetadata = false;
71 }
72
73 public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
74 assert this.existingWriters != null;
75 commitWritersInternal();
76 assert this.boundaries.size() == (this.existingWriters.size() + 1);
77 LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
78 + "riting out metadata for " + this.existingWriters.size() + " writers");
79 List<Path> paths = new ArrayList<Path>();
80 for (int i = 0; i < this.existingWriters.size(); ++i) {
81 StoreFile.Writer writer = this.existingWriters.get(i);
82 if (writer == null) continue;
83 if (doWriteStripeMetadata) {
84 writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
85 writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
86 }
87 writer.appendMetadata(maxSeqId, isMajor);
88 paths.add(writer.getPath());
89 writer.close();
90 }
91 this.existingWriters = null;
92 return paths;
93 }
94
95 public List<Path> abortWriters() {
96 assert this.existingWriters != null;
97 List<Path> paths = new ArrayList<Path>();
98 for (StoreFile.Writer writer : this.existingWriters) {
99 try {
100 paths.add(writer.getPath());
101 writer.close();
102 } catch (Exception ex) {
103 LOG.error("Failed to close the writer after an unfinished compaction.", ex);
104 }
105 }
106 this.existingWriters = null;
107 return paths;
108 }
109
110
111
112
113
114
115
116
117 protected void sanityCheckLeft(
118 byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
119 if (StripeStoreFileManager.OPEN_KEY != left &&
120 comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
121 String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
122 + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
123 LOG.error(error);
124 throw new IOException(error);
125 }
126 }
127
128
129
130
131
132
133
134
135 protected void sanityCheckRight(
136 byte[] right, byte[] row, int rowOffset, int rowLength) throws IOException {
137 if (StripeStoreFileManager.OPEN_KEY != right &&
138 comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
139 String error = "The last row is higher or equal than the right boundary of ["
140 + Bytes.toString(right) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
141 LOG.error(error);
142 throw new IOException(error);
143 }
144 }
145
146
147
148
149
/**
 * Subclass hook invoked at the start of {@code commitWriters}, before any metadata is
 * appended or any writer is closed. When it returns, {@code boundaries} is expected to hold
 * exactly one more entry than {@code existingWriters}; entries of {@code existingWriters}
 * may be {@code null}, in which case no file is committed for that slot.
 * @throws IOException if the writers cannot be finalized
 */
protected abstract void commitWritersInternal() throws IOException;
151
152
153
154
155
156
157 public static class BoundaryMultiWriter extends StripeMultiFileWriter {
158 private StoreFile.Writer currentWriter;
159 private byte[] currentWriterEndKey;
160
161 private KeyValue lastKv;
162 private long kvsInCurrentWriter = 0;
163 private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
164 private boolean hasAnyWriter = false;
165
166
167
168
169
170
171
172
173 public BoundaryMultiWriter(List<byte[]> targetBoundaries,
174 byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
175 super();
176 this.boundaries = targetBoundaries;
177 this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
178
179
180 assert (majorRangeFrom == null) == (majorRangeTo == null);
181 if (majorRangeFrom != null) {
182 majorRangeFromIndex = (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0
183 : Collections.binarySearch(this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
184 majorRangeToIndex = (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
185 : Collections.binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
186 if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
187 throw new IOException("Major range does not match writer boundaries: [" +
188 Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
189 + majorRangeFromIndex + " to " + majorRangeToIndex);
190 }
191 }
192 }
193
194 @Override
195 public void append(KeyValue kv) throws IOException {
196 if (currentWriter == null && existingWriters.isEmpty()) {
197
198 sanityCheckLeft(this.boundaries.get(0),
199 kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
200 }
201 prepareWriterFor(kv);
202 currentWriter.append(kv);
203 lastKv = kv;
204 ++kvsInCurrentWriter;
205 }
206
207 private boolean isKvAfterCurrentWriter(KeyValue kv) {
208 return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
209 (comparator.compareRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
210 currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
211 }
212
213 @Override
214 protected void commitWritersInternal() throws IOException {
215 stopUsingCurrentWriter();
216 while (existingWriters.size() < boundaries.size() - 1) {
217 createEmptyWriter();
218 }
219 if (lastKv != null) {
220 sanityCheckRight(boundaries.get(boundaries.size() - 1),
221 lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
222 }
223 }
224
225 private void prepareWriterFor(KeyValue kv) throws IOException {
226 if (currentWriter != null && !isKvAfterCurrentWriter(kv)) return;
227
228 stopUsingCurrentWriter();
229
230 while (isKvAfterCurrentWriter(kv)) {
231 checkCanCreateWriter();
232 createEmptyWriter();
233 }
234 checkCanCreateWriter();
235 hasAnyWriter = true;
236 currentWriter = writerFactory.createWriter();
237 existingWriters.add(currentWriter);
238 }
239
240
241
242
243
244
245
246
247
248
249
250 private void createEmptyWriter() throws IOException {
251 int index = existingWriters.size();
252 boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex);
253
254 boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2));
255 boolean needEmptyFile = isInMajorRange || isLastWriter;
256 existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
257 hasAnyWriter |= needEmptyFile;
258 currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
259 ? null : boundaries.get(existingWriters.size() + 1);
260 }
261
262 private void checkCanCreateWriter() throws IOException {
263 int maxWriterCount = boundaries.size() - 1;
264 assert existingWriters.size() <= maxWriterCount;
265 if (existingWriters.size() >= maxWriterCount) {
266 throw new IOException("Cannot create any more writers (created " + existingWriters.size()
267 + " out of " + maxWriterCount + " - row might be out of range of all valid writers");
268 }
269 }
270
271 private void stopUsingCurrentWriter() {
272 if (currentWriter != null) {
273 if (LOG.isDebugEnabled()) {
274 LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
275 + "] row; wrote out " + kvsInCurrentWriter + " kvs");
276 }
277 kvsInCurrentWriter = 0;
278 }
279 currentWriter = null;
280 currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
281 ? null : boundaries.get(existingWriters.size() + 1);
282 }
283 }
284
285
286
287
288
289
290
291 public static class SizeMultiWriter extends StripeMultiFileWriter {
292 private int targetCount;
293 private long targetKvs;
294 private byte[] left;
295 private byte[] right;
296
297 private KeyValue lastKv;
298 private StoreFile.Writer currentWriter;
299 protected byte[] lastRowInCurrentWriter = null;
300 private long kvsInCurrentWriter = 0;
301 private long kvsSeen = 0;
302 private long kvsSeenInPrevious = 0;
303
304
305
306
307
308
309
310 public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
311 super();
312 this.targetCount = targetCount;
313 this.targetKvs = targetKvs;
314 this.left = left;
315 this.right = right;
316 int preallocate = Math.min(this.targetCount, 64);
317 this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
318 this.boundaries = new ArrayList<byte[]>(preallocate + 1);
319 }
320
321 @Override
322 public void append(KeyValue kv) throws IOException {
323
324
325 boolean doCreateWriter = false;
326 if (currentWriter == null) {
327
328 sanityCheckLeft(left, kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
329 doCreateWriter = true;
330 } else if (lastRowInCurrentWriter != null
331 && !comparator.matchingRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
332 lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
333 if (LOG.isDebugEnabled()) {
334 LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
335 + "] row; wrote out " + kvsInCurrentWriter + " kvs");
336 }
337 lastRowInCurrentWriter = null;
338 kvsInCurrentWriter = 0;
339 kvsSeenInPrevious += kvsSeen;
340 doCreateWriter = true;
341 }
342 if (doCreateWriter) {
343 byte[] boundary = existingWriters.isEmpty() ? left : kv.getRow();
344 if (LOG.isDebugEnabled()) {
345 LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
346 }
347 currentWriter = writerFactory.createWriter();
348 boundaries.add(boundary);
349 existingWriters.add(currentWriter);
350 }
351
352 currentWriter.append(kv);
353 lastKv = kv;
354 ++kvsInCurrentWriter;
355 kvsSeen = kvsInCurrentWriter;
356 if (this.sourceScanner != null) {
357 kvsSeen = Math.max(kvsSeen,
358 this.sourceScanner.getEstimatedNumberOfKvsScanned() - kvsSeenInPrevious);
359 }
360
361
362
363 if (lastRowInCurrentWriter == null
364 && existingWriters.size() < targetCount
365 && kvsSeen >= targetKvs) {
366 lastRowInCurrentWriter = kv.getRow();
367 if (LOG.isDebugEnabled()) {
368 LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
369 lastRowInCurrentWriter) + "] row; observed " + kvsSeen + " kvs and wrote out "
370 + kvsInCurrentWriter + " kvs");
371 }
372 }
373 }
374
375 @Override
376 protected void commitWritersInternal() throws IOException {
377 if (LOG.isDebugEnabled()) {
378 LOG.debug("Stopping with " + kvsInCurrentWriter + " kvs in last writer" +
379 ((this.sourceScanner == null) ? "" : ("; observed estimated "
380 + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
381 }
382 if (lastKv != null) {
383 sanityCheckRight(
384 right, lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
385 }
386
387
388
389 if (existingWriters.isEmpty() && 1 == targetCount) {
390 if (LOG.isDebugEnabled()) {
391 LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata.");
392 }
393 boundaries.add(left);
394 existingWriters.add(writerFactory.createWriter());
395 }
396
397 this.boundaries.add(right);
398 }
399 }
400 }