1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNull;
24
25 import java.io.IOException;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.NavigableSet;
29 import java.util.concurrent.CountDownLatch;
30
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.Coprocessor;
36 import org.apache.hadoop.hbase.HBaseConfiguration;
37 import org.apache.hadoop.hbase.HBaseTestingUtility;
38 import org.apache.hadoop.hbase.HColumnDescriptor;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.HTableDescriptor;
42 import org.apache.hadoop.hbase.testclassification.MediumTests;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.client.Admin;
45 import org.apache.hadoop.hbase.client.Get;
46 import org.apache.hadoop.hbase.client.HTable;
47 import org.apache.hadoop.hbase.client.IsolationLevel;
48 import org.apache.hadoop.hbase.client.Put;
49 import org.apache.hadoop.hbase.client.Result;
50 import org.apache.hadoop.hbase.client.Scan;
51 import org.apache.hadoop.hbase.client.Table;
52 import org.apache.hadoop.hbase.filter.FilterBase;
53 import org.apache.hadoop.hbase.regionserver.HRegion;
54 import org.apache.hadoop.hbase.regionserver.HRegionServer;
55 import org.apache.hadoop.hbase.regionserver.HStore;
56 import org.apache.hadoop.hbase.regionserver.InternalScanner;
57 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
58 import org.apache.hadoop.hbase.regionserver.Region;
59 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
60 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
61 import org.apache.hadoop.hbase.regionserver.ScanType;
62 import org.apache.hadoop.hbase.regionserver.Store;
63 import org.apache.hadoop.hbase.regionserver.StoreScanner;
64 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
65 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
66 import org.apache.hadoop.hbase.security.User;
67 import org.apache.hadoop.hbase.wal.WAL;
68 import org.apache.hadoop.hbase.util.Bytes;
69 import org.junit.Test;
70 import org.junit.experimental.categories.Category;
71
72 @Category(MediumTests.class)
73 public class TestRegionObserverScannerOpenHook {
74 private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
75 static final Path DIR = UTIL.getDataTestDir();
76
77 public static class NoDataFilter extends FilterBase {
78
79 @Override
80 public ReturnCode filterKeyValue(Cell ignored) throws IOException {
81 return ReturnCode.SKIP;
82 }
83
84 @Override
85 public boolean filterAllRemaining() throws IOException {
86 return true;
87 }
88
89 @Override
90 public boolean filterRow() throws IOException {
91 return true;
92 }
93 }
94
95
96
97
98
99 public static class EmptyRegionObsever extends BaseRegionObserver {
100 }
101
102
103
104
105 public static class NoDataFromScan extends BaseRegionObserver {
106 @Override
107 public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
108 Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
109 throws IOException {
110 scan.setFilter(new NoDataFilter());
111 return new StoreScanner(store, store.getScanInfo(), scan, targetCols,
112 ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
113 }
114 }
115
116
117
118
119 public static class NoDataFromFlush extends BaseRegionObserver {
120 @Override
121 public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
122 Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
123 Scan scan = new Scan();
124 scan.setFilter(new NoDataFilter());
125 return new StoreScanner(store, store.getScanInfo(), scan,
126 Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
127 store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
128 }
129 }
130
131
132
133
134
135 public static class NoDataFromCompaction extends BaseRegionObserver {
136 @Override
137 public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
138 Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
139 long earliestPutTs, InternalScanner s) throws IOException {
140 Scan scan = new Scan();
141 scan.setFilter(new NoDataFilter());
142 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
143 ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
144 HConstants.OLDEST_TIMESTAMP);
145 }
146 }
147
148 Region initHRegion(byte[] tableName, String callingMethod, Configuration conf,
149 byte[]... families) throws IOException {
150 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
151 for (byte[] family : families) {
152 htd.addFamily(new HColumnDescriptor(family));
153 }
154 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
155 Path path = new Path(DIR + callingMethod);
156 HRegion r = HRegion.createHRegion(info, path, conf, htd);
157
158
159
160
161 RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
162 r.setCoprocessorHost(host);
163 return r;
164 }
165
166 @Test
167 public void testRegionObserverScanTimeStacking() throws Exception {
168 byte[] ROW = Bytes.toBytes("testRow");
169 byte[] TABLE = Bytes.toBytes(getClass().getName());
170 byte[] A = Bytes.toBytes("A");
171 byte[][] FAMILIES = new byte[][] { A };
172
173 Configuration conf = HBaseConfiguration.create();
174 Region region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
175 RegionCoprocessorHost h = region.getCoprocessorHost();
176 h.load(NoDataFromScan.class, Coprocessor.PRIORITY_HIGHEST, conf);
177 h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf);
178
179 Put put = new Put(ROW);
180 put.add(A, A, A);
181 region.put(put);
182
183 Get get = new Get(ROW);
184 Result r = region.get(get);
185 assertNull(
186 "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: "
187 + r, r.listCells());
188 }
189
190 @Test
191 public void testRegionObserverFlushTimeStacking() throws Exception {
192 byte[] ROW = Bytes.toBytes("testRow");
193 byte[] TABLE = Bytes.toBytes(getClass().getName());
194 byte[] A = Bytes.toBytes("A");
195 byte[][] FAMILIES = new byte[][] { A };
196
197 Configuration conf = HBaseConfiguration.create();
198 Region region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
199 RegionCoprocessorHost h = region.getCoprocessorHost();
200 h.load(NoDataFromFlush.class, Coprocessor.PRIORITY_HIGHEST, conf);
201 h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf);
202
203
204 Put put = new Put(ROW);
205 put.add(A, A, A);
206 region.put(put);
207 region.flush(true);
208 Get get = new Get(ROW);
209 Result r = region.get(get);
210 assertNull(
211 "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: "
212 + r, r.listCells());
213 }
214
215
216
217
218 public static class CompactionCompletionNotifyingRegion extends HRegion {
219 private static volatile CountDownLatch compactionStateChangeLatch = null;
220
221 @SuppressWarnings("deprecation")
222 public CompactionCompletionNotifyingRegion(Path tableDir, WAL log,
223 FileSystem fs, Configuration confParam, HRegionInfo info,
224 HTableDescriptor htd, RegionServerServices rsServices) {
225 super(tableDir, log, fs, confParam, info, htd, rsServices);
226 }
227
228 public CountDownLatch getCompactionStateChangeLatch() {
229 if (compactionStateChangeLatch == null) compactionStateChangeLatch = new CountDownLatch(1);
230 return compactionStateChangeLatch;
231 }
232
233 @Override
234 public boolean compact(CompactionContext compaction, Store store,
235 CompactionThroughputController throughputController) throws IOException {
236 boolean ret = super.compact(compaction, store, throughputController);
237 if (ret) compactionStateChangeLatch.countDown();
238 return ret;
239 }
240
241 @Override
242 public boolean compact(CompactionContext compaction, Store store,
243 CompactionThroughputController throughputController, User user) throws IOException {
244 boolean ret = super.compact(compaction, store, throughputController, user);
245 if (ret) compactionStateChangeLatch.countDown();
246 return ret;
247 }
248 }
249
250
251
252
253
254
255 @Test
256 public void testRegionObserverCompactionTimeStacking() throws Exception {
257
258 Configuration conf = UTIL.getConfiguration();
259 conf.setClass(HConstants.REGION_IMPL, CompactionCompletionNotifyingRegion.class, HRegion.class);
260 conf.setInt("hbase.hstore.compaction.min", 2);
261 UTIL.startMiniCluster();
262 String tableName = "testRegionObserverCompactionTimeStacking";
263 byte[] ROW = Bytes.toBytes("testRow");
264 byte[] A = Bytes.toBytes("A");
265 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
266 desc.addFamily(new HColumnDescriptor(A));
267 desc.addCoprocessor(EmptyRegionObsever.class.getName(), null, Coprocessor.PRIORITY_USER, null);
268 desc.addCoprocessor(NoDataFromCompaction.class.getName(), null, Coprocessor.PRIORITY_HIGHEST,
269 null);
270
271 Admin admin = UTIL.getHBaseAdmin();
272 admin.createTable(desc);
273
274 Table table = new HTable(conf, desc.getTableName());
275
276
277 Put put = new Put(ROW);
278 put.add(A, A, A);
279 table.put(put);
280
281 HRegionServer rs = UTIL.getRSForFirstRegionInTable(desc.getTableName());
282 List<Region> regions = rs.getOnlineRegions(desc.getTableName());
283 assertEquals("More than 1 region serving test table with 1 row", 1, regions.size());
284 Region region = regions.get(0);
285 admin.flushRegion(region.getRegionInfo().getRegionName());
286 CountDownLatch latch = ((CompactionCompletionNotifyingRegion)region)
287 .getCompactionStateChangeLatch();
288
289
290 put = new Put(Bytes.toBytes("anotherrow"));
291 put.add(A, A, A);
292 table.put(put);
293 admin.flushRegion(region.getRegionInfo().getRegionName());
294
295
296
297 latch.await();
298
299 Get get = new Get(ROW);
300 Result r = table.get(get);
301 assertNull(
302 "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: "
303 + r, r.listCells());
304
305 get = new Get(Bytes.toBytes("anotherrow"));
306 r = table.get(get);
307 assertNull(
308 "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor Found: "
309 + r, r.listCells());
310
311 table.close();
312 UTIL.shutdownMiniCluster();
313 }
314 }