/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.*;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
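/** JUnit test case for HLog */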
@Category(LargeTests.class)
@SuppressWarnings("deprecation")
public class TestHLog {
  private static final Log LOG = LogFactory.getLog(TestHLog.class);
  {
    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
        .getLogger().setLevel(Level.ALL);
    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) HLog.LOG).getLogger().setLevel(Level.ALL);
  }

  private static Configuration conf;
  private static FileSystem fs;
  private static Path dir;
  private static MiniDFSCluster cluster;
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Path hbaseDir;
  private static Path oldLogDir;

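  /**
   * Wipe everything under the DFS root before each test so state left behind
   * by one test method cannot affect the next.
   */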
  @Before
  public void setUp() throws Exception {
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

  @After
  public void tearDown() throws Exception {
  }

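  /**
   * Start a three-datanode MiniDFSCluster. Block size, heartbeat, and retry
   * settings are tuned down so the failure-recovery scenarios exercised here
   * (in particular testAppendClose) fail over quickly, and append support is
   * enabled because several tests sync and re-read a still-open WAL.
   */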
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);

    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);

    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.socket.timeout", 5000);

    TEST_UTIL.getConfiguration().setInt("ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
        SampleRegionWALObserver.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    conf = TEST_UTIL.getConfiguration();
    cluster = TEST_UTIL.getDFSCluster();
    fs = cluster.getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();
    oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    dir = new Path(hbaseDir, getName());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  private static String getName() {
    return "TestHLog";
  }

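  /**
   * Write to a log file with three concurrent threads and verify that every
   * edit is written (delegates to HLogPerformanceEvaluation with -verify).
   */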
  @Test
  public void testConcurrentWrites() throws Exception {
    int errCode = HLogPerformanceEvaluation.
        innerMain(new Configuration(TEST_UTIL.getConfiguration()),
            new String[] { "-threads", "3", "-verify", "-noclosefs", "-iterations", "3000" });
    assertEquals(0, errCode);
  }

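  /**
   * Write edits for several regions, rolling the log between batches, then
   * split the log directory and verify each region gets its own edits back.
   */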
  @Test
  public void testSplit() throws IOException {
    final TableName tableName = TableName.valueOf(getName());
    final byte[] rowName = tableName.getName();
    Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
    HLog log = HLogFactory.createHLog(fs, hbaseDir,
        HConstants.HREGION_LOGDIR_NAME, conf);
    final int howmany = 3;
    HRegionInfo[] infos = new HRegionInfo[3];
    Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
    fs.mkdirs(tabledir);
    for (int i = 0; i < howmany; i++) {
      infos[i] = new HRegionInfo(tableName,
          Bytes.toBytes("" + i), Bytes.toBytes("" + (i + 1)), false);
      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
      LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
    }
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("column"));

    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      for (int ii = 0; ii < howmany; ii++) {
        for (int i = 0; i < howmany; i++) {
          for (int j = 0; j < howmany; j++) {
            WALEdit edit = new WALEdit();
            byte[] family = Bytes.toBytes("column");
            byte[] qualifier = Bytes.toBytes(Integer.toString(j));
            byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
            edit.add(new KeyValue(rowName, family, qualifier,
                System.currentTimeMillis(), column));
            LOG.info("Region " + i + ": " + edit);
            log.append(infos[i], tableName, edit,
                System.currentTimeMillis(), htd, sequenceId);
          }
        }
        log.rollWriter();
      }
      log.close();
      List<Path> splits = HLogSplitter.split(hbaseDir, logdir, oldLogDir, fs, conf);
      verifySplits(splits, howmany);
      log = null;
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
    }
  }

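  /**
   * Exercises sync() against an open WAL: syncs and re-reads the log after
   * each batch of appends, including values large enough to span multiple
   * blocks. The Broken_ prefix appears to mark this as a known-broken test.
   */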
  @Test
  public void Broken_testSync() throws Exception {
    TableName tableName = TableName.valueOf(getName());

    Path p = new Path(dir, getName() + ".fsdos");
    FSDataOutputStream out = fs.create(p);
    out.write(tableName.getName());
    Method syncMethod = null;
    try {
      syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
    } catch (NoSuchMethodException e) {
      try {
        syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
      } catch (NoSuchMethodException ex) {
        fail("This version of Hadoop supports neither Syncable.sync() " +
            "nor Syncable.hflush().");
      }
    }
    syncMethod.invoke(out, new Object[] {});
    FSDataInputStream in = fs.open(p);
    assertTrue(in.available() > 0);
    byte[] buffer = new byte[1024];
    int read = in.read(buffer);
    assertEquals(tableName.getName().length, read);
    out.close();
    in.close();

    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    final int total = 20;
    HLog.Reader reader = null;

    try {
      HRegionInfo info = new HRegionInfo(tableName, null, null, false);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor(tableName.getName()));

      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
        wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }

      wal.sync();

      Path walPath = ((FSHLog) wal).computeFilename();
      reader = HLogFactory.createReader(fs, walPath, conf);
      int count = 0;
      HLog.Entry entry = new HLog.Entry();
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total, count);
      reader.close();

      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
        wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertTrue(count >= total);
      reader.close();

      wal.sync();
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 2, count);

      final byte[] value = new byte[1025 * 1024];
      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
        wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }

      wal.sync();
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 3, count);
      reader.close();

      wal.close();
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 3, count);
      reader.close();
    } finally {
      if (wal != null) wal.closeAndDelete();
      if (reader != null) reader.close();
    }
  }

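  /**
   * Asserts that splitting produced howmany * howmany files, each holding
   * howmany edits for a single region in increasing sequence-id order.
   */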
  private void verifySplits(List<Path> splits, final int howmany)
      throws IOException {
    assertEquals(howmany * howmany, splits.size());
    for (int i = 0; i < splits.size(); i++) {
      LOG.info("Verifying=" + splits.get(i));
      HLog.Reader reader = HLogFactory.createReader(fs, splits.get(i), conf);
      try {
        int count = 0;
        String previousRegion = null;
        long seqno = -1;
        HLog.Entry entry = new HLog.Entry();
        while ((entry = reader.next(entry)) != null) {
          HLogKey key = entry.getKey();
          String region = Bytes.toString(key.getEncodedRegionName());
          if (previousRegion != null) {
            assertEquals(previousRegion, region);
          }
          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
          assertTrue(seqno < key.getLogSeqNum());
          seqno = key.getLogSeqNum();
          previousRegion = region;
          count++;
        }
        assertEquals(howmany, count);
      } finally {
        reader.close();
      }
    }
  }

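  /*
   * Tests WAL recovery after a hard HDFS restart while the log is open: append
   * edits, force the namenode into safe mode and shut the cluster down without
   * cleanly closing the WAL, restart a cluster on the same port, shorten the
   * lease period, recover the file lease the way the master would, and verify
   * every appended edit can be read back. Requires working HDFS append/sync.
   */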
  @Test (timeout=300000)
  public void testAppendClose() throws Exception {
    TableName tableName = TableName.valueOf(getName());
    HRegionInfo regioninfo = new HRegionInfo(tableName,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);

    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
        "hlogdir_archive", conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    final int total = 20;

    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor(tableName.getName()));

    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
      wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
    }

    wal.sync();
    int namenodePort = cluster.getNameNodePort();
    final Path walPath = ((FSHLog) wal).computeFilename();

    try {
      DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
      dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
      TEST_UTIL.shutdownMiniDFSCluster();
      try {
        wal.close();
      } catch (IOException e) {
        LOG.info(e);
      }
      fs.close();
      LOG.info("STOPPED first instance of the cluster");
    } finally {
      while (cluster.isClusterUp()) {
        LOG.error("Waiting for cluster to go down");
        Thread.sleep(1000);
      }
      assertFalse(cluster.isClusterUp());
      cluster = null;
      for (int i = 0; i < 100; i++) {
        try {
          cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort);
          break;
        } catch (BindException e) {
          LOG.info("Sleeping. BindException bringing up new cluster");
          Threads.sleep(1000);
        }
      }
      cluster.waitActive();
      fs = cluster.getFileSystem();
      LOG.info("STARTED second instance.");
    }

    Method setLeasePeriod = cluster.getClass()
        .getDeclaredMethod("setLeasePeriod", new Class[] { Long.TYPE, Long.TYPE });
    setLeasePeriod.setAccessible(true);
    setLeasePeriod.invoke(cluster, 1000L, 1000L);
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      LOG.info(e);
    }

    final FileSystem recoveredFs = fs;
    final Configuration rlConf = conf;

    class RecoverLogThread extends Thread {
      public Exception exception = null;
      public void run() {
        try {
          FSUtils.getInstance(fs, rlConf)
              .recoverFileLease(recoveredFs, walPath, rlConf, null);
        } catch (IOException e) {
          exception = e;
        }
      }
    }

    RecoverLogThread t = new RecoverLogThread();
    t.start();
    t.join(60 * 1000);
    if (t.isAlive()) {
      t.interrupt();
      throw new Exception("Timed out waiting for HLog.recoverLog()");
    }

    if (t.exception != null)
      throw t.exception;

    HLog.Reader reader = HLogFactory.createReader(fs, walPath, conf);
    int count = 0;
    HLog.Entry entry = new HLog.Entry();
    while (reader.next(entry) != null) {
      count++;
      assertTrue("Should be one KeyValue per WALEdit",
          entry.getEdit().getKeyValues().size() == 1);
    }
    assertEquals(total, count);
    reader.close();

    setLeasePeriod.invoke(cluster, new Object[] { new Long(60000), new Long(3600000) });
  }

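  /**
   * Tests that we can write out an edit, close the log, and then read it back
   * in again. All columns go in one edit, so only a single entry is expected.
   */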
  @Test
  public void testEditAdd() throws IOException {
    final int COL_COUNT = 10;
    final TableName tableName = TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    HLog.Reader reader = null;
    HLog log = null;
    try {
      log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
      final AtomicLong sequenceId = new AtomicLong(1);

      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
            timestamp, new byte[] { (byte) (i + '0') }));
      }
      HRegionInfo info = new HRegionInfo(tableName,
          row, Bytes.toBytes(Bytes.toString(row) + "1"), false);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor("column"));

      log.append(info, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      log.startCacheFlush(info.getEncodedNameAsBytes());
      log.completeCacheFlush(info.getEncodedNameAsBytes());
      log.close();
      Path filename = ((FSHLog) log).computeFilename();
      log = null;

      reader = HLogFactory.createReader(fs, filename, conf);
      for (int i = 0; i < 1; i++) {
        HLog.Entry entry = reader.next(null);
        if (entry == null) break;
        HLogKey key = entry.getKey();
        WALEdit val = entry.getEdit();
        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
        assertTrue(tableName.equals(key.getTablename()));
        KeyValue kv = val.getKeyValues().get(0);
        assertTrue(Bytes.equals(row, kv.getRow()));
        assertEquals((byte) (i + '0'), kv.getValue()[0]);
        System.out.println(key + " " + val);
      }
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

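  /**
   * Appends all columns in a single edit and verifies the resulting Entry
   * carries every KeyValue back out of the log.
   */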
  @Test
  public void testAppend() throws IOException {
    final int COL_COUNT = 10;
    final TableName tableName = TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    Reader reader = null;
    HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
            timestamp, new byte[] { (byte) (i + '0') }));
      }
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor("column"));
      log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      log.startCacheFlush(hri.getEncodedNameAsBytes());
      log.completeCacheFlush(hri.getEncodedNameAsBytes());
      log.close();
      Path filename = ((FSHLog) log).computeFilename();
      log = null;

      reader = HLogFactory.createReader(fs, filename, conf);
      HLog.Entry entry = reader.next();
      assertEquals(COL_COUNT, entry.getEdit().size());
      int idx = 0;
      for (KeyValue val : entry.getEdit().getKeyValues()) {
        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
            entry.getKey().getEncodedRegionName()));
        assertTrue(tableName.equals(entry.getKey().getTablename()));
        assertTrue(Bytes.equals(row, val.getRow()));
        assertEquals((byte) (idx + '0'), val.getValue()[0]);
        System.out.println(entry.getKey() + " " + val);
        idx++;
      }
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

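  /**
   * Tests that a registered WALActionsListener sees every append and stops
   * being notified once it is unregistered.
   */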
  @Test
  public void testVisitors() throws Exception {
    final int COL_COUNT = 10;
    final TableName tableName = TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      DumbWALActionsListener visitor = new DumbWALActionsListener();
      log.registerWALActionsListener(visitor);
      long timestamp = System.currentTimeMillis();
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor("column"));

      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      for (int i = 0; i < COL_COUNT; i++) {
        WALEdit cols = new WALEdit();
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
            timestamp, new byte[] { (byte) (i + '0') }));
        log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      }
      assertEquals(COL_COUNT, visitor.increments);
      log.unregisterWALActionsListener(visitor);
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(11)),
          timestamp, new byte[] { (byte) (11 + '0') }));
      log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      assertEquals(COL_COUNT, visitor.increments);
    } finally {
      if (log != null) log.closeAndDelete();
    }
  }

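  /**
   * A rolled WAL file may only be archived once every region with edits in it
   * has flushed past those edits; walks through rolls and flushes for two
   * regions and checks the rolled-file count at each step.
   */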
  @Test
  public void testLogCleaning() throws Exception {
    LOG.info("testLogCleaning");
    final TableName tableName = TableName.valueOf("testLogCleaning");
    final TableName tableName2 = TableName.valueOf("testLogCleaning2");

    HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HRegionInfo hri2 = new HRegionInfo(tableName2,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);

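      // Add a single edit and roll: the rolled file holds unflushed edits,
      // so it must not be archived.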
      addEdits(log, hri, tableName, 1, sequenceId);
      log.rollWriter();
      assertEquals(1, ((FSHLog) log).getNumRolledLogFiles());

      addEdits(log, hri, tableName, 2, sequenceId);
      log.rollWriter();
      assertEquals(2, ((FSHLog) log).getNumRolledLogFiles());

      addEdits(log, hri, tableName, 1, sequenceId);
      addEdits(log, hri2, tableName2, 1, sequenceId);
      addEdits(log, hri, tableName, 1, sequenceId);
      addEdits(log, hri2, tableName2, 1, sequenceId);
      log.rollWriter();
      assertEquals(3, ((FSHLog) log).getNumRolledLogFiles());

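      // Flush the first region (appending to hri2 first so the writer has
      // something to roll): the files holding only hri's edits can now be
      // archived, leaving two rolled files.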
      addEdits(log, hri2, tableName2, 1, sequenceId);
      log.startCacheFlush(hri.getEncodedNameAsBytes());
      log.completeCacheFlush(hri.getEncodedNameAsBytes());
      log.rollWriter();
      assertEquals(2, ((FSHLog) log).getNumRolledLogFiles());

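      // Flush the second region as well: with both regions flushed, every
      // rolled file becomes eligible for archiving.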
      addEdits(log, hri2, tableName2, 1, sequenceId);
      log.startCacheFlush(hri2.getEncodedNameAsBytes());
      log.completeCacheFlush(hri2.getEncodedNameAsBytes());
      log.rollWriter();
      assertEquals(0, ((FSHLog) log).getNumRolledLogFiles());
    } finally {
      if (log != null) log.closeAndDelete();
    }
  }

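  /**
   * Creating a new WAL writer must fail once the log directory has been
   * renamed out from under the log, as happens when a directory is moved
   * aside for splitting.
   */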
  @Test
  public void testFailedToCreateHLogIfParentRenamed() throws IOException {
    FSHLog log = (FSHLog) HLogFactory.createHLog(
        fs, hbaseDir, "testFailedToCreateHLogIfParentRenamed", conf);
    long filenum = System.currentTimeMillis();
    Path path = log.computeFilename(filenum);
    HLogFactory.createWALWriter(fs, path, conf);
    Path parent = path.getParent();
    path = log.computeFilename(filenum + 1);
    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
    fs.rename(parent, newPath);
    try {
      HLogFactory.createWALWriter(fs, path, conf);
      fail("It should fail to create the new WAL");
    } catch (IOException ioe) {
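      // expected: the parent directory no longer exists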
    }
  }

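  /**
   * getServerNameFromHLogDirectoryName should return null (never throw) for
   * paths that are not WAL directories, and should parse the ServerName out
   * of standard, subdirectory, and -splitting WAL paths.
   */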
  @Test
  public void testGetServerNameFromHLogDirectoryName() throws IOException {
    ServerName sn = ServerName.valueOf("hn", 450, 1398);
    String hl = FSUtils.getRootDir(conf) + "/" + HLogUtil.getHLogDirectoryName(sn.toString());

    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, null));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf,
        FSUtils.getRootDir(conf).toUri().toString()));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, ""));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, " "));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, hl));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, hl + "qdf"));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, "sfqf" + hl + "qdf"));

    final String wals = "/WALs/";
    ServerName parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf,
        FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
        "/localhost%2C32984%2C1343316388997.1343316390417");
    Assert.assertEquals("standard", sn, parsed);

    parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf, hl + "/qdf");
    Assert.assertEquals("subdir", sn, parsed);

    parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf,
        FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
        "-splitting/localhost%3A57020.1340474893931");
    Assert.assertEquals("split", sn, parsed);
  }

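  /**
   * A WAL coprocessor named in the configuration should be loaded and
   * findable on the log's WALCoprocessorHost.
   */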
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
    try {
      WALCoprocessorHost host = log.getCoprocessorHost();
      Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
      assertNotNull(c);
    } finally {
      if (log != null) log.closeAndDelete();
    }
  }

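  /** Appends the given number of single-KeyValue edits for a region. */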
  private void addEdits(HLog log, HRegionInfo hri, TableName tableName,
      int times, AtomicLong sequenceId) throws IOException {
    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor("row"));

    final byte[] row = Bytes.toBytes("row");
    for (int i = 0; i < times; i++) {
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, row, row, timestamp, row));
      log.append(hri, tableName, cols, timestamp, htd, sequenceId);
    }
  }

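  /**
   * A WAL written in the legacy SequenceFile format by SequenceFileLogWriter
   * must still be readable through HLogFactory.createReader.
   */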
  @Test
  public void testReadLegacyLog() throws IOException {
    final int columnCount = 5;
    final int recordCount = 5;
    final TableName tableName = TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    long timestamp = System.currentTimeMillis();
    Path path = new Path(dir, "temphlog");
    SequenceFileLogWriter sflw = null;
    HLog.Reader reader = null;
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor(tableName);
      fs.mkdirs(dir);

      sflw = new SequenceFileLogWriter();
      sflw.init(fs, path, conf, false);
      for (int i = 0; i < recordCount; ++i) {
        HLogKey key = new HLogKey(
            hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        for (int j = 0; j < columnCount; ++j) {
          if (i == 0) {
            htd.addFamily(new HColumnDescriptor("column" + j));
          }
          String value = i + "" + j;
          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
        }
        sflw.append(new HLog.Entry(key, edit));
      }
      sflw.sync();
      sflw.close();

      reader = HLogFactory.createReader(fs, path, conf);
      assertTrue(reader instanceof SequenceFileLogReader);
      for (int i = 0; i < recordCount; ++i) {
        HLog.Entry entry = reader.next();
        assertNotNull(entry);
        assertEquals(columnCount, entry.getEdit().size());
        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
        assertEquals(tableName, entry.getKey().getTablename());
        int idx = 0;
        for (KeyValue val : entry.getEdit().getKeyValues()) {
          assertTrue(Bytes.equals(row, val.getRow()));
          String value = i + "" + idx;
          assertArrayEquals(Bytes.toBytes(value), val.getValue());
          idx++;
        }
      }
      HLog.Entry entry = reader.next();
      assertNull(entry);
    } finally {
      if (sflw != null) {
        sflw.close();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

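  /** Reads the WAL both with and without a trailer; see doRead below. */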
  @Test
  public void testWALTrailer() throws IOException {
    doRead(true);
    doRead(false);
  }

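  /**
   * Appends entries to a protobuf-format WAL and reads them back.
   * @param withTrailer if true, the writer is closed before reading so the
   *          reader should find a WALTrailer; if false, the file is read
   *          after a sync only and no trailer should be seen.
   */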
  private void doRead(boolean withTrailer) throws IOException {
    final int columnCount = 5;
    final int recordCount = 5;
    final TableName tableName = TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    long timestamp = System.currentTimeMillis();
    Path path = new Path(dir, "temphlog");

    fs.delete(path, true);
    HLog.Writer writer = null;
    HLog.Reader reader = null;
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor(tableName);
      fs.mkdirs(dir);

      writer = HLogFactory.createWALWriter(fs, path, conf);
      for (int i = 0; i < recordCount; ++i) {
        HLogKey key = new HLogKey(
            hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        for (int j = 0; j < columnCount; ++j) {
          if (i == 0) {
            htd.addFamily(new HColumnDescriptor("column" + j));
          }
          String value = i + "" + j;
          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
        }
        writer.append(new HLog.Entry(key, edit));
      }
      writer.sync();
      if (withTrailer) writer.close();

      reader = HLogFactory.createReader(fs, path, conf);
      assertTrue(reader instanceof ProtobufLogReader);
      if (withTrailer) {
        assertNotNull(reader.getWALTrailer());
      } else {
        assertNull(reader.getWALTrailer());
      }
      for (int i = 0; i < recordCount; ++i) {
        HLog.Entry entry = reader.next();
        assertNotNull(entry);
        assertEquals(columnCount, entry.getEdit().size());
        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
        assertEquals(tableName, entry.getKey().getTablename());
        int idx = 0;
        for (KeyValue val : entry.getEdit().getKeyValues()) {
          assertTrue(Bytes.equals(row, val.getRow()));
          String value = i + "" + idx;
          assertArrayEquals(Bytes.toBytes(value), val.getValue());
          idx++;
        }
      }
      HLog.Entry entry = reader.next();
      assertNull(entry);
    } finally {
      if (writer != null) {
        writer.close();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

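  /**
   * Tests the log-name comparator: a name compares equal to itself, later
   * file numbers sort later, and comparing meta with non-meta log names
   * throws.
   */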
  @Test
  public void testHLogComparator() throws Exception {
    HLog hlog1 = null;
    HLog hlogMeta = null;
    try {
      hlog1 = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
      LOG.debug("Log obtained is: " + hlog1);
      Comparator<Path> comp = ((FSHLog) hlog1).LOG_NAME_COMPARATOR;
      Path p1 = ((FSHLog) hlog1).computeFilename(11);
      Path p2 = ((FSHLog) hlog1).computeFilename(12);

      assertTrue(comp.compare(p1, p1) == 0);
      assertTrue(comp.compare(p1, p2) < 0);
      hlogMeta = HLogFactory.createMetaHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf,
          null, null);
      Comparator<Path> compMeta = ((FSHLog) hlogMeta).LOG_NAME_COMPARATOR;

      Path p1WithMeta = ((FSHLog) hlogMeta).computeFilename(11);
      Path p2WithMeta = ((FSHLog) hlogMeta).computeFilename(12);
      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);

      boolean ex = false;
      try {
        comp.compare(p1WithMeta, p2);
      } catch (Exception e) {
        ex = true;
      }
      assertTrue("Comparator doesn't complain while checking meta log files", ex);
      boolean exMeta = false;
      try {
        compMeta.compare(p1WithMeta, p2);
      } catch (Exception e) {
        exMeta = true;
      }
      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
    } finally {
      if (hlog1 != null) hlog1.close();
      if (hlogMeta != null) hlogMeta.close();
    }
  }

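  /**
   * Tests WAL archiving with two regions: adds edits, flushes, and rolls,
   * checking after each roll that a file is archived only when every region
   * with edits in it has flushed past them, and that "live" files holding
   * unflushed edits are kept.
   */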
  @Test
  public void testWALArchiving() throws IOException {
    LOG.debug("testWALArchiving");
    TableName table1 = TableName.valueOf("t1");
    TableName table2 = TableName.valueOf("t2");
    HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
    try {
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
          HConstants.EMPTY_END_ROW);
      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
          HConstants.EMPTY_END_ROW);

      hri1.setSplit(false);
      hri2.setSplit(false);

      final AtomicLong sequenceId1 = new AtomicLong(1);
      final AtomicLong sequenceId2 = new AtomicLong(1);

      addEdits(hlog, hri1, table1, 1, sequenceId1);
      hlog.rollWriter();
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());

      addEdits(hlog, hri1, table1, 1, sequenceId1);
      hlog.rollWriter();
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());

      addEdits(hlog, hri1, table1, 3, sequenceId1);
      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      hlog.rollWriter();
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());

      addEdits(hlog, hri2, table2, 1, sequenceId2);
      hlog.rollWriter();
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());

      addEdits(hlog, hri1, table1, 2, sequenceId1);
      hlog.rollWriter();
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());

      addEdits(hlog, hri2, table2, 2, sequenceId2);
      flushRegion(hlog, hri1.getEncodedNameAsBytes());

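      // hri1 is flushed, so its file can be archived on roll, but the files
      // holding hri2's unflushed edits must stay; the count remains 2.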
      hlog.rollWriter();
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());

      addEdits(hlog, hri2, table2, 2, sequenceId2);
      flushRegion(hlog, hri2.getEncodedNameAsBytes());
      hlog.rollWriter();
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
    } finally {
      if (hlog != null) hlog.close();
    }
  }

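  /**
   * With hbase.regionserver.maxlogs set to 1, rolling past the threshold
   * should report the regions whose memstores must be flushed before the
   * oldest WAL file can be archived.
   */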
  @Test
  public void testFindMemStoresEligibleForFlush() throws Exception {
    LOG.debug("testFindMemStoresEligibleForFlush");
    Configuration conf1 = HBaseConfiguration.create(conf);
    conf1.setInt("hbase.regionserver.maxlogs", 1);
    HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), conf1);
    TableName t1 = TableName.valueOf("t1");
    TableName t2 = TableName.valueOf("t2");
    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);

    final AtomicLong sequenceId1 = new AtomicLong(1);
    final AtomicLong sequenceId2 = new AtomicLong(1);

    try {
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.rollWriter();

      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.rollWriter();
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());

      byte[][] regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);

      addEdits(hlog, hri2, t2, 2, sequenceId2);
      regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);

      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      hlog.rollWriter();
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());

      flushRegion(hlog, hri2.getEncodedNameAsBytes());
      hlog.rollWriter(true);
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());

      addEdits(hlog, hri1, t1, 2, sequenceId1);
      addEdits(hlog, hri2, t2, 2, sequenceId2);
      hlog.rollWriter();
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());

      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.rollWriter();

      regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
      assertEquals(2, regionsToFlush.length);

      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      flushRegion(hlog, hri2.getEncodedNameAsBytes());
      hlog.rollWriter(true);
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());

      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.startCacheFlush(hri1.getEncodedNameAsBytes());
      hlog.rollWriter();
      hlog.completeCacheFlush(hri1.getEncodedNameAsBytes());
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
    } finally {
      if (hlog != null) hlog.close();
    }
  }

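  /**
   * Simulates WAL append ops for a region and tests the static
   * FSHLog.areAllRegionsFlushed check against the oldestFlushing and
   * oldestUnFlushed sequence-id maps.
   */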
  @Test
  public void testAllRegionsFlushed() {
    LOG.debug("testAllRegionsFlushed");
    Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
    Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
    Map<byte[], Long> seqNo = new HashMap<byte[], Long>();

    TableName t1 = TableName.valueOf("t1");
    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    final AtomicLong sequenceId1 = new AtomicLong(1);

    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));

    seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));

    oldestUnFlushedSeqNo.clear();
    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));

    oldestFlushingSeqNo.clear();
    oldestUnFlushedSeqNo.clear();
    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));

    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000L);
    seqNo.put(hri1.getEncodedNameAsBytes(), 1500L);
    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));

    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200L);
    oldestUnFlushedSeqNo.clear();
    seqNo.put(hri1.getEncodedNameAsBytes(), 1199L);
    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
  }

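  /** Helper that simulates a region flush by starting and completing a cache flush on the WAL. */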
  private void flushRegion(HLog hlog, byte[] regionEncodedName) {
    hlog.startCacheFlush(regionEncodedName);
    hlog.completeCacheFlush(regionEncodedName);
  }

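  /** Listener that simply counts how many times it is asked to visit a log entry. */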
  static class DumbWALActionsListener implements WALActionsListener {
    int increments = 0;

    @Override
    public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
        WALEdit logEdit) {
      increments++;
    }

    @Override
    public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
      increments++;
    }

    @Override
    public void preLogRoll(Path oldFile, Path newFile) {
    }

    @Override
    public void postLogRoll(Path oldFile, Path newFile) {
    }

    @Override
    public void preLogArchive(Path oldFile, Path newFile) {
    }

    @Override
    public void postLogArchive(Path oldFile, Path newFile) {
    }

    @Override
    public void logRollRequested() {
    }

    @Override
    public void logCloseRequested() {
    }
  }
}