/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Compression related stuff.
 * Copied from hadoop-3315 tfile.
 */
@InterfaceAudience.Private
public final class Compression {
  static final Log LOG = LogFactory.getLog(Compression.class);

  /**
   * Prevent the instantiation of class.
   */
  private Compression() {
    super();
  }

  /**
   * A FilterOutputStream that calls {@link CompressionOutputStream#finish()}
   * on flush(), so that each flush boundary is a complete compressed unit
   * that can be decompressed on its own.
   */
  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    @Override
    public void write(byte b[], int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }

  /**
   * Returns the classloader to load the Codec class from.
   */
  private static ClassLoader getClassLoaderForCodec() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = Compression.class.getClassLoader();
    }
    if (cl == null) {
      cl = ClassLoader.getSystemClassLoader();
    }
    if (cl == null) {
      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
    }
    return cl;
  }

  /**
   * Compression algorithms. The ordinal of these cannot change or else you
   * risk breaking all existing HFiles out there. Even the ones that are
   * not compressed! (They use the NONE algorithm)
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="SE_TRANSIENT_FIELD_NOT_RESTORED",
      justification="We are not serializing so doesn't apply (not sure why transient though)")
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static enum Algorithm {
    LZO("lzo") {
      // Use the base type to avoid a compile-time dependency on the LZO codec,
      // which is loaded reflectively below.
      private volatile transient CompressionCodec lzoCodec;
      private transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzoCodec == null) {
          synchronized (lock) {
            if (lzoCodec == null) {
              lzoCodec = buildCodec(conf);
            }
          }
        }
        return lzoCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
              new Configuration(conf));
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    GZ("gz") {
      private volatile transient GzipCodec codec;
      private transient Object lock = new Object();

      @Override
      DefaultCodec getCodec(Configuration conf) {
        if (codec == null) {
          synchronized (lock) {
            if (codec == null) {
              codec = buildCodec(conf);
            }
          }
        }
        return codec;
      }

      private GzipCodec buildCodec(Configuration conf) {
        GzipCodec gzcodec = new ReusableStreamGzipCodec();
        gzcodec.setConf(new Configuration(conf));
        return gzcodec;
      }
    },

    NONE("none") {
      @Override
      DefaultCodec getCodec(Configuration conf) {
        return null;
      }

      @Override
      public synchronized InputStream createDecompressionStream(
          InputStream downStream, Decompressor decompressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedInputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }

      @Override
      public synchronized OutputStream createCompressionStream(
          OutputStream downStream, Compressor compressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedOutputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }
    },
    SNAPPY("snappy") {
      // Use the base type to avoid a compile-time dependency on the Snappy
      // codec, which is loaded reflectively below.
      private volatile transient CompressionCodec snappyCodec;
      private transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (snappyCodec == null) {
          synchronized (lock) {
            if (snappyCodec == null) {
              snappyCodec = buildCodec(conf);
            }
          }
        }
        return snappyCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    LZ4("lz4") {
      // Use the base type to avoid a compile-time dependency on the LZ4 codec,
      // which is loaded reflectively below.
      private volatile transient CompressionCodec lz4Codec;
      private transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lz4Codec == null) {
          synchronized (lock) {
            if (lz4Codec == null) {
              lz4Codec = buildCodec(conf);
            }
          }
        }
        return lz4Codec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    };

    private final Configuration conf;
    private final String compressName;
    /** data input buffer size to absorb small reads from application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** data output buffer size to absorb small writes from application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    Algorithm(String name) {
      this.conf = new Configuration();
      this.conf.setBoolean("hadoop.native.lib", true);
      this.compressName = name;
    }

    abstract CompressionCodec getCodec(Configuration conf);

    public InputStream createDecompressionStream(
        InputStream downStream, Decompressor decompressor,
        int downStreamBufferSize) throws IOException {
      CompressionCodec codec = getCodec(conf);
      // Set the internal buffer size to read from down stream.
      if (downStreamBufferSize > 0) {
        ((Configurable) codec).getConf().setInt("io.file.buffer.size",
            downStreamBufferSize);
      }
      CompressionInputStream cis =
          codec.createInputStream(downStream, decompressor);
      BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
      return bis2;
    }

    public OutputStream createCompressionStream(
        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
        throws IOException {
      OutputStream bos1 = null;
      if (downStreamBufferSize > 0) {
        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
      } else {
        bos1 = downStream;
      }
      CompressionOutputStream cos =
          createPlainCompressionStream(bos1, compressor);
      BufferedOutputStream bos2 =
          new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
              DATA_OBUF_SIZE);
      return bos2;
    }

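    // Illustrative usage sketch, not part of the original source ("rawOut"
    // and "data" are hypothetical). Shows the intended pairing of a pooled
    // Compressor with a compression stream:
    //
    //   Algorithm algo = Compression.getCompressionAlgorithmByName("gz");
    //   Compressor compressor = algo.getCompressor();
    //   try {
    //     OutputStream out = algo.createCompressionStream(rawOut, compressor, 0);
    //     out.write(data);
    //     out.flush(); // FinishOnFlushCompressionStream completes the block here
    //     out.close();
    //   } finally {
    //     algo.returnCompressor(compressor);
    //   }
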
    /**
     * Creates a compression stream without any additional wrapping into
     * buffering streams.
     */
    public CompressionOutputStream createPlainCompressionStream(
        OutputStream downStream, Compressor compressor) throws IOException {
      CompressionCodec codec = getCodec(conf);
      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
      return codec.createOutputStream(downStream, compressor);
    }

    public Compressor getCompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Compressor compressor = CodecPool.getCompressor(codec);
        if (compressor != null) {
          if (compressor.finished()) {
            // Somebody returned the compressor to CodecPool while still using it.
            LOG.warn("Compressor obtained from CodecPool is already finished()");
          }
          compressor.reset();
        }
        return compressor;
      }
      return null;
    }

    public void returnCompressor(Compressor compressor) {
      if (compressor != null) {
        CodecPool.returnCompressor(compressor);
      }
    }

    public Decompressor getDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        if (decompressor != null) {
          if (decompressor.finished()) {
            // Somebody returned the decompressor to CodecPool while still using it.
            LOG.warn("Decompressor obtained from CodecPool is already finished()");
          }
          decompressor.reset();
        }
        return decompressor;
      }
      return null;
    }

    public void returnDecompressor(Decompressor decompressor) {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
          // Decompressors marked @DoNotPool are not recycled by the pool, so
          // release their underlying resources immediately.
          decompressor.end();
        }
      }
    }

    public String getName() {
      return compressName;
    }
  }

  public static Algorithm getCompressionAlgorithmByName(String compressName) {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    for (Algorithm a : algos) {
      if (a.getName().equals(compressName)) {
        return a;
      }
    }

    throw new IllegalArgumentException(
        "Unsupported compression algorithm name: " + compressName);
  }

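  // Illustrative lookup sketch, not part of the original source:
  //
  //   Compression.Algorithm algo =
  //       Compression.getCompressionAlgorithmByName("snappy");
  //   assert algo == Compression.Algorithm.SNAPPY;
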
  /**
   * Get names of supported compression algorithms.
   *
   * @return Array of strings, each representing a supported compression
   *         algorithm.
   */
  public static String[] getSupportedAlgorithms() {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    String[] ret = new String[algos.length];
    int i = 0;
    for (Algorithm a : algos) {
      ret[i++] = a.getName();
    }

    return ret;
  }

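  // Illustrative sketch, not part of the original source. The returned names
  // follow the enum's declaration order:
  //
  //   String[] names = Compression.getSupportedAlgorithms();
  //   // names == { "lzo", "gz", "none", "snappy", "lz4" }
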
  /**
   * Decompresses data from the given stream using the given compression
   * algorithm. Throws an exception if the dest buffer does not have enough
   * space to hold the decompressed data.
   *
   * @param dest the output byte buffer
   * @param destOffset start writing position of the output buffer
   * @param bufferedBoundedStream a stream to read compressed data from,
   *        bounded to the exact amount of compressed data
   * @param compressedSize compressed data size, header not included
   * @param uncompressedSize uncompressed data size, header not included
   * @param compressAlgo compression algorithm used
   * @throws IOException if the decompression stream cannot be created or read
   */
  public static void decompress(byte[] dest, int destOffset,
      InputStream bufferedBoundedStream, int compressedSize,
      int uncompressedSize, Compression.Algorithm compressAlgo)
      throws IOException {

    if (dest.length - destOffset < uncompressedSize) {
      throw new IllegalArgumentException(
          "Output buffer does not have enough space to hold "
              + uncompressedSize + " decompressed bytes, available: "
              + (dest.length - destOffset));
    }

    Decompressor decompressor = null;
    try {
      decompressor = compressAlgo.getDecompressor();
      InputStream is = compressAlgo.createDecompressionStream(
          bufferedBoundedStream, decompressor, 0);

      IOUtils.readFully(is, dest, destOffset, uncompressedSize);
      is.close();
    } finally {
      if (decompressor != null) {
        compressAlgo.returnDecompressor(decompressor);
      }
    }
  }
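
  // Illustrative usage sketch, not part of the original source ("in",
  // "compressedSize" and "uncompressedSize" are hypothetical caller-supplied
  // values, with "in" bounded to exactly compressedSize bytes of GZ data):
  //
  //   byte[] dest = new byte[uncompressedSize];
  //   Compression.decompress(dest, 0, in, compressedSize, uncompressedSize,
  //       Compression.Algorithm.GZ);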

}