/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
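
/**
 * Tests HBase-level checksums for HFile blocks: the default checksum type,
 * each supported checksum algorithm, recovery from checksum corruption, and
 * various bytesPerChecksum chunk sizes.
 */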
@Category(SmallTests.class)
public class TestChecksum {
  private static final Log LOG = LogFactory.getLog(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16*1024, (16*1024+980), 64 * 1024};

  private static final HBaseTestingUtility TEST_UTIL =
    new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem)fs;
  }
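
  /**
   * Verifies that a block written with a default {@link HFileContext} is read
   * back with the default checksum type.
   */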
  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);
    }
    hbw.writeHeaderAndData(os);
    int totalSize = hbw.getOnDiskSizeWithHeader();
    os.close();

    // Use hbase checksums.
    assertEquals(true, hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
        is, totalSize, (HFileSystem) fs, path, meta);
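    // Read the block back from offset 0; the -1 size arguments indicate that the
    // on-disk and uncompressed sizes are not known in advance.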
    HFileBlock b = hbr.readBlockData(0, -1, -1, false);
    assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
  }

  /**
   * Test all checksum types by writing and reading back blocks.
   */
  @Test
  public void testAllChecksumTypes() throws IOException {
    List<ChecksumType> cktypes = new ArrayList<>(Arrays.asList(ChecksumType.values()));
    for (Iterator<ChecksumType> itr = cktypes.iterator(); itr.hasNext(); ) {
      ChecksumType cktype = itr.next();
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
      FSDataOutputStream os = fs.create(path);
      HFileContext meta = new HFileContextBuilder()
          .withChecksumType(cktype).build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < 1000; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      int totalSize = hbw.getOnDiskSizeWithHeader();
      os.close();

      // Use hbase checksums.
      assertEquals(true, hfs.useHBaseChecksum());

      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
          is, totalSize, (HFileSystem) fs, path, meta);
      HFileBlock b = hbr.readBlockData(0, -1, -1, false);
      ByteBuffer data = b.getBufferWithoutHeader();
      for (int i = 0; i < 1000; i++) {
        assertEquals(i, data.getInt());
      }
      boolean exceptionThrown = false;
      try {
        data.getInt();
      } catch (BufferUnderflowException e) {
        exceptionThrown = true;
      }
      assertTrue(exceptionThrown);
      assertEquals(0, HFile.getChecksumFailuresCount());
    }
  }

  /**
   * Introduce checksum failures and check that we can still read
   * the data.
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }
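
  /**
   * Writes blocks, then reads them back through a reader whose hbase checksum
   * validation always fails (see {@link FSReaderImplTest}), verifying that the
   * data can still be read by falling back to hdfs checksums, and that hbase
   * checksum verification is later re-enabled.
   */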
  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
                   ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Use hbase checksums.
        assertEquals(true, hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(true)
              .withIncludesTags(useTags)
              .withHBaseCheckSum(true)
              .build();
        HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, excluding header and checksum
        ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
        DataInputStream in = new DataInputStream(
                               new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we encountered hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(1, HFile.getChecksumFailuresCount());
        validateData(in);

        // A single hbase checksum failure causes the reader to switch off hbase
        // checksum verification for the next
        // HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD read requests.
        // Verify that this is correct.
        for (int i = 0; i <
             HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, -1, pread);
          assertEquals(0, HFile.getChecksumFailuresCount());
        }
        // The next read should have hbase checksum verification re-enabled;
        // we verify this by asserting that there was an hbase checksum failure.
        b = hbr.readBlockData(0, -1, -1, pread);
        assertEquals(1, HFile.getChecksumFailuresCount());

        // Since the above encountered a checksum failure, we switch
        // back to not checking hbase checksums.
        b = hbr.readBlockData(0, -1, -1, pread);
        assertEquals(0, HFile.getChecksumFailuresCount());
        is.close();

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
        assertEquals(false, newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta);
        b = hbr.readBlockData(0, -1, -1, pread);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, excluding header and checksum
        bb = b.getBufferWithoutHeader(); // read back data
        in = new DataInputStream(new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getChecksumFailuresCount());
        validateData(in);
      }
    }
  }

  /**
   * Test different values of bytesPerChecksum.
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }
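
  /**
   * Writes a single block using the given bytesPerChecksum, reads it back, and
   * verifies that the on-disk block size accounts for the expected number of
   * checksum chunks.
   */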
  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
                             algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withHBaseCheckSum(true)
                            .withBytesPerCheckSum(bytesPerChecksum)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);

        // write one block. The block has data that is at least
        // 6 times the checksum chunk size.
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

        long expectedChunks = ChecksumUtil.numChunks(
                               dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
                               bytesPerChecksum);
        LOG.info("testChecksumChunks: pread=" + pread +
                   ", bytesPerChecksum=" + bytesPerChecksum +
                   ", fileSize=" + totalSize +
                   ", dataSize=" + dataSize +
                   ", expectedChunks=" + expectedChunks);

        // Verify hbase checksums.
        assertEquals(true, hfs.useHBaseChecksum());

        // Read data back from file.
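        // Open one stream that verifies hdfs checksums and one from the
        // no-checksum filesystem, so the reader can skip hdfs checksums while
        // hbase-level checksums are in use.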
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
               .withCompression(algo)
               .withIncludesMvcc(true)
               .withIncludesTags(useTags)
               .withHBaseCheckSum(true)
               .withBytesPerCheckSum(bytesPerChecksum)
               .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
            is, nochecksum), totalSize, hfs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // verify that we have the expected number of checksum chunks
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
                     expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // assert that we did not encounter hbase checksum verification failures
        assertEquals(0, HFile.getChecksumFailuresCount());
      }
    }
  }
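
  /**
   * Reads the 1234 ints written per block back from the stream and asserts
   * that each value matches its index.
   */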
  private void validateData(DataInputStream in) throws IOException {
    // validate data
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }

  /**
   * A class that introduces hbase-checksum failures while
   * reading data from hfiles. This should trigger the hdfs level
   * checksum validations.
   */
  private static class FSReaderImplTest extends HFileBlock.FSReaderImpl {
    public FSReaderImplTest(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
        Path path, HFileContext meta) throws IOException {
      super(istream, fileSize, (HFileSystem) fs, path, meta);
    }

    @Override
    protected boolean validateBlockChecksum(HFileBlock block, long offset,
      byte[] data, int hdrSize) throws IOException {
      return false;  // checksum validation failure
    }
  }
}