View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
24  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
25  import static org.junit.Assert.assertFalse;
26  import static org.junit.Assert.assertNotNull;
27  import static org.junit.Assert.assertNull;
28  import static org.junit.Assert.assertTrue;
29  import static org.junit.Assert.fail;
30  import static org.mockito.Mockito.when;
31  
32  import java.io.IOException;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.concurrent.ConcurrentMap;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.HBaseTestCase;
46  import org.apache.hadoop.hbase.HBaseTestingUtility;
47  import org.apache.hadoop.hbase.HColumnDescriptor;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.Server;
52  import org.apache.hadoop.hbase.SmallTests;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.client.Get;
55  import org.apache.hadoop.hbase.client.Scan;
56  import org.apache.hadoop.hbase.regionserver.HRegion;
57  import org.apache.hadoop.hbase.regionserver.InternalScanner;
58  import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
59  import org.apache.hadoop.hbase.regionserver.RegionScanner;
60  import org.apache.hadoop.hbase.regionserver.ScanType;
61  import org.apache.hadoop.hbase.regionserver.SplitTransaction;
62  import org.apache.hadoop.hbase.regionserver.Store;
63  import org.apache.hadoop.hbase.regionserver.StoreFile;
64  import org.apache.hadoop.hbase.util.PairOfSameType;
65  import org.junit.Rule;
66  import org.junit.Test;
67  import org.junit.experimental.categories.Category;
68  import org.junit.rules.TestName;
69  import org.mockito.Mockito;
70  
71  @Category(SmallTests.class)
72  public class TestCoprocessorInterface {
73    @Rule public TestName name = new TestName();
74    static final Log LOG = LogFactory.getLog(TestCoprocessorInterface.class);
75    private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
76    static final Path DIR = TEST_UTIL.getDataTestDir();
77  
78    private static class CustomScanner implements RegionScanner {
79  
80      private RegionScanner delegate;
81  
82      public CustomScanner(RegionScanner delegate) {
83        this.delegate = delegate;
84      }
85  
86      @Override
87      public boolean next(List<Cell> results) throws IOException {
88        return delegate.next(results);
89      }
90  
91      @Override
92      public boolean next(List<Cell> result, int limit) throws IOException {
93        return delegate.next(result, limit);
94      }
95  
96      @Override
97      public boolean nextRaw(List<Cell> result) 
98          throws IOException {
99        return delegate.nextRaw(result);
100     }
101 
102     @Override
103     public boolean nextRaw(List<Cell> result, int limit)
104         throws IOException {
105       return delegate.nextRaw(result, limit);
106     }
107 
108     @Override
109     public void close() throws IOException {
110       delegate.close();
111     }
112 
113     @Override
114     public HRegionInfo getRegionInfo() {
115       return delegate.getRegionInfo();
116     }
117 
118     @Override
119     public boolean isFilterDone() throws IOException {
120       return delegate.isFilterDone();
121     }
122 
123     @Override
124     public boolean reseek(byte[] row) throws IOException {
125       return false;
126     }
127 
128     @Override
129     public long getMaxResultSize() {
130       return delegate.getMaxResultSize();
131     }
132 
133     @Override
134     public long getMvccReadPoint() {
135       return delegate.getMvccReadPoint();
136     }
137   }
138 
139   public static class CoprocessorImpl extends BaseRegionObserver {
140 
141     private boolean startCalled;
142     private boolean stopCalled;
143     private boolean preOpenCalled;
144     private boolean postOpenCalled;
145     private boolean preCloseCalled;
146     private boolean postCloseCalled;
147     private boolean preCompactCalled;
148     private boolean postCompactCalled;
149     private boolean preFlushCalled;
150     private boolean postFlushCalled;
151     private boolean preSplitCalled;
152     private boolean postSplitCalled;
153     private boolean preSplitWithSplitRowCalled;
154     private ConcurrentMap<String, Object> sharedData;
155 
156     @Override
157     public void start(CoprocessorEnvironment e) {
158       sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
159       // using new String here, so that there will be new object on each invocation
160       sharedData.putIfAbsent("test1", new Object());
161       startCalled = true;
162     }
163 
164     @Override
165     public void stop(CoprocessorEnvironment e) {
166       sharedData = null;
167       stopCalled = true;
168     }
169 
170     @Override
171     public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
172       preOpenCalled = true;
173     }
174     @Override
175     public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
176       postOpenCalled = true;
177     }
178     @Override
179     public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
180       preCloseCalled = true;
181     }
182     @Override
183     public void postClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
184       postCloseCalled = true;
185     }
186     @Override
187     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
188         Store store, InternalScanner scanner, ScanType scanType) {
189       preCompactCalled = true;
190       return scanner;
191     }
192     @Override
193     public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
194         Store store, StoreFile resultFile) {
195       postCompactCalled = true;
196     }
197     @Override
198     public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
199       preFlushCalled = true;
200     }
201     @Override
202     public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
203       postFlushCalled = true;
204     }
205     @Override
206     public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) {
207       preSplitCalled = true;
208     }
209     
210     @Override
211     public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
212         byte[] splitRow) throws IOException {
213       preSplitWithSplitRowCalled = true;
214     }
215     @Override
216     public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r) {
217       postSplitCalled = true;
218     }
219 
220     @Override
221     public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
222         final Scan scan, final RegionScanner s) throws IOException {
223       return new CustomScanner(s);
224     }
225 
226     boolean wasStarted() {
227       return startCalled;
228     }
229     boolean wasStopped() {
230       return stopCalled;
231     }
232     boolean wasOpened() {
233       return (preOpenCalled && postOpenCalled);
234     }
235     boolean wasClosed() {
236       return (preCloseCalled && postCloseCalled);
237     }
238     boolean wasFlushed() {
239       return (preFlushCalled && postFlushCalled);
240     }
241     boolean wasCompacted() {
242       return (preCompactCalled && postCompactCalled);
243     }
244     boolean wasSplit() {
245       return (preSplitCalled && postSplitCalled && preSplitWithSplitRowCalled);
246     }
247     Map<String, Object> getSharedData() {
248       return sharedData;
249     }
250   }
251 
252   public static class CoprocessorII extends BaseRegionObserver {
253     private ConcurrentMap<String, Object> sharedData;
254     @Override
255     public void start(CoprocessorEnvironment e) {
256       sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
257       sharedData.putIfAbsent("test2", new Object());
258     }
259     @Override
260     public void stop(CoprocessorEnvironment e) {
261       sharedData = null;
262     }
263     @Override
264     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
265         final Get get, final List<Cell> results) throws IOException {
266       if (1/0 == 1) {
267         e.complete();
268       }
269     }
270 
271     Map<String, Object> getSharedData() {
272       return sharedData;
273     }
274   }
275 
276   @Test
277   public void testSharedData() throws IOException {
278     TableName tableName = TableName.valueOf(name.getMethodName());
279     byte [][] families = { fam1, fam2, fam3 };
280 
281     Configuration hc = initSplit();
282     HRegion region = initHRegion(tableName, name.getMethodName(), hc,
283       new Class<?>[]{}, families);
284 
285     for (int i = 0; i < 3; i++) {
286       HBaseTestCase.addContent(region, fam3);
287       region.flushcache();
288     }
289 
290     region.compactStores();
291 
292     byte [] splitRow = region.checkSplit();
293 
294     assertNotNull(splitRow);
295     HRegion [] regions = split(region, splitRow);
296     for (int i = 0; i < regions.length; i++) {
297       regions[i] = reopenRegion(regions[i], CoprocessorImpl.class, CoprocessorII.class);
298     }
299     Coprocessor c = regions[0].getCoprocessorHost().
300         findCoprocessor(CoprocessorImpl.class.getName());
301     Coprocessor c2 = regions[0].getCoprocessorHost().
302         findCoprocessor(CoprocessorII.class.getName());
303     Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
304     Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
305     assertNotNull(o);
306     assertNotNull(o2);
307     // to coprocessors get different sharedDatas
308     assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
309     for (int i = 1; i < regions.length; i++) {
310       c = regions[i].getCoprocessorHost().
311           findCoprocessor(CoprocessorImpl.class.getName());
312       c2 = regions[i].getCoprocessorHost().
313           findCoprocessor(CoprocessorII.class.getName());
314       // make sure that all coprocessor of a class have identical sharedDatas
315       assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
316       assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
317     }
318     // now have all Environments fail
319     for (int i = 0; i < regions.length; i++) {
320       try {
321         byte [] r = regions[i].getStartKey();
322         if (r == null || r.length <= 0) {
323           // Its the start row.  Can't ask for null.  Ask for minimal key instead.
324           r = new byte [] {0};
325         }
326         Get g = new Get(r);
327         regions[i].get(g);
328         fail();
329       } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
330       }
331       assertNull(regions[i].getCoprocessorHost().
332           findCoprocessor(CoprocessorII.class.getName()));
333     }
334     c = regions[0].getCoprocessorHost().
335         findCoprocessor(CoprocessorImpl.class.getName());
336     assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
337     c = c2 = null;
338     // perform a GC
339     System.gc();
340     // reopen the region
341     region = reopenRegion(regions[0], CoprocessorImpl.class, CoprocessorII.class);
342     c = region.getCoprocessorHost().
343         findCoprocessor(CoprocessorImpl.class.getName());
344     // CPimpl is unaffected, still the same reference
345     assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
346     c2 = region.getCoprocessorHost().
347         findCoprocessor(CoprocessorII.class.getName());
348     // new map and object created, hence the reference is different
349     // hence the old entry was indeed removed by the GC and new one has been created
350     Object o3 = ((CoprocessorII)c2).getSharedData().get("test2");
351     assertFalse(o3 == o2);
352   }
353 
354   @Test
355   public void testCoprocessorInterface() throws IOException {
356     TableName tableName = TableName.valueOf(name.getMethodName());
357     byte [][] families = { fam1, fam2, fam3 };
358 
359     Configuration hc = initSplit();
360     HRegion region = initHRegion(tableName, name.getMethodName(), hc,
361       new Class<?>[]{CoprocessorImpl.class}, families);
362     for (int i = 0; i < 3; i++) {
363       HBaseTestCase.addContent(region, fam3);
364       region.flushcache();
365     }
366 
367     region.compactStores();
368 
369     byte [] splitRow = region.checkSplit();
370 
371     assertNotNull(splitRow);
372     HRegion [] regions = split(region, splitRow);
373     for (int i = 0; i < regions.length; i++) {
374       regions[i] = reopenRegion(regions[i], CoprocessorImpl.class);
375     }
376     HRegion.closeHRegion(region);
377     Coprocessor c = region.getCoprocessorHost().
378       findCoprocessor(CoprocessorImpl.class.getName());
379 
380     // HBASE-4197
381     Scan s = new Scan();
382     RegionScanner scanner = regions[0].getCoprocessorHost().postScannerOpen(s, regions[0].getScanner(s));
383     assertTrue(scanner instanceof CustomScanner);
384     // this would throw an exception before HBASE-4197
385     scanner.next(new ArrayList<Cell>());
386 
387     assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
388     assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
389     assertTrue(((CoprocessorImpl)c).wasOpened());
390     assertTrue(((CoprocessorImpl)c).wasClosed());
391     assertTrue(((CoprocessorImpl)c).wasFlushed());
392     assertTrue(((CoprocessorImpl)c).wasCompacted());
393     assertTrue(((CoprocessorImpl)c).wasSplit());
394 
395     for (int i = 0; i < regions.length; i++) {
396       HRegion.closeHRegion(regions[i]);
397       c = region.getCoprocessorHost()
398             .findCoprocessor(CoprocessorImpl.class.getName());
399       assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
400       assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
401       assertTrue(((CoprocessorImpl)c).wasOpened());
402       assertTrue(((CoprocessorImpl)c).wasClosed());
403       assertTrue(((CoprocessorImpl)c).wasCompacted());
404     }
405   }
406 
407   HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses)
408       throws IOException {
409     //HRegionInfo info = new HRegionInfo(tableName, null, null, false);
410     HRegion r = HRegion.openHRegion(closedRegion, null);
411 
412     // this following piece is a hack. currently a coprocessorHost
413     // is secretly loaded at OpenRegionHandler. we don't really
414     // start a region server here, so just manually create cphost
415     // and set it to region.
416     Configuration conf = TEST_UTIL.getConfiguration();
417     RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
418     r.setCoprocessorHost(host);
419 
420     for (Class<?> implClass : implClasses) {
421       host.load(implClass, Coprocessor.PRIORITY_USER, conf);
422     }
423     // we need to manually call pre- and postOpen here since the
424     // above load() is not the real case for CP loading. A CP is
425     // expected to be loaded by default from 1) configuration; or 2)
426     // HTableDescriptor. If it's loaded after HRegion initialized,
427     // the pre- and postOpen() won't be triggered automatically.
428     // Here we have to call pre and postOpen explicitly.
429     host.preOpen();
430     host.postOpen();
431     return r;
432   }
433 
434   HRegion initHRegion (TableName tableName, String callingMethod,
435       Configuration conf, Class<?> [] implClasses, byte [][] families)
436       throws IOException {
437     HTableDescriptor htd = new HTableDescriptor(tableName);
438     for(byte [] family : families) {
439       htd.addFamily(new HColumnDescriptor(family));
440     }
441     HRegionInfo info = new HRegionInfo(tableName, null, null, false);
442     Path path = new Path(DIR + callingMethod);
443     HRegion r = HRegion.createHRegion(info, path, conf, htd);
444 
445     // this following piece is a hack.
446     RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
447     r.setCoprocessorHost(host);
448 
449     for (Class<?> implClass : implClasses) {
450       host.load(implClass, Coprocessor.PRIORITY_USER, conf);
451       Coprocessor c = host.findCoprocessor(implClass.getName());
452       assertNotNull(c);
453     }
454 
455     // Here we have to call pre and postOpen explicitly.
456     host.preOpen();
457     host.postOpen();
458     return r;
459   }
460 
461   Configuration initSplit() {
462     // Always compact if there is more than one store file.
463     TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
464     // Make lease timeout longer, lease checks less frequent
465     TEST_UTIL.getConfiguration().setInt(
466         "hbase.master.lease.thread.wakefrequency", 5 * 1000);
467     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
468     // Increase the amount of time between client retries
469     TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
470     // This size should make it so we always split using the addContent
471     // below.  After adding all data, the first region is 1.3M
472     TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
473         1024 * 128);
474     TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster",
475         true);
476     TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
477 
478     return TEST_UTIL.getConfiguration();
479   }
480 
481   private HRegion [] split(final HRegion r, final byte [] splitRow)
482       throws IOException {
483 
484     HRegion[] regions = new HRegion[2];
485 
486     SplitTransaction st = new SplitTransaction(r, splitRow);
487     int i = 0;
488 
489     if (!st.prepare()) {
490       // test fails.
491       assertTrue(false);
492     }
493     try {
494       Server mockServer = Mockito.mock(Server.class);
495       when(mockServer.getConfiguration()).thenReturn(
496           TEST_UTIL.getConfiguration());
497       PairOfSameType<HRegion> daughters = st.execute(mockServer, null);
498       for (HRegion each_daughter: daughters) {
499         regions[i] = each_daughter;
500         i++;
501       }
502     } catch (IOException ioe) {
503       LOG.info("Split transaction of " + r.getRegionNameAsString() +
504           " failed:" + ioe.getMessage());
505       assertTrue(false);
506     } catch (RuntimeException e) {
507       LOG.info("Failed rollback of failed split of " +
508           r.getRegionNameAsString() + e.getMessage());
509     }
510 
511     assertTrue(i == 2);
512     return regions;
513   }
514 
515 }