/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;

/**
 * Base class for all data block encoders that use a buffer.
 */
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder implements DataBlockEncoder {

  private static final int INITIAL_KEY_BUFFER_SIZE = 512;

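  /**
   * Decodes the full block body: verifies that the decoding context is the
   * default one, prepares (or resets) the tag compression context when tags
   * are stored compressed, and then delegates to internalDecodeKeyValues().
   */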
  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
      HFileBlockDecodingContext blkDecodingCtx) throws IOException {
    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
    }

    HFileBlockDefaultDecodingContext decodingCtx =
        (HFileBlockDefaultDecodingContext) blkDecodingCtx;
    if (decodingCtx.getHFileContext().isIncludesTags()
        && decodingCtx.getHFileContext().isCompressTags()) {
      if (decodingCtx.getTagCompressionContext() != null) {
        // It would be overhead to create the context again and again. Reuse it.
        decodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          decodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
  }

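  /**
   * Holds the seeker state for one key/value: a materialized copy of the key,
   * offsets into the encoded block for the value and the tags, and the
   * memstore timestamp. The seeker keeps a "current" and a "previous" instance
   * so that it can step back one position when seeking.
   */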
  protected static class SeekerState {
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];

    protected long memstoreTS;
    protected int nextKvOffset;

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      uncompressTags = true;
    }

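    /** Grows keyBuffer (doubling its size) until it can hold keyLength bytes. */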
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        // rare case, but we need to handle arbitrary length of key
        int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
        while (keyLength > newKeyBufferLength) {
          newKeyBufferLength *= 2;
        }
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /** Grows tagsBuffer (doubling its size) until it can hold tagsLength bytes. */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        // rare case, but we need to handle arbitrary length of tags
        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
        while (tagsLength > newTagsBufferLength) {
          newTagsBufferLength *= 2;
        }
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). This is used so that we can advance to the next state and
     * then step back to the previous one, e.g. in a "seek before" operation.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all of the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
            nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
    }

  }

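  /**
   * Common seeker implementation for the buffered encoders. It walks the
   * encoded block sequentially, keeping a current and a previous SeekerState
   * so that seekToKeyInBlock() can step back by one key/value when needed.
   */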
  protected abstract static class
      BufferedEncodedSeeker<STATE extends SeekerState>
      implements EncodedSeeker {
    protected HFileBlockDecodingContext decodingCtx;
    protected final KVComparator comparator;
    protected final SamePrefixComparator<byte[]> samePrefixComparator;
    protected ByteBuffer currentBuffer;
    protected STATE current = createSeekerState();
    protected STATE previous = createSeekerState();
    protected TagCompressionContext tagCompressionContext = null;

    public BufferedEncodedSeeker(KVComparator comparator,
        HFileBlockDecodingContext decodingCtx) {
      this.comparator = comparator;
      this.samePrefixComparator = comparator;
      this.decodingCtx = decodingCtx;
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
    }

    protected boolean includesMvcc() {
      return this.decodingCtx.getHFileContext().isIncludesMvcc();
    }

    protected boolean includesTags() {
      return this.decodingCtx.getHFileContext().isIncludesTags();
    }

    @Override
    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
      return comparator.compareFlatKey(key, offset, length,
          current.keyBuffer, 0, current.keyLength);
    }

    @Override
    public void setCurrentBuffer(ByteBuffer buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      decodeFirst();
      previous.invalidate();
    }

    @Override
    public ByteBuffer getKeyDeepCopy() {
      ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
      keyBuffer.put(current.keyBuffer, 0, current.keyLength);
      return keyBuffer;
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      return ByteBuffer.wrap(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
    }

    @Override
    public ByteBuffer getKeyValueBuffer() {
      ByteBuffer kvBuffer = createKVBuffer();
      kvBuffer.putInt(current.keyLength);
      kvBuffer.putInt(current.valueLength);
      kvBuffer.put(current.keyBuffer, 0, current.keyLength);
      kvBuffer.put(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
      if (current.tagsLength > 0) {
        // Write the tags length as an unsigned short (two bytes).
        kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
        kvBuffer.put((byte) (current.tagsLength & 0xff));
        if (current.tagsOffset != -1) {
          // The offset of the tag bytes in the underlying buffer is marked,
          // so the temporary tagsBuffer was not used. Copy straight from the block.
          kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
              current.tagsLength);
        } else {
          // When tagsOffset is -1, tag compression was in use and the tags were
          // uncompressed into the temporary tagsBuffer. Copy them from there.
          kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
        }
      }
      return kvBuffer;
    }

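    /** Allocates a buffer large enough to hold the current key, value and tags. */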
    protected ByteBuffer createKVBuffer() {
      int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
          current.valueLength, current.tagsLength);
      ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
      return kvBuffer;
    }

    @Override
    public KeyValue getKeyValue() {
      ByteBuffer kvBuf = getKeyValueBuffer();
      KeyValue kv = new KeyValue(kvBuf.array(), kvBuf.arrayOffset(), kvBuf.array().length
          - kvBuf.arrayOffset());
      kv.setMvccVersion(current.memstoreTS);
      return kv;
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      previous.invalidate();
      return true;
    }

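    /**
     * Reads the tags of the current key/value. With tag compression enabled the
     * tags are uncompressed into tagsBuffer; otherwise only the offset into the
     * block is recorded and the tag bytes are skipped.
     */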
    protected void decodeTags() {
      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is in use. Uncompress the tags into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          // The tags were already uncompressed (see moveToPrevious()); just skip them.
          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
          current.uncompressTags = true; // reset the flag
        }
        current.tagsOffset = -1;
      } else {
        // When tag compression is not used, do not copy the tag bytes into tagsBuffer.
        // Just mark the tags offset so the KV buffer can be built later in getKeyValueBuffer().
        current.tagsOffset = currentBuffer.position();
        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
      }
    }

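    /**
     * Positions the seeker at (or just before) the given key inside the current
     * block. Returns 0 on an exact match, 1 when positioned on the last key that
     * sorts before the given key (or on the last key of the block), and
     * HConstants.INDEX_KEY_MAGIC when the given key sorts before the first key
     * of the block.
     */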
    @Override
    public int seekToKeyInBlock(byte[] key, int offset, int length,
        boolean seekBefore) {
      int commonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        if (samePrefixComparator != null) {
          commonPrefix = Math.min(commonPrefix, current.lastCommonPrefix);

          // extend the prefix shared by the sought key and the current key
          commonPrefix += ByteBufferUtils.findCommonPrefix(
              key, offset + commonPrefix, length - commonPrefix,
              current.keyBuffer, commonPrefix,
              current.keyLength - commonPrefix);

          comp = samePrefixComparator.compareIgnoringPrefix(commonPrefix, key,
              offset, length, current.keyBuffer, 0, current.keyLength);
        } else {
          comp = comparator.compareFlatKey(key, offset, length,
              current.keyBuffer, 0, current.keyLength);
        }

        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if " +
                  "positioned at the first key in the block: key=" +
                  Bytes.toStringBinary(key, offset, length));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // the sought key sorts before the current one
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // nothing in this block precedes the key
          }
          return 1;
        }

        // move to the next key/value if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
        } else {
          break;
        }
      } while (true);

      // we reached the end of the block without an exact match
      return 1;
    }

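    /**
     * Steps back to the previously decoded key/value by swapping the current
     * and previous seeker states.
     */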
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can move back only once and not in first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // position the buffer right after the (now current) key/value
      currentBuffer.position(current.nextKvOffset);

      // The key/value we just stepped back from already had its tags
      // uncompressed into the buffer now referenced by "previous". Keep that
      // buffer and its compressed length on the current state and clear
      // uncompressTags so that the next decodeTags() call, which will decode
      // that key/value again, skips the compressed bytes instead of
      // uncompressing them a second time.
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for a non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState();
    }

    protected abstract void decodeFirst();
    protected abstract void decodeNext();
  }

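  /**
   * Writes the metadata that follows an encoded key/value: the tags (optionally
   * dictionary-compressed) and, if MVCC is included, the memstore timestamp.
   */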
  protected final void afterEncodingKeyValue(ByteBuffer in,
      DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      // Read the tags length, stored in the KeyValue as an unsigned short.
      int tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written.
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will be non-null.
        // Write the tags using dictionary compression in that case.
        if (tagCompressionContext != null) {
          tagCompressionContext.compressTags(out, in, tagsLength);
        } else {
          ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
        }
      }
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy the memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = -1;
      try {
        memstoreTS = ByteBufferUtils.readVLong(in);
        WritableUtils.writeVLong(out, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException("Unable to copy memstore timestamp " +
            memstoreTS + " after encoding a key/value", ex);
      }
    }
  }

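  /**
   * Reads back the metadata written by afterEncodingKeyValue(): the tags
   * (uncompressing them if needed) and, if MVCC is included, the memstore
   * timestamp, appending both to the destination buffer.
   */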
  protected final void afterDecodingKeyValue(DataInputStream source,
      ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Write the tags length back as an unsigned short.
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression was used while writing the block, tagCompressionContext
        // will be non-null and the tags have to be uncompressed.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy the memstore timestamp from the data input stream to the byte
        // buffer.
        memstoreTS = WritableUtils.readVLong(source);
        ByteBufferUtils.writeVLong(dest, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException("Unable to copy memstore timestamp " +
            memstoreTS + " after decoding a key/value", ex);
      }
    }
  }

  @Override
  public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
      byte[] header, HFileContext meta) {
    return new HFileBlockDefaultEncodingContext(encoding, header, meta);
  }

  @Override
  public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
    return new HFileBlockDefaultDecodingContext(meta);
  }

  /**
   * Compresses the KeyValues and writes them to the output buffer.
   * @param out where to write the compressed data
   * @param in source of KeyValues for compression
   * @param encodingCtx the encoding context associated with the current block
   * @throws IOException if there is an error writing to the output stream
   */
  public abstract void internalEncodeKeyValues(DataOutputStream out,
      ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException;

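  /**
   * Decodes the encoded key/values from the source stream into an unencoded buffer.
   * @param source encoded block body to read from
   * @param allocateHeaderLength number of bytes to reserve at the start of the returned buffer
   * @param skipLastBytes number of trailing bytes of the source to leave undecoded
   * @param decodingCtx the decoding context associated with the current block
   * @return a buffer holding the unencoded key/values
   * @throws IOException if there is an error reading from the source stream
   */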
  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
      int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
      throws IOException;

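  /**
   * Encodes the key/values of a block: validates the encoding context, prepares
   * (or resets) the tag compression context when tags are stored compressed,
   * delegates to internalEncodeKeyValues() and finally lets the context finish
   * the block as ENCODED_DATA (or DATA when no encoding is configured).
   */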
  @Override
  public void encodeKeyValues(ByteBuffer in,
      HFileBlockEncodingContext blkEncodingCtx) throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
          "encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding();
    DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
    if (encodingCtx.getHFileContext().isIncludesTags()
        && encodingCtx.getHFileContext().isCompressTags()) {
      if (encodingCtx.getTagCompressionContext() != null) {
        // It would be overhead to create the context again and again. Reuse it.
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    internalEncodeKeyValues(dataOut, in, encodingCtx);
    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
    } else {
      encodingCtx.postEncoding(BlockType.DATA);
    }
  }

  /**
   * Asserts that there is at least the given amount of unfilled space
   * remaining in the given buffer.
   * @param out typically, the buffer we are writing to
   * @param length the required space in the buffer
   * @throws EncoderBufferTooSmallException if there is not enough room
   */
  protected static void ensureSpace(ByteBuffer out, int length)
      throws EncoderBufferTooSmallException {
    if (out.position() + length > out.limit()) {
      throw new EncoderBufferTooSmallException(
          "Buffer position=" + out.position() +
          ", buffer limit=" + out.limit() +
          ", length to be written=" + length);
    }
  }

}