commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nutty...@apache.org
Subject svn commit: r562653 - in /commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver: AbstractStageDriver.java DedicatedThreadStageDriver.java SynchronousStageDriver.java SynchronousStageDriverFactory.java ThreadPoolStageDriver.java
Date Sat, 04 Aug 2007 04:31:44 GMT
Author: nuttycom
Date: Fri Aug  3 21:31:35 2007
New Revision: 562653

URL: http://svn.apache.org/viewvc?view=rev&rev=562653
Log:
Refactored duplicate functionality from pooling, dedicated thread drivers into abstract base
class. Retrofitted synchronous drivers with fault tolerance.

Modified:
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
(original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
Fri Aug  3 21:31:35 2007
@@ -37,7 +37,18 @@
     /**
      * The context for the stage being run
      */
-    protected StageContext context;
+    protected StageContext context;    
+    
+    /**
+     * The current state of processing. In most drivers, this is used for
+     * thread control.
+     */
+    protected volatile State currentState = State.STOPPED;    
+            
+    /**
+     * Enumerated value indicating the fault tolerance level of the StageDriver.
+     */
+    protected FaultTolerance faultTolerance = FaultTolerance.NONE;
     
     /**
      * List of processing failures that have occurred.
@@ -56,10 +67,21 @@
      * @param context The context in which to run the stage
      */
     public AbstractStageDriver(Stage stage, StageContext context) {
+        this(stage, context, FaultTolerance.NONE);
+    }
+    
+    /**
+     * Creates a StageDriver for the specified stage.
+     * 
+     * @param stage The stage for which the driver will be created
+     * @param context The context in which to run the stage
+     */
+    public AbstractStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance)
{
         if (stage == null) throw new IllegalArgumentException("Stage may not be null.");
         if (context == null) throw new IllegalArgumentException("Context may not be null.");
         this.stage = stage;
         this.context = context;
+        this.faultTolerance = faultTolerance;
     }
     
     /**
@@ -76,14 +98,48 @@
      * in which the driver is being run and the managed stage.
      * @return the Feeder used to feed objects to the managed stage for processing.
      */
-    public abstract Feeder getFeeder();
+    public abstract Feeder getFeeder();    
+
+    /**
+     * Return the current state of stage processing.
+     * @return the current state of processing
+     */
+    public State getState() {
+        return this.currentState;
+    }
     
     /**
-     * Returns the current state of stage processing.
-     * @return The current state
+     * Atomically tests to determine whether or not the driver is in the one of
+     * the specified states.
      */
