1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21
22 import org.apache.hadoop.classification.InterfaceAudience;
23 import org.apache.hadoop.classification.InterfaceStability;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.client.Put;
27 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
28 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
29 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
30 import org.apache.hadoop.hbase.util.Base64;
31 import org.apache.hadoop.io.LongWritable;
32 import org.apache.hadoop.io.Text;
33 import org.apache.hadoop.mapreduce.Counter;
34 import org.apache.hadoop.mapreduce.Mapper;
35
36
37
38
39 @InterfaceAudience.Public
40 @InterfaceStability.Stable
41 public class TsvImporterMapper
42 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
43 {
44
45
46 protected long ts;
47
48
49 private String separator;
50
51
52 private boolean skipBadLines;
53 private Counter badLineCount;
54
55 protected ImportTsv.TsvParser parser;
56
57 protected Configuration conf;
58
59 protected String cellVisibilityExpr;
60
61 private String hfileOutPath;
62
63 private LabelExpander labelExpander;
64
65 public long getTs() {
66 return ts;
67 }
68
69 public boolean getSkipBadLines() {
70 return skipBadLines;
71 }
72
73 public Counter getBadLineCount() {
74 return badLineCount;
75 }
76
77 public void incrementBadLineCount(int count) {
78 this.badLineCount.increment(count);
79 }
80
81
82
83
84
85
86
87
88
89 @Override
90 protected void setup(Context context) {
91 doSetup(context);
92
93 conf = context.getConfiguration();
94 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
95 separator);
96 if (parser.getRowKeyColumnIndex() == -1) {
97 throw new RuntimeException("No row key column specified");
98 }
99 labelExpander = new LabelExpander(conf);
100 }
101
102
103
104
105
106 protected void doSetup(Context context) {
107 Configuration conf = context.getConfiguration();
108
109
110
111 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
112 if (separator == null) {
113 separator = ImportTsv.DEFAULT_SEPARATOR;
114 } else {
115 separator = new String(Base64.decode(separator));
116 }
117
118
119 ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
120
121 skipBadLines = context.getConfiguration().getBoolean(
122 ImportTsv.SKIP_LINES_CONF_KEY, true);
123 badLineCount = context.getCounter("ImportTsv", "Bad Lines");
124 hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
125 }
126
127
128
129
130 @Override
131 public void map(LongWritable offset, Text value,
132 Context context)
133 throws IOException {
134 byte[] lineBytes = value.getBytes();
135
136 try {
137 ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
138 lineBytes, value.getLength());
139 ImmutableBytesWritable rowKey =
140 new ImmutableBytesWritable(lineBytes,
141 parsed.getRowKeyOffset(),
142 parsed.getRowKeyLength());
143
144 ts = parsed.getTimestamp(ts);
145 cellVisibilityExpr = parsed.getCellVisibility();
146
147 Put put = new Put(rowKey.copyBytes());
148 for (int i = 0; i < parsed.getColumnCount(); i++) {
149 if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
150 || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
151 continue;
152 }
153 KeyValue kv = createPuts(lineBytes, parsed, put, i);
154 }
155 context.write(rowKey, put);
156 } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
157 if (skipBadLines) {
158 System.err.println(
159 "Bad line at offset: " + offset.get() + ":\n" +
160 badLine.getMessage());
161 incrementBadLineCount(1);
162 return;
163 } else {
164 throw new IOException(badLine);
165 }
166 } catch (IllegalArgumentException e) {
167 if (skipBadLines) {
168 System.err.println(
169 "Bad line at offset: " + offset.get() + ":\n" +
170 e.getMessage());
171 incrementBadLineCount(1);
172 return;
173 } else {
174 throw new IOException(e);
175 }
176 } catch (InterruptedException e) {
177 e.printStackTrace();
178 }
179 }
180
181 protected KeyValue createPuts(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
182 int i) throws BadTsvLineException, IOException {
183 KeyValue kv = null;
184 if (hfileOutPath == null) {
185 kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
186 parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
187 parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
188 parsed.getColumnOffset(i), parsed.getColumnLength(i));
189 if (cellVisibilityExpr != null) {
190
191
192 put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
193 }
194 } else {
195 kv = labelExpander.createKVFromCellVisibilityExpr(
196 parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0,
197 parser.getFamily(i).length, parser.getQualifier(i), 0,
198 parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
199 parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
200 }
201 put.add(kv);
202 return kv;
203 }
204 }