1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.zookeeper.lock;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertTrue;
25
26 import java.io.IOException;
27 import java.io.InterruptedIOException;
28 import java.util.List;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.DaemonThreadFactory;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.InterProcessLock;
44 import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
45 import org.apache.hadoop.hbase.MediumTests;
46 import org.apache.hadoop.hbase.MultithreadedTestUtil;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
49 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
50 import org.junit.After;
51 import org.junit.AfterClass;
52 import org.junit.BeforeClass;
53 import org.junit.Test;
54 import org.junit.experimental.categories.Category;
55
56 import com.google.common.collect.Lists;
57
58 @Category(MediumTests.class)
59 public class TestZKInterProcessReadWriteLock {
60
61 private static final Log LOG =
62 LogFactory.getLog(TestZKInterProcessReadWriteLock.class);
63
64 private static final HBaseTestingUtility TEST_UTIL =
65 new HBaseTestingUtility();
66
67 private static final int NUM_THREADS = 10;
68
69 private static Configuration conf;
70
71 private final AtomicBoolean isLockHeld = new AtomicBoolean(false);
72 private final ExecutorService executor =
73 Executors.newFixedThreadPool(NUM_THREADS,
74 new DaemonThreadFactory("TestZKInterProcessReadWriteLock-"));
75
76 @BeforeClass
77 public static void beforeAllTests() throws Exception {
78 conf = TEST_UTIL.getConfiguration();
79 TEST_UTIL.startMiniZKCluster();
80 conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
81 ZooKeeperWatcher zkw = getZooKeeperWatcher("setup");
82 ZKUtil.createWithParents(zkw, zkw.tableLockZNode);
83 }
84
85 @AfterClass
86 public static void afterAllTests() throws Exception {
87 TEST_UTIL.shutdownMiniZKCluster();
88 }
89
90 @After
91 public void tearDown() {
92 executor.shutdown();
93 }
94
95 private static ZooKeeperWatcher getZooKeeperWatcher(String desc)
96 throws IOException {
97 return TEST_UTIL.getZooKeeperWatcher();
98 }
99
100
101 @Test(timeout = 30000)
102 public void testWriteLockExcludesWriters() throws Exception {
103 final String testName = "testWriteLockExcludesWriters";
104 final ZKInterProcessReadWriteLock readWriteLock =
105 getReadWriteLock(testName);
106 List<Future<Void>> results = Lists.newArrayList();
107 for (int i = 0; i < NUM_THREADS; ++i) {
108 final String threadDesc = testName + i;
109 results.add(executor.submit(new Callable<Void>() {
110 @Override
111 public Void call() throws IOException {
112 ZKInterProcessWriteLock writeLock =
113 readWriteLock.writeLock(Bytes.toBytes(threadDesc));
114 try {
115 writeLock.acquire();
116 try {
117
118 assertTrue(isLockHeld.compareAndSet(false, true));
119 Thread.sleep(1000);
120
121 assertTrue(isLockHeld.compareAndSet(true, false));
122 } finally {
123 isLockHeld.set(false);
124 writeLock.release();
125 }
126 } catch (InterruptedException e) {
127 LOG.warn(threadDesc + " interrupted", e);
128 Thread.currentThread().interrupt();
129 throw new InterruptedIOException();
130 }
131 return null;
132 }
133 }));
134
135 }
136 MultithreadedTestUtil.assertOnFutures(results);
137 }
138
139 @Test(timeout = 30000)
140 public void testReadLockDoesNotExcludeReaders() throws Exception {
141 final String testName = "testReadLockDoesNotExcludeReaders";
142 final ZKInterProcessReadWriteLock readWriteLock =
143 getReadWriteLock(testName);
144 final CountDownLatch locksAcquiredLatch = new CountDownLatch(NUM_THREADS);
145 final AtomicInteger locksHeld = new AtomicInteger(0);
146 List<Future<Void>> results = Lists.newArrayList();
147 for (int i = 0; i < NUM_THREADS; ++i) {
148 final String threadDesc = testName + i;
149 results.add(executor.submit(new Callable<Void>() {
150 @Override
151 public Void call() throws Exception {
152 ZKInterProcessReadLock readLock =
153 readWriteLock.readLock(Bytes.toBytes(threadDesc));
154 readLock.acquire();
155 try {
156 locksHeld.incrementAndGet();
157 locksAcquiredLatch.countDown();
158 Thread.sleep(1000);
159 } finally {
160 readLock.release();
161 locksHeld.decrementAndGet();
162 }
163 return null;
164 }
165 }));
166 }
167 locksAcquiredLatch.await();
168 assertEquals(locksHeld.get(), NUM_THREADS);
169 MultithreadedTestUtil.assertOnFutures(results);
170 }
171
172 @Test(timeout = 30000)
173 public void testReadLockExcludesWriters() throws Exception {
174
175
176 final String testName = "testReadLockExcludesWriters";
177 List<Future<Void>> results = Lists.newArrayList();
178 final CountDownLatch readLockAcquiredLatch = new CountDownLatch(1);
179 Callable<Void> acquireReadLock = new Callable<Void>() {
180 @Override
181 public Void call() throws Exception {
182 final String threadDesc = testName + "-acquireReadLock";
183 ZKInterProcessReadLock readLock =
184 getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
185 readLock.acquire();
186 try {
187 assertTrue(isLockHeld.compareAndSet(false, true));
188 readLockAcquiredLatch.countDown();
189 Thread.sleep(1000);
190 } finally {
191 isLockHeld.set(false);
192 readLock.release();
193 }
194 return null;
195 }
196 };
197 Callable<Void> acquireWriteLock = new Callable<Void>() {
198 @Override
199 public Void call() throws Exception {
200 final String threadDesc = testName + "-acquireWriteLock";
201 ZKInterProcessWriteLock writeLock =
202 getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
203 readLockAcquiredLatch.await();
204 assertTrue(isLockHeld.get());
205 writeLock.acquire();
206 try {
207 assertFalse(isLockHeld.get());
208 } finally {
209 writeLock.release();
210 }
211 return null;
212 }
213 };
214 results.add(executor.submit(acquireReadLock));
215 results.add(executor.submit(acquireWriteLock));
216 MultithreadedTestUtil.assertOnFutures(results);
217 }
218
219 private static ZKInterProcessReadWriteLock getReadWriteLock(String testName)
220 throws IOException {
221 MetadataHandler handler = new MetadataHandler() {
222 @Override
223 public void handleMetadata(byte[] ownerMetadata) {
224 LOG.info("Lock info: " + Bytes.toString(ownerMetadata));
225 }
226 };
227 ZooKeeperWatcher zkWatcher = getZooKeeperWatcher(testName);
228 String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, testName);
229
230 return new ZKInterProcessReadWriteLock(zkWatcher, znode, handler);
231 }
232
233 @Test(timeout = 30000)
234 public void testWriteLockExcludesReaders() throws Exception {
235
236
237 final String testName = "testReadLockExcludesWriters";
238 List<Future<Void>> results = Lists.newArrayList();
239 final CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
240 Callable<Void> acquireWriteLock = new Callable<Void>() {
241 @Override
242 public Void call() throws Exception {
243 final String threadDesc = testName + "-acquireWriteLock";
244 ZKInterProcessWriteLock writeLock =
245 getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
246 writeLock.acquire();
247 try {
248 writeLockAcquiredLatch.countDown();
249 assertTrue(isLockHeld.compareAndSet(false, true));
250 Thread.sleep(1000);
251 } finally {
252 isLockHeld.set(false);
253 writeLock.release();
254 }
255 return null;
256 }
257 };
258 Callable<Void> acquireReadLock = new Callable<Void>() {
259 @Override
260 public Void call() throws Exception {
261 final String threadDesc = testName + "-acquireReadLock";
262 ZKInterProcessReadLock readLock =
263 getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
264 writeLockAcquiredLatch.await();
265 readLock.acquire();
266 try {
267 assertFalse(isLockHeld.get());
268 } finally {
269 readLock.release();
270 }
271 return null;
272 }
273 };
274 results.add(executor.submit(acquireWriteLock));
275 results.add(executor.submit(acquireReadLock));
276 MultithreadedTestUtil.assertOnFutures(results);
277 }
278
279 @Test(timeout = 60000)
280 public void testTimeout() throws Exception {
281 final String testName = "testTimeout";
282 final CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
283 Callable<Void> shouldHog = new Callable<Void>() {
284 @Override
285 public Void call() throws Exception {
286 final String threadDesc = testName + "-shouldHog";
287 ZKInterProcessWriteLock lock =
288 getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
289 lock.acquire();
290 lockAcquiredLatch.countDown();
291 Thread.sleep(10000);
292 lock.release();
293 return null;
294 }
295 };
296 Callable<Void> shouldTimeout = new Callable<Void>() {
297 @Override
298 public Void call() throws Exception {
299 final String threadDesc = testName + "-shouldTimeout";
300 ZKInterProcessWriteLock lock =
301 getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
302 lockAcquiredLatch.await();
303 assertFalse(lock.tryAcquire(5000));
304 return null;
305 }
306 };
307 Callable<Void> shouldAcquireLock = new Callable<Void>() {
308 @Override
309 public Void call() throws Exception {
310 final String threadDesc = testName + "-shouldAcquireLock";
311 ZKInterProcessWriteLock lock =
312 getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
313 lockAcquiredLatch.await();
314 assertTrue(lock.tryAcquire(30000));
315 lock.release();
316 return null;
317 }
318 };
319 List<Future<Void>> results = Lists.newArrayList();
320 results.add(executor.submit(shouldHog));
321 results.add(executor.submit(shouldTimeout));
322 results.add(executor.submit(shouldAcquireLock));
323 MultithreadedTestUtil.assertOnFutures(results);
324 }
325
326 @Test(timeout = 60000)
327 public void testMultipleClients() throws Exception {
328
329
330 final String testName = "testMultipleClients";
331
332
333 ZooKeeperWatcher zkWatcher1 = new ZooKeeperWatcher(conf, "testMultipleClients-1", null);
334 ZooKeeperWatcher zkWatcher2 = new ZooKeeperWatcher(conf, "testMultipleClients-2", null);
335
336 String znode = ZKUtil.joinZNode(zkWatcher1.tableLockZNode, testName);
337
338 ZKInterProcessReadWriteLock clientLock1
339 = new ZKInterProcessReadWriteLock(zkWatcher1, znode, null);
340 ZKInterProcessReadWriteLock clientLock2
341 = new ZKInterProcessReadWriteLock(zkWatcher2, znode, null);
342
343 InterProcessLock lock1 = clientLock1.readLock(Bytes.toBytes("client1"));
344 lock1.acquire();
345
346
347
348 InterProcessLock lock2 = clientLock2.writeLock(Bytes.toBytes("client2"));
349 assertFalse(lock2.tryAcquire(1000));
350
351 lock1.release();
352
353
354 assertTrue(lock2.tryAcquire(5000));
355 lock2.release();
356 zkWatcher1.close();
357 zkWatcher2.close();
358 }
359 }