1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.hbase.HBaseTestingUtility;
29 import org.apache.hadoop.hbase.HColumnDescriptor;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.HTableDescriptor;
32 import org.apache.hadoop.hbase.MediumTests;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.client.Durability;
35 import org.apache.hadoop.hbase.client.Increment;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.client.Result;
38 import org.apache.hadoop.hbase.regionserver.HRegion;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hdfs.MiniDFSCluster;
41 import org.junit.AfterClass;
42 import org.junit.BeforeClass;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45
46
47
48
49 @Category(MediumTests.class)
50 public class TestDurability {
51 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
52 private static FileSystem FS;
53 private static MiniDFSCluster CLUSTER;
54 private static Configuration CONF;
55 private static Path DIR;
56
57 private static byte[] FAMILY = Bytes.toBytes("family");
58 private static byte[] ROW = Bytes.toBytes("row");
59 private static byte[] COL = Bytes.toBytes("col");
60
61
62 @BeforeClass
63 public static void setUpBeforeClass() throws Exception {
64 CONF = TEST_UTIL.getConfiguration();
65 TEST_UTIL.startMiniDFSCluster(1);
66
67 CLUSTER = TEST_UTIL.getDFSCluster();
68 FS = CLUSTER.getFileSystem();
69 DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
70 }
71
72 @AfterClass
73 public static void tearDownAfterClass() throws Exception {
74 TEST_UTIL.shutdownMiniCluster();
75 }
76
77 @Test
78 public void testDurability() throws Exception {
79 HLog wal = HLogFactory.createHLog(FS, DIR, "hlogdir",
80 "hlogdir_archive", CONF);
81 byte[] tableName = Bytes.toBytes("TestDurability");
82 HRegion region = createHRegion(tableName, "region", wal, false);
83 HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, true);
84
85 region.put(newPut(null));
86 verifyHLogCount(wal, 1);
87
88
89
90
91 deferredRegion.put(newPut(null));
92
93 wal.sync();
94 verifyHLogCount(wal, 2);
95
96
97 deferredRegion.put(newPut(null));
98 wal.sync();
99 verifyHLogCount(wal, 3);
100 region.put(newPut(null));
101 verifyHLogCount(wal, 4);
102
103
104 deferredRegion.put(newPut(Durability.USE_DEFAULT));
105 wal.sync();
106 verifyHLogCount(wal, 5);
107 region.put(newPut(Durability.USE_DEFAULT));
108 verifyHLogCount(wal, 6);
109
110
111 region.put(newPut(Durability.SKIP_WAL));
112 deferredRegion.put(newPut(Durability.SKIP_WAL));
113 verifyHLogCount(wal, 6);
114 wal.sync();
115 verifyHLogCount(wal, 6);
116
117
118 region.put(newPut(Durability.ASYNC_WAL));
119 deferredRegion.put(newPut(Durability.ASYNC_WAL));
120 wal.sync();
121 verifyHLogCount(wal, 8);
122
123
124 region.put(newPut(Durability.SYNC_WAL));
125 deferredRegion.put(newPut(Durability.SYNC_WAL));
126 verifyHLogCount(wal, 10);
127
128
129 region.put(newPut(Durability.FSYNC_WAL));
130 deferredRegion.put(newPut(Durability.FSYNC_WAL));
131 verifyHLogCount(wal, 12);
132 }
133
134 @Test
135 public void testIncrement() throws Exception {
136 byte[] row1 = Bytes.toBytes("row1");
137 byte[] col1 = Bytes.toBytes("col1");
138 byte[] col2 = Bytes.toBytes("col2");
139 byte[] col3 = Bytes.toBytes("col3");
140
141
142 HLog wal = HLogFactory.createHLog(FS, DIR, "myhlogdir",
143 "myhlogdir_archive", CONF);
144 byte[] tableName = Bytes.toBytes("TestIncrement");
145 HRegion region = createHRegion(tableName, "increment", wal, false);
146
147
148 Increment inc1 = new Increment(row1);
149 inc1.addColumn(FAMILY, col1, 1);
150 Result res = region.increment(inc1);
151 assertEquals(1, res.size());
152 assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
153 verifyHLogCount(wal, 1);
154
155
156 inc1 = new Increment(row1);
157 inc1.addColumn(FAMILY, col1, 0);
158 res = region.increment(inc1);
159 assertEquals(1, res.size());
160 assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
161 verifyHLogCount(wal, 1);
162
163
164
165 inc1 = new Increment(row1);
166 inc1.addColumn(FAMILY, col1, 0);
167 inc1.addColumn(FAMILY, col2, 0);
168 inc1.addColumn(FAMILY, col3, 0);
169 res = region.increment(inc1);
170 assertEquals(3, res.size());
171 assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
172 assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
173 assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
174 verifyHLogCount(wal, 1);
175
176
177
178 inc1 = new Increment(row1);
179 inc1.addColumn(FAMILY, col1, 5);
180 inc1.addColumn(FAMILY, col2, 4);
181 inc1.addColumn(FAMILY, col3, 3);
182 res = region.increment(inc1);
183 assertEquals(3, res.size());
184 assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
185 assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
186 assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
187 verifyHLogCount(wal, 2);
188 }
189
190 private Put newPut(Durability durability) {
191 Put p = new Put(ROW);
192 p.add(FAMILY, COL, COL);
193 if (durability != null) {
194 p.setDurability(durability);
195 }
196 return p;
197 }
198
199 private void verifyHLogCount(HLog log, int expected) throws Exception {
200 Path walPath = ((FSHLog) log).computeFilename();
201 HLog.Reader reader = HLogFactory.createReader(FS, walPath, CONF);
202 int count = 0;
203 HLog.Entry entry = new HLog.Entry();
204 while (reader.next(entry) != null) count++;
205 reader.close();
206 assertEquals(expected, count);
207 }
208
209
210 private HRegion createHRegion (byte [] tableName, String callingMethod,
211 HLog log, boolean isAsyncLogFlush)
212 throws IOException {
213 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
214 htd.setDeferredLogFlush(isAsyncLogFlush);
215 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
216 htd.addFamily(hcd);
217 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
218 Path path = new Path(DIR + callingMethod);
219 if (FS.exists(path)) {
220 if (!FS.delete(path, true)) {
221 throw new IOException("Failed delete of " + path);
222 }
223 }
224 return HRegion.createHRegion(info, path, CONF, htd, log);
225 }
226
227 }