/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
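
// Usage sketch (illustrative commentary, not part of the original file): a
// job driver typically wires a concrete multi-table input format up through
// TableMapReduceUtil, along these lines (MyMapper is an assumed TableMapper
// subclass; the Scan attribute used is the same one getSplits() reads below):
//
//   List<Scan> scans = new ArrayList<Scan>();
//   Scan scan = new Scan();
//   scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table1"));
//   scans.add(scan);
//   TableMapReduceUtil.initMultiTableMapperJob(scans, MyMapper.class,
//       ImmutableBytesWritable.class, Result.class, job);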

/**
 * A base for multi-table input formats. Receives a list of {@link Scan}
 * instances that define the input tables, key ranges, and filters.
 * Subclasses may use other {@link TableRecordReader} implementations.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class MultiTableInputFormatBase extends
    InputFormat<ImmutableBytesWritable, Result> {

  final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);

  /** Holds the set of scans used to define the input. */
  private List<Scan> scans;

  /** The reader scanning the table; can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /**
   * Builds a {@link TableRecordReader}. If no instance was provided via
   * {@link #setTableRecordReader(TableRecordReader)}, the default is used.
   *
   * @param split The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @throws InterruptedException When the record reader initialization fails.
   * @see InputFormat#createRecordReader(InputSplit, TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    TableSplit tSplit = (TableSplit) split;
    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));

    if (tSplit.getTableName() == null) {
      throw new IOException("Cannot create a record reader because of a"
          + " previous error. Please look at the previous log lines from"
          + " the task's full log for more details.");
    }
    HTable table =
        new HTable(context.getConfiguration(), tSplit.getTableName());

    TableRecordReader trr = this.tableRecordReader;

    try {
      // If no table record reader was provided, use the default.
      if (trr == null) {
        trr = new TableRecordReader();
      }
      Scan sc = tSplit.getScan();
      sc.setStartRow(tSplit.getStartRow());
      sc.setStopRow(tSplit.getEndRow());
      trr.setScan(sc);
      trr.setHTable(table);
    } catch (IOException ioe) {
      // If there is an exception, make sure that all
      // resources are closed and released.
      table.close();
      trr.close();
      throw ioe;
    }
    return trr;
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. There
   * is one split per region of each scanned table, minus any regions skipped
   * by {@link #includeRegionInSplit(byte[], byte[])} or lying entirely
   * outside a scan's row range.
   *
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see InputFormat#getSplits(JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (scans.isEmpty()) {
      throw new IOException("No scans were provided.");
    }
    List<InputSplit> splits = new ArrayList<InputSplit>();

    for (Scan scan : scans) {
      byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
      if (tableName == null)
        throw new IOException("A scan object did not have a table name");

      HTable table = null;
      try {
        table = new HTable(context.getConfiguration(), tableName);
        Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
        if (keys == null || keys.getFirst() == null ||
            keys.getFirst().length == 0) {
          throw new IOException("Expecting at least one region for table : "
              + Bytes.toString(tableName));
        }
        int count = 0;

        byte[] startRow = scan.getStartRow();
        byte[] stopRow = scan.getStopRow();

        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table);

        for (int i = 0; i < keys.getFirst().length; i++) {
          if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
            continue;
          }
          HRegionLocation hregionLocation = table.getRegionLocation(keys.getFirst()[i], false);
          String regionHostname = hregionLocation.getHostname();
          HRegionInfo regionInfo = hregionLocation.getRegionInfo();

          // Determine if the scan's start and stop rows overlap this region.
          if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
              Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
              (stopRow.length == 0 ||
              Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
            byte[] splitStart =
                startRow.length == 0 ||
                Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
                    keys.getFirst()[i] : startRow;
            byte[] splitStop =
                (stopRow.length == 0 ||
                Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
                keys.getSecond()[i].length > 0 ?
                    keys.getSecond()[i] : stopRow;
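            // Worked example (added commentary, not in the original source):
            // for a region [bbb, ddd) and a scan range [ccc, eee), the checks
            // above select the region, and the intersection yields
            // splitStart = "ccc" (the scan starts inside the region) and
            // splitStop = "ddd" (the region ends before the scan stop), so
            // this split covers [ccc, ddd).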

            long regionSize = sizeCalculator.getRegionSize(regionInfo.getRegionName());
            TableSplit split =
                new TableSplit(table.getName(),
                    scan, splitStart, splitStop, regionHostname, regionSize);

            splits.add(split);
            if (LOG.isDebugEnabled())
              LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
          }
        }
      } finally {
        if (null != table) table.close();
      }
    }
    return splits;
  }

  /**
   * Test if the given region is to be included in the InputSplit while
   * splitting the regions of a table.
   * <p>
   * This optimization is effective when there is a specific reason to exclude
   * an entire region from the M-R job (and hence it does not contribute an
   * InputSplit), given the start and end keys of that region.
   * <p>
   * Useful when we need to remember the last-processed top record and revisit
   * the [last, current) interval for M-R processing, continuously. In
   * addition to reducing the number of InputSplits, it also reduces the load
   * on the region server, due to the ordering of the keys.
   * <p>
   * Note: it is possible that <code>endKey.length() == 0</code> for the last
   * (most recent) region.
   * <p>
   * Override this method if you want to bulk-exclude regions from M-R
   * processing. By default no region is excluded (i.e. all regions are
   * included).
   *
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true if this region needs to be included as part of the input
   *         (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey,
      final byte[] endKey) {
    return true;
  }
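
  // Example override (hypothetical subclass, added commentary): skip regions
  // that end at or before some remembered checkpoint row, so only the
  // [checkpoint, ...) tail of the table is processed:
  //
  //   @Override
  //   protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
  //     return endKey.length == 0 || Bytes.compareTo(endKey, checkpoint) > 0;
  //   }
  //
  // Here "checkpoint" is an assumed field holding the last processed row key;
  // a region [startKey, endKey) holds only rows below endKey, so it can be
  // dropped once endKey is at or before the checkpoint.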

  /**
   * Allows subclasses to get the list of {@link Scan} objects.
   */
  protected List<Scan> getScans() {
    return this.scans;
  }

  /**
   * Allows subclasses to set the list of {@link Scan} objects.
   *
   * @param scans The list of {@link Scan} used to define the input.
   */
  protected void setScans(List<Scan> scans) {
    this.scans = scans;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *          implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
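
  // Example (hypothetical, added commentary): a subclass could install a
  // custom reader from its constructor, before the framework ever calls
  // createRecordReader:
  //
  //   public class MyMultiTableInputFormat extends MultiTableInputFormatBase {
  //     public MyMultiTableInputFormat() {
  //       setTableRecordReader(new MyTableRecordReader());
  //     }
  //   }
  //
  // MyTableRecordReader is an assumed TableRecordReader subclass.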
}