1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.*;
28 import org.apache.hadoop.hbase.regionserver.HRegion;
29 import org.apache.hadoop.hbase.client.Put;
30 import org.apache.hadoop.hbase.regionserver.wal.HLog;
31 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
32 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
33 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
34 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
35 import org.apache.hadoop.hbase.security.User;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.FSUtils;
38 import org.apache.hadoop.hbase.util.EnvironmentEdge;
39 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40 import org.junit.After;
41 import org.junit.AfterClass;
42 import org.junit.Before;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46
47 import java.io.IOException;
48 import java.security.PrivilegedExceptionAction;
49 import java.util.Arrays;
50 import java.util.List;
51 import java.util.Map;
52 import java.util.concurrent.atomic.AtomicLong;
53
54 import static org.junit.Assert.*;
55
56
57
58
59
60
61 @Category(MediumTests.class)
62 public class TestWALObserver {
63 private static final Log LOG = LogFactory.getLog(TestWALObserver.class);
64 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
65
66 private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
67 private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
68 Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), };
69 private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
70 Bytes.toBytes("q2"), Bytes.toBytes("q3"), };
71 private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
72 Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
73 private static byte[] TEST_ROW = Bytes.toBytes("testRow");
74
75 private Configuration conf;
76 private FileSystem fs;
77 private Path dir;
78 private Path hbaseRootDir;
79 private String logName;
80 private Path oldLogDir;
81 private Path logDir;
82
83 @BeforeClass
84 public static void setupBeforeClass() throws Exception {
85 Configuration conf = TEST_UTIL.getConfiguration();
86 conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
87 SampleRegionWALObserver.class.getName());
88 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
89 SampleRegionWALObserver.class.getName());
90 conf.setBoolean("dfs.support.append", true);
91 conf.setInt("dfs.client.block.recovery.retries", 2);
92
93 TEST_UTIL.startMiniCluster(1);
94 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
95 .makeQualified(new Path("/hbase"));
96 LOG.info("hbase.rootdir=" + hbaseRootDir);
97 FSUtils.setRootDir(conf, hbaseRootDir);
98 }
99
100 @AfterClass
101 public static void teardownAfterClass() throws Exception {
102 TEST_UTIL.shutdownMiniCluster();
103 }
104
105 @Before
106 public void setUp() throws Exception {
107 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
108
109 this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
110 this.hbaseRootDir = FSUtils.getRootDir(conf);
111 this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
112 this.oldLogDir = new Path(this.hbaseRootDir,
113 HConstants.HREGION_OLDLOGDIR_NAME);
114 this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
115 this.logName = HConstants.HREGION_LOGDIR_NAME;
116
117 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
118 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
119 }
120 }
121
122 @After
123 public void tearDown() throws Exception {
124 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
125 }
126
127
128
129
130
131
132 @Test
133 public void testWALObserverWriteToWAL() throws Exception {
134
135 HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
136 final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
137 .toString(TEST_TABLE));
138
139 Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
140 deleteDir(basedir);
141 fs.mkdirs(new Path(basedir, hri.getEncodedName()));
142 final AtomicLong sequenceId = new AtomicLong(0);
143
144 HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir,
145 TestWALObserver.class.getName(), this.conf);
146 SampleRegionWALObserver cp = getCoprocessor(log);
147
148
149
150
151 cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
152 TEST_FAMILY[1], TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]);
153
154 assertFalse(cp.isPreWALWriteCalled());
155 assertFalse(cp.isPostWALWriteCalled());
156
157
158
159
160 Put p = creatPutWith2Families(TEST_ROW);
161
162 Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap();
163 WALEdit edit = new WALEdit();
164 addFamilyMapToWALEdit(familyMap, edit);
165
166 boolean foundFamily0 = false;
167 boolean foundFamily2 = false;
168 boolean modifiedFamily1 = false;
169
170 List<KeyValue> kvs = edit.getKeyValues();
171
172 for (KeyValue kv : kvs) {
173 if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
174 foundFamily0 = true;
175 }
176 if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
177 foundFamily2 = true;
178 }
179 if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
180 if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
181 modifiedFamily1 = true;
182 }
183 }
184 }
185 assertTrue(foundFamily0);
186 assertFalse(foundFamily2);
187 assertFalse(modifiedFamily1);
188
189
190 long now = EnvironmentEdgeManager.currentTimeMillis();
191 log.append(hri, hri.getTable(), edit, now, htd, sequenceId);
192
193
194 foundFamily0 = false;
195 foundFamily2 = false;
196 modifiedFamily1 = false;
197 for (KeyValue kv : kvs) {
198 if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
199 foundFamily0 = true;
200 }
201 if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
202 foundFamily2 = true;
203 }
204 if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
205 if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
206 modifiedFamily1 = true;
207 }
208 }
209 }
210 assertFalse(foundFamily0);
211 assertTrue(foundFamily2);
212 assertTrue(modifiedFamily1);
213
214 assertTrue(cp.isPreWALWriteCalled());
215 assertTrue(cp.isPostWALWriteCalled());
216 }
217
218
219
220
221 @Test
222 public void testWALCoprocessorReplay() throws Exception {
223
224
225 TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
226 final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
227 final AtomicLong sequenceId = new AtomicLong(0);
228
229
230
231
232 final HRegionInfo hri = new HRegionInfo(tableName, null, null);
233
234 final Path basedir =
235 FSUtils.getTableDir(this.hbaseRootDir, tableName);
236 deleteDir(basedir);
237 fs.mkdirs(new Path(basedir, hri.getEncodedName()));
238
239 final Configuration newConf = HBaseConfiguration.create(this.conf);
240
241
242 HLog wal = createWAL(this.conf);
243
244 WALEdit edit = new WALEdit();
245 long now = EnvironmentEdgeManager.currentTimeMillis();
246
247 final int countPerFamily = 1000;
248
249 for (HColumnDescriptor hcd : htd.getFamilies()) {
250
251
252 addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
253 EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
254 }
255 wal.append(hri, tableName, edit, now, htd, sequenceId);
256
257 wal.sync();
258
259 User user = HBaseTestingUtility.getDifferentUser(newConf,
260 ".replay.wal.secondtime");
261 user.runAs(new PrivilegedExceptionAction() {
262 public Object run() throws Exception {
263 Path p = runWALSplit(newConf);
264 LOG.info("WALSplit path == " + p);
265 FileSystem newFS = FileSystem.get(newConf);
266
267 HLog wal2 = createWAL(newConf);
268 HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir,
269 hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null);
270 long seqid2 = region.getOpenSeqNum();
271
272 SampleRegionWALObserver cp2 =
273 (SampleRegionWALObserver)region.getCoprocessorHost().findCoprocessor(
274 SampleRegionWALObserver.class.getName());
275
276 assertNotNull(cp2);
277 assertTrue(cp2.isPreWALRestoreCalled());
278 assertTrue(cp2.isPostWALRestoreCalled());
279 region.close();
280 wal2.closeAndDelete();
281 return null;
282 }
283 });
284 }
285
286
287
288
289
290
291 @Test
292 public void testWALObserverLoaded() throws Exception {
293 HLog log = HLogFactory.createHLog(fs, hbaseRootDir,
294 TestWALObserver.class.getName(), conf);
295 assertNotNull(getCoprocessor(log));
296 }
297
298 private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception {
299 WALCoprocessorHost host = wal.getCoprocessorHost();
300 Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class
301 .getName());
302 return (SampleRegionWALObserver) c;
303 }
304
305
306
307
308
309
310
311 private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
312 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
313
314 for (int i = 0; i < TEST_FAMILY.length; i++) {
315 HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
316 htd.addFamily(a);
317 }
318 return new HRegionInfo(htd.getTableName(), null, null, false);
319 }
320
321
322
323
324 private void deleteDir(final Path p) throws IOException {
325 if (this.fs.exists(p)) {
326 if (!this.fs.delete(p, true)) {
327 throw new IOException("Failed remove of " + p);
328 }
329 }
330 }
331
332 private Put creatPutWith2Families(byte[] row) throws IOException {
333 Put p = new Put(row);
334 for (int i = 0; i < TEST_FAMILY.length - 1; i++) {
335 p.add(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]);
336 }
337 return p;
338 }
339
340
341
342
343
344
345
346
347
348 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
349 WALEdit walEdit) {
350 for (List<Cell> edits : familyMap.values()) {
351 for (Cell cell : edits) {
352
353 walEdit.add((KeyValue)cell);
354 }
355 }
356 }
357
358 private Path runWALSplit(final Configuration c) throws IOException {
359 List<Path> splits = HLogSplitter.split(
360 hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c);
361
362 assertEquals(1, splits.size());
363
364 assertTrue(fs.exists(splits.get(0)));
365 LOG.info("Split file=" + splits.get(0));
366 return splits.get(0);
367 }
368
369 private HLog createWAL(final Configuration c) throws IOException {
370 return HLogFactory.createHLog(FileSystem.get(c), hbaseRootDir, logName, c);
371 }
372
373 private void addWALEdits(final TableName tableName, final HRegionInfo hri,
374 final byte[] rowName, final byte[] family, final int count,
375 EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd, final AtomicLong sequenceId)
376 throws IOException {
377 String familyStr = Bytes.toString(family);
378 for (int j = 0; j < count; j++) {
379 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
380 byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
381 WALEdit edit = new WALEdit();
382 edit.add(new KeyValue(rowName, family, qualifierBytes, ee
383 .currentTimeMillis(), columnBytes));
384 wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, sequenceId);
385 }
386 }
387
388 private HTableDescriptor getBasic3FamilyHTableDescriptor(
389 final TableName tableName) {
390 HTableDescriptor htd = new HTableDescriptor(tableName);
391
392 for (int i = 0; i < TEST_FAMILY.length; i++) {
393 HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
394 htd.addFamily(a);
395 }
396 return htd;
397 }
398
399 private HTableDescriptor createBasic3FamilyHTD(final String tableName) {
400 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
401 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
402 htd.addFamily(a);
403 HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
404 htd.addFamily(b);
405 HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
406 htd.addFamily(c);
407 return htd;
408 }
409
410 }