1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.PrintWriter;
22 import java.lang.Thread.UncaughtExceptionHandler;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.classification.InterfaceAudience;
32 import org.apache.hadoop.util.ReflectionUtils;
33
34
35
36
37 @InterfaceAudience.Private
38 public class Threads {
/** Logger used by the static helpers of this class. */
protected static final Log LOG = LogFactory.getLog(Threads.class);
/**
 * Source of pool numbers: each factory built by getNamedThreadFactory takes
 * the next value (starting at 1) and embeds it in its thread names.
 */
private static final AtomicInteger poolNumber = new AtomicInteger(1);
41
42
43
44
45
46 public static Thread setDaemonThreadRunning(final Thread t) {
47 return setDaemonThreadRunning(t, t.getName());
48 }
49
50
51
52
53
54
55
56 public static Thread setDaemonThreadRunning(final Thread t,
57 final String name) {
58 return setDaemonThreadRunning(t, name, null);
59 }
60
61
62
63
64
65
66
67
68
69 public static Thread setDaemonThreadRunning(final Thread t,
70 final String name, final UncaughtExceptionHandler handler) {
71 t.setName(name);
72 if (handler != null) {
73 t.setUncaughtExceptionHandler(handler);
74 }
75 t.setDaemon(true);
76 t.start();
77 return t;
78 }
79
80
81
82
83
84 public static void shutdown(final Thread t) {
85 shutdown(t, 0);
86 }
87
88
89
90
91
92
93 public static void shutdown(final Thread t, final long joinwait) {
94 if (t == null) return;
95 while (t.isAlive()) {
96 try {
97 t.join(joinwait);
98 } catch (InterruptedException e) {
99 LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
100 }
101 }
102 }
103
104
105
106
107
108
109
110 public static void threadDumpingIsAlive(final Thread t)
111 throws InterruptedException {
112 if (t == null) {
113 return;
114 }
115
116 while (t.isAlive()) {
117 t.join(60 * 1000);
118 if (t.isAlive()) {
119 ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
120 "Automatic Stack Trace every 60 seconds waiting on " +
121 t.getName());
122 }
123 }
124 }
125
126
127
128
129
130 public static void sleep(long millis) {
131 try {
132 Thread.sleep(millis);
133 } catch (InterruptedException e) {
134 e.printStackTrace();
135 Thread.currentThread().interrupt();
136 }
137 }
138
139
140
141
142
143
144 public static void sleepWithoutInterrupt(final long msToWait) {
145 long timeMillis = System.currentTimeMillis();
146 long endTime = timeMillis + msToWait;
147 boolean interrupted = false;
148 while (timeMillis < endTime) {
149 try {
150 Thread.sleep(endTime - timeMillis);
151 } catch (InterruptedException ex) {
152 interrupted = true;
153 }
154 timeMillis = System.currentTimeMillis();
155 }
156
157 if (interrupted) {
158 Thread.currentThread().interrupt();
159 }
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173 public static ThreadPoolExecutor getBoundedCachedThreadPool(
174 int maxCachedThread, long timeout, TimeUnit unit,
175 ThreadFactory threadFactory) {
176 ThreadPoolExecutor boundedCachedThreadPool =
177 new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
178 unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
179
180 boundedCachedThreadPool.allowCoreThreadTimeOut(true);
181 return boundedCachedThreadPool;
182 }
183
184
185
186
187
188
189
190
191 public static ThreadFactory getNamedThreadFactory(final String prefix) {
192 SecurityManager s = System.getSecurityManager();
193 final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
194 .getThreadGroup();
195
196 return new ThreadFactory() {
197 final AtomicInteger threadNumber = new AtomicInteger(1);
198 private final int poolNumber = Threads.poolNumber.getAndIncrement();
199 final ThreadGroup group = threadGroup;
200
201 @Override
202 public Thread newThread(Runnable r) {
203 final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
204 return new Thread(group, r, name);
205 }
206 };
207 }
208
209
210
211
212
213 public static ThreadFactory newDaemonThreadFactory(final String prefix) {
214 return newDaemonThreadFactory(prefix, null);
215 }
216
217
218
219
220
221
222
223
224 public static ThreadFactory newDaemonThreadFactory(final String prefix,
225 final UncaughtExceptionHandler handler) {
226 final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
227 return new ThreadFactory() {
228 @Override
229 public Thread newThread(Runnable r) {
230 Thread t = namedFactory.newThread(r);
231 if (handler != null) {
232 t.setUncaughtExceptionHandler(handler);
233 }
234 if (!t.isDaemon()) {
235 t.setDaemon(true);
236 }
237 if (t.getPriority() != Thread.NORM_PRIORITY) {
238 t.setPriority(Thread.NORM_PRIORITY);
239 }
240 return t;
241 }
242
243 };
244 }
245 }