/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for HLog write durability: verifies when puts and increments reach the
 * WAL under the different {@link Durability} settings and the table-level
 * deferred log flush setting.
 */
@Category(MediumTests.class)
public class TestDurability {
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static FileSystem FS;
  private static MiniDFSCluster CLUSTER;
  private static Configuration CONF;
  private static Path DIR;

  private static byte[] FAMILY = Bytes.toBytes("family");
  private static byte[] ROW = Bytes.toBytes("row");
  private static byte[] COL = Bytes.toBytes("col");

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    TEST_UTIL.startMiniDFSCluster(1);

    CLUSTER = TEST_UTIL.getDFSCluster();
    FS = CLUSTER.getFileSystem();
    DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

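  /**
   * Exercises puts under each {@link Durability} setting against a sync-flush
   * region and a deferred-flush region, checking the WAL entry count after
   * each put and after explicit syncs.
   */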
  @Test
  public void testDurability() throws Exception {
    HLog wal = HLogFactory.createHLog(FS, DIR, "hlogdir",
        "hlogdir_archive", CONF);
    byte[] tableName = Bytes.toBytes("TestDurability");
    HRegion region = createHRegion(tableName, "region", wal, false);
    HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, true);

    region.put(newPut(null));
    verifyHLogCount(wal, 1);

    // A put through the deferred-flush table does not write to the WAL immediately,
    // though it may already have been synced by the underlying AsyncWriter and
    // AsyncFlusher threads.
    deferredRegion.put(newPut(null));
    // It is guaranteed to be in the WAL once we sync it explicitly.
    wal.sync();
    verifyHLogCount(wal, 2);

    // A put through the deferred-flush table is synced along with the next
    // explicit wal.sync().
    deferredRegion.put(newPut(null));
    wal.sync();
    verifyHLogCount(wal, 3);
    region.put(newPut(null));
    verifyHLogCount(wal, 4);

    // USE_DEFAULT follows the table default: deferred for deferredRegion,
    // sync for region.
    deferredRegion.put(newPut(Durability.USE_DEFAULT));
    wal.sync();
    verifyHLogCount(wal, 5);
    region.put(newPut(Durability.USE_DEFAULT));
    verifyHLogCount(wal, 6);

    // SKIP_WAL never writes to the WAL
    region.put(newPut(Durability.SKIP_WAL));
    deferredRegion.put(newPut(Durability.SKIP_WAL));
    verifyHLogCount(wal, 6);
    wal.sync();
    verifyHLogCount(wal, 6);

    // async overrides the sync table default
    region.put(newPut(Durability.ASYNC_WAL));
    deferredRegion.put(newPut(Durability.ASYNC_WAL));
    wal.sync();
    verifyHLogCount(wal, 8);

    // sync overrides the async table default
    region.put(newPut(Durability.SYNC_WAL));
    deferredRegion.put(newPut(Durability.SYNC_WAL));
    verifyHLogCount(wal, 10);

    // fsync behaves like sync
    region.put(newPut(Durability.FSYNC_WAL));
    deferredRegion.put(newPut(Durability.FSYNC_WAL));
    verifyHLogCount(wal, 12);
  }

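  /**
   * Verifies that an {@link Increment} appends to the WAL only when at least
   * one column is incremented by a non-zero amount.
   */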
  @Test
  public void testIncrement() throws Exception {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] col1 = Bytes.toBytes("col1");
    byte[] col2 = Bytes.toBytes("col2");
    byte[] col3 = Bytes.toBytes("col3");

    // Set up the region
    HLog wal = HLogFactory.createHLog(FS, DIR, "myhlogdir",
        "myhlogdir_archive", CONF);
    byte[] tableName = Bytes.toBytes("TestIncrement");
    HRegion region = createHRegion(tableName, "increment", wal, false);

    // col1: amount = 1, one entry written to the WAL
    Increment inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 1);
    Result res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyHLogCount(wal, 1);

    // col1: amount = 0, nothing written to the WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyHLogCount(wal, 1);

    // col1: amount = 0, col2: amount = 0, col3: amount = 0
    // nothing written to the WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    inc1.addColumn(FAMILY, col2, 0);
    inc1.addColumn(FAMILY, col3, 0);
    res = region.increment(inc1);
    assertEquals(3, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
    verifyHLogCount(wal, 1);

    // col1: amount = 5, col2: amount = 4, col3: amount = 3
    // one entry written to the WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 5);
    inc1.addColumn(FAMILY, col2, 4);
    inc1.addColumn(FAMILY, col3, 3);
    res = region.increment(inc1);
    assertEquals(3, res.size());
    assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
    assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
    assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
    verifyHLogCount(wal, 2);
  }

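  /**
   * Creates a Put for the shared test row with a single cell in FAMILY:COL.
   * Applies {@code durability} when non-null, otherwise leaves the table
   * default in effect.
   */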
  private Put newPut(Durability durability) {
    Put p = new Put(ROW);
    p.add(FAMILY, COL, COL);
    if (durability != null) {
      p.setDurability(durability);
    }
    return p;
  }

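  /**
   * Reads back the WAL's current log file and asserts that it contains exactly
   * {@code expected} entries.
   */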
  private void verifyHLogCount(HLog log, int expected) throws Exception {
    Path walPath = ((FSHLog) log).computeFilename();
    HLog.Reader reader = HLogFactory.createReader(FS, walPath, CONF);
    int count = 0;
    HLog.Entry entry = new HLog.Entry();
    while (reader.next(entry) != null) {
      count++;
    }
    reader.close();
    assertEquals(expected, count);
  }

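  /**
   * Creates a fresh single-family region backed by {@code log}, with
   * table-level deferred (async) log flush turned on or off via
   * {@code isAsyncLogFlush}.
   */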
  // lifted from TestAtomicOperation
  private HRegion createHRegion(byte[] tableName, String callingMethod,
      HLog log, boolean isAsyncLogFlush) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
    htd.setDeferredLogFlush(isAsyncLogFlush);
    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
    htd.addFamily(hcd);
    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
    Path path = new Path(DIR + callingMethod);
    if (FS.exists(path)) {
      if (!FS.delete(path, true)) {
        throw new IOException("Failed delete of " + path);
      }
    }
    return HRegion.createHRegion(info, path, CONF, htd, log);
  }

}