View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.util.Iterator;
22  import java.util.Set;
23  import java.util.TreeSet;
24  
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
30  import org.apache.hadoop.hbase.util.Base64;
31  import org.apache.hadoop.io.Text;
32  import org.apache.hadoop.mapreduce.Counter;
33  import org.apache.hadoop.mapreduce.Reducer;
34  import org.apache.hadoop.util.StringUtils;
35  
36  /**
37   * Emits Sorted KeyValues. Reads the text passed, parses it and creates the Key Values then Sorts
38   * them and emits Keyalues in sorted order. 
39   * @see HFileOutputFormat
40   * @see KeyValueSortReducer
41   * @see PutSortReducer
42   */
43  @InterfaceAudience.Public
44  @InterfaceStability.Evolving
45  public class TextSortReducer extends
46      Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
47    
48    /** Timestamp for all inserted rows */
49    private long ts;
50  
51    /** Column seperator */
52    private String separator;
53  
54    /** Should skip bad lines */
55    private boolean skipBadLines;
56    
57    private Counter badLineCount;
58  
59    private ImportTsv.TsvParser parser;
60  
61    /** Cell visibility expr **/
62    private String cellVisibilityExpr;
63  
64    private LabelExpander labelExpander;
65  
66    public long getTs() {
67      return ts;
68    }
69  
70    public boolean getSkipBadLines() {
71      return skipBadLines;
72    }
73  
74    public Counter getBadLineCount() {
75      return badLineCount;
76    }
77  
78    public void incrementBadLineCount(int count) {
79      this.badLineCount.increment(count);
80    }
81  
82    /**
83     * Handles initializing this class with objects specific to it (i.e., the parser).
84     * Common initialization that might be leveraged by a subsclass is done in
85     * <code>doSetup</code>. Hence a subclass may choose to override this method
86     * and call <code>doSetup</code> as well before handling it's own custom params.
87     *
88     * @param context
89     */
90    @Override
91    protected void setup(Context context) {
92      doSetup(context);
93  
94      Configuration conf = context.getConfiguration();
95  
96      parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
97      if (parser.getRowKeyColumnIndex() == -1) {
98        throw new RuntimeException("No row key column specified");
99      }
100     labelExpander = new LabelExpander(conf);
101   }
102 
103   /**
104    * Handles common parameter initialization that a subclass might want to leverage.
105    * @param context
106    */
107   protected void doSetup(Context context) {
108     Configuration conf = context.getConfiguration();
109 
110     // If a custom separator has been used,
111     // decode it back from Base64 encoding.
112     separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
113     if (separator == null) {
114       separator = ImportTsv.DEFAULT_SEPARATOR;
115     } else {
116       separator = new String(Base64.decode(separator));
117     }
118 
119     // Should never get 0 as we are setting this to a valid value in job configuration.
120     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
121 
122     skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
123     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
124   }
125   
126   @Override
127   protected void reduce(
128       ImmutableBytesWritable rowKey,
129       java.lang.Iterable<Text> lines,
130       Reducer<ImmutableBytesWritable, Text,
131               ImmutableBytesWritable, KeyValue>.Context context)
132       throws java.io.IOException, InterruptedException
133   {
134     // although reduce() is called per-row, handle pathological case
135     long threshold = context.getConfiguration().getLong(
136         "reducer.row.threshold", 1L * (1<<30));
137     Iterator<Text> iter = lines.iterator();
138     while (iter.hasNext()) {
139       Set<KeyValue> kvs = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
140       long curSize = 0;
141       // stop at the end or the RAM threshold
142       while (iter.hasNext() && curSize < threshold) {
143         Text line = iter.next();
144         byte[] lineBytes = line.getBytes();
145         try {
146           ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
147           // Retrieve timestamp if exists
148           ts = parsed.getTimestamp(ts);
149           cellVisibilityExpr = parsed.getCellVisibility();
150 
151           for (int i = 0; i < parsed.getColumnCount(); i++) {
152             if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
153                 || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
154               continue;
155             }
156             KeyValue kv = null;
157             if (cellVisibilityExpr == null) {
158               kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
159                   parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
160                   parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
161                   parsed.getColumnOffset(i), parsed.getColumnLength(i));
162             } else {
163               // Should ensure that VisibilityController is present
164               kv = labelExpander.createKVFromCellVisibilityExpr(
165                   parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0,
166                   parser.getFamily(i).length, parser.getQualifier(i), 0,
167                   parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
168                   parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
169             }
170             kvs.add(kv);
171             curSize += kv.heapSize();
172           }
173         } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
174           if (skipBadLines) {
175             System.err.println("Bad line." + badLine.getMessage());
176             incrementBadLineCount(1);
177             return;
178           }
179           throw new IOException(badLine);
180         } catch (IllegalArgumentException e) {
181           if (skipBadLines) {
182             System.err.println("Bad line." + e.getMessage());
183             incrementBadLineCount(1);
184             return;
185           } 
186           throw new IOException(e);
187         } 
188       }
189       context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass()
190           + "(" + StringUtils.humanReadableInt(curSize) + ")");
191       int index = 0;
192       for (KeyValue kv : kvs) {
193         context.write(rowKey, kv);
194         if (++index > 0 && index % 100 == 0)
195           context.setStatus("Wrote " + index + " key values.");
196       }
197 
198       // if we have more entries to process
199       if (iter.hasNext()) {
200         // force flush because we cannot guarantee intra-row sorted order
201         context.write(null, null);
202       }
203     }
204   }
205 }