View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.KeyValue;
27  import org.apache.hadoop.hbase.KeyValue.KVComparator;
28  import org.apache.hadoop.hbase.util.ByteBufferUtils;
29  import org.apache.hadoop.hbase.util.Bytes;
30  
/**
 * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
 *
 * Compresses using:
 * - store the size of the common prefix
 * - save the column family once, in the first KeyValue
 * - use integer compression for key, value and prefix lengths (7-bit encoding)
 * - use flag bits to avoid repeating the key length, value length
 *   and type if they are the same as in the previous KeyValue
 * - store in 3 bits the length of the timestamp prefix shared
 *   with the previous KeyValue's timestamp
 * - one bit which allows the value to be omitted if it is the same
 *
 * Format:
 * - 1 byte:    flag
 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
 * - 1-5 bytes: prefix length
 * - ... bytes: rest of the row (if prefix length is small enough)
 * - ... bytes: qualifier (or suffix depending on prefix length)
 * - 1-8 bytes: timestamp suffix
 * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
 * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
 *
 */
56  @InterfaceAudience.Private
57  public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
58    final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
59    final int SHIFT_TIMESTAMP_LENGTH = 0;
60    final int FLAG_SAME_KEY_LENGTH = 1 << 3;
61    final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
62    final int FLAG_SAME_TYPE = 1 << 5;
63    final int FLAG_SAME_VALUE = 1 << 6;
64  
65    private static class FastDiffCompressionState extends CompressionState {
66      byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
67      int prevTimestampOffset;
68  
69      @Override
70      protected void readTimestamp(ByteBuffer in) {
71        in.get(timestamp);
72      }
73  
74      @Override
75      void copyFrom(CompressionState state) {
76        super.copyFrom(state);
77        FastDiffCompressionState state2 = (FastDiffCompressionState) state;
78        System.arraycopy(state2.timestamp, 0, timestamp, 0,
79            KeyValue.TIMESTAMP_SIZE);
80        prevTimestampOffset = state2.prevTimestampOffset;
81      }
82  
83      /**
84       * Copies the first key/value from the given stream, and initializes
85       * decompression state based on it. Assumes that we have already read key
86       * and value lengths. Does not set {@link #qualifierLength} (not used by
87       * decompression) or {@link #prevOffset} (set by the calle afterwards).
88       */
89      private void decompressFirstKV(ByteBuffer out, DataInputStream in)
90          throws IOException {
91        int kvPos = out.position();
92        out.putInt(keyLength);
93        out.putInt(valueLength);
94        prevTimestampOffset = out.position() + keyLength -
95            KeyValue.TIMESTAMP_TYPE_SIZE;
96        ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
97        rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
98        familyLength = out.get(kvPos + KeyValue.ROW_OFFSET +
99            KeyValue.ROW_LENGTH_SIZE + rowLength);
100       type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
101     }
102 
103   }
104 
105   private void compressSingleKeyValue(
106         FastDiffCompressionState previousState,
107         FastDiffCompressionState currentState,
108         OutputStream out, ByteBuffer in) throws IOException {
109     currentState.prevOffset = in.position();
110     int keyLength = in.getInt();
111     int valueOffset =
112         currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
113     int valueLength = in.getInt();
114     byte flag = 0;
115 
116     if (previousState.isFirst()) {
117       // copy the key, there is no common prefix with none
118       out.write(flag);
119       ByteBufferUtils.putCompressedInt(out, keyLength);
120       ByteBufferUtils.putCompressedInt(out, valueLength);
121       ByteBufferUtils.putCompressedInt(out, 0);
122 
123       currentState.readKey(in, keyLength, valueLength);
124 
125       ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
126     } else {
127       // find a common prefix and skip it
128       int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
129           previousState.prevOffset + KeyValue.ROW_OFFSET,
130           Math.min(keyLength, previousState.keyLength) -
131           KeyValue.TIMESTAMP_TYPE_SIZE);
132 
133       currentState.readKey(in, keyLength, valueLength,
134           commonPrefix, previousState);
135 
136       if (keyLength == previousState.keyLength) {
137         flag |= FLAG_SAME_KEY_LENGTH;
138       }
139       if (valueLength == previousState.valueLength) {
140         flag |= FLAG_SAME_VALUE_LENGTH;
141       }
142       if (currentState.type == previousState.type) {
143         flag |= FLAG_SAME_TYPE;
144       }
145 
146       int commonTimestampPrefix = findCommonTimestampPrefix(
147           currentState, previousState);
148       flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
149 
150       // Check if current and previous values are the same. Compare value
151       // length first as an optimization.
152       if (valueLength == previousState.valueLength) {
153         int previousValueOffset = previousState.prevOffset
154             + previousState.keyLength + KeyValue.ROW_OFFSET;
155         if (ByteBufferUtils.arePartsEqual(in,
156                 previousValueOffset, previousState.valueLength,
157                 valueOffset, valueLength)) {
158           flag |= FLAG_SAME_VALUE;
159         }
160       }
161 
162       out.write(flag);
163       if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
164         ByteBufferUtils.putCompressedInt(out, keyLength);
165       }
166       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
167         ByteBufferUtils.putCompressedInt(out, valueLength);
168       }
169       ByteBufferUtils.putCompressedInt(out, commonPrefix);
170 
171       ByteBufferUtils.skip(in, commonPrefix);
172       if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
173         // Previous and current rows are different. Copy the differing part of
174         // the row, skip the column family, and copy the qualifier.
175         ByteBufferUtils.moveBufferToStream(out, in,
176             currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
177         ByteBufferUtils.skip(in, currentState.familyLength +
178             KeyValue.FAMILY_LENGTH_SIZE);
179         ByteBufferUtils.moveBufferToStream(out, in,
180             currentState.qualifierLength);
181       } else {
182         // The common part includes the whole row. As the column family is the
183         // same across the whole file, it will automatically be included in the
184         // common prefix, so we need not special-case it here.
185         int restKeyLength = keyLength - commonPrefix -
186             KeyValue.TIMESTAMP_TYPE_SIZE;
187         ByteBufferUtils.moveBufferToStream(out, in, restKeyLength);
188       }
189       ByteBufferUtils.skip(in, commonTimestampPrefix);
190       ByteBufferUtils.moveBufferToStream(out, in,
191           KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
192 
193       // Write the type if it is not the same as before.
194       if ((flag & FLAG_SAME_TYPE) == 0) {
195         out.write(currentState.type);
196       }
197 
198       // Write the value if it is not the same as before.
199       if ((flag & FLAG_SAME_VALUE) == 0) {
200         ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength);
201       }
202 
203       // Skip key type and value in the input buffer.
204       ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
205     }
206   }
207 
208   private int findCommonTimestampPrefix(FastDiffCompressionState left,
209       FastDiffCompressionState right) {
210     int prefixTimestamp = 0;
211     while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
212         left.timestamp[prefixTimestamp]
213             == right.timestamp[prefixTimestamp]) {
214       prefixTimestamp++;
215     }
216     return prefixTimestamp; // has to be at most 7 bytes
217   }
218 
219   private void uncompressSingleKeyValue(DataInputStream source,
220       ByteBuffer out, FastDiffCompressionState state)
221           throws IOException, EncoderBufferTooSmallException {
222     byte flag = source.readByte();
223     int prevKeyLength = state.keyLength;
224 
225     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
226       state.keyLength = ByteBufferUtils.readCompressedInt(source);
227     }
228     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
229       state.valueLength = ByteBufferUtils.readCompressedInt(source);
230     }
231     int commonLength = ByteBufferUtils.readCompressedInt(source);
232 
233     ensureSpace(out, state.keyLength + state.valueLength + KeyValue.ROW_OFFSET);
234 
235     int kvPos = out.position();
236 
237     if (!state.isFirst()) {
238       // copy the prefix
239       int common;
240       int prevOffset;
241 
242       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
243         out.putInt(state.keyLength);
244         out.putInt(state.valueLength);
245         prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
246         common = commonLength;
247       } else {
248         if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
249           prevOffset = state.prevOffset;
250           common = commonLength + KeyValue.ROW_OFFSET;
251         } else {
252           out.putInt(state.keyLength);
253           prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
254           common = commonLength + KeyValue.KEY_LENGTH_SIZE;
255         }
256       }
257 
258       ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);
259 
260       // copy the rest of the key from the buffer
261       int keyRestLength;
262       if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
263         // omit the family part of the key, it is always the same
264         int rowWithSizeLength;
265         int rowRestLength;
266 
267         // check length of row
268         if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
269           // not yet copied, do it now
270           ByteBufferUtils.copyFromStreamToBuffer(out, source,
271               KeyValue.ROW_LENGTH_SIZE - commonLength);
272 
273           rowWithSizeLength = out.getShort(out.position() -
274               KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
275           rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
276         } else {
277           // already in kvBuffer, just read it
278           rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) +
279               KeyValue.ROW_LENGTH_SIZE;
280           rowRestLength = rowWithSizeLength - commonLength;
281         }
282 
283         // copy the rest of row
284         ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);
285 
286         // copy the column family
287         ByteBufferUtils.copyFromBufferToBuffer(out, out,
288             state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
289                 + state.rowLength, state.familyLength
290                 + KeyValue.FAMILY_LENGTH_SIZE);
291         state.rowLength = (short) (rowWithSizeLength -
292             KeyValue.ROW_LENGTH_SIZE);
293 
294         keyRestLength = state.keyLength - rowWithSizeLength -
295             state.familyLength -
296             (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
297       } else {
298         // prevRowWithSizeLength is the same as on previous row
299         keyRestLength = state.keyLength - commonLength -
300             KeyValue.TIMESTAMP_TYPE_SIZE;
301       }
302       // copy the rest of the key, after column family == column qualifier
303       ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);
304 
305       // copy timestamp
306       int prefixTimestamp =
307           (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
308       ByteBufferUtils.copyFromBufferToBuffer(out, out,
309           state.prevTimestampOffset, prefixTimestamp);
310       state.prevTimestampOffset = out.position() - prefixTimestamp;
311       ByteBufferUtils.copyFromStreamToBuffer(out, source,
312           KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
313 
314       // copy the type and value
315       if ((flag & FLAG_SAME_TYPE) != 0) {
316         out.put(state.type);
317         if ((flag & FLAG_SAME_VALUE) != 0) {
318           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
319               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
320         } else {
321           ByteBufferUtils.copyFromStreamToBuffer(out, source,
322               state.valueLength);
323         }
324       } else {
325         if ((flag & FLAG_SAME_VALUE) != 0) {
326           ByteBufferUtils.copyFromStreamToBuffer(out, source,
327               KeyValue.TYPE_SIZE);
328           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
329               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
330         } else {
331           ByteBufferUtils.copyFromStreamToBuffer(out, source,
332               state.valueLength + KeyValue.TYPE_SIZE);
333         }
334         state.type = out.get(state.prevTimestampOffset +
335             KeyValue.TIMESTAMP_SIZE);
336       }
337     } else { // this is the first element
338       state.decompressFirstKV(out, source);
339     }
340 
341     state.prevOffset = kvPos;
342   }
343 
344   @Override
345   public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in,
346       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
347     in.rewind();
348     ByteBufferUtils.putInt(out, in.limit());
349     FastDiffCompressionState previousState = new FastDiffCompressionState();
350     FastDiffCompressionState currentState = new FastDiffCompressionState();
351     while (in.hasRemaining()) {
352       compressSingleKeyValue(previousState, currentState,
353           out, in);
354       afterEncodingKeyValue(in, out, encodingCtx);
355 
356       // swap previousState <-> currentState
357       FastDiffCompressionState tmp = previousState;
358       previousState = currentState;
359       currentState = tmp;
360     }
361   }
362 
363   @Override
364   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
365       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
366     int decompressedSize = source.readInt();
367     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
368         allocateHeaderLength);
369     buffer.position(allocateHeaderLength);
370     FastDiffCompressionState state = new FastDiffCompressionState();
371     while (source.available() > skipLastBytes) {
372       uncompressSingleKeyValue(source, buffer, state);
373       afterDecodingKeyValue(source, buffer, decodingCtx);
374     }
375 
376     if (source.available() != skipLastBytes) {
377       throw new IllegalStateException("Read too much bytes.");
378     }
379 
380     return buffer;
381   }
382 
383   @Override
384   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
385     block.mark();
386     block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
387     int keyLength = ByteBufferUtils.readCompressedInt(block);
388     ByteBufferUtils.readCompressedInt(block); // valueLength
389     ByteBufferUtils.readCompressedInt(block); // commonLength
390     int pos = block.position();
391     block.reset();
392     return ByteBuffer.wrap(block.array(), block.arrayOffset() + pos, keyLength)
393         .slice();
394   }
395 
396   @Override
397   public String toString() {
398     return FastDiffDeltaEncoder.class.getSimpleName();
399   }
400 
401   protected static class FastDiffSeekerState extends SeekerState {
402     private byte[] prevTimestampAndType =
403         new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
404     private int rowLengthWithSize;
405     private int familyLengthWithSize;
406 
407     @Override
408     protected void copyFromNext(SeekerState that) {
409       super.copyFromNext(that);
410       FastDiffSeekerState other = (FastDiffSeekerState) that;
411       System.arraycopy(other.prevTimestampAndType, 0,
412           prevTimestampAndType, 0,
413           KeyValue.TIMESTAMP_TYPE_SIZE);
414       rowLengthWithSize = other.rowLengthWithSize;
415       familyLengthWithSize = other.familyLengthWithSize;
416     }
417   }
418 
419   @Override
420   public EncodedSeeker createSeeker(KVComparator comparator,
421       final HFileBlockDecodingContext decodingCtx) {
422     return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator, decodingCtx) {
423       private void decode(boolean isFirst) {
424         byte flag = currentBuffer.get();
425         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
426           if (!isFirst) {
427             System.arraycopy(current.keyBuffer,
428                 current.keyLength - current.prevTimestampAndType.length,
429                 current.prevTimestampAndType, 0,
430                 current.prevTimestampAndType.length);
431           }
432           current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
433         }
434         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
435           current.valueLength =
436               ByteBufferUtils.readCompressedInt(currentBuffer);
437         }
438         current.lastCommonPrefix =
439             ByteBufferUtils.readCompressedInt(currentBuffer);
440 
441         current.ensureSpaceForKey();
442 
443         if (isFirst) {
444           // copy everything
445           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
446               current.keyLength - current.prevTimestampAndType.length);
447           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
448               Bytes.SIZEOF_SHORT;
449           current.familyLengthWithSize =
450               current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
451         } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
452           // length of row is different, copy everything except family
453 
454           // copy the row size
455           int oldRowLengthWithSize = current.rowLengthWithSize;
456           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
457               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
458           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
459               Bytes.SIZEOF_SHORT;
460 
461           // move the column family
462           System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
463               current.keyBuffer, current.rowLengthWithSize,
464               current.familyLengthWithSize);
465 
466           // copy the rest of row
467           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
468               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
469 
470           // copy the qualifier
471           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
472               + current.familyLengthWithSize, current.keyLength
473               - current.rowLengthWithSize - current.familyLengthWithSize
474               - current.prevTimestampAndType.length);
475         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
476           // We have to copy part of row and qualifier, but the column family
477           // is in the right place.
478 
479           // before column family (rest of row)
480           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
481               current.rowLengthWithSize - current.lastCommonPrefix);
482 
483           // after column family (qualifier)
484           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
485               + current.familyLengthWithSize, current.keyLength
486               - current.rowLengthWithSize - current.familyLengthWithSize
487               - current.prevTimestampAndType.length);
488         } else {
489           // copy just the ending
490           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
491               current.keyLength - current.prevTimestampAndType.length
492                   - current.lastCommonPrefix);
493         }
494 
495         // timestamp
496         int pos = current.keyLength - current.prevTimestampAndType.length;
497         int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
498           SHIFT_TIMESTAMP_LENGTH;
499         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
500           System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer,
501               pos, commonTimestampPrefix);
502         }
503         pos += commonTimestampPrefix;
504         currentBuffer.get(current.keyBuffer, pos,
505             Bytes.SIZEOF_LONG - commonTimestampPrefix);
506         pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
507 
508         // type
509         if ((flag & FLAG_SAME_TYPE) == 0) {
510           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
511         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
512           current.keyBuffer[pos] =
513               current.prevTimestampAndType[Bytes.SIZEOF_LONG];
514         }
515 
516         // handle value
517         if ((flag & FLAG_SAME_VALUE) == 0) {
518           current.valueOffset = currentBuffer.position();
519           ByteBufferUtils.skip(currentBuffer, current.valueLength);
520         }
521 
522         if (includesTags()) {
523           decodeTags();
524         }
525         if (includesMvcc()) {
526           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
527         } else {
528           current.memstoreTS = 0;
529         }
530         current.nextKvOffset = currentBuffer.position();
531       }
532 
533       @Override
534       protected void decodeFirst() {
535         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
536         decode(true);
537       }
538 
539       @Override
540       protected void decodeNext() {
541         decode(false);
542       }
543 
544       @Override
545       protected FastDiffSeekerState createSeekerState() {
546         return new FastDiffSeekerState();
547       }
548     };
549   }
550 }