1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.test;
20
21 import com.google.common.base.Joiner;
22 import org.apache.commons.cli.CommandLine;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.conf.Configured;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.hbase.HBaseConfiguration;
29 import org.apache.hadoop.hbase.HRegionLocation;
30 import org.apache.hadoop.hbase.IntegrationTestingUtility;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.client.Connection;
35 import org.apache.hadoop.hbase.client.ConnectionFactory;
36 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
37 import org.apache.hadoop.hbase.client.Admin;
38 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
39 import org.apache.hadoop.util.Tool;
40 import org.apache.hadoop.util.ToolRunner;
41
42 import java.util.ArrayList;
43 import java.util.HashMap;
44 import java.util.Set;
45 import java.util.TreeSet;
46 import java.util.UUID;
47
48
49
50
51
52
53
54
55 public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
56 protected String sourceClusterIdString;
57 protected String sinkClusterIdString;
58 protected int numIterations;
59 protected int numMappers;
60 protected long numNodes;
61 protected String outputDir;
62 protected int numReducers;
63 protected int generateVerifyGap;
64 protected Integer width;
65 protected Integer wrapMultiplier;
66 protected boolean noReplicationSetup = false;
67
68 private final String SOURCE_CLUSTER_OPT = "sourceCluster";
69 private final String DEST_CLUSTER_OPT = "destCluster";
70 private final String ITERATIONS_OPT = "iterations";
71 private final String NUM_MAPPERS_OPT = "numMappers";
72 private final String OUTPUT_DIR_OPT = "outputDir";
73 private final String NUM_REDUCERS_OPT = "numReducers";
74 private final String NO_REPLICATION_SETUP_OPT = "noReplicationSetup";
75
76
77
78
79
80 private final String GENERATE_VERIFY_GAP_OPT = "generateVerifyGap";
81
82
83
84
85
86 private final String WIDTH_OPT = "width";
87
88
89
90
91
92 private final String WRAP_MULTIPLIER_OPT = "wrapMultiplier";
93
94
95
96
97
98
99 private final String NUM_NODES_OPT = "numNodes";
100
101 private final int DEFAULT_NUM_MAPPERS = 1;
102 private final int DEFAULT_NUM_REDUCERS = 1;
103 private final int DEFAULT_NUM_ITERATIONS = 1;
104 private final int DEFAULT_GENERATE_VERIFY_GAP = 60;
105 private final int DEFAULT_WIDTH = 1000000;
106 private final int DEFAULT_WRAP_MULTIPLIER = 25;
107 private final int DEFAULT_NUM_NODES = DEFAULT_WIDTH * DEFAULT_WRAP_MULTIPLIER;
108
109
110
111
112
113 protected class ClusterID {
114 private final Configuration configuration;
115 private Connection connection = null;
116
117
118
119
120
121
122
123
124 public ClusterID(Configuration base,
125 String key) {
126 configuration = new Configuration(base);
127 String[] parts = key.split(":");
128 configuration.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
129 configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]);
130 configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
131 }
132
133 @Override
134 public String toString() {
135 return Joiner.on(":").join(configuration.get(HConstants.ZOOKEEPER_QUORUM),
136 configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT),
137 configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
138 }
139
140 public Configuration getConfiguration() {
141 return this.configuration;
142 }
143
144 public Connection getConnection() throws Exception {
145 if (this.connection == null) {
146 this.connection = ConnectionFactory.createConnection(this.configuration);
147 }
148 return this.connection;
149 }
150
151 public void closeConnection() throws Exception {
152 this.connection.close();
153 this.connection = null;
154 }
155
156 public boolean equals(ClusterID other) {
157 return this.toString().equalsIgnoreCase(other.toString());
158 }
159 }
160
161
162
163
164
165
166
167 protected class VerifyReplicationLoop extends Configured implements Tool {
168 private final Log LOG = LogFactory.getLog(VerifyReplicationLoop.class);
169 protected ClusterID source;
170 protected ClusterID sink;
171
172 IntegrationTestBigLinkedList integrationTestBigLinkedList;
173
174
175
176
177
178
179
180
181
182 protected void setupTablesAndReplication() throws Exception {
183 TableName tableName = getTableName(source.getConfiguration());
184
185 ClusterID[] clusters = {source, sink};
186
187
188 for (ClusterID cluster : clusters) {
189 Admin admin = cluster.getConnection().getAdmin();
190
191 if (admin.tableExists(tableName)) {
192 if (admin.isTableEnabled(tableName)) {
193 admin.disableTable(tableName);
194 }
195
196
197
198
199
200
201
202
203
204 Set<ServerName> regionServers = new TreeSet<>();
205 for (HRegionLocation rl :
206 cluster.getConnection().getRegionLocator(tableName).getAllRegionLocations()) {
207 regionServers.add(rl.getServerName());
208 }
209
210 for (ServerName server : regionServers) {
211 source.getConnection().getAdmin().rollWALWriter(server);
212 }
213
214 admin.deleteTable(tableName);
215 }
216 }
217
218
219 Generator generator = new Generator();
220 generator.setConf(source.getConfiguration());
221 generator.createSchema();
222
223
224 if (!source.equals(sink)) {
225 ReplicationAdmin replicationAdmin = new ReplicationAdmin(source.getConfiguration());
226
227 for (String oldPeer : replicationAdmin.listPeerConfigs().keySet()) {
228 replicationAdmin.removePeer(oldPeer);
229 }
230
231
232 ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
233 peerConfig.setClusterKey(sink.toString());
234
235
236 HashMap<TableName, ArrayList<String>> toReplicate = new HashMap<>();
237 toReplicate.put(tableName, new ArrayList<String>(0));
238
239 replicationAdmin.addPeer("TestPeer", peerConfig, toReplicate);
240
241 replicationAdmin.enableTableRep(tableName);
242 replicationAdmin.close();
243 }
244
245 for (ClusterID cluster : clusters) {
246 cluster.closeConnection();
247 }
248 }
249
250 protected void waitForReplication() throws Exception {
251
252
253 Thread.sleep(generateVerifyGap * 1000);
254 }
255
256
257
258
259
260
261
262 protected void runGenerator() throws Exception {
263 Path outputPath = new Path(outputDir);
264 UUID uuid = UUID.randomUUID();
265 Path generatorOutput = new Path(outputPath, uuid.toString());
266
267 Generator generator = new Generator();
268 generator.setConf(source.getConfiguration());
269
270 int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
271 if (retCode > 0) {
272 throw new RuntimeException("Generator failed with return code: " + retCode);
273 }
274 }
275
276
277
278
279
280
281
282
283
284
285 protected void runVerify(long expectedNumNodes) throws Exception {
286 Path outputPath = new Path(outputDir);
287 UUID uuid = UUID.randomUUID();
288 Path iterationOutput = new Path(outputPath, uuid.toString());
289
290 Verify verify = new Verify();
291 verify.setConf(sink.getConfiguration());
292
293 int retCode = verify.run(iterationOutput, numReducers);
294 if (retCode > 0) {
295 throw new RuntimeException("Verify.run failed with return code: " + retCode);
296 }
297
298 if (!verify.verify(expectedNumNodes)) {
299 throw new RuntimeException("Verify.verify failed");
300 }
301
302 LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
303 }
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318 @Override
319 public int run(String[] args) throws Exception {
320 source = new ClusterID(getConf(), sourceClusterIdString);
321 sink = new ClusterID(getConf(), sinkClusterIdString);
322
323 if (!noReplicationSetup) {
324 setupTablesAndReplication();
325 }
326 int expectedNumNodes = 0;
327 for (int i = 0; i < numIterations; i++) {
328 LOG.info("Starting iteration = " + i);
329
330 expectedNumNodes += numMappers * numNodes;
331
332 runGenerator();
333 waitForReplication();
334 runVerify(expectedNumNodes);
335 }
336
337
338
339
340
341 return 0;
342 }
343 }
344
345 @Override
346 protected void addOptions() {
347 super.addOptions();
348 addRequiredOptWithArg("s", SOURCE_CLUSTER_OPT,
349 "Cluster ID of the source cluster (e.g. localhost:2181:/hbase)");
350 addRequiredOptWithArg("r", DEST_CLUSTER_OPT,
351 "Cluster ID of the sink cluster (e.g. localhost:2182:/hbase)");
352 addRequiredOptWithArg("d", OUTPUT_DIR_OPT,
353 "Temporary directory where to write keys for the test");
354
355 addOptWithArg("nm", NUM_MAPPERS_OPT,
356 "Number of mappers (default: " + DEFAULT_NUM_MAPPERS + ")");
357 addOptWithArg("nr", NUM_REDUCERS_OPT,
358 "Number of reducers (default: " + DEFAULT_NUM_MAPPERS + ")");
359 addOptNoArg("nrs", NO_REPLICATION_SETUP_OPT,
360 "Don't setup tables or configure replication before starting test");
361 addOptWithArg("n", NUM_NODES_OPT,
362 "Number of nodes. This should be a multiple of width * wrapMultiplier." +
363 " (default: " + DEFAULT_NUM_NODES + ")");
364 addOptWithArg("i", ITERATIONS_OPT, "Number of iterations to run (default: " +
365 DEFAULT_NUM_ITERATIONS + ")");
366 addOptWithArg("t", GENERATE_VERIFY_GAP_OPT,
367 "Gap between generate and verify steps in seconds (default: " +
368 DEFAULT_GENERATE_VERIFY_GAP + ")");
369 addOptWithArg("w", WIDTH_OPT,
370 "Width of the linked list chain (default: " + DEFAULT_WIDTH + ")");
371 addOptWithArg("wm", WRAP_MULTIPLIER_OPT, "How many times to wrap around (default: " +
372 DEFAULT_WRAP_MULTIPLIER + ")");
373 }
374
375 @Override
376 protected void processOptions(CommandLine cmd) {
377 processBaseOptions(cmd);
378
379 sourceClusterIdString = cmd.getOptionValue(SOURCE_CLUSTER_OPT);
380 sinkClusterIdString = cmd.getOptionValue(DEST_CLUSTER_OPT);
381 outputDir = cmd.getOptionValue(OUTPUT_DIR_OPT);
382
383
384 numMappers = parseInt(cmd.getOptionValue(NUM_MAPPERS_OPT,
385 Integer.toString(DEFAULT_NUM_MAPPERS)),
386 1, Integer.MAX_VALUE);
387 numReducers = parseInt(cmd.getOptionValue(NUM_REDUCERS_OPT,
388 Integer.toString(DEFAULT_NUM_REDUCERS)),
389 1, Integer.MAX_VALUE);
390 numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT, Integer.toString(DEFAULT_NUM_NODES)),
391 1, Integer.MAX_VALUE);
392 generateVerifyGap = parseInt(cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT,
393 Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)),
394 1, Integer.MAX_VALUE);
395 numIterations = parseInt(cmd.getOptionValue(ITERATIONS_OPT,
396 Integer.toString(DEFAULT_NUM_ITERATIONS)),
397 1, Integer.MAX_VALUE);
398 width = parseInt(cmd.getOptionValue(WIDTH_OPT, Integer.toString(DEFAULT_WIDTH)),
399 1, Integer.MAX_VALUE);
400 wrapMultiplier = parseInt(cmd.getOptionValue(WRAP_MULTIPLIER_OPT,
401 Integer.toString(DEFAULT_WRAP_MULTIPLIER)),
402 1, Integer.MAX_VALUE);
403
404 if (cmd.hasOption(NO_REPLICATION_SETUP_OPT)) {
405 noReplicationSetup = true;
406 }
407
408 if (numNodes % (width * wrapMultiplier) != 0) {
409 throw new RuntimeException("numNodes must be a multiple of width and wrap multiplier");
410 }
411 }
412
413 @Override
414 public int runTestFromCommandLine() throws Exception {
415 VerifyReplicationLoop tool = new VerifyReplicationLoop();
416 tool.integrationTestBigLinkedList = this;
417 return ToolRunner.run(getConf(), tool, null);
418 }
419
420 public static void main(String[] args) throws Exception {
421 Configuration conf = HBaseConfiguration.create();
422 IntegrationTestingUtility.setUseDistributedCluster(conf);
423 int ret = ToolRunner.run(conf, new IntegrationTestReplication(), args);
424 System.exit(ret);
425 }
426 }