View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.Random;
24  import java.util.concurrent.ExecutionException;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import junit.framework.Test;
28  import junit.framework.TestCase;
29  
30  import org.apache.hadoop.hbase.SmallTests;
31  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
32  import org.junit.experimental.categories.Category;
33  import org.junit.runner.RunWith;
34  import org.junit.runners.Suite;
35  
36  @RunWith(Suite.class)
37  @Suite.SuiteClasses({TestPoolMap.TestRoundRobinPoolType.class, TestPoolMap.TestThreadLocalPoolType.class, TestPoolMap.TestReusablePoolType.class})
38  @Category(SmallTests.class)
39  public class TestPoolMap {
40    public abstract static class TestPoolType extends TestCase {
41      protected PoolMap<String, String> poolMap;
42      protected Random random = new Random();
43  
44      protected static final int POOL_SIZE = 3;
45  
46      @Override
47      protected void setUp() throws Exception {
48        this.poolMap = new PoolMap<String, String>(getPoolType(), POOL_SIZE);
49      }
50  
51      protected abstract PoolType getPoolType();
52  
53      @Override
54      protected void tearDown() throws Exception {
55        this.poolMap.clear();
56      }
57  
58      protected void runThread(final String randomKey, final String randomValue,
59          final String expectedValue) throws InterruptedException {
60        final AtomicBoolean matchFound = new AtomicBoolean(false);
61        Thread thread = new Thread(new Runnable() {
62          @Override
63          public void run() {
64            poolMap.put(randomKey, randomValue);
65            String actualValue = poolMap.get(randomKey);
66            matchFound.set(expectedValue == null ? actualValue == null
67                : expectedValue.equals(actualValue));
68          }
69        });
70        thread.start();
71        thread.join();
72        assertTrue(matchFound.get());
73      }
74    }
75  
76    @Category(SmallTests.class)
77    public static class TestRoundRobinPoolType extends TestPoolType {
78      @Override
79      protected PoolType getPoolType() {
80        return PoolType.RoundRobin;
81      }
82  
83      public void testSingleThreadedClient() throws InterruptedException,
84          ExecutionException {
85        String randomKey = String.valueOf(random.nextInt());
86        String randomValue = String.valueOf(random.nextInt());
87        // As long as the pool is not full, we'll get null back.
88        // This forces the user to create new values that can be used to populate
89        // the pool.
90        runThread(randomKey, randomValue, null);
91        assertEquals(1, poolMap.size(randomKey));
92      }
93  
94      public void testMultiThreadedClients() throws InterruptedException,
95          ExecutionException {
96        for (int i = 0; i < POOL_SIZE; i++) {
97          String randomKey = String.valueOf(random.nextInt());
98          String randomValue = String.valueOf(random.nextInt());
99          // As long as the pool is not full, we'll get null back
100         runThread(randomKey, randomValue, null);
101         // As long as we use distinct keys, each pool will have one value
102         assertEquals(1, poolMap.size(randomKey));
103       }
104       poolMap.clear();
105       String randomKey = String.valueOf(random.nextInt());
106       for (int i = 0; i < POOL_SIZE - 1; i++) {
107         String randomValue = String.valueOf(random.nextInt());
108         // As long as the pool is not full, we'll get null back
109         runThread(randomKey, randomValue, null);
110         // since we use the same key, the pool size should grow
111         assertEquals(i + 1, poolMap.size(randomKey));
112       }
113       // at the end of the day, there should be as many values as we put
114       assertEquals(POOL_SIZE - 1, poolMap.size(randomKey));
115     }
116 
117     public void testPoolCap() throws InterruptedException, ExecutionException {
118       String randomKey = String.valueOf(random.nextInt());
119       List<String> randomValues = new ArrayList<String>();
120       for (int i = 0; i < POOL_SIZE * 2; i++) {
121         String randomValue = String.valueOf(random.nextInt());
122         randomValues.add(randomValue);
123         if (i < POOL_SIZE - 1) {
124           // As long as the pool is not full, we'll get null back
125           runThread(randomKey, randomValue, null);
126         } else {
127           // when the pool becomes full, we expect the value we get back to be
128           // what we put earlier, in round-robin order
129           runThread(randomKey, randomValue,
130               randomValues.get((i - POOL_SIZE + 1) % POOL_SIZE));
131         }
132       }
133       assertEquals(POOL_SIZE, poolMap.size(randomKey));
134     }
135 
136   }
137 
138   @Category(SmallTests.class)
139   public static class TestThreadLocalPoolType extends TestPoolType {
140     @Override
141     protected PoolType getPoolType() {
142       return PoolType.ThreadLocal;
143     }
144 
145     public void testSingleThreadedClient() throws InterruptedException,
146         ExecutionException {
147       String randomKey = String.valueOf(random.nextInt());
148       String randomValue = String.valueOf(random.nextInt());
149       // As long as the pool is not full, we should get back what we put
150       runThread(randomKey, randomValue, randomValue);
151       assertEquals(1, poolMap.size(randomKey));
152     }
153 
154     public void testMultiThreadedClients() throws InterruptedException,
155         ExecutionException {
156       // As long as the pool is not full, we should get back what we put
157       for (int i = 0; i < POOL_SIZE; i++) {
158         String randomKey = String.valueOf(random.nextInt());
159         String randomValue = String.valueOf(random.nextInt());
160         runThread(randomKey, randomValue, randomValue);
161         assertEquals(1, poolMap.size(randomKey));
162       }
163       String randomKey = String.valueOf(random.nextInt());
164       for (int i = 0; i < POOL_SIZE; i++) {
165         String randomValue = String.valueOf(random.nextInt());
166         runThread(randomKey, randomValue, randomValue);
167         assertEquals(i + 1, poolMap.size(randomKey));
168       }
169     }
170 
171     public void testPoolCap() throws InterruptedException, ExecutionException {
172       String randomKey = String.valueOf(random.nextInt());
173       for (int i = 0; i < POOL_SIZE * 2; i++) {
174         String randomValue = String.valueOf(random.nextInt());
175         // as of HBASE-4150, pool limit is no longer used with ThreadLocalPool
176           runThread(randomKey, randomValue, randomValue);
177       }
178       assertEquals(POOL_SIZE * 2, poolMap.size(randomKey));
179     }
180 
181   }
182 
183   @Category(SmallTests.class)
184   public static class TestReusablePoolType extends TestPoolType {
185     @Override
186     protected PoolType getPoolType() {
187       return PoolType.Reusable;
188     }
189 
190     public void testSingleThreadedClient() throws InterruptedException,
191         ExecutionException {
192       String randomKey = String.valueOf(random.nextInt());
193       String randomValue = String.valueOf(random.nextInt());
194       // As long as we poll values we put, the pool size should remain zero
195       runThread(randomKey, randomValue, randomValue);
196       assertEquals(0, poolMap.size(randomKey));
197     }
198 
199     public void testMultiThreadedClients() throws InterruptedException,
200         ExecutionException {
201       // As long as we poll values we put, the pool size should remain zero
202       for (int i = 0; i < POOL_SIZE; i++) {
203         String randomKey = String.valueOf(random.nextInt());
204         String randomValue = String.valueOf(random.nextInt());
205         runThread(randomKey, randomValue, randomValue);
206         assertEquals(0, poolMap.size(randomKey));
207       }
208       poolMap.clear();
209       String randomKey = String.valueOf(random.nextInt());
210       for (int i = 0; i < POOL_SIZE - 1; i++) {
211         String randomValue = String.valueOf(random.nextInt());
212         runThread(randomKey, randomValue, randomValue);
213         assertEquals(0, poolMap.size(randomKey));
214       }
215       assertEquals(0, poolMap.size(randomKey));
216     }
217 
218     public void testPoolCap() throws InterruptedException, ExecutionException {
219       // As long as we poll values we put, the pool size should remain zero
220       String randomKey = String.valueOf(random.nextInt());
221       List<String> randomValues = new ArrayList<String>();
222       for (int i = 0; i < POOL_SIZE * 2; i++) {
223         String randomValue = String.valueOf(random.nextInt());
224         randomValues.add(randomValue);
225         runThread(randomKey, randomValue, randomValue);
226       }
227       assertEquals(0, poolMap.size(randomKey));
228     }
229 
230   }
231 
232 }
233