1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.util;
21
22 import static org.junit.Assert.assertTrue;
23
24 import java.util.Map;
25 import java.util.Random;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutorCompletionService;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.TimeUnit;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.MediumTests;
37 import org.junit.Test;
38 import org.junit.experimental.categories.Category;
39
40 @Category(MediumTests.class)
41
42 public class TestIdLock {
43
44 private static final Log LOG = LogFactory.getLog(TestIdLock.class);
45
46 private static final int NUM_IDS = 16;
47 private static final int NUM_THREADS = 128;
48 private static final int NUM_SECONDS = 15;
49
50 private IdLock idLock = new IdLock();
51
52 private Map<Long, String> idOwner = new ConcurrentHashMap<Long, String>();
53
54 private class IdLockTestThread implements Callable<Boolean> {
55
56 private String clientId;
57
58 public IdLockTestThread(String clientId) {
59 this.clientId = clientId;
60 }
61
62 @Override
63 public Boolean call() throws Exception {
64 Thread.currentThread().setName(clientId);
65 Random rand = new Random();
66 long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000;
67 while (System.currentTimeMillis() < endTime) {
68 long id = rand.nextInt(NUM_IDS);
69
70 IdLock.Entry lockEntry = idLock.getLockEntry(id);
71 try {
72 int sleepMs = 1 + rand.nextInt(4);
73 String owner = idOwner.get(id);
74 if (owner != null) {
75 LOG.error("Id " + id + " already taken by " + owner + ", "
76 + clientId + " failed");
77 return false;
78 }
79
80 idOwner.put(id, clientId);
81 Thread.sleep(sleepMs);
82 idOwner.remove(id);
83
84 } finally {
85 idLock.releaseLockEntry(lockEntry);
86 }
87 }
88 return true;
89 }
90
91 }
92
93 @Test
94 public void testMultipleClients() throws Exception {
95 ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
96 try {
97 ExecutorCompletionService<Boolean> ecs =
98 new ExecutorCompletionService<Boolean>(exec);
99 for (int i = 0; i < NUM_THREADS; ++i)
100 ecs.submit(new IdLockTestThread("client_" + i));
101 for (int i = 0; i < NUM_THREADS; ++i) {
102 Future<Boolean> result = ecs.take();
103 assertTrue(result.get());
104 }
105 idLock.assertMapEmpty();
106 } finally {
107 exec.shutdown();
108 exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
109 }
110 }
111
112
113 }
114