View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapred;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertTrue;
24  import static org.mockito.Matchers.any;
25  import static org.mockito.Matchers.anyInt;
26  import static org.mockito.Mockito.mock;
27  import static org.mockito.Mockito.times;
28  
29  import java.io.ByteArrayOutputStream;
30  import java.io.IOException;
31  import java.io.PrintStream;
32  
33  import org.apache.hadoop.hbase.HBaseConfiguration;
34  import org.apache.hadoop.hbase.SmallTests;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
37  import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper;
38  import org.apache.hadoop.mapred.JobConf;
39  import org.apache.hadoop.mapred.OutputCollector;
40  import org.apache.hadoop.mapred.Reporter;
41  import org.junit.Test;
42  import org.junit.experimental.categories.Category;
43  import org.mockito.Mockito;
44  
45  import com.google.common.base.Joiner;
46  
47  @Category(SmallTests.class)
48  public class TestRowCounter {
49  
50    @Test
51    @SuppressWarnings("deprecation")
52    public void shouldPrintUsage() throws Exception {
53      String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
54      String result = new OutputReader(System.out) {
55        @Override
56        void doRead() {
57          assertEquals(-1, RowCounter.printUsage());
58        }
59      }.read();
60  
61      assertTrue(result.startsWith(expectedOutput));
62    }
63  
64    @Test
65    @SuppressWarnings("deprecation")
66    public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree()
67        throws Exception {
68      final String[] args = new String[] { "one", "two" };
69      String line = "ERROR: Wrong number of parameters: " + args.length;
70      String result = new OutputReader(System.err) {
71        @Override
72        void doRead() throws Exception {
73          assertEquals(-1, new RowCounter().run(args));
74        }
75      }.read();
76  
77      assertTrue(result.startsWith(line));
78    }
79  
80    @Test
81    @SuppressWarnings({ "deprecation", "unchecked" })
82    public void shouldRegInReportEveryIncomingRow() throws IOException {
83      int iterationNumber = 999;
84      RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
85      Reporter reporter = mock(Reporter.class);
86      for (int i = 0; i < iterationNumber; i++)
87        mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
88            mock(OutputCollector.class), reporter);
89  
90      Mockito.verify(reporter, times(iterationNumber)).incrCounter(
91          any(Enum.class), anyInt());
92    }
93  
94    @Test
95    @SuppressWarnings({ "deprecation" })
96    public void shouldCreateAndRunSubmittableJob() throws Exception {
97      RowCounter rCounter = new RowCounter();
98      rCounter.setConf(HBaseConfiguration.create());
99      String[] args = new String[] { "\temp", "tableA", "column1", "column2",
100         "column3" };
101     JobConf jobConfig = rCounter.createSubmittableJob(args);
102 
103     assertNotNull(jobConfig);
104     assertEquals(0, jobConfig.getNumReduceTasks());
105     assertEquals("rowcounter", jobConfig.getJobName());
106     assertEquals(jobConfig.getMapOutputValueClass(), Result.class);
107     assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class);
108     assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST), Joiner.on(' ')
109         .join("column1", "column2", "column3"));
110     assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class);
111   }
112 
113   enum Outs {
114     OUT, ERR
115   }
116 
117   private static abstract class OutputReader {
118     private final PrintStream ps;
119     private PrintStream oldPrintStream;
120     private Outs outs;
121 
122     protected OutputReader(PrintStream ps) {
123       this.ps = ps;
124     }
125 
126     protected String read() throws Exception {
127       ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
128       if (ps == System.out) {
129         oldPrintStream = System.out;
130         outs = Outs.OUT;
131         System.setOut(new PrintStream(outBytes));
132       } else if (ps == System.err) {
133         oldPrintStream = System.err;
134         outs = Outs.ERR;
135         System.setErr(new PrintStream(outBytes));
136       } else {
137         throw new IllegalStateException("OutputReader: unsupported PrintStream");
138       }
139 
140       try {
141         doRead();
142         return new String(outBytes.toByteArray());
143       } finally {
144         switch (outs) {
145         case OUT: {
146           System.setOut(oldPrintStream);
147           break;
148         }
149         case ERR: {
150           System.setErr(oldPrintStream);
151           break;
152         }
153         default:
154           throw new IllegalStateException(
155               "OutputReader: unsupported PrintStream");
156         }
157       }
158     }
159 
160     abstract void doRead() throws Exception;
161   }
162 }