1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
import java.io.IOException;
import java.io.InterruptedIOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.google.common.base.Preconditions;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 @InterfaceAudience.Public
70 @InterfaceStability.Stable
71 public class CellCounter {
72 private static final Log LOG =
73 LogFactory.getLog(CellCounter.class.getName());
74
75
76
77
78
79 static final String NAME = "CellCounter";
80
81
82
83
84 static class CellCounterMapper
85 extends TableMapper<Text, IntWritable> {
86
87
88
89 public static enum Counters {
90 ROWS
91 }
92
93
94
95
96
97
98
99
100
101
102
103
104 @Override
105 public void map(ImmutableBytesWritable row, Result values,
106 Context context)
107 throws IOException {
108 Preconditions.checkState(values != null,
109 "values passed to the map is null");
110 String currentFamilyName = null;
111 String currentQualifierName = null;
112 String currentRowKey = null;
113 Configuration config = context.getConfiguration();
114 String separator = config.get("ReportSeparator",":");
115 try {
116 context.getCounter(Counters.ROWS).increment(1);
117 context.write(new Text("Total ROWS"), new IntWritable(1));
118
119 for (Cell value : values.listCells()) {
120 currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
121 String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
122 if (!thisRowFamilyName.equals(currentFamilyName)) {
123 currentFamilyName = thisRowFamilyName;
124 context.getCounter("CF", thisRowFamilyName).increment(1);
125 context.write(new Text("Total Families Across all Rows"),
126 new IntWritable(1));
127 context.write(new Text(thisRowFamilyName), new IntWritable(1));
128 }
129 String thisRowQualifierName = thisRowFamilyName + separator
130 + Bytes.toStringBinary(CellUtil.cloneQualifier(value));
131 if (!thisRowQualifierName.equals(currentQualifierName)) {
132 currentQualifierName = thisRowQualifierName;
133 context.getCounter("CFQL", thisRowQualifierName).increment(1);
134 context.write(new Text("Total Qualifiers across all Rows"),
135 new IntWritable(1));
136 context.write(new Text(thisRowQualifierName), new IntWritable(1));
137
138 context.getCounter("QL_VERSIONS", currentRowKey + separator +
139 thisRowQualifierName).increment(1);
140 context.write(new Text(currentRowKey + separator
141 + thisRowQualifierName + "_Versions"), new IntWritable(1));
142
143 } else {
144
145 currentQualifierName = thisRowQualifierName;
146 context.getCounter("QL_VERSIONS", currentRowKey + separator +
147 thisRowQualifierName).increment(1);
148 context.write(new Text(currentRowKey + separator
149 + thisRowQualifierName + "_Versions"), new IntWritable(1));
150 }
151 }
152 } catch (InterruptedException e) {
153 e.printStackTrace();
154 }
155 }
156 }
157
158 static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
159 Key, IntWritable> {
160
161 private IntWritable result = new IntWritable();
162 public void reduce(Key key, Iterable<IntWritable> values,
163 Context context)
164 throws IOException, InterruptedException {
165 int sum = 0;
166 for (IntWritable val : values) {
167 sum += val.get();
168 }
169 result.set(sum);
170 context.write(key, result);
171 }
172 }
173
174
175
176
177
178
179
180
181
182 public static Job createSubmittableJob(Configuration conf, String[] args)
183 throws IOException {
184 String tableName = args[0];
185 Path outputDir = new Path(args[1]);
186 String reportSeparatorString = (args.length > 2) ? args[2]: ":";
187 conf.set("ReportSeparator", reportSeparatorString);
188 Job job = new Job(conf, NAME + "_" + tableName);
189 job.setJarByClass(CellCounter.class);
190 Scan scan = getConfiguredScanForJob(conf, args);
191 TableMapReduceUtil.initTableMapperJob(tableName, scan,
192 CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
193 job.setNumReduceTasks(1);
194 job.setMapOutputKeyClass(Text.class);
195 job.setMapOutputValueClass(IntWritable.class);
196 job.setOutputFormatClass(TextOutputFormat.class);
197 job.setOutputKeyClass(Text.class);
198 job.setOutputValueClass(IntWritable.class);
199 FileOutputFormat.setOutputPath(job, outputDir);
200 job.setReducerClass(IntSumReducer.class);
201 return job;
202 }
203
204 private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
205 Scan s = new Scan();
206
207 s.setMaxVersions(Integer.MAX_VALUE);
208 s.setCacheBlocks(false);
209
210 if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
211 s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
212 }
213
214 Filter rowFilter = getRowFilter(args);
215 if (rowFilter!= null) {
216 LOG.info("Setting Row Filter for counter.");
217 s.setFilter(rowFilter);
218 }
219 return s;
220 }
221
222
223 private static Filter getRowFilter(String[] args) {
224 Filter rowFilter = null;
225 String filterCriteria = (args.length > 3) ? args[3]: null;
226 if (filterCriteria == null) return null;
227 if (filterCriteria.startsWith("^")) {
228 String regexPattern = filterCriteria.substring(1, filterCriteria.length());
229 rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
230 } else {
231 rowFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
232 }
233 return rowFilter;
234 }
235
236
237
238
239
240
241
242 public static void main(String[] args) throws Exception {
243 Configuration conf = HBaseConfiguration.create();
244 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
245 if (otherArgs.length < 1) {
246 System.err.println("ERROR: Wrong number of parameters: " + args.length);
247 System.err.println("Usage: CellCounter <tablename> <outputDir> <reportSeparator> " +
248 "[^[regex pattern] or [Prefix] for row filter]] ");
249 System.err.println(" Note: -D properties will be applied to the conf used. ");
250 System.err.println(" Additionally, the following SCAN properties can be specified");
251 System.err.println(" to get fine grained control on what is counted..");
252 System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
253 System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
254 "string : used to separate the rowId/column family name and qualifier name.");
255 System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
256 "operation to a limited subset of rows from the table based on regex or prefix pattern.");
257 System.exit(-1);
258 }
259 Job job = createSubmittableJob(conf, otherArgs);
260 System.exit(job.waitForCompletion(true) ? 0 : 1);
261 }
262 }