1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io;
20
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
35
36
37
38
39
40 @InterfaceAudience.Private
41 public class TagCompressionContext {
42 private final Dictionary tagDict;
43
44 public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
45 throws SecurityException, NoSuchMethodException, InstantiationException,
46 IllegalAccessException, InvocationTargetException {
47 Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
48 tagDict = dictConstructor.newInstance();
49 tagDict.init(dictCapacity);
50 }
51
52 public void clear() {
53 tagDict.clear();
54 }
55
56
57
58
59
60
61
62
63
64 public void compressTags(OutputStream out, byte[] in, int offset, int length)
65 throws IOException {
66 int pos = offset;
67 int endOffset = pos + length;
68 assert pos < endOffset;
69 while (pos < endOffset) {
70 int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
71 pos += Tag.TAG_LENGTH_SIZE;
72 write(in, pos, tagLen, out);
73 pos += tagLen;
74 }
75 }
76
77
78
79
80
81
82
83
84 public void compressTags(OutputStream out, ByteBuffer in, int length) throws IOException {
85 if (in.hasArray()) {
86 compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
87 ByteBufferUtils.skip(in, length);
88 } else {
89 byte[] tagBuf = new byte[length];
90 in.get(tagBuf);
91 compressTags(out, tagBuf, 0, length);
92 }
93 }
94
95
96
97
98
99
100
101
102
103 public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
104 throws IOException {
105 int endOffset = offset + length;
106 while (offset < endOffset) {
107 byte status = (byte) src.read();
108 if (status == Dictionary.NOT_IN_DICTIONARY) {
109
110 int tagLen = StreamUtils.readRawVarint32(src);
111 offset = Bytes.putAsShort(dest, offset, tagLen);
112 IOUtils.readFully(src, dest, offset, tagLen);
113 tagDict.addEntry(dest, offset, tagLen);
114 offset += tagLen;
115 } else {
116 short dictIdx = StreamUtils.toShort(status, (byte) src.read());
117 byte[] entry = tagDict.getEntry(dictIdx);
118 if (entry == null) {
119 throw new IOException("Missing dictionary entry for index " + dictIdx);
120 }
121 offset = Bytes.putAsShort(dest, offset, entry.length);
122 System.arraycopy(entry, 0, dest, offset, entry.length);
123 offset += entry.length;
124 }
125 }
126 }
127
128
129
130
131
132
133
134
135
136
137 public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
138 throws IOException {
139 int srcBeginPos = src.position();
140 int endOffset = offset + length;
141 while (offset < endOffset) {
142 byte status = src.get();
143 int tagLen;
144 if (status == Dictionary.NOT_IN_DICTIONARY) {
145
146 tagLen = StreamUtils.readRawVarint32(src);
147 offset = Bytes.putAsShort(dest, offset, tagLen);
148 src.get(dest, offset, tagLen);
149 tagDict.addEntry(dest, offset, tagLen);
150 offset += tagLen;
151 } else {
152 short dictIdx = StreamUtils.toShort(status, src.get());
153 byte[] entry = tagDict.getEntry(dictIdx);
154 if (entry == null) {
155 throw new IOException("Missing dictionary entry for index " + dictIdx);
156 }
157 tagLen = entry.length;
158 offset = Bytes.putAsShort(dest, offset, tagLen);
159 System.arraycopy(entry, 0, dest, offset, tagLen);
160 offset += tagLen;
161 }
162 }
163 return src.position() - srcBeginPos;
164 }
165
166
167
168
169
170
171
172
173 public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
174 if (dest.hasArray()) {
175 uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
176 } else {
177 byte[] tagBuf = new byte[length];
178 uncompressTags(src, tagBuf, 0, length);
179 dest.put(tagBuf);
180 }
181 }
182
183 private void write(byte[] data, int offset, int length, OutputStream out) throws IOException {
184 short dictIdx = Dictionary.NOT_IN_DICTIONARY;
185 if (tagDict != null) {
186 dictIdx = tagDict.findEntry(data, offset, length);
187 }
188 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
189 out.write(Dictionary.NOT_IN_DICTIONARY);
190 StreamUtils.writeRawVInt32(out, length);
191 out.write(data, offset, length);
192 } else {
193 StreamUtils.writeShort(out, dictIdx);
194 }
195 }
196 }