Return-Path: Delivered-To: apmail-jakarta-tomcat-dev-archive@jakarta.apache.org Received: (qmail 45433 invoked by uid 500); 1 Jun 2001 20:24:44 -0000 Mailing-List: contact tomcat-dev-help@jakarta.apache.org; run by ezmlm Precedence: bulk list-help: list-unsubscribe: list-post: Reply-To: tomcat-dev@jakarta.apache.org Delivered-To: mailing list tomcat-dev@jakarta.apache.org Received: (qmail 45213 invoked from network); 1 Jun 2001 20:24:40 -0000 From: "Hector Gonzalez" To: "tomcat-dev" Cc: , , Subject: [PATCH] ThreadPool rewrite with sanity checks Date: Fri, 1 Jun 2001 16:33:33 -0400 Message-ID: MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="----=_NextPart_000_0011_01C0EAB8.998842E0" X-Priority: 3 (Normal) X-MSMail-Priority: Normal X-Mailer: Microsoft Outlook IMO, Build 9.0.2416 (9.0.2910.0) X-MimeOLE: Produced By Microsoft MimeOLE V5.00.2919.6600 Importance: Normal X-Spam-Rating: h31.sny.collab.net 1.6.2 0/1000/N This is a multi-part message in MIME format. ------=_NextPart_000_0011_01C0EAB8.998842E0 Content-Type: text/plain; charset="iso-8859-1" Content-Transfer-Encoding: 7bit Hi all, After performing stress testing on Tomcat 3.2.2 I found that one area of trouble was the Thread Pool. Long running servlets may eat up all resources and prevent tomcat from accepting any new connections on the socket. Also for some still unknown reason performance would decrease over time. I have patched the code to do perform two safety checks: 1. If a thread has been processing a request for too long (as specified in a parameter) the thread is stoped. 2. After a thread has already processed X requests or more (X TcpWorkerThreads have been attached), it is terminated and a new thread is created. I also changed the code and moved most synchronization from ThreadPool to ObjectHASH (a fully synchronized hash table) I tested the patched code with 60 threads from 3 different machines with very good results in terms of reliability and performance. This patch involves changes to: util/ThreadPool.java util/ThreadPoolRunnable.java service/PoolTcpEndPoint.java Regards Hector Gonzalez hgonzalez@questionexchange.com ------=_NextPart_000_0011_01C0EAB8.998842E0 Content-Type: text/plain; name="patchfile.txt" Content-Transfer-Encoding: quoted-printable Content-Disposition: attachment; filename="patchfile.txt" Index: ThreadPool.java=0A= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=0A= RCS file: = /home/cvspublic/jakarta-tomcat/src/share/org/apache/tomcat/util/Attic/Thr= eadPool.java,v=0A= retrieving revision 1.9.2.2=0A= diff -u -r1.9.2.2 ThreadPool.java=0A= --- ThreadPool.java 2001/04/23 02:16:03 1.9.2.2=0A= +++ ThreadPool.java 2001/06/01 20:10:21=0A= @@ -1,7 +1,7 @@=0A= /*=0A= - * $Header: = /home/cvspublic/jakarta-tomcat/src/share/org/apache/tomcat/util/Attic/Thr= eadPool.java,v 1.9.2.2 2001/04/23 02:16:03 marcsaeg Exp $=0A= - * $Revision: 1.9.2.2 $=0A= - * $Date: 2001/04/23 02:16:03 $=0A= + * $Header: = /home/cvs/jakarta-tomcat/src/share/org/apache/tomcat/util/Attic/ThreadPoo= l.java,v 1.9.2.1 2000/07/06 22:20:17 alex Exp $=0A= + * $Revision: 1.9.2.1 $=0A= + * $Date: 2000/07/06 22:20:17 $=0A= *=0A= * = =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=0A= *=0A= @@ -71,12 +71,17 @@=0A= import org.apache.tomcat.logging.*; =0A= =0A= /**=0A= - * A thread pool that is trying to copy the apache process management.=0A= + * A thread pool that creates a number of threads and=0A= + * assigns them to runnable objects as requested.=0A= + * The pool provides maintenance on the individual=0A= + * threads by cheking running time and number of runs.=0A= + * Tried to keep synchronization to a minimum, most of the=0A= + * synchronization happens at the idleWorkers (ObjectHASH)=0A= + * level.=0A= *=0A= - * @author Gal Shachor=0A= + * @author Hector Gonzalez=0A= */=0A= -public class ThreadPool {=0A= -=0A= +public class ThreadPool{=0A= /*=0A= * Default values ...=0A= */=0A= @@ -84,470 +89,675 @@=0A= public static final int MAX_SPARE_THREADS =3D 50;=0A= public static final int MIN_SPARE_THREADS =3D 10;=0A= public static final int WORK_WAIT_TIMEOUT =3D 60*1000;=0A= + public static final int MAX_THREAD_RUNS =3D 100;=0A= + public static final int MAX_THREAD_ITERATIONS =3D 10;=0A= =0A= - /*=0A= - * Where the threads are held.=0A= - */=0A= - protected Vector pool;=0A= + static int debug=3D0;=0A= + LogHelper loghelper =3D new LogHelper("tc_log", "ThreadPool");=0A= =0A= /*=0A= - * A monitor thread that monitors the pool for idel threads.=0A= + * Max number of threads that you can open in the pool.=0A= */=0A= - protected MonitorRunnable monitor;=0A= -=0A= + protected int maxThreads;=0A= =0A= /*=0A= - * Max number of threads that you can open in the pool.=0A= + * Min number of idle threads that you can leave in the pool.=0A= */=0A= - protected int maxThreads;=0A= + protected int maxSpareThreads;=0A= =0A= /*=0A= - * Min number of idel threads that you can leave in the pool.=0A= + * Max number of idle threads that you can leave in the pool.=0A= */=0A= protected int minSpareThreads;=0A= =0A= /*=0A= - * Max number of idel threads that you can leave in the pool.=0A= + * Number of runs before thread is replaced=0A= */=0A= - protected int maxSpareThreads;=0A= + protected int maxThreadRuns;=0A= =0A= - /*=0A= - * Number of threads in the pool.=0A= + /* =0A= + * Number of times the monitor thread will run before=0A= + * stopping a thread that has been continuously busy=0A= + * effective time before stopping =3D WORK_WAIT_TIMEOUT * = MAX_THREAD_ITERATIONS=0A= */=0A= - protected int currentThreadCount;=0A= + protected int maxThreadIterations;=0A= =0A= /*=0A= - * Number of busy threads in the pool.=0A= + * Keep list of idle threads=0A= */=0A= - protected int currentThreadsBusy;=0A= + ObjectHASH idleWorkers;=0A= =0A= /*=0A= - * Flag that the pool should terminate all the threads and stop.=0A= + * List of all the created threads, idle and busy=0A= */=0A= - protected boolean stopThePool;=0A= -=0A= - static int debug=3D0;=0A= + Hashtable workerList;=0A= =0A= - /**=0A= - * Helper object for logging=0A= - **/=0A= - LogHelper loghelper =3D new LogHelper("tc_log", "ThreadPool");=0A= - =0A= - public ThreadPool() {=0A= - maxThreads =3D MAX_THREADS;=0A= - maxSpareThreads =3D MAX_SPARE_THREADS;=0A= - minSpareThreads =3D MIN_SPARE_THREADS;=0A= - currentThreadCount =3D 0;=0A= - currentThreadsBusy =3D 0;=0A= - stopThePool =3D false;=0A= - }=0A= -=0A= - public synchronized void start() {=0A= - adjustLimits();=0A= -=0A= - openThreads(minSpareThreads);=0A= - monitor =3D new MonitorRunnable(this);=0A= - }=0A= + /*=0A= + * Thread that monitors the pool=0A= + */=0A= + protected MonitorRunnable monitor;=0A= =0A= - public void setMaxThreads(int maxThreads) {=0A= + public void setMaxThreads(int maxThreads){=0A= this.maxThreads =3D maxThreads;=0A= }=0A= =0A= - public int getMaxThreads() {=0A= + public int getMaxThreads(){=0A= return maxThreads;=0A= + } =0A= +=0A= + public void setMaxSpareThreads(int maxSpareThreads){=0A= + this.maxSpareThreads =3D maxSpareThreads;=0A= }=0A= =0A= - public void setMinSpareThreads(int minSpareThreads) {=0A= + public int getMaxSpareThreads(){=0A= + return maxSpareThreads;=0A= + } =0A= +=0A= + public void setMinSpareThreads(int minSpareThreads){=0A= this.minSpareThreads =3D minSpareThreads;=0A= }=0A= =0A= - public int getMinSpareThreads() {=0A= + public int getMinSpareThreads(){=0A= return minSpareThreads;=0A= - }=0A= + } =0A= =0A= - public void setMaxSpareThreads(int maxSpareThreads) {=0A= - this.maxSpareThreads =3D maxSpareThreads;=0A= + public void setMaxThreadRuns(int maxThreadRuns){=0A= + this.maxThreadRuns =3D maxThreadRuns;=0A= }=0A= =0A= - public int getMaxSpareThreads() {=0A= - return maxSpareThreads;=0A= + public int getMaxThreadRuns(){=0A= + return maxThreadRuns;=0A= }=0A= -=0A= - //=0A= - // You may wonder what you see here ... basically I am trying=0A= - // to maintain a stack of threads. This way locality in time=0A= - // is kept and there is a better chance to find residues of the=0A= - // thread in memory next time it runs.=0A= - //=0A= =0A= - /**=0A= - * Executes a given Runnable on a thread in the pool, block if = needed.=0A= - */=0A= - public void runIt(ThreadPoolRunnable r) {=0A= + public void setMaxThreadIterations(int maxThreadIterations){=0A= + this.maxThreadIterations =3D maxThreadIterations;=0A= + }=0A= =0A= - if(null =3D=3D r) {=0A= - throw new NullPointerException();=0A= - }=0A= + public int getMaxThreadIterations(){=0A= + return maxThreadIterations;=0A= + } =0A= =0A= - if(0 =3D=3D currentThreadCount || stopThePool) {=0A= - throw new IllegalStateException();=0A= - }=0A= + public ThreadPool() {=0A= + maxThreads =3D MAX_THREADS;=0A= + maxSpareThreads=3D MAX_SPARE_THREADS;=0A= + minSpareThreads=3DMIN_SPARE_THREADS;=0A= + maxThreadRuns=3DMAX_THREAD_RUNS;=0A= + maxThreadIterations=3DMAX_THREAD_ITERATIONS;=0A= + }=0A= =0A= - ControlRunnable c =3D null;=0A= + public void start() {=0A= + adjustLimits();=0A= =0A= - // Obtain a free thread from the pool.=0A= - synchronized(this) {=0A= - if(currentThreadsBusy =3D=3D currentThreadCount) {=0A= - // All threads are busy=0A= - if(currentThreadCount < maxThreads) {=0A= - // Not all threads were open,=0A= - // Open new threads up to the max number of idel = threads=0A= - int toOpen =3D currentThreadCount + minSpareThreads;=0A= - openThreads(toOpen);=0A= - } else {=0A= - // XXX There really should be a way to log which = pool is exhuasted=0A= - loghelper.log("Pool exhausted with " + = currentThreadCount + " threads.");=0A= -=0A= - // Wait for a thread to become idel.=0A= - while(currentThreadsBusy =3D=3D currentThreadCount) = {=0A= - try {=0A= - this.wait();=0A= - }=0A= - // was just catch Throwable -- but no other=0A= - // exceptions can be thrown by wait, right?=0A= - // So we catch and ignore this one, since=0A= - // it'll never actually happen, since nowhere=0A= - // do we say pool.interrupt().=0A= - catch(InterruptedException e) {=0A= - loghelper.log("Unexpected exception", e);=0A= - }=0A= -=0A= - // Pool was stopped. Get away of the pool.=0A= - if(0 =3D=3D currentThreadCount || stopThePool) {=0A= - throw new IllegalStateException();=0A= - }=0A= - }=0A= + if(debug > 0){=0A= + loghelper.log("ThreadPool: "+this);=0A= + loghelper.log("ThreadPool: maxThreads: "+maxThreads);=0A= + loghelper.log("ThreadPool: maxSpareThreads: = "+maxSpareThreads);=0A= + loghelper.log("ThreadPool: minSpareThreads: = "+minSpareThreads);=0A= + loghelper.log("ThreadPool: maxThreadRuns: "+maxThreadRuns);=0A= + loghelper.log("ThreadPool: maxThreadIterations: = "+maxThreadIterations);=0A= + } =0A= +=0A= + idleWorkers =3D new ObjectHASH(maxThreads);=0A= + workerList =3D new Hashtable();=0A= +=0A= + openThreads();=0A= + monitor =3D new MonitorRunnable(this,WORK_WAIT_TIMEOUT);=0A= + }=0A= +=0A= + public void openThreads(){=0A= + try{=0A= + //Needs to be synchronized so that we never open more than = maxThreads=0A= + synchronized(idleWorkers){=0A= + int currentThreadCount =3D workerList.size();=0A= +=0A= + int toOpen =3D currentThreadCount+minSpareThreads;=0A= + if(toOpen > maxThreads)=0A= + toOpen =3D maxThreads;=0A= +=0A= + for(int i=3DcurrentThreadCount;i 0){=0A= + loghelper.log("ThreadPool: open thread = "+c.getId());=0A= + } =0A= }=0A= - }=0A= -=0A= - // If we are here it means that there is a free thred. Take = it.=0A= - c =3D (ControlRunnable)pool.lastElement();=0A= - pool.removeElement(c);=0A= - currentThreadsBusy++;=0A= + } =0A= + }catch(Exception e){=0A= }=0A= - c.runIt(r);=0A= }=0A= =0A= - /**=0A= - * Stop the thread pool=0A= - */=0A= - public synchronized void shutdown() {=0A= - if(!stopThePool) {=0A= - stopThePool =3D true;=0A= - monitor.terminate();=0A= - monitor =3D null;=0A= - for(int i =3D 0 ; i < (currentThreadCount - = currentThreadsBusy) ; i++) {=0A= - try {=0A= - ((ControlRunnable)(pool.elementAt(i))).terminate();=0A= - } catch(Throwable t) {=0A= - /* =0A= - * Do nothing... The show must go on, we are shutting =0A= - * down the pool and nothing should stop that.=0A= - */=0A= - loghelper.log("Ignored exception while shutting down thread = pool", t, Logger.ERROR);=0A= + public void closeThreads(){=0A= + try{=0A= + //Needs to be synchronized so that we do not open more = threads than minSpareThreads=0A= + synchronized(idleWorkers){=0A= + int currentThreadCountBusy =3D idleWorkers.getSize();=0A= + int currentThreadCount =3D workerList.size();=0A= +=0A= + if((currentThreadCount - currentThreadCountBusy) > = maxSpareThreads) {=0A= + int toFree =3D currentThreadCount -=0A= + currentThreadCountBusy -=0A= + maxSpareThreads;=0A= +=0A= + for(int i =3D 0 ; i < toFree ; i++) {=0A= + ControlRunnable c =3D = (ControlRunnable)idleWorkers.remove();=0A= + workerList.remove(c.getId());=0A= + c.stopRequest();=0A= + } =0A= }=0A= - }=0A= - currentThreadsBusy =3D currentThreadCount =3D 0;=0A= - pool =3D null;=0A= - notifyAll();=0A= + } =0A= + }catch(Exception e){=0A= }=0A= - }=0A= -=0A= - /**=0A= - * Called by the monitor thread to harvest idel threads.=0A= - */=0A= - protected synchronized void checkSpareControllers() {=0A= + } =0A= =0A= - if(stopThePool) {=0A= - return;=0A= - }=0A= - if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) = {=0A= - int toFree =3D currentThreadCount -=0A= - currentThreadsBusy -=0A= - maxSpareThreads;=0A= -=0A= - for(int i =3D 0 ; i < toFree ; i++) {=0A= - ControlRunnable c =3D = (ControlRunnable)pool.firstElement();=0A= - pool.removeElement(c);=0A= - c.terminate();=0A= - currentThreadCount --;=0A= + public ControlRunnable getControlRunnable(){=0A= + ControlRunnable c =3D null;=0A= + try{=0A= + //could remove synchronized to improve performance=0A= + //workerList may not be up to date but that is fine as long = as you=0A= + //double check idleWorkers before using workerList = (checkWorkers does this)=0A= +=0A= + synchronized(idleWorkers){=0A= + c =3D (ControlRunnable)idleWorkers.remove();=0A= + WorkerStatus ws =3D = (WorkerStatus)workerList.get(c.getId());=0A= + if(ws !=3D null){=0A= + ws.t =3D 1;=0A= + ws.runs++;=0A= + } =0A= }=0A= + }catch(Exception e){=0A= }=0A= - }=0A= + return c;=0A= + } =0A= =0A= - /**=0A= - * Returns the thread to the pool.=0A= - * Called by threads as they are becoming idel.=0A= + /*=0A= + * Method called by the worker thread to indicate that it=0A= + * has begun working. It has moved past socket.accept=0A= */=0A= - protected synchronized void returnController(ControlRunnable c) {=0A= -=0A= - if(0 =3D=3D currentThreadCount || stopThePool) {=0A= - c.terminate();=0A= - return;=0A= - }=0A= -=0A= - currentThreadsBusy--;=0A= - pool.addElement(c);=0A= - notify();=0A= + public void setWorkerTimer(String id,int t){=0A= + WorkerStatus ws =3D (WorkerStatus)workerList.get(id);=0A= + if(ws !=3D null) ws.t =3D t;=0A= }=0A= =0A= - /**=0A= - * Inform the pool that the specific thread finish.=0A= - *=0A= - * Called by the ControlRunnable.run() when the runnable =0A= - * throws an exception.=0A= - */=0A= - protected synchronized void notifyThreadEnd(ControlRunnable c) {=0A= - currentThreadsBusy--;=0A= - currentThreadCount --;=0A= - notify();=0A= - }=0A= - =0A= -=0A= - /*=0A= - * Checks for problematic configuration and fix it.=0A= - * The fix provides reasonable settings for a single CPU=0A= - * with medium load.=0A= - */=0A= - protected void adjustLimits() {=0A= - if(maxThreads <=3D 0) {=0A= + public void adjustLimits() {=0A= + if(maxThreads <=3D 0){=0A= maxThreads =3D MAX_THREADS;=0A= }=0A= =0A= - if(maxSpareThreads >=3D maxThreads) {=0A= + if(maxSpareThreads >=3D maxThreads){=0A= maxSpareThreads =3D maxThreads;=0A= }=0A= =0A= - if(maxSpareThreads <=3D 0) {=0A= - if(1 =3D=3D maxThreads) {=0A= + if(maxSpareThreads <=3D 0){=0A= + if(maxThreads =3D=3D 1) {=0A= maxSpareThreads =3D 1;=0A= - } else {=0A= + }else{=0A= maxSpareThreads =3D maxThreads/2;=0A= }=0A= - }=0A= + }=0A= =0A= - if(minSpareThreads > maxSpareThreads) {=0A= - minSpareThreads =3D maxSpareThreads;=0A= - }=0A= + if(minSpareThreads > maxSpareThreads){=0A= + minSpareThreads =3D maxSpareThreads;=0A= + }=0A= =0A= - if(minSpareThreads <=3D 0) {=0A= - if(1 =3D=3D maxSpareThreads) {=0A= + if(minSpareThreads <=3D 0){=0A= + if(maxSpareThreads =3D=3D 1){=0A= minSpareThreads =3D 1;=0A= - } else {=0A= + }else{=0A= minSpareThreads =3D maxSpareThreads/2;=0A= }=0A= + }=0A= +=0A= +=0A= + if(maxThreadRuns <=3D 0){=0A= + maxThreadRuns =3D MAX_THREAD_RUNS;=0A= + }=0A= +=0A= + if(maxThreadIterations <=3D 0){=0A= + maxThreadIterations =3D MAX_THREAD_ITERATIONS;=0A= + } =0A= + } =0A= + =0A= +=0A= + public void runIt(ThreadPoolRunnable r){=0A= + try{=0A= + ControlRunnable c =3D getControlRunnable();=0A= + c.process(r);=0A= + }catch(InterruptedException e){=0A= + }=0A= + }=0A= +=0A= + public void stopRequestIdleWorkers() {=0A= + try{=0A= + Object[] idle =3D idleWorkers.removeAll();=0A= + for(int i=3D0;i 0){=0A= + loghelper.log("ThreadPool: checkWorkers run");=0A= + loghelper.log("ThreadPool: checkWorkers = workerList.size() "+workerList.size());=0A= + loghelper.log("ThreadPool: checkWorkers = idleWorkers.getSize() "+idleWorkers.getSize());=0A= + } =0A= +=0A= + Vector all_keys =3D new Vector();=0A= + Enumeration keys =3D workerList.keys();=0A= + int count=3D0;=0A= + while(keys.hasMoreElements()){=0A= + count++;=0A= + all_keys.add(keys.nextElement());=0A= + } =0A= +=0A= + for(int i=3D0;i0){=0A= + loghelper.log("ThreadPool: checkWorkers: = "+key+" runs: "+ws.runs+" t: "+ws.t);=0A= + }=0A= +=0A= + if(ws.t > 1){=0A= + if(ws.t > maxThreadIterations){=0A= + ControlRunnable c =3D = (ControlRunnable)idleWorkers.remove(key);=0A= + if(c !=3D null){=0A= + if(debug>0){=0A= + loghelper.log("ThreadPool: OOPS, = SOMEONE FREED THE THREAD");=0A= + } =0A= + }else{ =0A= + workerList.remove(key);=0A= + ws.c.stopRequest();=0A= + if(debug>0){=0A= + loghelper.log("ThreadPool: = TERMINATED LONG RUNNING THREAD: "+key);=0A= + } =0A= + }=0A= + }else{=0A= + ws.t++; =0A= + } =0A= + }=0A= + else if(ws.t =3D=3D 0){=0A= + if(ws.runs > maxThreadRuns){=0A= + Object c =3D idleWorkers.remove(key);=0A= + if(c =3D=3D null){=0A= + if(debug>0){=0A= + loghelper.log("ThreadPool: OOPS, = SOMEONE REMOVED THE IDLEWORKER FOR THE TIRED THREAD "+key);=0A= + } =0A= + }else{ =0A= + workerList.remove(key);=0A= + ws.c.stopRequest();=0A= + if(debug>0){=0A= + loghelper.log("ThreadPool: = TERMINATED TIRED THREAD: "+key);=0A= + } =0A= + } =0A= + } =0A= + }=0A= + =0A= + }=0A= + } =0A= +=0A= + //Adjust pool size=0A= + if(idleWorkers.getSize() =3D=3D 0){=0A= + openThreads();=0A= + }else{=0A= + closeThreads();=0A= + } =0A= +=0A= +=0A= + if(debug>0){=0A= + loghelper.log("ThreadPool: idleWorkers.getSize(): = "+idleWorkers.getSize());=0A= + loghelper.log("ThreadPool: workerList.size(): = "+workerList.size());=0A= + } =0A= +=0A= + }=0A= +=0A= +=0A= + }catch(Exception e){=0A= }=0A= }=0A= +=0A= + /**=0A= + * The thread to be ran on the thread pool=0A= + */=0A= + private class ControlRunnable extends Object {=0A= + /*=0A= + *List of idle workers, received from ThreadPool=0A= + */=0A= + private ObjectHASH idleWorkers;=0A= +=0A= + /*=0A= + *List of all workers (idle and busy) received from ThreadPool=0A= + */=0A= + private Hashtable workerList;=0A= +=0A= + /*=0A= + *Used to get the object to run=0A= + */=0A= + private ObjectHASH handoffBox;=0A= +=0A= + private Thread internalThread;=0A= + private volatile boolean noStopRequested;=0A= + private boolean noThData;=0A= + private Object thData[]=3Dnull;=0A= + private long maxThreadRuns;=0A= +=0A= + public ControlRunnable(ObjectHASH idleWorkers,Hashtable = workerList,long maxThreadRuns) {=0A= + this.idleWorkers =3D idleWorkers;=0A= + this.workerList =3D workerList;=0A= + this.maxThreadRuns =3D maxThreadRuns;=0A= +=0A= + handoffBox =3D new ObjectHASH(1);=0A= + noStopRequested =3D true;=0A= + noThData =3D true;=0A= +=0A= + workerList.put(this.toString(),new WorkerStatus(this));=0A= +=0A= + Runnable r =3D new Runnable(){=0A= + public void run(){=0A= + try{=0A= + runWork();=0A= + } catch(Exception x){=0A= + }=0A= + } =0A= + };=0A= +=0A= + internalThread =3D new Thread(r);=0A= + internalThread.start();=0A= + } =0A= +=0A= + public String getId(){=0A= + return this.toString();=0A= + } =0A= +=0A= + public void process(ThreadPoolRunnable r) throws = InterruptedException {=0A= + handoffBox.add(r);=0A= + }=0A= +=0A= + private void runWork() {=0A= + while(noStopRequested){=0A= + try{=0A= + //Could elminate synchronized but need to=0A= + //keep in mind that workerList may not get updated = in time=0A= + synchronized(idleWorkers){=0A= + idleWorkers.add(this);=0A= + WorkerStatus ws =3D = (WorkerStatus)workerList.get(getId());=0A= + if(ws !=3D null){=0A= + ws.t =3D 0;=0A= + }=0A= + }=0A= =0A= - protected void openThreads(int toOpen) {=0A= + /*=0A= + //If you threads need to stop after exactly = maxThreadRuns=0A= + //comment above code, and uncomment this code=0A= + WorkerStatus ws =3D = (WorkerStatus)workerList.get(getId());=0A= + if(ws !=3D null){=0A= + if(ws.runs > maxThreadRuns){=0A= + workerList.remove(getId());=0A= + //Create new thread to replace this one=0A= + System.out.println("ThreadPool: TERMINATED = THREAD: "+getId()+" runs: "+ws.runs);=0A= + ControlRunnable c =3D new = ControlRunnable(idleWorkers,workerList,maxThreadRuns);=0A= + break;=0A= + }else{=0A= + ws.t =3D 0;=0A= + idleWorkers.add(this);=0A= + } =0A= + }else{=0A= + //Should never get here=0A= + break;=0A= + } =0A= + */=0A= =0A= - if(toOpen > maxThreads) {=0A= - toOpen =3D maxThreads;=0A= + //Blocks until there is an object to run=0A= + ThreadPoolRunnable r =3D (ThreadPoolRunnable) = handoffBox.remove();=0A= + runIt(r);=0A= + } catch(Exception e){=0A= + Thread.currentThread().interrupt();=0A= + }=0A= + } =0A= }=0A= =0A= - if(0 =3D=3D currentThreadCount) {=0A= - pool =3D new Vector(toOpen);=0A= + private void runIt(ThreadPoolRunnable toRun) {=0A= + try{=0A= + if(toRun !=3D null){=0A= + if(noThData) {=0A= + thData=3DtoRun.getInitData();=0A= + noThData =3D false;=0A= + } =0A= + toRun.runIt(thData,getId());=0A= + } =0A= + }catch(Exception runex){=0A= + }finally{=0A= + Thread.interrupted();=0A= + }=0A= }=0A= =0A= - for(int i =3D currentThreadCount ; i < toOpen ; i++) {=0A= - pool.addElement(new ControlRunnable(this));=0A= + public void stopRequest() {=0A= + noStopRequested =3D false;=0A= + internalThread.interrupt();=0A= }=0A= =0A= - currentThreadCount =3D toOpen;=0A= + public boolean isAlive() {=0A= + return internalThread.isAlive();=0A= + } =0A= }=0A= =0A= - void log( String s ) {=0A= - loghelper.log(s);=0A= - }=0A= - =0A= - /** =0A= - * Periodically execute an action - cleanup in this case=0A= + /**=0A= + * Class used to monitor the thread pool=0A= */=0A= - class MonitorRunnable implements Runnable {=0A= + private class MonitorRunnable implements Runnable {=0A= ThreadPool p;=0A= Thread t;=0A= - boolean shouldTerminate;=0A= + boolean noStopRequested;=0A= + int timeout;=0A= =0A= - MonitorRunnable(ThreadPool p) {=0A= - shouldTerminate =3D false;=0A= + MonitorRunnable(ThreadPool p,int timeout) {=0A= + noStopRequested =3D true;=0A= + this.timeout =3D timeout;=0A= this.p =3D p;=0A= t =3D new Thread(this);=0A= t.start();=0A= }=0A= =0A= public void run() {=0A= - while(true) {=0A= + while(noStopRequested) {=0A= try {=0A= - // Sleep for a while.=0A= - synchronized(this) {=0A= - this.wait(WORK_WAIT_TIMEOUT);=0A= - }=0A= -=0A= - // Check if should terminate.=0A= - // termination happens when the pool is shutting = down.=0A= - if(shouldTerminate) {=0A= - break;=0A= - }=0A= -=0A= - // Harvest idle threads.=0A= - p.checkSpareControllers();=0A= -=0A= - } catch(Throwable t) {=0A= - loghelper.log("Unexpected exception", t, Logger.ERROR);=0A= + t.sleep(timeout);=0A= + p.checkWorkers();=0A= + } catch(InterruptedException x){=0A= + Thread.currentThread().interrupt();=0A= }=0A= }=0A= }=0A= =0A= - /** Stop the monitor=0A= - */=0A= - public synchronized void terminate() {=0A= - shouldTerminate =3D true;=0A= - this.notify();=0A= + public void stopRequest() {=0A= + noStopRequested =3D false;=0A= + t.interrupt();=0A= }=0A= - }=0A= + } =0A= =0A= /**=0A= - * A Thread object that executes various actions ( = ThreadPoolRunnable )=0A= - * under control of ThreadPool=0A= + * Class to keep track of the status of a worker thread=0A= + * in the thread pool=0A= */=0A= - class ControlRunnable implements Runnable {=0A= + private class WorkerStatus{=0A= + /*=0A= + * Number of times that the monitor thread has found=0A= + * this thread to be busy. Busy meas the thread is=0A= + * handling a request, not that the thread is blocked=0A= + * in a socket.accept call.=0A= + * Possible values:=0A= + * 0 - the thread is waiting for an object to run=0A= + * 1 - the thread has been claimed and is in socket.accept=0A= + * 2 and greater - the thread is handling a request=0A= + */=0A= + public long t; =0A= +=0A= + /*=0A= + * Number of times the thread has been ran=0A= + */=0A= + public long runs;=0A= +=0A= + /*=0A= + * Pointer to the thread itself=0A= + */=0A= + public ControlRunnable c; =0A= +=0A= + public WorkerStatus(ControlRunnable c){=0A= + this.c =3D c;=0A= + this.t =3D 0;=0A= + this.runs =3D 0;=0A= + } =0A= + } =0A= =0A= - /**=0A= - * ThreadPool where this thread will be returned=0A= - */=0A= - ThreadPool p;=0A= + /**=0A= + * thread safe hashtable=0A= + * used by thread pool to store available threads=0A= + * and by ControlRunnable to get object to run=0A= + */=0A= + private class ObjectHASH extends Object {=0A= + /*=0A= + * Hashtable to contain the data=0A= + */=0A= + private Hashtable index;=0A= =0A= - /**=0A= - * The thread that executes the actions=0A= - */=0A= - Thread t;=0A= + private int capacity;=0A= + private int size;=0A= =0A= - /**=0A= - * The method that is executed in this thread=0A= - */=0A= - ThreadPoolRunnable toRun;=0A= -=0A= - /**=0A= - * Stop this thread=0A= - */=0A= - boolean shouldTerminate;=0A= -=0A= - /**=0A= - * Activate the execution of the action=0A= - */=0A= - boolean shouldRun;=0A= -=0A= - /**=0A= - * Per thread data - can be used only if all actions are=0A= - * of the same type.=0A= - * A better mechanism is possible ( that would allow association of=0A= - * thread data with action type ), but right now it's enough.=0A= - */=0A= - boolean noThData;=0A= - Object thData[]=3Dnull;=0A= -=0A= - /**=0A= - * Start a new thread, with no method in it=0A= - */=0A= - ControlRunnable(ThreadPool p) {=0A= - toRun =3D null;=0A= - shouldTerminate =3D false;=0A= - shouldRun =3D false;=0A= - this.p =3D p;=0A= - t =3D new Thread(this);=0A= - t.start();=0A= - noThData=3Dtrue;=0A= - thData=3Dnull;=0A= + public ObjectHASH(int cap){=0A= + capacity =3D (cap > 0) ? cap : 1;=0A= +=0A= + index =3D new Hashtable();=0A= + size =3D 0;=0A= }=0A= =0A= - public void run() {=0A= - =0A= - while(true) {=0A= - try { =0A= - /* Wait for work. */=0A= - synchronized(this) {=0A= - if(!shouldRun && !shouldTerminate) {=0A= - this.wait();=0A= - }=0A= - }=0A= - if(toRun =3D=3D null ) {=0A= - if( p.debug>0) p.log( "No toRun ???");=0A= - }=0A= -=0A= - if( shouldTerminate ) {=0A= - if( p.debug>0) p.log( "Terminate");=0A= - break;=0A= - }=0A= -=0A= - /* Check if should execute a runnable. */=0A= - try {=0A= - if(noThData) {=0A= - if(p.debug>0) p.log( "Getting new thread data");=0A= - thData=3DtoRun.getInitData();=0A= - noThData =3D false;=0A= - }=0A= -=0A= - if(shouldRun) {=0A= - toRun.runIt(thData);=0A= - }=0A= - } catch(Throwable t) {=0A= - loghelper.log("Caught exception executing " + toRun.toString() + ", = terminating thread", t);=0A= - /*=0A= - * The runnable throw an exception (can be even = a ThreadDeath),=0A= - * signalling that the thread die.=0A= - *=0A= - * The meaning is that we should release the thread = from=0A= - * the pool.=0A= - */=0A= - shouldTerminate =3D true;=0A= - shouldRun =3D false;=0A= - p.notifyThreadEnd(this);=0A= - } finally {=0A= - if(shouldRun) {=0A= - shouldRun =3D false;=0A= - /*=0A= - * Notify the pool that the thread is now idle.=0A= - */=0A= - p.returnController(this);=0A= - }=0A= - }=0A= + public int getCapacity(){=0A= + return capacity;=0A= + }=0A= =0A= - /*=0A= - * Check if should terminate.=0A= - * termination happens when the pool is shutting = down.=0A= - */=0A= - if(shouldTerminate) {=0A= - break;=0A= - }=0A= - } catch(InterruptedException ie) { /* for the wait = operation */=0A= - // can never happen, since we don't call interrupt=0A= - loghelper.log("Unexpected exception", ie);=0A= - }=0A= - }=0A= + public synchronized int getSize(){=0A= + return size;=0A= }=0A= =0A= - public synchronized void runIt(ThreadPoolRunnable toRun) {=0A= - if( toRun =3D=3D null ) {=0A= - throw new NullPointerException("No Runnable");=0A= - }=0A= - this.toRun =3D toRun;=0A= - shouldRun =3D true;=0A= - this.notify();=0A= + public synchronized boolean isEmpty(){=0A= + return (size =3D=3D 0);=0A= }=0A= =0A= - public synchronized void terminate() {=0A= - shouldTerminate =3D true;=0A= - this.notify();=0A= + public synchronized boolean isFull(){=0A= + return (size =3D=3D capacity);=0A= }=0A= - }=0A= +=0A= + public synchronized void add(Object obj) throws = InterruptedException{=0A= + waitWhileFull();=0A= +=0A= + index.put(obj.toString(),obj);=0A= +=0A= + size++;=0A= + notifyAll();=0A= + }=0A= +=0A= + public synchronized Object remove() throws InterruptedException{=0A= + waitWhileEmpty();=0A= +=0A= + //XXX: =0A= + //Need to find a more efficient way to get the=0A= + //first element of a hash=0A= +=0A= + String key =3D null;=0A= +=0A= + Enumeration keys =3D index.keys();=0A= + if(keys.hasMoreElements()){=0A= + key =3D (String)keys.nextElement();=0A= + } =0A= +=0A= + Object obj =3D index.remove(key);=0A= + size --;=0A= +=0A= + notifyAll();=0A= +=0A= + return obj;=0A= + }=0A= +=0A= + /*=0A= + * Method to get a particular object from the hash=0A= + * does not block, if the object does not exist it returns null=0A= + */=0A= + public synchronized Object remove(String key) throws = InterruptedException{=0A= + Object o =3D index.remove(key);=0A= + if(o !=3D null){=0A= + size--;=0A= + notifyAll();=0A= + } =0A= + return o;=0A= + } =0A= +=0A= + public synchronized Object[] removeAll() throws = InterruptedException{=0A= + Object[] list =3D new Object[size];=0A= +=0A= + Enumeration keys =3D index.keys();=0A= + int i =3D 0;=0A= + while(keys.hasMoreElements()){=0A= + String key =3D (String)keys.nextElement();=0A= + list[i++] =3D remove(key);=0A= + } =0A= +=0A= + return list;=0A= + } =0A= +=0A= +=0A= + public synchronized void waitWhileFull() throws = InterruptedException{=0A= + while(isFull()){=0A= + wait();=0A= + }=0A= + }=0A= +=0A= + public synchronized void waitWhileEmpty() throws = InterruptedException{=0A= + while(isEmpty()){=0A= + wait();=0A= + }=0A= + }=0A= + } =0A= }=0A= Index: ThreadPoolRunnable.java=0A= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=0A= RCS file: = /home/cvspublic/jakarta-tomcat/src/share/org/apache/tomcat/util/Attic/Thr= eadPoolRunnable.java,v=0A= retrieving revision 1.1=0A= diff -u -r1.1 ThreadPoolRunnable.java=0A= --- ThreadPoolRunnable.java 2000/05/26 23:06:39 1.1=0A= +++ ThreadPoolRunnable.java 2001/06/01 20:10:29=0A= @@ -64,6 +64,8 @@=0A= import java.util.*;=0A= import java.io.*;=0A= =0A= +=0A= +=0A= /** Implemented if you want to run a piece of code inside a thread pool.=0A= */=0A= public interface ThreadPoolRunnable {=0A= @@ -77,9 +79,17 @@=0A= */=0A= public Object[] getInitData();=0A= =0A= +=0A= /** This method will be executed in one of the pool's threads. The=0A= * thread will be returned to the pool.=0A= */=0A= public void runIt(Object thData[]);=0A= =0A= + /* =0A= + * Same as above, but a thread id is passed,=0A= + * so that the working thread can indicate when it has begun=0A= + * to perform real work as oppossed to just waiting on a socket=0A= + * useful to stop long running threads=0A= + */=0A= + public void runIt(Object thData[],String id);=0A= }=0A= Index: PoolTcpEndpoint.java=0A= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=0A= RCS file: = /home/cvspublic/jakarta-tomcat/src/share/org/apache/tomcat/service/Attic/= PoolTcpEndpoint.java,v=0A= retrieving revision 1.8.2.5=0A= diff -u -r1.8.2.5 PoolTcpEndpoint.java=0A= --- PoolTcpEndpoint.java 2001/03/21 17:11:29 1.8.2.5=0A= +++ PoolTcpEndpoint.java 2001/06/01 20:10:48=0A= @@ -388,8 +388,12 @@=0A= return obj;=0A= }=0A= }=0A= +=0A= + public void runIt(Object perThrData[]){=0A= + runIt(perThrData,null);=0A= + }=0A= =0A= - public void runIt(Object perThrData[]) {=0A= + public void runIt(Object perThrData[],String id) {=0A= TcpConnection con=3Dnull;=0A= if( ! usePool ) {=0A= // extract the original.=0A= @@ -403,6 +407,10 @@=0A= if(null !=3D s) {=0A= // Continue accepting on another thread...=0A= endpoint.tp.runIt(this);=0A= + //Tell the thread pool that we are about to process a = request=0A= + if(id !=3D null){=0A= + endpoint.tp.setWorkerTimer(id,2);=0A= + } =0A= =0A= try {=0A= if( usePool ) {=0A= ------=_NextPart_000_0011_01C0EAB8.998842E0--