/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
23 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
24 import static org.junit.Assert.assertEquals;
25
26 import java.io.ByteArrayInputStream;
27 import java.io.DataInputStream;
28 import java.io.DataOutputStream;
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.fs.FSDataInputStream;
35 import org.apache.hadoop.fs.FSDataOutputStream;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.SmallTests;
41 import org.apache.hadoop.hbase.fs.HFileSystem;
42 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
43 import org.apache.hadoop.hbase.io.compress.Compression;
44 import org.apache.hadoop.hbase.util.ChecksumType;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.experimental.categories.Category;
48
49 @Category(SmallTests.class)
50 public class TestChecksum {
51
52 private static final boolean detailedLogging = true;
53 private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
54
55 private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);
56
57 static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
58 NONE, GZ };
59
60 static final int[] BYTES_PER_CHECKSUM = {
61 50, 500, 688, 16*1024, (16*1024+980), 64 * 1024};
62
63 private static final HBaseTestingUtility TEST_UTIL =
64 new HBaseTestingUtility();
65 private FileSystem fs;
66 private HFileSystem hfs;
67
68 @Before
69 public void setUp() throws Exception {
70 fs = HFileSystem.get(TEST_UTIL.getConfiguration());
71 hfs = (HFileSystem)fs;
72 }
73
74
75
76
77
78 @Test
79 public void testChecksumCorruption() throws IOException {
80 testChecksumCorruptionInternals(false);
81 testChecksumCorruptionInternals(true);
82 }
83
84 protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
85 for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
86 for (boolean pread : new boolean[] { false, true }) {
87 LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
88 ", pread=" + pread);
89 Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
90 + algo);
91 FSDataOutputStream os = fs.create(path);
92 HFileContext meta = new HFileContextBuilder()
93 .withCompression(algo)
94 .withIncludesMvcc(true)
95 .withIncludesTags(useTags)
96 .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
97 .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
98 .build();
99 HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
100 long totalSize = 0;
101 for (int blockId = 0; blockId < 2; ++blockId) {
102 DataOutputStream dos = hbw.startWriting(BlockType.DATA);
103 for (int i = 0; i < 1234; ++i)
104 dos.writeInt(i);
105 hbw.writeHeaderAndData(os);
106 totalSize += hbw.getOnDiskSizeWithHeader();
107 }
108 os.close();
109
110
111 assertEquals(true, hfs.useHBaseChecksum());
112
113
114 FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
115 meta = new HFileContextBuilder()
116 .withCompression(algo)
117 .withIncludesMvcc(true)
118 .withIncludesTags(useTags)
119 .withHBaseCheckSum(true)
120 .build();
121 HFileBlock.FSReader hbr = new FSReaderV2Test(is, totalSize, fs, path, meta);
122 HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
123 b.sanityCheck();
124 assertEquals(4936, b.getUncompressedSizeWithoutHeader());
125 assertEquals(algo == GZ ? 2173 : 4936,
126 b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
127
128 ByteBuffer bb = b.getBufferWithoutHeader();
129 DataInputStream in = new DataInputStream(
130 new ByteArrayInputStream(
131 bb.array(), bb.arrayOffset(), bb.limit()));
132
133
134
135 assertEquals(1, HFile.getChecksumFailuresCount());
136 validateData(in);
137
138
139
140
141 for (int i = 0; i <
142 HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
143 b = hbr.readBlockData(0, -1, -1, pread);
144 assertEquals(0, HFile.getChecksumFailuresCount());
145 }
146
147
148 b = hbr.readBlockData(0, -1, -1, pread);
149 assertEquals(1, HFile.getChecksumFailuresCount());
150
151
152
153 b = hbr.readBlockData(0, -1, -1, pread);
154 assertEquals(0, HFile.getChecksumFailuresCount());
155 is.close();
156
157
158
159
160 HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
161 assertEquals(false, newfs.useHBaseChecksum());
162 is = new FSDataInputStreamWrapper(newfs, path);
163 hbr = new FSReaderV2Test(is, totalSize, newfs, path, meta);
164 b = hbr.readBlockData(0, -1, -1, pread);
165 is.close();
166 b.sanityCheck();
167 assertEquals(4936, b.getUncompressedSizeWithoutHeader());
168 assertEquals(algo == GZ ? 2173 : 4936,
169 b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
170
171 bb = b.getBufferWithoutHeader();
172 in = new DataInputStream(new ByteArrayInputStream(
173 bb.array(), bb.arrayOffset(), bb.limit()));
174
175
176
177 assertEquals(0, HFile.getChecksumFailuresCount());
178 validateData(in);
179 }
180 }
181 }
182
183
184
185
186 @Test
187 public void testChecksumChunks() throws IOException {
188 testChecksumInternals(false);
189 testChecksumInternals(true);
190 }
191
192 protected void testChecksumInternals(boolean useTags) throws IOException {
193 Compression.Algorithm algo = NONE;
194 for (boolean pread : new boolean[] { false, true }) {
195 for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
196 Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
197 algo + bytesPerChecksum);
198 FSDataOutputStream os = fs.create(path);
199 HFileContext meta = new HFileContextBuilder()
200 .withCompression(algo)
201 .withIncludesMvcc(true)
202 .withIncludesTags(useTags)
203 .withHBaseCheckSum(true)
204 .withBytesPerCheckSum(bytesPerChecksum)
205 .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
206 .build();
207 HFileBlock.Writer hbw = new HFileBlock.Writer(null,
208 meta);
209
210
211
212 long dataSize = 0;
213 DataOutputStream dos = hbw.startWriting(BlockType.DATA);
214 for (; dataSize < 6 * bytesPerChecksum;) {
215 for (int i = 0; i < 1234; ++i) {
216 dos.writeInt(i);
217 dataSize += 4;
218 }
219 }
220 hbw.writeHeaderAndData(os);
221 long totalSize = hbw.getOnDiskSizeWithHeader();
222 os.close();
223
224 long expectedChunks = ChecksumUtil.numChunks(
225 dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
226 bytesPerChecksum);
227 LOG.info("testChecksumChunks: pread=" + pread +
228 ", bytesPerChecksum=" + bytesPerChecksum +
229 ", fileSize=" + totalSize +
230 ", dataSize=" + dataSize +
231 ", expectedChunks=" + expectedChunks);
232
233
234 assertEquals(true, hfs.useHBaseChecksum());
235
236
237 FSDataInputStream is = fs.open(path);
238 FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
239 meta = new HFileContextBuilder()
240 .withCompression(algo)
241 .withIncludesMvcc(true)
242 .withIncludesTags(useTags)
243 .withHBaseCheckSum(true)
244 .withBytesPerCheckSum(bytesPerChecksum)
245 .build();
246 HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(
247 is, nochecksum), totalSize, hfs, path, meta);
248 HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
249 is.close();
250 b.sanityCheck();
251 assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
252
253
254 assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
255 expectedChunks * HFileBlock.CHECKSUM_SIZE);
256
257
258 assertEquals(0, HFile.getChecksumFailuresCount());
259 }
260 }
261 }
262
263
264
265
266 @Test
267 public void testChecksumAlgorithm() throws IOException {
268 ChecksumType type = ChecksumType.CRC32;
269 assertEquals(ChecksumType.nameToType(type.getName()), type);
270 assertEquals(ChecksumType.valueOf(type.toString()), type);
271 }
272
273 private void validateData(DataInputStream in) throws IOException {
274
275 for (int i = 0; i < 1234; i++) {
276 int val = in.readInt();
277 if (val != i) {
278 String msg = "testChecksumCorruption: data mismatch at index " +
279 i + " expected " + i + " found " + val;
280 LOG.warn(msg);
281 assertEquals(i, val);
282 }
283 }
284 }
285
286
287
288
289
290
291 static private class FSReaderV2Test extends HFileBlock.FSReaderV2 {
292 public FSReaderV2Test(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
293 Path path, HFileContext meta) throws IOException {
294 super(istream, fileSize, (HFileSystem) fs, path, meta);
295 }
296
297 @Override
298 protected boolean validateBlockChecksum(HFileBlock block,
299 byte[] data, int hdrSize) throws IOException {
300 return false;
301 }
302 }
303 }
304