1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */

package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(MediumTests.class)
public class TestRegionObserverScannerOpenHook {
  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  static final Path DIR = UTIL.getDataTestDir();

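  /**
   * Filter that skips every cell and halts the scan immediately, so nothing is
   * ever returned to the caller.
   */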
  public static class NoDataFilter extends FilterBase {

    @Override
    public ReturnCode filterKeyValue(Cell ignored) throws IOException {
      return ReturnCode.SKIP;
    }

    @Override
    public boolean filterAllRemaining() throws IOException {
      return true;
    }

    @Override
    public boolean filterRow() throws IOException {
      return true;
    }
  }

  /**
   * A no-op region observer. Needed since {@link BaseRegionObserver} is an
   * abstract class and cannot be instantiated directly.
   */
  public static class EmptyRegionObserver extends BaseRegionObserver {
  }

  /**
   * Don't return any data from a scan by creating a custom {@link StoreScanner}.
   */
  public static class NoDataFromScan extends BaseRegionObserver {
    @Override
    public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
        throws IOException {
      scan.setFilter(new NoDataFilter());
      return new StoreScanner(store, store.getScanInfo(), scan, targetCols,
          ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
    }
  }

  /**
   * Don't allow any data in a flush by creating a custom {@link StoreScanner}.
   */
  public static class NoDataFromFlush extends BaseRegionObserver {
    @Override
    public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
      Scan scan = new Scan();
      scan.setFilter(new NoDataFilter());
      return new StoreScanner(store, store.getScanInfo(), scan,
          Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
          store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
    }
  }

  /**
   * Don't allow any data to be written out in the compaction by creating a custom
   * {@link StoreScanner}.
   */
  public static class NoDataFromCompaction extends BaseRegionObserver {
    @Override
    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
        long earliestPutTs, InternalScanner s) throws IOException {
      Scan scan = new Scan();
      scan.setFilter(new NoDataFilter());
      return new StoreScanner(store, store.getScanInfo(), scan, scanners,
          ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
          HConstants.OLDEST_TIMESTAMP);
    }
  }

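  /**
   * Create a standalone region for testing and attach a coprocessor host to it
   * by hand, since no region server is running to do it for us.
   */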
  HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf,
      byte[]... families) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
    for (byte[] family : families) {
      htd.addFamily(new HColumnDescriptor(family));
    }
    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
    Path path = new Path(DIR + callingMethod);
    HRegion r = HRegion.createHRegion(info, path, conf, htd);
    // The following is a hack: a coprocessor host is normally loaded by
    // OpenRegionHandler. We don't actually start a region server here, so
    // manually create the coprocessor host and set it on the region.
    RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
    r.setCoprocessorHost(host);
    return r;
  }

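  /**
   * Verify that a Get sees the scanner installed by the highest-priority
   * coprocessor: with NoDataFromScan loaded, no cells should come back.
   */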
  @Test
  public void testRegionObserverScanTimeStacking() throws Exception {
    byte[] ROW = Bytes.toBytes("testRow");
    byte[] TABLE = Bytes.toBytes(getClass().getName());
    byte[] A = Bytes.toBytes("A");
    byte[][] FAMILIES = new byte[][] { A };

    Configuration conf = HBaseConfiguration.create();
    HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
    RegionCoprocessorHost h = region.getCoprocessorHost();
    h.load(NoDataFromScan.class, Coprocessor.PRIORITY_HIGHEST, conf);
    h.load(EmptyRegionObserver.class, Coprocessor.PRIORITY_USER, conf);
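    // NoDataFromScan is loaded at the highest priority, so its
    // preStoreScannerOpen fires first; the empty scanner it returns should
    // survive the stacking of the no-op user-priority observer.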

    Put put = new Put(ROW);
    put.add(A, A, A);
    region.put(put);

    Get get = new Get(ROW);
    Result r = region.get(get);
    assertNull(
        "Got an unexpected number of rows - no data should be returned with the"
            + " NoDataFromScan coprocessor. Found: " + r, r.listCells());
  }

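  /**
   * Verify that a flush uses the scanner installed by the highest-priority
   * coprocessor: with NoDataFromFlush loaded, no data should survive the flush.
   */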
  @Test
  public void testRegionObserverFlushTimeStacking() throws Exception {
    byte[] ROW = Bytes.toBytes("testRow");
    byte[] TABLE = Bytes.toBytes(getClass().getName());
    byte[] A = Bytes.toBytes("A");
    byte[][] FAMILIES = new byte[][] { A };

    Configuration conf = HBaseConfiguration.create();
    HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
    RegionCoprocessorHost h = region.getCoprocessorHost();
    h.load(NoDataFromFlush.class, Coprocessor.PRIORITY_HIGHEST, conf);
    h.load(EmptyRegionObserver.class, Coprocessor.PRIORITY_USER, conf);
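    // Same stacking setup as the scan test: the flush should use the empty
    // scanner returned by the highest-priority NoDataFromFlush observer.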

    // put a row and flush it to disk
    Put put = new Put(ROW);
    put.add(A, A, A);
    region.put(put);
    region.flushcache();
    Get get = new Get(ROW);
    Result r = region.get(get);
    assertNull(
        "Got an unexpected number of rows - no data should be returned with the"
            + " NoDataFromFlush coprocessor. Found: " + r, r.listCells());
  }

  /**
   * Custom HRegion which uses a CountDownLatch to signal the completion of a
   * compaction.
   */
  public static class CompactionCompletionNotifyingRegion extends HRegion {
    private static volatile CountDownLatch compactionStateChangeLatch = null;

    @SuppressWarnings("deprecation")
    public CompactionCompletionNotifyingRegion(Path tableDir, HLog log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    public CountDownLatch getCompactionStateChangeLatch() {
      if (compactionStateChangeLatch == null) compactionStateChangeLatch = new CountDownLatch(1);
      return compactionStateChangeLatch;
    }
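
    // Note: the latch above is created lazily, so a caller must obtain it
    // before the compaction it wants to observe counts it down.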
    @Override
    public boolean compact(CompactionContext compaction, Store store) throws IOException {
      boolean ret = super.compact(compaction, store);
      if (ret) compactionStateChangeLatch.countDown();
      return ret;
    }
  }

  /**
   * Unfortunately, the easiest way to test this is to spin up a mini-cluster, since we want to
   * exercise the usual compaction mechanism on the region rather than going through the backdoor
   * to the region.
   */
  @Test
  public void testRegionObserverCompactionTimeStacking() throws Exception {
    // setup a mini cluster so we can do a real compaction on a region
    Configuration conf = UTIL.getConfiguration();
    conf.setClass(HConstants.REGION_IMPL, CompactionCompletionNotifyingRegion.class, HRegion.class);
    conf.setInt("hbase.hstore.compaction.min", 2);
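    // With the compaction minimum set to two store files, the second flush
    // below should be enough to trigger a minor compaction on its own.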
    UTIL.startMiniCluster();
    String tableName = "testRegionObserverCompactionTimeStacking";
    byte[] ROW = Bytes.toBytes("testRow");
    byte[] A = Bytes.toBytes("A");
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(A));
    desc.addCoprocessor(EmptyRegionObserver.class.getName(), null, Coprocessor.PRIORITY_USER, null);
    desc.addCoprocessor(NoDataFromCompaction.class.getName(), null, Coprocessor.PRIORITY_HIGHEST,
        null);

    HBaseAdmin admin = UTIL.getHBaseAdmin();
    admin.createTable(desc);

    HTable table = new HTable(conf, desc.getTableName());

    // put a row and flush it to disk
    Put put = new Put(ROW);
    put.add(A, A, A);
    table.put(put);
    table.flushCommits();

    HRegionServer rs = UTIL.getRSForFirstRegionInTable(desc.getTableName());
    List<HRegion> regions = rs.getOnlineRegions(desc.getTableName());
    assertEquals("More than 1 region serving test table with 1 row", 1, regions.size());
    HRegion region = regions.get(0);
    admin.flush(region.getRegionName());
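    // Grab the latch before the compaction can run; the custom region only
    // counts down a latch that has already been created.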
    CountDownLatch latch = ((CompactionCompletionNotifyingRegion) region)
        .getCompactionStateChangeLatch();

    // put another row and flush that too
    put = new Put(Bytes.toBytes("anotherrow"));
    put.add(A, A, A);
    table.put(put);
    table.flushCommits();
    admin.flush(region.getRegionName());

    // run a compaction, which normally would get rid of the data
    // wait for the compaction checker to complete
    latch.await();
    // check both rows to ensure that they aren't there
    Get get = new Get(ROW);
    Result r = table.get(get);
    assertNull(
        "Got an unexpected number of rows - no data should be returned with the"
            + " NoDataFromCompaction coprocessor. Found: " + r, r.listCells());

    get = new Get(Bytes.toBytes("anotherrow"));
    r = table.get(get);
    assertNull(
        "Got an unexpected number of rows - no data should be returned with the"
            + " NoDataFromCompaction coprocessor. Found: " + r, r.listCells());

    table.close();
    UTIL.shutdownMiniCluster();
  }
}