/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created hfiles. Calling write(null, null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, HTable)}.
 * @see KeyValueSortReducer
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  static Log LOG = LogFactory.getLog(HFileOutputFormat2.class);

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the user.
  private static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";

  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context)
          throws IOException, InterruptedException {

    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);

    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Default compression for new HFiles, unless overridden per family below.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = AbstractHFileWriter
        .compressionByName(defaultCompressionStr);
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    // Create maps from column family to the configured compression algorithm,
    // bloom filter type and block size.
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }
    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
          new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
      private boolean rollRequested = false;

      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);

        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }

        byte [] rowKey = CellUtil.cloneRow(kv);
        long length = kv.getLength();
        byte [] family = CellUtil.cloneFamily(kv);
        WriterLength wl = this.writers.get(family);

        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }

        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }

        // create a new writer for this column family, if necessary
        if (wl == null || wl.writer == null) {
          wl = getNewWriter(family, conf);
        }

        // we now have the proper writer. full steam ahead
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transition happens.
        this.previousRow = rowKey;
      }

      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath() +
                ((wl.written == 0)? "": ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }

      /*
       * Create a new StoreFile.Writer.
       * @param family
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      private WriterLength getNewWriter(byte[] family, Configuration conf)
          throws IOException {
        WriterLength wl = new WriterLength();
        Path familydir = new Path(outputdir, Bytes.toString(family));
        Algorithm compression = compressionMap.get(family);
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(family);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        Integer blockSize = blockSizeMap.get(family);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        Configuration tempConf = new Configuration(conf);
        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
            .withCompression(compression)
            .withChecksumType(HStore.getChecksumType(conf))
            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
            .withBlockSize(blockSize);
        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();

        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
            .withOutputDir(familydir).withBloomType(bloomType)
            .withComparator(KeyValue.COMPARATOR)
            .withFileContext(hFileContext).build();

        this.writers.put(family, wl);
        return wl;
      }

      private void close(final StoreFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      public void close(TaskAttemptContext c)
          throws IOException, InterruptedException {
        for (WriterLength wl: this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }
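
  // Illustrative sketch only (not part of the original source): the MapReduce
  // framework normally drives the writer returned above, but code in this
  // package holding the RecordWriter directly could force all open HFiles to
  // roll by emitting a null key and value, matching the class Javadoc:
  //
  //   RecordWriter<ImmutableBytesWritable, KeyValue> writer =
  //       HFileOutputFormat2.createRecordWriter(context);
  //   writer.write(someRowKey, someKeyValue);   // appends to that family's HFile
  //   writer.write(null, null);                 // forces rollWriters()
  //   writer.close(context);
  //
  // someRowKey and someKeyValue are placeholders for real data.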

  /*
   * Data structure to hold a StoreFile.Writer and the amount of data written
   * to it so far.
   */
  static class WriterLength {
    long written = 0;
    StoreFile.Writer writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
      throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
        new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0.
    TreeSet<ImmutableBytesWritable> sorted =
        new TreeSet<ImmutableBytesWritable>(startKeys);

    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);

    // Write the actual partitions file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, partitionsPath, ImmutableBytesWritable.class,
        NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
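
  // Worked example, for illustration only (region boundaries below are made up):
  // for a table whose regions start at "", "bbb" and "mmm", only "bbb" and "mmm"
  // end up in the partitions file. With three reducers, TotalOrderPartitioner
  // then routes rows < "bbb" to reducer 0, rows in ["bbb", "mmm") to reducer 1,
  // and the remaining rows to reducer 2, so each reducer produces HFiles that
  // fall entirely inside one existing region.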

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Writes the partitions file to the cluster and points
   *     TotalOrderPartitioner at it</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting
   *     (KeyValueSortReducer, PutSortReducer or TextSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either
   * KeyValue or Put before running this function.
   */
  public static void configureIncrementalLoad(Job job, HTable table)
      throws IOException {
    configureIncrementalLoad(job, table, HFileOutputFormat2.class);
  }
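
  // A minimal driver sketch, for illustration only. The mapper class, table
  // name and paths below (BulkLoadMapper, "usertable", /tmp/input,
  // /tmp/bulkload-out) are hypothetical placeholders, not part of this file:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Job.getInstance(conf, "prepare-hfiles");
  //   job.setJarByClass(BulkLoadMapper.class);
  //   job.setMapperClass(BulkLoadMapper.class);
  //   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  //   job.setMapOutputValueClass(Put.class);        // selects PutSortReducer below
  //   FileInputFormat.addInputPath(job, new Path("/tmp/input"));
  //   FileOutputFormat.setOutputPath(job, new Path("/tmp/bulkload-out"));
  //   HTable table = new HTable(conf, "usertable");
  //   HFileOutputFormat2.configureIncrementalLoad(job, table);
  //   job.waitForCompletion(true);
  //
  // The generated HFiles can then be handed to LoadIncrementalHFiles
  // (completebulkload) to be moved into the table's regions.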

  static void configureIncrementalLoad(Job job, HTable table,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(cls);

    // Based on the configured map output class, set the correct reducer to
    // properly sort the incoming values.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    // Use the table's region boundaries as the split points for the
    // total order partitioner.
    LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys);
    // Serialize the per-family settings of the table into the job configuration.
    configureCompression(table, conf);
    configureBloomType(table, conf);
    configureBlockSize(table, conf);
    configureDataBlockEncoding(table, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + Bytes.toString(table.getTableName())
        + " output configured.");
  }

  /**
   * Runs inside the task to deserialize the column family to compression
   * algorithm map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap =
        new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize the column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap =
        new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize the column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap =
        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize the column family to data block
   * encoding type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding,
   *         containing only the families that were explicitly configured
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap =
        new TreeMap<byte[], DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize a column family to string value map
   * stored in the configuration under the given key.
   *
   * @param conf to read the serialized values from
   * @param confName the configuration key under which the map was serialized
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // this can't happen with UTF-8
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }
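
  // For illustration only (family names and values below are made up): each
  // serialized family map is a single string of URL-encoded key=value pairs
  // joined by '&', for example
  //
  //   cf1=gz&cf2=snappy
  //
  // stored under one of the *_FAMILIES_CONF_KEY keys. configureCompression()
  // and its siblings build that string from the table descriptor at job setup,
  // and createFamilyConfValueMap() splits it back into a per-family map inside
  // each task.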

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
      throws IOException {

    // create the partitions file
    FileSystem fs = FileSystem.get(job.getConfiguration());
    Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    fs.deleteOnExit(partitionsPath);
    writePartitions(job.getConfiguration(), partitionsPath, splitPoints);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static void configureCompression(
      HTable table, Configuration conf) throws IOException {
    StringBuilder compressionConfigValue = new StringBuilder();
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        compressionConfigValue.append('&');
      }
      compressionConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      compressionConfigValue.append('=');
      compressionConfigValue.append(URLEncoder.encode(
          familyDescriptor.getCompression().getName(), "UTF-8"));
    }

    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
  }

  /**
   * Serialize column family to block size map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureBlockSize(
      HTable table, Configuration conf) throws IOException {
    StringBuilder blockSizeConfigValue = new StringBuilder();
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        blockSizeConfigValue.append('&');
      }
      blockSizeConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      blockSizeConfigValue.append('=');
      blockSizeConfigValue.append(URLEncoder.encode(
          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
    }

    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
  }

  /**
   * Serialize column family to bloom filter type map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureBloomType(
      HTable table, Configuration conf) throws IOException {
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder bloomTypeConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        bloomTypeConfigValue.append('&');
      }
      bloomTypeConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      bloomTypeConfigValue.append('=');
      String bloomType = familyDescriptor.getBloomFilterType().toString();
      if (bloomType == null) {
        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
      }
      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
    }
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
  }

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureDataBlockEncoding(HTable table,
      Configuration conf) throws IOException {
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        dataBlockEncodingConfigValue.append('&');
      }
      dataBlockEncodingConfigValue.append(
          URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
      dataBlockEncodingConfigValue.append('=');
      DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
      if (encoding == null) {
        encoding = DataBlockEncoding.NONE;
      }
      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
          "UTF-8"));
    }
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        dataBlockEncodingConfigValue.toString());
  }
}