/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.*;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/** JUnit test case for HLog */
@Category(LargeTests.class)
@SuppressWarnings("deprecation")
public class TestHLog {
  private static final Log LOG = LogFactory.getLog(TestHLog.class);
  {
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
      .getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
  }

  private static Configuration conf;
  private static FileSystem fs;
  private static Path dir;
  private static MiniDFSCluster cluster;
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Path hbaseDir;
  private static Path oldLogDir;

  @Before
  public void setUp() throws Exception {
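    // Wipe everything under the DFS root between tests so that state left by
    // one test method cannot leak into the next one.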
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

  @After
  public void tearDown() throws Exception {
    // Nothing to do; per-test cleanup happens in setUp().
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // needed for testAppendClose()
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.socket.timeout", 5000);
    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
        SampleRegionWALObserver.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    conf = TEST_UTIL.getConfiguration();
    cluster = TEST_UTIL.getDFSCluster();
    fs = cluster.getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();
    oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    dir = new Path(hbaseDir, getName());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  private static String getName() {
    // Fixed name; used to build per-test paths under the HBase root dir.
    return "TestHLog";
  }

  /**
   * Write to a log file with three concurrent threads and verify that all data is written.
   * @throws Exception
   */
  @Test
  public void testConcurrentWrites() throws Exception {
    // Run the HPE tool with three threads writing 3000 edits each concurrently.
    // When done, verify that all edits were written.
    int errCode = HLogPerformanceEvaluation.
      innerMain(new Configuration(TEST_UTIL.getConfiguration()),
        new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
    assertEquals(0, errCode);
  }

  /**
   * Just write multiple logs then split.  Before the fix for HADOOP-2283, this
   * would fail.
   * @throws IOException
   */
  @Test
  public void testSplit() throws IOException {
    final TableName tableName =
        TableName.valueOf(getName());
    final byte [] rowName = tableName.getName();
    Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
    HLog log = HLogFactory.createHLog(fs, hbaseDir,
        HConstants.HREGION_LOGDIR_NAME, conf);
    final int howmany = 3;
    HRegionInfo[] infos = new HRegionInfo[howmany];
    Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
    fs.mkdirs(tabledir);
    for (int i = 0; i < howmany; i++) {
      infos[i] = new HRegionInfo(tableName,
                Bytes.toBytes("" + i), Bytes.toBytes("" + (i + 1)), false);
      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
      LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
    }
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("column"));

    // Add edits for three regions.
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      for (int ii = 0; ii < howmany; ii++) {
        for (int i = 0; i < howmany; i++) {
          for (int j = 0; j < howmany; j++) {
            WALEdit edit = new WALEdit();
            byte [] family = Bytes.toBytes("column");
            byte [] qualifier = Bytes.toBytes(Integer.toString(j));
            byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
            edit.add(new KeyValue(rowName, family, qualifier,
                System.currentTimeMillis(), column));
            LOG.info("Region " + i + ": " + edit);
            log.append(infos[i], tableName, edit,
              System.currentTimeMillis(), htd, sequenceId);
          }
        }
        log.rollWriter();
      }
      log.close();
      List<Path> splits = HLogSplitter.split(
        hbaseDir, logdir, oldLogDir, fs, conf);
      verifySplits(splits, howmany);
      log = null;
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
    }
  }

  /**
   * Test the new HDFS-265 sync.
   * @throws Exception
   */
  @Test
  public void Broken_testSync() throws Exception {
    TableName tableName =
        TableName.valueOf(getName());
    // First verify that using streams all works.
    Path p = new Path(dir, getName() + ".fsdos");
    FSDataOutputStream out = fs.create(p);
    out.write(tableName.getName());
    Method syncMethod = null;
    try {
      syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
    } catch (NoSuchMethodException e) {
      try {
        syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
      } catch (NoSuchMethodException ex) {
        fail("This version of Hadoop supports neither Syncable.sync() " +
            "nor Syncable.hflush().");
      }
    }
    syncMethod.invoke(out, new Object[]{});
    FSDataInputStream in = fs.open(p);
    assertTrue(in.available() > 0);
    byte [] buffer = new byte [1024];
    int read = in.read(buffer);
    assertEquals(tableName.getName().length, read);
    out.close();
    in.close();

    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    final int total = 20;
    HLog.Reader reader = null;

    try {
      HRegionInfo info = new HRegionInfo(tableName,
                  null, null, false);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor(tableName.getName()));

      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
        wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }
      // Now call sync and try reading.  Opening a Reader before you sync just
      // gives you an EOFException.
      wal.sync();
      // Open a Reader.
      Path walPath = ((FSHLog) wal).computeFilename();
      reader = HLogFactory.createReader(fs, walPath, conf);
      int count = 0;
      HLog.Entry entry = new HLog.Entry();
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total, count);
      reader.close();
      // Add test that checks to see that an open of a Reader works on a file
      // that has had a sync done on it.
      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
        wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertTrue(count >= total);
      reader.close();
      // If I sync, should see double the edits.
      wal.sync();
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 2, count);
      reader.close();
      // Now do a test that ensures stuff works when we go over block boundary,
      // especially that we return good length on file.
      final byte [] value = new byte[1025 * 1024];  // Make a 1M value.
      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
        wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }
      // Now I should have written out lots of blocks.  Sync then read.
      wal.sync();
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 3, count);
      reader.close();
      // Close it and ensure that a Reader on the closed file gets the right length also.
      wal.close();
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 3, count);
      reader.close();
    } finally {
      if (wal != null) wal.closeAndDelete();
      if (reader != null) reader.close();
    }
  }

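  /**
   * Verifies the output of {@link #testSplit()}: the expected number of split
   * files, each holding edits for exactly one region in strictly increasing
   * sequence-number order.
   */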
  private void verifySplits(List<Path> splits, final int howmany)
  throws IOException {
    assertEquals(howmany * howmany, splits.size());
    for (int i = 0; i < splits.size(); i++) {
      LOG.info("Verifying=" + splits.get(i));
      HLog.Reader reader = HLogFactory.createReader(fs, splits.get(i), conf);
      try {
        int count = 0;
        String previousRegion = null;
        long seqno = -1;
        HLog.Entry entry = new HLog.Entry();
        while ((entry = reader.next(entry)) != null) {
          HLogKey key = entry.getKey();
          String region = Bytes.toString(key.getEncodedRegionName());
          // Assert that all edits are for the same region.
          if (previousRegion != null) {
            assertEquals(previousRegion, region);
          }
          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
          assertTrue(seqno < key.getLogSeqNum());
          seqno = key.getLogSeqNum();
          previousRegion = region;
          count++;
        }
        assertEquals(howmany, count);
      } finally {
        reader.close();
      }
    }
  }

  /*
   * We pass different values to recoverFileLease() so that different code paths are covered.
   *
   * For this test to pass, it requires:
   * 1. HDFS-200 (append support)
   * 2. HDFS-988 (SafeMode should freeze file operations
   *              [FSNamesystem.nextGenerationStampForBlock])
   * 3. HDFS-142 (on restart, maintain pendingCreates)
   */
  @Test (timeout=300000)
  public void testAppendClose() throws Exception {
    TableName tableName =
        TableName.valueOf(getName());
    HRegionInfo regioninfo = new HRegionInfo(tableName,
             HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);

    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
        "hlogdir_archive", conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    final int total = 20;

    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor(tableName.getName()));

    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
      wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
    }
    // Now call sync to send the data to HDFS datanodes
    wal.sync();
    int namenodePort = cluster.getNameNodePort();
    final Path walPath = ((FSHLog) wal).computeFilename();

    // Stop the cluster.  (ensure restart since we're sharing MiniDFSCluster)
    try {
      DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
      dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
      TEST_UTIL.shutdownMiniDFSCluster();
      try {
        // wal.writer.close() will throw an exception,
        // but still call this since it closes the LogSyncer thread first
        wal.close();
      } catch (IOException e) {
        LOG.info(e);
      }
      fs.close(); // closing FS last so DFSOutputStream can't call close
      LOG.info("STOPPED first instance of the cluster");
    } finally {
      // Restart the cluster
      while (cluster.isClusterUp()) {
        LOG.error("Waiting for cluster to go down");
        Thread.sleep(1000);
      }
      assertFalse(cluster.isClusterUp());
      cluster = null;
      for (int i = 0; i < 100; i++) {
        try {
          cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort);
          break;
        } catch (BindException e) {
          LOG.info("Sleeping.  BindException bringing up new cluster");
          Threads.sleep(1000);
        }
      }
      cluster.waitActive();
      fs = cluster.getFileSystem();
      LOG.info("STARTED second instance.");
    }

    // set the lease period to be 1 second so that the
    // namenode triggers lease recovery upon append request
    Method setLeasePeriod = cluster.getClass()
      .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
    setLeasePeriod.setAccessible(true);
    setLeasePeriod.invoke(cluster, 1000L, 1000L);
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      LOG.info(e);
    }

    // Now try recovering the log, like the HMaster would do
    final FileSystem recoveredFs = fs;
    final Configuration rlConf = conf;

    class RecoverLogThread extends Thread {
      public Exception exception = null;
      public void run() {
        try {
          FSUtils.getInstance(fs, rlConf)
            .recoverFileLease(recoveredFs, walPath, rlConf, null);
        } catch (IOException e) {
          exception = e;
        }
      }
    }

    RecoverLogThread t = new RecoverLogThread();
    t.start();
    // Timeout after 60 sec. Without the correct patches, this would be an infinite loop.
    t.join(60 * 1000);
    if (t.isAlive()) {
      t.interrupt();
      throw new Exception("Timed out waiting for HLog.recoverLog()");
    }

    if (t.exception != null)
      throw t.exception;

    // Make sure you can read all the content
    HLog.Reader reader = HLogFactory.createReader(fs, walPath, conf);
    int count = 0;
    HLog.Entry entry = new HLog.Entry();
    while (reader.next(entry) != null) {
      count++;
      assertTrue("Should be one KeyValue per WALEdit",
                  entry.getEdit().getKeyValues().size() == 1);
    }
    assertEquals(total, count);
    reader.close();

    // Reset the lease period
    setLeasePeriod.invoke(cluster, 60000L, 3600000L);
  }

  /**
   * Tests that we can write out an edit, close, and then read it back in again.
   * @throws IOException
   */
  @Test
  public void testEditAdd() throws IOException {
    final int COL_COUNT = 10;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte [] row = Bytes.toBytes("row");
    HLog.Reader reader = null;
    HLog log = null;
    try {
      log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
      final AtomicLong sequenceId = new AtomicLong(1);

      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[] { (byte)(i + '0') }));
      }
      HRegionInfo info = new HRegionInfo(tableName,
        row, Bytes.toBytes(Bytes.toString(row) + "1"), false);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor("column"));

      log.append(info, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      log.startCacheFlush(info.getEncodedNameAsBytes());
      log.completeCacheFlush(info.getEncodedNameAsBytes());
      log.close();
      Path filename = ((FSHLog) log).computeFilename();
      log = null;
      // Now open a reader on the log and assert append worked.
      reader = HLogFactory.createReader(fs, filename, conf);
      // Above we added all columns on a single row so we only read one
      // entry in the below... that's why we have '1'.
      for (int i = 0; i < 1; i++) {
        HLog.Entry entry = reader.next(null);
        if (entry == null) break;
        HLogKey key = entry.getKey();
        WALEdit val = entry.getEdit();
        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
        assertTrue(tableName.equals(key.getTablename()));
        KeyValue kv = val.getKeyValues().get(0);
        assertTrue(Bytes.equals(row, kv.getRow()));
        assertEquals((byte)(i + '0'), kv.getValue()[0]);
        System.out.println(key + " " + val);
      }
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Tests that a single multi-column edit is read back as one entry with all
   * of its KeyValues intact.
   * @throws IOException
   */
  @Test
  public void testAppend() throws IOException {
    final int COL_COUNT = 10;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte [] row = Bytes.toBytes("row");
    Reader reader = null;
    HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[] { (byte)(i + '0') }));
      }
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor("column"));
      log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      log.startCacheFlush(hri.getEncodedNameAsBytes());
      log.completeCacheFlush(hri.getEncodedNameAsBytes());
      log.close();
      Path filename = ((FSHLog) log).computeFilename();
      log = null;
      // Now open a reader on the log and assert append worked.
      reader = HLogFactory.createReader(fs, filename, conf);
      HLog.Entry entry = reader.next();
      assertEquals(COL_COUNT, entry.getEdit().size());
      int idx = 0;
      for (KeyValue val : entry.getEdit().getKeyValues()) {
        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
          entry.getKey().getEncodedRegionName()));
        assertTrue(tableName.equals(entry.getKey().getTablename()));
        assertTrue(Bytes.equals(row, val.getRow()));
        assertEquals((byte)(idx + '0'), val.getValue()[0]);
        System.out.println(entry.getKey() + " " + val);
        idx++;
      }
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Test that we can visit entries before they are appended.
   * @throws Exception
   */
  @Test
  public void testVisitors() throws Exception {
    final int COL_COUNT = 10;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte [] row = Bytes.toBytes("row");
    HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      DumbWALActionsListener visitor = new DumbWALActionsListener();
      log.registerWALActionsListener(visitor);
      long timestamp = System.currentTimeMillis();
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor("column"));

      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      for (int i = 0; i < COL_COUNT; i++) {
        WALEdit cols = new WALEdit();
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
            timestamp, new byte[]{(byte) (i + '0')}));
        log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      }
      assertEquals(COL_COUNT, visitor.increments);
      // After unregistering, appends must no longer be counted.
      log.unregisterWALActionsListener(visitor);
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(11)),
          timestamp, new byte[]{(byte) (11 + '0')}));
      log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      assertEquals(COL_COUNT, visitor.increments);
    } finally {
      if (log != null) log.closeAndDelete();
    }
  }

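  /**
   * Tests that rolled log files are only cleaned up (archived) once every
   * region with edits in them has been flushed.
   */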
  @Test
  public void testLogCleaning() throws Exception {
    LOG.info("testLogCleaning");
    final TableName tableName =
        TableName.valueOf("testLogCleaning");
    final TableName tableName2 =
        TableName.valueOf("testLogCleaning2");

    HLog log = HLogFactory.createHLog(fs, hbaseDir,
        getName(), conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HRegionInfo hri2 = new HRegionInfo(tableName2,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);

      // Add a single edit and make sure that rolling won't remove the file
      // Before HBASE-3198 it used to delete it
      addEdits(log, hri, tableName, 1, sequenceId);
      log.rollWriter();
      assertEquals(1, ((FSHLog) log).getNumRolledLogFiles());

      // See if there's anything wrong with more than 1 edit
      addEdits(log, hri, tableName, 2, sequenceId);
      log.rollWriter();
      assertEquals(2, ((FSHLog) log).getNumRolledLogFiles());

      // Now mix edits from 2 regions, still no flushing
      addEdits(log, hri, tableName, 1, sequenceId);
      addEdits(log, hri2, tableName2, 1, sequenceId);
      addEdits(log, hri, tableName, 1, sequenceId);
      addEdits(log, hri2, tableName2, 1, sequenceId);
      log.rollWriter();
      assertEquals(3, ((FSHLog) log).getNumRolledLogFiles());

      // Flush the first region, we expect to see the first two files getting
      // archived. We need to append something or writer won't be rolled.
      addEdits(log, hri2, tableName2, 1, sequenceId);
      log.startCacheFlush(hri.getEncodedNameAsBytes());
      log.completeCacheFlush(hri.getEncodedNameAsBytes());
      log.rollWriter();
      assertEquals(2, ((FSHLog) log).getNumRolledLogFiles());

      // Flush the second region, which removes all the remaining output files
      // since the oldest was completely flushed and the two others only contain
      // flush information
      addEdits(log, hri2, tableName2, 1, sequenceId);
      log.startCacheFlush(hri2.getEncodedNameAsBytes());
      log.completeCacheFlush(hri2.getEncodedNameAsBytes());
      log.rollWriter();
      assertEquals(0, ((FSHLog) log).getNumRolledLogFiles());
    } finally {
      if (log != null) log.closeAndDelete();
    }
  }

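  /**
   * Tests that creating a new WAL writer fails when the WAL directory has been
   * renamed out from under the log (e.g. to a "-splitting" directory).
   */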
  @Test
  public void testFailedToCreateHLogIfParentRenamed() throws IOException {
    FSHLog log = (FSHLog)HLogFactory.createHLog(
      fs, hbaseDir, "testFailedToCreateHLogIfParentRenamed", conf);
    long filenum = System.currentTimeMillis();
    Path path = log.computeFilename(filenum);
    HLogFactory.createWALWriter(fs, path, conf);
    Path parent = path.getParent();
    path = log.computeFilename(filenum + 1);
    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
    fs.rename(parent, newPath);
    try {
      HLogFactory.createWALWriter(fs, path, conf);
      fail("It should fail to create the new WAL");
    } catch (IOException ioe) {
      // expected, good.
    }
  }

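  /**
   * Tests parsing a {@link ServerName} out of a WAL directory path, including
   * inputs that must yield null (root dir, blanks, mangled paths) and valid
   * standard, subdirectory, and "-splitting" layouts.
   */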
  @Test
  public void testGetServerNameFromHLogDirectoryName() throws IOException {
    ServerName sn = ServerName.valueOf("hn", 450, 1398);
    String hl = FSUtils.getRootDir(conf) + "/" + HLogUtil.getHLogDirectoryName(sn.toString());

    // Must not throw exception
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, null));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf,
        FSUtils.getRootDir(conf).toUri().toString()));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, ""));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, "                  "));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, hl));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, hl + "qdf"));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, "sfqf" + hl + "qdf"));

    final String wals = "/WALs/";
    ServerName parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf,
      FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
      "/localhost%2C32984%2C1343316388997.1343316390417");
    Assert.assertEquals("standard", sn, parsed);

    parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf, hl + "/qdf");
    Assert.assertEquals("subdir", sn, parsed);

    parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf,
      FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
      "-splitting/localhost%3A57020.1340474893931");
    Assert.assertEquals("split", sn, parsed);
  }

  /**
   * A loaded WAL coprocessor won't break existing HLog test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // test to see whether the coprocessor is loaded or not.
    HLog log = HLogFactory.createHLog(fs, hbaseDir,
        getName(), conf);
    try {
      WALCoprocessorHost host = log.getCoprocessorHost();
      Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
      assertNotNull(c);
    } finally {
      if (log != null) log.closeAndDelete();
    }
  }

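  /**
   * Appends {@code times} single-KeyValue edits for the given region to the
   * log, bumping the mocked region sequence id on each append.
   */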
  private void addEdits(HLog log, HRegionInfo hri, TableName tableName,
                        int times, AtomicLong sequenceId) throws IOException {
    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor("row"));

    final byte [] row = Bytes.toBytes("row");
    for (int i = 0; i < times; i++) {
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, row, row, timestamp, row));
      log.append(hri, tableName, cols, timestamp, htd, sequenceId);
    }
  }

  /**
   * Writes a log in the legacy SequenceFile (pre-protobuf) format and verifies
   * that it can be read back with the standard reader.
   * @throws IOException
   */
  @Test
  public void testReadLegacyLog() throws IOException {
    final int columnCount = 5;
    final int recordCount = 5;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    long timestamp = System.currentTimeMillis();
    Path path = new Path(dir, "temphlog");
    SequenceFileLogWriter sflw = null;
    HLog.Reader reader = null;
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor(tableName);
      fs.mkdirs(dir);
      // Write log in pre-PB format.
      sflw = new SequenceFileLogWriter();
      sflw.init(fs, path, conf, false);
      for (int i = 0; i < recordCount; ++i) {
        HLogKey key = new HLogKey(
            hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        for (int j = 0; j < columnCount; ++j) {
          if (i == 0) {
            htd.addFamily(new HColumnDescriptor("column" + j));
          }
          String value = i + "" + j;
          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
        }
        sflw.append(new HLog.Entry(key, edit));
      }
      sflw.sync();
      sflw.close();

      // Now read the log using standard means.
      reader = HLogFactory.createReader(fs, path, conf);
      assertTrue(reader instanceof SequenceFileLogReader);
      for (int i = 0; i < recordCount; ++i) {
        HLog.Entry entry = reader.next();
        assertNotNull(entry);
        assertEquals(columnCount, entry.getEdit().size());
        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
        assertEquals(tableName, entry.getKey().getTablename());
        int idx = 0;
        for (KeyValue val : entry.getEdit().getKeyValues()) {
          assertTrue(Bytes.equals(row, val.getRow()));
          String value = i + "" + idx;
          assertArrayEquals(Bytes.toBytes(value), val.getValue());
          idx++;
        }
      }
      HLog.Entry entry = reader.next();
      assertNull(entry);
    } finally {
      if (sflw != null) {
        sflw.close();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Reads the WAL with and without WALTrailer.
   * @throws IOException
   */
  @Test
  public void testWALTrailer() throws IOException {
    // read with trailer.
    doRead(true);
    // read without trailer.
    doRead(false);
  }

  /**
   * Appends entries to the WAL and then reads them back.
   * @param withTrailer If 'withTrailer' is true, a close is called on the WAL writer before
   *          reading so that a trailer is appended to the WAL. Otherwise, reading starts after
   *          the sync call, which means the reader is not aware of the trailer. In this
   *          scenario, if the reader tries to read the trailer in its next() call, it returns
   *          false from ProtoBufLogReader.
   * @throws IOException
   */
  private void doRead(boolean withTrailer) throws IOException {
    final int columnCount = 5;
    final int recordCount = 5;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    long timestamp = System.currentTimeMillis();
    Path path = new Path(dir, "temphlog");
    // delete the log if it already exists, for test only
    fs.delete(path, true);
    HLog.Writer writer = null;
    HLog.Reader reader = null;
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor(tableName);
      fs.mkdirs(dir);
      // Write log in pb format.
      writer = HLogFactory.createWALWriter(fs, path, conf);
      for (int i = 0; i < recordCount; ++i) {
        HLogKey key = new HLogKey(
            hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        for (int j = 0; j < columnCount; ++j) {
          if (i == 0) {
            htd.addFamily(new HColumnDescriptor("column" + j));
          }
          String value = i + "" + j;
          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
        }
        writer.append(new HLog.Entry(key, edit));
      }
      writer.sync();
      if (withTrailer) writer.close();

      // Now read the log using standard means.
      reader = HLogFactory.createReader(fs, path, conf);
      assertTrue(reader instanceof ProtobufLogReader);
      if (withTrailer) {
        assertNotNull(reader.getWALTrailer());
      } else {
        assertNull(reader.getWALTrailer());
      }
      for (int i = 0; i < recordCount; ++i) {
        HLog.Entry entry = reader.next();
        assertNotNull(entry);
        assertEquals(columnCount, entry.getEdit().size());
        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
        assertEquals(tableName, entry.getKey().getTablename());
        int idx = 0;
        for (KeyValue val : entry.getEdit().getKeyValues()) {
          assertTrue(Bytes.equals(row, val.getRow()));
          String value = i + "" + idx;
          assertArrayEquals(Bytes.toBytes(value), val.getValue());
          idx++;
        }
      }
      HLog.Entry entry = reader.next();
      assertNull(entry);
    } finally {
      if (writer != null) {
        writer.close();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Tests the log comparator. Ensures that we are not mixing meta logs with non-meta logs
   * (throws an exception if we do). Comparison is based on the timestamp present in the wal
   * name.
   * @throws Exception
   */
  @Test
  public void testHLogComparator() throws Exception {
    HLog hlog1 = null;
    HLog hlogMeta = null;
    try {
      hlog1 = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
      LOG.debug("Log obtained is: " + hlog1);
      Comparator<Path> comp = ((FSHLog) hlog1).LOG_NAME_COMPARATOR;
      Path p1 = ((FSHLog) hlog1).computeFilename(11);
      Path p2 = ((FSHLog) hlog1).computeFilename(12);
      // comparing a path with itself returns 0
      assertTrue(comp.compare(p1, p1) == 0);
      // comparing paths with different filenums.
      assertTrue(comp.compare(p1, p2) < 0);
      hlogMeta = HLogFactory.createMetaHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf,
        null, null);
      Comparator<Path> compMeta = ((FSHLog) hlogMeta).LOG_NAME_COMPARATOR;

      Path p1WithMeta = ((FSHLog) hlogMeta).computeFilename(11);
      Path p2WithMeta = ((FSHLog) hlogMeta).computeFilename(12);
      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
      // mixing meta and non-meta logs gives an error
      boolean ex = false;
      try {
        comp.compare(p1WithMeta, p2);
      } catch (Exception e) {
        ex = true;
      }
      assertTrue("Comparator doesn't complain while checking meta log files", ex);
      boolean exMeta = false;
      try {
        compMeta.compare(p1WithMeta, p2);
      } catch (Exception e) {
        exMeta = true;
      }
      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
    } finally {
      if (hlog1 != null) hlog1.close();
      if (hlogMeta != null) hlogMeta.close();
    }
  }

  /**
   * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs
   * and also don't archive "live logs" (that is, a log with un-flushed entries).
   * <p>
   * This is what it does:
   * It creates two regions, and does a series of inserts along with log rolling.
   * Whenever a WAL is rolled, FSHLog checks previous wals for archiving. A wal is eligible for
   * archiving if all the regions which have entries in that wal file have flushed past their
   * maximum sequence id in that wal file.
   * @throws IOException
   */
  @Test
  public void testWALArchiving() throws IOException {
    LOG.debug("testWALArchiving");
    TableName table1 = TableName.valueOf("t1");
    TableName table2 = TableName.valueOf("t2");
    HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
    try {
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
          HConstants.EMPTY_END_ROW);
      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
          HConstants.EMPTY_END_ROW);
      // ensure that we don't split the regions.
      hri1.setSplit(false);
      hri2.setSplit(false);
      // variables to mock region sequenceIds.
      final AtomicLong sequenceId1 = new AtomicLong(1);
      final AtomicLong sequenceId2 = new AtomicLong(1);
      // start with the testing logic: insert a waledit, and roll writer
      addEdits(hlog, hri1, table1, 1, sequenceId1);
      hlog.rollWriter();
      // assert that the wal is rolled
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
      // add edits in the second wal file, and roll writer.
      addEdits(hlog, hri1, table1, 1, sequenceId1);
      hlog.rollWriter();
      // assert that the wal is rolled
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());
      // add a waledit to table1, and flush the region.
      addEdits(hlog, hri1, table1, 3, sequenceId1);
      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      // roll log; all old logs should be archived.
      hlog.rollWriter();
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
      // add an edit to table2, and roll writer
      addEdits(hlog, hri2, table2, 1, sequenceId2);
      hlog.rollWriter();
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
      // add edits for table1, and roll writer
      addEdits(hlog, hri1, table1, 2, sequenceId1);
      hlog.rollWriter();
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());
      // add edits for table2, and flush hri1.
      addEdits(hlog, hri2, table2, 2, sequenceId2);
      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      // the log : region-sequenceId map is
      // log1: region2 (unflushed)
      // log2: region1 (flushed)
      // log3: region2 (unflushed)
      // roll the writer; log2 should be archived.
      hlog.rollWriter();
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());
      // flush region2, and all logs should be archived.
      addEdits(hlog, hri2, table2, 2, sequenceId2);
      flushRegion(hlog, hri2.getEncodedNameAsBytes());
      hlog.rollWriter();
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
    } finally {
      if (hlog != null) hlog.close();
    }
  }

  /**
   * On rolling a wal after reaching the threshold, {@link HLog#rollWriter()} returns the list of
   * regions which should be flushed in order to archive the oldest wal file.
   * <p>
   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
   * the max number of logs threshold. It checks whether we get the "right regions" for flush on
   * rolling the wal.
   * @throws Exception
   */
  @Test
  public void testFindMemStoresEligibleForFlush() throws Exception {
    LOG.debug("testFindMemStoresEligibleForFlush");
    Configuration conf1 = HBaseConfiguration.create(conf);
    conf1.setInt("hbase.regionserver.maxlogs", 1);
    HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), conf1);
    TableName t1 = TableName.valueOf("t1");
    TableName t2 = TableName.valueOf("t2");
    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    // variables to mock region sequenceIds
    final AtomicLong sequenceId1 = new AtomicLong(1);
    final AtomicLong sequenceId2 = new AtomicLong(1);
    // add edits and roll the wal
    try {
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.rollWriter();
      // add some more edits and roll the wal. This would reach the log number threshold
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.rollWriter();
      // with the above rollWriter call, the max logs limit is reached.
      assertTrue(((FSHLog) hlog).getNumRolledLogFiles() == 2);

      // get the regions to flush; since there is only one region in the oldest wal, it should
      // return only one region.
      byte[][] regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
      // insert edits in the second region
      addEdits(hlog, hri2, t2, 2, sequenceId2);
      // get the regions to flush; it should still return region1.
      regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
      // flush region 1, and roll the wal file. Only the last wal, which has entries for
      // region2, should remain.
      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      hlog.rollWriter();
      // only one wal should remain now (that is for the second region).
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
      // flush the second region
      flushRegion(hlog, hri2.getEncodedNameAsBytes());
      hlog.rollWriter(true);
      // no wal should remain now.
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
      // add edits both to region 1 and region 2, and roll.
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      addEdits(hlog, hri2, t2, 2, sequenceId2);
      hlog.rollWriter();
      // add edits and roll the writer, to reach the max logs limit.
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.rollWriter();
      // it should return two regions to flush, as the oldest wal file has entries
      // for both regions.
      regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
      assertEquals(2, regionsToFlush.length);
      // flush both regions
      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      flushRegion(hlog, hri2.getEncodedNameAsBytes());
      hlog.rollWriter(true);
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
      // Add an edit to region1, and roll the wal.
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      // tests partial flush: roll on a partial flush, and ensure that the wal is not archived.
      hlog.startCacheFlush(hri1.getEncodedNameAsBytes());
      hlog.rollWriter();
      hlog.completeCacheFlush(hri1.getEncodedNameAsBytes());
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
    } finally {
      if (hlog != null) hlog.close();
    }
  }

  /**
   * Simulates HLog append ops for a region and tests the
   * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API.
   * It compares the region sequenceIds with the oldestFlushing and oldestUnFlushed entries.
   * If a region's entries are larger than the min of (oldestFlushing, oldestUnFlushed), then
   * the region should be flushed before archiving this WAL.
   */
  @Test
  public void testAllRegionsFlushed() {
    LOG.debug("testAllRegionsFlushed");
    Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
    Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
    Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
    // create a table
    TableName t1 = TableName.valueOf("t1");
    // create a region
    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    // variables to mock region sequenceIds
    final AtomicLong sequenceId1 = new AtomicLong(1);
    // test empty map
    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
    // add entries in the region
    seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
    // should say region1 is not flushed.
    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
    // test with entries in the oldestFlushing map.
    oldestUnFlushedSeqNo.clear();
    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
    // simulate region flush, i.e., clear the oldestFlushing and oldestUnflushed maps
    oldestFlushingSeqNo.clear();
    oldestUnFlushedSeqNo.clear();
    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
    // insert some large values for region1
    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000L);
    seqNo.put(hri1.getEncodedNameAsBytes(), 1500L);
    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));

    // tests when oldestUnFlushed/oldestFlushing contains a larger value.
    // It means the region is flushed.
    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200L);
    oldestUnFlushedSeqNo.clear();
    seqNo.put(hri1.getEncodedNameAsBytes(), 1199L);
    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
  }

  /**
   * Helper method to simulate a region flush for a WAL.
   * @param hlog the log whose cache-flush lifecycle is driven
   * @param regionEncodedName encoded name of the region being "flushed"
   */
  private void flushRegion(HLog hlog, byte[] regionEncodedName) {
    hlog.startCacheFlush(regionEncodedName);
    hlog.completeCacheFlush(regionEncodedName);
  }

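  /**
   * Minimal listener used by {@link #testVisitors()}: it simply counts how many
   * times it is asked to visit an entry before a write.
   */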
  static class DumbWALActionsListener implements WALActionsListener {
    int increments = 0;

    @Override
    public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
                                         WALEdit logEdit) {
      increments++;
    }

    @Override
    public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
      increments++;
    }

    @Override
    public void preLogRoll(Path oldFile, Path newFile) {
      // not interested
    }

    @Override
    public void postLogRoll(Path oldFile, Path newFile) {
      // not interested
    }

    @Override
    public void preLogArchive(Path oldFile, Path newFile) {
      // not interested
    }

    @Override
    public void postLogArchive(Path oldFile, Path newFile) {
      // not interested
    }

    @Override
    public void logRollRequested() {
      // not interested
    }

    @Override
    public void logCloseRequested() {
      // not interested
    }
  }

}