1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.io.IOException;
27 import java.lang.reflect.Field;
28 import java.util.ArrayList;
29 import java.util.Comparator;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.UUID;
33
34 import org.apache.commons.lang.mutable.MutableBoolean;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.hbase.CellScanner;
42 import org.apache.hadoop.hbase.Coprocessor;
43 import org.apache.hadoop.hbase.HBaseConfiguration;
44 import org.apache.hadoop.hbase.HBaseTestingUtility;
45 import org.apache.hadoop.hbase.HColumnDescriptor;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.HRegionInfo;
48 import org.apache.hadoop.hbase.HTableDescriptor;
49 import org.apache.hadoop.hbase.KeyValue;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.client.Get;
52 import org.apache.hadoop.hbase.client.Put;
53 import org.apache.hadoop.hbase.client.Result;
54 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
55 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
56 import org.apache.hadoop.hbase.regionserver.HRegion;
57 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
58 import org.apache.hadoop.hbase.testclassification.MediumTests;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.apache.hadoop.hbase.util.EnvironmentEdge;
61 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
62 import org.apache.hadoop.hbase.util.FSUtils;
63 import org.apache.hadoop.hbase.util.Threads;
64 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
65 import org.apache.hadoop.hbase.wal.WAL;
66 import org.apache.hadoop.hbase.wal.WALKey;
67 import org.junit.After;
68 import org.junit.AfterClass;
69 import org.junit.Before;
70 import org.junit.BeforeClass;
71 import org.junit.Rule;
72 import org.junit.Test;
73 import org.junit.experimental.categories.Category;
74 import org.junit.rules.TestName;
75
76
77
78
79 @Category(MediumTests.class)
80 public class TestFSHLog {
81 private static final Log LOG = LogFactory.getLog(TestFSHLog.class);
82
83 protected static Configuration conf;
84 protected static FileSystem fs;
85 protected static Path dir;
86 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
87
88 @Rule
89 public final TestName currentTest = new TestName();
90
91 @Before
92 public void setUp() throws Exception {
93 FileStatus[] entries = fs.listStatus(new Path("/"));
94 for (FileStatus dir : entries) {
95 fs.delete(dir.getPath(), true);
96 }
97 final Path hbaseDir = TEST_UTIL.createRootDir();
98 dir = new Path(hbaseDir, currentTest.getMethodName());
99 }
100
101 @After
102 public void tearDown() throws Exception {
103 }
104
105 @BeforeClass
106 public static void setUpBeforeClass() throws Exception {
107
108 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
109
110 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
111 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
112 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
113
114
115 TEST_UTIL.getConfiguration()
116 .setInt("hbase.ipc.client.connect.max.retries", 1);
117 TEST_UTIL.getConfiguration().setInt(
118 "dfs.client.block.recovery.retries", 1);
119 TEST_UTIL.getConfiguration().setInt(
120 "hbase.ipc.client.connection.maxidletime", 500);
121 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
122 SampleRegionWALObserver.class.getName());
123 TEST_UTIL.startMiniDFSCluster(3);
124
125 conf = TEST_UTIL.getConfiguration();
126 fs = TEST_UTIL.getDFSCluster().getFileSystem();
127 }
128
129 @AfterClass
130 public static void tearDownAfterClass() throws Exception {
131 TEST_UTIL.shutdownMiniCluster();
132 }
133
134
135
136
137 @Test
138 public void testWALCoprocessorLoaded() throws Exception {
139
140 FSHLog log = null;
141 try {
142 log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
143 HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
144 WALCoprocessorHost host = log.getCoprocessorHost();
145 Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
146 assertNotNull(c);
147 } finally {
148 if (log != null) {
149 log.close();
150 }
151 }
152 }
153
154 protected void addEdits(WAL log,
155 HRegionInfo hri,
156 HTableDescriptor htd,
157 int times,
158 MultiVersionConcurrencyControl mvcc)
159 throws IOException {
160 final byte[] row = Bytes.toBytes("row");
161 for (int i = 0; i < times; i++) {
162 long timestamp = System.currentTimeMillis();
163 WALEdit cols = new WALEdit();
164 cols.add(new KeyValue(row, row, row, timestamp, row));
165 WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(),
166 WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
167 HConstants.NO_NONCE, mvcc);
168 log.append(htd, hri, key, cols, true);
169 }
170 log.sync();
171 }
172
173
174
175
176
177
178 protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
179 wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
180 wal.completeCacheFlush(regionEncodedName);
181 }
182
183
184
185
186
187
188 @Test
189 public void testWALComparator() throws Exception {
190 FSHLog wal1 = null;
191 FSHLog walMeta = null;
192 try {
193 wal1 = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
194 HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
195 LOG.debug("Log obtained is: " + wal1);
196 Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
197 Path p1 = wal1.computeFilename(11);
198 Path p2 = wal1.computeFilename(12);
199
200 assertTrue(comp.compare(p1, p1) == 0);
201
202 assertTrue(comp.compare(p1, p2) < 0);
203 walMeta = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
204 HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
205 DefaultWALProvider.META_WAL_PROVIDER_ID);
206 Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
207
208 Path p1WithMeta = walMeta.computeFilename(11);
209 Path p2WithMeta = walMeta.computeFilename(12);
210 assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
211 assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
212
213 boolean ex = false;
214 try {
215 comp.compare(p1WithMeta, p2);
216 } catch (IllegalArgumentException e) {
217 ex = true;
218 }
219 assertTrue("Comparator doesn't complain while checking meta log files", ex);
220 boolean exMeta = false;
221 try {
222 compMeta.compare(p1WithMeta, p2);
223 } catch (IllegalArgumentException e) {
224 exMeta = true;
225 }
226 assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
227 } finally {
228 if (wal1 != null) {
229 wal1.close();
230 }
231 if (walMeta != null) {
232 walMeta.close();
233 }
234 }
235 }
236
237
238
239
240
241
242
243
244
245
246 @Test
247 public void testFindMemStoresEligibleForFlush() throws Exception {
248 LOG.debug("testFindMemStoresEligibleForFlush");
249 Configuration conf1 = HBaseConfiguration.create(conf);
250 conf1.setInt("hbase.regionserver.maxlogs", 1);
251 FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
252 HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
253 HTableDescriptor t1 =
254 new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
255 HTableDescriptor t2 =
256 new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
257 HRegionInfo hri1 =
258 new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
259 HRegionInfo hri2 =
260 new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
261
262 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
263 try {
264 addEdits(wal, hri1, t1, 2, mvcc);
265 wal.rollWriter();
266
267 addEdits(wal, hri1, t1, 2, mvcc);
268 wal.rollWriter();
269
270 assertTrue(wal.getNumRolledLogFiles() == 2);
271
272
273
274 byte[][] regionsToFlush = wal.findRegionsToForceFlush();
275 assertEquals(1, regionsToFlush.length);
276 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
277
278 addEdits(wal, hri2, t2, 2, mvcc);
279
280 regionsToFlush = wal.findRegionsToForceFlush();
281 assertEquals(regionsToFlush.length, 1);
282 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
283
284
285 flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
286 wal.rollWriter();
287
288 assertEquals(1, wal.getNumRolledLogFiles());
289
290 flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
291 wal.rollWriter(true);
292
293 assertEquals(0, wal.getNumRolledLogFiles());
294
295 addEdits(wal, hri1, t1, 2, mvcc);
296 addEdits(wal, hri2, t2, 2, mvcc);
297 wal.rollWriter();
298
299 assertEquals(1, wal.getNumRolledLogFiles());
300 addEdits(wal, hri1, t1, 2, mvcc);
301 wal.rollWriter();
302
303
304 regionsToFlush = wal.findRegionsToForceFlush();
305 assertEquals(2, regionsToFlush.length);
306
307 flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
308 flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
309 wal.rollWriter(true);
310 assertEquals(0, wal.getNumRolledLogFiles());
311
312 addEdits(wal, hri1, t1, 2, mvcc);
313
314 wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
315 wal.rollWriter();
316 wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
317 assertEquals(1, wal.getNumRolledLogFiles());
318 } finally {
319 if (wal != null) {
320 wal.close();
321 }
322 }
323 }
324
325 @Test(expected=IOException.class)
326 public void testFailedToCreateWALIfParentRenamed() throws IOException {
327 final String name = "testFailedToCreateWALIfParentRenamed";
328 FSHLog log = new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME,
329 conf, null, true, null, null);
330 long filenum = System.currentTimeMillis();
331 Path path = log.computeFilename(filenum);
332 log.createWriterInstance(path);
333 Path parent = path.getParent();
334 path = log.computeFilename(filenum + 1);
335 Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
336 fs.rename(parent, newPath);
337 log.createWriterInstance(path);
338 fail("It should fail to create the new WAL");
339 }
340
341
342
343
344
345
346
347
348
349 @Test
350 public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
351 String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
352 final TableName tableName = TableName.valueOf(testName);
353 final HRegionInfo hri = new HRegionInfo(tableName);
354 final byte[] rowName = tableName.getName();
355 final HTableDescriptor htd = new HTableDescriptor(tableName);
356 htd.addFamily(new HColumnDescriptor("f"));
357 HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
358 TEST_UTIL.getConfiguration(), htd);
359 HRegion.closeHRegion(r);
360 final int countPerFamily = 10;
361 final MutableBoolean goslow = new MutableBoolean(false);
362
363 FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
364 testName, conf) {
365 @Override
366 void atHeadOfRingBufferEventHandlerAppend() {
367 if (goslow.isTrue()) {
368 Threads.sleep(100);
369 LOG.debug("Sleeping before appending 100ms");
370 }
371 super.atHeadOfRingBufferEventHandlerAppend();
372 }
373 };
374 HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
375 TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
376 EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
377 try {
378 List<Put> puts = null;
379 for (HColumnDescriptor hcd: htd.getFamilies()) {
380 puts =
381 TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
382 }
383
384
385 final Get g = new Get(rowName);
386 Result result = region.get(g);
387 assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
388
389
390 WALEdit edits = new WALEdit();
391 for (Put p: puts) {
392 CellScanner cs = p.cellScanner();
393 while (cs.advance()) {
394 edits.add(cs.current());
395 }
396 }
397
398 List<UUID> clusterIds = new ArrayList<UUID>();
399 clusterIds.add(UUID.randomUUID());
400
401 goslow.setValue(true);
402 for (int i = 0; i < countPerFamily; i++) {
403 final HRegionInfo info = region.getRegionInfo();
404 final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
405 System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC());
406 wal.append(htd, info, logkey, edits, true);
407 }
408 region.flush(true);
409
410 long currentSequenceId = region.getSequenceId();
411
412 goslow.setValue(false);
413 synchronized (goslow) {
414 goslow.notifyAll();
415 }
416 assertTrue(currentSequenceId >= region.getSequenceId());
417 } finally {
418 region.close(true);
419 wal.close();
420 }
421 }
422
423 @Test
424 public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
425 SecurityException, IllegalArgumentException, IllegalAccessException {
426 final String name = "testSyncRunnerIndexOverflow";
427 FSHLog log =
428 new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
429 null, true, null, null);
430 try {
431 Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
432 ringBufferEventHandlerField.setAccessible(true);
433 FSHLog.RingBufferEventHandler ringBufferEventHandler =
434 (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
435 Field syncRunnerIndexField =
436 FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
437 syncRunnerIndexField.setAccessible(true);
438 syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
439 HTableDescriptor htd =
440 new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
441 HRegionInfo hri =
442 new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
443 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
444 for (int i = 0; i < 10; i++) {
445 addEdits(log, hri, htd, 1, mvcc);
446 }
447 } finally {
448 log.close();
449 }
450 }
451 }