View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package org.jboss.netty.channel.socket.nio;
18  
19  import org.jboss.netty.channel.socket.Worker;
20  import org.jboss.netty.logging.InternalLogger;
21  import org.jboss.netty.logging.InternalLoggerFactory;
22  import org.jboss.netty.util.ExternalResourceReleasable;
23  import org.jboss.netty.util.internal.ExecutorUtil;
24  
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  /**
30   * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s
31   * up-front and return them in a "fair" fashion when calling {@link #nextWorker()}
32   */
33  public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
34          implements WorkerPool<E>, ExternalResourceReleasable {
35  
36      /**
37       * The worker pool raises an exception unless all worker threads start and run within this timeout (in seconds.)
38       */
39      private static final int INITIALIZATION_TIMEOUT = 10;
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioWorkerPool.class);
42  
43      private final AbstractNioWorker[] workers;
44      private final AtomicInteger workerIndex = new AtomicInteger();
45      private final Executor workerExecutor;
46      private volatile boolean initialized;
47  
48      /**
49       * Create a new instance
50       *
51       * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s
52       * @param workerCount the count of {@link Worker}'s to create
53       */
54      AbstractNioWorkerPool(Executor workerExecutor, int workerCount) {
55          this(workerExecutor, workerCount, true);
56      }
57  
58      AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {
59          if (workerExecutor == null) {
60              throw new NullPointerException("workerExecutor");
61          }
62          if (workerCount <= 0) {
63              throw new IllegalArgumentException(
64                      "workerCount (" + workerCount + ") " + "must be a positive integer.");
65          }
66          workers = new AbstractNioWorker[workerCount];
67          this.workerExecutor = workerExecutor;
68          if (autoInit) {
69              init();
70          }
71      }
72  
73      protected void init() {
74          if (initialized) {
75              throw new IllegalStateException("initialized already");
76          }
77  
78          initialized = true;
79  
80          for (int i = 0; i < workers.length; i++) {
81              workers[i] = newWorker(workerExecutor);
82          }
83  
84          waitForWorkerThreads();
85      }
86  
87      private void waitForWorkerThreads() {
88          long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(INITIALIZATION_TIMEOUT);
89          boolean warn = false;
90          for (AbstractNioSelector worker: workers) {
91              long waitTime = deadline - System.nanoTime();
92              try {
93                  if (waitTime <= 0) {
94                      if (worker.thread == null) {
95                          warn = true;
96                          break;
97                      }
98                  } else if (!worker.startupLatch.await(waitTime, TimeUnit.NANOSECONDS)) {
99                      warn = true;
100                     break;
101                 }
102             } catch (InterruptedException ignore) {
103                 // Stop waiting for the worker threads and let someone else take care of the interruption.
104                 Thread.currentThread().interrupt();
105                 break;
106             }
107         }
108 
109         if (warn) {
110             logger.warn(
111                     "Failed to get all worker threads ready within " + INITIALIZATION_TIMEOUT + " second(s). " +
112                     "Make sure to specify the executor which has more threads than the requested workerCount. " +
113                     "If unsure, use Executors.newCachedThreadPool().");
114         }
115     }
116 
117     /**
118      * Only here for backward compability and will be removed in later releases. Please use
119      * {@link #newWorker(Executor)}
120      *
121      *
122      * @param executor the {@link Executor} to use
123      * @return worker the new {@link Worker}
124      * @deprecated use {@link #newWorker(Executor)}
125      */
126     @Deprecated
127     protected E createWorker(Executor executor) {
128         throw new IllegalStateException("This will be removed. Override this and the newWorker(..) method!");
129     }
130 
131     /**
132      * Create a new {@link Worker} which uses the given {@link Executor} to service IO.
133      *
134      * This method will be made abstract in further releases (once {@link #createWorker(Executor)}
135      * was removed).
136      *
137      *
138      * @param executor the {@link Executor} to use
139      * @return worker the new {@link Worker}
140      */
141     @SuppressWarnings("deprecation")
142     protected E newWorker(Executor executor) {
143         return createWorker(executor);
144     }
145 
146     @SuppressWarnings("unchecked")
147     public E nextWorker() {
148         return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
149     }
150 
151     public void rebuildSelectors() {
152         for (AbstractNioWorker worker: workers) {
153             worker.rebuildSelector();
154         }
155     }
156 
157     public void releaseExternalResources() {
158         shutdown();
159         ExecutorUtil.shutdownNow(workerExecutor);
160     }
161 
162     public void shutdown() {
163         for (AbstractNioWorker worker: workers) {
164             worker.shutdown();
165         }
166     }
167 
168 }