View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapred;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.hbase.HBaseConfiguration;
28  import org.apache.hadoop.hbase.client.HTable;
29  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.apache.hadoop.mapred.JobConf;
32  import org.apache.hadoop.mapred.Partitioner;
33  
34  
35  /**
36   * This is used to partition the output keys into groups of keys.
37   * Keys are grouped according to the regions that currently exist
38   * so that each reducer fills a single region so load is distributed.
39   *
40   * @param <K2>
41   * @param <V2>
42   */
43  @Deprecated
44  @InterfaceAudience.Public
45  @InterfaceStability.Stable
46  public class HRegionPartitioner<K2,V2>
47  implements Partitioner<ImmutableBytesWritable, V2> {
48    private final Log LOG = LogFactory.getLog(TableInputFormat.class);
49    private HTable table;
50    private byte[][] startKeys;
51  
52    public void configure(JobConf job) {
53      try {
54        this.table = new HTable(HBaseConfiguration.create(job),
55          job.get(TableOutputFormat.OUTPUT_TABLE));
56      } catch (IOException e) {
57        LOG.error(e);
58      }
59  
60      try {
61        this.startKeys = this.table.getStartKeys();
62      } catch (IOException e) {
63        LOG.error(e);
64      }
65    }
66  
67    public int getPartition(ImmutableBytesWritable key,
68        V2 value, int numPartitions) {
69      byte[] region = null;
70      // Only one region return 0
71      if (this.startKeys.length == 1){
72        return 0;
73      }
74      try {
75        // Not sure if this is cached after a split so we could have problems
76        // here if a region splits while mapping
77        region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
78      } catch (IOException e) {
79        LOG.error(e);
80      }
81      for (int i = 0; i < this.startKeys.length; i++){
82        if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
83          if (i >= numPartitions-1){
84            // cover if we have less reduces then regions.
85            return (Integer.toString(i).hashCode()
86                & Integer.MAX_VALUE) % numPartitions;
87          }
88          return i;
89        }
90      }
91      // if above fails to find start key that match we need to return something
92      return 0;
93    }
94  }