1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.encoding;
18
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23
24 import org.apache.hadoop.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.KeyValue.KVComparator;
27 import org.apache.hadoop.hbase.util.ByteBufferUtils;
28 import org.apache.hadoop.hbase.util.Bytes;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 @InterfaceAudience.Private
52 public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
53 static final int FLAG_SAME_KEY_LENGTH = 1;
54 static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
55 static final int FLAG_SAME_TYPE = 1 << 2;
56 static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
57 static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
58 static final int SHIFT_TIMESTAMP_LENGTH = 4;
59 static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
60
61 protected static class DiffCompressionState extends CompressionState {
62 long timestamp;
63 byte[] familyNameWithSize;
64
65 @Override
66 protected void readTimestamp(ByteBuffer in) {
67 timestamp = in.getLong();
68 }
69
70 @Override
71 void copyFrom(CompressionState state) {
72 super.copyFrom(state);
73 DiffCompressionState state2 = (DiffCompressionState) state;
74 timestamp = state2.timestamp;
75 }
76 }
77
78 private void compressSingleKeyValue(DiffCompressionState previousState,
79 DiffCompressionState currentState, DataOutputStream out,
80 ByteBuffer in) throws IOException {
81 byte flag = 0;
82 int kvPos = in.position();
83 int keyLength = in.getInt();
84 int valueLength = in.getInt();
85
86 long timestamp;
87 long diffTimestamp = 0;
88 int diffTimestampFitsInBytes = 0;
89
90 int commonPrefix;
91
92 int timestampFitsInBytes;
93
94 if (previousState.isFirst()) {
95 currentState.readKey(in, keyLength, valueLength);
96 currentState.prevOffset = kvPos;
97 timestamp = currentState.timestamp;
98 if (timestamp < 0) {
99 flag |= FLAG_TIMESTAMP_SIGN;
100 timestamp = -timestamp;
101 }
102 timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
103
104 flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
105 commonPrefix = 0;
106
107
108 in.mark();
109 ByteBufferUtils.skip(in, currentState.rowLength
110 + KeyValue.ROW_LENGTH_SIZE);
111 ByteBufferUtils.moveBufferToStream(out, in, currentState.familyLength
112 + KeyValue.FAMILY_LENGTH_SIZE);
113 in.reset();
114 } else {
115
116 commonPrefix =
117 ByteBufferUtils.findCommonPrefix(in, in.position(),
118 previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
119 - KeyValue.TIMESTAMP_TYPE_SIZE);
120
121
122 currentState.readKey(in, keyLength, valueLength,
123 commonPrefix, previousState);
124 currentState.prevOffset = kvPos;
125 timestamp = currentState.timestamp;
126 boolean negativeTimestamp = timestamp < 0;
127 if (negativeTimestamp) {
128 timestamp = -timestamp;
129 }
130 timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
131
132 if (keyLength == previousState.keyLength) {
133 flag |= FLAG_SAME_KEY_LENGTH;
134 }
135 if (valueLength == previousState.valueLength) {
136 flag |= FLAG_SAME_VALUE_LENGTH;
137 }
138 if (currentState.type == previousState.type) {
139 flag |= FLAG_SAME_TYPE;
140 }
141
142
143 diffTimestamp = previousState.timestamp - currentState.timestamp;
144 boolean minusDiffTimestamp = diffTimestamp < 0;
145 if (minusDiffTimestamp) {
146 diffTimestamp = -diffTimestamp;
147 }
148 diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
149 if (diffTimestampFitsInBytes < timestampFitsInBytes) {
150 flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
151 flag |= FLAG_TIMESTAMP_IS_DIFF;
152 if (minusDiffTimestamp) {
153 flag |= FLAG_TIMESTAMP_SIGN;
154 }
155 } else {
156 flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
157 if (negativeTimestamp) {
158 flag |= FLAG_TIMESTAMP_SIGN;
159 }
160 }
161 }
162
163 out.write(flag);
164
165 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
166 ByteBufferUtils.putCompressedInt(out, keyLength);
167 }
168 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
169 ByteBufferUtils.putCompressedInt(out, valueLength);
170 }
171
172 ByteBufferUtils.putCompressedInt(out, commonPrefix);
173 ByteBufferUtils.skip(in, commonPrefix);
174
175 if (previousState.isFirst() ||
176 commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
177 int restRowLength =
178 currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
179 ByteBufferUtils.moveBufferToStream(out, in, restRowLength);
180 ByteBufferUtils.skip(in, currentState.familyLength +
181 KeyValue.FAMILY_LENGTH_SIZE);
182 ByteBufferUtils.moveBufferToStream(out, in, currentState.qualifierLength);
183 } else {
184 ByteBufferUtils.moveBufferToStream(out, in,
185 keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
186 }
187
188 if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
189 ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
190 } else {
191 ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
192 }
193
194 if ((flag & FLAG_SAME_TYPE) == 0) {
195 out.write(currentState.type);
196 }
197 ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
198
199 ByteBufferUtils.moveBufferToStream(out, in, valueLength);
200 }
201
202 private void uncompressSingleKeyValue(DataInputStream source,
203 ByteBuffer buffer,
204 DiffCompressionState state)
205 throws IOException, EncoderBufferTooSmallException {
206
207 if (state.isFirst()) {
208 state.familyLength = source.readByte();
209 state.familyNameWithSize =
210 new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
211 state.familyNameWithSize[0] = state.familyLength;
212 int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
213 state.familyLength);
214 assert read == state.familyLength;
215 }
216
217
218 byte flag = source.readByte();
219
220
221 int keyLength;
222 int valueLength;
223 if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
224 keyLength = state.keyLength;
225 } else {
226 keyLength = ByteBufferUtils.readCompressedInt(source);
227 }
228 if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
229 valueLength = state.valueLength;
230 } else {
231 valueLength = ByteBufferUtils.readCompressedInt(source);
232 }
233 int commonPrefix = ByteBufferUtils.readCompressedInt(source);
234
235
236 int keyOffset = buffer.position();
237 ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
238 buffer.putInt(keyLength);
239 buffer.putInt(valueLength);
240
241
242 if (commonPrefix > 0) {
243 ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
244 + KeyValue.ROW_OFFSET, commonPrefix);
245 }
246
247
248 int keyRestLength;
249 if (state.isFirst() || commonPrefix <
250 state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
251
252 short rowLength;
253 int rowRestLength;
254
255
256 if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
257
258 ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
259 KeyValue.ROW_LENGTH_SIZE - commonPrefix);
260 ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
261 rowLength = buffer.getShort();
262 rowRestLength = rowLength;
263 } else {
264
265 rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
266 rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
267 }
268
269
270 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
271 state.rowLength = rowLength;
272
273
274 buffer.put(state.familyNameWithSize);
275
276 keyRestLength = keyLength - rowLength -
277 state.familyNameWithSize.length -
278 (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
279 } else {
280
281 keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
282 }
283
284 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
285
286
287 int timestampFitsInBytes =
288 ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
289 long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
290 if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
291 timestamp = -timestamp;
292 }
293 if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
294 timestamp = state.timestamp - timestamp;
295 }
296 buffer.putLong(timestamp);
297
298
299 byte type;
300 if ((flag & FLAG_SAME_TYPE) != 0) {
301 type = state.type;
302 } else {
303 type = source.readByte();
304 }
305 buffer.put(type);
306
307
308 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
309
310 state.keyLength = keyLength;
311 state.valueLength = valueLength;
312 state.prevOffset = keyOffset;
313 state.timestamp = timestamp;
314 state.type = type;
315
316 }
317
318 @Override
319 public void internalEncodeKeyValues(DataOutputStream out,
320 ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
321 in.rewind();
322 ByteBufferUtils.putInt(out, in.limit());
323 DiffCompressionState previousState = new DiffCompressionState();
324 DiffCompressionState currentState = new DiffCompressionState();
325 while (in.hasRemaining()) {
326 compressSingleKeyValue(previousState, currentState,
327 out, in);
328 afterEncodingKeyValue(in, out, encodingCtx);
329
330
331 DiffCompressionState tmp = previousState;
332 previousState = currentState;
333 currentState = tmp;
334 }
335 }
336
337
338 @Override
339 public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
340 block.mark();
341 block.position(Bytes.SIZEOF_INT);
342 byte familyLength = block.get();
343 ByteBufferUtils.skip(block, familyLength);
344 byte flag = block.get();
345 int keyLength = ByteBufferUtils.readCompressedInt(block);
346 ByteBufferUtils.readCompressedInt(block);
347 ByteBufferUtils.readCompressedInt(block);
348 ByteBuffer result = ByteBuffer.allocate(keyLength);
349
350
351 int pos = result.arrayOffset();
352 block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
353 pos += Bytes.SIZEOF_SHORT;
354 short rowLength = result.getShort();
355 block.get(result.array(), pos, rowLength);
356 pos += rowLength;
357
358
359 int savePosition = block.position();
360 block.position(Bytes.SIZEOF_INT);
361 block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
362 pos += familyLength + Bytes.SIZEOF_BYTE;
363
364
365 block.position(savePosition);
366 int qualifierLength =
367 keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
368 block.get(result.array(), pos, qualifierLength);
369 pos += qualifierLength;
370
371
372 int timestampFitInBytes =
373 ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
374 long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
375 if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
376 timestamp = -timestamp;
377 }
378 result.putLong(pos, timestamp);
379 pos += Bytes.SIZEOF_LONG;
380 block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
381
382 block.reset();
383 return result;
384 }
385
386 @Override
387 public String toString() {
388 return DiffKeyDeltaEncoder.class.getSimpleName();
389 }
390
391 protected static class DiffSeekerState extends SeekerState {
392 private int rowLengthWithSize;
393 private long timestamp;
394
395 @Override
396 protected void copyFromNext(SeekerState that) {
397 super.copyFromNext(that);
398 DiffSeekerState other = (DiffSeekerState) that;
399 rowLengthWithSize = other.rowLengthWithSize;
400 timestamp = other.timestamp;
401 }
402 }
403
404 @Override
405 public EncodedSeeker createSeeker(KVComparator comparator,
406 HFileBlockDecodingContext decodingCtx) {
407 return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
408 private byte[] familyNameWithSize;
409 private static final int TIMESTAMP_WITH_TYPE_LENGTH =
410 Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
411
412 private void decode(boolean isFirst) {
413 byte flag = currentBuffer.get();
414 byte type = 0;
415 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
416 if (!isFirst) {
417 type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
418 }
419 current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
420 }
421 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
422 current.valueLength =
423 ByteBufferUtils.readCompressedInt(currentBuffer);
424 }
425 current.lastCommonPrefix =
426 ByteBufferUtils.readCompressedInt(currentBuffer);
427
428 current.ensureSpaceForKey();
429
430 if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
431
432
433
434 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
435 Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
436 current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
437 Bytes.SIZEOF_SHORT;
438
439
440 currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
441 current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
442
443
444 System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
445 current.rowLengthWithSize, familyNameWithSize.length);
446
447
448 currentBuffer.get(current.keyBuffer,
449 current.rowLengthWithSize + familyNameWithSize.length,
450 current.keyLength - current.rowLengthWithSize -
451 familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
452 } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
453
454
455
456
457 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
458 current.rowLengthWithSize - current.lastCommonPrefix);
459
460
461 currentBuffer.get(current.keyBuffer,
462 current.rowLengthWithSize + familyNameWithSize.length,
463 current.keyLength - current.rowLengthWithSize -
464 familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
465 } else {
466
467 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
468 current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
469 current.lastCommonPrefix);
470 }
471
472
473 int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
474 int timestampFitInBytes = 1 +
475 ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
476 long timestampOrDiff =
477 ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
478 if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
479 timestampOrDiff = -timestampOrDiff;
480 }
481 if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
482 current.timestamp = timestampOrDiff;
483 } else {
484 current.timestamp = current.timestamp - timestampOrDiff;
485 }
486 Bytes.putLong(current.keyBuffer, pos, current.timestamp);
487 pos += Bytes.SIZEOF_LONG;
488
489
490 if ((flag & FLAG_SAME_TYPE) == 0) {
491 currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
492 } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
493 current.keyBuffer[pos] = type;
494 }
495
496 current.valueOffset = currentBuffer.position();
497 ByteBufferUtils.skip(currentBuffer, current.valueLength);
498
499 if (includesTags()) {
500 decodeTags();
501 }
502 if (includesMvcc()) {
503 current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
504 } else {
505 current.memstoreTS = 0;
506 }
507 current.nextKvOffset = currentBuffer.position();
508 }
509
510 @Override
511 protected void decodeFirst() {
512 ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
513
514
515 byte familyNameLength = currentBuffer.get();
516 familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
517 familyNameWithSize[0] = familyNameLength;
518 currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
519 familyNameLength);
520 decode(true);
521 }
522
523 @Override
524 protected void decodeNext() {
525 decode(false);
526 }
527
528 @Override
529 protected DiffSeekerState createSeekerState() {
530 return new DiffSeekerState();
531 }
532 };
533 }
534
535 @Override
536 protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
537 int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
538 int decompressedSize = source.readInt();
539 ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
540 allocateHeaderLength);
541 buffer.position(allocateHeaderLength);
542 DiffCompressionState state = new DiffCompressionState();
543 while (source.available() > skipLastBytes) {
544 uncompressSingleKeyValue(source, buffer, state);
545 afterDecodingKeyValue(source, buffer, decodingCtx);
546 }
547
548 if (source.available() != skipLastBytes) {
549 throw new IllegalStateException("Read too much bytes.");
550 }
551
552 return buffer;
553 }
554 }