1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configurable;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.client.HTable;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.apache.hadoop.util.StringUtils;
34
35
36
37
38 @InterfaceAudience.Public
39 @InterfaceStability.Stable
40 public class TableInputFormat extends TableInputFormatBase
41 implements Configurable {
42
43 private final Log LOG = LogFactory.getLog(TableInputFormat.class);
44
45
46 public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
47
48
49
50 public static final String SCAN = "hbase.mapreduce.scan";
51
52 public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
53
54 public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
55
56 public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
57
58 public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
59
60 public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
61
62 public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
63
64 public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
65
66 public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
67
68 public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
69
70 public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
71
72 public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
73
74
75 private Configuration conf = null;
76
77
78
79
80
81
82
83 @Override
84 public Configuration getConf() {
85 return conf;
86 }
87
88
89
90
91
92
93
94
95
96 @Override
97 public void setConf(Configuration configuration) {
98 this.conf = configuration;
99 String tableName = conf.get(INPUT_TABLE);
100 try {
101 setHTable(new HTable(new Configuration(conf), tableName));
102 } catch (Exception e) {
103 LOG.error(StringUtils.stringifyException(e));
104 }
105
106 Scan scan = null;
107
108 if (conf.get(SCAN) != null) {
109 try {
110 scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
111 } catch (IOException e) {
112 LOG.error("An error occurred.", e);
113 }
114 } else {
115 try {
116 scan = new Scan();
117
118 if (conf.get(SCAN_ROW_START) != null) {
119 scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
120 }
121
122 if (conf.get(SCAN_ROW_STOP) != null) {
123 scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
124 }
125
126 if (conf.get(SCAN_COLUMNS) != null) {
127 addColumns(scan, conf.get(SCAN_COLUMNS));
128 }
129
130 if (conf.get(SCAN_COLUMN_FAMILY) != null) {
131 scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
132 }
133
134 if (conf.get(SCAN_TIMESTAMP) != null) {
135 scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
136 }
137
138 if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
139 scan.setTimeRange(
140 Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
141 Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
142 }
143
144 if (conf.get(SCAN_MAXVERSIONS) != null) {
145 scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
146 }
147
148 if (conf.get(SCAN_CACHEDROWS) != null) {
149 scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
150 }
151
152 if (conf.get(SCAN_BATCHSIZE) != null) {
153 scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
154 }
155
156
157 scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
158 } catch (Exception e) {
159 LOG.error(StringUtils.stringifyException(e));
160 }
161 }
162
163 setScan(scan);
164 }
165
166
167
168
169
170
171
172
173
174
175
176 private static void addColumn(Scan scan, byte[] familyAndQualifier) {
177 byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
178 if (fq.length == 1) {
179 scan.addFamily(fq[0]);
180 } else if (fq.length == 2) {
181 scan.addColumn(fq[0], fq[1]);
182 } else {
183 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
184 }
185 }
186
187
188
189
190
191
192
193
194
195
196
197 public static void addColumns(Scan scan, byte [][] columns) {
198 for (byte[] column : columns) {
199 addColumn(scan, column);
200 }
201 }
202
203
204
205
206
207
208
209 private static void addColumns(Scan scan, String columns) {
210 String[] cols = columns.split(" ");
211 for (String col : cols) {
212 addColumn(scan, Bytes.toBytes(col));
213 }
214 }
215
216 }