1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.ByteArrayOutputStream;
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.security.SecureRandom;
27
28 import org.apache.commons.io.IOUtils;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.KeyValue;
32 import org.apache.hadoop.hbase.codec.KeyValueCodec;
33 import org.apache.hadoop.hbase.io.crypto.Decryptor;
34 import org.apache.hadoop.hbase.io.crypto.Encryption;
35 import org.apache.hadoop.hbase.io.crypto.Encryptor;
36 import org.apache.hadoop.hbase.io.util.StreamUtils;
37 import org.apache.hadoop.hbase.util.Bytes;
38
39
40
41
42 public class SecureWALCellCodec extends WALCellCodec {
43
44 private Encryptor encryptor;
45 private Decryptor decryptor;
46
47 public SecureWALCellCodec(Configuration conf, Encryptor encryptor) {
48 super(conf, null);
49 this.encryptor = encryptor;
50 }
51
52 public SecureWALCellCodec(Configuration conf, Decryptor decryptor) {
53 super(conf, null);
54 this.decryptor = decryptor;
55 }
56
57 static class EncryptedKvDecoder extends KeyValueCodec.KeyValueDecoder {
58
59 private Decryptor decryptor;
60 private byte[] iv;
61
62 public EncryptedKvDecoder(InputStream in) {
63 super(in);
64 }
65
66 public EncryptedKvDecoder(InputStream in, Decryptor decryptor) {
67 super(in);
68 this.decryptor = decryptor;
69 this.iv = new byte[decryptor.getIvLength()];
70 }
71
72 @Override
73 protected Cell parseCell() throws IOException {
74 int ivLength = 0;
75 try {
76 ivLength = StreamUtils.readRawVarint32(in);
77 } catch (EOFException e) {
78
79 return null;
80 }
81
82
83
84
85 if (ivLength != this.iv.length) {
86 throw new IOException("Incorrect IV length: expected=" + iv.length + " have=" +
87 ivLength);
88 }
89 IOUtils.readFully(in, this.iv);
90
91 int codedLength = StreamUtils.readRawVarint32(in);
92 byte[] codedBytes = new byte[codedLength];
93 IOUtils.readFully(in, codedBytes);
94
95 decryptor.setIv(iv);
96 decryptor.reset();
97
98 InputStream cin = decryptor.createDecryptionStream(new ByteArrayInputStream(codedBytes));
99
100
101
102 int keylength = StreamUtils.readRawVarint32(cin);
103 int vlength = StreamUtils.readRawVarint32(cin);
104 int tagsLength = StreamUtils.readRawVarint32(cin);
105 int length = 0;
106 if (tagsLength == 0) {
107 length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
108 } else {
109 length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
110 }
111
112 byte[] backingArray = new byte[length];
113 int pos = 0;
114 pos = Bytes.putInt(backingArray, pos, keylength);
115 pos = Bytes.putInt(backingArray, pos, vlength);
116
117
118 int elemLen = StreamUtils.readRawVarint32(cin);
119 pos = Bytes.putShort(backingArray, pos, (short)elemLen);
120 IOUtils.readFully(cin, backingArray, pos, elemLen);
121 pos += elemLen;
122
123 elemLen = StreamUtils.readRawVarint32(cin);
124 pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
125 IOUtils.readFully(cin, backingArray, pos, elemLen);
126 pos += elemLen;
127
128 elemLen = StreamUtils.readRawVarint32(cin);
129 IOUtils.readFully(cin, backingArray, pos, elemLen);
130 pos += elemLen;
131
132 IOUtils.readFully(cin, backingArray, pos, length - pos);
133 return new KeyValue(backingArray, 0, length);
134 }
135
136 }
137
138 static class EncryptedKvEncoder extends KeyValueCodec.KeyValueEncoder {
139
140 private Encryptor encryptor;
141 private final ThreadLocal<byte[]> iv = new ThreadLocal<byte[]>() {
142 @Override
143 protected byte[] initialValue() {
144 byte[] iv = new byte[encryptor.getIvLength()];
145 new SecureRandom().nextBytes(iv);
146 return iv;
147 }
148 };
149
150 protected byte[] nextIv() {
151 byte[] b = iv.get(), ret = new byte[b.length];
152 System.arraycopy(b, 0, ret, 0, b.length);
153 return ret;
154 }
155
156 protected void incrementIv(int v) {
157 Encryption.incrementIv(iv.get(), 1 + (v / encryptor.getBlockSize()));
158 }
159
160 public EncryptedKvEncoder(OutputStream os) {
161 super(os);
162 }
163
164 public EncryptedKvEncoder(OutputStream os, Encryptor encryptor) {
165 super(os);
166 this.encryptor = encryptor;
167 }
168
169 @Override
170 public void write(Cell cell) throws IOException {
171 if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
172
173 KeyValue kv = (KeyValue)cell;
174 byte[] kvBuffer = kv.getBuffer();
175 int offset = kv.getOffset();
176
177 byte[] iv = nextIv();
178 encryptor.setIv(iv);
179 encryptor.reset();
180
181
182
183
184
185 StreamUtils.writeRawVInt32(out, iv.length);
186 out.write(iv);
187
188
189
190 ByteArrayOutputStream baos = new ByteArrayOutputStream();
191 OutputStream cout = encryptor.createEncryptionStream(baos);
192
193
194 StreamUtils.writeRawVInt32(cout, kv.getKeyLength());
195 StreamUtils.writeRawVInt32(cout, kv.getValueLength());
196
197 StreamUtils.writeRawVInt32(cout, kv.getTagsLengthUnsigned());
198
199
200 StreamUtils.writeRawVInt32(cout, kv.getRowLength());
201 cout.write(kvBuffer, kv.getRowOffset(), kv.getRowLength());
202 StreamUtils.writeRawVInt32(cout, kv.getFamilyLength());
203 cout.write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength());
204 StreamUtils.writeRawVInt32(cout, kv.getQualifierLength());
205 cout.write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength());
206
207 int pos = kv.getTimestampOffset();
208 int remainingLength = kv.getLength() + offset - pos;
209 cout.write(kvBuffer, pos, remainingLength);
210 cout.close();
211
212 StreamUtils.writeRawVInt32(out, baos.size());
213 baos.writeTo(out);
214
215
216 incrementIv(baos.size());
217 }
218
219 }
220
221 @Override
222 public Decoder getDecoder(InputStream is) {
223 return new EncryptedKvDecoder(is, decryptor);
224 }
225
226 @Override
227 public Encoder getEncoder(OutputStream os) {
228 return new EncryptedKvEncoder(os, encryptor);
229 }
230
231 public static WALCellCodec getCodec(Configuration conf, Encryptor encryptor) {
232 return new SecureWALCellCodec(conf, encryptor);
233 }
234
235 public static WALCellCodec getCodec(Configuration conf, Decryptor decryptor) {
236 return new SecureWALCellCodec(conf, decryptor);
237 }
238
239 }