Return-Path: Delivered-To: apmail-jakarta-avalon-cvs-archive@apache.org Received: (qmail 98193 invoked from network); 24 May 2002 16:32:58 -0000 Received: from unknown (HELO nagoya.betaversion.org) (192.18.49.131) by daedalus.apache.org with SMTP; 24 May 2002 16:32:58 -0000 Received: (qmail 12079 invoked by uid 97); 24 May 2002 16:32:55 -0000 Delivered-To: qmlist-jakarta-archive-avalon-cvs@jakarta.apache.org Received: (qmail 11830 invoked by uid 97); 24 May 2002 16:32:54 -0000 Mailing-List: contact avalon-cvs-help@jakarta.apache.org; run by ezmlm Precedence: bulk List-Unsubscribe: List-Subscribe: List-Help: List-Post: List-Id: "Avalon CVS List" Reply-To: "Avalon Developers List" Delivered-To: mailing list avalon-cvs@jakarta.apache.org Received: (qmail 11604 invoked by uid 97); 24 May 2002 16:32:52 -0000 X-Antivirus: nagoya (v4198 created Apr 24 2002) Date: 24 May 2002 16:32:44 -0000 Message-ID: <20020524163244.7013.qmail@icarus.apache.org> From: proyal@apache.org To: jakarta-avalon-excalibur-cvs@apache.org Subject: cvs commit: jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command TPCThreadManager.java X-Spam-Rating: daedalus.apache.org 1.6.2 0/1000/N X-Spam-Rating: daedalus.apache.org 1.6.2 0/1000/N proyal 02/05/24 09:32:44 Modified: event/src/test/org/apache/excalibur/event/command/test TPCThreadManagerTestCase.java event/src/java/org/apache/excalibur/event/command TPCThreadManager.java Log: TPCThreadManager: * Made AbstractLogEnabled to support a Logger * Removed parameters from constructor, now passed via parameterize() * Made Initializable * Modified error handling in run() to catch any runtime exceptions to keep main thread alive TPCThreadManagerTestCase * Restyled code * Modified to work with TPCThreadManager changes Revision Changes Path 1.2 +65 -29 jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test/TPCThreadManagerTestCase.java Index: TPCThreadManagerTestCase.java 
=================================================================== RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test/TPCThreadManagerTestCase.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- TPCThreadManagerTestCase.java 24 May 2002 14:52:45 -0000 1.1 +++ TPCThreadManagerTestCase.java 24 May 2002 16:32:44 -0000 1.2 @@ -9,7 +9,8 @@ import java.io.PrintWriter; import java.io.StringWriter; -import junit.framework.TestCase; + +import org.apache.avalon.framework.parameters.Parameters; import org.apache.excalibur.event.DefaultQueue; import org.apache.excalibur.event.EventHandler; import org.apache.excalibur.event.Queue; @@ -20,13 +21,16 @@ import org.apache.excalibur.event.command.EventPipeline; import org.apache.excalibur.event.command.TPCThreadManager; +import junit.framework.TestCase; + /** * @author Gregory Steuck */ public class TPCThreadManagerTestCase extends TestCase { - public TPCThreadManagerTestCase(String name) { - super(name); + public TPCThreadManagerTestCase( String name ) + { + super( name ); } // number of milliseconds it reasonably takes the JVM to switch threads @@ -35,6 +39,17 @@ // number of times the handler should be called private final static int MINIMAL_NUMBER_INVOCATIONS = 2; + private Parameters createParameters( int processors, int threadsPerProcessor, long sleep ) + { + final Parameters parameters = new Parameters(); + + parameters.setParameter( "processors", String.valueOf( processors ) ); + parameters.setParameter( "threads-per-processor", String.valueOf( threadsPerProcessor ) ); + parameters.setParameter( "sleep-time", String.valueOf( sleep ) ); + + return parameters; + } + /** * Checks TPCThreadManager ability to survive the situation when * it tries to schedule more tasks than it has threads. 
Originally @@ -48,25 +63,37 @@ { // enforces only 1 thread and no timeout which makes it // fail quickly - final TPCThreadManager threadManager = new TPCThreadManager(1, 1, 0); + final TPCThreadManager threadManager = new TPCThreadManager(); + + threadManager.parameterize( createParameters( 1, 1, 0 ) ); + threadManager.initialize(); + // an obviously syncronized component final StringBuffer result = new StringBuffer(); final StringWriter exceptionBuffer = new StringWriter(); - final PrintWriter errorOut = new PrintWriter(exceptionBuffer); - threadManager.register(new Pipeline(result, errorOut)); + final PrintWriter errorOut = new PrintWriter( exceptionBuffer ); + + threadManager.register( new Pipeline( result, errorOut ) ); + // sleeps for 1 more scheduling timeout to surely go over limit - Thread.sleep(SCHEDULING_TIMEOUT * (MINIMAL_NUMBER_INVOCATIONS + 1)); + Thread.sleep( SCHEDULING_TIMEOUT * ( MINIMAL_NUMBER_INVOCATIONS + 1 ) ); + int numberCalls = result.length(); + String msg = - "Number of calls to handler (" + numberCalls + - ") is less than the expected number of calls (" + - MINIMAL_NUMBER_INVOCATIONS + ")"; - assertTrue(msg, numberCalls >= MINIMAL_NUMBER_INVOCATIONS); + "Number of calls to handler (" + numberCalls + + ") is less than the expected number of calls (" + + MINIMAL_NUMBER_INVOCATIONS + ")"; + + assertTrue( msg, numberCalls >= MINIMAL_NUMBER_INVOCATIONS ); + errorOut.flush(); // why not? 
+ String stackTrace = exceptionBuffer.toString(); - assertEquals("Exceptions while running the test", - "", - stackTrace); + + assertEquals( "Exceptions while running the test", + "", + stackTrace ); } private static class Pipeline implements EventPipeline, EventHandler @@ -76,17 +103,20 @@ private final StringBuffer m_result; private final PrintWriter m_errorOut; - Pipeline(StringBuffer resultAccumulator, PrintWriter errorOut) - throws SinkException + Pipeline( StringBuffer resultAccumulator, PrintWriter errorOut ) + throws SinkException { m_result = resultAccumulator; m_errorOut = errorOut; // even though TPCThreadManager currently calls event handlers // when there is nothing to do, that may change - m_queue.enqueue(new QueueElement() {}); + m_queue.enqueue( new QueueElement() + { + } ); } - public EventHandler getEventHandler() { + public EventHandler getEventHandler() + { return this; } @@ -100,23 +130,29 @@ return m_queue; } - - public void handleEvent(QueueElement element) { - handleEvents(new QueueElement[] {element}); + public void handleEvent( QueueElement element ) + { + handleEvents( new QueueElement[]{element} ); } - public void handleEvents(QueueElement[] elements) { + public void handleEvents( QueueElement[] elements ) + { // records the fact that the handler was called - m_result.append('a'); - try { + m_result.append( 'a' ); + try + { // sleeps to occupy the thread and let thread manager try to reschedule - Thread.sleep(SCHEDULING_TIMEOUT); + Thread.sleep( SCHEDULING_TIMEOUT ); // enqueues another element to be called again - m_queue.enqueue(new QueueElement() {}); - } catch (Exception e) { + m_queue.enqueue( new QueueElement() + { + } ); + } + catch( Exception e ) + { // fails the test, no exceptions are expected - e.printStackTrace(m_errorOut); - + e.printStackTrace( m_errorOut ); + } } } 1.16 +115 -48 jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java Index: TPCThreadManager.java 
=================================================================== RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java,v retrieving revision 1.15 retrieving revision 1.16 diff -u -r1.15 -r1.16 --- TPCThreadManager.java 24 May 2002 14:52:45 -0000 1.15 +++ TPCThreadManager.java 24 May 2002 16:32:44 -0000 1.16 @@ -15,7 +15,12 @@ import org.apache.avalon.excalibur.thread.ThreadPool; import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool; import org.apache.avalon.framework.activity.Disposable; +import org.apache.avalon.framework.activity.Initializable; +import org.apache.avalon.framework.logger.AbstractLogEnabled; import org.apache.avalon.framework.logger.NullLogger; +import org.apache.avalon.framework.parameters.ParameterException; +import org.apache.avalon.framework.parameters.Parameterizable; +import org.apache.avalon.framework.parameters.Parameters; import org.apache.excalibur.event.EventHandler; import org.apache.excalibur.event.Source; import org.apache.excalibur.util.SystemUtil; @@ -23,63 +28,93 @@ /** * This is a ThreadManager that uses a certain number of threads per processor. * The number of threads in the pool is a direct proportion to the number of - * processors. + * processors. 
The size of the thread pool is (processors * threads-per-processor) + 1 * * @author Berin Loritsch + * @author Peter Royal */ -public final class TPCThreadManager - implements Runnable, ThreadManager, Disposable +public final class TPCThreadManager extends AbstractLogEnabled + implements Runnable, ThreadManager, Parameterizable, Initializable, Disposable { - private final ThreadPool m_threadPool; private final Mutex m_mutex = new Mutex(); private final HashMap m_pipelines = new HashMap(); + + private ThreadPool m_threadPool; private ThreadControl m_threadControl; private boolean m_done = false; - private final long m_sleepTime; - - /** - * The default constructor assumes there is a system property named "os.arch.cpus" - * that has a default for the number of CPUs on a system. Otherwise, the value - * is 1. - */ - public TPCThreadManager() - { - this( SystemUtil.numProcessors() ); - } - - /** - * Constructor provides one thread per number of processors. - */ - public TPCThreadManager( int numProcessors ) - { - this( numProcessors, 1 ); - } - - /** - * Constructor provides a specified number of threads per processor. If - * either value is less then one, then the value is rewritten as one. - */ - public TPCThreadManager( int numProcessors, int threadsPerProcessor ) - { - this( numProcessors, threadsPerProcessor, 1000 ); - } - - /** - * Constructor provides a specified number of threads per processor. If - * either value is less then one, then the value is rewritten as one. 
- */ - public TPCThreadManager( int numProcessors, int threadsPerProcessor, long sleepTime ) - { - int processors = Math.max( numProcessors, 1 ); - int threads = Math.max( threadsPerProcessor, 1 ); - ResourceLimitingThreadPool tpool = new ResourceLimitingThreadPool( "TPCThreadManager", - ( processors * threads ) + 1, true, true, 1000L, 10L * 1000L ); - tpool.enableLogging( new NullLogger() ); - m_threadPool = tpool; - - m_sleepTime = sleepTime; - m_threadControl = m_threadPool.execute( this ); + //Set reasonable defaults in case parameterize() is never called. + private long m_sleepTime = 1000L; + private long m_blockTimeout = 1000L; + private int m_processors = 1; + private int m_threadsPerProcessor = 1; + + private boolean m_initialized = false; + + /** + * The following parameters can be set for this class: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Name | Description | Default Value
processors | Number of processors (Rewritten to 1 if less than one) | System property named "os.arch.cpus", otherwise 1
threads-per-processor | Threads per processor to use (Rewritten to 1 if less than one) | 1
sleep-time | Time (in milliseconds) to wait between queue pipeline processing runs | 1000
block-timeout | Time (in milliseconds) to wait for a thread to process a pipeline | 1000
+ */ + public void parameterize( Parameters parameters ) throws ParameterException + { + this.m_processors = + Math.max( parameters.getParameterAsInteger( "processors", SystemUtil.numProcessors() ), 1 ); + + this.m_threadsPerProcessor = Math.max( parameters.getParameterAsInteger( "threads-per-processor", 1 ), 1 ); + this.m_sleepTime = parameters.getParameterAsLong( "sleep-time", 1000L ); + this.m_blockTimeout = parameters.getParameterAsLong( "block-timeout", 1000L ); + } + + public void initialize() throws Exception + { + if( this.m_initialized ) + { + throw new IllegalStateException( "ThreadManager is already initailized" ); + } + + final ResourceLimitingThreadPool tpool = + new ResourceLimitingThreadPool( "TPCThreadManager", + ( m_processors * m_threadsPerProcessor ) + 1, + true, + true, + this.m_blockTimeout, + 10L * 1000L ); + + if( null == getLogger() ) + { + this.enableLogging( new NullLogger() ); + } + + tpool.enableLogging( getLogger() ); + + this.m_threadPool = tpool; + this.m_threadControl = this.m_threadPool.execute( this ); + this.m_initialized = true; } /** @@ -87,6 +122,11 @@ */ public void register( EventPipeline pipeline ) { + if( !this.m_initialized ) + { + throw new IllegalStateException( "ThreadManager must be initialized before registering a pipeline" ); + } + try { m_mutex.acquire(); @@ -116,6 +156,11 @@ */ public void deregister( EventPipeline pipeline ) { + if( !this.m_initialized ) + { + throw new IllegalStateException( "ThreadManager must be initialized before deregistering a pipeline" ); + } + try { m_mutex.acquire(); @@ -146,6 +191,11 @@ */ public void deregisterAll() { + if( !this.m_initialized ) + { + throw new IllegalStateException( "ThreadManager must be initialized before deregistering pipelines" ); + } + try { m_mutex.acquire(); @@ -203,6 +253,23 @@ // that it has no threads available, will still try // to go on, hopefully at one point there will be // a thread to execute our runner + + if( getLogger().isWarnEnabled() ) + { + 
getLogger().warn( "Unable to execute pipeline (If out of threads, " + + "increase block-timeout or number of threads per processor", e ); + } + } + catch( RuntimeException e ) + { + //We want to catch this, because if an unexpected runtime exception comes through a single + //pipeline it can bring down the primary thread + + if( getLogger().isErrorEnabled() ) + { + getLogger().error( "Exception processing EventPipeline [msg: " + e.getMessage() + "]", + e ); + } } } } -- To unsubscribe, e-mail: For additional commands, e-mail: