1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.encoding;
18
19 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
20
21 import java.io.ByteArrayInputStream;
22 import java.io.ByteArrayOutputStream;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.security.SecureRandom;
27
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.io.TagCompressionContext;
30 import org.apache.hadoop.hbase.io.compress.Compression;
31 import org.apache.hadoop.hbase.io.crypto.Cipher;
32 import org.apache.hadoop.hbase.io.crypto.Encryption;
33 import org.apache.hadoop.hbase.io.crypto.Encryptor;
34 import org.apache.hadoop.hbase.io.hfile.BlockType;
35 import org.apache.hadoop.hbase.io.hfile.HFileContext;
36 import org.apache.hadoop.io.compress.CompressionOutputStream;
37 import org.apache.hadoop.io.compress.Compressor;
38
39 import com.google.common.base.Preconditions;
40
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class HFileBlockDefaultEncodingContext implements
50 HFileBlockEncodingContext {
51 private byte[] onDiskBytesWithHeader;
52 private byte[] uncompressedBytesWithHeader;
53 private BlockType blockType;
54 private final DataBlockEncoding encodingAlgo;
55
56 private ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
57 private DataOutputStream dataOut = new DataOutputStream(encodedStream);
58
59 private byte[] dummyHeader;
60
61
62
63
64 private Compressor compressor;
65
66 private CompressionOutputStream compressionStream;
67
68 private ByteArrayOutputStream compressedByteStream;
69
70 private HFileContext fileContext;
71 private TagCompressionContext tagCompressionContext;
72
73
74
75
76 private ByteArrayOutputStream cryptoByteStream;
77
78 private byte[] iv;
79
80
81
82
83
84
85 public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes,
86 HFileContext fileContext) {
87 this.encodingAlgo = encoding;
88 this.fileContext = fileContext;
89 Compression.Algorithm compressionAlgorithm =
90 fileContext.getCompression() == null ? NONE : fileContext.getCompression();
91 if (compressionAlgorithm != NONE) {
92 compressor = compressionAlgorithm.getCompressor();
93 compressedByteStream = new ByteArrayOutputStream();
94 try {
95 compressionStream =
96 compressionAlgorithm.createPlainCompressionStream(
97 compressedByteStream, compressor);
98 } catch (IOException e) {
99 throw new RuntimeException(
100 "Could not create compression stream for algorithm "
101 + compressionAlgorithm, e);
102 }
103 }
104
105 Encryption.Context cryptoContext = fileContext.getEncryptionContext();
106 if (cryptoContext != Encryption.Context.NONE) {
107 cryptoByteStream = new ByteArrayOutputStream();
108 iv = new byte[cryptoContext.getCipher().getIvLength()];
109 new SecureRandom().nextBytes(iv);
110 }
111
112 dummyHeader = Preconditions.checkNotNull(headerBytes,
113 "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
114 }
115
116 @Override
117 public void setDummyHeader(byte[] headerBytes) {
118 dummyHeader = headerBytes;
119 }
120
121
122
123
124
125 public void prepareEncoding() throws IOException {
126 encodedStream.reset();
127 dataOut.write(dummyHeader);
128 if (encodingAlgo != null
129 && encodingAlgo != DataBlockEncoding.NONE) {
130 encodingAlgo.writeIdInBytes(dataOut);
131 }
132 }
133
134 @Override
135 public void postEncoding(BlockType blockType)
136 throws IOException {
137 dataOut.flush();
138 compressAfterEncodingWithBlockType(encodedStream.toByteArray(), blockType);
139 this.blockType = blockType;
140 }
141
142
143
144
145
146
147 public void compressAfterEncodingWithBlockType(byte[] uncompressedBytesWithHeader,
148 BlockType blockType) throws IOException {
149 compressAfterEncoding(uncompressedBytesWithHeader, blockType, dummyHeader);
150 }
151
152
153
154
155
156
157
158 protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader,
159 BlockType blockType, byte[] headerBytes) throws IOException {
160 this.uncompressedBytesWithHeader = uncompressedBytesWithHeader;
161
162 Encryption.Context cryptoContext = fileContext.getEncryptionContext();
163 if (cryptoContext != Encryption.Context.NONE) {
164
165
166
167
168
169
170
171
172
173
174 cryptoByteStream.reset();
175
176 cryptoByteStream.write(headerBytes);
177
178 InputStream in;
179 int plaintextLength;
180
181 if (fileContext.getCompression() != Compression.Algorithm.NONE) {
182 compressedByteStream.reset();
183 compressionStream.resetState();
184 compressionStream.write(uncompressedBytesWithHeader,
185 headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length);
186 compressionStream.flush();
187 compressionStream.finish();
188 byte[] plaintext = compressedByteStream.toByteArray();
189 plaintextLength = plaintext.length;
190 in = new ByteArrayInputStream(plaintext);
191 } else {
192 plaintextLength = uncompressedBytesWithHeader.length - headerBytes.length;
193 in = new ByteArrayInputStream(uncompressedBytesWithHeader,
194 headerBytes.length, plaintextLength);
195 }
196
197 if (plaintextLength > 0) {
198
199
200 Cipher cipher = cryptoContext.getCipher();
201 Encryptor encryptor = cipher.getEncryptor();
202 encryptor.setKey(cryptoContext.getKey());
203
204
205 int ivLength = iv.length;
206 Preconditions.checkState(ivLength <= Byte.MAX_VALUE, "IV length out of range");
207 cryptoByteStream.write(ivLength);
208 if (ivLength > 0) {
209 encryptor.setIv(iv);
210 cryptoByteStream.write(iv);
211 }
212
213
214 Encryption.encrypt(cryptoByteStream, in, encryptor);
215
216 onDiskBytesWithHeader = cryptoByteStream.toByteArray();
217
218
219 Encryption.incrementIv(iv, 1 + (onDiskBytesWithHeader.length / encryptor.getBlockSize()));
220
221 } else {
222
223 cryptoByteStream.write(0);
224 onDiskBytesWithHeader = cryptoByteStream.toByteArray();
225
226 }
227
228 } else {
229
230 if (this.fileContext.getCompression() != NONE) {
231 compressedByteStream.reset();
232 compressedByteStream.write(headerBytes);
233 compressionStream.resetState();
234 compressionStream.write(uncompressedBytesWithHeader,
235 headerBytes.length, uncompressedBytesWithHeader.length
236 - headerBytes.length);
237 compressionStream.flush();
238 compressionStream.finish();
239 onDiskBytesWithHeader = compressedByteStream.toByteArray();
240 } else {
241 onDiskBytesWithHeader = uncompressedBytesWithHeader;
242 }
243
244 }
245
246 this.blockType = blockType;
247 }
248
249 @Override
250 public byte[] getOnDiskBytesWithHeader() {
251 return onDiskBytesWithHeader;
252 }
253
254 @Override
255 public byte[] getUncompressedBytesWithHeader() {
256 return uncompressedBytesWithHeader;
257 }
258
259 @Override
260 public BlockType getBlockType() {
261 return blockType;
262 }
263
264
265
266
267
268 @Override
269 public void close() {
270 if (compressor != null) {
271 this.fileContext.getCompression().returnCompressor(compressor);
272 compressor = null;
273 }
274 }
275
276 public DataOutputStream getOutputStreamForEncoder() {
277 return this.dataOut;
278 }
279
280 @Override
281 public DataBlockEncoding getDataBlockEncoding() {
282 return this.encodingAlgo;
283 }
284
285 @Override
286 public HFileContext getHFileContext() {
287 return this.fileContext;
288 }
289
290 public TagCompressionContext getTagCompressionContext() {
291 return tagCompressionContext;
292 }
293
294 public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
295 this.tagCompressionContext = tagCompressionContext;
296 }
297 }