
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * JUnit tests for the HLogRecordReader, the record reader used by
 * HLogInputFormat to read WAL entries; covers split generation per log file
 * and time-range filtering of entries.
 */
@Category(MediumTests.class)
public class TestHLogRecordReader {
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Configuration conf;
  private static FileSystem fs;
  private static Path hbaseDir;
  private static final TableName tableName =
      TableName.valueOf(getName());
  private static final byte [] rowName = tableName.getName();
  private static final HRegionInfo info = new HRegionInfo(tableName,
      Bytes.toBytes(""), Bytes.toBytes(""), false);
  private static final byte [] family = Bytes.toBytes("column");
  private static final byte [] value = Bytes.toBytes("value");
  private static HTableDescriptor htd;
  private static Path logDir;
  private static String logName;

  private static String getName() {
    return "TestHLogRecordReader";
  }

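  /**
   * Clean out everything under the HBase root directory before each test so
   * that WAL files left behind by a previous test do not interfere.
   */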
  @Before
  public void setUp() throws Exception {
    FileStatus[] entries = fs.listStatus(hbaseDir);
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

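  /**
   * Start a single-node mini DFS cluster (small blocks, replication 1) and
   * set up the root directory and WAL log directory used by the tests.
   */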
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.replication", 1);
    TEST_UTIL.startMiniDFSCluster(1);

    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();

    logName = HConstants.HREGION_LOGDIR_NAME;
    logDir = new Path(hbaseDir, logName);

    htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(family));
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test partial reads from the log based on passed time range
   * @throws Exception
   */
  @Test
  public void testPartialRead() throws Exception {
    HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
    long ts = System.currentTimeMillis();
    WALEdit edit = new WALEdit();
    final AtomicLong sequenceId = new AtomicLong(0);
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
    log.append(info, tableName, edit, ts, htd, sequenceId);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
    log.append(info, tableName, edit, ts+1, htd, sequenceId);
    log.rollWriter();

    Thread.sleep(1);
    long ts1 = System.currentTimeMillis();

    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
    log.append(info, tableName, edit, ts1+1, htd, sequenceId);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
    log.append(info, tableName, edit, ts1+2, htd, sequenceId);
    log.close();

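    // Set up an HLogInputFormat job over the WAL directory, initially bounded
    // by an end time of ts, so only the very first entry should be visible.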
    HLogInputFormat input = new HLogInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapred.input.dir", logDir.toString());
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts);

    // only 1st file is considered, and only its 1st entry is used
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    jobConf.setLong(HLogInputFormat.START_TIME_KEY, ts+1);
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts1+1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    // both files need to be considered
    assertEquals(2, splits.size());
    // only the 2nd entry from the 1st file is used
    testSplit(splits.get(0), Bytes.toBytes("2"));
    // only the 1st entry from the 2nd file is used
    testSplit(splits.get(1), Bytes.toBytes("3"));
  }

  /**
   * Test basic functionality of the record reader: one split per log file,
   * plus start/end time filtering.
   * @throws Exception
   */
  @Test
  public void testHLogRecordReader() throws Exception {
    HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
    byte [] value = Bytes.toBytes("value");
    final AtomicLong sequenceId = new AtomicLong(0);
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
        System.currentTimeMillis(), value));
    log.append(info, tableName, edit,
      System.currentTimeMillis(), htd, sequenceId);

    Thread.sleep(1); // make sure 2nd log gets a later timestamp
    long secondTs = System.currentTimeMillis();
    log.rollWriter();

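    // This edit lands in the second log file created by the roll above.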
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
        System.currentTimeMillis(), value));
    log.append(info, tableName, edit,
      System.currentTimeMillis(), htd, sequenceId);
    log.close();
    long thirdTs = System.currentTimeMillis();

    // should have 2 log files now
    HLogInputFormat input = new HLogInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapred.input.dir", logDir.toString());

    // make sure both logs are found
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());

    // should return exactly one KV
    testSplit(splits.get(0), Bytes.toBytes("1"));
    // same for the 2nd split
    testSplit(splits.get(1), Bytes.toBytes("2"));

    // now test basic time ranges:

    // with an end time before the 2nd log's timestamp, the 2nd log file can be ignored completely.
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, secondTs-1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    // now set a start time
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, Long.MAX_VALUE);
    jobConf.setLong(HLogInputFormat.START_TIME_KEY, thirdTs);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    // both logs need to be considered
    assertEquals(2, splits.size());
    // but both readers skip all edits
    testSplit(splits.get(0));
    testSplit(splits.get(1));
  }

  /**
   * Create a new reader from the split, and match the edits against the passed columns.
   */
  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
    HLogRecordReader reader = new HLogRecordReader();
    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

    for (byte[] column : columns) {
      assertTrue(reader.nextKeyValue());
      KeyValue kv = reader.getCurrentValue().getKeyValues().get(0);
      if (!Bytes.equals(column, kv.getQualifier())) {
        assertTrue("expected [" + Bytes.toString(column) + "], actual ["
            + Bytes.toString(kv.getQualifier()) + "]", false);
      }
    }
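    // Once all expected columns have been matched, the reader must be exhausted.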
    assertFalse(reader.nextKeyValue());
    reader.close();
  }

}