View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.executor;
20  
21  import static org.junit.Assert.*;
22  import static org.junit.Assert.assertEquals;
23  
24  import java.io.IOException;
25  import java.io.StringWriter;
26  import java.util.concurrent.ThreadPoolExecutor;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.*;
33  import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
34  import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
35  import org.apache.hadoop.hbase.executor.ExecutorType;
36  import org.junit.Test;
37  import org.junit.experimental.categories.Category;
38  
39  import static org.mockito.Mockito.*;
40  
41  @Category(SmallTests.class)
42  public class TestExecutorService {
43    private static final Log LOG = LogFactory.getLog(TestExecutorService.class);
44  
45    @Test
46    public void testExecutorService() throws Exception {
47      int maxThreads = 5;
48      int maxTries = 10;
49      int sleepInterval = 10;
50  
51      Server mockedServer = mock(Server.class);
52      when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());
53  
54      // Start an executor service pool with max 5 threads
55      ExecutorService executorService = new ExecutorService("unit_test");
56      executorService.startExecutorService(
57        ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
58  
59      Executor executor =
60        executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
61      ThreadPoolExecutor pool = executor.threadPoolExecutor;
62  
63      // Assert no threads yet
64      assertEquals(0, pool.getPoolSize());
65  
66      AtomicBoolean lock = new AtomicBoolean(true);
67      AtomicInteger counter = new AtomicInteger(0);
68  
69      // Submit maxThreads executors.
70      for (int i = 0; i < maxThreads; i++) {
71        executorService.submit(
72          new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
73              lock, counter));
74      }
75  
76      // The TestEventHandler will increment counter when it starts.
77      int tries = 0;
78      while (counter.get() < maxThreads && tries < maxTries) {
79        LOG.info("Waiting for all event handlers to start...");
80        Thread.sleep(sleepInterval);
81        tries++;
82      }
83  
84      // Assert that pool is at max threads.
85      assertEquals(maxThreads, counter.get());
86      assertEquals(maxThreads, pool.getPoolSize());
87  
88      ExecutorStatus status = executor.getStatus();
89      assertTrue(status.queuedEvents.isEmpty());
90      assertEquals(5, status.running.size());
91      checkStatusDump(status);
92      
93      
94      // Now interrupt the running Executor
95      synchronized (lock) {
96        lock.set(false);
97        lock.notifyAll();
98      }
99  
100     // Executor increments counter again on way out so.... test that happened.
101     while (counter.get() < (maxThreads * 2) && tries < maxTries) {
102       System.out.println("Waiting for all event handlers to finish...");
103       Thread.sleep(sleepInterval);
104       tries++;
105     }
106 
107     assertEquals(maxThreads * 2, counter.get());
108     assertEquals(maxThreads, pool.getPoolSize());
109 
110     // Add more than the number of threads items.
111     // Make sure we don't get RejectedExecutionException.
112     for (int i = 0; i < (2 * maxThreads); i++) {
113       executorService.submit(
114         new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
115             lock, counter));
116     }
117     // Now interrupt the running Executor
118     synchronized (lock) {
119       lock.set(false);
120       lock.notifyAll();
121     }
122 
123     // Make sure threads are still around even after their timetolive expires.
124     Thread.sleep(ExecutorService.Executor.keepAliveTimeInMillis * 2);
125     assertEquals(maxThreads, pool.getPoolSize());
126 
127     executorService.shutdown();
128 
129     assertEquals(0, executorService.getAllExecutorStatuses().size());
130 
131     // Test that submit doesn't throw NPEs
132     executorService.submit(
133       new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
134             lock, counter));
135   }
136 
137   private void checkStatusDump(ExecutorStatus status) throws IOException {
138     StringWriter sw = new StringWriter();
139     status.dumpTo(sw, "");
140     String dump = sw.toString();
141     LOG.info("Got status dump:\n" + dump);
142     
143     assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean"));
144   }
145 
146   public static class TestEventHandler extends EventHandler {
147     private AtomicBoolean lock;
148     private AtomicInteger counter;
149 
150     public TestEventHandler(Server server, EventType eventType,
151                             AtomicBoolean lock, AtomicInteger counter) {
152       super(server, eventType);
153       this.lock = lock;
154       this.counter = counter;
155     }
156 
157     @Override
158     public void process() throws IOException {
159       int num = counter.incrementAndGet();
160       LOG.info("Running process #" + num + ", threadName=" +
161         Thread.currentThread().getName());
162       synchronized (lock) {
163         while (lock.get()) {
164           try {
165             lock.wait();
166           } catch (InterruptedException e) {
167             // do nothing
168           }
169         }
170       }
171       counter.incrementAndGet();
172     }
173   }
174 
175 }
176