View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.codec.BaseDecoder;
30  import org.apache.hadoop.hbase.codec.BaseEncoder;
31  import org.apache.hadoop.hbase.codec.Codec;
32  import org.apache.hadoop.hbase.codec.KeyValueCodec;
33  import org.apache.hadoop.hbase.io.util.Dictionary;
34  import org.apache.hadoop.hbase.io.util.StreamUtils;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.hadoop.hbase.util.ReflectionUtils;
37  import org.apache.hadoop.io.IOUtils;
38  
39  import com.google.protobuf.ByteString;
40  
41  
42  /**
43   * Compression in this class is lifted off Compressor/KeyValueCompression.
44   * This is a pure coincidence... they are independent and don't have to be compatible.
45   *
46   * This codec is used at server side for writing cells to WAL as well as for sending edits
47   * as part of the distributed splitting process.
48   */
49  @InterfaceAudience.Private
50  public class WALCellCodec implements Codec {
51    /** Configuration key for the class to use when encoding cells in the WAL */
52    public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
53  
54    protected final CompressionContext compression;
55    protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
56      @Override
57      public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
58        return WALCellCodec.uncompressByteString(data, dict);
59      }
60    };
61  
62    /**
63     * <b>All subclasses must implement a no argument constructor</b>
64     */
65    public WALCellCodec() {
66      this.compression = null;
67    }
68  
69    /**
70     * Default constructor - <b>all subclasses must implement a constructor with this signature </b>
71     * if they are to be dynamically loaded from the {@link Configuration}.
72     * @param conf configuration to configure <tt>this</tt>
73     * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
74     *          compression
75     */
76    public WALCellCodec(Configuration conf, CompressionContext compression) {
77      this.compression = compression;
78    }
79  
80    /**
81     * Create and setup a {@link WALCellCodec} from the {@link Configuration} and CompressionContext,
82     * if they have been specified. Fully prepares the codec for use.
83     * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
84     *          uses a {@link WALCellCodec}.
85     * @param compression compression the codec should use
86     * @return a {@link WALCellCodec} ready for use.
87     * @throws UnsupportedOperationException if the codec cannot be instantiated
88     */
89    public static WALCellCodec create(Configuration conf, CompressionContext compression)
90        throws UnsupportedOperationException {
91      String className = conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
92      return ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class,
93          CompressionContext.class }, new Object[] { conf, compression });
94    }
95  
  /**
   * Turns a byte array into a {@link ByteString}, using the supplied {@link Dictionary}.
   * The implementation in this file ({@code BaosAndCompressor}) emits either a two-byte
   * dictionary index or a marker byte followed by the length and the raw bytes.
   */
  public interface ByteStringCompressor {
    /**
     * @param data bytes to compress
     * @param dict dictionary consulted for an existing entry matching {@code data}
     * @return the compressed representation of {@code data}
     * @throws IOException if the compressed form cannot be written
     */
    ByteString compress(byte[] data, Dictionary dict) throws IOException;
  }
99  
  /**
   * Turns a {@link ByteString} back into a byte array, using the supplied {@link Dictionary}.
   * Counterpart of {@link ByteStringCompressor}.
   */
  public interface ByteStringUncompressor {
    /**
     * @param data compressed bytes to expand
     * @param dict dictionary used to resolve dictionary references found in {@code data}
     * @return the uncompressed bytes
     * @throws IOException if {@code data} cannot be decoded
     */
    byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
  }
