1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.PrintStream;
26 import java.util.ArrayList;
27 import java.util.List;
28
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.LargeTests;
37 import org.apache.hadoop.hbase.MiniHBaseCluster;
38 import org.apache.hadoop.hbase.client.Delete;
39 import org.apache.hadoop.hbase.client.Get;
40 import org.apache.hadoop.hbase.client.HTable;
41 import org.apache.hadoop.hbase.client.Put;
42 import org.apache.hadoop.hbase.client.Result;
43 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
44 import org.apache.hadoop.hbase.mapreduce.WALPlayer.HLogKeyValueMapper;
45 import org.apache.hadoop.hbase.regionserver.wal.HLog;
46 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
47 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
50 import org.apache.hadoop.mapreduce.Mapper;
51 import org.apache.hadoop.mapreduce.Mapper.Context;
52 import org.junit.AfterClass;
53 import org.junit.BeforeClass;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56 import org.mockito.invocation.InvocationOnMock;
57 import org.mockito.stubbing.Answer;
58
59 import static org.mockito.Matchers.any;
60 import static org.mockito.Mockito.*;
61
62
63
64
65 @Category(LargeTests.class)
66 public class TestWALPlayer {
67 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
68 private static MiniHBaseCluster cluster;
69
70 @BeforeClass
71 public static void beforeClass() throws Exception {
72 cluster = TEST_UTIL.startMiniCluster();
73 TEST_UTIL.startMiniMapReduceCluster();
74 }
75
76 @AfterClass
77 public static void afterClass() throws Exception {
78 TEST_UTIL.shutdownMiniMapReduceCluster();
79 TEST_UTIL.shutdownMiniCluster();
80 }
81
82
83
84
85
86 @Test
87 public void testWALPlayer() throws Exception {
88 final byte[] TABLENAME1 = Bytes.toBytes("testWALPlayer1");
89 final byte[] TABLENAME2 = Bytes.toBytes("testWALPlayer2");
90 final byte[] FAMILY = Bytes.toBytes("family");
91 final byte[] COLUMN1 = Bytes.toBytes("c1");
92 final byte[] COLUMN2 = Bytes.toBytes("c2");
93 final byte[] ROW = Bytes.toBytes("row");
94 HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
95 HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
96
97
98 Put p = new Put(ROW);
99 p.add(FAMILY, COLUMN1, COLUMN1);
100 p.add(FAMILY, COLUMN2, COLUMN2);
101 t1.put(p);
102
103 Delete d = new Delete(ROW);
104 d.deleteColumns(FAMILY, COLUMN1);
105 t1.delete(d);
106
107
108 HLog log = cluster.getRegionServer(0).getWAL();
109 log.rollWriter();
110 String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
111 .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
112
113 Configuration configuration= TEST_UTIL.getConfiguration();
114 WALPlayer player = new WALPlayer(configuration);
115 String optionName="_test_.name";
116 configuration.set(optionName, "1000");
117 player.setupTime(configuration, optionName);
118 assertEquals(1000,configuration.getLong(optionName,0));
119 assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1),
120 Bytes.toString(TABLENAME2) }));
121
122
123
124 Get g = new Get(ROW);
125 Result r = t2.get(g);
126 assertEquals(1, r.size());
127 assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
128 }
129
130
131
132
133 @Test
134 public void testHLogKeyValueMapper() throws Exception {
135 Configuration configuration = new Configuration();
136 configuration.set(WALPlayer.TABLES_KEY, "table");
137 HLogKeyValueMapper mapper = new HLogKeyValueMapper();
138 HLogKey key = mock(HLogKey.class);
139 when(key.getTablename()).thenReturn(TableName.valueOf("table"));
140 @SuppressWarnings("unchecked")
141 Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
142 mock(Context.class);
143 when(context.getConfiguration()).thenReturn(configuration);
144
145 WALEdit value = mock(WALEdit.class);
146 ArrayList<KeyValue> values = new ArrayList<KeyValue>();
147 KeyValue kv1 = mock(KeyValue.class);
148 when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
149 when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
150 values.add(kv1);
151 when(value.getKeyValues()).thenReturn(values);
152 mapper.setup(context);
153
154 doAnswer(new Answer<Void>() {
155
156 @Override
157 public Void answer(InvocationOnMock invocation) throws Throwable {
158 ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
159 KeyValue key = (KeyValue) invocation.getArguments()[1];
160 assertEquals("row", Bytes.toString(writer.get()));
161 assertEquals("row", Bytes.toString(key.getRow()));
162 return null;
163 }
164 }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
165
166 mapper.map(key, value, context);
167
168 }
169
170
171
172
173 @Test
174 public void testMainMethod() throws Exception {
175
176 PrintStream oldPrintStream = System.err;
177 SecurityManager SECURITY_MANAGER = System.getSecurityManager();
178 LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
179 System.setSecurityManager(newSecurityManager);
180 ByteArrayOutputStream data = new ByteArrayOutputStream();
181 String[] args = {};
182 System.setErr(new PrintStream(data));
183 try {
184 System.setErr(new PrintStream(data));
185 try {
186 WALPlayer.main(args);
187 fail("should be SecurityException");
188 } catch (SecurityException e) {
189 assertEquals(-1, newSecurityManager.getExitCode());
190 assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
191 assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
192 " <tables> [<tableMappings>]"));
193 assertTrue(data.toString().contains("-Dhlog.bulk.output=/path/for/output"));
194 }
195
196 } finally {
197 System.setErr(oldPrintStream);
198 System.setSecurityManager(SECURITY_MANAGER);
199 }
200
201 }
202
203 }