/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
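
/**
 * Test log rolling: verifies that old WAL files are cleaned up as the log
 * rolls, that datanode death and pipeline restarts trigger log rolls, and
 * that compaction records in the WAL do not block the roller.
 */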
@Category(LargeTests.class)
public class TestLogRolling {
  private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
  private HRegionServer server;
  private HLog log;
  private String tableName;
  private byte[] value;
  private FileSystem fs;
  private MiniDFSCluster dfsCluster;
  private HBaseAdmin admin;
  private MiniHBaseCluster cluster;
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Turn up logging on the HDFS and HBase classes these tests exercise.
  {
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
        .getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
  }
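
  /**
   * Constructor. Builds a value of at least 1000 bytes by repeating the class
   * name, so each Put carries enough data to exercise flushing and rolling.
   */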
  public TestLogRolling() {
    this.server = null;
    this.log = null;
    this.tableName = null;

    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    this.value = Bytes.toBytes(v.toString());
  }
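
  // Tune the shared cluster configuration so flushes, log rolls, and failure
  // detection all happen quickly enough for the tests below.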
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // These tests do not work with short-circuit reads enabled; they restart
    // datanodes out from under the client.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    // Use a small region max file size so flushes and splits come quickly.
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);

    // Roll the WAL after a small number of entries.
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    // Tolerate a couple of log-roll errors before aborting, and keep RPCs
    // short so datanode failures surface quickly.
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
    TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);

    // Flush the cache more often.
    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // Use a small memstore flush size so puts trigger flushes quickly.
    TEST_UTIL.getConfiguration().setInt(
        HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);

    // Increase the client pause between retries.
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);

    // Wake internal chore threads frequently.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    // The tests tail and recover WAL files, so HDFS append support is required.
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);

    // Detect dead datanodes quickly.
    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);

    // Retry block writes aggressively, and bound how long the WAL may stay
    // under-replicated before the low-replication roller gives up.
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
  }

  @Before
  public void setUp() throws Exception {
    // 1 master, 1 region server, 2 datanodes.
    TEST_UTIL.startMiniCluster(1, 1, 2);

    cluster = TEST_UTIL.getHBaseCluster();
    dfsCluster = TEST_UTIL.getDFSCluster();
    fs = TEST_UTIL.getTestFileSystem();
    admin = TEST_UTIL.getHBaseAdmin();

    // Disable region rebalancing; the tests assume regions stay put.
    cluster.getMaster().balanceSwitch(false);
  }

  @After
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  private void startAndWriteData() throws IOException, InterruptedException {
    // When the hbase:meta table can be opened, the region servers are running.
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
    this.log = server.getWAL();

    HTable table = createTestTable(this.tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    for (int i = 1; i <= 256; i++) {
      doPut(table, i);
      if (i % 32 == 0) {
        // Periodically pause to give the log roller a chance to run.
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // continue
        }
      }
    }
  }
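
  /**
   * Tests that logs are deleted: after flushing all regions and rolling the
   * writer, only a bounded number of rolled log files should remain.
   */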
  @Test
  public void testLogRolling() throws Exception {
    this.tableName = getName();
    startAndWriteData();
    LOG.info("after writing there are " + ((FSHLog) log).getNumRolledLogFiles() + " log files");

    // Flush all regions so every WAL entry is persisted and the old log files
    // become eligible for cleanup.
    List<HRegion> regions =
        new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
    for (HRegion r: regions) {
      r.flushcache();
    }

    // Now roll the log.
    log.rollWriter();

    int count = ((FSHLog) log).getNumRolledLogFiles();
    LOG.info("after flushing all regions and rolling logs there are " +
        count + " log files");
    assertTrue(("actual count: " + count), count <= 2);
  }

  private static String getName() {
    return "TestLogRolling";
  }

  void writeData(HTable table, int rownum) throws IOException {
    doPut(table, rownum);

    // Give the log roller a chance to run.
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      // continue
    }
  }

  void validateData(HTable table, int rownum) throws IOException {
    String row = "row" + String.format("%1$04d", rownum);
    Get get = new Get(Bytes.toBytes(row));
    get.addFamily(HConstants.CATALOG_FAMILY);
    Result result = table.get(get);
    assertTrue(result.size() == 1);
    assertTrue(Bytes.equals(value,
        result.getValue(HConstants.CATALOG_FAMILY, null)));
    LOG.info("Validated row " + row);
  }

  void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
      throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row"
          + String.format("%1$04d", (start + i))));
      put.add(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.add(HConstants.CATALOG_FAMILY, null, value);
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger a recalculation of log replication by writing another row.
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
  }
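
  /**
   * Digs the datanode pipeline out of the WAL's underlying DFSOutputStream
   * via reflection.
   */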
  DatanodeInfo[] getPipeline(HLog log) throws IllegalArgumentException,
      IllegalAccessException, InvocationTargetException {
    OutputStream stm = ((FSHLog) log).getOutputStream();
    Method getPipeline = null;
    for (Method m : stm.getClass().getDeclaredMethods()) {
      if (m.getName().endsWith("getPipeline")) {
        getPipeline = m;
        getPipeline.setAccessible(true);
        break;
      }
    }

    assertTrue("Need DFSOutputStream.getPipeline() for this test",
        null != getPipeline);
    Object repl = getPipeline.invoke(stm, new Object[] {});
    return (DatanodeInfo[]) repl;
  }
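
  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an
   * HDFS jar with HDFS-826 (pipeline inspection) and append support.
   */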
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    assertTrue("This test requires HLog file replication set to 2.",
        fs.getDefaultReplication() == 2);
    LOG.info("Replication=" + fs.getDefaultReplication());

    this.server = cluster.getRegionServer(0);
    this.log = server.getWAL();

    // Create the test table and open it.
    String tableName = getName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
    assertTrue(table.isAutoFlush());

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();

    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());

    assertTrue("Need append support for this test", FSUtils
        .isAppendSupported(TEST_UTIL.getConfiguration()));

    // Add new datanodes and retire the originals, so the WAL's pipeline lands
    // on nodes we can kill without dropping below the required replication.
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true,
        null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size()-1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode( i );
      }
    }

    assertTrue("DataNodes " + dfsCluster.getDataNodes().size() +
        " default replication " + fs.getDefaultReplication(),
        dfsCluster.getDataNodes().size() >= fs.getDefaultReplication() + 1);

    writeData(table, 2);

    long curTime = System.currentTimeMillis();
    long oldFilenum = ((FSHLog) log).getFilenum();
    assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
        oldFilenum == ((FSHLog) log).getFilenum());
    final DatanodeInfo[] pipeline = getPipeline(log);
    assertTrue(pipeline.length == fs.getDefaultReplication());

    // Kill a datanode in the pipeline to force a log roll on the next sync().
    // This call is synchronous; when it returns, the datanode is dead.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // This write should succeed, but trigger a log roll.
    writeData(table, 2);
    long newFilenum = ((FSHLog) log).getFilenum();

    assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);

    // Write more data; the new writer already has a healthy pipeline.
    writeData(table, 3);
    assertTrue("The log should not roll again.",
        ((FSHLog) log).getFilenum() == newFilenum);

    // Kill another datanode in the pipeline. Replication can no longer be
    // restored, so low-replication rolling should get disabled.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, 3, false, 14000);
    assertTrue("LowReplication Roller should've been disabled, current replication="
        + ((FSHLog) log).getLogReplication(),
        !log.isLowReplicationRollEnabled());

    dfsCluster
        .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force a log roll; with a datanode back, the new log file should reach
    // the default replication and re-enable the low-replication roller.
    log.rollWriter(true);
    batchWriteAndWait(table, 13, true, 10000);
    assertTrue("New log file should have the default replication instead of " +
        ((FSHLog) log).getLogReplication(),
        ((FSHLog) log).getLogReplication() == fs.getDefaultReplication());
    assertTrue("LowReplication Roller should've been enabled",
        log.isLowReplicationRollEnabled());
  }
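
  /**
   * Tests that logs are rolled and every edit remains readable after the
   * entire HDFS datanode pipeline is restarted.
   */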
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires HLog file replication.",
        fs.getDefaultReplication() > 1);
    LOG.info("Replication=" + fs.getDefaultReplication());

    // When the hbase:meta table can be opened, the region servers are running.
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);

    this.server = cluster.getRegionServer(0);
    this.log = server.getWAL();

    // Create the test table and open it.
    String tableName = getName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    final List<Path> paths = new ArrayList<Path>();
    final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
    paths.add(((FSHLog) log).computeFilename());
    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void preLogRoll(Path oldFile, Path newFile) {
        LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
        preLogRolledCalled.add(new Integer(1));
      }
      @Override
      public void postLogRoll(Path oldFile, Path newFile) {
        paths.add(newFile);
      }
      @Override
      public void preLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void postLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void logRollRequested() {}
      @Override
      public void logCloseRequested() {}
      @Override
      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
          WALEdit logEdit) {}
      @Override
      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
          WALEdit logEdit) {}
    });

    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());

    assertTrue("Need append support for this test", FSUtils
        .isAppendSupported(TEST_UTIL.getConfiguration()));

    writeData(table, 1002);

    table.setAutoFlush(true, true);

    long curTime = System.currentTimeMillis();
    long oldFilenum = log.getFilenum();
    assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());

    // Restart every datanode, breaking the WAL's current pipeline.
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1002);

    // This write should succeed, but trigger a log roll.
    writeData(table, 1003);
    long newFilenum = log.getFilenum();

    assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
    validateData(table, 1003);

    writeData(table, 1004);

    // Restart the datanodes a second time.
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1004);

    // This write should succeed, but trigger another log roll.
    writeData(table, 1005);

    // Force a log roll so the listener registered above fires.
    log.rollWriter(true);
    assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

    // Read back the rows from every WAL file written so far.
    Set<String> loggedRows = new HashSet<String>();
    FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
    for (Path p : paths) {
      LOG.debug("recovering lease for " + p);
      fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

      LOG.debug("Reading HLog " + FSUtils.getPath(p));
      HLog.Reader reader = null;
      try {
        reader = HLogFactory.createReader(fs, p,
            TEST_UTIL.getConfiguration());
        HLog.Entry entry;
        while ((entry = reader.next()) != null) {
          LOG.debug("#" + entry.getKey().getLogSeqNum() + ": " + entry.getEdit().getKeyValues());
          for (KeyValue kv : entry.getEdit().getKeyValues()) {
            loggedRows.add(Bytes.toStringBinary(kv.getRow()));
          }
        }
      } catch (EOFException e) {
        LOG.debug("EOF reading file " + FSUtils.getPath(p));
      } finally {
        if (reader != null) reader.close();
      }
    }

    // Every row we wrote must appear in some WAL file.
    assertTrue(loggedRows.contains("row1002"));
    assertTrue(loggedRows.contains("row1003"));
    assertTrue(loggedRows.contains("row1004"));
    assertTrue(loggedRows.contains("row1005"));

    // Flush all regions so the rows are readable from store files.
    List<HRegion> regions =
        new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
    for (HRegion r: regions) {
      r.flushcache();
    }

    ResultScanner scanner = table.getScanner(new Scan());
    try {
      for (int i = 2; i <= 5; i++) {
        Result r = scanner.next();
        assertNotNull(r);
        assertFalse(r.isEmpty());
        assertEquals("row100" + i, Bytes.toString(r.getRow()));
      }
    } finally {
      scanner.close();
    }

    // Verify that no region server aborted while the pipelines were down.
    for (JVMClusterUtil.RegionServerThread rsThread:
        TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
      assertFalse(rsThread.getRegionServer().isAborted());
    }
  }
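
  /**
   * Tests that a WAL file whose only remaining entries are compaction records
   * does not keep the roller from archiving it.
   */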
  @Test
  public void testCompactionRecordDoesntBlockRolling() throws Exception {
    // When the hbase:meta table can be opened, the region servers are running.
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);

    String tableName = getName();
    HTable table = createTestTable(tableName);
    String tableName2 = tableName + "1";
    HTable table2 = createTestTable(tableName2);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    FSHLog fshLog = (FSHLog) log;
    HRegion region = server.getOnlineRegions(table2.getName()).get(0);
    Store s = region.getStore(HConstants.CATALOG_FAMILY);

    // Flush the namespace table so its edits do not hold up WAL archiving.
    admin.flush(TableName.NAMESPACE_TABLE_NAME.getName());

    // Put some stuff into table2, to make sure we have some files to compact.
    for (int i = 1; i <= 2; ++i) {
      doPut(table2, i);
      admin.flush(table2.getTableName());
    }
    doPut(table2, 3); // a third put that we do not flush
    assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumRolledLogFiles());
    assertEquals(2, s.getStorefilesCount());

    // Roll the log and compact table2; the compaction record should not keep
    // the rolled WAL file pinned.
    fshLog.rollWriter();
    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());
    admin.flush(table2.getTableName());
    region.compactStores();
    // Wait for the compaction to complete.
    Assert.assertNotNull(s);
    for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
      Threads.sleepWithoutInterrupt(200);
    }
    assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());

    // Write a value to table so the WAL cannot be deleted until the table is flushed.
    doPut(table, 0);
    fshLog.rollWriter();
    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());

    // Flush table to make the latest WAL obsolete; write another record, and roll again.
    admin.flush(table.getTableName());
    doPut(table, 1);
    fshLog.rollWriter();
    assertEquals("Should have 1 WALs at the end", 1, fshLog.getNumRolledLogFiles());

    table.close();
    table2.close();
  }

  private void doPut(HTable table, int i) throws IOException {
    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
    put.add(HConstants.CATALOG_FAMILY, null, value);
    table.put(put);
  }

  private HTable createTestTable(String tableName) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc);
    return new HTable(TEST_UTIL.getConfiguration(), tableName);
  }
}