1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.coprocessor;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.IOException;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Coprocessor;
31 import org.apache.hadoop.hbase.HBaseConfiguration;
32 import org.apache.hadoop.hbase.HBaseTestingUtility;
33 import org.apache.hadoop.hbase.HColumnDescriptor;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.testclassification.MediumTests;
36 import org.apache.hadoop.hbase.MiniHBaseCluster;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.MetaTableAccessor;
39 import org.apache.hadoop.hbase.client.Admin;
40 import org.apache.hadoop.hbase.client.HBaseAdmin;
41 import org.apache.hadoop.hbase.client.Mutation;
42 import org.apache.hadoop.hbase.regionserver.HRegion;
43 import org.apache.hadoop.hbase.regionserver.HRegionServer;
44 import org.apache.hadoop.hbase.regionserver.Region;
45 import org.apache.hadoop.hbase.regionserver.RegionMergeTransactionFactory;
46 import org.apache.hadoop.hbase.regionserver.RegionMergeTransactionImpl;
47 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.junit.Test;
50 import org.junit.experimental.categories.Category;
51
52
53
54
55
56 @Category(MediumTests.class)
57 public class TestRegionServerObserver {
58 private static final Log LOG = LogFactory.getLog(TestRegionServerObserver.class);
59
60
61
62
63
64 @Test
65 public void testCoprocessorHooksInRegionsMerge() throws Exception {
66 final int NUM_MASTERS = 1;
67 final int NUM_RS = 1;
68 final String TABLENAME = "testRegionServerObserver";
69 final String TABLENAME2 = "testRegionServerObserver_2";
70 final byte[] FAM = Bytes.toBytes("fam");
71
72
73 Configuration conf = HBaseConfiguration.create();
74 conf.setClass("hbase.coprocessor.regionserver.classes", CPRegionServerObserver.class,
75 RegionServerObserver.class);
76
77
78 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
79 TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
80 Admin admin = new HBaseAdmin(conf);
81 try {
82 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
83 HRegionServer regionServer = cluster.getRegionServer(0);
84 RegionServerCoprocessorHost cpHost = regionServer.getRegionServerCoprocessorHost();
85 Coprocessor coprocessor = cpHost.findCoprocessor(CPRegionServerObserver.class.getName());
86 CPRegionServerObserver regionServerObserver = (CPRegionServerObserver) coprocessor;
87 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TABLENAME));
88 desc.addFamily(new HColumnDescriptor(FAM));
89 admin.createTable(desc, new byte[][] { Bytes.toBytes("row") });
90 desc = new HTableDescriptor(TableName.valueOf(TABLENAME2));
91 desc.addFamily(new HColumnDescriptor(FAM));
92 admin.createTable(desc, new byte[][] { Bytes.toBytes("row") });
93 assertFalse(regionServerObserver.wasRegionMergeCalled());
94 List<Region> regions = regionServer.getOnlineRegions(TableName.valueOf(TABLENAME));
95 admin.mergeRegions(regions.get(0).getRegionInfo().getEncodedNameAsBytes(), regions.get(1)
96 .getRegionInfo().getEncodedNameAsBytes(), true);
97 int regionsCount = regionServer.getOnlineRegions(TableName.valueOf(TABLENAME)).size();
98 while (regionsCount != 1) {
99 regionsCount = regionServer.getOnlineRegions(TableName.valueOf(TABLENAME)).size();
100 Thread.sleep(1000);
101 }
102 assertTrue(regionServerObserver.wasRegionMergeCalled());
103 assertTrue(regionServerObserver.wasPreMergeCommit());
104 assertTrue(regionServerObserver.wasPostMergeCommit());
105 assertEquals(regionsCount, 1);
106 assertEquals(regionServer.getOnlineRegions(TableName.valueOf(TABLENAME2)).size(), 1);
107 } finally {
108 if (admin != null) admin.close();
109 TEST_UTIL.shutdownMiniCluster();
110 }
111 }
112
113 public static class CPRegionServerObserver extends BaseRegionServerObserver {
114 private RegionMergeTransactionImpl rmt = null;
115 private HRegion mergedRegion = null;
116
117 private boolean preMergeCalled;
118 private boolean preMergeBeforePONRCalled;
119 private boolean preMergeAfterPONRCalled;
120 private boolean preRollBackMergeCalled;
121 private boolean postRollBackMergeCalled;
122 private boolean postMergeCalled;
123
124 public void resetStates() {
125 preMergeCalled = false;
126 preMergeBeforePONRCalled = false;
127 preMergeAfterPONRCalled = false;
128 preRollBackMergeCalled = false;
129 postRollBackMergeCalled = false;
130 postMergeCalled = false;
131 }
132
133 @Override
134 public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
135 Region regionB) throws IOException {
136 preMergeCalled = true;
137 }
138
139 @Override
140 public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
141 Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException {
142 preMergeBeforePONRCalled = true;
143 RegionServerCoprocessorEnvironment environment = ctx.getEnvironment();
144 HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
145 List<Region> onlineRegions =
146 rs.getOnlineRegions(TableName.valueOf("testRegionServerObserver_2"));
147 rmt = (RegionMergeTransactionImpl) new RegionMergeTransactionFactory(rs.getConfiguration())
148 .create(onlineRegions.get(0), onlineRegions.get(1), true);
149 if (!rmt.prepare(rs)) {
150 LOG.error("Prepare for the region merge of table "
151 + onlineRegions.get(0).getTableDesc().getNameAsString()
152 + " failed. So returning null. ");
153 ctx.bypass();
154 return;
155 }
156 mergedRegion = rmt.stepsBeforePONR(rs, rs, false);
157 rmt.prepareMutationsForMerge(mergedRegion.getRegionInfo(), regionA.getRegionInfo(),
158 regionB.getRegionInfo(), rs.getServerName(), metaEntries,
159 regionA.getTableDesc().getRegionReplication());
160 MetaTableAccessor.mutateMetaTable(rs.getConnection(), metaEntries);
161 }
162
163 @Override
164 public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
165 Region regionA, Region regionB, Region mr) throws IOException {
166 preMergeAfterPONRCalled = true;
167 RegionServerCoprocessorEnvironment environment = ctx.getEnvironment();
168 HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
169 rmt.stepsAfterPONR(rs, rs, this.mergedRegion, null);
170 }
171
172 @Override
173 public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
174 Region regionA, Region regionB) throws IOException {
175 preRollBackMergeCalled = true;
176 }
177
178 @Override
179 public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
180 Region regionA, Region regionB) throws IOException {
181 postRollBackMergeCalled = true;
182 }
183
184 @Override
185 public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
186 Region regionB, Region mergedRegion) throws IOException {
187 postMergeCalled = true;
188 }
189
190 public boolean wasPreMergeCalled() {
191 return this.preMergeCalled;
192 }
193
194 public boolean wasPostMergeCalled() {
195 return this.postMergeCalled;
196 }
197
198 public boolean wasPreMergeCommit() {
199 return this.preMergeBeforePONRCalled;
200 }
201
202 public boolean wasPostMergeCommit() {
203 return this.preMergeAfterPONRCalled;
204 }
205
206 public boolean wasPreRollBackMerge() {
207 return this.preRollBackMergeCalled;
208 }
209
210 public boolean wasPostRollBackMerge() {
211 return this.postRollBackMergeCalled;
212 }
213
214 public boolean wasRegionMergeCalled() {
215 return this.preMergeCalled && this.postMergeCalled;
216 }
217
218 }
219
220 }