1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.classification.InterfaceStability;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.client.HTable;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.filter.Filter;
31 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32 import org.apache.hadoop.hbase.regionserver.HRegion;
33 import org.apache.hadoop.mapred.InputFormat;
34 import org.apache.hadoop.mapred.InputSplit;
35 import org.apache.hadoop.mapred.JobConf;
36 import org.apache.hadoop.mapred.RecordReader;
37 import org.apache.hadoop.mapred.Reporter;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 @Deprecated
69 @InterfaceAudience.Public
70 @InterfaceStability.Stable
71 public abstract class TableInputFormatBase
72 implements InputFormat<ImmutableBytesWritable, Result> {
73 final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
74 private byte [][] inputColumns;
75 private HTable table;
76 private TableRecordReader tableRecordReader;
77 private Filter rowFilter;
78
79
80
81
82
83
84
85
86 public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
87 InputSplit split, JobConf job, Reporter reporter)
88 throws IOException {
89 TableSplit tSplit = (TableSplit) split;
90 TableRecordReader trr = this.tableRecordReader;
91
92 if (trr == null) {
93 trr = new TableRecordReader();
94 }
95 trr.setStartRow(tSplit.getStartRow());
96 trr.setEndRow(tSplit.getEndRow());
97 trr.setHTable(this.table);
98 trr.setInputColumns(this.inputColumns);
99 trr.setRowFilter(this.rowFilter);
100 trr.init();
101 return trr;
102 }
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
122 if (this.table == null) {
123 throw new IOException("No table was provided");
124 }
125 byte [][] startKeys = this.table.getStartKeys();
126 if (startKeys == null || startKeys.length == 0) {
127 throw new IOException("Expecting at least one region");
128 }
129 if (this.inputColumns == null || this.inputColumns.length == 0) {
130 throw new IOException("Expecting at least one column");
131 }
132 int realNumSplits = numSplits > startKeys.length? startKeys.length:
133 numSplits;
134 InputSplit[] splits = new InputSplit[realNumSplits];
135 int middle = startKeys.length / realNumSplits;
136 int startPos = 0;
137 for (int i = 0; i < realNumSplits; i++) {
138 int lastPos = startPos + middle;
139 lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
140 String regionLocation = table.getRegionLocation(startKeys[startPos]).
141 getHostname();
142 splits[i] = new TableSplit(this.table.getName(),
143 startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
144 HConstants.EMPTY_START_ROW, regionLocation);
145 LOG.info("split: " + i + "->" + splits[i]);
146 startPos = lastPos;
147 }
148 return splits;
149 }
150
151
152
153
154 protected void setInputColumns(byte [][] inputColumns) {
155 this.inputColumns = inputColumns;
156 }
157
158
159
160
161 protected HTable getHTable() {
162 return this.table;
163 }
164
165
166
167
168
169
170 protected void setHTable(HTable table) {
171 this.table = table;
172 }
173
174
175
176
177
178
179
180 protected void setTableRecordReader(TableRecordReader tableRecordReader) {
181 this.tableRecordReader = tableRecordReader;
182 }
183
184
185
186
187
188
189 protected void setRowFilter(Filter rowFilter) {
190 this.rowFilter = rowFilter;
191 }
192 }