1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Random;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import junit.framework.Test;
28 import junit.framework.TestCase;
29
30 import org.apache.hadoop.hbase.SmallTests;
31 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
32 import org.junit.experimental.categories.Category;
33 import org.junit.runner.RunWith;
34 import org.junit.runners.Suite;
35
36 @RunWith(Suite.class)
37 @Suite.SuiteClasses({TestPoolMap.TestRoundRobinPoolType.class, TestPoolMap.TestThreadLocalPoolType.class, TestPoolMap.TestReusablePoolType.class})
38 @Category(SmallTests.class)
39 public class TestPoolMap {
40 public abstract static class TestPoolType extends TestCase {
41 protected PoolMap<String, String> poolMap;
42 protected Random random = new Random();
43
44 protected static final int POOL_SIZE = 3;
45
46 @Override
47 protected void setUp() throws Exception {
48 this.poolMap = new PoolMap<String, String>(getPoolType(), POOL_SIZE);
49 }
50
51 protected abstract PoolType getPoolType();
52
53 @Override
54 protected void tearDown() throws Exception {
55 this.poolMap.clear();
56 }
57
58 protected void runThread(final String randomKey, final String randomValue,
59 final String expectedValue) throws InterruptedException {
60 final AtomicBoolean matchFound = new AtomicBoolean(false);
61 Thread thread = new Thread(new Runnable() {
62 @Override
63 public void run() {
64 poolMap.put(randomKey, randomValue);
65 String actualValue = poolMap.get(randomKey);
66 matchFound.set(expectedValue == null ? actualValue == null
67 : expectedValue.equals(actualValue));
68 }
69 });
70 thread.start();
71 thread.join();
72 assertTrue(matchFound.get());
73 }
74 }
75
76 @Category(SmallTests.class)
77 public static class TestRoundRobinPoolType extends TestPoolType {
78 @Override
79 protected PoolType getPoolType() {
80 return PoolType.RoundRobin;
81 }
82
83 public void testSingleThreadedClient() throws InterruptedException,
84 ExecutionException {
85 String randomKey = String.valueOf(random.nextInt());
86 String randomValue = String.valueOf(random.nextInt());
87
88
89
90 runThread(randomKey, randomValue, null);
91 assertEquals(1, poolMap.size(randomKey));
92 }
93
94 public void testMultiThreadedClients() throws InterruptedException,
95 ExecutionException {
96 for (int i = 0; i < POOL_SIZE; i++) {
97 String randomKey = String.valueOf(random.nextInt());
98 String randomValue = String.valueOf(random.nextInt());
99
100 runThread(randomKey, randomValue, null);
101
102 assertEquals(1, poolMap.size(randomKey));
103 }
104 poolMap.clear();
105 String randomKey = String.valueOf(random.nextInt());
106 for (int i = 0; i < POOL_SIZE - 1; i++) {
107 String randomValue = String.valueOf(random.nextInt());
108
109 runThread(randomKey, randomValue, null);
110
111 assertEquals(i + 1, poolMap.size(randomKey));
112 }
113
114 assertEquals(POOL_SIZE - 1, poolMap.size(randomKey));
115 }
116
117 public void testPoolCap() throws InterruptedException, ExecutionException {
118 String randomKey = String.valueOf(random.nextInt());
119 List<String> randomValues = new ArrayList<String>();
120 for (int i = 0; i < POOL_SIZE * 2; i++) {
121 String randomValue = String.valueOf(random.nextInt());
122 randomValues.add(randomValue);
123 if (i < POOL_SIZE - 1) {
124
125 runThread(randomKey, randomValue, null);
126 } else {
127
128
129 runThread(randomKey, randomValue,
130 randomValues.get((i - POOL_SIZE + 1) % POOL_SIZE));
131 }
132 }
133 assertEquals(POOL_SIZE, poolMap.size(randomKey));
134 }
135
136 }
137
138 @Category(SmallTests.class)
139 public static class TestThreadLocalPoolType extends TestPoolType {
140 @Override
141 protected PoolType getPoolType() {
142 return PoolType.ThreadLocal;
143 }
144
145 public void testSingleThreadedClient() throws InterruptedException,
146 ExecutionException {
147 String randomKey = String.valueOf(random.nextInt());
148 String randomValue = String.valueOf(random.nextInt());
149
150 runThread(randomKey, randomValue, randomValue);
151 assertEquals(1, poolMap.size(randomKey));
152 }
153
154 public void testMultiThreadedClients() throws InterruptedException,
155 ExecutionException {
156
157 for (int i = 0; i < POOL_SIZE; i++) {
158 String randomKey = String.valueOf(random.nextInt());
159 String randomValue = String.valueOf(random.nextInt());
160 runThread(randomKey, randomValue, randomValue);
161 assertEquals(1, poolMap.size(randomKey));
162 }
163 String randomKey = String.valueOf(random.nextInt());
164 for (int i = 0; i < POOL_SIZE; i++) {
165 String randomValue = String.valueOf(random.nextInt());
166 runThread(randomKey, randomValue, randomValue);
167 assertEquals(i + 1, poolMap.size(randomKey));
168 }
169 }
170
171 public void testPoolCap() throws InterruptedException, ExecutionException {
172 String randomKey = String.valueOf(random.nextInt());
173 for (int i = 0; i < POOL_SIZE * 2; i++) {
174 String randomValue = String.valueOf(random.nextInt());
175
176 runThread(randomKey, randomValue, randomValue);
177 }
178 assertEquals(POOL_SIZE * 2, poolMap.size(randomKey));
179 }
180
181 }
182
183 @Category(SmallTests.class)
184 public static class TestReusablePoolType extends TestPoolType {
185 @Override
186 protected PoolType getPoolType() {
187 return PoolType.Reusable;
188 }
189
190 public void testSingleThreadedClient() throws InterruptedException,
191 ExecutionException {
192 String randomKey = String.valueOf(random.nextInt());
193 String randomValue = String.valueOf(random.nextInt());
194
195 runThread(randomKey, randomValue, randomValue);
196 assertEquals(0, poolMap.size(randomKey));
197 }
198
199 public void testMultiThreadedClients() throws InterruptedException,
200 ExecutionException {
201
202 for (int i = 0; i < POOL_SIZE; i++) {
203 String randomKey = String.valueOf(random.nextInt());
204 String randomValue = String.valueOf(random.nextInt());
205 runThread(randomKey, randomValue, randomValue);
206 assertEquals(0, poolMap.size(randomKey));
207 }
208 poolMap.clear();
209 String randomKey = String.valueOf(random.nextInt());
210 for (int i = 0; i < POOL_SIZE - 1; i++) {
211 String randomValue = String.valueOf(random.nextInt());
212 runThread(randomKey, randomValue, randomValue);
213 assertEquals(0, poolMap.size(randomKey));
214 }
215 assertEquals(0, poolMap.size(randomKey));
216 }
217
218 public void testPoolCap() throws InterruptedException, ExecutionException {
219
220 String randomKey = String.valueOf(random.nextInt());
221 List<String> randomValues = new ArrayList<String>();
222 for (int i = 0; i < POOL_SIZE * 2; i++) {
223 String randomValue = String.valueOf(random.nextInt());
224 randomValues.add(randomValue);
225 runThread(randomKey, randomValue, randomValue);
226 }
227 assertEquals(0, poolMap.size(randomKey));
228 }
229
230 }
231
232 }
233