1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableMap;
27 import java.util.TreeMap;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configurable;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FileUtil;
34 import org.apache.hadoop.hbase.*;
35 import org.apache.hadoop.hbase.client.HBaseAdmin;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Result;
39 import org.apache.hadoop.hbase.client.ResultScanner;
40 import org.apache.hadoop.hbase.client.Scan;
41 import org.apache.hadoop.hbase.client.Durability;
42 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.io.MapWritable;
45 import org.apache.hadoop.io.Text;
46 import org.apache.hadoop.mapreduce.Job;
47 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
48 import org.junit.After;
49 import org.junit.AfterClass;
50 import org.junit.Before;
51 import org.junit.BeforeClass;
52 import org.junit.Test;
53 import org.junit.experimental.categories.Category;
54
55 @Category(LargeTests.class)
56 public class TestTimeRangeMapRed {
57 private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class);
58 private static final HBaseTestingUtility UTIL =
59 new HBaseTestingUtility();
60 private HBaseAdmin admin;
61
62 private static final byte [] KEY = Bytes.toBytes("row1");
63 private static final NavigableMap<Long, Boolean> TIMESTAMP =
64 new TreeMap<Long, Boolean>();
65 static {
66 TIMESTAMP.put((long)1245620000, false);
67 TIMESTAMP.put((long)1245620005, true);
68 TIMESTAMP.put((long)1245620010, true);
69 TIMESTAMP.put((long)1245620055, true);
70 TIMESTAMP.put((long)1245620100, true);
71 TIMESTAMP.put((long)1245620150, false);
72 TIMESTAMP.put((long)1245620250, false);
73 }
74 static final long MINSTAMP = 1245620005;
75 static final long MAXSTAMP = 1245620100 + 1;
76
77 static final byte[] TABLE_NAME = Bytes.toBytes("table123");
78 static final byte[] FAMILY_NAME = Bytes.toBytes("text");
79 static final byte[] COLUMN_NAME = Bytes.toBytes("input");
80
81 @BeforeClass
82 public static void beforeClass() throws Exception {
83 UTIL.startMiniCluster();
84 }
85
86 @AfterClass
87 public static void afterClass() throws Exception {
88 UTIL.shutdownMiniCluster();
89 }
90
91 @Before
92 public void before() throws Exception {
93 this.admin = new HBaseAdmin(UTIL.getConfiguration());
94 }
95
96 @After
97 public void after() throws IOException {
98 this.admin.close();
99 }
100
101 private static class ProcessTimeRangeMapper
102 extends TableMapper<ImmutableBytesWritable, MapWritable>
103 implements Configurable {
104
105 private Configuration conf = null;
106 private HTable table = null;
107
108 @Override
109 public void map(ImmutableBytesWritable key, Result result,
110 Context context)
111 throws IOException {
112 List<Long> tsList = new ArrayList<Long>();
113 for (Cell kv : result.listCells()) {
114 tsList.add(kv.getTimestamp());
115 }
116
117 for (Long ts : tsList) {
118 Put put = new Put(key.get());
119 put.setDurability(Durability.SKIP_WAL);
120 put.add(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
121 table.put(put);
122 }
123 table.flushCommits();
124 }
125
126 @Override
127 public Configuration getConf() {
128 return conf;
129 }
130
131 @Override
132 public void setConf(Configuration configuration) {
133 this.conf = configuration;
134 try {
135 table = new HTable(HBaseConfiguration.create(conf), TABLE_NAME);
136 } catch (IOException e) {
137 e.printStackTrace();
138 }
139 }
140 }
141
142 @Test
143 public void testTimeRangeMapRed()
144 throws IOException, InterruptedException, ClassNotFoundException {
145 final HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
146 final HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME);
147 col.setMaxVersions(Integer.MAX_VALUE);
148 desc.addFamily(col);
149 admin.createTable(desc);
150 HTable table = new HTable(UTIL.getConfiguration(), desc.getTableName());
151 prepareTest(table);
152 runTestOnTable();
153 verify(table);
154 }
155
156 private void prepareTest(final HTable table) throws IOException {
157 for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
158 Put put = new Put(KEY);
159 put.setDurability(Durability.SKIP_WAL);
160 put.add(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
161 table.put(put);
162 }
163 table.flushCommits();
164 }
165
166 private void runTestOnTable()
167 throws IOException, InterruptedException, ClassNotFoundException {
168 UTIL.startMiniMapReduceCluster();
169 Job job = null;
170 try {
171 job = new Job(UTIL.getConfiguration(), "test123");
172 job.setOutputFormatClass(NullOutputFormat.class);
173 job.setNumReduceTasks(0);
174 Scan scan = new Scan();
175 scan.addColumn(FAMILY_NAME, COLUMN_NAME);
176 scan.setTimeRange(MINSTAMP, MAXSTAMP);
177 scan.setMaxVersions();
178 TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME),
179 scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job);
180 job.waitForCompletion(true);
181 } catch (IOException e) {
182
183 e.printStackTrace();
184 } finally {
185 UTIL.shutdownMiniMapReduceCluster();
186 if (job != null) {
187 FileUtil.fullyDelete(
188 new File(job.getConfiguration().get("hadoop.tmp.dir")));
189 }
190 }
191 }
192
193 private void verify(final HTable table) throws IOException {
194 Scan scan = new Scan();
195 scan.addColumn(FAMILY_NAME, COLUMN_NAME);
196 scan.setMaxVersions(1);
197 ResultScanner scanner = table.getScanner(scan);
198 for (Result r: scanner) {
199 for (Cell kv : r.listCells()) {
200 log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv))
201 + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv))
202 + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(CellUtil.cloneValue(kv)));
203 org.junit.Assert.assertEquals(TIMESTAMP.get(kv.getTimestamp()),
204 (Boolean)Bytes.toBoolean(CellUtil.cloneValue(kv)));
205 }
206 }
207 scanner.close();
208 }
209
210 }
211