View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.fs.FileSystem;
26  import org.apache.hadoop.fs.Path;
27  import org.apache.hadoop.hbase.*;
28  import org.apache.hadoop.hbase.regionserver.HRegion;
29  import org.apache.hadoop.hbase.client.Put;
30  import org.apache.hadoop.hbase.regionserver.wal.HLog;
31  import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
32  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
33  import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
34  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
35  import org.apache.hadoop.hbase.security.User;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.FSUtils;
38  import org.apache.hadoop.hbase.util.EnvironmentEdge;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  import org.junit.After;
41  import org.junit.AfterClass;
42  import org.junit.Before;
43  import org.junit.BeforeClass;
44  import org.junit.Test;
45  import org.junit.experimental.categories.Category;
46  
47  import java.io.IOException;
48  import java.security.PrivilegedExceptionAction;
49  import java.util.Arrays;
50  import java.util.List;
51  import java.util.Map;
52  import java.util.concurrent.atomic.AtomicLong;
53  
54  import static org.junit.Assert.*;
55  
56  /**
57   * Tests invocation of the
58   * {@link org.apache.hadoop.hbase.coprocessor.MasterObserver} interface hooks at
59   * all appropriate times during normal HMaster operations.
60   */
61  @Category(MediumTests.class)
62  public class TestWALObserver {
63    private static final Log LOG = LogFactory.getLog(TestWALObserver.class);
64    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
65  
66    private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
67    private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
68        Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), };
69    private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
70        Bytes.toBytes("q2"), Bytes.toBytes("q3"), };
71    private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
72        Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
73    private static byte[] TEST_ROW = Bytes.toBytes("testRow");
74  
75    private Configuration conf;
76    private FileSystem fs;
77    private Path dir;
78    private Path hbaseRootDir;
79    private String logName;
80    private Path oldLogDir;
81    private Path logDir;
82  
83    @BeforeClass
84    public static void setupBeforeClass() throws Exception {
85      Configuration conf = TEST_UTIL.getConfiguration();
86      conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
87          SampleRegionWALObserver.class.getName());
88      conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
89          SampleRegionWALObserver.class.getName());
90      conf.setBoolean("dfs.support.append", true);
91      conf.setInt("dfs.client.block.recovery.retries", 2);
92  
93      TEST_UTIL.startMiniCluster(1);
94      Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
95          .makeQualified(new Path("/hbase"));
96      LOG.info("hbase.rootdir=" + hbaseRootDir);
97      FSUtils.setRootDir(conf, hbaseRootDir);
98    }
99  
100   @AfterClass
101   public static void teardownAfterClass() throws Exception {
102     TEST_UTIL.shutdownMiniCluster();
103   }
104 
105   @Before
106   public void setUp() throws Exception {
107     this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
108     // this.cluster = TEST_UTIL.getDFSCluster();
109     this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
110     this.hbaseRootDir = FSUtils.getRootDir(conf);
111     this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
112     this.oldLogDir = new Path(this.hbaseRootDir,
113         HConstants.HREGION_OLDLOGDIR_NAME);
114     this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
115     this.logName = HConstants.HREGION_LOGDIR_NAME;
116 
117     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
118       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
119     }
120   }
121 
122   @After
123   public void tearDown() throws Exception {
124     TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
125   }
126 
127   /**
128    * Test WAL write behavior with WALObserver. The coprocessor monitors a
129    * WALEdit written to WAL, and ignore, modify, and add KeyValue's for the
130    * WALEdit.
131    */
132   @Test
133   public void testWALObserverWriteToWAL() throws Exception {
134 
135     HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
136     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
137         .toString(TEST_TABLE));
138 
139     Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
140     deleteDir(basedir);
141     fs.mkdirs(new Path(basedir, hri.getEncodedName()));
142     final AtomicLong sequenceId = new AtomicLong(0);
143 
144     HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir,
145         TestWALObserver.class.getName(), this.conf);
146     SampleRegionWALObserver cp = getCoprocessor(log);
147 
148     // TEST_FAMILY[0] shall be removed from WALEdit.
149     // TEST_FAMILY[1] value shall be changed.
150     // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
151     cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
152         TEST_FAMILY[1], TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]);
153 
154     assertFalse(cp.isPreWALWriteCalled());
155     assertFalse(cp.isPostWALWriteCalled());
156 
157     // TEST_FAMILY[2] is not in the put, however it shall be added by the tested
158     // coprocessor.
159     // Use a Put to create familyMap.
160     Put p = creatPutWith2Families(TEST_ROW);
161 
162     Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap();
163     WALEdit edit = new WALEdit();
164     addFamilyMapToWALEdit(familyMap, edit);
165 
166     boolean foundFamily0 = false;
167     boolean foundFamily2 = false;
168     boolean modifiedFamily1 = false;
169 
170     List<KeyValue> kvs = edit.getKeyValues();
171 
172     for (KeyValue kv : kvs) {
173       if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
174         foundFamily0 = true;
175       }
176       if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
177         foundFamily2 = true;
178       }
179       if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
180         if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
181           modifiedFamily1 = true;
182         }
183       }
184     }
185     assertTrue(foundFamily0);
186     assertFalse(foundFamily2);
187     assertFalse(modifiedFamily1);
188 
189     // it's where WAL write cp should occur.
190     long now = EnvironmentEdgeManager.currentTimeMillis();
191     log.append(hri, hri.getTable(), edit, now, htd, sequenceId);
192 
193     // the edit shall have been change now by the coprocessor.
194     foundFamily0 = false;
195     foundFamily2 = false;
196     modifiedFamily1 = false;
197     for (KeyValue kv : kvs) {
198       if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
199         foundFamily0 = true;
200       }
201       if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
202         foundFamily2 = true;
203       }
204       if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
205         if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
206           modifiedFamily1 = true;
207         }
208       }
209     }
210     assertFalse(foundFamily0);
211     assertTrue(foundFamily2);
212     assertTrue(modifiedFamily1);
213 
214     assertTrue(cp.isPreWALWriteCalled());
215     assertTrue(cp.isPostWALWriteCalled());
216   }
217 
218   /**
219    * Test WAL replay behavior with WALObserver.
220    */
221   @Test
222   public void testWALCoprocessorReplay() throws Exception {
223     // WAL replay is handled at HRegion::replayRecoveredEdits(), which is
224     // ultimately called by HRegion::initialize()
225     TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
226     final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
227     final AtomicLong sequenceId = new AtomicLong(0);
228     // final HRegionInfo hri =
229     // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
230     // final HRegionInfo hri1 =
231     // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
232     final HRegionInfo hri = new HRegionInfo(tableName, null, null);
233 
234     final Path basedir =
235         FSUtils.getTableDir(this.hbaseRootDir, tableName);
236     deleteDir(basedir);
237     fs.mkdirs(new Path(basedir, hri.getEncodedName()));
238 
239     final Configuration newConf = HBaseConfiguration.create(this.conf);
240 
241     // HLog wal = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
242     HLog wal = createWAL(this.conf);
243     // Put p = creatPutWith2Families(TEST_ROW);
244     WALEdit edit = new WALEdit();
245     long now = EnvironmentEdgeManager.currentTimeMillis();
246     // addFamilyMapToWALEdit(p.getFamilyMap(), edit);
247     final int countPerFamily = 1000;
248     // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
249     for (HColumnDescriptor hcd : htd.getFamilies()) {
250       // addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
251       // EnvironmentEdgeManager.getDelegate(), wal);
252       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
253           EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
254     }
255     wal.append(hri, tableName, edit, now, htd, sequenceId);
256     // sync to fs.
257     wal.sync();
258 
259     User user = HBaseTestingUtility.getDifferentUser(newConf,
260         ".replay.wal.secondtime");
261     user.runAs(new PrivilegedExceptionAction() {
262       public Object run() throws Exception {
263         Path p = runWALSplit(newConf);
264         LOG.info("WALSplit path == " + p);
265         FileSystem newFS = FileSystem.get(newConf);
266         // Make a new wal for new region open.
267         HLog wal2 = createWAL(newConf);
268         HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir,
269             hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null);
270         long seqid2 = region.getOpenSeqNum();
271 
272         SampleRegionWALObserver cp2 =
273           (SampleRegionWALObserver)region.getCoprocessorHost().findCoprocessor(
274               SampleRegionWALObserver.class.getName());
275         // TODO: asserting here is problematic.
276         assertNotNull(cp2);
277         assertTrue(cp2.isPreWALRestoreCalled());
278         assertTrue(cp2.isPostWALRestoreCalled());
279         region.close();
280         wal2.closeAndDelete();
281         return null;
282       }
283     });
284   }
285 
286   /**
287    * Test to see CP loaded successfully or not. There is a duplication at
288    * TestHLog, but the purpose of that one is to see whether the loaded CP will
289    * impact existing HLog tests or not.
290    */
291   @Test
292   public void testWALObserverLoaded() throws Exception {
293     HLog log = HLogFactory.createHLog(fs, hbaseRootDir,
294         TestWALObserver.class.getName(), conf);
295     assertNotNull(getCoprocessor(log));
296   }
297 
298   private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception {
299     WALCoprocessorHost host = wal.getCoprocessorHost();
300     Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class
301         .getName());
302     return (SampleRegionWALObserver) c;
303   }
304 
305   /*
306    * Creates an HRI around an HTD that has <code>tableName</code> and three
307    * column families named.
308    * 
309    * @param tableName Name of table to use when we create HTableDescriptor.
310    */
311   private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
312     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
313 
314     for (int i = 0; i < TEST_FAMILY.length; i++) {
315       HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
316       htd.addFamily(a);
317     }
318     return new HRegionInfo(htd.getTableName(), null, null, false);
319   }
320 
321   /*
322    * @param p Directory to cleanup
323    */
324   private void deleteDir(final Path p) throws IOException {
325     if (this.fs.exists(p)) {
326       if (!this.fs.delete(p, true)) {
327         throw new IOException("Failed remove of " + p);
328       }
329     }
330   }
331 
332   private Put creatPutWith2Families(byte[] row) throws IOException {
333     Put p = new Put(row);
334     for (int i = 0; i < TEST_FAMILY.length - 1; i++) {
335       p.add(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]);
336     }
337     return p;
338   }
339 
340   /**
341    * Copied from HRegion.
342    * 
343    * @param familyMap
344    *          map of family->edits
345    * @param walEdit
346    *          the destination entry to append into
347    */
348   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
349       WALEdit walEdit) {
350     for (List<Cell> edits : familyMap.values()) {
351       for (Cell cell : edits) {
352         // KeyValue v1 expectation. Cast for now until we go all Cell all the time. TODO.
353         walEdit.add((KeyValue)cell);
354       }
355     }
356   }
357 
358   private Path runWALSplit(final Configuration c) throws IOException {
359     List<Path> splits = HLogSplitter.split(
360       hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c);
361     // Split should generate only 1 file since there's only 1 region
362     assertEquals(1, splits.size());
363     // Make sure the file exists
364     assertTrue(fs.exists(splits.get(0)));
365     LOG.info("Split file=" + splits.get(0));
366     return splits.get(0);
367   }
368 
369   private HLog createWAL(final Configuration c) throws IOException {
370     return HLogFactory.createHLog(FileSystem.get(c), hbaseRootDir, logName, c);
371   }
372 
373   private void addWALEdits(final TableName tableName, final HRegionInfo hri,
374       final byte[] rowName, final byte[] family, final int count,
375       EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd, final AtomicLong sequenceId)
376       throws IOException {
377     String familyStr = Bytes.toString(family);
378     for (int j = 0; j < count; j++) {
379       byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
380       byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
381       WALEdit edit = new WALEdit();
382       edit.add(new KeyValue(rowName, family, qualifierBytes, ee
383           .currentTimeMillis(), columnBytes));
384       wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, sequenceId);
385     }
386   }
387 
388   private HTableDescriptor getBasic3FamilyHTableDescriptor(
389       final TableName tableName) {
390     HTableDescriptor htd = new HTableDescriptor(tableName);
391 
392     for (int i = 0; i < TEST_FAMILY.length; i++) {
393       HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
394       htd.addFamily(a);
395     }
396     return htd;
397   }
398 
399   private HTableDescriptor createBasic3FamilyHTD(final String tableName) {
400     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
401     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
402     htd.addFamily(a);
403     HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
404     htd.addFamily(b);
405     HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
406     htd.addFamily(c);
407     return htd;
408   }
409 
410 }