View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.zookeeper.lock;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertFalse;
24  import static org.junit.Assert.assertTrue;
25  
26  import java.io.IOException;
27  import java.io.InterruptedIOException;
28  import java.util.List;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Executors;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  import java.util.concurrent.atomic.AtomicInteger;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.DaemonThreadFactory;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.InterProcessLock;
44  import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
45  import org.apache.hadoop.hbase.MediumTests;
46  import org.apache.hadoop.hbase.MultithreadedTestUtil;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
49  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
50  import org.junit.After;
51  import org.junit.AfterClass;
52  import org.junit.BeforeClass;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  
56  import com.google.common.collect.Lists;
57  
58  @Category(MediumTests.class)
59  public class TestZKInterProcessReadWriteLock {
60  
61    private static final Log LOG =
62        LogFactory.getLog(TestZKInterProcessReadWriteLock.class);
63  
64    private static final HBaseTestingUtility TEST_UTIL =
65        new HBaseTestingUtility();
66  
67    private static final int NUM_THREADS = 10;
68  
69    private static Configuration conf;
70  
71    private final AtomicBoolean isLockHeld = new AtomicBoolean(false);
72    private final ExecutorService executor =
73        Executors.newFixedThreadPool(NUM_THREADS,
74            new DaemonThreadFactory("TestZKInterProcessReadWriteLock-"));
75  
76    @BeforeClass
77    public static void beforeAllTests() throws Exception {
78      conf = TEST_UTIL.getConfiguration();
79      TEST_UTIL.startMiniZKCluster();
80      conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
81      ZooKeeperWatcher zkw = getZooKeeperWatcher("setup");
82      ZKUtil.createWithParents(zkw, zkw.tableLockZNode);
83    }
84  
85    @AfterClass
86    public static void afterAllTests() throws Exception {
87      TEST_UTIL.shutdownMiniZKCluster();
88    }
89  
90    @After
91    public void tearDown() {
92      executor.shutdown();
93    }
94  
95    private static ZooKeeperWatcher getZooKeeperWatcher(String desc)
96    throws IOException {
97      return TEST_UTIL.getZooKeeperWatcher();
98    }
99  
100 
101   @Test(timeout = 30000)
102   public void testWriteLockExcludesWriters() throws Exception {
103     final String testName = "testWriteLockExcludesWriters";
104     final ZKInterProcessReadWriteLock readWriteLock =
105         getReadWriteLock(testName);
106     List<Future<Void>> results = Lists.newArrayList();
107     for (int i = 0; i < NUM_THREADS; ++i) {
108       final String threadDesc = testName + i;
109       results.add(executor.submit(new Callable<Void>() {
110         @Override
111         public Void call() throws IOException {
112           ZKInterProcessWriteLock writeLock =
113               readWriteLock.writeLock(Bytes.toBytes(threadDesc));
114           try {
115             writeLock.acquire();
116             try {
117               // No one else should hold the lock
118               assertTrue(isLockHeld.compareAndSet(false, true));
119               Thread.sleep(1000);
120               // No one else should have released the lock
121               assertTrue(isLockHeld.compareAndSet(true, false));
122             } finally {
123               isLockHeld.set(false);
124               writeLock.release();
125             }
126           } catch (InterruptedException e) {
127             LOG.warn(threadDesc + " interrupted", e);
128             Thread.currentThread().interrupt();
129             throw new InterruptedIOException();
130           }
131           return null;
132         }
133       }));
134 
135     }
136     MultithreadedTestUtil.assertOnFutures(results);
137   }
138 
139   @Test(timeout = 30000)
140   public void testReadLockDoesNotExcludeReaders() throws Exception {
141     final String testName = "testReadLockDoesNotExcludeReaders";
142     final ZKInterProcessReadWriteLock readWriteLock =
143         getReadWriteLock(testName);
144     final CountDownLatch locksAcquiredLatch = new CountDownLatch(NUM_THREADS);
145     final AtomicInteger locksHeld = new AtomicInteger(0);
146     List<Future<Void>> results = Lists.newArrayList();
147     for (int i = 0; i < NUM_THREADS; ++i) {
148       final String threadDesc = testName + i;
149       results.add(executor.submit(new Callable<Void>() {
150         @Override
151         public Void call() throws Exception {
152           ZKInterProcessReadLock readLock =
153               readWriteLock.readLock(Bytes.toBytes(threadDesc));
154           readLock.acquire();
155           try {
156             locksHeld.incrementAndGet();
157             locksAcquiredLatch.countDown();
158             Thread.sleep(1000);
159           } finally {
160             readLock.release();
161             locksHeld.decrementAndGet();
162           }
163           return null;
164         }
165       }));
166     }
167     locksAcquiredLatch.await();
168     assertEquals(locksHeld.get(), NUM_THREADS);
169     MultithreadedTestUtil.assertOnFutures(results);
170   }
171 
172   @Test(timeout = 30000)
173   public void testReadLockExcludesWriters() throws Exception {
174     // Submit a read lock request first
175     // Submit a write lock request second
176     final String testName = "testReadLockExcludesWriters";
177     List<Future<Void>> results = Lists.newArrayList();
178     final CountDownLatch readLockAcquiredLatch = new CountDownLatch(1);
179     Callable<Void> acquireReadLock = new Callable<Void>() {
180       @Override
181       public Void call() throws Exception {
182         final String threadDesc = testName + "-acquireReadLock";
183         ZKInterProcessReadLock readLock =
184             getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
185         readLock.acquire();
186         try {
187           assertTrue(isLockHeld.compareAndSet(false, true));
188           readLockAcquiredLatch.countDown();
189           Thread.sleep(1000);
190         } finally {
191           isLockHeld.set(false);
192           readLock.release();
193         }
194         return null;
195       }
196     };
197     Callable<Void> acquireWriteLock = new Callable<Void>() {
198       @Override
199       public Void call() throws Exception {
200         final String threadDesc = testName + "-acquireWriteLock";
201         ZKInterProcessWriteLock writeLock =
202             getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
203         readLockAcquiredLatch.await();
204         assertTrue(isLockHeld.get());
205         writeLock.acquire();
206         try {
207           assertFalse(isLockHeld.get());
208         } finally {
209           writeLock.release();
210         }
211         return null;
212       }
213     };
214     results.add(executor.submit(acquireReadLock));
215     results.add(executor.submit(acquireWriteLock));
216     MultithreadedTestUtil.assertOnFutures(results);
217   }
218 
219   private static ZKInterProcessReadWriteLock getReadWriteLock(String testName)
220       throws IOException {
221     MetadataHandler handler = new MetadataHandler() {
222       @Override
223       public void handleMetadata(byte[] ownerMetadata) {
224         LOG.info("Lock info: " + Bytes.toString(ownerMetadata));
225       }
226     };
227     ZooKeeperWatcher zkWatcher = getZooKeeperWatcher(testName);
228     String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, testName);
229 
230     return new ZKInterProcessReadWriteLock(zkWatcher, znode, handler);
231   }
232 
233   @Test(timeout = 30000)
234   public void testWriteLockExcludesReaders() throws Exception {
235     // Submit a read lock request first
236     // Submit a write lock request second
237     final String testName = "testReadLockExcludesWriters";
238     List<Future<Void>> results = Lists.newArrayList();
239     final CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
240     Callable<Void> acquireWriteLock = new Callable<Void>() {
241       @Override
242       public Void call() throws Exception {
243         final String threadDesc = testName + "-acquireWriteLock";
244         ZKInterProcessWriteLock writeLock =
245             getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
246         writeLock.acquire();
247         try {
248           writeLockAcquiredLatch.countDown();
249           assertTrue(isLockHeld.compareAndSet(false, true));
250           Thread.sleep(1000);
251         } finally {
252           isLockHeld.set(false);
253           writeLock.release();
254         }
255         return null;
256       }
257     };
258     Callable<Void> acquireReadLock = new Callable<Void>() {
259       @Override
260       public Void call() throws Exception {
261         final String threadDesc = testName + "-acquireReadLock";
262         ZKInterProcessReadLock readLock =
263             getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
264         writeLockAcquiredLatch.await();
265         readLock.acquire();
266         try {
267           assertFalse(isLockHeld.get());
268         } finally {
269           readLock.release();
270         }
271         return null;
272       }
273     };
274     results.add(executor.submit(acquireWriteLock));
275     results.add(executor.submit(acquireReadLock));
276     MultithreadedTestUtil.assertOnFutures(results);
277   }
278 
279   @Test(timeout = 60000)
280   public void testTimeout() throws Exception {
281     final String testName = "testTimeout";
282     final CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
283     Callable<Void> shouldHog = new Callable<Void>() {
284       @Override
285       public Void call() throws Exception {
286         final String threadDesc = testName + "-shouldHog";
287         ZKInterProcessWriteLock lock =
288             getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
289         lock.acquire();
290         lockAcquiredLatch.countDown();
291         Thread.sleep(10000);
292         lock.release();
293         return null;
294       }
295     };
296     Callable<Void> shouldTimeout = new Callable<Void>() {
297       @Override
298       public Void call() throws Exception {
299         final String threadDesc = testName + "-shouldTimeout";
300         ZKInterProcessWriteLock lock =
301             getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
302         lockAcquiredLatch.await();
303         assertFalse(lock.tryAcquire(5000));
304         return null;
305       }
306     };
307     Callable<Void> shouldAcquireLock = new Callable<Void>() {
308       @Override
309       public Void call() throws Exception {
310         final String threadDesc = testName + "-shouldAcquireLock";
311         ZKInterProcessWriteLock lock =
312             getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
313         lockAcquiredLatch.await();
314         assertTrue(lock.tryAcquire(30000));
315         lock.release();
316         return null;
317       }
318     };
319     List<Future<Void>> results = Lists.newArrayList();
320     results.add(executor.submit(shouldHog));
321     results.add(executor.submit(shouldTimeout));
322     results.add(executor.submit(shouldAcquireLock));
323     MultithreadedTestUtil.assertOnFutures(results);
324   }
325 
326   @Test(timeout = 60000)
327   public void testMultipleClients() throws Exception {
328     //tests lock usage from multiple zookeeper clients with different sessions.
329     //acquire one read lock, then one write lock
330     final String testName = "testMultipleClients";
331 
332     //different zookeeper sessions with separate identifiers
333     ZooKeeperWatcher zkWatcher1 = new ZooKeeperWatcher(conf, "testMultipleClients-1", null);
334     ZooKeeperWatcher zkWatcher2 = new ZooKeeperWatcher(conf, "testMultipleClients-2", null);
335 
336     String znode = ZKUtil.joinZNode(zkWatcher1.tableLockZNode, testName);
337 
338     ZKInterProcessReadWriteLock clientLock1
339       = new ZKInterProcessReadWriteLock(zkWatcher1, znode, null);
340     ZKInterProcessReadWriteLock clientLock2
341       = new ZKInterProcessReadWriteLock(zkWatcher2, znode, null);
342 
343     InterProcessLock lock1 = clientLock1.readLock(Bytes.toBytes("client1"));
344     lock1.acquire();
345 
346     //try to acquire, but it will timeout. We are testing whether this will cause any problems
347     //due to the read lock being from another client
348     InterProcessLock lock2 = clientLock2.writeLock(Bytes.toBytes("client2"));
349     assertFalse(lock2.tryAcquire(1000));
350 
351     lock1.release();
352 
353     //this time it will acquire
354     assertTrue(lock2.tryAcquire(5000));
355     lock2.release();
356     zkWatcher1.close();
357     zkWatcher2.close();
358   }
359 }