1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import org.apache.hadoop.classification.InterfaceAudience;
22 import org.apache.hadoop.classification.InterfaceStability;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.conf.Configured;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.HConstants;
27 import org.apache.hadoop.hbase.util.Bytes;
28 import org.apache.hadoop.hbase.client.Scan;
29 import org.apache.hadoop.mapreduce.Job;
30 import org.apache.hadoop.util.GenericOptionsParser;
31 import org.apache.hadoop.util.Tool;
32 import org.apache.hadoop.util.ToolRunner;
33
34 import java.io.IOException;
35 import java.util.HashMap;
36 import java.util.Map;
37
38
39
40
41
42
43 @InterfaceAudience.Public
44 @InterfaceStability.Stable
45 public class CopyTable extends Configured implements Tool {
46
47 final static String NAME = "copytable";
48 static long startTime = 0;
49 static long endTime = 0;
50 static int versions = -1;
51 static String tableName = null;
52 static String startRow = null;
53 static String stopRow = null;
54 static String newTableName = null;
55 static String peerAddress = null;
56 static String families = null;
57 static boolean allCells = false;
58
59 public CopyTable(Configuration conf) {
60 super(conf);
61 }
62
63
64
65
66
67
68
69
70 public static Job createSubmittableJob(Configuration conf, String[] args)
71 throws IOException {
72 if (!doCommandLine(args)) {
73 return null;
74 }
75 Job job = new Job(conf, NAME + "_" + tableName);
76 job.setJarByClass(CopyTable.class);
77 Scan scan = new Scan();
78 scan.setCacheBlocks(false);
79 if (startTime != 0) {
80 scan.setTimeRange(startTime,
81 endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
82 }
83 if (allCells) {
84 scan.setRaw(true);
85 }
86 if (versions >= 0) {
87 scan.setMaxVersions(versions);
88 }
89
90 if (startRow != null) {
91 scan.setStartRow(Bytes.toBytes(startRow));
92 }
93
94 if (stopRow != null) {
95 scan.setStopRow(Bytes.toBytes(stopRow));
96 }
97
98 if(families != null) {
99 String[] fams = families.split(",");
100 Map<String,String> cfRenameMap = new HashMap<String,String>();
101 for(String fam : fams) {
102 String sourceCf;
103 if(fam.contains(":")) {
104
105 String[] srcAndDest = fam.split(":", 2);
106 sourceCf = srcAndDest[0];
107 String destCf = srcAndDest[1];
108 cfRenameMap.put(sourceCf, destCf);
109 } else {
110
111 sourceCf = fam;
112 }
113 scan.addFamily(Bytes.toBytes(sourceCf));
114 }
115 Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
116 }
117 TableMapReduceUtil.initTableMapperJob(tableName, scan,
118 Import.Importer.class, null, null, job);
119 TableMapReduceUtil.initTableReducerJob(
120 newTableName == null ? tableName : newTableName, null, job,
121 null, peerAddress, null, null);
122 job.setNumReduceTasks(0);
123 return job;
124 }
125
126
127
128
129 private static void printUsage(final String errorMsg) {
130 if (errorMsg != null && errorMsg.length() > 0) {
131 System.err.println("ERROR: " + errorMsg);
132 }
133 System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
134 "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
135 System.err.println();
136 System.err.println("Options:");
137 System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
138 System.err.println(" specify if different from current cluster");
139 System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster");
140 System.err.println(" startrow the start row");
141 System.err.println(" stoprow the stop row");
142 System.err.println(" starttime beginning of the time range (unixtime in millis)");
143 System.err.println(" without endtime means from starttime to forever");
144 System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
145 System.err.println(" versions number of cell versions to copy");
146 System.err.println(" new.name new table's name");
147 System.err.println(" peer.adr Address of the peer cluster given in the format");
148 System.err.println(" hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
149 System.err.println(" families comma-separated list of families to copy");
150 System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. ");
151 System.err.println(" To keep the same name, just give \"cfName\"");
152 System.err.println(" all.cells also copy delete markers and deleted cells");
153 System.err.println();
154 System.err.println("Args:");
155 System.err.println(" tablename Name of the table to copy");
156 System.err.println();
157 System.err.println("Examples:");
158 System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
159 System.err.println(" $ bin/hbase " +
160 "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
161 "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
162 System.err.println("For performance consider the following general options:\n"
163 + "-Dhbase.client.scanner.caching=100\n"
164 + "-Dmapred.map.tasks.speculative.execution=false");
165 }
166
167 private static boolean doCommandLine(final String[] args) {
168
169
170 if (args.length < 1) {
171 printUsage(null);
172 return false;
173 }
174 try {
175 for (int i = 0; i < args.length; i++) {
176 String cmd = args[i];
177 if (cmd.equals("-h") || cmd.startsWith("--h")) {
178 printUsage(null);
179 return false;
180 }
181
182 final String startRowArgKey = "--startrow=";
183 if (cmd.startsWith(startRowArgKey)) {
184 startRow = cmd.substring(startRowArgKey.length());
185 continue;
186 }
187
188 final String stopRowArgKey = "--stoprow=";
189 if (cmd.startsWith(stopRowArgKey)) {
190 stopRow = cmd.substring(stopRowArgKey.length());
191 continue;
192 }
193
194 final String startTimeArgKey = "--starttime=";
195 if (cmd.startsWith(startTimeArgKey)) {
196 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
197 continue;
198 }
199
200 final String endTimeArgKey = "--endtime=";
201 if (cmd.startsWith(endTimeArgKey)) {
202 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
203 continue;
204 }
205
206 final String versionsArgKey = "--versions=";
207 if (cmd.startsWith(versionsArgKey)) {
208 versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
209 continue;
210 }
211
212 final String newNameArgKey = "--new.name=";
213 if (cmd.startsWith(newNameArgKey)) {
214 newTableName = cmd.substring(newNameArgKey.length());
215 continue;
216 }
217
218 final String peerAdrArgKey = "--peer.adr=";
219 if (cmd.startsWith(peerAdrArgKey)) {
220 peerAddress = cmd.substring(peerAdrArgKey.length());
221 continue;
222 }
223
224 final String familiesArgKey = "--families=";
225 if (cmd.startsWith(familiesArgKey)) {
226 families = cmd.substring(familiesArgKey.length());
227 continue;
228 }
229
230 if (cmd.startsWith("--all.cells")) {
231 allCells = true;
232 continue;
233 }
234
235 if (i == args.length-1) {
236 tableName = cmd;
237 } else {
238 printUsage("Invalid argument '" + cmd + "'" );
239 return false;
240 }
241 }
242 if (newTableName == null && peerAddress == null) {
243 printUsage("At least a new table name or a " +
244 "peer address must be specified");
245 return false;
246 }
247 if (startTime > endTime) {
248 printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
249 return false;
250 }
251 } catch (Exception e) {
252 e.printStackTrace();
253 printUsage("Can't start because " + e.getMessage());
254 return false;
255 }
256 return true;
257 }
258
259
260
261
262
263
264
265 public static void main(String[] args) throws Exception {
266 int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args);
267 System.exit(ret);
268 }
269
270 @Override
271 public int run(String[] args) throws Exception {
272 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
273 Job job = createSubmittableJob(getConf(), otherArgs);
274 if (job == null) return 1;
275 return job.waitForCompletion(true) ? 0 : 1;
276 }
277 }