/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;

import com.google.protobuf.ByteString;

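/**
 * Codec used by the WAL to write and read {@link Cell}s ({@link KeyValue}s). When a
 * {@link CompressionContext} is supplied, row, family and qualifier bytes are
 * dictionary-compressed, and tags may additionally be compressed through the context's tag
 * compression; without a context, cells are written in the plain KeyValue format. An alternative
 * codec can be plugged in via the {@value #WAL_CELL_CODEC_CLASS_KEY} configuration key, as long
 * as it exposes a {@code (Configuration, CompressionContext)} constructor.
 *
 * <p>A minimal usage sketch ({@code conf}, {@code compressionContext}, {@code out} and {@code kv}
 * are placeholders for objects set up elsewhere):
 * <pre>
 *   WALCellCodec codec = WALCellCodec.create(conf, compressionContext);
 *   Codec.Encoder encoder = codec.getEncoder(out);
 *   encoder.write(kv);
 *   encoder.flush();
 * </pre>
 */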
@InterfaceAudience.Private
public class WALCellCodec implements Codec {

  /** Configuration key naming the codec class used to encode cells in the WAL. */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  protected final CompressionContext compression;

  /** Stateless helper that expands dictionary-compressed ByteStrings. */
  protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
    @Override
    public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
      return WALCellCodec.uncompressByteString(data, dict);
    }
  };
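
  /** No-argument constructor; the resulting codec writes cells without WAL compression. */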
  public WALCellCodec() {
    this.compression = null;
  }
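
  /**
   * Constructor used by {@link #create(Configuration, CompressionContext)}; a codec configured
   * via {@value #WAL_CELL_CODEC_CLASS_KEY} must provide a constructor with this signature.
   * @param conf configuration for the codec (not used by this implementation)
   * @param compression the compression context to use, or {@code null} for no compression
   */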
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }
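
  /**
   * Creates the codec named by the {@value #WAL_CELL_CODEC_CLASS_KEY} configuration key,
   * defaulting to {@link WALCellCodec} itself. The class is instantiated reflectively through
   * its {@code (Configuration, CompressionContext)} constructor.
   * @throws UnsupportedOperationException if the configured class cannot be instantiated
   */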
  public static WALCellCodec create(Configuration conf, CompressionContext compression)
      throws UnsupportedOperationException {
    String className = conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    return ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class,
        CompressionContext.class }, new Object[] { conf, compression });
  }

  /** Compresses a byte[] against the given dictionary and returns the result as a ByteString. */
  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Dictionary dict) throws IOException;
  }

  /** Restores a byte[] previously written by a {@link ByteStringCompressor}. */
  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
  }
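
  /**
   * A {@link ByteArrayOutputStream} that also acts as a {@link ByteStringCompressor}:
   * {@link #compress(byte[], Dictionary)} writes the dictionary-compressed form into the stream's
   * own buffer, copies it out as a {@link ByteString}, and then resets the buffer for reuse.
   */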
  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    public ByteString toByteString() {
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Dictionary dict) throws IOException {
      writeCompressed(data, dict);
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Reset the underlying buffer so this instance can be reused.
      return result;
    }

    private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        // Not in the dictionary: write a marker byte, the varint length and the raw bytes.
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        // In the dictionary: the two-byte index is enough.
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }

  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = (byte)in.read();
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) dict.addEntry(arr, 0, arr.length);
      return arr;
    } else {
      // Status here is the higher-order byte of the two-byte dictionary index.
      short dictIdx = StreamUtils.toShort(status, (byte)in.read());
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }
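
  /**
   * Encoder used when WAL compression is enabled. Key, value and tags lengths are written as
   * varints, row, family and qualifier go through the per-component dictionaries, and the
   * timestamp, type and value bytes are copied verbatim; tags are run through the tag compression
   * context when one is configured.
   */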
  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;
    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
    }

    @Override
    public void write(Cell cell) throws IOException {
      if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
      KeyValue kv = (KeyValue)cell;
      byte[] kvBuffer = kv.getBuffer();
      int offset = kv.getOffset();

      // Write the KeyValue infrastructure (key, value and tags lengths) as varints.
      StreamUtils.writeRawVInt32(out, kv.getKeyLength());
      StreamUtils.writeRawVInt32(out, kv.getValueLength());
      int tagsLength = kv.getTagsLengthUnsigned();
      StreamUtils.writeRawVInt32(out, tagsLength);

      // Row, family and qualifier go through their dictionaries; repeated values shrink to a
      // two-byte dictionary index.
      write(kvBuffer, kv.getRowOffset(), kv.getRowLength(), compression.rowDict);
      write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
      write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);

      // Timestamp, type and value are copied verbatim; tags, if present, are handled below.
      int pos = kv.getTimestampOffset();
      int tsTypeValLen = kv.getLength() + offset - pos;
      if (tagsLength > 0) {
        tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      assert tsTypeValLen > 0;
      out.write(kvBuffer, pos, tsTypeValLen);
      if (tagsLength > 0) {
        if (compression.tagCompressionContext != null) {
          // Compress the tag bytes through the tag compression context.
          compression.tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(),
              tagsLength);
        } else {
          // The tags length was already written above; write the tag bytes as-is.
          out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
        }
      }
    }

    private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
      short dictIdx = Dictionary.NOT_IN_DICTIONARY;
      if (dict != null) {
        dictIdx = dict.findEntry(data, offset, length);
      }
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        // Not in the dictionary: marker byte, varint length, then the raw bytes.
        out.write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(out, length);
        out.write(data, offset, length);
      } else {
        // In the dictionary: the two-byte index is enough.
        StreamUtils.writeShort(out, dictIdx);
      }
    }
  }
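
  /**
   * Decoder counterpart of {@link CompressedKvEncoder}: rebuilds a full {@link KeyValue} backing
   * array from the varint lengths, dictionary references and raw bytes written by the encoder.
   */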
  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;
    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
    }

    @Override
    protected Cell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);
      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // Row: read the bytes past the short length slot, then fill in the length.
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short)elemLen);
      pos += elemLen;

      // Family: same pattern with a single-byte length slot.
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
      pos += elemLen;

      // Qualifier: no explicit length slot in the key.
      elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
      pos += elemLen;

      // Timestamp, type and value were written verbatim.
      int tsTypeValLen = length - pos;
      if (tagsLength > 0) {
        tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
      pos += tsTypeValLen;

      // Tags, if present, possibly via the tag compression context.
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (compression.tagCompressionContext != null) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }

    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = (byte)in.read();
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // The value was not in the dictionary when written: read the raw bytes and remember them
        // so later occurrences can be resolved by index.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // Status here is the higher-order byte of the two-byte dictionary index.
        short dictIdx = StreamUtils.toShort(status, (byte)in.read());
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // The bytes are not in the stream; copy them out of the dictionary.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }
  }
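
  /**
   * Encoder used when WAL compression is disabled. Cells are verified to be {@link KeyValue}s
   * and written in the standard KeyValue serialization, including tags.
   */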
  public class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }
    @Override
    public void write(Cell cell) throws IOException {
      if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
      checkFlushed();
      // Make sure tags are written into the WAL as well.
      KeyValue.oswrite((KeyValue) cell, this.out, true);
    }
  }

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
        ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    return (compression == null)
        ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
  }

  public ByteStringCompressor getByteStringCompressor() {
    // Returns a fresh compressor per call; it is not tied to this codec's compression context.
    return new BaosAndCompressor();
  }

  public ByteStringUncompressor getByteStringUncompressor() {
    // Shared stateless uncompressor; callers supply the dictionary on each call.
    return this.statelessUncompressor;
  }
}