Subject: svn commit: r807284 - in /tomcat/trunk/java/org/apache/tomcat/util: net/AbstractEndpoint.java net/AprEndpoint.java net/BaseEndpoint.java net/JIoEndpoint.java net/NioEndpoint.java threads/ResizableExecutor.java
Date: Mon, 24 Aug 2009 15:33:49 -0000
To: dev@tomcat.apache.org
From: fhanik@apache.org
Message-Id: <20090824153349.E696A23888AD@eris.apache.org>

Author: fhanik
Date: Mon Aug 24 15:33:48 2009
New Revision: 807284

URL: http://svn.apache.org/viewvc?rev=807284&view=rev
Log:
First round of refactoring connectors.
Remove the worker based thread pools
Enable local or injected executors
Add in a resizable executor interface to be used in future revisions
Start abstracting out and using a base class. The previous one (BaseEndpoint) is deleted, since it was not used anywhere

Added:
    tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java   (with props)
    tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java   (with props)
Removed:
    tomcat/trunk/java/org/apache/tomcat/util/net/BaseEndpoint.java
Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Added: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=807284&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Mon Aug 24 15:33:48 2009
@@ -0,0 +1,58 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import org.apache.tomcat.util.res.StringManager;
+/**
+ *
+ * @author fhanik
+ * @author Mladen Turk
+ * @author Remy Maucherat
+ */
+public abstract class AbstractEndpoint {
+
+    // -------------------------------------------------------------- Constants
+    protected StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res");
+
+    /**
+     * The Request attribute key for the cipher suite.
+     */
+    public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
+
+    /**
+     * The Request attribute key for the key size.
+     */
+    public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
+
+    /**
+     * The Request attribute key for the client certificate chain.
+     */
+    public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
+
+    /**
+     * The Request attribute key for the session id.
+     * This one is a Tomcat extension to the Servlet spec.
+     */
+    public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
+
+    /**
+     * The request attribute key for the session manager.
+     * This one is a Tomcat extension to the Servlet spec.
+ */ + public static final String SESSION_MGR = "javax.servlet.request.ssl_session_mgr"; + +} Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=807284&r1=807283&r2=807284&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Mon Aug 24 15:33:48 2009 @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -37,6 +39,10 @@ import org.apache.tomcat.jni.Socket; import org.apache.tomcat.jni.Status; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.ResizableExecutor; +import org.apache.tomcat.util.threads.TaskQueue; +import org.apache.tomcat.util.threads.TaskThreadFactory; +import org.apache.tomcat.util.threads.ThreadPoolExecutor; /** * APR tailored thread pool, providing the following services: @@ -53,7 +59,7 @@ * @author Mladen Turk * @author Remy Maucherat */ -public class AprEndpoint { +public class AprEndpoint extends AbstractEndpoint { // -------------------------------------------------------------- Constants @@ -61,8 +67,6 @@ protected static Log log = LogFactory.getLog(AprEndpoint.class); - protected static StringManager sm = - StringManager.getManager("org.apache.tomcat.util.net.res"); /** @@ -86,24 +90,11 @@ */ public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; - /** - * The request attribute 
key for the session manager. - * This one is a Tomcat extension to the Servlet spec. - */ - public static final String SESSION_MGR = - "javax.servlet.request.ssl_session_mgr"; - // ----------------------------------------------------------------- Fields /** - * Available workers. - */ - protected WorkerStack workers = null; - - - /** * Running state of the endpoint. */ protected volatile boolean running = false; @@ -163,6 +154,10 @@ protected long sslContext = 0; + /** + * Are we using an internal executor + */ + protected volatile boolean internalExecutor = false; // ------------------------------------------------------------- Properties @@ -188,10 +183,8 @@ protected int maxThreads = 200; public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; - if (running) { - synchronized(workers) { - workers.resize(maxThreads); - } + if (running && executor instanceof ResizableExecutor) { + ((ResizableExecutor)executor).resizePool(getMinSpareThreads(), getMaxThreads()); } } public int getMaxThreads() { return maxThreads; } @@ -545,9 +538,15 @@ */ public int getCurrentThreadCount() { if (executor!=null) { - return -1; + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getPoolSize(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getPoolSize(); + } else { + return -1; + } } else { - return curThreads; + return -2; } } @@ -558,9 +557,15 @@ */ public int getCurrentThreadsBusy() { if (executor!=null) { - return -1; + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getActiveCount(); + } else { + return -1; + } } else { - return workers!=null?curThreads - workers.size():0; + return -2; } } @@ -744,7 +749,11 @@ // Create worker collection if (executor == null) { - workers = new WorkerStack(maxThreads); + internalExecutor = true; + 
TaskQueue taskqueue = new TaskQueue(); + TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); + executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); + taskqueue.setParent( (ThreadPoolExecutor) executor); } // Start poller threads @@ -838,6 +847,16 @@ sendfiles = null; } } + if ( executor!=null && internalExecutor ) { + if ( executor instanceof ThreadPoolExecutor ) { + //this is our internal one, so we need to shut it down + ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; + tpe.shutdownNow(); + TaskQueue queue = (TaskQueue) tpe.getQueue(); + queue.setParent(null); + } + executor = null; + } } @@ -946,86 +965,6 @@ } - /** - * Create (or allocate) and return an available processor for use in - * processing a specific HTTP request, if possible. If the maximum - * allowed processors have already been created and are in use, return - * null instead. - */ - protected Worker createWorkerThread() { - - synchronized (workers) { - if (workers.size() > 0) { - curThreadsBusy++; - return (workers.pop()); - } - if ((maxThreads > 0) && (curThreads < maxThreads)) { - curThreadsBusy++; - if (curThreadsBusy == maxThreads) { - log.info(sm.getString("endpoint.info.maxThreads", - Integer.toString(maxThreads), address, - Integer.toString(port))); - } - return (newWorkerThread()); - } else { - if (maxThreads < 0) { - curThreadsBusy++; - return (newWorkerThread()); - } else { - return (null); - } - } - } - - } - - - /** - * Create and return a new processor suitable for processing HTTP - * requests and returning the corresponding responses. - */ - protected Worker newWorkerThread() { - - Worker workerThread = new Worker(); - workerThread.start(); - return (workerThread); - - } - - - /** - * Return a new worker thread, and block while to worker is available. 
- */ - protected Worker getWorkerThread() { - // Allocate a new worker thread - Worker workerThread = createWorkerThread(); - while (workerThread == null) { - try { - synchronized (workers) { - workers.wait(); - } - } catch (InterruptedException e) { - // Ignore - } - workerThread = createWorkerThread(); - } - return workerThread; - } - - - /** - * Recycle the specified Processor so that it can be used again. - * - * @param workerThread The processor to be recycled - */ - protected void recycleWorkerThread(Worker workerThread) { - synchronized (workers) { - workers.push(workerThread); - curThreadsBusy--; - workers.notify(); - } - } - /** * Allocate a new poller of the specified size. @@ -1050,11 +989,10 @@ */ protected boolean processSocketWithOptions(long socket) { try { - if (executor == null) { - getWorkerThread().assignWithOptions(socket); - } else { - executor.execute(new SocketWithOptionsProcessor(socket)); - } + executor.execute(new SocketWithOptionsProcessor(socket)); + } catch (RejectedExecutionException x) { + log.warn("Socket processing request was rejected for:"+socket,x); + return false; } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full @@ -1070,11 +1008,10 @@ */ protected boolean processSocket(long socket) { try { - if (executor == null) { - getWorkerThread().assign(socket); - } else { - executor.execute(new SocketProcessor(socket)); - } + executor.execute(new SocketProcessor(socket)); + } catch (RejectedExecutionException x) { + log.warn("Socket processing request was rejected for:"+socket,x); + return false; } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full @@ -1090,11 +1027,10 @@ */ protected boolean processSocket(long socket, SocketStatus status) { try { - if (executor == null) { - getWorkerThread().assign(socket, status); - } else { - executor.execute(new SocketEventProcessor(socket, status)); - } + 
executor.execute(new SocketEventProcessor(socket, status)); + } catch (RejectedExecutionException x) { + log.warn("Socket processing request was rejected for:"+socket,x); + return false; } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full @@ -1389,178 +1325,6 @@ // ----------------------------------------------------- Worker Inner Class - /** - * Server processor class. - */ - protected class Worker implements Runnable { - - - protected Thread thread = null; - protected boolean available = false; - protected long socket = 0; - protected SocketStatus status = null; - protected boolean options = false; - - - /** - * Process an incoming TCP/IP connection on the specified socket. Any - * exception that occurs during processing must be logged and swallowed. - * NOTE: This method is called from our Connector's thread. We - * must assign it to our own thread so that multiple simultaneous - * requests can be handled. - * - * @param socket TCP socket to process - */ - protected synchronized void assignWithOptions(long socket) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - status = null; - options = true; - available = true; - notifyAll(); - - } - - - /** - * Process an incoming TCP/IP connection on the specified socket. Any - * exception that occurs during processing must be logged and swallowed. - * NOTE: This method is called from our Connector's thread. We - * must assign it to our own thread so that multiple simultaneous - * requests can be handled. 
- * - * @param socket TCP socket to process - */ - protected synchronized void assign(long socket) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - status = null; - options = false; - available = true; - notifyAll(); - - } - - - protected synchronized void assign(long socket, SocketStatus status) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - this.status = status; - options = false; - available = true; - notifyAll(); - - } - - - /** - * Await a newly assigned Socket from our Connector, or null - * if we are supposed to shut down. - */ - protected synchronized long await() { - - // Wait for the Connector to provide a new Socket - while (!available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Notify the Connector that we have received this Socket - long socket = this.socket; - available = false; - notifyAll(); - - return (socket); - - } - - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. 
- */ - public void run() { - - // Process requests until we receive a shutdown signal - while (running) { - - // Wait for the next socket to be assigned - long socket = await(); - if (socket == 0) - continue; - - if (!deferAccept && options) { - if (setSocketOptions(socket)) { - getPoller().add(socket); - } else { - // Close socket and pool - Socket.destroy(socket); - socket = 0; - } - } else { - - // Process the request from this socket - if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) { - // Close socket and pool - Socket.destroy(socket); - socket = 0; - } else if ((status == null) && ((options && !setSocketOptions(socket)) - || handler.process(socket) == Handler.SocketState.CLOSED)) { - // Close socket and pool - Socket.destroy(socket); - socket = 0; - } - } - - // Finish up this request - recycleWorkerThread(this); - - } - - } - - - /** - * Start the background processing thread. - */ - public void start() { - thread = new Thread(this); - thread.setName(getName() + "-" + (++curThreads)); - thread.setDaemon(true); - thread.start(); - } - - - } // ----------------------------------------------- SendfileData Inner Class @@ -1887,83 +1651,6 @@ } - // ------------------------------------------------- WorkerStack Inner Class - - - public class WorkerStack { - - protected Worker[] workers = null; - protected int end = 0; - - public WorkerStack(int size) { - workers = new Worker[size]; - } - - /** - * Put the object into the queue. If the queue is full (for example if - * the queue has been reduced in size) the object will be dropped. - * - * @param object the object to be appended to the queue (first - * element). - */ - public void push(Worker worker) { - if (end < workers.length) { - workers[end++] = worker; - } else { - curThreads--; - } - } - - /** - * Get the first object out of the queue. Return null if the queue - * is empty. 
- */ - public Worker pop() { - if (end > 0) { - return workers[--end]; - } - return null; - } - - /** - * Get the first object out of the queue, Return null if the queue - * is empty. - */ - public Worker peek() { - return workers[end]; - } - - /** - * Is the queue empty? - */ - public boolean isEmpty() { - return (end == 0); - } - - /** - * How many elements are there in this queue? - */ - public int size() { - return (end); - } - - /** - * Resize the queue. If there are too many objects in the queue for the - * new size, drop the excess. - * - * @param newSize - */ - public void resize(int newSize) { - Worker[] newWorkers = new Worker[newSize]; - int len = workers.length; - if (newSize < len) { - len = newSize; - } - System.arraycopy(workers, 0, newWorkers, 0, len); - workers = newWorkers; - } - } - // ---------------------------------------------- SocketProcessor Inner Class Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=807284&r1=807283&r2=807284&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Mon Aug 24 15:33:48 2009 @@ -23,11 +23,17 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.IntrospectionUtils; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.ResizableExecutor; +import org.apache.tomcat.util.threads.TaskQueue; +import org.apache.tomcat.util.threads.TaskThreadFactory; +import org.apache.tomcat.util.threads.ThreadPoolExecutor; /** * Handle incoming TCP connections. 
@@ -45,50 +51,17 @@ * @author Yoav Shapira * @author Remy Maucherat */ -public class JIoEndpoint { +public class JIoEndpoint extends AbstractEndpoint { // -------------------------------------------------------------- Constants - protected static Log log = LogFactory.getLog(JIoEndpoint.class); - protected StringManager sm = - StringManager.getManager("org.apache.tomcat.util.net.res"); - - - /** - * The Request attribute key for the cipher suite. - */ - public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; - - /** - * The Request attribute key for the key size. - */ - public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; - - /** - * The Request attribute key for the client certificate chain. - */ - public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; - - /** - * The Request attribute key for the session id. - * This one is a Tomcat extension to the Servlet spec. - */ - public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; - - // ----------------------------------------------------------------- Fields /** - * Available workers. - */ - protected WorkerStack workers = null; - - - /** * Running state of the endpoint. 
*/ protected volatile boolean running = false; @@ -134,6 +107,10 @@ */ protected SocketProperties socketProperties = new SocketProperties(); + /** + * Are we using an internal executor + */ + protected volatile boolean internalExecutor = false; // ------------------------------------------------------------- Properties @@ -177,13 +154,19 @@ public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; if (running) { - synchronized(workers) { - workers.resize(maxThreads); - } + //TODO Dynamic resize + log.error("Resizing executor dynamically is not possible at this time."); } } public int getMaxThreads() { return maxThreads; } + public int minSpareThreads = 10; + public int getMinSpareThreads() { + return Math.min(minSpareThreads,getMaxThreads()); + } + public void setMinSpareThreads(int minSpareThreads) { + this.minSpareThreads = minSpareThreads; + } /** * Priority of the acceptor and poller threads. @@ -304,9 +287,15 @@ */ public int getCurrentThreadCount() { if (executor!=null) { - return -1; + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getPoolSize(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getPoolSize(); + } else { + return -1; + } } else { - return curThreads; + return -2; } } @@ -317,9 +306,15 @@ */ public int getCurrentThreadsBusy() { if (executor!=null) { - return -1; + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getActiveCount(); + } else { + return -1; + } } else { - return workers!=null?curThreads - workers.size():0; + return -2; } } @@ -426,113 +421,6 @@ } - // ----------------------------------------------------- Worker Inner Class - - - protected class Worker implements Runnable { - - protected Thread thread = null; - protected boolean available = false; - protected Socket socket = null; - - - 
/** - * Process an incoming TCP/IP connection on the specified socket. Any - * exception that occurs during processing must be logged and swallowed. - * NOTE: This method is called from our Connector's thread. We - * must assign it to our own thread so that multiple simultaneous - * requests can be handled. - * - * @param socket TCP socket to process - */ - synchronized void assign(Socket socket) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - available = true; - notifyAll(); - - } - - - /** - * Await a newly assigned Socket from our Connector, or null - * if we are supposed to shut down. - */ - private synchronized Socket await() { - - // Wait for the Connector to provide a new Socket - while (!available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Notify the Connector that we have received this Socket - Socket socket = this.socket; - available = false; - notifyAll(); - - return (socket); - - } - - - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - public void run() { - - // Process requests until we receive a shutdown signal - while (running) { - - // Wait for the next socket to be assigned - Socket socket = await(); - if (socket == null) - continue; - - // Process the request from this socket - if (!setSocketOptions(socket) || !handler.process(socket)) { - // Close socket - try { - socket.close(); - } catch (IOException e) { - } - } - - // Finish up this request - socket = null; - recycleWorkerThread(this); - - } - - } - - - /** - * Start the background processing thread. 
- */ - public void start() { - thread = new Thread(this); - thread.setName(getName() + "-" + (++curThreads)); - thread.setDaemon(true); - thread.start(); - } - - - } - // -------------------- Public methods -------------------- @@ -583,7 +471,11 @@ // Create worker collection if (executor == null) { - workers = new WorkerStack(maxThreads); + internalExecutor = true; + TaskQueue taskqueue = new TaskQueue(); + TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); + executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); + taskqueue.setParent( (ThreadPoolExecutor) executor); } // Start acceptor threads @@ -614,6 +506,16 @@ running = false; unlockAccept(); } + if ( executor!=null && internalExecutor ) { + if ( executor instanceof ThreadPoolExecutor ) { + //this is our internal one, so we need to shut it down + ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; + tpe.shutdownNow(); + TaskQueue queue = (TaskQueue) tpe.getQueue(); + queue.setParent(null); + } + executor = null; + } } /** @@ -696,97 +598,16 @@ /** - * Create (or allocate) and return an available processor for use in - * processing a specific HTTP request, if possible. If the maximum - * allowed processors have already been created and are in use, return - * null instead. 
- */ - protected Worker createWorkerThread() { - - synchronized (workers) { - if (workers.size() > 0) { - curThreadsBusy++; - return workers.pop(); - } - if ((maxThreads > 0) && (curThreads < maxThreads)) { - curThreadsBusy++; - if (curThreadsBusy == maxThreads) { - log.info(sm.getString("endpoint.info.maxThreads", - Integer.toString(maxThreads), address, - Integer.toString(port))); - } - return (newWorkerThread()); - } else { - if (maxThreads < 0) { - curThreadsBusy++; - return (newWorkerThread()); - } else { - return (null); - } - } - } - - } - - - /** - * Create and return a new processor suitable for processing HTTP - * requests and returning the corresponding responses. - */ - protected Worker newWorkerThread() { - - Worker workerThread = new Worker(); - workerThread.start(); - return (workerThread); - - } - - - /** - * Return a new worker thread, and block while to worker is available. - */ - protected Worker getWorkerThread() { - // Allocate a new worker thread - Worker workerThread = createWorkerThread(); - while (workerThread == null) { - try { - synchronized (workers) { - workers.wait(); - } - } catch (InterruptedException e) { - // Ignore - } - workerThread = createWorkerThread(); - } - return workerThread; - } - - - /** - * Recycle the specified Processor so that it can be used again. - * - * @param workerThread The processor to be recycled - */ - protected void recycleWorkerThread(Worker workerThread) { - synchronized (workers) { - workers.push(workerThread); - curThreadsBusy--; - workers.notify(); - } - } - - - /** * Process given socket. 
*/ protected boolean processSocket(Socket socket) { try { - if (executor == null) { - getWorkerThread().assign(socket); - } else { - executor.execute(new SocketProcessor(socket)); - } + executor.execute(new SocketProcessor(socket)); + } catch (RejectedExecutionException x) { + log.warn("Socket processing request was rejected for:"+socket,x); + return false; } catch (Throwable t) { + // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); @@ -796,81 +617,4 @@ } - // ------------------------------------------------- WorkerStack Inner Class - - - public class WorkerStack { - - protected Worker[] workers = null; - protected int end = 0; - - public WorkerStack(int size) { - workers = new Worker[size]; - } - - /** - * Put the object into the queue. If the queue is full (for example if - * the queue has been reduced in size) the object will be dropped. - * - * @param object the object to be appended to the queue (first - * element). - */ - public void push(Worker worker) { - if (end < workers.length) { - workers[end++] = worker; - } else { - curThreads--; - } - } - - /** - * Get the first object out of the queue. Return null if the queue - * is empty. - */ - public Worker pop() { - if (end > 0) { - return workers[--end]; - } - return null; - } - - /** - * Get the first object out of the queue, Return null if the queue - * is empty. - */ - public Worker peek() { - return workers[end]; - } - - /** - * Is the queue empty? - */ - public boolean isEmpty() { - return (end == 0); - } - - /** - * How many elements are there in this queue? - */ - public int size() { - return (end); - } - - /** - * Resize the queue. If there are too many objects in the queue for the - * new size, drop the excess. 
- * - * @param newSize - */ - public void resize(int newSize) { - Worker[] newWorkers = new Worker[newSize]; - int len = workers.length; - if (newSize < len) { - len = newSize; - } - System.arraycopy(workers, 0, newWorkers, 0, len); - workers = newWorkers; - } - } - } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=807284&r1=807283&r2=807284&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Aug 24 15:33:48 2009 @@ -58,6 +58,7 @@ import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; import org.apache.tomcat.util.net.jsse.NioX509KeyManager; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.ResizableExecutor; import org.apache.tomcat.util.threads.TaskQueue; import org.apache.tomcat.util.threads.TaskThreadFactory; import org.apache.tomcat.util.threads.ThreadPoolExecutor; @@ -77,7 +78,7 @@ * @author Remy Maucherat * @author Filip Hanik */ -public class NioEndpoint { +public class NioEndpoint extends AbstractEndpoint { // -------------------------------------------------------------- Constants @@ -85,30 +86,6 @@ protected static Log log = LogFactory.getLog(NioEndpoint.class); - protected static StringManager sm = - StringManager.getManager("org.apache.tomcat.util.net.res"); - - - /** - * The Request attribute key for the cipher suite. - */ - public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; - - /** - * The Request attribute key for the key size. - */ - public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; - - /** - * The Request attribute key for the client certificate chain. 
-     */
-    public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
-
-    /**
-     * The Request attribute key for the session id.
-     * This one is a Tomcat extension to the Servlet spec.
-     */
-    public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
 
     public static final int OP_REGISTER = 0x100; //register interest op
     public static final int OP_CALLBACK = 0x200; //callback interest op
@@ -333,7 +310,7 @@
     /**
      * Are we using an internal executor
      */
-    protected boolean internalExecutor = true;
+    protected volatile boolean internalExecutor = false;
 
     protected boolean useExecutor = true;
     /**
@@ -518,13 +495,16 @@
     /**
      * Dummy maxSpareThreads property.
      */
-    public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
+    public int getMaxSpareThreads() { return Math.min(getMaxThreads(),getMinSpareThreads()); }
 
 
-    /**
-     * Dummy minSpareThreads property.
-     */
-    public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
+    public int minSpareThreads = 10;
+    public int getMinSpareThreads() {
+        return Math.min(minSpareThreads,getMaxThreads());
+    }
+    public void setMinSpareThreads(int minSpareThreads) {
+        this.minSpareThreads = minSpareThreads;
+    }
 
     /**
      * Generic properties, introspected
@@ -733,6 +713,8 @@
         if (executor!=null) {
             if (executor instanceof ThreadPoolExecutor) {
                 return ((ThreadPoolExecutor)executor).getPoolSize();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getPoolSize();
             } else {
                 return -1;
             }
@@ -750,6 +732,8 @@
         if (executor!=null) {
             if (executor instanceof ThreadPoolExecutor) {
                 return ((ThreadPoolExecutor)executor).getActiveCount();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getActiveCount();
             } else {
                 return -1;
             }
@@ -1142,9 +1126,7 @@
             if ( dispatch && executor!=null ) executor.execute(sc);
             else sc.run();
         } catch (RejectedExecutionException rx) {
-            if (log.isDebugEnabled()) {
-                log.debug("Unable to process socket, executor rejected the task.",rx);
-            }
+            log.warn("Socket processing request was rejected for:"+socket,rx);
             return false;
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that

Added: tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java?rev=807284&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java Mon Aug 24 15:33:48 2009
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.concurrent.Executor;
+
+public interface ResizableExecutor extends Executor {
+    /**
+     * {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+     * @return {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+     */
+    public int getPoolSize();
+
+    /**
+     * {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+     * @return {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+     */
+    public int getActiveCount();
+
+    public boolean resizePool(int corePoolSize, int maximumPoolSize);
+
+    public boolean resizeQueue(int capacity);
+
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org