/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

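/**
 * Tests HFile block checksum handling: that block reads survive HBase-level
 * checksum failures by falling back to HDFS checksums, that HBase checksum
 * verification is re-enabled after a run of successful reads, and that the
 * number of checksum chunks written matches the configured bytesPerChecksum.
 */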
@Category(SmallTests.class)
public class TestChecksum {
  // change this value to activate more logs
  private static final boolean detailedLogging = true;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Log LOG = LogFactory.getLog(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

  // Checksum chunk sizes exercised by testChecksumChunks(), including
  // non-power-of-two values.
  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16*1024, (16*1024+980), 64 * 1024};

  private static final HBaseTestingUtility TEST_UTIL =
    new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem)fs;
  }

  /**
   * Introduce checksum failures and check that we can still read
   * the data
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

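  /**
   * Writes two data blocks, then reads them back with a reader whose HBase
   * checksum verification always fails. Verifies that HDFS checksums keep the
   * data readable, that HBase checksums stay switched off for the next
   * CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads, and that they are re-enabled
   * (and fail again) on the read after that.
   */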
  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
                   ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i)
            dos.writeInt(i);
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Use hbase checksums.
        assertEquals(true, hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(true)
              .withIncludesTags(useTags)
              .withHBaseCheckSum(true)
              .build();
        HFileBlock.FSReader hbr = new FSReaderV2Test(is, totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        ByteBuffer bb = b.getBufferWithoutHeader(); // read back data
        DataInputStream in = new DataInputStream(
                               new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we encountered hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(1, HFile.getChecksumFailuresCount());
        validateData(in);

        // A single instance of hbase checksum failure causes the reader to
        // switch off hbase checksum verification for the next 100 read
        // requests. Verify that this is correct.
        for (int i = 0; i <
             HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, -1, pread);
          assertEquals(0, HFile.getChecksumFailuresCount());
        }
        // The next read should have hbase checksum verification re-enabled;
        // we verify this by asserting that there was an hbase checksum failure.
        b = hbr.readBlockData(0, -1, -1, pread);
        assertEquals(1, HFile.getChecksumFailuresCount());

        // Since the above encountered a checksum failure, we switch
        // back to not checking hbase checksums.
        b = hbr.readBlockData(0, -1, -1, pread);
        assertEquals(0, HFile.getChecksumFailuresCount());
        is.close();

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
        assertEquals(false, newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        hbr = new FSReaderV2Test(is, totalSize, newfs, path, meta);
        b = hbr.readBlockData(0, -1, -1, pread);
        is.close();
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        bb = b.getBufferWithoutHeader(); // read back data
        in = new DataInputStream(new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getChecksumFailuresCount());
        validateData(in);
      }
    }
  }

  /**
   * Test different values of bytesPerChecksum
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

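  /**
   * Writes a single uncompressed block whose data spans at least six checksum
   * chunks and verifies that the on-disk size equals the header size plus the
   * data size plus the bytes used by the expected number of checksum chunks,
   * with no checksum verification failures.
   */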
  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
                             algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withHBaseCheckSum(true)
                            .withBytesPerCheckSum(bytesPerChecksum)
                            .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null,
           meta);

        // write one block. The block has data
        // that is at least 6 times more than the checksum chunk size
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        for (; dataSize < 6 * bytesPerChecksum;) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

        long expectedChunks = ChecksumUtil.numChunks(
                               dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
                               bytesPerChecksum);
        LOG.info("testChecksumChunks: pread=" + pread +
                   ", bytesPerChecksum=" + bytesPerChecksum +
                   ", fileSize=" + totalSize +
                   ", dataSize=" + dataSize +
                   ", expectedChunks=" + expectedChunks);

        // Verify hbase checksums.
        assertEquals(true, hfs.useHBaseChecksum());

        // Read data back from file.
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
               .withCompression(algo)
               .withIncludesMvcc(true)
               .withIncludesTags(useTags)
               .withHBaseCheckSum(true)
               .withBytesPerCheckSum(bytesPerChecksum)
               .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(
            is, nochecksum), totalSize, hfs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // verify that we have the expected number of checksum chunks
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
                     expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // assert that we did not encounter hbase checksum verification failures
        assertEquals(0, HFile.getChecksumFailuresCount());
      }
    }
  }

  /**
   * Test to ensure that there is at least one valid checksum implementation
   */
  @Test
  public void testChecksumAlgorithm() throws IOException {
    ChecksumType type = ChecksumType.CRC32;
    assertEquals(ChecksumType.nameToType(type.getName()), type);
    assertEquals(ChecksumType.valueOf(type.toString()), type);
  }

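  /**
   * Reads back the sequence of 1234 ints written by the test blocks and
   * asserts that each value equals its index.
   */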
  private void validateData(DataInputStream in) throws IOException {
    // validate data
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      if (val != i) {
        String msg = "testChecksumCorruption: data mismatch at index " +
                     i + " expected " + i + " found " + val;
        LOG.warn(msg);
        assertEquals(i, val);
      }
    }
  }

  /**
   * A class that introduces hbase-checksum failures while
   * reading data from hfiles. This should trigger the hdfs level
   * checksum validations.
   */
  static private class FSReaderV2Test extends HFileBlock.FSReaderV2 {
    public FSReaderV2Test(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
        Path path, HFileContext meta) throws IOException {
      super(istream, fileSize, (HFileSystem) fs, path, meta);
    }

    @Override
    protected boolean validateBlockChecksum(HFileBlock block,
      byte[] data, int hdrSize) throws IOException {
      return false;  // checksum validation failure
    }
  }
}