View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.wal;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertFalse;
24  import static org.junit.Assert.assertNotNull;
25  import static org.junit.Assert.assertNull;
26  import static org.junit.Assert.assertTrue;
27  import static org.junit.Assert.fail;
28  
29  import java.io.IOException;
30  import java.lang.reflect.Method;
31  import java.net.BindException;
32  import java.util.List;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataInputStream;
38  import org.apache.hadoop.fs.FSDataOutputStream;
39  import org.apache.hadoop.fs.FileStatus;
40  import org.apache.hadoop.fs.FileSystem;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.HBaseTestingUtility;
45  import org.apache.hadoop.hbase.HColumnDescriptor;
46  import org.apache.hadoop.hbase.HConstants;
47  import org.apache.hadoop.hbase.HRegionInfo;
48  import org.apache.hadoop.hbase.HTableDescriptor;
49  import org.apache.hadoop.hbase.KeyValue;
50  import org.apache.hadoop.hbase.TableName;
51  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
52  import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
53  import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
54  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
55  import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
56  import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;
57  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
58  import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
59  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
60  import org.apache.hadoop.hbase.testclassification.MediumTests;
61  import org.apache.hadoop.hbase.util.Bytes;
62  import org.apache.hadoop.hbase.util.FSUtils;
63  import org.apache.hadoop.hbase.util.Threads;
64  import org.apache.hadoop.hdfs.DistributedFileSystem;
65  import org.apache.hadoop.hdfs.MiniDFSCluster;
66  import org.apache.hadoop.hdfs.protocol.HdfsConstants;
67  import org.junit.After;
68  import org.junit.AfterClass;
69  import org.junit.Before;
70  import org.junit.BeforeClass;
71  import org.junit.Rule;
72  import org.junit.Test;
73  import org.junit.experimental.categories.Category;
74  import org.junit.rules.TestName;
75  
76  /**
77   * WAL tests that can be reused across providers.
78   */
79  @Category(MediumTests.class)
80  public class TestWALFactory {
81    private static final Log LOG = LogFactory.getLog(TestWALFactory.class);
82  
83    protected static Configuration conf;
84    private static MiniDFSCluster cluster;
85    protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
86    protected static Path hbaseDir;
87  
88    protected FileSystem fs;
89    protected Path dir;
90    protected WALFactory wals;
91  
92    @Rule
93    public final TestName currentTest = new TestName();
94  
95    @Before
96    public void setUp() throws Exception {
97      fs = cluster.getFileSystem();
98      dir = new Path(hbaseDir, currentTest.getMethodName());
99      wals = new WALFactory(conf, null, currentTest.getMethodName());
100   }
101 
102   @After
103   public void tearDown() throws Exception {
104     // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
105     try {
106       wals.close();
107     } catch (IOException exception) {
108       LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" +
109           " may be the cause. Message: " + exception);
110       LOG.debug("Exception details for failure to close wal factory.", exception);
111     }
112     FileStatus[] entries = fs.listStatus(new Path("/"));
113     for (FileStatus dir : entries) {
114       fs.delete(dir.getPath(), true);
115     }
116   }
117 
118   @BeforeClass
119   public static void setUpBeforeClass() throws Exception {
120     // Make block sizes small.
121     TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
122     // needed for testAppendClose()
123     TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
124     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
125     // quicker heartbeat interval for faster DN death notification
126     TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
127     TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
128     TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
129 
130     // faster failover with cluster.shutdown();fs.close() idiom
131     TEST_UTIL.getConfiguration()
132         .setInt("hbase.ipc.client.connect.max.retries", 1);
133     TEST_UTIL.getConfiguration().setInt(
134         "dfs.client.block.recovery.retries", 1);
135     TEST_UTIL.getConfiguration().setInt(
136       "hbase.ipc.client.connection.maxidletime", 500);
137     TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
138         SampleRegionWALObserver.class.getName());
139     TEST_UTIL.startMiniDFSCluster(3);
140 
141     conf = TEST_UTIL.getConfiguration();
142     cluster = TEST_UTIL.getDFSCluster();
143 
144     hbaseDir = TEST_UTIL.createRootDir();
145   }
146 
147   @AfterClass
148   public static void tearDownAfterClass() throws Exception {
149     TEST_UTIL.shutdownMiniCluster();
150   }
151 
152   @Test
153   public void canCloseSingleton() throws IOException {
154     WALFactory.getInstance(conf).close();
155   }
156 
157   /**
158    * Just write multiple logs then split.  Before fix for HADOOP-2283, this
159    * would fail.
160    * @throws IOException
161    */
162   @Test
163   public void testSplit() throws IOException {
164     final TableName tableName = TableName.valueOf(currentTest.getMethodName());
165     final byte [] rowName = tableName.getName();
166     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
167     final Path logdir = new Path(hbaseDir,
168         DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
169     Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
170     final int howmany = 3;
171     HRegionInfo[] infos = new HRegionInfo[3];
172     Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
173     fs.mkdirs(tabledir);
174     for(int i = 0; i < howmany; i++) {
175       infos[i] = new HRegionInfo(tableName,
176                 Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
177       fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
178       LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
179     }
180     HTableDescriptor htd = new HTableDescriptor(tableName);
181     htd.addFamily(new HColumnDescriptor("column"));
182 
183     // Add edits for three regions.
184     for (int ii = 0; ii < howmany; ii++) {
185       for (int i = 0; i < howmany; i++) {
186         final WAL log = wals.getWAL(infos[i].getEncodedNameAsBytes());
187         for (int j = 0; j < howmany; j++) {
188           WALEdit edit = new WALEdit();
189           byte [] family = Bytes.toBytes("column");
190           byte [] qualifier = Bytes.toBytes(Integer.toString(j));
191           byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
192           edit.add(new KeyValue(rowName, family, qualifier,
193               System.currentTimeMillis(), column));
194           LOG.info("Region " + i + ": " + edit);
195           WALKey walKey =  new WALKey(infos[i].getEncodedNameAsBytes(), tableName,
196               System.currentTimeMillis(), mvcc);
197           log.append(htd, infos[i], walKey, edit, true);
198           walKey.getWriteEntry();
199         }
200         log.sync();
201         log.rollWriter(true);
202       }
203     }
204     wals.shutdown();
205     List<Path> splits = WALSplitter.split(hbaseDir, logdir, oldLogDir, fs, conf, wals);
206     verifySplits(splits, howmany);
207   }
208 
209   /**
210    * Test new HDFS-265 sync.
211    * @throws Exception
212    */
213   @Test
214   public void Broken_testSync() throws Exception {
215     TableName tableName = TableName.valueOf(currentTest.getMethodName());
216     MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
217     // First verify that using streams all works.
218     Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
219     FSDataOutputStream out = fs.create(p);
220     out.write(tableName.getName());
221     Method syncMethod = null;
222     try {
223       syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
224     } catch (NoSuchMethodException e) {
225       try {
226         syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
227       } catch (NoSuchMethodException ex) {
228         fail("This version of Hadoop supports neither Syncable.sync() " +
229             "nor Syncable.hflush().");
230       }
231     }
232     syncMethod.invoke(out, new Object[]{});
233     FSDataInputStream in = fs.open(p);
234     assertTrue(in.available() > 0);
235     byte [] buffer = new byte [1024];
236     int read = in.read(buffer);
237     assertEquals(tableName.getName().length, read);
238     out.close();
239     in.close();
240 
241     final int total = 20;
242     WAL.Reader reader = null;
243 
244     try {
245       HRegionInfo info = new HRegionInfo(tableName,
246                   null,null, false);
247       HTableDescriptor htd = new HTableDescriptor();
248       htd.addFamily(new HColumnDescriptor(tableName.getName()));
249       final WAL wal = wals.getWAL(info.getEncodedNameAsBytes());
250 
251       for (int i = 0; i < total; i++) {
252         WALEdit kvs = new WALEdit();
253         kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
254         wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
255             System.currentTimeMillis(), mvcc), kvs, true);
256       }
257       // Now call sync and try reading.  Opening a Reader before you sync just
258       // gives you EOFE.
259       wal.sync();
260       // Open a Reader.
261       Path walPath = DefaultWALProvider.getCurrentFileName(wal);
262       reader = wals.createReader(fs, walPath);
263       int count = 0;
264       WAL.Entry entry = new WAL.Entry();
265       while ((entry = reader.next(entry)) != null) count++;
266       assertEquals(total, count);
267       reader.close();
268       // Add test that checks to see that an open of a Reader works on a file
269       // that has had a sync done on it.
270       for (int i = 0; i < total; i++) {
271         WALEdit kvs = new WALEdit();
272         kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
273         wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
274             System.currentTimeMillis(), mvcc), kvs, true);
275       }
276       wal.sync();
277       reader = wals.createReader(fs, walPath);
278       count = 0;
279       while((entry = reader.next(entry)) != null) count++;
280       assertTrue(count >= total);
281       reader.close();
282       // If I sync, should see double the edits.
283       wal.sync();
284       reader = wals.createReader(fs, walPath);
285       count = 0;
286       while((entry = reader.next(entry)) != null) count++;
287       assertEquals(total * 2, count);
288       reader.close();
289       // Now do a test that ensures stuff works when we go over block boundary,
290       // especially that we return good length on file.
291       final byte [] value = new byte[1025 * 1024];  // Make a 1M value.
292       for (int i = 0; i < total; i++) {
293         WALEdit kvs = new WALEdit();
294         kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
295         wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
296             System.currentTimeMillis(), mvcc), kvs,  true);
297       }
298       // Now I should have written out lots of blocks.  Sync then read.
299       wal.sync();
300       reader = wals.createReader(fs, walPath);
301       count = 0;
302       while((entry = reader.next(entry)) != null) count++;
303       assertEquals(total * 3, count);
304       reader.close();
305       // shutdown and ensure that Reader gets right length also.
306       wal.shutdown();
307       reader = wals.createReader(fs, walPath);
308       count = 0;
309       while((entry = reader.next(entry)) != null) count++;
310       assertEquals(total * 3, count);
311       reader.close();
312     } finally {
313       if (reader != null) reader.close();
314     }
315   }
316 
317   private void verifySplits(final List<Path> splits, final int howmany)
318   throws IOException {
319     assertEquals(howmany * howmany, splits.size());
320     for (int i = 0; i < splits.size(); i++) {
321       LOG.info("Verifying=" + splits.get(i));
322       WAL.Reader reader = wals.createReader(fs, splits.get(i));
323       try {
324         int count = 0;
325         String previousRegion = null;
326         long seqno = -1;
327         WAL.Entry entry = new WAL.Entry();
328         while((entry = reader.next(entry)) != null) {
329           WALKey key = entry.getKey();
330           String region = Bytes.toString(key.getEncodedRegionName());
331           // Assert that all edits are for same region.
332           if (previousRegion != null) {
333             assertEquals(previousRegion, region);
334           }
335           LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
336           assertTrue(seqno < key.getLogSeqNum());
337           seqno = key.getLogSeqNum();
338           previousRegion = region;
339           count++;
340         }
341         assertEquals(howmany, count);
342       } finally {
343         reader.close();
344       }
345     }
346   }
347 
348   /*
349    * We pass different values to recoverFileLease() so that different code paths are covered
350    *
351    * For this test to pass, requires:
352    * 1. HDFS-200 (append support)
353    * 2. HDFS-988 (SafeMode should freeze file operations
354    *              [FSNamesystem.nextGenerationStampForBlock])
355    * 3. HDFS-142 (on restart, maintain pendingCreates)
356    */
357   @Test (timeout=300000)
358   public void testAppendClose() throws Exception {
359     TableName tableName =
360         TableName.valueOf(currentTest.getMethodName());
361     HRegionInfo regioninfo = new HRegionInfo(tableName,
362              HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
363 
364     final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
365     final int total = 20;
366 
367     HTableDescriptor htd = new HTableDescriptor();
368     htd.addFamily(new HColumnDescriptor(tableName.getName()));
369 
370     for (int i = 0; i < total; i++) {
371       WALEdit kvs = new WALEdit();
372       kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
373       wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
374           System.currentTimeMillis()), kvs,  true);
375     }
376     // Now call sync to send the data to HDFS datanodes
377     wal.sync();
378      int namenodePort = cluster.getNameNodePort();
379     final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
380 
381 
382     // Stop the cluster.  (ensure restart since we're sharing MiniDFSCluster)
383     try {
384       DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
385       dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
386       TEST_UTIL.shutdownMiniDFSCluster();
387       try {
388         // wal.writer.close() will throw an exception,
389         // but still call this since it closes the LogSyncer thread first
390         wal.shutdown();
391       } catch (IOException e) {
392         LOG.info(e);
393       }
394       fs.close(); // closing FS last so DFSOutputStream can't call close
395       LOG.info("STOPPED first instance of the cluster");
396     } finally {
397       // Restart the cluster
398       while (cluster.isClusterUp()){
399         LOG.error("Waiting for cluster to go down");
400         Thread.sleep(1000);
401       }
402       assertFalse(cluster.isClusterUp());
403       cluster = null;
404       for (int i = 0; i < 100; i++) {
405         try {
406           cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
407           break;
408         } catch (BindException e) {
409           LOG.info("Sleeping.  BindException bringing up new cluster");
410           Threads.sleep(1000);
411         }
412       }
413       cluster.waitActive();
414       fs = cluster.getFileSystem();
415       LOG.info("STARTED second instance.");
416     }
417 
418     // set the lease period to be 1 second so that the
419     // namenode triggers lease recovery upon append request
420     Method setLeasePeriod = cluster.getClass()
421       .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
422     setLeasePeriod.setAccessible(true);
423     setLeasePeriod.invoke(cluster, 1000L, 1000L);
424     try {
425       Thread.sleep(1000);
426     } catch (InterruptedException e) {
427       LOG.info(e);
428     }
429 
430     // Now try recovering the log, like the HMaster would do
431     final FileSystem recoveredFs = fs;
432     final Configuration rlConf = conf;
433 
434     class RecoverLogThread extends Thread {
435       public Exception exception = null;
436       public void run() {
437           try {
438             FSUtils.getInstance(fs, rlConf)
439               .recoverFileLease(recoveredFs, walPath, rlConf, null);
440           } catch (IOException e) {
441             exception = e;
442           }
443       }
444     }
445 
446     RecoverLogThread t = new RecoverLogThread();
447     t.start();
448     // Timeout after 60 sec. Without correct patches, would be an infinite loop
449     t.join(60 * 1000);
450     if(t.isAlive()) {
451       t.interrupt();
452       throw new Exception("Timed out waiting for WAL.recoverLog()");
453     }
454 
455     if (t.exception != null)
456       throw t.exception;
457 
458     // Make sure you can read all the content
459     WAL.Reader reader = wals.createReader(fs, walPath);
460     int count = 0;
461     WAL.Entry entry = new WAL.Entry();
462     while (reader.next(entry) != null) {
463       count++;
464       assertTrue("Should be one KeyValue per WALEdit",
465                   entry.getEdit().getCells().size() == 1);
466     }
467     assertEquals(total, count);
468     reader.close();
469 
470     // Reset the lease period
471     setLeasePeriod.invoke(cluster, new Object[]{new Long(60000), new Long(3600000)});
472   }
473 
474   /**
475    * Tests that we can write out an edit, close, and then read it back in again.
476    * @throws IOException
477    */
478   @Test
479   public void testEditAdd() throws IOException {
480     final int COL_COUNT = 10;
481     final HTableDescriptor htd =
482         new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
483             "column"));
484     final byte [] row = Bytes.toBytes("row");
485     WAL.Reader reader = null;
486     try {
487       final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
488 
489       // Write columns named 1, 2, 3, etc. and then values of single byte
490       // 1, 2, 3...
491       long timestamp = System.currentTimeMillis();
492       WALEdit cols = new WALEdit();
493       for (int i = 0; i < COL_COUNT; i++) {
494         cols.add(new KeyValue(row, Bytes.toBytes("column"),
495             Bytes.toBytes(Integer.toString(i)),
496           timestamp, new byte[] { (byte)(i + '0') }));
497       }
498       HRegionInfo info = new HRegionInfo(htd.getTableName(),
499         row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
500       final WAL log = wals.getWAL(info.getEncodedNameAsBytes());
501 
502       final long txid = log.append(htd, info,
503         new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
504             mvcc),
505         cols, true);
506       log.sync(txid);
507       log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
508       log.completeCacheFlush(info.getEncodedNameAsBytes());
509       log.shutdown();
510       Path filename = DefaultWALProvider.getCurrentFileName(log);
511       // Now open a reader on the log and assert append worked.
512       reader = wals.createReader(fs, filename);
513       // Above we added all columns on a single row so we only read one
514       // entry in the below... thats why we have '1'.
515       for (int i = 0; i < 1; i++) {
516         WAL.Entry entry = reader.next(null);
517         if (entry == null) break;
518         WALKey key = entry.getKey();
519         WALEdit val = entry.getEdit();
520         assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
521         assertTrue(htd.getTableName().equals(key.getTablename()));
522         Cell cell = val.getCells().get(0);
523         assertTrue(Bytes.equals(row, cell.getRow()));
524         assertEquals((byte)(i + '0'), cell.getValue()[0]);
525         System.out.println(key + " " + val);
526       }
527     } finally {
528       if (reader != null) {
529         reader.close();
530       }
531     }
532   }
533 
534   /**
535    * @throws IOException
536    */
537   @Test
538   public void testAppend() throws IOException {
539     final int COL_COUNT = 10;
540     final HTableDescriptor htd =
541         new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
542             "column"));
543     final byte [] row = Bytes.toBytes("row");
544     WAL.Reader reader = null;
545     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
546     try {
547       // Write columns named 1, 2, 3, etc. and then values of single byte
548       // 1, 2, 3...
549       long timestamp = System.currentTimeMillis();
550       WALEdit cols = new WALEdit();
551       for (int i = 0; i < COL_COUNT; i++) {
552         cols.add(new KeyValue(row, Bytes.toBytes("column"),
553           Bytes.toBytes(Integer.toString(i)),
554           timestamp, new byte[] { (byte)(i + '0') }));
555       }
556       HRegionInfo hri = new HRegionInfo(htd.getTableName(),
557           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
558       final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
559       final long txid = log.append(htd, hri,
560         new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
561             mvcc),
562         cols, true);
563       log.sync(txid);
564       log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
565       log.completeCacheFlush(hri.getEncodedNameAsBytes());
566       log.shutdown();
567       Path filename = DefaultWALProvider.getCurrentFileName(log);
568       // Now open a reader on the log and assert append worked.
569       reader = wals.createReader(fs, filename);
570       WAL.Entry entry = reader.next();
571       assertEquals(COL_COUNT, entry.getEdit().size());
572       int idx = 0;
573       for (Cell val : entry.getEdit().getCells()) {
574         assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
575           entry.getKey().getEncodedRegionName()));
576         assertTrue(htd.getTableName().equals(entry.getKey().getTablename()));
577         assertTrue(Bytes.equals(row, val.getRow()));
578         assertEquals((byte)(idx + '0'), val.getValue()[0]);
579         System.out.println(entry.getKey() + " " + val);
580         idx++;
581       }
582     } finally {
583       if (reader != null) {
584         reader.close();
585       }
586     }
587   }
588 
589   /**
590    * Test that we can visit entries before they are appended
591    * @throws Exception
592    */
593   @Test
594   public void testVisitors() throws Exception {
595     final int COL_COUNT = 10;
596     final TableName tableName =
597         TableName.valueOf("tablename");
598     final byte [] row = Bytes.toBytes("row");
599     final DumbWALActionsListener visitor = new DumbWALActionsListener();
600     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
601     long timestamp = System.currentTimeMillis();
602     HTableDescriptor htd = new HTableDescriptor();
603     htd.addFamily(new HColumnDescriptor("column"));
604 
605     HRegionInfo hri = new HRegionInfo(tableName,
606         HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
607     final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
608     log.registerWALActionsListener(visitor);
609     for (int i = 0; i < COL_COUNT; i++) {
610       WALEdit cols = new WALEdit();
611       cols.add(new KeyValue(row, Bytes.toBytes("column"),
612           Bytes.toBytes(Integer.toString(i)),
613           timestamp, new byte[]{(byte) (i + '0')}));
614       log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
615           System.currentTimeMillis(), mvcc), cols, true);
616     }
617     log.sync();
618     assertEquals(COL_COUNT, visitor.increments);
619     log.unregisterWALActionsListener(visitor);
620     WALEdit cols = new WALEdit();
621     cols.add(new KeyValue(row, Bytes.toBytes("column"),
622         Bytes.toBytes(Integer.toString(11)),
623         timestamp, new byte[]{(byte) (11 + '0')}));
624     log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
625         System.currentTimeMillis(), mvcc), cols, true);
626     log.sync();
627     assertEquals(COL_COUNT, visitor.increments);
628   }
629 
630   /**
631    * A loaded WAL coprocessor won't break existing WAL test cases.
632    */
633   @Test
634   public void testWALCoprocessorLoaded() throws Exception {
635     // test to see whether the coprocessor is loaded or not.
636     WALCoprocessorHost host = wals.getWAL(UNSPECIFIED_REGION).getCoprocessorHost();
637     Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
638     assertNotNull(c);
639   }
640 
641   /**
642    * @throws IOException
643    */
644   @Test
645   public void testReadLegacyLog() throws IOException {
646     final int columnCount = 5;
647     final int recordCount = 5;
648     final TableName tableName =
649         TableName.valueOf("tablename");
650     final byte[] row = Bytes.toBytes("row");
651     long timestamp = System.currentTimeMillis();
652     Path path = new Path(dir, "tempwal");
653     SequenceFileLogWriter sflw = null;
654     WAL.Reader reader = null;
655     try {
656       HRegionInfo hri = new HRegionInfo(tableName,
657           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
658       HTableDescriptor htd = new HTableDescriptor(tableName);
659       fs.mkdirs(dir);
660       // Write log in pre-PB format.
661       sflw = new SequenceFileLogWriter();
662       sflw.init(fs, path, conf, false);
663       for (int i = 0; i < recordCount; ++i) {
664         WALKey key = new HLogKey(
665             hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
666         WALEdit edit = new WALEdit();
667         for (int j = 0; j < columnCount; ++j) {
668           if (i == 0) {
669             htd.addFamily(new HColumnDescriptor("column" + j));
670           }
671           String value = i + "" + j;
672           edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
673         }
674         sflw.append(new WAL.Entry(key, edit));
675       }
676       sflw.sync();
677       sflw.close();
678 
679       // Now read the log using standard means.
680       reader = wals.createReader(fs, path);
681       assertTrue(reader instanceof SequenceFileLogReader);
682       for (int i = 0; i < recordCount; ++i) {
683         WAL.Entry entry = reader.next();
684         assertNotNull(entry);
685         assertEquals(columnCount, entry.getEdit().size());
686         assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
687         assertEquals(tableName, entry.getKey().getTablename());
688         int idx = 0;
689         for (Cell val : entry.getEdit().getCells()) {
690           assertTrue(Bytes.equals(row, val.getRow()));
691           String value = i + "" + idx;
692           assertArrayEquals(Bytes.toBytes(value), val.getValue());
693           idx++;
694         }
695       }
696       WAL.Entry entry = reader.next();
697       assertNull(entry);
698     } finally {
699       if (sflw != null) {
700         sflw.close();
701       }
702       if (reader != null) {
703         reader.close();
704       }
705     }
706   }
707 
708   static class DumbWALActionsListener extends WALActionsListener.Base {
709     int increments = 0;
710 
711     @Override
712     public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey,
713                                          WALEdit logEdit) {
714       increments++;
715     }
716 
717     @Override
718     public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
719       //To change body of implemented methods use File | Settings | File Templates.
720       increments++;
721     }
722   }
723 
724   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
725 
726 }