-    public abstract State getState();
-
+    protected synchronized boolean isInState(State... states) {
+        for (State state : states) if (state == currentState) return true;
+        return false;
+    }
+    
+    /**
+     * Set the current state of stage processing and notify any listeners
+     * that may be waiting on a state change.
+     */
+    protected synchronized void setState(State nextState) {
+        this.currentState = nextState;
+        this.notifyAll();
+    }
+    
+    /**
+     * This method performs an atomic conditional state transition change
+     * to the value specified by the nextState parameter if and only if the
+     * current state is equal to the test state.
+     */
+    protected synchronized boolean testAndSetState(State testState, State nextState) {
+        if (currentState == testState) {
+            setState(nextState);
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
     /**
      * This method is used to start the driver, run the 
      * {@link Stage#preprocess() preprocess()} method of the attached stage
@@ -103,30 +159,41 @@
     public abstract void finish() throws StageException;
 
     /**
-     * Returns a list of unrecoverable errors that occurred during stage
-     * processing.
-     * @return A list of unrecoverable errors that occurred during stage processing.
+     * Sets the failure tolerance flag for the worker thread. If faultTolerance
+     * is set to CHECKED, {@link StageException StageException}s thrown by
+     * the {@link Stage#process(Object)} method will not interrupt queue
+     * processing, but will simply be logged with a severity of ERROR.
+     * If faultTolerance is set to ALL, runtime exceptions will also be
+     * logged and otherwise ignored.
+     * @param faultTolerance the flag value
      */
-    public List<Throwable> getFatalErrors() {
-        return this.errors;
+    public final void setFaultTolerance(FaultTolerance faultTolerance) {
+        this.faultTolerance = faultTolerance;
     }
     
     /**
+     * Getter for property faultTolerant.
+     * @return Value of property faultTolerant.
+     */
+    public final FaultTolerance getFaultTolerance() {
+        return this.faultTolerance;
+    }    
+    
+    /**
      * Store a fatal error.
      * @param error The error to be stored for later analysis
      */
     protected void recordFatalError(Throwable error) {
         this.errors.add(error);
     }
-    
+
     /**
-     * Returns a list of errors that occurred while processing data objects,
-     * along with the objects that were being processed when the errors
-     * were generated.
-     * @return The list of non-fatal processing errors.
+     * Returns a list of unrecoverable errors that occurred during stage
+     * processing.
+     * @return A list of unrecoverable errors that occurred during stage processing.
      */
-    public List<ProcessingException> getProcessingExceptions() {
-        return this.processingExceptions;
+    public List<Throwable> getFatalErrors() {
+        return this.errors;
     }
     
     /**
@@ -138,4 +205,14 @@
         ProcessingException ex = new ProcessingException(this.stage, error, data, this.getState());
 
         this.processingExceptions.add(ex);
     }    
+    
+    /**
+     * Returns a list of errors that occurred while processing data objects,
+     * along with the objects that were being processed when the errors
+     * were generated.
+     * @return The list of non-fatal processing errors.
+     */
+    public List<ProcessingException> getProcessingExceptions() {
+        return this.processingExceptions;
+    }
 }

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
(original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
Fri Aug  3 21:31:35 2007
@@ -45,17 +45,11 @@
     //poll timeout to ensure deadlock cannot occur on thread termination
     private long timeout;
     
-    //flag describing whether or not the driver is fault tolerant
-    private FaultTolerance faultTolerance = FaultTolerance.NONE;
-    
     //thread responsible for stage processing
     private Thread workerThread;
     
     //queue to hold data to be processed
-    private BlockingQueue queue;
-    
-    //current state of thread processing
-    private volatile State currentState = State.STOPPED;
+    private BlockingQueue queue;    
     
     //feeder used to feed data to this stage's queue
     private final Feeder feeder = new Feeder() {
@@ -92,10 +86,9 @@
      * ({@link Stage#release()} will be called.)
      */
     public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue,
long timeout, FaultTolerance faultTolerance) {
-        super(stage, context);
+        super(stage, context, faultTolerance);
         this.queue = queue;
         this.timeout = timeout;
-        this.faultTolerance = faultTolerance;
     }
     
     /**
@@ -156,81 +149,6 @@
         setState(STOPPED);
     }
     
-    /**
-     * Return the current state of stage processing.
-     * @return the current state of processing
-     */
-    public StageDriver.State getState() {
-        return this.currentState;
-    }
-    
-    /**
-     * Atomically tests to determine whether or not the driver is in the one of
-     * the specified states.
-     */
-    private synchronized boolean isInState(State... states) {
-        for (State state : states) if (state == currentState) return true;
-        return false;
-    }
-    
-    /**
-     * Set the current state of stage processing and notify any listeners
-     * that may be waiting on a state change.
-     */
-    private synchronized void setState(State nextState) {
-        if (log.isDebugEnabled()) log.debug("State change for " + stage + ": " + this.currentState
+ " -> " + nextState);
-        this.currentState = nextState;
-        this.notifyAll();
-    }
-    
-    /**
-     * This method performs an atomic conditional state transition change
-     * to the value specified by the nextState parameter if and only if the
-     * current state is equal to the test state.
-     */
-    private synchronized boolean testAndSetState(State testState, State nextState) {
-        if (currentState == testState) {
-            setState(nextState);
-            return true;
-        } else {
-            return false;
-        }
-    }
-    
-    /**
-     * Sets the failure tolerance flag for the worker thread. If faultTolerance
-     * is set to CHECKED, {@link StageException StageException}s thrown by
-     * the {@link Stage#process(Object)} method will not interrupt queue
-     * processing, but will simply be logged with a severity of ERROR.
-     * If faultTolerance is set to ALL, runtime exceptions will also be
-     * logged and otherwise ignored.
-     * @param faultTolerance the flag value
-     */
-    public final void setFaultTolerance(String faultTolerance) {
-        this.faultTolerance = FaultTolerance.valueOf(faultTolerance);
-    }
-    
-    /**
-     * Sets the failure tolerance flag for the worker thread. If faultTolerance
-     * is set to CHECKED, {@link StageException StageException}s thrown by
-     * the {@link Stage#process(Object)} method will not interrupt queue
-     * processing, but will simply be logged with a severity of ERROR.
-     * If faultTolerance is set to ALL, runtime exceptions will also be
-     * logged and otherwise ignored.
-     * @param faultTolerance the flag value
-     */
-    public final void setFaultTolerance(FaultTolerance faultTolerance) {
-        this.faultTolerance = faultTolerance;
-    }
-    
-    /**
-     * Getter for property faultTolerant.
-     * @return Value of property faultTolerant.
-     */
-    public FaultTolerance getFaultTolerance() {
-        return this.faultTolerance;
-    }
-    
     /*********************************
      * WORKER THREAD IMPLEMENTATIONS *
      *********************************/
@@ -279,6 +197,7 @@
                         Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
                         if (obj == null) {
                             if (currentState == STOP_REQUESTED) break running;
+                            //else continue running;
                         } else {
                             try {
                                 stage.process(obj);

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
(original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
Fri Aug  3 21:31:35 2007
@@ -23,10 +23,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.pipeline.Feeder;
 import org.apache.commons.pipeline.Stage;
-import org.apache.commons.pipeline.driver.AbstractStageDriver;
 import org.apache.commons.pipeline.StageException;
 import org.apache.commons.pipeline.StageContext;
 import static org.apache.commons.pipeline.StageDriver.State.*;
+import org.apache.commons.pipeline.StageDriver.State;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
 
 /**
  * This is a non-threaded version of the AbstractStageDriver.
@@ -34,12 +35,6 @@
 public class SynchronousStageDriver extends AbstractStageDriver {
     private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
     
-    //flag describing whether or not the driver is fault tolerant
-    private boolean faultTolerant = false;
-    
-    //current state of thread processing
-    private State currentState = State.STOPPED;
-    
     //queue of objects to be processed that are fed to the driver
     //when it is not in a running state
     private Queue<Object> queue = new LinkedList<Object>();
@@ -48,18 +43,19 @@
     private final Feeder feeder = new Feeder() {
         public void feed(Object obj) {
             synchronized (SynchronousStageDriver.this) {
+                if (currentState == ERROR) throw new IllegalStateException("Unable to process
data: driver in fatal error state.");
                 if (currentState != RUNNING) { //enqueue objects if stage has not been started
-                    if (currentState == ERROR) throw new IllegalStateException("Unable to
process data: driver in fatal error state.");
                     queue.add(obj);
-                } else {
-                    try {
-                        stage.process(obj);
-                    } catch (StageException e) {
-                        recordProcessingException(obj, e);
-                        if (!faultTolerant) throw fatalError(e);
-                    }
+                    return;
                 }
             }
+            
+            try {
+                stage.process(obj);
+            } catch (StageException e) {
+                recordProcessingException(obj, e);
+                if (faultTolerance == NONE) throw fatalError(e);
+            }
         }
     };
     
@@ -68,8 +64,8 @@
      * @param stage The stage to be run
      * @param context The context in which the stage will be run
      */
-    public SynchronousStageDriver(Stage stage, StageContext context) {
-        super(stage, context);
+    public SynchronousStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance)
{
+        super(stage, context, faultTolerance);
     }
     
     /**
@@ -92,13 +88,11 @@
         if (this.currentState == STOPPED) {
             try {
                 stage.preprocess();
+                setState(RUNNING);
             } catch (StageException e) {
                 throw fatalError(e);
             }
             
-            this.currentState = RUNNING;
-            this.notifyAll();
-            
             // feed any queued values before returning control
             while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
         } else {
@@ -112,33 +106,29 @@
      * @throws org.apache.commons.pipeline.StageException Thrown if an error occurs during
postprocessing
      */
     public synchronized void finish() throws StageException {
-        if (this.currentState == RUNNING) {
+        if (this.currentState == RUNNING) {            
             try {
-                stage.postprocess();
+                testAndSetState(RUNNING, STOP_REQUESTED);
+                if (this.currentState == STOP_REQUESTED) stage.postprocess();
             } catch (StageException e) {
                 throw fatalError(e);
-            }
-            
-            stage.release();
-            
-            this.currentState = STOPPED;
-            this.notifyAll();
+            } finally {
+                stage.release();
+                testAndSetState(STOP_REQUESTED, STOPPED);
+            }            
         } else {
             throw new IllegalStateException("Driver is not running (current state: " + this.currentState
+ ")");
         }
     }
     
     /**
-     * Accessor for the current state of the stage driver
-     * @return the current driver state
+     * This method obtains a lock to set the current state of processing
+     * to error, records the error and returns a RuntimeException encapsulating
+     * the specified throwable.
      */
-    public synchronized State getState() {
-        return this.currentState;
-    }
-    
-    private synchronized RuntimeException fatalError(Throwable t) {
+    private RuntimeException fatalError(Throwable t) {
         try {
-            this.currentState = ERROR;
+            setState(ERROR);
             this.recordFatalError(t);
             stage.release();
             this.notifyAll();

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java
(original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java
Fri Aug  3 21:31:35 2007
@@ -40,6 +40,27 @@
      * @return the newly created and configured driver
      */
     public StageDriver createStageDriver(Stage stage, StageContext context) {
-        return new SynchronousStageDriver(stage, context);
+        return new SynchronousStageDriver(stage, context, this.faultTolerance);
     }    
+
+    /**
+     * Holds value of property faultTolerance. Default value is {@link FaultTolerance.NONE}.
+     */
+    private FaultTolerance faultTolerance = FaultTolerance.NONE;
+
+    /**
+     * Getter for property faultTolerance.
+     * @return Value of property faultTolerance.
+     */
+    public FaultTolerance getFaultTolerance() {
+        return this.faultTolerance;
+    }
+
+    /**
+     * Setter for property faultTolerance.
+     * @param faultTolerance New value of property faultTolerance.
+     */
+    public void setFaultTolerance(FaultTolerance faultTolerance) {
+        this.faultTolerance = faultTolerance;
+    }
 }

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
(original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
Fri Aug  3 21:31:35 2007
@@ -6,16 +6,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
- */ 
+ * under the License.
+ */
 
 package org.apache.commons.pipeline.driver;
 
@@ -41,40 +41,37 @@
  * to process objects from an input queue.
  */
 public class ThreadPoolStageDriver extends AbstractStageDriver {
+    // logger for the class
     private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
     
-    //wait timeout to ensure deadlock cannot occur on thread termination
+    // wait timeout to ensure deadlock cannot occur on thread termination
     private long timeout;
     
-    //flag describing whether or not the driver is fault tolerant
-    private FaultTolerance faultTolerance = FaultTolerance.NONE;
-    
-    //signal telling threads to start polling queue
-    final private CountDownLatch startSignal;
+    // signal telling threads to start polling queue
+    private final CountDownLatch startSignal;
     
-    //signal threads use to tell driver they have finished
-    final private CountDownLatch doneSignal;
+    // signal threads use to tell driver they have finished
+    private final CountDownLatch doneSignal;
     
     // number of threads polling queue
-    private int numThreads = 1;
-    
-    //queue to hold data to be processed
-    private BlockingQueue queue;
+    private final int numThreads;
     
-    //current state of thread processing
-    private volatile State currentState = State.STOPPED;
+    // queue to hold data to be processed
+    private final BlockingQueue queue;
     
     //feeder used to feed data to this stage's queue
     private final Feeder feeder = new Feeder() {
         public void feed(Object obj) {
             if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
                     + " (" + ThreadPoolStageDriver.this.queue.remainingCapacity() + " available
slots in queue)");
+            
             try {
                 ThreadPoolStageDriver.this.queue.put(obj);
             } catch (InterruptedException e) {
                 throw new IllegalStateException("Unexpected interrupt while waiting for space
to become available for object "
                         + obj + " in queue for stage " + stage, e);
             }
+            
             synchronized(ThreadPoolStageDriver.this) {
                 ThreadPoolStageDriver.this.notifyAll();
             }
@@ -103,7 +100,7 @@
             long timeout,
             FaultTolerance faultTolerance,
             int numThreads) {
-        super(stage, context);
+        super(stage, context, faultTolerance);
         this.numThreads = numThreads;
         
         this.startSignal = new CountDownLatch(1);
@@ -111,7 +108,6 @@
         
         this.queue = queue;
         this.timeout = timeout;
-        this.faultTolerance = faultTolerance;
     }
     
     /**
@@ -146,13 +142,6 @@
             startSignal.countDown();
             
             log.debug("Worker threads for stage " + stage + " started.");
-            
-            //wait to ensure that the stage starts up correctly
-            try {
-                while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
-            } catch (InterruptedException e) {
-                throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted
while waiting for thread startup.", e);
-            }
         } else {
             throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
         }
@@ -160,7 +149,11 @@
     
     /**
      * Causes processing to shut down gracefully. Waits until all worker threads
-     * have completed.
+     * have completed. It is important that this method be called only after
+     * the completion of execution of finish() in the driver for the prior
+     * stage; parallel finish calls can cause the stage to shut down before
+     * all prior stages have finished processing.
+     *
      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal
state for shutdown.
      */
     public synchronized void finish() throws StageException {
@@ -169,6 +162,9 @@
         }
         
         try {
+            //it may be the case that finish() is called when the driver is still in the
process
+            //of starting up, so it is necessary to wait to enter the running state before
+            //a stop can be requested
             while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
             
             //ask the worker threads to shut down
@@ -176,19 +172,27 @@
             
             if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage
" + stage + ".");
             doneSignal.await();
+            if (log.isDebugEnabled()) log.debug("Worker threads for stage " + stage + " halted");
             
-            if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
-            ThreadPoolStageDriver.this.stage.postprocess();
-            if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
-            
-            //do not transition into finished state if an error has occurred
+            //transition into finished state (not used internally?)
             testAndSetState(STOP_REQUESTED, FINISHED);
             
-            while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
+            //do not run postprocessing if the driver is in an error state
+            if (this.currentState != ERROR) {
+                if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
+                this.stage.postprocess();
+                if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage +
" complete.");
+            }
+            
+            //the following lines appear to be artifacts of copy-and-paste from
+            //DedicatedThreadStageDriver.
+//            //do not transition into finished state if an error has occurred
+//            testAndSetState(STOP_REQUESTED, FINISHED);
+//
+//            while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
             
-            log.debug("Worker threads for stage " + stage + " halted");
         } catch (StageException e) {
-            log.error("An error occurred during postprocess for stage " + stage , e);
+            log.error("An error occurred during postprocessing of stage " + stage , e);
             recordFatalError(e);
             setState(ERROR);
         } catch (InterruptedException e) {
@@ -199,82 +203,31 @@
             if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
         }
         
-        setState(STOPPED);
+        testAndSetState(FINISHED, STOPPED);
     }
     
     /**
-     * Return the current state of stage processing.
-     * @return the current state of processing
-     */
-    public StageDriver.State getState() {
-        return this.currentState;
-    }
-    
-    /**
-     * Atomically tests to determine whether or not the driver is in the one of
-     * the specified states.
-     */
-    private synchronized boolean isInState(State... states) {
-        for (State state : states) if (state == currentState) return true;
-        return false;
-    }
-    
-    /**
-     * Set the current state of stage processing and notify any listeners
-     * that may be waiting on a state change.
-     */
-    private synchronized void setState(State nextState) {
-        if (log.isDebugEnabled()) log.debug("State change for " + stage + ": " + this.currentState
+ " -> " + nextState);
-        this.currentState = nextState;
-        this.notifyAll();
-    }
-    
-    /**
-     * This method performs an atomic conditional state transition change
-     * to the value specified by the nextState parameter if and only if the
-     * current state is equal to the test state.
-     */
-    private synchronized boolean testAndSetState(State testState, State nextState) {
-        if (currentState == testState) {
-            setState(nextState);
-            return true;
-        } else {
-            return false;
-        }
-    }
-    
-    /**
-     * Sets the failure tolerance flag for the worker thread. If faultTolerance
-     * is set to CHECKED, {@link StageException StageException}s thrown by
-     * the {@link Stage#process(Object)} method will not interrupt queue
-     * processing, but will simply be logged with a severity of ERROR.
-     * If faultTolerance is set to ALL, runtime exceptions will also be
-     * logged and otherwise ignored.
-     * @param faultTolerance the flag value
+     * Get the size of the queue used by this StageDriver.
+     * @return the queue capacity
      */
-    public final void setFaultTolerance(String faultTolerance) {
-        this.faultTolerance = FaultTolerance.valueOf(faultTolerance);
+    public int getQueueSize() {
+        return this.queue.size() + this.queue.remainingCapacity();
     }
     
     /**
-     * Sets the failure tolerance flag for the worker thread. If faultTolerance
-     * is set to CHECKED, {@link StageException StageException}s thrown by
-     * the {@link Stage#process(Object)} method will not interrupt queue
-     * processing, but will simply be logged with a severity of ERROR.
-     * If faultTolerance is set to ALL, runtime exceptions will also be
-     * logged and otherwise ignored.
-     * @param faultTolerance the flag value
+     * Get the timeout value (in milliseconds) used by this StageDriver on
+     * thread termination.
+     * @return the timeout setting in milliseconds
      */
-    public final void setFaultTolerance(FaultTolerance faultTolerance) {
-        this.faultTolerance = faultTolerance;
+    public long getTimeout() {
+        return this.timeout;
     }
     
     /**
-     * Getter for property faultTolerant.
-     * @return Value of property faultTolerant.
+     * Returns the number of threads allocated to the thread pool.
      */
-    public FaultTolerance getFaultTolerance() {
-        return this.faultTolerance;
+    public int getNumThreads() {
+        return numThreads;
     }
     
     /*********************************
@@ -304,7 +257,7 @@
         LatchWorkerThread(int threadID) {
             this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
             this.threadID = threadID;
-        }        
+        }
         
         public final void run() {
             try {
@@ -337,7 +290,7 @@
                 recordFatalError(e);
                 setState(ERROR);
             } catch (InterruptedException e) {
-                log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while
waiting for barrier",e);
+                log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while
waiting for barrier", e);
                 recordFatalError(e);
                 setState(ERROR);
             } finally {
@@ -347,29 +300,5 @@
                 }
             }
         }
-    }
-    
-    /**
-     * Get the size of the queue used by this StageDriver.
-     * @return the queue capacity
-     */
-    public int getQueueSize() {
-        return this.queue.size() + this.queue.remainingCapacity();
-    }
-    
-    /**
-     * Get the timeout value (in milliseconds) used by this StageDriver on
-     * thread termination.
-     * @return the timeout setting in milliseconds
-     */
-    public long getTimeout() {
-        return this.timeout;
-    }
-    
-    /**
-     * Returns the number of threads allocated to the thread pool.
-     */
-    public int getNumThreads() {
-        return numThreads;
-    }
+    }    
 }



Mime
View raw message