1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.encoding;
18
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23
24 import org.apache.hadoop.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.KeyValue.KVComparator;
27 import org.apache.hadoop.hbase.util.ByteBufferUtils;
28 import org.apache.hadoop.hbase.util.Bytes;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Prefix-delta key encoder: for every KeyValue after the first, only the
 * suffix of the key that differs from the previous key is written, together
 * with the length of the shared prefix.
 *
 * <p>Per-entry on-disk layout produced by {@code addKV} ("vint" below means a
 * compressed int written via {@link ByteBufferUtils#putCompressedInt}):
 * <ol>
 *   <li>vint: key length minus common-prefix length</li>
 *   <li>vint: value length</li>
 *   <li>vint: common-prefix length (always 0 for the first entry)</li>
 *   <li>key suffix bytes, immediately followed by value bytes</li>
 * </ol>
 * The whole encoded payload is preceded by one plain 4-byte int holding the
 * total unencoded size (see {@code internalEncodeKeyValues}). Any trailing
 * tag/mvcc data per entry is handled by the {@code afterEncodingKeyValue} /
 * {@code afterDecodingKeyValue} hooks inherited from
 * {@link BufferedDataBlockEncoder}.
 */
@InterfaceAudience.Private
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {

  /**
   * Encodes one KeyValue from {@code in} (positioned at its key-length field)
   * onto {@code out} in prefix-compressed form.
   *
   * @param prevKeyOffset offset in {@code in} of the previous KeyValue's
   *          key-length field, or -1 when encoding the first entry (no
   *          previous key to share a prefix with)
   * @param out destination stream for the encoded entry
   * @param in source buffer holding raw, uncompressed KeyValues
   * @param prevKeyLength full key length of the previous entry; unused when
   *          {@code prevKeyOffset} is -1
   * @return the full (uncompressed) key length of the entry just written,
   *         which the caller passes back as {@code prevKeyLength} next time
   * @throws IOException if writing to {@code out} fails
   */
  private int addKV(int prevKeyOffset, DataOutputStream out,
      ByteBuffer in, int prevKeyLength) throws IOException {
    int keyLength = in.getInt();
    int valueLength = in.getInt();

    if (prevKeyOffset == -1) {
      // First entry: no previous key, so the common-prefix length is 0 and
      // the whole key plus value is copied verbatim.
      ByteBufferUtils.putCompressedInt(out, keyLength);
      ByteBufferUtils.putCompressedInt(out, valueLength);
      ByteBufferUtils.putCompressedInt(out, 0);
      ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
    } else {
      // Compare this key against the previous one. ROW_OFFSET skips the two
      // length ints that precede the previous entry's key bytes, so both
      // offsets point at actual key content. The prefix can be at most as
      // long as the shorter of the two keys.
      int common = ByteBufferUtils.findCommonPrefix(
          in, prevKeyOffset + KeyValue.ROW_OFFSET,
          in.position(),
          Math.min(prevKeyLength, keyLength));

      // Header: suffix length, value length, shared-prefix length.
      ByteBufferUtils.putCompressedInt(out, keyLength - common);
      ByteBufferUtils.putCompressedInt(out, valueLength);
      ByteBufferUtils.putCompressedInt(out, common);

      // Skip the shared prefix in the source, then copy the key suffix and
      // the value in one move.
      ByteBufferUtils.skip(in, common);
      ByteBufferUtils.moveBufferToStream(out, in, keyLength - common
          + valueLength);
    }

    return keyLength;
  }

  /**
   * Encodes all KeyValues in {@code in} to {@code writeHere}. Writes the raw
   * payload size first (plain 4-byte int), then each entry via
   * {@link #addKV}, followed by per-entry extras (tags/mvcc) via
   * {@code afterEncodingKeyValue}.
   */
  @Override
  public void internalEncodeKeyValues(DataOutputStream writeHere, ByteBuffer in,
      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    in.rewind();
    ByteBufferUtils.putInt(writeHere, in.limit());
    int prevOffset = -1;   // -1 marks "no previous key" for the first addKV call
    int offset = 0;
    int keyLength = 0;
    while (in.hasRemaining()) {
      // Remember where this entry starts BEFORE addKV advances the buffer,
      // so the next iteration can locate this entry's key for prefix matching.
      offset = in.position();
      keyLength = addKV(prevOffset, writeHere, in, keyLength);
      afterEncodingKeyValue(in, writeHere, encodingCtx);
      prevOffset = offset;
    }
  }

  /**
   * Decodes a prefix-compressed block back into raw KeyValue format.
   *
   * @param source encoded input, starting at the 4-byte unencoded-size int
   * @param allocateHeaderLength bytes to reserve at the front of the returned
   *          buffer (left unwritten for the caller's block header)
   * @param skipLastBytes number of trailing bytes of {@code source} that are
   *          not entry data and must remain unread
   * @return buffer positioned after the reserved header, limited to the
   *         decoded data
   * @throws IllegalStateException if decoding consumed bytes past the
   *           expected end of entry data
   */
  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    int decompressedSize = source.readInt();
    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
        allocateHeaderLength);
    buffer.position(allocateHeaderLength);
    // Offset of the previous entry's key within `buffer`; the first entry has
    // commonLength 0, so the initial value is never actually read from.
    int prevKeyOffset = 0;

    while (source.available() > skipLastBytes) {
      prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
      afterDecodingKeyValue(source, buffer, decodingCtx);
    }

    // After the loop, exactly skipLastBytes must remain; anything else means
    // an entry straddled the boundary and we over-read.
    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too many bytes.");
    }

    buffer.limit(buffer.position());
    return buffer;
  }

  /**
   * Decodes a single entry from {@code source} into {@code buffer}, rebuilding
   * the full key by copying the shared prefix from the previously decoded key
   * already present in {@code buffer}.
   *
   * @param prevKeyOffset offset in {@code buffer} of the previous entry's key
   *          bytes (not its length header)
   * @return offset in {@code buffer} of this entry's key bytes, for use as
   *         {@code prevKeyOffset} on the next call
   */
  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer,
      int prevKeyOffset)
      throws IOException, EncoderBufferTooSmallException {
    int keyLength = ByteBufferUtils.readCompressedInt(source);
    int valueLength = ByteBufferUtils.readCompressedInt(source);
    int commonLength = ByteBufferUtils.readCompressedInt(source);
    int keyOffset;
    // The stored key length is only the suffix; restore the full length.
    keyLength += commonLength;

    // Full entry = 2 length ints (ROW_OFFSET) + full key + value.
    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);

    buffer.putInt(keyLength);
    buffer.putInt(valueLength);

    // Copy the shared prefix from the previous key, which already lives
    // earlier in this same buffer.
    if (commonLength > 0) {
      keyOffset = buffer.position();
      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset,
          commonLength);
    } else {
      keyOffset = buffer.position();
    }

    // Copy the key suffix and the value straight from the stream.
    int len = keyLength - commonLength + valueLength;
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
    return keyOffset;
  }

  /**
   * Returns the first key of an encoded block. The first entry must have a
   * common-prefix length of 0 (it has no predecessor), which is asserted.
   *
   * @param block encoded block, starting at the 4-byte unencoded-size int;
   *          its position is saved and restored via mark/reset
   * @return a buffer sliced over the first key's bytes (shares the block's
   *         backing array — no copy)
   */
  @Override
  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
    block.mark();
    block.position(Bytes.SIZEOF_INT);
    int keyLength = ByteBufferUtils.readCompressedInt(block);
    ByteBufferUtils.readCompressedInt(block);   // value length — not needed here
    int commonLength = ByteBufferUtils.readCompressedInt(block);
    if (commonLength != 0) {
      throw new AssertionError("Nonzero common length in the first key in "
          + "block: " + commonLength);
    }
    int pos = block.position();
    block.reset();
    return ByteBuffer.wrap(block.array(), block.arrayOffset() + pos, keyLength)
        .slice();
  }

  @Override
  public String toString() {
    return PrefixKeyDeltaEncoder.class.getSimpleName();
  }

  /**
   * Creates a seeker over prefix-encoded blocks. {@code decodeNext} rebuilds
   * each key in {@code current.keyBuffer} by keeping the first
   * {@code lastCommonPrefix} bytes from the previous key and reading only the
   * suffix from the block.
   */
  @Override
  public EncodedSeeker createSeeker(KVComparator comparator,
      final HFileBlockDecodingContext decodingCtx) {
    return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
      @Override
      protected void decodeNext() {
        current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
        current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
        current.lastCommonPrefix =
            ByteBufferUtils.readCompressedInt(currentBuffer);
        // Stored key length is the suffix only; add back the shared prefix.
        current.keyLength += current.lastCommonPrefix;
        current.ensureSpaceForKey();
        // keyBuffer[0 .. lastCommonPrefix) still holds the previous key's
        // prefix; overwrite only the differing suffix.
        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
            current.keyLength - current.lastCommonPrefix);
        // The value is not copied — just record where it sits in the block.
        current.valueOffset = currentBuffer.position();
        ByteBufferUtils.skip(currentBuffer, current.valueLength);
        if (includesTags()) {
          decodeTags();
        }
        if (includesMvcc()) {
          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
        } else {
          current.memstoreTS = 0;
        }
        current.nextKvOffset = currentBuffer.position();
      }

      @Override
      protected void decodeFirst() {
        // Skip the leading 4-byte unencoded-size int, then decode normally;
        // the first entry's commonPrefix of 0 makes decodeNext self-starting.
        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
        decodeNext();
      }
    };
  }
}