1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.util.Iterator;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.conf.Configured;
27 import org.apache.hadoop.fs.FileStatus;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.CellComparator;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.HBaseConfiguration;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.client.Connection;
36 import org.apache.hadoop.hbase.client.ConnectionFactory;
37 import org.apache.hadoop.hbase.client.Delete;
38 import org.apache.hadoop.hbase.client.Mutation;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.ResultScanner;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.client.Table;
44 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.mapreduce.Counters;
47 import org.apache.hadoop.mapreduce.Job;
48 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
49 import org.apache.hadoop.util.GenericOptionsParser;
50 import org.apache.hadoop.util.Tool;
51 import org.apache.hadoop.util.ToolRunner;
52
53 import com.google.common.base.Throwables;
54 import com.google.common.collect.Iterators;
55
56 public class SyncTable extends Configured implements Tool {
57
private static final Log LOG = LogFactory.getLog(SyncTable.class);

// Configuration keys used to pass the tool's settings from the driver to the mappers.
static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";

// Positional arguments, filled in by doCommandLine().
Path sourceHashDir;
String sourceTableName;
String targetTableName;

// Optional settings, filled in by doCommandLine(); the cluster keys stay null when not given.
String sourceZkCluster;
String targetZkCluster;
boolean dryRun;

/** Counters of the finished job; assigned by run() only after the job completes successfully. */
Counters counters;

/**
 * Creates the tool on top of the supplied configuration.
 *
 * @param conf configuration handed to {@link Configured}
 */
public SyncTable(Configuration conf) {
  super(conf);
}
80
81 public Job createSubmittableJob(String[] args) throws IOException {
82 FileSystem fs = sourceHashDir.getFileSystem(getConf());
83 if (!fs.exists(sourceHashDir)) {
84 throw new IOException("Source hash dir not found: " + sourceHashDir);
85 }
86
87 HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
88 LOG.info("Read source hash manifest: " + tableHash);
89 LOG.info("Read " + tableHash.partitions.size() + " partition keys");
90 if (!tableHash.tableName.equals(sourceTableName)) {
91 LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
92 + tableHash.tableName + " but job is reading from: " + sourceTableName);
93 }
94 if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
95 throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
96 + " should be 1 more than the number of partition keys. However, the manifest file "
97 + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
98 + " found in the partitions file is " + tableHash.partitions.size());
99 }
100
101 Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
102 int dataSubdirCount = 0;
103 for (FileStatus file : fs.listStatus(dataDir)) {
104 if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
105 dataSubdirCount++;
106 }
107 }
108
109 if (dataSubdirCount != tableHash.numHashFiles) {
110 throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
111 + " should be 1 more than the number of partition keys. However, the number of data dirs"
112 + " found is " + dataSubdirCount + " but the number of partition keys"
113 + " found in the partitions file is " + tableHash.partitions.size());
114 }
115
116 Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
117 "syncTable_" + sourceTableName + "-" + targetTableName));
118 Configuration jobConf = job.getConfiguration();
119 job.setJarByClass(HashTable.class);
120 jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
121 jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
122 jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
123 if (sourceZkCluster != null) {
124 jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
125 }
126 if (targetZkCluster != null) {
127 jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
128 }
129 jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
130
131 TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
132 SyncMapper.class, null, null, job);
133
134 job.setNumReduceTasks(0);
135
136 if (dryRun) {
137 job.setOutputFormatClass(NullOutputFormat.class);
138 } else {
139
140
141 TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
142 targetZkCluster, null, null);
143
144
145 }
146
147 return job;
148 }
149
150 public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
151 Path sourceHashDir;
152
153 Connection sourceConnection;
154 Connection targetConnection;
155 Table sourceTable;
156 Table targetTable;
157 boolean dryRun;
158
159 HashTable.TableHash sourceTableHash;
160 HashTable.TableHash.Reader sourceHashReader;
161 ImmutableBytesWritable currentSourceHash;
162 ImmutableBytesWritable nextSourceKey;
163 HashTable.ResultHasher targetHasher;
164
165 Throwable mapperException;
166
167 public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
168 SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
169 MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
170
171 @Override
172 protected void setup(Context context) throws IOException {
173
174 Configuration conf = context.getConfiguration();
175 sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
176 sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
177 targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
178 TableOutputFormat.OUTPUT_CONF_PREFIX);
179 sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
180 targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
181 dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
182
183 sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
184 LOG.info("Read source hash manifest: " + sourceTableHash);
185 LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
186
187 TableSplit split = (TableSplit) context.getInputSplit();
188 ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
189
190 sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
191 findNextKeyHashPair();
192
193
194
195
196 targetHasher = new HashTable.ResultHasher();
197 }
198
199 private static Connection openConnection(Configuration conf, String zkClusterConfKey,
200 String configPrefix)
201 throws IOException {
202 String zkCluster = conf.get(zkClusterConfKey);
203 Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
204 zkCluster, configPrefix);
205 return ConnectionFactory.createConnection(clusterConf);
206 }
207
208 private static Table openTable(Connection connection, Configuration conf,
209 String tableNameConfKey) throws IOException {
210 return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
211 }
212
213
214
215
216
217 private void findNextKeyHashPair() throws IOException {
218 boolean hasNext = sourceHashReader.next();
219 if (hasNext) {
220 nextSourceKey = sourceHashReader.getCurrentKey();
221 } else {
222
223 nextSourceKey = null;
224 }
225 }
226
227 @Override
228 protected void map(ImmutableBytesWritable key, Result value, Context context)
229 throws IOException, InterruptedException {
230 try {
231
232 while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
233 moveToNextBatch(context);
234 }
235
236
237 if (targetHasher.isBatchStarted()) {
238 targetHasher.hashResult(value);
239 }
240 } catch (Throwable t) {
241 mapperException = t;
242 Throwables.propagateIfInstanceOf(t, IOException.class);
243 Throwables.propagateIfInstanceOf(t, InterruptedException.class);
244 Throwables.propagate(t);
245 }
246 }
247
248
249
250
251
252 private void moveToNextBatch(Context context) throws IOException, InterruptedException {
253 if (targetHasher.isBatchStarted()) {
254 finishBatchAndCompareHashes(context);
255 }
256 targetHasher.startBatch(nextSourceKey);
257 currentSourceHash = sourceHashReader.getCurrentHash();
258
259 findNextKeyHashPair();
260 }
261
262
263
264
265
266
267 private void finishBatchAndCompareHashes(Context context)
268 throws IOException, InterruptedException {
269 targetHasher.finishBatch();
270 context.getCounter(Counter.BATCHES).increment(1);
271 if (targetHasher.getBatchSize() == 0) {
272 context.getCounter(Counter.EMPTY_BATCHES).increment(1);
273 }
274 ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
275 if (targetHash.equals(currentSourceHash)) {
276 context.getCounter(Counter.HASHES_MATCHED).increment(1);
277 } else {
278 context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
279
280 ImmutableBytesWritable stopRow = nextSourceKey == null
281 ? new ImmutableBytesWritable(sourceTableHash.stopRow)
282 : nextSourceKey;
283
284 if (LOG.isDebugEnabled()) {
285 LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey())
286 + " to " + toHex(stopRow)
287 + " sourceHash: " + toHex(currentSourceHash)
288 + " targetHash: " + toHex(targetHash));
289 }
290
291 syncRange(context, targetHasher.getBatchStartKey(), stopRow);
292 }
293 }
294 private static String toHex(ImmutableBytesWritable bytes) {
295 return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
296 }
297
298 private static final CellScanner EMPTY_CELL_SCANNER
299 = new CellScanner(Iterators.<Result>emptyIterator());
300
301
302
303
304
305
306 private void syncRange(Context context, ImmutableBytesWritable startRow,
307 ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
308
309 Scan scan = sourceTableHash.initScan();
310 scan.setStartRow(startRow.copyBytes());
311 scan.setStopRow(stopRow.copyBytes());
312
313 ResultScanner sourceScanner = sourceTable.getScanner(scan);
314 CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
315
316 ResultScanner targetScanner = targetTable.getScanner(scan);
317 CellScanner targetCells = new CellScanner(targetScanner.iterator());
318
319 boolean rangeMatched = true;
320 byte[] nextSourceRow = sourceCells.nextRow();
321 byte[] nextTargetRow = targetCells.nextRow();
322 while(nextSourceRow != null || nextTargetRow != null) {
323 boolean rowMatched;
324 int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
325 if (rowComparison < 0) {
326 if (LOG.isInfoEnabled()) {
327 LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
328 }
329 context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
330
331 rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
332 nextSourceRow = sourceCells.nextRow();
333 } else if (rowComparison > 0) {
334 if (LOG.isInfoEnabled()) {
335 LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
336 }
337 context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
338
339 rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
340 nextTargetRow = targetCells.nextRow();
341 } else {
342
343 rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
344 nextSourceRow = sourceCells.nextRow();
345 nextTargetRow = targetCells.nextRow();
346 }
347
348 if (!rowMatched) {
349 rangeMatched = false;
350 }
351 }
352
353 sourceScanner.close();
354 targetScanner.close();
355
356 context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
357 .increment(1);
358 }
359
360 private static class CellScanner {
361 private final Iterator<Result> results;
362
363 private byte[] currentRow;
364 private Result currentRowResult;
365 private int nextCellInRow;
366
367 private Result nextRowResult;
368
369 public CellScanner(Iterator<Result> results) {
370 this.results = results;
371 }
372
373
374
375
376
377 public byte[] nextRow() {
378 if (nextRowResult == null) {
379
380 while (results.hasNext()) {
381 nextRowResult = results.next();
382 Cell nextCell = nextRowResult.rawCells()[0];
383 if (currentRow == null
384 || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
385 nextCell.getRowOffset(), nextCell.getRowLength())) {
386
387 break;
388 } else {
389
390 nextRowResult = null;
391 }
392 }
393
394 if (nextRowResult == null) {
395
396 currentRowResult = null;
397 currentRow = null;
398 return null;
399 }
400 }
401
402
403 currentRowResult = nextRowResult;
404 nextCellInRow = 0;
405 currentRow = currentRowResult.getRow();
406 nextRowResult = null;
407 return currentRow;
408 }
409
410
411
412
413 public Cell nextCellInRow() {
414 if (currentRowResult == null) {
415
416 return null;
417 }
418
419 Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
420 nextCellInRow++;
421 if (nextCellInRow == currentRowResult.size()) {
422 if (results.hasNext()) {
423 Result result = results.next();
424 Cell cell = result.rawCells()[0];
425 if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
426 cell.getRowOffset(), cell.getRowLength())) {
427
428 currentRowResult = result;
429 nextCellInRow = 0;
430 } else {
431
432 nextRowResult = result;
433
434 currentRowResult = null;
435 }
436 } else {
437
438 currentRowResult = null;
439 }
440 }
441 return nextCell;
442 }
443 }
444
445
446
447
448
449
450
451 private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
452 CellScanner targetCells) throws IOException, InterruptedException {
453 Put put = null;
454 Delete delete = null;
455 long matchingCells = 0;
456 boolean matchingRow = true;
457 Cell sourceCell = sourceCells.nextCellInRow();
458 Cell targetCell = targetCells.nextCellInRow();
459 while (sourceCell != null || targetCell != null) {
460
461 int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
462 if (cellKeyComparison < 0) {
463 if (LOG.isDebugEnabled()) {
464 LOG.debug("Target missing cell: " + sourceCell);
465 }
466 context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
467 matchingRow = false;
468
469 if (!dryRun) {
470 if (put == null) {
471 put = new Put(rowKey);
472 }
473 put.add(sourceCell);
474 }
475
476 sourceCell = sourceCells.nextCellInRow();
477 } else if (cellKeyComparison > 0) {
478 if (LOG.isDebugEnabled()) {
479 LOG.debug("Source missing cell: " + targetCell);
480 }
481 context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
482 matchingRow = false;
483
484 if (!dryRun) {
485 if (delete == null) {
486 delete = new Delete(rowKey);
487 }
488
489 delete.addColumn(CellUtil.cloneFamily(targetCell),
490 CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
491 }
492
493 targetCell = targetCells.nextCellInRow();
494 } else {
495
496 if (CellUtil.matchingValue(sourceCell, targetCell)) {
497 matchingCells++;
498 } else {
499 if (LOG.isDebugEnabled()) {
500 LOG.debug("Different values: ");
501 LOG.debug(" source cell: " + sourceCell
502 + " value: " + Bytes.toHex(sourceCell.getValueArray(),
503 sourceCell.getValueOffset(), sourceCell.getValueLength()));
504 LOG.debug(" target cell: " + targetCell
505 + " value: " + Bytes.toHex(targetCell.getValueArray(),
506 targetCell.getValueOffset(), targetCell.getValueLength()));
507 }
508 context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
509 matchingRow = false;
510
511 if (!dryRun) {
512
513 if (put == null) {
514 put = new Put(rowKey);
515 }
516 put.add(sourceCell);
517 }
518 }
519 sourceCell = sourceCells.nextCellInRow();
520 targetCell = targetCells.nextCellInRow();
521 }
522
523 if (!dryRun && sourceTableHash.scanBatch > 0) {
524 if (put != null && put.size() >= sourceTableHash.scanBatch) {
525 context.write(new ImmutableBytesWritable(rowKey), put);
526 put = null;
527 }
528 if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
529 context.write(new ImmutableBytesWritable(rowKey), delete);
530 delete = null;
531 }
532 }
533 }
534
535 if (!dryRun) {
536 if (put != null) {
537 context.write(new ImmutableBytesWritable(rowKey), put);
538 }
539 if (delete != null) {
540 context.write(new ImmutableBytesWritable(rowKey), delete);
541 }
542 }
543
544 if (matchingCells > 0) {
545 context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
546 }
547 if (matchingRow) {
548 context.getCounter(Counter.MATCHINGROWS).increment(1);
549 return true;
550 } else {
551 context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
552 return false;
553 }
554 }
555
556 private static final CellComparator cellComparator = new CellComparator();
557
558
559
560
561 private static int compareRowKeys(byte[] r1, byte[] r2) {
562 if (r1 == null) {
563 return 1;
564 } else if (r2 == null) {
565 return -1;
566 } else {
567 return cellComparator.compareRows(r1, 0, r1.length, r2, 0, r2.length);
568 }
569 }
570
571
572
573
574
575
576 private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
577 if (c1 == null) {
578 return 1;
579 }
580 if (c2 == null) {
581 return -1;
582 }
583
584 int result = CellComparator.compareFamilies(c1, c2);
585 if (result != 0) {
586 return result;
587 }
588
589 result = CellComparator.compareQualifiers(c1, c2);
590 if (result != 0) {
591 return result;
592 }
593
594
595 return CellComparator.compareTimestamps(c1, c2);
596 }
597
598 @Override
599 protected void cleanup(Context context)
600 throws IOException, InterruptedException {
601 if (mapperException == null) {
602 try {
603 finishRemainingHashRanges(context);
604 } catch (Throwable t) {
605 mapperException = t;
606 }
607 }
608
609 try {
610 sourceTable.close();
611 targetTable.close();
612 sourceConnection.close();
613 targetConnection.close();
614 } catch (Throwable t) {
615 if (mapperException == null) {
616 mapperException = t;
617 } else {
618 LOG.error("Suppressing exception from closing tables", t);
619 }
620 }
621
622
623 if (mapperException != null) {
624 Throwables.propagateIfInstanceOf(mapperException, IOException.class);
625 Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
626 Throwables.propagate(mapperException);
627 }
628 }
629
630 private void finishRemainingHashRanges(Context context) throws IOException,
631 InterruptedException {
632 TableSplit split = (TableSplit) context.getInputSplit();
633 byte[] splitEndRow = split.getEndRow();
634 boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
635
636
637 while (nextSourceKey != null
638 && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
639 moveToNextBatch(context);
640 }
641
642 if (targetHasher.isBatchStarted()) {
643
644
645 if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
646 || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
647
648
649 Scan scan = sourceTableHash.initScan();
650 scan.setStartRow(splitEndRow);
651 if (nextSourceKey == null) {
652 scan.setStopRow(sourceTableHash.stopRow);
653 } else {
654 scan.setStopRow(nextSourceKey.copyBytes());
655 }
656
657 ResultScanner targetScanner = targetTable.getScanner(scan);
658 for (Result row : targetScanner) {
659 targetHasher.hashResult(row);
660 }
661 }
662
663 finishBatchAndCompareHashes(context);
664 }
665 }
666 }
667
668 private static final int NUM_ARGS = 3;
669 private static void printUsage(final String errorMsg) {
670 if (errorMsg != null && errorMsg.length() > 0) {
671 System.err.println("ERROR: " + errorMsg);
672 System.err.println();
673 }
674 System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
675 System.err.println();
676 System.err.println("Options:");
677
678 System.err.println(" sourcezkcluster ZK cluster key of the source table");
679 System.err.println(" (defaults to cluster in classpath's config)");
680 System.err.println(" targetzkcluster ZK cluster key of the target table");
681 System.err.println(" (defaults to cluster in classpath's config)");
682 System.err.println(" dryrun if true, output counters but no writes");
683 System.err.println(" (defaults to false)");
684 System.err.println();
685 System.err.println("Args:");
686 System.err.println(" sourcehashdir path to HashTable output dir for source table");
687 System.err.println(" if not specified, then all data will be scanned");
688 System.err.println(" sourcetable Name of the source table to sync from");
689 System.err.println(" targettable Name of the target table to sync to");
690 System.err.println();
691 System.err.println("Examples:");
692 System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
693 System.err.println(" to a local target cluster:");
694 System.err.println(" $ bin/hbase " +
695 "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
696 + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
697 + " hdfs://nn:9000/hashes/tableA tableA tableA");
698 }
699
700 private boolean doCommandLine(final String[] args) {
701 if (args.length < NUM_ARGS) {
702 printUsage(null);
703 return false;
704 }
705 try {
706 sourceHashDir = new Path(args[args.length - 3]);
707 sourceTableName = args[args.length - 2];
708 targetTableName = args[args.length - 1];
709
710 for (int i = 0; i < args.length - NUM_ARGS; i++) {
711 String cmd = args[i];
712 if (cmd.equals("-h") || cmd.startsWith("--h")) {
713 printUsage(null);
714 return false;
715 }
716
717 final String sourceZkClusterKey = "--sourcezkcluster=";
718 if (cmd.startsWith(sourceZkClusterKey)) {
719 sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
720 continue;
721 }
722
723 final String targetZkClusterKey = "--targetzkcluster=";
724 if (cmd.startsWith(targetZkClusterKey)) {
725 targetZkCluster = cmd.substring(targetZkClusterKey.length());
726 continue;
727 }
728
729 final String dryRunKey = "--dryrun=";
730 if (cmd.startsWith(dryRunKey)) {
731 dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
732 continue;
733 }
734
735 printUsage("Invalid argument '" + cmd + "'");
736 return false;
737 }
738
739
740 } catch (Exception e) {
741 e.printStackTrace();
742 printUsage("Can't start because " + e.getMessage());
743 return false;
744 }
745 return true;
746 }
747
748
749
750
751 public static void main(String[] args) throws Exception {
752 int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
753 System.exit(ret);
754 }
755
756 @Override
757 public int run(String[] args) throws Exception {
758 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
759 if (!doCommandLine(otherArgs)) {
760 return 1;
761 }
762
763 Job job = createSubmittableJob(otherArgs);
764 if (!job.waitForCompletion(true)) {
765 LOG.info("Map-reduce job failed!");
766 return 1;
767 }
768 counters = job.getCounters();
769 return 0;
770 }
771
772 }