1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.util;
21
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.assertEquals;
24
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutorCompletionService;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.locks.Lock;
35 import java.util.concurrent.locks.ReentrantReadWriteLock;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.hbase.testclassification.MediumTests;
40 import org.junit.Test;
41 import org.junit.experimental.categories.Category;
42
43 @Category({MediumTests.class})
44
45 public class TestIdReadWriteLock {
46
47 private static final Log LOG = LogFactory.getLog(TestIdReadWriteLock.class);
48
49 private static final int NUM_IDS = 16;
50 private static final int NUM_THREADS = 128;
51 private static final int NUM_SECONDS = 15;
52
53 private IdReadWriteLock idLock = new IdReadWriteLock();
54
55 private Map<Long, String> idOwner = new ConcurrentHashMap<Long, String>();
56
57 private class IdLockTestThread implements Callable<Boolean> {
58
59 private String clientId;
60
61 public IdLockTestThread(String clientId) {
62 this.clientId = clientId;
63 }
64
65 @Override
66 public Boolean call() throws Exception {
67 Thread.currentThread().setName(clientId);
68 Random rand = new Random();
69 long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000;
70 while (System.currentTimeMillis() < endTime) {
71 long id = rand.nextInt(NUM_IDS);
72 boolean readLock = rand.nextBoolean();
73
74 ReentrantReadWriteLock readWriteLock = idLock.getLock(id);
75 Lock lock = readLock ? readWriteLock.readLock() : readWriteLock.writeLock();
76 try {
77 lock.lock();
78 int sleepMs = 1 + rand.nextInt(4);
79 String owner = idOwner.get(id);
80 if (owner != null && LOG.isDebugEnabled()) {
81 LOG.debug((readLock ? "Read" : "Write") + "lock of Id " + id + " already taken by "
82 + owner + ", we are " + clientId);
83 }
84
85 idOwner.put(id, clientId);
86 Thread.sleep(sleepMs);
87 idOwner.remove(id);
88
89 } finally {
90 lock.unlock();
91 if (LOG.isDebugEnabled()) {
92 LOG.debug("Release " + (readLock ? "Read" : "Write") + " lock of Id" + id + ", we are "
93 + clientId);
94 }
95 }
96 }
97 return true;
98 }
99
100 }
101
102 @Test(timeout = 60000)
103 public void testMultipleClients() throws Exception {
104 ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
105 try {
106 ExecutorCompletionService<Boolean> ecs =
107 new ExecutorCompletionService<Boolean>(exec);
108 for (int i = 0; i < NUM_THREADS; ++i)
109 ecs.submit(new IdLockTestThread("client_" + i));
110 for (int i = 0; i < NUM_THREADS; ++i) {
111 Future<Boolean> result = ecs.take();
112 assertTrue(result.get());
113 }
114
115 int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
116 LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
117 assertEquals(0, entryPoolSize);
118 } finally {
119 exec.shutdown();
120 exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
121 }
122 }
123
124
125 }
126