1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
import java.io.IOException;
import java.util.Arrays;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.GenericOptionsParser;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class IndexBuilder {
66
67 public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");
68
69 public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");
70
71
72
73
74 public static class Map extends
75 Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put> {
76 private byte[] family;
77 private TreeMap<byte[], ImmutableBytesWritable> indexes;
78
79 @Override
80 protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
81 throws IOException, InterruptedException {
82 for(java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
83 byte[] qualifier = index.getKey();
84 ImmutableBytesWritable tableName = index.getValue();
85 byte[] value = result.getValue(family, qualifier);
86 if (value != null) {
87
88
89 Put put = new Put(value);
90 put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get());
91 context.write(tableName, put);
92 }
93 }
94 }
95
96 @Override
97 protected void setup(Context context) throws IOException,
98 InterruptedException {
99 Configuration configuration = context.getConfiguration();
100 String tableName = configuration.get("index.tablename");
101 String[] fields = configuration.getStrings("index.fields");
102 String familyName = configuration.get("index.familyname");
103 family = Bytes.toBytes(familyName);
104 indexes = new TreeMap<byte[], ImmutableBytesWritable>(Bytes.BYTES_COMPARATOR);
105 for(String field : fields) {
106
107
108 indexes.put(Bytes.toBytes(field),
109 new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field)));
110 }
111 }
112 }
113
114
115
116
117 public static Job configureJob(Configuration conf, String [] args)
118 throws IOException {
119 String tableName = args[0];
120 String columnFamily = args[1];
121 System.out.println("****" + tableName);
122 conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
123 conf.set(TableInputFormat.INPUT_TABLE, tableName);
124 conf.set("index.tablename", tableName);
125 conf.set("index.familyname", columnFamily);
126 String[] fields = new String[args.length - 2];
127 for(int i = 0; i < fields.length; i++) {
128 fields[i] = args[i + 2];
129 }
130 conf.setStrings("index.fields", fields);
131 Job job = new Job(conf, tableName);
132 job.setJarByClass(IndexBuilder.class);
133 job.setMapperClass(Map.class);
134 job.setNumReduceTasks(0);
135 job.setInputFormatClass(TableInputFormat.class);
136 job.setOutputFormatClass(MultiTableOutputFormat.class);
137 return job;
138 }
139
140 public static void main(String[] args) throws Exception {
141 Configuration conf = HBaseConfiguration.create();
142 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
143 if(otherArgs.length < 3) {
144 System.err.println("Only " + otherArgs.length + " arguments supplied, required: 3");
145 System.err.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
146 System.exit(-1);
147 }
148 Job job = configureJob(conf, otherArgs);
149 System.exit(job.waitForCompletion(true) ? 0 : 1);
150 }
151 }