// Source extracted from an HTML "View Javadoc" rendering.

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  import java.util.UUID;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configurable;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FSDataOutputStream;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.client.Durability;
38  import org.apache.hadoop.hbase.client.Put;
39  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
40  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
41  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
42  import org.apache.hadoop.hbase.regionserver.Region;
43  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
44  import org.apache.hadoop.hbase.testclassification.LargeTests;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.util.Tool;
47  import org.apache.hadoop.util.ToolRunner;
48  import org.junit.AfterClass;
49  import org.junit.BeforeClass;
50  import org.junit.Test;
51  import org.junit.experimental.categories.Category;
52  
53  @Category(LargeTests.class)
54  public class TestImportTSVWithTTLs implements Configurable {
55  
56    protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
57    protected static final String NAME = TestImportTsv.class.getSimpleName();
58    protected static HBaseTestingUtility util = new HBaseTestingUtility();
59  
60    /**
61     * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
62     * false.
63     */
64    protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
65  
66    /**
67     * Force use of combiner in doMROnTableTest. Boolean. Default is true.
68     */
69    protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
70  
71    private final String FAMILY = "FAM";
72    private static Configuration conf;
73  
74    @Override
75    public Configuration getConf() {
76      return util.getConfiguration();
77    }
78  
79    @Override
80    public void setConf(Configuration conf) {
81      throw new IllegalArgumentException("setConf not supported");
82    }
83  
84    @BeforeClass
85    public static void provisionCluster() throws Exception {
86      conf = util.getConfiguration();
87      // We don't check persistence in HFiles in this test, but if we ever do we will
88      // need this where the default hfile version is not 3 (i.e. 0.98)
89      conf.setInt("hfile.format.version", 3);
90      conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
91      util.setJobWithoutMRCluster();
92      util.startMiniCluster();
93    }
94  
95    @AfterClass
96    public static void releaseCluster() throws Exception {
97      util.shutdownMiniCluster();
98    }
99  
100   @Test
101   public void testMROnTable() throws Exception {
102     String tableName = "test-" + UUID.randomUUID();
103 
104     // Prepare the arguments required for the test.
105     String[] args = new String[] {
106         "-D" + ImportTsv.MAPPER_CONF_KEY
107             + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
108         "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
109         "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
110     String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
111     util.createTable(TableName.valueOf(tableName), FAMILY);
112     doMROnTableTest(util, FAMILY, data, args, 1);
113     util.deleteTable(tableName);
114   }
115 
116   protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
117       String[] args, int valueMultiplier) throws Exception {
118     TableName table = TableName.valueOf(args[args.length - 1]);
119     Configuration conf = new Configuration(util.getConfiguration());
120 
121     // populate input file
122     FileSystem fs = FileSystem.get(conf);
123     Path inputPath = fs.makeQualified(new Path(util
124         .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
125     FSDataOutputStream op = fs.create(inputPath, true);
126     op.write(Bytes.toBytes(data));
127     op.close();
128     LOG.debug(String.format("Wrote test data to file: %s", inputPath));
129 
130     if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
131       LOG.debug("Forcing combiner.");
132       conf.setInt("mapreduce.map.combine.minspills", 1);
133     }
134 
135     // run the import
136     List<String> argv = new ArrayList<String>(Arrays.asList(args));
137     argv.add(inputPath.toString());
138     Tool tool = new ImportTsv();
139     LOG.debug("Running ImportTsv with arguments: " + argv);
140     try {
141       // Job will fail if observer rejects entries without TTL
142       assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
143     } finally {
144       // Clean up
145       if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
146         LOG.debug("Deleting test subdirectory");
147         util.cleanupDataTestDirOnTestFS(table.getNameAsString());
148       }
149     }
150 
151     return tool;
152   }
153 
154   public static class TTLCheckingObserver extends BaseRegionObserver {
155 
156     @Override
157     public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
158         Durability durability) throws IOException {
159       Region region = e.getEnvironment().getRegion();
160       if (!region.getRegionInfo().isMetaTable()
161           && !region.getRegionInfo().getTable().isSystemTable()) {
162         // The put carries the TTL attribute
163         if (put.getTTL() != Long.MAX_VALUE) {
164           return;
165         }
166         throw new IOException("Operation does not have TTL set");
167       }
168     }
169   }
170 }