tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Costin Manolache" <cos...@gmail.com>
Subject Re: svn commit: r410234 - /tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
Date Tue, 30 May 2006 17:24:41 GMT
Few questions:

- why is this needed ( i.e. what problem with the ThreadPoolExecutor is it
solving ) ?

- who is cleaning worker threads ( after a peak ) ?

- it would be good to have some comments on the stack - what happens on
push() if end == workers.length for
example, or why this won't happen

- try/catch may be good in worker.run, or you may miss recycle and cleanup

- why not just add executor interface to the existing ( and relatively well
tested ) thread pool ??

- hooks in TPExecutor are nice and may be useful...

I think it would be ok to add this to sandbox for example, but not very sure
about adding it to the main tree.
Doesn't look very solid....


Costin

On 5/30/06, remm@apache.org <remm@apache.org> wrote:
>
> Author: remm
> Date: Tue May 30 02:58:41 2006
> New Revision: 410234
>
> URL: http://svn.apache.org/viewvc?rev=410234&view=rev
> Log:
> - Add a brain dead executor.
> - Submitted by Vincenc Beltran Querol.
>
> Added:
>     tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
> (with props)
>
> Added:
> tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
> URL:
> http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java?rev=410234&view=auto
>
> ==============================================================================
> ---
> tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
> (added)
> +++
> tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
> Tue May 30 02:58:41 2006
> @@ -0,0 +1,312 @@
> +/*
> + *  Copyright 1999-2006 The Apache Software Foundation
> + *
> + *  Licensed under the Apache License, Version 2.0 (the "License");
> + *  you may not use this file except in compliance with the License.
> + *  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + *  Unless required by applicable law or agreed to in writing, software
> + *  distributed under the License is distributed on an "AS IS" BASIS,
> + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + *  See the License for the specific language governing permissions and
> + *  limitations under the License.
> + */
> +
> +package org.apache.tomcat.util.net;
> +
> +import java.util.concurrent.Executor;
> +
> +public class SimpleThreadPoolExecutor implements Executor {
> +
> +
> +    private boolean running = true;
> +
> +    /**
> +     * Available workers.
> +     */
> +    protected WorkerStack workers = null;
> +
> +    /**
> +     * Current worker threads busy count.
> +     */
> +    protected int curThreadsBusy = 0;
> +
> +
> +    /**
> +     * Current worker threads count.
> +     */
> +    protected int curThreads = 0;
> +
> +
> +    /**
> +     * Sequence number used to generate thread names.
> +     */
> +    protected int sequence = 0;
> +
> +
> +    /**
> +     * Maximum amount of worker threads.
> +     */
> +    protected int maxThreads = 40;
> +    public void setMaxThreads(int maxThreads) { this.maxThreads =
> maxThreads; }
> +    public int getMaxThreads() { return maxThreads; }
> +
> +    /**
> +     * Name of the thread pool, which will be used for naming child
> threads.
> +     */
> +    protected String name = "TP";
> +    public void setName(String name) { this.name = name; }
> +    public String getName() { return name; }
> +
> +    public int getCurrentThreadCount() {
> +        return curThreads;
> +    }
> +
> +    public int getCurrentThreadsBusy() {
> +        return curThreads - workers.size();
> +    }
> +
> +    public SimpleThreadPoolExecutor(String name, int maxThreads) {
> +        this.name = name;
> +        this.maxThreads = maxThreads;
> +        workers = new WorkerStack(maxThreads);
> +    }
> +
> +    public void execute(Runnable job){
> +        getWorkerThread().assign(job);
> +    }
> +
> +    // ----------------------------------------------------- Worker Inner
> Class
> +
> +
> +    protected class Worker implements Runnable {
> +
> +        protected Thread thread = null;
> +        protected boolean available = false;
> +        protected Runnable job = null;
> +
> +
> +        /**
> +         * Process an incoming TCP/IP connection on the specified
> socket.  Any
> +         * exception that occurs during processing must be logged and
> swallowed.
> +         * <b>NOTE</b>:  This method is called from our Connector's
> thread.  We
> +         * must assign it to our own thread so that multiple simultaneous
> +         * requests can be handled.
> +         *
> +         * @param socket TCP socket to process
> +         */
> +        synchronized void assign(Runnable job) {
> +
> +            // Wait for the Processor to get the previous Socket
> +            while (available) {
> +                try {
> +                    wait();
> +                } catch (InterruptedException e) {
> +                }
> +            }
> +
> +            // Store the newly available Socket and notify our thread
> +            this.job = job;
> +            available = true;
> +            notifyAll();
> +
> +        }
> +
> +
> +        /**
> +         * Await a newly assigned Socket from our Connector, or
> <code>null</code>
> +         * if we are supposed to shut down.
> +         */
> +        private synchronized Runnable await() {
> +
> +            // Wait for the Connector to provide a new Socket
> +            while (!available) {
> +                try {
> +                    wait();
> +                } catch (InterruptedException e) {
> +                }
> +            }
> +
> +            // Notify the Connector that we have received this Socket
> +            Runnable job = this.job;
> +            available = false;
> +            notifyAll();
> +
> +            return (job);
> +
> +        }
> +
> +        public void shutdown(){
> +            running = false;
> +        }
> +
> +        /**
> +         * The background thread that listens for incoming TCP/IP
> connections and
> +         * hands them off to an appropriate processor.
> +         */
> +        public void run() {
> +
> +            // Process requests until we receive a shutdown signal
> +            while (running) {
> +
> +                // Wait for the next socket to be assigned
> +                Runnable job = await();
> +                if (job == null)
> +                    continue;
> +
> +                job.run();
> +
> +                job = null;
> +
> +                recycleWorkerThread(this);
> +            }
> +
> +        }
> +
> +
> +        /**
> +         * Start the background processing thread.
> +         */
> +        public void start() {
> +            thread = new Thread(this);
> +            thread.setName(getName() + "-" + (++curThreads));
> +            thread.setDaemon(true);
> +            thread.start();
> +        }
> +
> +
> +    }
> +
> +
> +    /**
> +     * Create (or allocate) and return an available processor for use in
> +     * processing a specific HTTP request, if possible.  If the maximum
> +     * allowed processors have already been created and are in use,
> return
> +     * <code>null</code> instead.
> +     */
> +    protected Worker createWorkerThread() {
> +
> +        synchronized (workers) {
> +            if (workers.size() > 0) {
> +                curThreadsBusy++;
> +                return workers.pop();
> +            }
> +            if ((maxThreads > 0) && (curThreads < maxThreads)) {
> +                curThreadsBusy++;
> +                return (newWorkerThread());
> +            } else {
> +                if (maxThreads < 0) {
> +                    curThreadsBusy++;
> +                    return (newWorkerThread());
> +                } else {
> +                    return (null);
> +                }
> +            }
> +        }
> +
> +    }
> +
> +    /**
> +     * Create and return a new processor suitable for processing HTTP
> +     * requests and returning the corresponding responses.
> +     */
> +    protected Worker newWorkerThread() {
> +
> +        Worker workerThread = new Worker();
> +        workerThread.start();
> +        return (workerThread);
> +
> +    }
> +
> +    /**
> +     * Return a new worker thread, and block while to worker is
> available.
> +     */
> +    protected Worker getWorkerThread() {
> +        // Allocate a new worker thread
> +        Worker workerThread = createWorkerThread();
> +        while (workerThread == null) {
> +            try {
> +                synchronized (workers) {
> +                    workers.wait();
> +                }
> +            } catch (InterruptedException e) {
> +                // Ignore
> +            }
> +            workerThread = createWorkerThread();
> +        }
> +        return workerThread;
> +    }
> +
> +
> +    /**
> +     * Recycle the specified Processor so that it can be used again.
> +     *
> +     * @param workerThread The processor to be recycled
> +     */
> +    protected void recycleWorkerThread(Worker workerThread) {
> +        synchronized (workers) {
> +            workers.push(workerThread);
> +            curThreadsBusy--;
> +            workers.notify();
> +        }
> +    }
> +
> +    // ------------------------------------------------- WorkerStack
> Inner Class
> +
> +
> +    public class WorkerStack {
> +
> +        protected Worker[] workers = null;
> +        protected int end = 0;
> +
> +        public WorkerStack(int size) {
> +            workers = new Worker[size];
> +        }
> +
> +        /**
> +         * Put the object into the queue.
> +         *
> +         * @param   object      the object to be appended to the queue
> (first element).
> +         */
> +        public void push(Worker worker) {
> +            workers[end++] = worker;
> +        }
> +
> +        /**
> +         * Get the first object out of the queue. Return null if the
> queue
> +         * is empty.
> +         */
> +        public Worker pop() {
> +            if (end > 0) {
> +                return workers[--end];
> +            }
> +            return null;
> +        }
> +
> +        /**
> +         * Get the first object out of the queue, Return null if the
> queue
> +         * is empty.
> +         */
> +        public Worker peek() {
> +            return workers[end];
> +        }
> +
> +        /**
> +         * Is the queue empty?
> +         */
> +        public boolean isEmpty() {
> +            return (end == 0);
> +        }
> +
> +        /**
> +         * How many elements are there in this queue?
> +         */
> +        public int size() {
> +            return (end);
> +        }
> +    }
> +
> +
> +}
>
> Propchange:
> tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
>
> ------------------------------------------------------------------------------
>     svn:eol-style = native
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
> For additional commands, e-mail: dev-help@tomcat.apache.org
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message