1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.executor;
20
21 import static org.junit.Assert.*;
22 import java.io.IOException;
23 import java.io.StringWriter;
24 import java.util.concurrent.ThreadPoolExecutor;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.*;
32 import org.apache.hadoop.hbase.Waiter.Predicate;
33 import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
34 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
35 import org.apache.hadoop.hbase.testclassification.SmallTests;
36 import org.junit.Test;
37 import org.junit.experimental.categories.Category;
38
39 import static org.mockito.Mockito.*;
40
41 @Category(SmallTests.class)
42 public class TestExecutorService {
43 private static final Log LOG = LogFactory.getLog(TestExecutorService.class);
44
45 @Test
46 public void testExecutorService() throws Exception {
47 int maxThreads = 5;
48 int maxTries = 10;
49 int sleepInterval = 10;
50
51 Server mockedServer = mock(Server.class);
52 when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());
53
54
55 ExecutorService executorService = new ExecutorService("unit_test");
56 executorService.startExecutorService(
57 ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
58
59 Executor executor =
60 executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
61 ThreadPoolExecutor pool = executor.threadPoolExecutor;
62
63
64 assertEquals(0, pool.getPoolSize());
65
66 AtomicBoolean lock = new AtomicBoolean(true);
67 AtomicInteger counter = new AtomicInteger(0);
68
69
70 for (int i = 0; i < maxThreads; i++) {
71 executorService.submit(
72 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
73 lock, counter));
74 }
75
76
77 int tries = 0;
78 while (counter.get() < maxThreads && tries < maxTries) {
79 LOG.info("Waiting for all event handlers to start...");
80 Thread.sleep(sleepInterval);
81 tries++;
82 }
83
84
85 assertEquals(maxThreads, counter.get());
86 assertEquals(maxThreads, pool.getPoolSize());
87
88 ExecutorStatus status = executor.getStatus();
89 assertTrue(status.queuedEvents.isEmpty());
90 assertEquals(5, status.running.size());
91 checkStatusDump(status);
92
93
94
95 synchronized (lock) {
96 lock.set(false);
97 lock.notifyAll();
98 }
99
100
101 while (counter.get() < (maxThreads * 2) && tries < maxTries) {
102 System.out.println("Waiting for all event handlers to finish...");
103 Thread.sleep(sleepInterval);
104 tries++;
105 }
106
107 assertEquals(maxThreads * 2, counter.get());
108 assertEquals(maxThreads, pool.getPoolSize());
109
110
111
112 for (int i = 0; i < (2 * maxThreads); i++) {
113 executorService.submit(
114 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
115 lock, counter));
116 }
117
118 synchronized (lock) {
119 lock.set(false);
120 lock.notifyAll();
121 }
122
123
124 Thread.sleep(ExecutorService.Executor.keepAliveTimeInMillis * 2);
125 assertEquals(maxThreads, pool.getPoolSize());
126
127 executorService.shutdown();
128
129 assertEquals(0, executorService.getAllExecutorStatuses().size());
130
131
132 executorService.submit(
133 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
134 lock, counter));
135 }
136
137 private void checkStatusDump(ExecutorStatus status) throws IOException {
138 StringWriter sw = new StringWriter();
139 status.dumpTo(sw, "");
140 String dump = sw.toString();
141 LOG.info("Got status dump:\n" + dump);
142
143 assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean"));
144 }
145
146 public static class TestEventHandler extends EventHandler {
147 private AtomicBoolean lock;
148 private AtomicInteger counter;
149
150 public TestEventHandler(Server server, EventType eventType,
151 AtomicBoolean lock, AtomicInteger counter) {
152 super(server, eventType);
153 this.lock = lock;
154 this.counter = counter;
155 }
156
157 @Override
158 public void process() throws IOException {
159 int num = counter.incrementAndGet();
160 LOG.info("Running process #" + num + ", threadName=" +
161 Thread.currentThread().getName());
162 synchronized (lock) {
163 while (lock.get()) {
164 try {
165 lock.wait();
166 } catch (InterruptedException e) {
167
168 }
169 }
170 }
171 counter.incrementAndGet();
172 }
173 }
174
175 @Test
176 public void testAborting() throws Exception {
177 final Configuration conf = HBaseConfiguration.create();
178 final Server server = mock(Server.class);
179 when(server.getConfiguration()).thenReturn(conf);
180
181 ExecutorService executorService = new ExecutorService("unit_test");
182 executorService.startExecutorService(
183 ExecutorType.MASTER_SERVER_OPERATIONS, 1);
184
185
186 executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
187 @Override
188 public void process() throws IOException {
189 throw new RuntimeException("Should cause abort");
190 }
191 });
192
193 Waiter.waitFor(conf, 30000, new Predicate<Exception>() {
194 @Override
195 public boolean evaluate() throws Exception {
196 try {
197 verify(server, times(1)).abort(anyString(), (Throwable) anyObject());
198 return true;
199 } catch (Throwable t) {
200 return false;
201 }
202 }
203 });
204
205 executorService.shutdown();
206 }
207
208 }
209