1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.util.Iterator;
22 import java.util.Set;
23 import java.util.TreeSet;
24
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
30 import org.apache.hadoop.hbase.util.Base64;
31 import org.apache.hadoop.io.Text;
32 import org.apache.hadoop.mapreduce.Counter;
33 import org.apache.hadoop.mapreduce.Reducer;
34 import org.apache.hadoop.util.StringUtils;
35
36
37
38
39
40
41
42
43 @InterfaceAudience.Public
44 @InterfaceStability.Evolving
45 public class TextSortReducer extends
46 Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
47
48
49 private long ts;
50
51
52 private String separator;
53
54
55 private boolean skipBadLines;
56
57 private Counter badLineCount;
58
59 private ImportTsv.TsvParser parser;
60
61
62 private String cellVisibilityExpr;
63
64 private LabelExpander labelExpander;
65
66 public long getTs() {
67 return ts;
68 }
69
70 public boolean getSkipBadLines() {
71 return skipBadLines;
72 }
73
74 public Counter getBadLineCount() {
75 return badLineCount;
76 }
77
78 public void incrementBadLineCount(int count) {
79 this.badLineCount.increment(count);
80 }
81
82
83
84
85
86
87
88
89
90 @Override
91 protected void setup(Context context) {
92 doSetup(context);
93
94 Configuration conf = context.getConfiguration();
95
96 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
97 if (parser.getRowKeyColumnIndex() == -1) {
98 throw new RuntimeException("No row key column specified");
99 }
100 labelExpander = new LabelExpander(conf);
101 }
102
103
104
105
106
107 protected void doSetup(Context context) {
108 Configuration conf = context.getConfiguration();
109
110
111
112 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
113 if (separator == null) {
114 separator = ImportTsv.DEFAULT_SEPARATOR;
115 } else {
116 separator = new String(Base64.decode(separator));
117 }
118
119
120 ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
121
122 skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
123 badLineCount = context.getCounter("ImportTsv", "Bad Lines");
124 }
125
126 @Override
127 protected void reduce(
128 ImmutableBytesWritable rowKey,
129 java.lang.Iterable<Text> lines,
130 Reducer<ImmutableBytesWritable, Text,
131 ImmutableBytesWritable, KeyValue>.Context context)
132 throws java.io.IOException, InterruptedException
133 {
134
135 long threshold = context.getConfiguration().getLong(
136 "reducer.row.threshold", 1L * (1<<30));
137 Iterator<Text> iter = lines.iterator();
138 while (iter.hasNext()) {
139 Set<KeyValue> kvs = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
140 long curSize = 0;
141
142 while (iter.hasNext() && curSize < threshold) {
143 Text line = iter.next();
144 byte[] lineBytes = line.getBytes();
145 try {
146 ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
147
148 ts = parsed.getTimestamp(ts);
149 cellVisibilityExpr = parsed.getCellVisibility();
150
151 for (int i = 0; i < parsed.getColumnCount(); i++) {
152 if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
153 || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
154 continue;
155 }
156 KeyValue kv = null;
157 if (cellVisibilityExpr == null) {
158 kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
159 parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
160 parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
161 parsed.getColumnOffset(i), parsed.getColumnLength(i));
162 } else {
163
164 kv = labelExpander.createKVFromCellVisibilityExpr(
165 parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0,
166 parser.getFamily(i).length, parser.getQualifier(i), 0,
167 parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
168 parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
169 }
170 kvs.add(kv);
171 curSize += kv.heapSize();
172 }
173 } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
174 if (skipBadLines) {
175 System.err.println("Bad line." + badLine.getMessage());
176 incrementBadLineCount(1);
177 return;
178 }
179 throw new IOException(badLine);
180 } catch (IllegalArgumentException e) {
181 if (skipBadLines) {
182 System.err.println("Bad line." + e.getMessage());
183 incrementBadLineCount(1);
184 return;
185 }
186 throw new IOException(e);
187 }
188 }
189 context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass()
190 + "(" + StringUtils.humanReadableInt(curSize) + ")");
191 int index = 0;
192 for (KeyValue kv : kvs) {
193 context.write(rowKey, kv);
194 if (++index > 0 && index % 100 == 0)
195 context.setStatus("Wrote " + index + " key values.");
196 }
197
198
199 if (iter.hasNext()) {
200
201 context.write(null, null);
202 }
203 }
204 }
205 }