1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.util.List;
29 import java.util.Random;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.Future;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.Chore;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HColumnDescriptor;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HTableDescriptor;
46 import org.apache.hadoop.hbase.LargeTests;
47 import org.apache.hadoop.hbase.NotServingRegionException;
48 import org.apache.hadoop.hbase.ServerName;
49 import org.apache.hadoop.hbase.TableNotDisabledException;
50 import org.apache.hadoop.hbase.Waiter;
51 import org.apache.hadoop.hbase.client.HBaseAdmin;
52 import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
53 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
54 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
55 import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
56 import org.apache.hadoop.hbase.regionserver.HRegion;
57 import org.apache.hadoop.hbase.util.Bytes;
58 import org.apache.hadoop.hbase.util.LoadTestTool;
59 import org.apache.hadoop.hbase.util.StoppableImplementation;
60 import org.apache.hadoop.hbase.util.Threads;
61 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
62 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
63 import org.junit.After;
64 import org.junit.Test;
65 import org.junit.experimental.categories.Category;
66
67
68
69
70 @Category(LargeTests.class)
71 public class TestTableLockManager {
72
73 private static final Log LOG =
74 LogFactory.getLog(TestTableLockManager.class);
75
76 private static final TableName TABLE_NAME =
77 TableName.valueOf("TestTableLevelLocks");
78
79 private static final byte[] FAMILY = Bytes.toBytes("f1");
80
81 private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
82
83 private final HBaseTestingUtility TEST_UTIL =
84 new HBaseTestingUtility();
85
86 private static final CountDownLatch deleteColumn = new CountDownLatch(1);
87 private static final CountDownLatch addColumn = new CountDownLatch(1);
88
89 public void prepareMiniCluster() throws Exception {
90 TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
91 TEST_UTIL.startMiniCluster(2);
92 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
93 }
94
95 public void prepareMiniZkCluster() throws Exception {
96 TEST_UTIL.startMiniZKCluster(1);
97 }
98
99 @After
100 public void tearDown() throws Exception {
101 TEST_UTIL.shutdownMiniCluster();
102 }
103
104 @Test(timeout = 600000)
105 public void testLockTimeoutException() throws Exception {
106 Configuration conf = TEST_UTIL.getConfiguration();
107 conf.setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 3000);
108 prepareMiniCluster();
109 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
110 master.getCoprocessorHost().load(TestLockTimeoutExceptionMasterObserver.class,
111 0, TEST_UTIL.getConfiguration());
112
113 ExecutorService executor = Executors.newSingleThreadExecutor();
114 Future<Object> shouldFinish = executor.submit(new Callable<Object>() {
115 @Override
116 public Object call() throws Exception {
117 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
118 admin.deleteColumn(TABLE_NAME, FAMILY);
119 return null;
120 }
121 });
122
123 deleteColumn.await();
124
125 try {
126 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
127 admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
128 fail("Was expecting TableLockTimeoutException");
129 } catch (LockTimeoutException ex) {
130
131 }
132 shouldFinish.get();
133 }
134
135 public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver {
136 @Override
137 public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
138 TableName tableName, byte[] c) throws IOException {
139 deleteColumn.countDown();
140 }
141 @Override
142 public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
143 TableName tableName, byte[] c) throws IOException {
144 Threads.sleep(10000);
145 }
146
147 @Override
148 public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
149 TableName tableName, HColumnDescriptor column) throws IOException {
150 fail("Add column should have timeouted out for acquiring the table lock");
151 }
152 }
153
154 @Test(timeout = 600000)
155 public void testAlterAndDisable() throws Exception {
156 prepareMiniCluster();
157
158
159
160
161 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
162 master.getCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
163 0, TEST_UTIL.getConfiguration());
164
165 ExecutorService executor = Executors.newFixedThreadPool(2);
166 Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
167 @Override
168 public Object call() throws Exception {
169 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
170 admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
171 LOG.info("Added new column family");
172 HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
173 assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
174 return null;
175 }
176 });
177 Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
178 @Override
179 public Object call() throws Exception {
180 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
181 admin.disableTable(TABLE_NAME);
182 assertTrue(admin.isTableDisabled(TABLE_NAME));
183 admin.deleteTable(TABLE_NAME);
184 assertFalse(admin.tableExists(TABLE_NAME));
185 return null;
186 }
187 });
188
189 try {
190 disableTableFuture.get();
191 alterTableFuture.get();
192 } catch (ExecutionException e) {
193 if (e.getCause() instanceof AssertionError) {
194 throw (AssertionError) e.getCause();
195 }
196 throw e;
197 }
198 }
199
200 public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
201 @Override
202 public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
203 TableName tableName, HColumnDescriptor column) throws IOException {
204 LOG.debug("addColumn called");
205 addColumn.countDown();
206 }
207
208 @Override
209 public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
210 TableName tableName, HColumnDescriptor column) throws IOException {
211 Threads.sleep(6000);
212 try {
213 ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
214 } catch(TableNotDisabledException expected) {
215
216 return;
217 } catch(IOException ex) {
218 }
219 fail("was expecting the table to be enabled");
220 }
221
222 @Override
223 public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
224 TableName tableName) throws IOException {
225 try {
226 LOG.debug("Waiting for addColumn to be processed first");
227
228 addColumn.await();
229 LOG.debug("addColumn started, we can continue");
230 } catch (InterruptedException ex) {
231 LOG.warn("Sleep interrupted while waiting for addColumn countdown");
232 }
233 }
234
235 @Override
236 public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
237 TableName tableName) throws IOException {
238 Threads.sleep(3000);
239 }
240 }
241
242 @Test(timeout = 600000)
243 public void testDelete() throws Exception {
244 prepareMiniCluster();
245
246 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
247 admin.disableTable(TABLE_NAME);
248 admin.deleteTable(TABLE_NAME);
249
250
251 final ZooKeeperWatcher zkWatcher = TEST_UTIL.getZooKeeperWatcher();
252 final String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString());
253
254 TEST_UTIL.waitFor(5000, new Waiter.Predicate<Exception>() {
255 @Override
256 public boolean evaluate() throws Exception {
257 int ver = ZKUtil.checkExists(zkWatcher, znode);
258 return ver < 0;
259 }
260 });
261 int ver = ZKUtil.checkExists(zkWatcher,
262 ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString()));
263 assertTrue("Unexpected znode version " + ver, ver < 0);
264
265 }
266
267
268 @Test(timeout = 600000)
269 public void testReapAllTableLocks() throws Exception {
270 prepareMiniZkCluster();
271 ServerName serverName = ServerName.valueOf("localhost:10000", 0);
272 final TableLockManager lockManager = TableLockManager.createTableLockManager(
273 TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
274
275 String tables[] = {"table1", "table2", "table3", "table4"};
276 ExecutorService executor = Executors.newFixedThreadPool(6);
277
278 final CountDownLatch writeLocksObtained = new CountDownLatch(4);
279 final CountDownLatch writeLocksAttempted = new CountDownLatch(10);
280
281
282
283 for (int i = 0; i < tables.length; i++) {
284 final String table = tables[i];
285 for (int j = 0; j < i+1; j++) {
286 executor.submit(new Callable<Void>() {
287 @Override
288 public Void call() throws Exception {
289 writeLocksAttempted.countDown();
290 lockManager.writeLock(TableName.valueOf(table),
291 "testReapAllTableLocks").acquire();
292 writeLocksObtained.countDown();
293 return null;
294 }
295 });
296 }
297 }
298
299 writeLocksObtained.await();
300 writeLocksAttempted.await();
301
302
303 lockManager.reapWriteLocks();
304
305 TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0);
306 TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager(
307 TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
308
309
310 zeroTimeoutLockManager.writeLock(
311 TableName.valueOf(tables[tables.length - 1]),
312 "zero timeout")
313 .acquire();
314
315 executor.shutdownNow();
316 }
317
318 @Test(timeout = 600000)
319 public void testTableReadLock() throws Exception {
320
321
322
323
324
325 prepareMiniCluster();
326 LoadTestTool loadTool = new LoadTestTool();
327 loadTool.setConf(TEST_UTIL.getConfiguration());
328 int numKeys = 10000;
329 final TableName tableName = TableName.valueOf("testTableReadLock");
330 final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
331 final HTableDescriptor desc = new HTableDescriptor(tableName);
332 final byte[] family = Bytes.toBytes("test_cf");
333 desc.addFamily(new HColumnDescriptor(family));
334 admin.createTable(desc);
335
336
337 int ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-write",
338 String.format("%d:%d:%d", 1, 10, 10), "-num_keys", String.valueOf(numKeys), "-skip_init" });
339 if (0 != ret) {
340 String errorMsg = "Load failed with error code " + ret;
341 LOG.error(errorMsg);
342 fail(errorMsg);
343 }
344
345 int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
346 StoppableImplementation stopper = new StoppableImplementation();
347
348
349 Chore alterThread = new Chore("Alter Chore", 10000, stopper) {
350 @Override
351 protected void chore() {
352 Random random = new Random();
353 try {
354 HTableDescriptor htd = admin.getTableDescriptor(tableName);
355 String val = String.valueOf(random.nextInt());
356 htd.getFamily(family).setValue(val, val);
357 desc.getFamily(family).setValue(val, val);
358
359 admin.modifyTable(tableName, htd);
360 } catch (Exception ex) {
361 LOG.warn("Caught exception", ex);
362 fail(ex.getMessage());
363 }
364 }
365 };
366
367
368 Chore splitThread = new Chore("Split thread", 5000, stopper) {
369 @Override
370 public void chore() {
371 try {
372 HRegion region = TEST_UTIL.getSplittableRegion(tableName, -1);
373 if (region != null) {
374 byte[] regionName = region.getRegionName();
375 admin.flush(regionName);
376 admin.compact(regionName);
377 admin.split(regionName);
378 } else {
379 LOG.warn("Could not find suitable region for the table. Possibly the " +
380 "region got closed and the attempts got over before " +
381 "the region could have got reassigned.");
382 }
383 } catch (NotServingRegionException nsre) {
384
385 LOG.warn("Caught exception", nsre);
386 } catch (Exception ex) {
387 LOG.warn("Caught exception", ex);
388 fail(ex.getMessage());
389 }
390 }
391 };
392
393 alterThread.start();
394 splitThread.start();
395 while (true) {
396 List<HRegionInfo> regions = admin.getTableRegions(tableName);
397 LOG.info(String.format("Table #regions: %d regions: %s:", regions.size(), regions));
398 assertEquals(admin.getTableDescriptor(tableName), desc);
399 for (HRegion region : TEST_UTIL.getMiniHBaseCluster().getRegions(tableName)) {
400 assertEquals(desc, region.getTableDesc());
401 }
402 if (regions.size() >= 5) {
403 break;
404 }
405 Threads.sleep(1000);
406 }
407 stopper.stop("test finished");
408
409 int newFamilyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
410 LOG.info(String.format("Altered the table %d times", newFamilyValues - familyValues));
411 assertTrue(newFamilyValues > familyValues);
412
413
414 ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-read", "100:10",
415 "-num_keys", String.valueOf(numKeys), "-skip_init" });
416 if (0 != ret) {
417 String errorMsg = "Verify failed with error code " + ret;
418 LOG.error(errorMsg);
419 fail(errorMsg);
420 }
421
422 admin.close();
423 }
424
425 }