View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  import static org.junit.Assert.fail;
23  
24  import java.io.ByteArrayOutputStream;
25  import java.io.PrintStream;
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.CellUtil;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.LargeTests;
37  import org.apache.hadoop.hbase.MiniHBaseCluster;
38  import org.apache.hadoop.hbase.client.Delete;
39  import org.apache.hadoop.hbase.client.Get;
40  import org.apache.hadoop.hbase.client.HTable;
41  import org.apache.hadoop.hbase.client.Put;
42  import org.apache.hadoop.hbase.client.Result;
43  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
44  import org.apache.hadoop.hbase.mapreduce.WALPlayer.HLogKeyValueMapper;
45  import org.apache.hadoop.hbase.regionserver.wal.HLog;
46  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
47  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
50  import org.apache.hadoop.mapreduce.Mapper;
51  import org.apache.hadoop.mapreduce.Mapper.Context;
52  import org.junit.AfterClass;
53  import org.junit.BeforeClass;
54  import org.junit.Test;
55  import org.junit.experimental.categories.Category;
56  import org.mockito.invocation.InvocationOnMock;
57  import org.mockito.stubbing.Answer;
58  
59  import static org.mockito.Matchers.any;
60  import static org.mockito.Mockito.*;
61  
62  /**
63   * Basic test for the WALPlayer M/R tool
64   */
65  @Category(LargeTests.class)
66  public class TestWALPlayer {
67    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
68    private static MiniHBaseCluster cluster;
69  
70    @BeforeClass
71    public static void beforeClass() throws Exception {
72      cluster = TEST_UTIL.startMiniCluster();
73      TEST_UTIL.startMiniMapReduceCluster();
74    }
75  
76    @AfterClass
77    public static void afterClass() throws Exception {
78      TEST_UTIL.shutdownMiniMapReduceCluster();
79      TEST_UTIL.shutdownMiniCluster();
80    }
81  
82    /**
83     * Simple end-to-end test
84     * @throws Exception
85     */
86    @Test
87    public void testWALPlayer() throws Exception {
88      final byte[] TABLENAME1 = Bytes.toBytes("testWALPlayer1");
89      final byte[] TABLENAME2 = Bytes.toBytes("testWALPlayer2");
90      final byte[] FAMILY = Bytes.toBytes("family");
91      final byte[] COLUMN1 = Bytes.toBytes("c1");
92      final byte[] COLUMN2 = Bytes.toBytes("c2");
93      final byte[] ROW = Bytes.toBytes("row");
94      HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
95      HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
96  
97      // put a row into the first table
98      Put p = new Put(ROW);
99      p.add(FAMILY, COLUMN1, COLUMN1);
100     p.add(FAMILY, COLUMN2, COLUMN2);
101     t1.put(p);
102     // delete one column
103     Delete d = new Delete(ROW);
104     d.deleteColumns(FAMILY, COLUMN1);
105     t1.delete(d);
106 
107     // replay the WAL, map table 1 to table 2
108     HLog log = cluster.getRegionServer(0).getWAL();
109     log.rollWriter();
110     String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
111         .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
112 
113     Configuration configuration= TEST_UTIL.getConfiguration();
114     WALPlayer player = new WALPlayer(configuration);
115     String optionName="_test_.name";
116     configuration.set(optionName, "1000");
117     player.setupTime(configuration, optionName);
118     assertEquals(1000,configuration.getLong(optionName,0));
119     assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1),
120         Bytes.toString(TABLENAME2) }));
121 
122     
123     // verify the WAL was player into table 2
124     Get g = new Get(ROW);
125     Result r = t2.get(g);
126     assertEquals(1, r.size());
127     assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
128   }
129 
130   /**
131    * Test HLogKeyValueMapper setup and map
132    */
133   @Test
134   public void testHLogKeyValueMapper() throws Exception {
135     Configuration configuration = new Configuration();
136     configuration.set(WALPlayer.TABLES_KEY, "table");
137     HLogKeyValueMapper mapper = new HLogKeyValueMapper();
138     HLogKey key = mock(HLogKey.class);
139     when(key.getTablename()).thenReturn(TableName.valueOf("table"));
140     @SuppressWarnings("unchecked")
141     Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
142         mock(Context.class);
143     when(context.getConfiguration()).thenReturn(configuration);
144 
145     WALEdit value = mock(WALEdit.class);
146     ArrayList<KeyValue> values = new ArrayList<KeyValue>();
147     KeyValue kv1 = mock(KeyValue.class);
148     when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
149     when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
150     values.add(kv1);
151     when(value.getKeyValues()).thenReturn(values);
152     mapper.setup(context);
153 
154     doAnswer(new Answer<Void>() {
155 
156       @Override
157       public Void answer(InvocationOnMock invocation) throws Throwable {
158         ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
159         KeyValue key = (KeyValue) invocation.getArguments()[1];
160         assertEquals("row", Bytes.toString(writer.get()));
161         assertEquals("row", Bytes.toString(key.getRow()));
162         return null;
163       }
164     }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
165 
166     mapper.map(key, value, context);
167 
168   }
169 
170   /**
171    * Test main method
172    */
173   @Test
174   public void testMainMethod() throws Exception {
175 
176     PrintStream oldPrintStream = System.err;
177     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
178     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
179     System.setSecurityManager(newSecurityManager);
180     ByteArrayOutputStream data = new ByteArrayOutputStream();
181     String[] args = {};
182     System.setErr(new PrintStream(data));
183     try {
184       System.setErr(new PrintStream(data));
185       try {
186         WALPlayer.main(args);
187         fail("should be SecurityException");
188       } catch (SecurityException e) {
189         assertEquals(-1, newSecurityManager.getExitCode());
190         assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
191         assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
192             " <tables> [<tableMappings>]"));
193         assertTrue(data.toString().contains("-Dhlog.bulk.output=/path/for/output"));
194       }
195 
196     } finally {
197       System.setErr(oldPrintStream);
198       System.setSecurityManager(SECURITY_MANAGER);
199     }
200 
201   }
202 
203 }