1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.classification.InterfaceStability;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.hbase.HBaseConfiguration;
29 import org.apache.hadoop.hbase.client.HTable;
30 import org.apache.hadoop.hbase.client.Put;
31 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32 import org.apache.hadoop.fs.FileAlreadyExistsException;
33 import org.apache.hadoop.mapred.InvalidJobConfException;
34 import org.apache.hadoop.mapred.JobConf;
35 import org.apache.hadoop.mapred.FileOutputFormat;
36 import org.apache.hadoop.mapred.RecordWriter;
37 import org.apache.hadoop.mapred.Reporter;
38 import org.apache.hadoop.util.Progressable;
39
40
41
42
43 @Deprecated
44 @InterfaceAudience.Public
45 @InterfaceStability.Stable
46 public class TableOutputFormat extends
47 FileOutputFormat<ImmutableBytesWritable, Put> {
48
49
50 public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
51 private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
52
53
54
55
56
57 protected static class TableRecordWriter
58 implements RecordWriter<ImmutableBytesWritable, Put> {
59 private HTable m_table;
60
61
62
63
64
65
66 public TableRecordWriter(HTable table) {
67 m_table = table;
68 }
69
70 public void close(Reporter reporter)
71 throws IOException {
72 m_table.close();
73 }
74
75 public void write(ImmutableBytesWritable key,
76 Put value) throws IOException {
77 m_table.put(new Put(value));
78 }
79 }
80
81 @Override
82 @SuppressWarnings("unchecked")
83 public RecordWriter getRecordWriter(FileSystem ignored,
84 JobConf job, String name, Progressable progress) throws IOException {
85
86
87
88 String tableName = job.get(OUTPUT_TABLE);
89 HTable table = null;
90 try {
91 table = new HTable(HBaseConfiguration.create(job), tableName);
92 } catch(IOException e) {
93 LOG.error(e);
94 throw e;
95 }
96 table.setAutoFlush(false, true);
97 return new TableRecordWriter(table);
98 }
99
100 @Override
101 public void checkOutputSpecs(FileSystem ignored, JobConf job)
102 throws FileAlreadyExistsException, InvalidJobConfException, IOException {
103
104 String tableName = job.get(OUTPUT_TABLE);
105 if(tableName == null) {
106 throw new IOException("Must specify table name");
107 }
108 }
109 }