1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.fs.FileSystem;
23 import org.apache.hadoop.fs.Path;
24 import org.apache.hadoop.hbase.HBaseTestingUtility;
25 import org.apache.hadoop.hbase.HConstants;
26 import org.apache.hadoop.hbase.HRegionInfo;
27 import org.apache.hadoop.hbase.HTableDescriptor;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.LargeTests;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.regionserver.wal.HLog;
32 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
33 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
34 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
35 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hdfs.MiniDFSCluster;
38 import org.junit.After;
39 import org.junit.AfterClass;
40 import org.junit.Before;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 import org.junit.runner.RunWith;
45 import org.junit.runners.Parameterized;
46 import org.junit.runners.Parameterized.Parameters;
47
48 import static org.junit.Assert.*;
49
50 import java.io.EOFException;
51 import java.io.IOException;
52 import java.util.ArrayList;
53 import java.util.Collection;
54 import java.util.List;
55 import java.util.concurrent.atomic.AtomicLong;
56
57 @Category(LargeTests.class)
58 @RunWith(Parameterized.class)
59 public class TestReplicationHLogReaderManager {
60
61 private static HBaseTestingUtility TEST_UTIL;
62 private static Configuration conf;
63 private static Path hbaseDir;
64 private static FileSystem fs;
65 private static MiniDFSCluster cluster;
66 private static final TableName tableName = TableName.valueOf("tablename");
67 private static final byte [] family = Bytes.toBytes("column");
68 private static final byte [] qualifier = Bytes.toBytes("qualifier");
69 private static final HRegionInfo info = new HRegionInfo(tableName,
70 HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
71 private static final HTableDescriptor htd = new HTableDescriptor(tableName);
72
73 private HLog log;
74 private ReplicationHLogReaderManager logManager;
75 private PathWatcher pathWatcher;
76 private int nbRows;
77 private int walEditKVs;
78 private final AtomicLong sequenceId = new AtomicLong(1);
79
80 @Parameters
81 public static Collection<Object[]> parameters() {
82
83 int[] NB_ROWS = { 1500, 60000 };
84 int[] NB_KVS = { 1, 100 };
85
86 Boolean[] BOOL_VALS = { false, true };
87 List<Object[]> parameters = new ArrayList<Object[]>();
88 for (int nbRows : NB_ROWS) {
89 for (int walEditKVs : NB_KVS) {
90 for (boolean b : BOOL_VALS) {
91 Object[] arr = new Object[3];
92 arr[0] = nbRows;
93 arr[1] = walEditKVs;
94 arr[2] = b;
95 parameters.add(arr);
96 }
97 }
98 }
99 return parameters;
100 }
101
102 public TestReplicationHLogReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
103 this.nbRows = nbRows;
104 this.walEditKVs = walEditKVs;
105 TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
106 enableCompression);
107 }
108
109 @BeforeClass
110 public static void setUpBeforeClass() throws Exception {
111 TEST_UTIL = new HBaseTestingUtility();
112 conf = TEST_UTIL.getConfiguration();
113 TEST_UTIL.startMiniDFSCluster(3);
114
115 hbaseDir = TEST_UTIL.createRootDir();
116 cluster = TEST_UTIL.getDFSCluster();
117 fs = cluster.getFileSystem();
118 }
119
120 @AfterClass
121 public static void tearDownAfterClass() throws Exception {
122 TEST_UTIL.shutdownMiniCluster();
123 }
124
125 @Before
126 public void setUp() throws Exception {
127 logManager = new ReplicationHLogReaderManager(fs, conf);
128 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
129 pathWatcher = new PathWatcher();
130 listeners.add(pathWatcher);
131 log = HLogFactory.createHLog(fs, hbaseDir, "test", conf, listeners, "some server");
132 }
133
134 @After
135 public void tearDown() throws Exception {
136 log.closeAndDelete();
137 }
138
139 @Test
140 public void test() throws Exception {
141
142 Path path = pathWatcher.currentPath;
143
144 assertEquals(0, logManager.getPosition());
145
146 appendToLog();
147
148
149 assertNotNull(logManager.openReader(path));
150 logManager.seek();
151 HLog.Entry entry = logManager.readNextAndSetPosition();
152 assertNotNull(entry);
153 entry = logManager.readNextAndSetPosition();
154 assertNull(entry);
155 logManager.closeReader();
156 long oldPos = logManager.getPosition();
157
158 appendToLog();
159
160
161 assertNotNull(logManager.openReader(path));
162 logManager.seek();
163 entry = logManager.readNextAndSetPosition();
164 assertNotEquals(oldPos, logManager.getPosition());
165 assertNotNull(entry);
166 logManager.closeReader();
167 oldPos = logManager.getPosition();
168
169 log.rollWriter();
170
171
172 assertNotNull(logManager.openReader(path));
173 logManager.seek();
174 entry = logManager.readNextAndSetPosition();
175 assertEquals(oldPos, logManager.getPosition());
176 assertNull(entry);
177 logManager.finishCurrentFile();
178
179 path = pathWatcher.currentPath;
180
181 for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
182 log.rollWriter();
183 logManager.openReader(path);
184 logManager.seek();
185 for (int i = 0; i < nbRows; i++) {
186 HLog.Entry e = logManager.readNextAndSetPosition();
187 if (e == null) {
188 fail("Should have enough entries");
189 }
190 }
191 }
192
193 private void appendToLog() throws IOException {
194 appendToLogPlus(1);
195 }
196
197 private void appendToLogPlus(int count) throws IOException {
198 log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd, sequenceId);
199 }
200
201 private WALEdit getWALEdits(int count) {
202 WALEdit edit = new WALEdit();
203 for (int i = 0; i < count; i++) {
204 edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
205 System.currentTimeMillis(), qualifier));
206 }
207 return edit;
208 }
209
210 class PathWatcher implements WALActionsListener {
211
212 Path currentPath;
213
214 @Override
215 public void preLogRoll(Path oldPath, Path newPath) throws IOException {
216 currentPath = newPath;
217 }
218
219 @Override
220 public void postLogRoll(Path oldPath, Path newPath) throws IOException {}
221
222 @Override
223 public void preLogArchive(Path oldPath, Path newPath) throws IOException {}
224
225 @Override
226 public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
227
228 @Override
229 public void logRollRequested() {}
230
231 @Override
232 public void logCloseRequested() {}
233
234 @Override
235 public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {}
236
237 @Override
238 public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {}
239 }
240 }