View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.Random;
24  import java.util.concurrent.ExecutionException;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import junit.framework.TestCase;
28  
29  import org.apache.hadoop.hbase.testclassification.SmallTests;
30  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
31  import org.junit.experimental.categories.Category;
32  import org.junit.runner.RunWith;
33  import org.junit.runners.Suite;
34  
35  @RunWith(Suite.class)
36  @Suite.SuiteClasses({TestPoolMap.TestRoundRobinPoolType.class, TestPoolMap.TestThreadLocalPoolType.class, TestPoolMap.TestReusablePoolType.class})
37  @Category(SmallTests.class)
38  public class TestPoolMap {
39    public abstract static class TestPoolType extends TestCase {
40      protected PoolMap<String, String> poolMap;
41      protected Random random = new Random();
42  
43      protected static final int POOL_SIZE = 3;
44  
45      @Override
46      protected void setUp() throws Exception {
47        this.poolMap = new PoolMap<String, String>(getPoolType(), POOL_SIZE);
48      }
49  
50      protected abstract PoolType getPoolType();
51  
52      @Override
53      protected void tearDown() throws Exception {
54        this.poolMap.clear();
55      }
56  
57      protected void runThread(final String randomKey, final String randomValue,
58          final String expectedValue) throws InterruptedException {
59        final AtomicBoolean matchFound = new AtomicBoolean(false);
60        Thread thread = new Thread(new Runnable() {
61          @Override
62          public void run() {
63            poolMap.put(randomKey, randomValue);
64            String actualValue = poolMap.get(randomKey);
65            matchFound.set(expectedValue == null ? actualValue == null
66                : expectedValue.equals(actualValue));
67          }
68        });
69        thread.start();
70        thread.join();
71        assertTrue(matchFound.get());
72      }
73    }
74  
75    @Category(SmallTests.class)
76    public static class TestRoundRobinPoolType extends TestPoolType {
77      @Override
78      protected PoolType getPoolType() {
79        return PoolType.RoundRobin;
80      }
81  
82      public void testSingleThreadedClient() throws InterruptedException,
83          ExecutionException {
84        String randomKey = String.valueOf(random.nextInt());
85        String randomValue = String.valueOf(random.nextInt());
86        // As long as the pool is not full, we'll get null back.
87        // This forces the user to create new values that can be used to populate
88        // the pool.
89        runThread(randomKey, randomValue, null);
90        assertEquals(1, poolMap.size(randomKey));
91      }
92  
93      public void testMultiThreadedClients() throws InterruptedException,
94          ExecutionException {
95        for (int i = 0; i < POOL_SIZE; i++) {
96          String randomKey = String.valueOf(random.nextInt());
97          String randomValue = String.valueOf(random.nextInt());
98          // As long as the pool is not full, we'll get null back
99          runThread(randomKey, randomValue, null);
100         // As long as we use distinct keys, each pool will have one value
101         assertEquals(1, poolMap.size(randomKey));
102       }
103       poolMap.clear();
104       String randomKey = String.valueOf(random.nextInt());
105       for (int i = 0; i < POOL_SIZE - 1; i++) {
106         String randomValue = String.valueOf(random.nextInt());
107         // As long as the pool is not full, we'll get null back
108         runThread(randomKey, randomValue, null);
109         // since we use the same key, the pool size should grow
110         assertEquals(i + 1, poolMap.size(randomKey));
111       }
112       // at the end of the day, there should be as many values as we put
113       assertEquals(POOL_SIZE - 1, poolMap.size(randomKey));
114     }
115 
116     public void testPoolCap() throws InterruptedException, ExecutionException {
117       String randomKey = String.valueOf(random.nextInt());
118       List<String> randomValues = new ArrayList<String>();
119       for (int i = 0; i < POOL_SIZE * 2; i++) {
120         String randomValue = String.valueOf(random.nextInt());
121         randomValues.add(randomValue);
122         if (i < POOL_SIZE - 1) {
123           // As long as the pool is not full, we'll get null back
124           runThread(randomKey, randomValue, null);
125         } else {
126           // when the pool becomes full, we expect the value we get back to be
127           // what we put earlier, in round-robin order
128           runThread(randomKey, randomValue,
129               randomValues.get((i - POOL_SIZE + 1) % POOL_SIZE));
130         }
131       }
132       assertEquals(POOL_SIZE, poolMap.size(randomKey));
133     }
134 
135   }
136 
137   @Category(SmallTests.class)
138   public static class TestThreadLocalPoolType extends TestPoolType {
139     @Override
140     protected PoolType getPoolType() {
141       return PoolType.ThreadLocal;
142     }
143 
144     public void testSingleThreadedClient() throws InterruptedException,
145         ExecutionException {
146       String randomKey = String.valueOf(random.nextInt());
147       String randomValue = String.valueOf(random.nextInt());
148       // As long as the pool is not full, we should get back what we put
149       runThread(randomKey, randomValue, randomValue);
150       assertEquals(1, poolMap.size(randomKey));
151     }
152 
153     public void testMultiThreadedClients() throws InterruptedException,
154         ExecutionException {
155       // As long as the pool is not full, we should get back what we put
156       for (int i = 0; i < POOL_SIZE; i++) {
157         String randomKey = String.valueOf(random.nextInt());
158         String randomValue = String.valueOf(random.nextInt());
159         runThread(randomKey, randomValue, randomValue);
160         assertEquals(1, poolMap.size(randomKey));
161       }
162       String randomKey = String.valueOf(random.nextInt());
163       for (int i = 0; i < POOL_SIZE; i++) {
164         String randomValue = String.valueOf(random.nextInt());
165         runThread(randomKey, randomValue, randomValue);
166         assertEquals(i + 1, poolMap.size(randomKey));
167       }
168     }
169 
170     public void testPoolCap() throws InterruptedException, ExecutionException {
171       String randomKey = String.valueOf(random.nextInt());
172       for (int i = 0; i < POOL_SIZE * 2; i++) {
173         String randomValue = String.valueOf(random.nextInt());
174         // as of HBASE-4150, pool limit is no longer used with ThreadLocalPool
175           runThread(randomKey, randomValue, randomValue);
176       }
177       assertEquals(POOL_SIZE * 2, poolMap.size(randomKey));
178     }
179 
180   }
181 
182   @Category(SmallTests.class)
183   public static class TestReusablePoolType extends TestPoolType {
184     @Override
185     protected PoolType getPoolType() {
186       return PoolType.Reusable;
187     }
188 
189     public void testSingleThreadedClient() throws InterruptedException,
190         ExecutionException {
191       String randomKey = String.valueOf(random.nextInt());
192       String randomValue = String.valueOf(random.nextInt());
193       // As long as we poll values we put, the pool size should remain zero
194       runThread(randomKey, randomValue, randomValue);
195       assertEquals(0, poolMap.size(randomKey));
196     }
197 
198     public void testMultiThreadedClients() throws InterruptedException,
199         ExecutionException {
200       // As long as we poll values we put, the pool size should remain zero
201       for (int i = 0; i < POOL_SIZE; i++) {
202         String randomKey = String.valueOf(random.nextInt());
203         String randomValue = String.valueOf(random.nextInt());
204         runThread(randomKey, randomValue, randomValue);
205         assertEquals(0, poolMap.size(randomKey));
206       }
207       poolMap.clear();
208       String randomKey = String.valueOf(random.nextInt());
209       for (int i = 0; i < POOL_SIZE - 1; i++) {
210         String randomValue = String.valueOf(random.nextInt());
211         runThread(randomKey, randomValue, randomValue);
212         assertEquals(0, poolMap.size(randomKey));
213       }
214       assertEquals(0, poolMap.size(randomKey));
215     }
216 
217     public void testPoolCap() throws InterruptedException, ExecutionException {
218       // As long as we poll values we put, the pool size should remain zero
219       String randomKey = String.valueOf(random.nextInt());
220       List<String> randomValues = new ArrayList<String>();
221       for (int i = 0; i < POOL_SIZE * 2; i++) {
222         String randomValue = String.valueOf(random.nextInt());
223         randomValues.add(randomValue);
224         runThread(randomKey, randomValue, randomValue);
225       }
226       assertEquals(0, poolMap.size(randomKey));
227     }
228 
229   }
230 
231 }
232