View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.fail;
24  
25  import java.io.ByteArrayOutputStream;
26  import java.io.IOException;
27  import java.io.PrintStream;
28  import java.util.ArrayList;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.CategoryBasedTimeout;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.testclassification.LargeTests;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.client.HTable;
38  import org.apache.hadoop.hbase.client.Put;
39  import org.apache.hadoop.hbase.client.Table;
40  import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
43  import org.apache.hadoop.mapreduce.Counter;
44  import org.apache.hadoop.mapreduce.Job;
45  import org.apache.hadoop.util.GenericOptionsParser;
46  import org.junit.AfterClass;
47  import org.junit.BeforeClass;
48  import org.junit.Rule;
49  import org.junit.Test;
50  import org.junit.experimental.categories.Category;
51  import org.junit.rules.TestRule;
52  
53  /**
54   * Test the rowcounter map reduce job.
55   */
56  @Category(LargeTests.class)
57  public class TestRowCounter {
58    @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
59        withTimeout(this.getClass()).withLookingForStuckThread(true).build();
60    private static final Log LOG = LogFactory.getLog(TestRowCounter.class);
61    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
62  
63    private final static String TABLE_NAME = "testRowCounter";
64  
65    private final static String COL_FAM = "col_fam";
66  
67    private final static String COL1 = "c1";
68  
69    private final static String COL2 = "c2";
70  
71    private final static String COMPOSITE_COLUMN = "C:A:A";
72  
73    private final static int TOTAL_ROWS = 10;
74  
75    private final static int ROWS_WITH_ONE_COL = 2;
76  
77    /**
78     * @throws java.lang.Exception
79     */
80    @BeforeClass
81    public static void setUpBeforeClass()
82        throws Exception {
83      TEST_UTIL.setJobWithoutMRCluster();
84      TEST_UTIL.startMiniCluster();
85      Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
86      writeRows(table);
87      table.close();
88    }
89  
90    /**
91     * @throws java.lang.Exception
92     */
93    @AfterClass
94    public static void tearDownAfterClass()
95        throws Exception {
96      TEST_UTIL.shutdownMiniCluster();
97    }
98  
99    /**
100    * Test a case when no column was specified in command line arguments.
101    *
102    * @throws Exception
103    */
104   @Test
105   public void testRowCounterNoColumn()
106       throws Exception {
107     String[] args = new String[] {TABLE_NAME};
108     runRowCount(args, 10);
109   }
110 
111   /**
112    * Test a case when the column specified in command line arguments is
113    * exclusive for few rows.
114    *
115    * @throws Exception
116    */
117   @Test
118   public void testRowCounterExclusiveColumn()
119       throws Exception {
120     String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1};
121     runRowCount(args, 8);
122   }
123 
124   /**
125    * Test a case when the column specified in command line arguments is
126    * one for which the qualifier contains colons.
127    *
128    * @throws Exception
129    */
130   @Test
131   public void testRowCounterColumnWithColonInQualifier()
132       throws Exception {
133     String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN};
134     runRowCount(args, 8);
135   }
136 
137   /**
138    * Test a case when the column specified in command line arguments is not part
139    * of first KV for a row.
140    *
141    * @throws Exception
142    */
143   @Test
144   public void testRowCounterHiddenColumn()
145       throws Exception {
146     String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL2};
147     runRowCount(args, 10);
148   }
149 
150   /**
151    * Test a case when the timerange is specified with --starttime and --endtime options
152    *
153    * @throws Exception
154    */
155   @Test
156   public void testRowCounterTimeRange()
157       throws Exception {
158     final byte[] family = Bytes.toBytes(COL_FAM);
159     final byte[] col1 = Bytes.toBytes(COL1);
160     Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
161     Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
162     Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
163 
164     long ts;
165 
166     // clean up content of TABLE_NAME
167     HTable table = TEST_UTIL.deleteTableData(TableName.valueOf(TABLE_NAME));
168     ts = System.currentTimeMillis();
169     put1.add(family, col1, ts, Bytes.toBytes("val1"));
170     table.put(put1);
171     Thread.sleep(100);
172 
173     ts = System.currentTimeMillis();
174     put2.add(family, col1, ts, Bytes.toBytes("val2"));
175     put3.add(family, col1, ts, Bytes.toBytes("val3"));
176     table.put(put2);
177     table.put(put3);
178     table.close();
179 
180     String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
181                                      "--endtime=" + ts};
182     runRowCount(args, 1);
183 
184     args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
185                             "--endtime=" + (ts - 10)};
186     runRowCount(args, 1);
187 
188     args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + ts,
189                             "--endtime=" + (ts + 1000)};
190     runRowCount(args, 2);
191 
192     args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000),
193                             "--endtime=" + (ts + 30 * 1000),};
194     runRowCount(args, 3);
195   }
196 
197   /**
198    * Run the RowCounter map reduce job and verify the row count.
199    *
200    * @param args the command line arguments to be used for rowcounter job.
201    * @param expectedCount the expected row count (result of map reduce job).
202    * @throws Exception
203    */
204   private void runRowCount(String[] args, int expectedCount)
205       throws Exception {
206     GenericOptionsParser opts = new GenericOptionsParser(TEST_UTIL.getConfiguration(), args);
207     Configuration conf = opts.getConfiguration();
208     args = opts.getRemainingArgs();
209     Job job = RowCounter.createSubmittableJob(conf, args);
210     job.waitForCompletion(true);
211     assertTrue(job.isSuccessful());
212     Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
213     assertEquals(expectedCount, counter.getValue());
214   }
215 
216   /**
217    * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
218    * two columns, Few have one.
219    *
220    * @param table
221    * @throws IOException
222    */
223   private static void writeRows(Table table)
224       throws IOException {
225     final byte[] family = Bytes.toBytes(COL_FAM);
226     final byte[] value = Bytes.toBytes("abcd");
227     final byte[] col1 = Bytes.toBytes(COL1);
228     final byte[] col2 = Bytes.toBytes(COL2);
229     final byte[] col3 = Bytes.toBytes(COMPOSITE_COLUMN);
230     ArrayList<Put> rowsUpdate = new ArrayList<Put>();
231     // write few rows with two columns
232     int i = 0;
233     for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
234       byte[] row = Bytes.toBytes("row" + i);
235       Put put = new Put(row);
236       put.add(family, col1, value);
237       put.add(family, col2, value);
238       put.add(family, col3, value);
239       rowsUpdate.add(put);
240     }
241 
242     // write few rows with only one column
243     for (; i < TOTAL_ROWS; i++) {
244       byte[] row = Bytes.toBytes("row" + i);
245       Put put = new Put(row);
246       put.add(family, col2, value);
247       rowsUpdate.add(put);
248     }
249     table.put(rowsUpdate);
250   }
251 
252   /**
253    * test main method. Import should print help and call System.exit
254    */
255   @Test
256   public void testImportMain()
257       throws Exception {
258     PrintStream oldPrintStream = System.err;
259     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
260     LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
261     System.setSecurityManager(newSecurityManager);
262     ByteArrayOutputStream data = new ByteArrayOutputStream();
263     String[] args = {};
264     System.setErr(new PrintStream(data));
265     try {
266       System.setErr(new PrintStream(data));
267 
268       try {
269         RowCounter.main(args);
270         fail("should be SecurityException");
271       } catch (SecurityException e) {
272         assertEquals(-1, newSecurityManager.getExitCode());
273         assertTrue(data.toString().contains("Wrong number of parameters:"));
274         assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
275                                                 "[--starttime=[start] --endtime=[end] " +
276                                                 "[--range=[startKey],[endKey]] " +
277                                                 "[<column1> <column2>...]"));
278         assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
279         assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
280       }
281       data.reset();
282       try {
283         args = new String[2];
284         args[0] = "table";
285         args[1] = "--range=1";
286         RowCounter.main(args);
287         fail("should be SecurityException");
288       } catch (SecurityException e) {
289         assertEquals(-1, newSecurityManager.getExitCode());
290         assertTrue(data.toString().contains("Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
291 
292                                                 " \"--range=,b\" or \"--range=a,\""));
293         assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
294                                                 "[--starttime=[start] --endtime=[end] " +
295                                                 "[--range=[startKey],[endKey]] " +
296                                                 "[<column1> <column2>...]"));
297       }
298 
299     } finally {
300       System.setErr(oldPrintStream);
301       System.setSecurityManager(SECURITY_MANAGER);
302     }
303 
304   }
305 }