1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
67
68 @Category(MediumTests.class)
69 public class TestRegionObserverScannerOpenHook {
70 private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
71 static final Path DIR = UTIL.getDataTestDir();
72
73 public static class NoDataFilter extends FilterBase {
74
75 @Override
76 public ReturnCode filterKeyValue(Cell ignored) throws IOException {
77 return ReturnCode.SKIP;
78 }
79
80 @Override
81 public boolean filterAllRemaining() throws IOException {
82 return true;
83 }
84
85 @Override
86 public boolean filterRow() throws IOException {
87 return true;
88 }
89 }
90
91
92
93
94
95 public static class EmptyRegionObsever extends BaseRegionObserver {
96 }
97
98
99
100
101 public static class NoDataFromScan extends BaseRegionObserver {
102 @Override
103 public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
104 Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
105 throws IOException {
106 scan.setFilter(new NoDataFilter());
107 return new StoreScanner(store, store.getScanInfo(), scan, targetCols,
108 ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
109 }
110 }
111
112
113
114
115 public static class NoDataFromFlush extends BaseRegionObserver {
116 @Override
117 public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
118 Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
119 Scan scan = new Scan();
120 scan.setFilter(new NoDataFilter());
121 return new StoreScanner(store, store.getScanInfo(), scan,
122 Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
123 store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
124 }
125 }
126
127
128
129
130
131 public static class NoDataFromCompaction extends BaseRegionObserver {
132 @Override
133 public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
134 Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
135 long earliestPutTs, InternalScanner s) throws IOException {
136 Scan scan = new Scan();
137 scan.setFilter(new NoDataFilter());
138 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
139 ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
140 HConstants.OLDEST_TIMESTAMP);
141 }
142 }
143
144 HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf,
145 byte[]... families) throws IOException {
146 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
147 for (byte[] family : families) {
148 htd.addFamily(new HColumnDescriptor(family));
149 }
150 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
151 Path path = new Path(DIR + callingMethod);
152 HRegion r = HRegion.createHRegion(info, path, conf, htd);
153
154
155
156
157 RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
158 r.setCoprocessorHost(host);
159 return r;
160 }
161
162 @Test
163 public void testRegionObserverScanTimeStacking() throws Exception {
164 byte[] ROW = Bytes.toBytes("testRow");
165 byte[] TABLE = Bytes.toBytes(getClass().getName());
166 byte[] A = Bytes.toBytes("A");
167 byte[][] FAMILIES = new byte[][] { A };
168
169 Configuration conf = HBaseConfiguration.create();
170 HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
171 RegionCoprocessorHost h = region.getCoprocessorHost();
172 h.load(NoDataFromScan.class, Coprocessor.PRIORITY_HIGHEST, conf);
173 h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf);
174
175 Put put = new Put(ROW);
176 put.add(A, A, A);
177 region.put(put);
178
179 Get get = new Get(ROW);
180 Result r = region.get(get);
181 assertNull(
182 "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: "
183 + r, r.listCells());
184 }
185
186 @Test
187 public void testRegionObserverFlushTimeStacking() throws Exception {
188 byte[] ROW = Bytes.toBytes("testRow");
189 byte[] TABLE = Bytes.toBytes(getClass().getName());
190 byte[] A = Bytes.toBytes("A");
191 byte[][] FAMILIES = new byte[][] { A };
192
193 Configuration conf = HBaseConfiguration.create();
194 HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
195 RegionCoprocessorHost h = region.getCoprocessorHost();
196 h.load(NoDataFromFlush.class, Coprocessor.PRIORITY_HIGHEST, conf);
197 h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf);
198
199
200 Put put = new Put(ROW);
201 put.add(A, A, A);
202 region.put(put);
203 region.flushcache();
204 Get get = new Get(ROW);
205 Result r = region.get(get);
206 assertNull(
207 "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: "
208 + r, r.listCells());
209 }
210
211
212
213
214 public static class CompactionCompletionNotifyingRegion extends HRegion {
215 private static volatile CountDownLatch compactionStateChangeLatch = null;
216
217 @SuppressWarnings("deprecation")
218 public CompactionCompletionNotifyingRegion(Path tableDir, HLog log,
219 FileSystem fs, Configuration confParam, HRegionInfo info,
220 HTableDescriptor htd, RegionServerServices rsServices) {
221 super(tableDir, log, fs, confParam, info, htd, rsServices);
222 }
223
224 public CountDownLatch getCompactionStateChangeLatch() {
225 if (compactionStateChangeLatch == null) compactionStateChangeLatch = new CountDownLatch(1);
226 return compactionStateChangeLatch;
227 }
228 @Override
229 public boolean compact(CompactionContext compaction, Store store) throws IOException {
230 boolean ret = super.compact(compaction, store);
231 if (ret) compactionStateChangeLatch.countDown();
232 return ret;
233 }
234 }
235
236
237
238
239
240
241 @Test
242 public void testRegionObserverCompactionTimeStacking() throws Exception {
243
244 Configuration conf = UTIL.getConfiguration();
245 conf.setClass(HConstants.REGION_IMPL, CompactionCompletionNotifyingRegion.class, HRegion.class);
246 conf.setInt("hbase.hstore.compaction.min", 2);
247 UTIL.startMiniCluster();
248 String tableName = "testRegionObserverCompactionTimeStacking";
249 byte[] ROW = Bytes.toBytes("testRow");
250 byte[] A = Bytes.toBytes("A");
251 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
252 desc.addFamily(new HColumnDescriptor(A));
253 desc.addCoprocessor(EmptyRegionObsever.class.getName(), null, Coprocessor.PRIORITY_USER, null);
254 desc.addCoprocessor(NoDataFromCompaction.class.getName(), null, Coprocessor.PRIORITY_HIGHEST,
255 null);
256
257 HBaseAdmin admin = UTIL.getHBaseAdmin();
258 admin.createTable(desc);
259
260 HTable table = new HTable(conf, desc.getTableName());
261
262
263 Put put = new Put(ROW);
264 put.add(A, A, A);
265 table.put(put);
266 table.flushCommits();
267
268 HRegionServer rs = UTIL.getRSForFirstRegionInTable(desc.getTableName());
269 List<HRegion> regions = rs.getOnlineRegions(desc.getTableName());
270 assertEquals("More than 1 region serving test table with 1 row", 1, regions.size());
271 HRegion region = regions.get(0);
272 admin.flush(region.getRegionName());
273 CountDownLatch latch = ((CompactionCompletionNotifyingRegion)region)
274 .getCompactionStateChangeLatch();
275
276
277 put = new Put(Bytes.toBytes("anotherrow"));
278 put.add(A, A, A);
279 table.put(put);
280 table.flushCommits();
281 admin.flush(region.getRegionName());
282
283
284
285 latch.await();
286
287 Get get = new Get(ROW);
288 Result r = table.get(get);
289 assertNull(
290 "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: "
291 + r, r.listCells());
292
293 get = new Get(Bytes.toBytes("anotherrow"));
294 r = table.get(get);
295 assertNull(
296 "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor Found: "
297 + r, r.listCells());
298
299 table.close();
300 UTIL.shutdownMiniCluster();
301 }
302 }