/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * <p>
 * Hadoop output format that writes to one or more HBase tables. The key is
 * taken to be the table name while the output value <em>must</em> be either a
 * {@link Put} or a {@link Delete} instance. All tables must already exist, and
 * all Puts and Deletes must reference only valid column families.
 * </p>
 *
 * <p>
 * Write-ahead logging (WAL) for Puts can be disabled by setting
 * {@link #WAL_PROPERTY} to {@link #WAL_OFF}. The default value is {@link #WAL_ON}.
 * Disabling write-ahead logging is only appropriate for jobs where loss of data
 * due to region server failure can be tolerated, for example because the bulk
 * import can simply be rerun.
 * </p>
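 *
 * <p>
 * A minimal job-setup sketch (not part of the original class; the source table
 * name {@code "source"} and {@code MyMapper} are hypothetical placeholders, and
 * the mapper is expected to emit the destination table name as the key and a
 * {@link Put} or {@link Delete} as the value):
 * </p>
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // Optionally skip the WAL for throughput, if rerunning the job after a failure is acceptable.
 * conf.setBoolean(MultiTableOutputFormat.WAL_PROPERTY, MultiTableOutputFormat.WAL_OFF);
 * Job job = Job.getInstance(conf, "multi-table-export");
 * job.setJarByClass(MyMapper.class);
 * TableMapReduceUtil.initTableMapperJob("source", new Scan(), MyMapper.class,
 *     ImmutableBytesWritable.class, Mutation.class, job);
 * job.setOutputFormatClass(MultiTableOutputFormat.class);
 * job.setNumReduceTasks(0);
 * job.waitForCompletion(true);
 * }</pre>
 */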
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
  /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL). */
  public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
  /** Property value to use write-ahead logging. */
  public static final boolean WAL_ON = true;
  /** Property value to disable write-ahead logging. */
  public static final boolean WAL_OFF = false;

  /**
   * Record writer for outputting to multiple HTables.
   */
  protected static class MultiTableRecordWriter extends
      RecordWriter<ImmutableBytesWritable, Mutation> {
    private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
    Map<ImmutableBytesWritable, HTable> tables;
    Configuration conf;
    boolean useWriteAheadLogging;

    /**
     * @param conf
     *          HBaseConfiguration to use
     * @param useWriteAheadLogging
     *          whether to use write-ahead logging; this can be turned off
     *          (<tt>false</tt>) to improve performance when bulk loading data
     */
    public MultiTableRecordWriter(Configuration conf,
        boolean useWriteAheadLogging) {
      LOG.debug("Created new MultiTableRecordWriter with WAL "
          + (useWriteAheadLogging ? "on" : "off"));
      this.tables = new HashMap<ImmutableBytesWritable, HTable>();
      this.conf = conf;
      this.useWriteAheadLogging = useWriteAheadLogging;
    }

    /**
     * @param tableName
     *          the name of the table, as a string
     * @return the named table
     * @throws IOException
     *           if there is a problem opening a table
     */
    HTable getTable(ImmutableBytesWritable tableName) throws IOException {
      if (!tables.containsKey(tableName)) {
        LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
        HTable table = new HTable(conf, tableName.get());
        // Buffer writes client-side; the buffer is flushed in close().
        table.setAutoFlush(false, true);
        tables.put(tableName, table);
      }
      return tables.get(tableName);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
      for (HTable table : tables.values()) {
        // Flush any mutations still sitting in the client-side write buffer.
        table.flushCommits();
      }
    }

    /**
     * Writes an action (Put or Delete) to the specified table.
     *
     * @param tableName
     *          the table being updated
     * @param action
     *          the update, either a put or a delete
     * @throws IllegalArgumentException
     *           if the action is not a Put or a Delete
     */
    @Override
    public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
      HTable table = getTable(tableName);
      // The incoming mutations are not immutable, so copy them defensively before writing.
      if (action instanceof Put) {
        Put put = new Put((Put) action);
        put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL
            : Durability.SKIP_WAL);
        table.put(put);
      } else if (action instanceof Delete) {
        Delete delete = new Delete((Delete) action);
        table.delete(delete);
      } else {
        throw new IllegalArgumentException("action must be either Delete or Put");
      }
    }
  }

  @Override
  public void checkOutputSpecs(JobContext context) throws IOException,
      InterruptedException {
    // No-op: target tables are resolved lazily in the record writer, so there
    // is nothing to validate here.
  }

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  @Override
  public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
        conf.getBoolean(WAL_PROPERTY, WAL_ON));
  }

}