103 
104   // TODO: it sucks that compression context is in HLog.Entry. It'd be nice if it was here.
105   //       Dictionary could be gotten by enum; initially, based on enum, context would create
106   //       an array of dictionaries.
107   static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
108     public ByteString toByteString() {
109       return ByteString.copyFrom(this.buf, 0, this.count);
110     }
111 
112     @Override
113     public ByteString compress(byte[] data, Dictionary dict) throws IOException {
114       writeCompressed(data, dict);
115       ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
116       reset(); // Only resets the count - we reuse the byte array.
117       return result;
118     }
119 
120     private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
121       assert dict != null;
122       short dictIdx = dict.findEntry(data, 0, data.length);
123       if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
124         write(Dictionary.NOT_IN_DICTIONARY);
125         StreamUtils.writeRawVInt32(this, data.length);
126         write(data, 0, data.length);
127       } else {
128         StreamUtils.writeShort(this, dictIdx);
129       }
130     }
131   }
132 
133   private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
134     InputStream in = bs.newInput();
135     byte status = (byte)in.read();
136     if (status == Dictionary.NOT_IN_DICTIONARY) {
137       byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
138       int bytesRead = in.read(arr);
139       if (bytesRead != arr.length) {
140         throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
141       }
142       if (dict != null) dict.addEntry(arr, 0, arr.length);
143       return arr;
144     } else {
145       // Status here is the higher-order byte of index of the dictionary entry.
146       short dictIdx = StreamUtils.toShort(status, (byte)in.read());
147       byte[] entry = dict.getEntry(dictIdx);
148       if (entry == null) {
149         throw new IOException("Missing dictionary entry for index " + dictIdx);
150       }
151       return entry;
152     }
153   }
154 
155   static class CompressedKvEncoder extends BaseEncoder {
156     private final CompressionContext compression;
157     public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
158       super(out);
159       this.compression = compression;
160     }
161 
162     @Override
163     public void write(Cell cell) throws IOException {
164       if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
165       KeyValue kv = (KeyValue)cell;
166       byte[] kvBuffer = kv.getBuffer();
167       int offset = kv.getOffset();
168 
169       // We first write the KeyValue infrastructure as VInts.
170       StreamUtils.writeRawVInt32(out, kv.getKeyLength());
171       StreamUtils.writeRawVInt32(out, kv.getValueLength());
172       // To support tags
173       int tagsLength = kv.getTagsLengthUnsigned();
174       StreamUtils.writeRawVInt32(out, tagsLength);
175 
176       // Write row, qualifier, and family; use dictionary
177       // compression as they're likely to have duplicates.
178       write(kvBuffer, kv.getRowOffset(), kv.getRowLength(), compression.rowDict);
179       write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
180       write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
181 
182       // Write timestamp, type and value as uncompressed.
183       int pos = kv.getTimestampOffset();
184       int tsTypeValLen = kv.getLength() + offset - pos;
185       if (tagsLength > 0) {
186         tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
187       }
188       assert tsTypeValLen > 0;
189       out.write(kvBuffer, pos, tsTypeValLen);
190       if (tagsLength > 0) {
191         if (compression.tagCompressionContext != null) {
192           // Write tags using Dictionary compression
193           compression.tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(),
194               tagsLength);
195         } else {
196           // Tag compression is disabled within the WAL compression. Just write the tags bytes as
197           // it is.
198           out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
199         }
200       }
201     }
202 
203     private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
204       short dictIdx = Dictionary.NOT_IN_DICTIONARY;
205       if (dict != null) {
206         dictIdx = dict.findEntry(data, offset, length);
207       }
208       if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
209         out.write(Dictionary.NOT_IN_DICTIONARY);
210         StreamUtils.writeRawVInt32(out, length);
211         out.write(data, offset, length);
212       } else {
213         StreamUtils.writeShort(out, dictIdx);
214       }
215     }
216   }
217 
218   static class CompressedKvDecoder extends BaseDecoder {
219     private final CompressionContext compression;
220     public CompressedKvDecoder(InputStream in, CompressionContext compression) {
221       super(in);
222       this.compression = compression;
223     }
224 
225     @Override
226     protected Cell parseCell() throws IOException {
227       int keylength = StreamUtils.readRawVarint32(in);
228       int vlength = StreamUtils.readRawVarint32(in);
229       
230       int tagsLength = StreamUtils.readRawVarint32(in);
231       int length = 0;
232       if(tagsLength == 0) {
233         length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
234       } else {
235         length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
236       }
237 
238       byte[] backingArray = new byte[length];
239       int pos = 0;
240       pos = Bytes.putInt(backingArray, pos, keylength);
241       pos = Bytes.putInt(backingArray, pos, vlength);
242 
243       // the row
244       int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
245       checkLength(elemLen, Short.MAX_VALUE);
246       pos = Bytes.putShort(backingArray, pos, (short)elemLen);
247       pos += elemLen;
248 
249       // family
250       elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
251       checkLength(elemLen, Byte.MAX_VALUE);
252       pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
253       pos += elemLen;
254 
255       // qualifier
256       elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
257       pos += elemLen;
258 
259       // timestamp, type and value
260       int tsTypeValLen = length - pos;
261       if (tagsLength > 0) {
262         tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
263       }
264       IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
265       pos += tsTypeValLen;
266 
267       // tags
268       if (tagsLength > 0) {
269         pos = Bytes.putAsShort(backingArray, pos, tagsLength);
270         if (compression.tagCompressionContext != null) {
271           compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
272         } else {
273           IOUtils.readFully(in, backingArray, pos, tagsLength);
274         }
275       }
276       return new KeyValue(backingArray, 0, length);
277     }
278 
279     private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
280       byte status = (byte)in.read();
281       if (status == Dictionary.NOT_IN_DICTIONARY) {
282         // status byte indicating that data to be read is not in dictionary.
283         // if this isn't in the dictionary, we need to add to the dictionary.
284         int length = StreamUtils.readRawVarint32(in);
285         IOUtils.readFully(in, to, offset, length);
286         dict.addEntry(to, offset, length);
287         return length;
288       } else {
289         // the status byte also acts as the higher order byte of the dictionary entry.
290         short dictIdx = StreamUtils.toShort(status, (byte)in.read());
291         byte[] entry = dict.getEntry(dictIdx);
292         if (entry == null) {
293           throw new IOException("Missing dictionary entry for index " + dictIdx);
294         }
295         // now we write the uncompressed value.
296         Bytes.putBytes(to, offset, entry, 0, entry.length);
297         return entry.length;
298       }
299     }
300 
301     private static void checkLength(int len, int max) throws IOException {
302       if (len < 0 || len > max) {
303         throw new IOException("Invalid length for compresesed portion of keyvalue: " + len);
304       }
305     }
306   }
307 
308   public class EnsureKvEncoder extends BaseEncoder {
309     public EnsureKvEncoder(OutputStream out) {
310       super(out);
311     }
312     @Override
313     public void write(Cell cell) throws IOException {
314       if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
315       checkFlushed();
316       // Make sure to write tags into WAL
317       KeyValue.oswrite((KeyValue) cell, this.out, true);
318     }
319   }
320 
321   @Override
322   public Decoder getDecoder(InputStream is) {
323     return (compression == null)
324         ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
325   }
326 
327   @Override
328   public Encoder getEncoder(OutputStream os) {
329     return (compression == null)
330         ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
331   }
332 
333   public ByteStringCompressor getByteStringCompressor() {
334     // TODO: ideally this should also encapsulate compressionContext
335     return new BaosAndCompressor();
336   }
337 
338   public ByteStringUncompressor getByteStringUncompressor() {
339     // TODO: ideally this should also encapsulate compressionContext
340     return this.statelessUncompressor;
341   }
342 }