avalon-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pro...@apache.org
Subject cvs commit: jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command TPCThreadManager.java
Date Fri, 24 May 2002 16:32:44 GMT
proyal      02/05/24 09:32:44

  Modified:    event/src/test/org/apache/excalibur/event/command/test
                        TPCThreadManagerTestCase.java
               event/src/java/org/apache/excalibur/event/command
                        TPCThreadManager.java
  Log:
  TPCThreadManager:
   * Made AbstractLogEnabled to support a Logger
   * Removed parameters from constructor, now passed via parameterize()
   * Made Initializable
   * Modified error handling in run() to catch any runtime exceptions to keep main thread
alive
  
  TPCThreadManagerTestCase
   * Restyled code
   * Modified to work with TPCThreadManager changes
  
  Revision  Changes    Path
  1.2       +65 -29    jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test/TPCThreadManagerTestCase.java
  
  Index: TPCThreadManagerTestCase.java
  ===================================================================
  RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test/TPCThreadManagerTestCase.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- TPCThreadManagerTestCase.java	24 May 2002 14:52:45 -0000	1.1
  +++ TPCThreadManagerTestCase.java	24 May 2002 16:32:44 -0000	1.2
  @@ -9,7 +9,8 @@
   
   import java.io.PrintWriter;
   import java.io.StringWriter;
  -import junit.framework.TestCase;
  +
  +import org.apache.avalon.framework.parameters.Parameters;
   import org.apache.excalibur.event.DefaultQueue;
   import org.apache.excalibur.event.EventHandler;
   import org.apache.excalibur.event.Queue;
  @@ -20,13 +21,16 @@
   import org.apache.excalibur.event.command.EventPipeline;
   import org.apache.excalibur.event.command.TPCThreadManager;
   
  +import junit.framework.TestCase;
  +
   /**
    * @author <a href="mailto:greg-tpcthreadmanager@nest.cx">Gregory Steuck</a>
    */
   public class TPCThreadManagerTestCase extends TestCase
   {
  -    public TPCThreadManagerTestCase(String name) {
  -        super(name);
  +    public TPCThreadManagerTestCase( String name )
  +    {
  +        super( name );
       }
   
       // number of milliseconds it reasonably takes the JVM to switch threads
  @@ -35,6 +39,17 @@
       // number of times the handler should be called
       private final static int MINIMAL_NUMBER_INVOCATIONS = 2;
   
  +    private Parameters createParameters( int processors, int threadsPerProcessor, long
sleep )
  +    {
  +        final Parameters parameters = new Parameters();
  +
  +        parameters.setParameter( "processors", String.valueOf( processors ) );
  +        parameters.setParameter( "threads-per-processor", String.valueOf( threadsPerProcessor
) );
  +        parameters.setParameter( "sleep-time", String.valueOf( sleep ) );
  +
  +        return parameters;
  +    }
  +
       /**
        * Checks TPCThreadManager ability to survive the situation when
        * it tries to schedule more tasks than it has threads. Originally
  @@ -48,25 +63,37 @@
       {
           // enforces only 1 thread and no timeout which makes it
           // fail quickly
  -        final TPCThreadManager threadManager = new TPCThreadManager(1, 1, 0);
  +        final TPCThreadManager threadManager = new TPCThreadManager();
  +
  +        threadManager.parameterize( createParameters( 1, 1, 0 ) );
  +        threadManager.initialize();
  +
           // an obviously syncronized component
           final StringBuffer result = new StringBuffer();
           final StringWriter exceptionBuffer = new StringWriter();
  -        final PrintWriter errorOut = new PrintWriter(exceptionBuffer);
  -        threadManager.register(new Pipeline(result, errorOut));
  +        final PrintWriter errorOut = new PrintWriter( exceptionBuffer );
  +
  +        threadManager.register( new Pipeline( result, errorOut ) );
  +
           // sleeps for 1 more scheduling timeout to surely go over limit
  -        Thread.sleep(SCHEDULING_TIMEOUT * (MINIMAL_NUMBER_INVOCATIONS + 1));
  +        Thread.sleep( SCHEDULING_TIMEOUT * ( MINIMAL_NUMBER_INVOCATIONS + 1 ) );
  +
           int numberCalls = result.length();
  +
           String msg =
  -            "Number of calls to handler (" + numberCalls +
  -            ") is less than the expected number of calls (" +
  -            MINIMAL_NUMBER_INVOCATIONS + ")";
  -        assertTrue(msg, numberCalls >= MINIMAL_NUMBER_INVOCATIONS);
  +          "Number of calls to handler (" + numberCalls +
  +          ") is less than the expected number of calls (" +
  +          MINIMAL_NUMBER_INVOCATIONS + ")";
  +
  +        assertTrue( msg, numberCalls >= MINIMAL_NUMBER_INVOCATIONS );
  +
           errorOut.flush(); // why not?
  +
           String stackTrace = exceptionBuffer.toString();
  -        assertEquals("Exceptions while running the test",
  -                     "",
  -                     stackTrace);
  +
  +        assertEquals( "Exceptions while running the test",
  +                      "",
  +                      stackTrace );
       }
   
       private static class Pipeline implements EventPipeline, EventHandler
  @@ -76,17 +103,20 @@
           private final StringBuffer m_result;
           private final PrintWriter m_errorOut;
   
  -        Pipeline(StringBuffer resultAccumulator, PrintWriter errorOut)
  -            throws SinkException
  +        Pipeline( StringBuffer resultAccumulator, PrintWriter errorOut )
  +          throws SinkException
           {
               m_result = resultAccumulator;
               m_errorOut = errorOut;
               // even though TPCThreadManager currently calls event handlers
               // when there is nothing to do, that may change
  -            m_queue.enqueue(new QueueElement() {});
  +            m_queue.enqueue( new QueueElement()
  +            {
  +            } );
           }
   
  -        public EventHandler getEventHandler() {
  +        public EventHandler getEventHandler()
  +        {
               return this;
           }
   
  @@ -100,23 +130,29 @@
               return m_queue;
           }
   
  -
  -        public void handleEvent(QueueElement element) {
  -            handleEvents(new QueueElement[] {element});
  +        public void handleEvent( QueueElement element )
  +        {
  +            handleEvents( new QueueElement[]{element} );
           }
   
  -        public void handleEvents(QueueElement[] elements) {
  +        public void handleEvents( QueueElement[] elements )
  +        {
               // records the fact that the handler was called
  -            m_result.append('a');
  -            try {
  +            m_result.append( 'a' );
  +            try
  +            {
                   // sleeps to occupy the thread and let thread manager try to reschedule
  -                Thread.sleep(SCHEDULING_TIMEOUT);
  +                Thread.sleep( SCHEDULING_TIMEOUT );
                   // enqueues another element to be called again
  -                m_queue.enqueue(new QueueElement() {});
  -            } catch (Exception e) {
  +                m_queue.enqueue( new QueueElement()
  +                {
  +                } );
  +            }
  +            catch( Exception e )
  +            {
                   // fails the test, no exceptions are expected
  -                e.printStackTrace(m_errorOut);
  -                
  +                e.printStackTrace( m_errorOut );
  +
               }
           }
       }
  
  
  
  1.16      +115 -48   jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java
  
  Index: TPCThreadManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- TPCThreadManager.java	24 May 2002 14:52:45 -0000	1.15
  +++ TPCThreadManager.java	24 May 2002 16:32:44 -0000	1.16
  @@ -15,7 +15,12 @@
   import org.apache.avalon.excalibur.thread.ThreadPool;
   import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool;
   import org.apache.avalon.framework.activity.Disposable;
  +import org.apache.avalon.framework.activity.Initializable;
  +import org.apache.avalon.framework.logger.AbstractLogEnabled;
   import org.apache.avalon.framework.logger.NullLogger;
  +import org.apache.avalon.framework.parameters.ParameterException;
  +import org.apache.avalon.framework.parameters.Parameterizable;
  +import org.apache.avalon.framework.parameters.Parameters;
   import org.apache.excalibur.event.EventHandler;
   import org.apache.excalibur.event.Source;
   import org.apache.excalibur.util.SystemUtil;
  @@ -23,63 +28,93 @@
   /**
    * This is a ThreadManager that uses a certain number of threads per processor.
    * The number of threads in the pool is a direct proportion to the number of
  - * processors.
  + * processors. The size of the thread pool is (processors * threads-per-processor) + 1
    *
    * @author <a href="mailto:bloritsch@apache.org">Berin Loritsch</a>
  + * @author <a href="mailto:proyal@apache.org">Peter Royal</a>
    */
  -public final class TPCThreadManager
  -  implements Runnable, ThreadManager, Disposable
  +public final class TPCThreadManager extends AbstractLogEnabled
  +  implements Runnable, ThreadManager, Parameterizable, Initializable, Disposable
   {
  -    private final ThreadPool m_threadPool;
       private final Mutex m_mutex = new Mutex();
       private final HashMap m_pipelines = new HashMap();
  +
  +    private ThreadPool m_threadPool;
       private ThreadControl m_threadControl;
       private boolean m_done = false;
  -    private final long m_sleepTime;
  -
  -    /**
  -     * The default constructor assumes there is a system property named "os.arch.cpus"
  -     * that has a default for the number of CPUs on a system.  Otherwise, the value
  -     * is 1.
  -     */
  -    public TPCThreadManager()
  -    {
  -        this( SystemUtil.numProcessors() );
  -    }
  -
  -    /**
  -     * Constructor provides one thread per number of processors.
  -     */
  -    public TPCThreadManager( int numProcessors )
  -    {
  -        this( numProcessors, 1 );
  -    }
  -
  -    /**
  -     * Constructor provides a specified number of threads per processor.  If
  -     * either value is less then one, then the value is rewritten as one.
  -     */
  -    public TPCThreadManager( int numProcessors, int threadsPerProcessor )
  -    {
  -        this( numProcessors, threadsPerProcessor, 1000 );
  -    }
  -
  -    /**
  -     * Constructor provides a specified number of threads per processor.  If
  -     * either value is less then one, then the value is rewritten as one.
  -     */
  -    public TPCThreadManager( int numProcessors, int threadsPerProcessor, long sleepTime
)
  -    {
  -        int processors = Math.max( numProcessors, 1 );
  -        int threads = Math.max( threadsPerProcessor, 1 );
   
  -        ResourceLimitingThreadPool tpool = new ResourceLimitingThreadPool( "TPCThreadManager",
  -                                                                           ( processors
* threads ) + 1, true, true, 1000L, 10L * 1000L );
  -        tpool.enableLogging( new NullLogger() );
  -        m_threadPool = tpool;
  -
  -        m_sleepTime = sleepTime;
  -        m_threadControl = m_threadPool.execute( this );
  +    //Set reasonable defaults in case parameterize() is never called.
  +    private long m_sleepTime = 1000L;
  +    private long m_blockTimeout = 1000L;
  +    private int m_processors = 1;
  +    private int m_threadsPerProcessor = 1;
  +
  +    private boolean m_initialized = false;
  +
  +    /**
  +     * The following parameters can be set for this class:
  +     *
  +     * <table>
  +     *   <tr>
  +     *     <th>Name</th> <th>Description</td> <th>Default
Value</th>
  +     *   </tr>
  +     *   <tr>
  +     *     <td>processors</td>
  +     *     <td>Number of processors (Rewritten to 1 if less than one)</td>
  +     *     <td>System property named "os.arch.cpus", otherwise 1</td>
  +     *   </tr>
  +     *   <tr>
  +     *     <td>threads-per-processor</td>
  +     *     <td>Threads per processor to use (Rewritten to 1 if less than one)</td>
  +     *     <td>1</td>
  +     *   </tr>
  +     *   <tr>
  +     *     <td>sleep-time</td>
  +     *     <td>Time (in milliseconds) to wait between queue pipeline processing runs</td>
  +     *     <td>1000</td>
  +     *   </tr>
  +     *   <tr>
  +     *     <td>block-timeout</td>
  +     *     <td>Time (in milliseconds) to wait for a thread to process a pipeline</td>
  +     *     <td>1000</td>
  +     *   </tr>
  +     * </table>
  +     */
  +    public void parameterize( Parameters parameters ) throws ParameterException
  +    {
  +        this.m_processors =
  +          Math.max( parameters.getParameterAsInteger( "processors", SystemUtil.numProcessors()
), 1 );
  +
  +        this.m_threadsPerProcessor = Math.max( parameters.getParameterAsInteger( "threads-per-processor",
1 ), 1 );
  +        this.m_sleepTime = parameters.getParameterAsLong( "sleep-time", 1000L );
  +        this.m_blockTimeout = parameters.getParameterAsLong( "block-timeout", 1000L );
  +    }
  +
  +    public void initialize() throws Exception
  +    {
  +        if( this.m_initialized )
  +        {
  +            throw new IllegalStateException( "ThreadManager is already initailized" );
  +        }
  +
  +        final ResourceLimitingThreadPool tpool =
  +          new ResourceLimitingThreadPool( "TPCThreadManager",
  +                                          ( m_processors * m_threadsPerProcessor ) + 1,
  +                                          true,
  +                                          true,
  +                                          this.m_blockTimeout,
  +                                          10L * 1000L );
  +
  +        if( null == getLogger() )
  +        {
  +            this.enableLogging( new NullLogger() );
  +        }
  +
  +        tpool.enableLogging( getLogger() );
  +
  +        this.m_threadPool = tpool;
  +        this.m_threadControl = this.m_threadPool.execute( this );
  +        this.m_initialized = true;
       }
   
       /**
  @@ -87,6 +122,11 @@
        */
       public void register( EventPipeline pipeline )
       {
  +        if( !this.m_initialized )
  +        {
  +            throw new IllegalStateException( "ThreadManager must be initialized before
registering a pipeline" );
  +        }
  +
           try
           {
               m_mutex.acquire();
  @@ -116,6 +156,11 @@
        */
       public void deregister( EventPipeline pipeline )
       {
  +        if( !this.m_initialized )
  +        {
  +            throw new IllegalStateException( "ThreadManager must be initialized before
deregistering a pipeline" );
  +        }
  +
           try
           {
               m_mutex.acquire();
  @@ -146,6 +191,11 @@
        */
       public void deregisterAll()
       {
  +        if( !this.m_initialized )
  +        {
  +            throw new IllegalStateException( "ThreadManager must be initialized before
deregistering pipelines" );
  +        }
  +
           try
           {
               m_mutex.acquire();
  @@ -203,6 +253,23 @@
                               // that it has no threads available, will still try
                               // to go on, hopefully at one point there will be
                               // a thread to execute our runner
  +
  +                            if( getLogger().isWarnEnabled() )
  +                            {
  +                                getLogger().warn( "Unable to execute pipeline (If out of
threads, "
  +                                                  + "increase block-timeout or number of
threads per processor", e );
  +                            }
  +                        }
  +                        catch( RuntimeException e )
  +                        {
  +                            //We want to catch this, because if an unexpected runtime exception
comes through a single
  +                            //pipeline it can bring down the primary thread
  +
  +                            if( getLogger().isErrorEnabled() )
  +                            {
  +                                getLogger().error( "Exception processing EventPipeline
[msg: " + e.getMessage() + "]",
  +                                                   e );
  +                            }
                           }
                       }
                   }
  
  
  

--
To unsubscribe, e-mail:   <mailto:avalon-cvs-unsubscribe@jakarta.apache.org>
For additional commands, e-mail: <mailto:avalon-cvs-help@jakarta.apache.org>


Mime
View raw message