View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.util.Set;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.Path;
27  import org.apache.hadoop.hbase.HBaseConfiguration;
28  import org.apache.hadoop.hbase.IntegrationTestBase;
29  import org.apache.hadoop.hbase.IntegrationTestingUtility;
30  import org.apache.hadoop.hbase.IntegrationTests;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.util.ToolRunner;
34  import org.junit.After;
35  import org.junit.Before;
36  import org.junit.experimental.categories.Category;
37  
38  /**
39   * An integration test to test {@link TableSnapshotInputFormat} which enables
40   * reading directly from snapshot files without going through hbase servers.
41   *
42   * This test creates a table and loads the table with the rows ranging from
43   * 'aaa' to 'zzz', and for each row, sets the columns f1:(null) and f2:(null) to be
44   * the same as the row value.
45   * <pre>
46   * aaa, f1: => aaa
47   * aaa, f2: => aaa
48   * aab, f1: => aab
49   * ....
50   * zzz, f2: => zzz
51   * </pre>
52   *
53   * Then the test creates a snapshot from this table, and overrides the values in the original
54   * table with values 'after_snapshot_value'. The test, then runs a mapreduce job over the snapshot
55   * with a scan start row 'bbb' and stop row 'yyy'. The data is saved in a single reduce output
56   * file, and inspected later to verify that the MR job has seen all the values from the snapshot.
57   *
58   * <p> These parameters can be used to configure the job:
59   * <br>"IntegrationTestTableSnapshotInputFormat.table" =&gt; the name of the table
60   * <br>"IntegrationTestTableSnapshotInputFormat.snapshot" =&gt; the name of the snapshot
61   * <br>"IntegrationTestTableSnapshotInputFormat.numRegions" =&gt; number of regions in the table
62   * to be created (default, 32).
63   * <br>"IntegrationTestTableSnapshotInputFormat.tableDir" =&gt; temporary directory to restore the
64   * snapshot files
67   */
68  @Category(IntegrationTests.class)
69  // Not runnable as a unit test. See TestTableSnapshotInputFormat
70  public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase {
71  
72    private static final Log LOG = LogFactory.getLog(IntegrationTestTableSnapshotInputFormat.class);
73  
74    private static final String TABLE_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.table";
75    private static final String DEFAULT_TABLE_NAME = "IntegrationTestTableSnapshotInputFormat";
76  
77    private static final String SNAPSHOT_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.snapshot";
78  
79    private static final String MR_IMPLEMENTATION_KEY =
80      "IntegrationTestTableSnapshotInputFormat.API";
81    private static final String MAPRED_IMPLEMENTATION = "mapred";
82    private static final String MAPREDUCE_IMPLEMENTATION = "mapreduce";
83  
84    private static final String NUM_REGIONS_KEY =
85      "IntegrationTestTableSnapshotInputFormat.numRegions";
86    private static final int DEFAULT_NUM_REGIONS = 32;
87    private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir";
88  
89    private static final byte[] START_ROW = Bytes.toBytes("bbb");
90    private static final byte[] END_ROW = Bytes.toBytes("yyy");
91  
92    // mapred API missing feature pairity with mapreduce. See comments in
93    // mapred.TestTableSnapshotInputFormat
94    private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa");
95    private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
96  
97    private IntegrationTestingUtility util;
98  
99    @Override
100   public void setConf(Configuration conf) {
101     super.setConf(conf);
102     util = getTestingUtil(conf);
103   }
104 
105   @Override
106   @Before
107   public void setUp() throws Exception {
108     super.setUp();
109     util = getTestingUtil(getConf());
110     util.initializeCluster(1);
111     this.setConf(util.getConfiguration());
112   }
113 
114   @Override
115   @After
116   public void cleanUp() throws Exception {
117     util.restoreCluster();
118   }
119 
120   @Override
121   public void setUpCluster() throws Exception {
122   }
123 
124   @Override
125   public int runTestFromCommandLine() throws Exception {
126     Configuration conf = getConf();
127     TableName tableName = TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
128     String snapshotName = conf.get(SNAPSHOT_NAME_KEY, tableName.getQualifierAsString()
129       + "_snapshot_" + System.currentTimeMillis());
130     int numRegions = conf.getInt(NUM_REGIONS_KEY, DEFAULT_NUM_REGIONS);
131     String tableDirStr = conf.get(TABLE_DIR_KEY);
132     Path tableDir;
133     if (tableDirStr == null) {
134       tableDir = util.getDataTestDirOnTestFS(tableName.getQualifierAsString());
135     } else {
136       tableDir = new Path(tableDirStr);
137     }
138 
139     final String mr = conf.get(MR_IMPLEMENTATION_KEY, MAPREDUCE_IMPLEMENTATION);
140     if (mr.equalsIgnoreCase(MAPREDUCE_IMPLEMENTATION)) {
141       /*
142        * We create the table using HBaseAdmin#createTable(), which will create the table
143        * with desired number of regions. We pass bbb as startKey and yyy as endKey, so if
144        * desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we
145        * create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow
146        * bbb and endRow yyy, so, we expect the first and last region to be filtered out in
147        * the input format, and we expect numRegions - 2 splits between bbb and yyy.
148        */
149       LOG.debug("Running job with mapreduce API.");
150       int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
151 
152       org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
153         tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions,
154         expectedNumSplits, false);
155     } else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) {
156       /*
157        * Similar considerations to above. The difference is that mapred API does not support
158        * specifying start/end rows (or a scan object at all). Thus the omission of first and
159        * last regions are not performed. See comments in mapred.TestTableSnapshotInputFormat
160        * for details of how that test works around the problem. This feature should be added
161        * in follow-on work.
162        */
163       LOG.debug("Running job with mapred API.");
164       int expectedNumSplits = numRegions;
165 
166       org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
167         tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions,
168         expectedNumSplits, false);
169     } else {
170       throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr +".");
171     }
172 
173     return 0;
174   }
175 
176   @Override // CM is not intended to be run with this test
177   public String getTablename() {
178     return null;
179   }
180 
181   @Override
182   protected Set<String> getColumnFamilies() {
183     return null;
184   }
185 
186   public static void main(String[] args) throws Exception {
187     Configuration conf = HBaseConfiguration.create();
188     IntegrationTestingUtility.setUseDistributedCluster(conf);
189     int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), args);
190     System.exit(ret);
191   }
192 
193 }