1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.util.Set;
23 import java.util.TreeSet;
24
25 import org.apache.commons.lang.StringUtils;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.client.Result;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
33 import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
34 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.mapreduce.Job;
37 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
38 import org.apache.hadoop.util.GenericOptionsParser;
39
40
41
42
43
44 @InterfaceAudience.Public
45 @InterfaceStability.Stable
46 public class RowCounter {
47
48
49 static final String NAME = "rowcounter";
50
51
52
53
54 static class RowCounterMapper
55 extends TableMapper<ImmutableBytesWritable, Result> {
56
57
58 public static enum Counters {ROWS}
59
60
61
62
63
64
65
66
67
68
69
70 @Override
71 public void map(ImmutableBytesWritable row, Result values,
72 Context context)
73 throws IOException {
74
75 context.getCounter(Counters.ROWS).increment(1);
76 }
77 }
78
79
80
81
82
83
84
85
86
87 public static Job createSubmittableJob(Configuration conf, String[] args)
88 throws IOException {
89 String tableName = args[0];
90 String startKey = null;
91 String endKey = null;
92 StringBuilder sb = new StringBuilder();
93
94 final String rangeSwitch = "--range=";
95
96
97 for (int i = 1; i < args.length; i++) {
98 if (args[i].startsWith(rangeSwitch)) {
99 String[] startEnd = args[i].substring(rangeSwitch.length()).split(",", 2);
100 if (startEnd.length != 2 || startEnd[1].contains(",")) {
101 printUsage("Please specify range in such format as \"--range=a,b\" " +
102 "or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
103 return null;
104 }
105 startKey = startEnd[0];
106 endKey = startEnd[1];
107 }
108 else {
109
110 sb.append(args[i]);
111 sb.append(" ");
112 }
113 }
114
115 Job job = new Job(conf, NAME + "_" + tableName);
116 job.setJarByClass(RowCounter.class);
117 Scan scan = new Scan();
118 scan.setCacheBlocks(false);
119 Set<byte []> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
120 if (startKey != null && !startKey.equals("")) {
121 scan.setStartRow(Bytes.toBytes(startKey));
122 }
123 if (endKey != null && !endKey.equals("")) {
124 scan.setStopRow(Bytes.toBytes(endKey));
125 }
126 scan.setFilter(new FirstKeyOnlyFilter());
127 if (sb.length() > 0) {
128 for (String columnName : sb.toString().trim().split(" ")) {
129 String family = StringUtils.substringBefore(columnName, ":");
130 String qualifier = StringUtils.substringAfter(columnName, ":");
131
132 if (StringUtils.isBlank(qualifier)) {
133 scan.addFamily(Bytes.toBytes(family));
134 }
135 else {
136 scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
137 }
138 }
139 }
140
141
142
143 if (qualifiers.size() == 0) {
144 scan.setFilter(new FirstKeyOnlyFilter());
145 } else {
146 scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
147 }
148 job.setOutputFormatClass(NullOutputFormat.class);
149 TableMapReduceUtil.initTableMapperJob(tableName, scan,
150 RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
151 job.setNumReduceTasks(0);
152 return job;
153 }
154
155
156
157
158 private static void printUsage(String errorMessage) {
159 System.err.println("ERROR: " + errorMessage);
160 printUsage();
161 }
162
163
164
165
166 private static void printUsage() {
167 System.err.println("Usage: RowCounter [options] <tablename> " +
168 "[--range=[startKey],[endKey]] [<column1> <column2>...]");
169 System.err.println("For performance consider the following options:\n"
170 + "-Dhbase.client.scanner.caching=100\n"
171 + "-Dmapred.map.tasks.speculative.execution=false");
172 }
173
174
175
176
177
178
179
180 public static void main(String[] args) throws Exception {
181 Configuration conf = HBaseConfiguration.create();
182 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
183 if (otherArgs.length < 1) {
184 printUsage("Wrong number of parameters: " + args.length);
185 System.exit(-1);
186 }
187 Job job = createSubmittableJob(conf, otherArgs);
188 if (job == null) {
189 System.exit(-1);
190 }
191 System.exit(job.waitForCompletion(true) ? 0 : 1);
192 }
193 }