View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.fs.FileSystem;
23  import org.apache.hadoop.fs.Path;
24  import org.apache.hadoop.hbase.HBaseTestingUtility;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.HRegionInfo;
27  import org.apache.hadoop.hbase.HTableDescriptor;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.LargeTests;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.regionserver.wal.HLog;
32  import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
33  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
34  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
35  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hdfs.MiniDFSCluster;
38  import org.junit.After;
39  import org.junit.AfterClass;
40  import org.junit.Before;
41  import org.junit.BeforeClass;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  import org.junit.runner.RunWith;
45  import org.junit.runners.Parameterized;
46  import org.junit.runners.Parameterized.Parameters;
47  
48  import static org.junit.Assert.*;
49  
50  import java.io.EOFException;
51  import java.io.IOException;
52  import java.util.ArrayList;
53  import java.util.Collection;
54  import java.util.List;
55  import java.util.concurrent.atomic.AtomicLong;
56  
57  @Category(LargeTests.class)
58  @RunWith(Parameterized.class)
59  public class TestReplicationHLogReaderManager {
60  
61    private static HBaseTestingUtility TEST_UTIL;
62    private static Configuration conf;
63    private static Path hbaseDir;
64    private static FileSystem fs;
65    private static MiniDFSCluster cluster;
66    private static final TableName tableName = TableName.valueOf("tablename");
67    private static final byte [] family = Bytes.toBytes("column");
68    private static final byte [] qualifier = Bytes.toBytes("qualifier");
69    private static final HRegionInfo info = new HRegionInfo(tableName,
70        HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
71    private static final HTableDescriptor htd = new HTableDescriptor(tableName);
72  
73    private HLog log;
74    private ReplicationHLogReaderManager logManager;
75    private PathWatcher pathWatcher;
76    private int nbRows;
77    private int walEditKVs;
78    private final AtomicLong sequenceId = new AtomicLong(1);
79  
80    @Parameters
81    public static Collection<Object[]> parameters() {
82      // Try out different combinations of row count and KeyValue count
83      int[] NB_ROWS = { 1500, 60000 };
84      int[] NB_KVS = { 1, 100 };
85      // whether compression is used
86      Boolean[] BOOL_VALS = { false, true };
87      List<Object[]> parameters = new ArrayList<Object[]>();
88      for (int nbRows : NB_ROWS) {
89        for (int walEditKVs : NB_KVS) {
90          for (boolean b : BOOL_VALS) {
91            Object[] arr = new Object[3];
92            arr[0] = nbRows;
93            arr[1] = walEditKVs;
94            arr[2] = b;
95            parameters.add(arr);
96          }
97        }
98      }
99      return parameters;
100   }
101 
102   public TestReplicationHLogReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
103     this.nbRows = nbRows;
104     this.walEditKVs = walEditKVs;
105     TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
106       enableCompression);
107   }
108   
109   @BeforeClass
110   public static void setUpBeforeClass() throws Exception {
111     TEST_UTIL = new HBaseTestingUtility();
112     conf = TEST_UTIL.getConfiguration();
113     TEST_UTIL.startMiniDFSCluster(3);
114 
115     hbaseDir = TEST_UTIL.createRootDir();
116     cluster = TEST_UTIL.getDFSCluster();
117     fs = cluster.getFileSystem();
118   }
119 
120   @AfterClass
121   public static void tearDownAfterClass() throws Exception {
122     TEST_UTIL.shutdownMiniCluster();
123   }
124 
125   @Before
126   public void setUp() throws Exception {
127     logManager = new ReplicationHLogReaderManager(fs, conf);
128     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
129     pathWatcher = new PathWatcher();
130     listeners.add(pathWatcher);
131     log = HLogFactory.createHLog(fs, hbaseDir, "test", conf, listeners, "some server");
132   }
133 
134   @After
135   public void tearDown() throws Exception {
136     log.closeAndDelete();
137   }
138 
139   @Test
140   public void test() throws Exception {
141     // Grab the path that was generated when the log rolled as part of its creation
142     Path path = pathWatcher.currentPath;
143 
144     assertEquals(0, logManager.getPosition());
145 
146     appendToLog();
147 
148     // There's one edit in the log, read it. Reading past it needs to return nulls
149     assertNotNull(logManager.openReader(path));
150     logManager.seek();
151     HLog.Entry entry = logManager.readNextAndSetPosition();
152     assertNotNull(entry);
153     entry = logManager.readNextAndSetPosition();
154     assertNull(entry);
155     logManager.closeReader();
156     long oldPos = logManager.getPosition();
157 
158     appendToLog();
159 
160     // Read the newly added entry, make sure we made progress
161     assertNotNull(logManager.openReader(path));
162     logManager.seek();
163     entry = logManager.readNextAndSetPosition();
164     assertNotEquals(oldPos, logManager.getPosition());
165     assertNotNull(entry);
166     logManager.closeReader();
167     oldPos = logManager.getPosition();
168 
169     log.rollWriter();
170 
171     // We rolled but we still should see the end of the first log and not get data
172     assertNotNull(logManager.openReader(path));
173     logManager.seek();
174     entry = logManager.readNextAndSetPosition();
175     assertEquals(oldPos, logManager.getPosition());
176     assertNull(entry);
177     logManager.finishCurrentFile();
178 
179     path = pathWatcher.currentPath;
180 
181     for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
182     log.rollWriter();
183     logManager.openReader(path);
184     logManager.seek();
185     for (int i = 0; i < nbRows; i++) {
186       HLog.Entry e = logManager.readNextAndSetPosition();
187       if (e == null) {
188         fail("Should have enough entries");
189       }
190     }
191   }
192 
193   private void appendToLog() throws IOException {
194     appendToLogPlus(1);
195   }
196 
197   private void appendToLogPlus(int count) throws IOException {
198     log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd, sequenceId);
199   }
200 
201   private WALEdit getWALEdits(int count) {
202     WALEdit edit = new WALEdit();
203     for (int i = 0; i < count; i++) {
204       edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
205         System.currentTimeMillis(), qualifier));
206     }
207     return edit;
208   }
209 
210   class PathWatcher implements WALActionsListener {
211 
212     Path currentPath;
213 
214     @Override
215     public void preLogRoll(Path oldPath, Path newPath) throws IOException {
216       currentPath = newPath;
217     }
218 
219     @Override
220     public void postLogRoll(Path oldPath, Path newPath) throws IOException {}
221 
222     @Override
223     public void preLogArchive(Path oldPath, Path newPath) throws IOException {}
224 
225     @Override
226     public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
227 
228     @Override
229     public void logRollRequested() {}
230 
231     @Override
232     public void logCloseRequested() {}
233 
234     @Override
235     public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {}
236 
237     @Override
238     public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {}
239   }
240 }