Return-Path: Mailing-List: contact dev-help@ant.apache.org; run by ezmlm Delivered-To: mailing list dev@ant.apache.org Received: (qmail 35205 invoked by uid 500); 11 Feb 2003 11:26:46 -0000 Received: (qmail 35202 invoked from network); 11 Feb 2003 11:26:46 -0000 Received: from icarus.apache.org (208.185.179.13) by daedalus.apache.org with SMTP; 11 Feb 2003 11:26:46 -0000 Received: (qmail 14247 invoked by uid 1142); 11 Feb 2003 11:26:44 -0000 Date: 11 Feb 2003 11:26:44 -0000 Message-ID: <20030211112644.14246.qmail@icarus.apache.org> From: conor@apache.org To: ant-cvs@apache.org Subject: cvs commit: ant/src/testcases/org/apache/tools/ant/taskdefs ParallelTest.java X-Spam-Rating: daedalus.apache.org 1.6.2 0/1000/N conor 2003/02/11 03:26:44 Modified: src/etc/testcases/taskdefs parallel.xml src/main/org/apache/tools/ant/taskdefs Parallel.java src/testcases/org/apache/tools/ant/taskdefs ParallelTest.java Log: Add a thread count to the parallel task to stop it using too many threads PR: 16906 Submitted by: Danno Ferrin Revision Changes Path 1.2 +95 -0 ant/src/etc/testcases/taskdefs/parallel.xml Index: parallel.xml =================================================================== RCS file: /home/cvs/ant/src/etc/testcases/taskdefs/parallel.xml,v retrieving revision 1.1 retrieving revision 1.2 diff -u -w -u -r1.1 -r1.2 --- parallel.xml 21 Feb 2002 15:38:16 -0000 1.1 +++ parallel.xml 11 Feb 2003 11:26:44 -0000 1.2 @@ -21,6 +21,101 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1.15 +190 -8 ant/src/main/org/apache/tools/ant/taskdefs/Parallel.java Index: Parallel.java =================================================================== RCS file: /home/cvs/ant/src/main/org/apache/tools/ant/taskdefs/Parallel.java,v retrieving revision 1.14 retrieving revision 1.15 diff -u -w -u -r1.14 -r1.15 --- Parallel.java 10 Feb 2003 14:13:36 -0000 1.14 +++ Parallel.java 11 Feb 2003 11:26:44 -0000 1.15 @@ -53,6 +53,7 @@ */ package org.apache.tools.ant.taskdefs; +import java.lang.reflect.Method; import java.util.Enumeration; import java.util.Vector; import org.apache.tools.ant.BuildException; @@ -61,14 +62,22 @@ import org.apache.tools.ant.TaskContainer; import org.apache.tools.ant.util.StringUtils; - - /** * Executes the contained tasks in separate threads, continuing - * once all are completed. + * once all are completed.
+ * New behavior allows for the ant script to specify a maximum number of + * threads that will be executed in parallel. One should be very careful about + * using the waitFor task when specifying threadCount + * as it can cause deadlocks if the number of threads is too small or if one of + * the nested tasks fails to execute completely. The task selection algorithm + * will insure that the tasks listed before a task have started before that + * task is started, but it will not insure a successful completion of those + * tasks or that those tasks will finish first (i.e. it's a classic race + * condition). *

* @author Thomas Christen chr@active.ch * @author Conor MacNeill + * @author Danno Ferrin * @since Ant 1.4 * * @ant.task category="control" @@ -79,6 +88,17 @@ /** Collection holding the nested tasks */ private Vector nestedTasks = new Vector(); + /** Semaphore to notify of completed threads */ + private final Object semaphore = new Object(); + + /** Total number of threads to run */ + int numThreads = 0; + + /** Total number of threads per processor to run. */ + int numThreadsPerProcessor = 0; + + /** Interval (in ms) to poll for finished threads. */ + int pollInterval = 1000; // default is once a second /** * Add a nested task to execute in parallel. @@ -89,11 +109,154 @@ } /** - * Block execution until the specified time or for a - * specified amount of milliseconds and if defined, - * execute the wait status. + * Dynamically generates the number of threads to execute based on the + * number of available processors (via + * java.lang.Runtime.availableProcessors()). Requires a J2SE + * 1.4 VM, and it will overwrite the value set in threadCount. + * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to + * threadCount.; optional + * @param numThreadsPerProcessor Number of threads to create per available + * processor. + * + */ + public void setThreadsPerProcessor(int numThreadsPerProcessor) { + this.numThreadsPerProcessor = numThreadsPerProcessor; + } + + /** + * Statically determine the maximum number of tasks to execute + * simultaneously. If there are less tasks than threads then all will be + * executed at once, if there are more then only threadCount + * tasks will be executed at one time. If threadsPerProcessor + * is set and the JVM is at least a 1.4 VM then this value is ignormed.; optional + * + * @param numThreads total number of therads. + * + */ + public void setThreadCount(int numThreads) { + this.numThreads = numThreads; + } + + /** + * Interval to poll for completed threads when threadCount or + * threadsPerProcessor is specified. Integer in milliseconds.; optional + * + * @param pollInterval New value of property pollInterval. */ + public void setPollInterval(int pollInterval) { + this.pollInterval = pollInterval; + } + public void execute() throws BuildException { + updateThreadCounts(); + if (numThreads == 0) { + spinAllThreads(); + } else { + spinNumThreads(); + } + } + + public void updateThreadCounts() { + if (numThreadsPerProcessor != 0) { + int numProcessors = getNumProcessors(); + if (numProcessors != 0) { + numThreads = numProcessors * numThreadsPerProcessor; + } + } + } + + /** + * Spin up threadCount threads. + */ + public void spinNumThreads() throws BuildException { + final int maxThreads = nestedTasks.size(); + Thread[] threads = new Thread[maxThreads]; + TaskThread[] taskThreads = new TaskThread[maxThreads]; + int threadNumber = 0; + for (Enumeration e = nestedTasks.elements(); e.hasMoreElements(); + threadNumber++) { + Task nestedTask = (Task) e.nextElement(); + ThreadGroup group = new ThreadGroup("parallel"); + TaskThread taskThread = new TaskThread(threadNumber, nestedTask); + taskThreads[threadNumber] = taskThread; + threads[threadNumber] = new Thread(group, taskThread); + } + + final int maxRunning = numThreads; + Thread[] running = new Thread[maxRunning]; + threadNumber = 0; + + // now run them in limited numbers... + outer: + while (threadNumber < maxThreads) { + synchronized(semaphore) { + for (int i = 0; i < maxRunning; i++) { + if (running[i] == null || !running[i].isAlive()) { + running[i] = threads[threadNumber++]; + running[i].start(); + // countinue on outer while loop in case we used our last thread + continue outer; + } + } + // if we got here all are running, so sleep a little + try { + semaphore.wait(pollInterval); + } catch (InterruptedException ie) { + // dosen't java know interruptions are rude? + // just pretend it didn't happen and go aobut out business. + // sheesh! + } + } + } + + // now join to all the threads + for (int i = 0; i < maxRunning; ++i) { + try { + if (running[i] != null) { + running[i].join(); + } + } catch (InterruptedException ie) { + // who would interrupt me at a time like this? + } + } + + // now did any of the threads throw an exception + StringBuffer exceptionMessage = new StringBuffer(); + int numExceptions = 0; + Throwable firstException = null; + Location firstLocation = Location.UNKNOWN_LOCATION;; + for (int i = 0; i < maxThreads; ++i) { + Throwable t = taskThreads[i].getException(); + if (t != null) { + numExceptions++; + if (firstException == null) { + firstException = t; + } + if (t instanceof BuildException && + firstLocation == Location.UNKNOWN_LOCATION) { + firstLocation = ((BuildException) t).getLocation(); + } + exceptionMessage.append(StringUtils.LINE_SEP); + exceptionMessage.append(t.getMessage()); + } + } + + if (numExceptions == 1) { + if (firstException instanceof BuildException) { + throw (BuildException) firstException; + } else { + throw new BuildException(firstException); + } + } else if (numExceptions > 1) { + throw new BuildException(exceptionMessage.toString(), + firstLocation); + } + } + + /** + * Spin up one thread per task. + */ + public void spinAllThreads() throws BuildException { int numTasks = nestedTasks.size(); Thread[] threads = new Thread[numTasks]; TaskThread[] taskThreads = new TaskThread[numTasks]; @@ -154,10 +317,25 @@ } } + public int getNumProcessors() { + try { + Class[] paramTypes = {}; + Method availableProcessors = + Runtime.class.getMethod("availableProcessors", paramTypes); + + Object[] args = {}; + Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args); + return ret.intValue(); + } catch (Exception e) { + // return a bogus number + return 0; + } + } + /** * thread that execs a task */ - private static class TaskThread implements Runnable { + private class TaskThread implements Runnable { private Throwable exception; private Task task; private int taskNumber; @@ -181,6 +359,10 @@ task.perform(); } catch (Throwable t) { exception = t; + } finally { + synchronized (semaphore) { + semaphore.notifyAll(); + } } } 1.4 +13 -0 ant/src/testcases/org/apache/tools/ant/taskdefs/ParallelTest.java Index: ParallelTest.java =================================================================== RCS file: /home/cvs/ant/src/testcases/org/apache/tools/ant/taskdefs/ParallelTest.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -w -u -r1.3 -r1.4 --- ParallelTest.java 10 Feb 2003 14:14:45 -0000 1.3 +++ ParallelTest.java 11 Feb 2003 11:26:44 -0000 1.4 @@ -103,6 +103,19 @@ } + /** tests basic operation of the parallel task */ + public void testTreadCount() { + // should get no output at all + Project project = getProject(); + project.setUserProperty("test.direct", DIRECT_MESSAGE); + project.setUserProperty("test.delayed", DELAYED_MESSAGE); + expectOutputAndError("testThreadCount", "", ""); + String log = getLog(); + assertEquals("parallel tasks did't block on threads properly", log, + "+1-1+2-2+3-3+1+2-1+3-2-3+1+2+3-1-2-3+1+2+3-1-2-3"); + + } + /** tests the failure of a task within a parallel construction */ public void testFail() { // should get no output at all