View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.ByteArrayOutputStream;
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.security.SecureRandom;
27  
28  import org.apache.commons.io.IOUtils;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.codec.KeyValueCodec;
33  import org.apache.hadoop.hbase.io.crypto.Decryptor;
34  import org.apache.hadoop.hbase.io.crypto.Encryption;
35  import org.apache.hadoop.hbase.io.crypto.Encryptor;
36  import org.apache.hadoop.hbase.io.util.StreamUtils;
37  import org.apache.hadoop.hbase.util.Bytes;
38  
39  /**
40   * A WALCellCodec that encrypts the WALedits.
41   */
42  public class SecureWALCellCodec extends WALCellCodec {
43  
44    private Encryptor encryptor;
45    private Decryptor decryptor;
46  
47    public SecureWALCellCodec(Configuration conf, Encryptor encryptor) {
48      super(conf, null);
49      this.encryptor = encryptor;
50    }
51  
52    public SecureWALCellCodec(Configuration conf, Decryptor decryptor) {
53      super(conf, null);
54      this.decryptor = decryptor;
55    }
56  
57    static class EncryptedKvDecoder extends KeyValueCodec.KeyValueDecoder {
58  
59      private Decryptor decryptor;
60      private byte[] iv;
61  
62      public EncryptedKvDecoder(InputStream in) {
63        super(in);
64      }
65  
66      public EncryptedKvDecoder(InputStream in, Decryptor decryptor) {
67        super(in);
68        this.decryptor = decryptor;
69        this.iv = new byte[decryptor.getIvLength()];
70      }
71  
72      @Override
73      protected Cell parseCell() throws IOException {
74        int ivLength = 0;
75        try {
76          ivLength = StreamUtils.readRawVarint32(in);
77        } catch (EOFException e) {
78          // EOF at start is OK
79          return null;
80        }
81  
82        // TODO: An IV length of 0 could signify an unwrapped cell, when the
83        // encoder supports that just read the remainder in directly
84  
85        if (ivLength != this.iv.length) {
86          throw new IOException("Incorrect IV length: expected=" + iv.length + " have=" +
87            ivLength);
88        }
89        IOUtils.readFully(in, this.iv);
90  
91        int codedLength = StreamUtils.readRawVarint32(in);
92        byte[] codedBytes = new byte[codedLength];
93        IOUtils.readFully(in, codedBytes);
94  
95        decryptor.setIv(iv);
96        decryptor.reset();
97  
98        InputStream cin = decryptor.createDecryptionStream(new ByteArrayInputStream(codedBytes));
99  
100       // TODO: Add support for WAL compression
101 
102       int keylength = StreamUtils.readRawVarint32(cin);
103       int vlength = StreamUtils.readRawVarint32(cin);
104       int tagsLength = StreamUtils.readRawVarint32(cin);
105       int length = 0;
106       if (tagsLength == 0) {
107         length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
108       } else {
109         length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
110       }
111 
112       byte[] backingArray = new byte[length];
113       int pos = 0;
114       pos = Bytes.putInt(backingArray, pos, keylength);
115       pos = Bytes.putInt(backingArray, pos, vlength);
116 
117       // Row
118       int elemLen = StreamUtils.readRawVarint32(cin);
119       pos = Bytes.putShort(backingArray, pos, (short)elemLen);
120       IOUtils.readFully(cin, backingArray, pos, elemLen);
121       pos += elemLen;
122       // Family
123       elemLen = StreamUtils.readRawVarint32(cin);
124       pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
125       IOUtils.readFully(cin, backingArray, pos, elemLen);
126       pos += elemLen;
127       // Qualifier
128       elemLen = StreamUtils.readRawVarint32(cin);
129       IOUtils.readFully(cin, backingArray, pos, elemLen);
130       pos += elemLen;
131       // Remainder
132       IOUtils.readFully(cin, backingArray, pos, length - pos);
133       return new KeyValue(backingArray, 0, length);
134     }
135 
136   }
137 
138   static class EncryptedKvEncoder extends KeyValueCodec.KeyValueEncoder {
139 
140     private Encryptor encryptor;
141     private final ThreadLocal<byte[]> iv = new ThreadLocal<byte[]>() {
142       @Override
143       protected byte[] initialValue() {
144         byte[] iv = new byte[encryptor.getIvLength()];
145         new SecureRandom().nextBytes(iv);
146         return iv;
147       }
148     };
149 
150     protected byte[] nextIv() {
151       byte[] b = iv.get(), ret = new byte[b.length];
152       System.arraycopy(b, 0, ret, 0, b.length);
153       return ret;
154     }
155 
156     protected void incrementIv(int v) {
157       Encryption.incrementIv(iv.get(), 1 + (v / encryptor.getBlockSize()));
158     }
159 
160     public EncryptedKvEncoder(OutputStream os) {
161       super(os);
162     }
163 
164     public EncryptedKvEncoder(OutputStream os, Encryptor encryptor) {
165       super(os);
166       this.encryptor = encryptor;
167     }
168 
169     @Override
170     public void write(Cell cell) throws IOException {
171       if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
172 
173       KeyValue kv = (KeyValue)cell;
174       byte[] kvBuffer = kv.getBuffer();
175       int offset = kv.getOffset();
176 
177       byte[] iv = nextIv();
178       encryptor.setIv(iv);
179       encryptor.reset();
180 
181       // TODO: Check if this is a cell for an encrypted CF. If not, we can
182       // write a 0 here to signal an unwrapped cell and just dump the KV bytes
183       // afterward
184 
185       StreamUtils.writeRawVInt32(out, iv.length);
186       out.write(iv);
187 
188       // TODO: Add support for WAL compression
189 
190       ByteArrayOutputStream baos = new ByteArrayOutputStream();
191       OutputStream cout = encryptor.createEncryptionStream(baos);
192 
193       // Write the KeyValue infrastructure as VInts.
194       StreamUtils.writeRawVInt32(cout, kv.getKeyLength());
195       StreamUtils.writeRawVInt32(cout, kv.getValueLength());
196       // To support tags
197       StreamUtils.writeRawVInt32(cout, kv.getTagsLengthUnsigned());
198 
199       // Write row, qualifier, and family
200       StreamUtils.writeRawVInt32(cout, kv.getRowLength());
201       cout.write(kvBuffer, kv.getRowOffset(), kv.getRowLength());
202       StreamUtils.writeRawVInt32(cout, kv.getFamilyLength());
203       cout.write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength());
204       StreamUtils.writeRawVInt32(cout, kv.getQualifierLength());
205       cout.write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength());
206       // Write the rest
207       int pos = kv.getTimestampOffset();
208       int remainingLength = kv.getLength() + offset - pos;
209       cout.write(kvBuffer, pos, remainingLength);
210       cout.close();
211 
212       StreamUtils.writeRawVInt32(out, baos.size());
213       baos.writeTo(out);
214 
215       // Increment IV given the final payload length
216       incrementIv(baos.size());
217     }
218 
219   }
220 
221   @Override
222   public Decoder getDecoder(InputStream is) {
223     return new EncryptedKvDecoder(is, decryptor);
224   }
225 
226   @Override
227   public Encoder getEncoder(OutputStream os) {
228     return new EncryptedKvEncoder(os, encryptor);
229   }
230 
231   public static WALCellCodec getCodec(Configuration conf, Encryptor encryptor) {
232     return new SecureWALCellCodec(conf, encryptor);
233   }
234 
235   public static WALCellCodec getCodec(Configuration conf, Decryptor decryptor) {
236     return new SecureWALCellCodec(conf, decryptor);
237   }
238 
239 }