/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes the current time as the sequence id for the file. Sets the
 * major-compacted attribute on created HFiles. Calling write(null,null) will
 * forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, HTable)}.
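 * <p>
 * A minimal driver sketch (the driver class, mapper class, table name, and
 * paths below are illustrative, not part of this API):
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * Job job = new Job(conf, "hfile-prepare");          // hypothetical job name
 * job.setJarByClass(MyDriver.class);                 // hypothetical driver class
 * job.setMapperClass(MyCellMapper.class);            // hypothetical mapper
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(KeyValue.class);        // or Put, or Text
 * HTable table = new HTable(conf, "my_table");       // illustrative table name
 * HFileOutputFormat2.configureIncrementalLoad(job, table);
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
 * job.waitForCompletion(true);
 * </pre>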
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  private static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding.
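  // For example, a minimal override sketch (the PREFIX choice is illustrative):
  //   conf.set(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, DataBlockEncoding.PREFIX.name());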
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context)
          throws IOException, InterruptedException {

    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs are picked up from hbase-*.xml.
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Ad hoc config; add it to hbase-*.xml if a non-default compression is wanted.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = AbstractHFileWriter
        .compressionByName(defaultCompressionStr);
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
          new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
      private boolean rollRequested = false;

      @Override
      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);

        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        long length = kv.getLength();
        byte[] family = CellUtil.cloneFamily(kv);
        WriterLength wl = this.writers.get(family);

        // If this is a new column family, create its output directory
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }

        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // The roll can only happen once the current row is finished, though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          wl = getNewWriter(family, conf);
        }

        // we now have the proper HFile writer. full steam ahead
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRow = rowKey;
      }

      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath() +
                ((wl.written == 0) ? "" : ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }

      /*
       * Create a new StoreFile.Writer.
       * @param family the column family to create the writer for
       * @param conf the job configuration
       * @return A WriterLength, containing a new StoreFile.Writer.
       * @throws IOException
       */
      private WriterLength getNewWriter(byte[] family, Configuration conf)
          throws IOException {
        WriterLength wl = new WriterLength();
        Path familydir = new Path(outputdir, Bytes.toString(family));
        Algorithm compression = compressionMap.get(family);
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(family);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        Integer blockSize = blockSizeMap.get(family);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        Configuration tempConf = new Configuration(conf);
        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
        HFileContext hFileContext = new HFileContextBuilder()
            .withCompression(compression)
            .withChecksumType(HStore.getChecksumType(conf))
            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
            .withBlockSize(blockSize)
            .withDataBlockEncoding(encoding)
            .build();

        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
            .withOutputDir(familydir).withBloomType(bloomType)
            .withComparator(KeyValue.COMPARATOR)
            .withFileContext(hFileContext).build();

        this.writers.put(family, wl);
        return wl;
      }

      private void close(final StoreFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c)
          throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFile.Writer writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
      throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
        new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} containing the split points in startKeys,
   * in the format consumed by {@link TotalOrderPartitioner}.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted =
        new TreeSet<ImmutableBytesWritable>(startKeys);

    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, partitionsPath, ImmutableBytesWritable.class,
        NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
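   * <p>
   * For example, a job whose mappers emit {@code KeyValue}s would be set up
   * along these lines (a sketch; the mapper class is hypothetical):
   * <pre>
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(KeyValue.class);
   * job.setMapperClass(MyCellMapper.class); // hypothetical
   * HFileOutputFormat2.configureIncrementalLoad(job, table);
   * </pre>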
   */
  public static void configureIncrementalLoad(Job job, HTable table)
      throws IOException {
    configureIncrementalLoad(job, table, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, HTable table,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(cls);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    // Use the table's region boundaries as the TotalOrderPartitioner split points.
    LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys);
    // Set compression algorithms based on column families
    configureCompression(table, conf);
    configureBloomType(table, conf);
    configureBlockSize(table, conf);
    configureDataBlockEncoding(table, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + Bytes.toString(table.getTableName())
      + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
        Algorithm>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],
        BloomType>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],
        Integer>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding
   *         for the family
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],
        DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
   *
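   * Values are expected in the form {@code family1=value1&family2=value2},
   * with URL-encoded names and values (the family names and values here are
   * illustrative).
   *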
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes("UTF-8"),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
      throws IOException {

    // create the partitions file
    FileSystem fs = FileSystem.get(job.getConfiguration());
    Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
    partitionsPath = fs.makeQualified(partitionsPath);
    fs.deleteOnExit(partitionsPath);
    writePartitions(job.getConfiguration(), partitionsPath, splitPoints);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
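   * <p>
   * The serialized value takes the form {@code cf1=gz&cf2=none} (URL-encoded;
   * the family names and algorithms here are illustrative).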
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static void configureCompression(
      HTable table, Configuration conf) throws IOException {
    StringBuilder compressionConfigValue = new StringBuilder();
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        compressionConfigValue.append('&');
      }
      compressionConfigValue.append(URLEncoder.encode(
        familyDescriptor.getNameAsString(), "UTF-8"));
      compressionConfigValue.append('=');
      compressionConfigValue.append(URLEncoder.encode(
        familyDescriptor.getCompression().getName(), "UTF-8"));
    }
    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
  }

  /**
   * Serialize column family to block size map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureBlockSize(
      HTable table, Configuration conf) throws IOException {
    StringBuilder blockSizeConfigValue = new StringBuilder();
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        blockSizeConfigValue.append('&');
      }
      blockSizeConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      blockSizeConfigValue.append('=');
      blockSizeConfigValue.append(URLEncoder.encode(
          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
    }
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
  }

  /**
   * Serialize column family to bloom type map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureBloomType(
      HTable table, Configuration conf) throws IOException {
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder bloomTypeConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        bloomTypeConfigValue.append('&');
      }
      bloomTypeConfigValue.append(URLEncoder.encode(
        familyDescriptor.getNameAsString(), "UTF-8"));
      bloomTypeConfigValue.append('=');
      // Guard against a null bloom filter type before calling toString()
      BloomType bloomType = familyDescriptor.getBloomFilterType();
      String bloomTypeStr = (bloomType == null)
          ? HColumnDescriptor.DEFAULT_BLOOMFILTER : bloomType.toString();
      bloomTypeConfigValue.append(URLEncoder.encode(bloomTypeStr, "UTF-8"));
    }
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
  }

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param table to read the properties from
   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @VisibleForTesting
  static void configureDataBlockEncoding(HTable table,
      Configuration conf) throws IOException {
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        dataBlockEncodingConfigValue.append('&');
      }
      dataBlockEncodingConfigValue.append(
          URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
      dataBlockEncodingConfigValue.append('=');
      DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
      if (encoding == null) {
        encoding = DataBlockEncoding.NONE;
      }
      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
          "UTF-8"));
    }
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        dataBlockEncodingConfigValue.toString());
  }
}