commons-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nutty...@apache.org
Subject svn commit: r441279 - in /jakarta/commons/sandbox/pipeline/trunk/src: main/java/org/apache/commons/pipeline/ main/java/org/apache/commons/pipeline/driver/ test/java/org/apache/commons/pipeline/driver/
Date Thu, 07 Sep 2006 22:39:57 GMT
Author: nuttycom
Date: Thu Sep  7 15:39:56 2006
New Revision: 441279

URL: http://svn.apache.org/viewvc?view=rev&rev=441279
Log:
Refactored StageDriver to make it an interface; this will allow for
StageDrivers to be created that decorate other drivers with alternative
feeder behavior.

Added:
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java
  (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
  (with props)
Modified:
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java

Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java?view=auto&rev=441279
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java Thu Sep  7 15:39:56 2006
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline;
+
+import org.apache.commons.pipeline.StageDriver.State;
+
+/**
+ * This exception class is used to store detailed information about
+ * a failure in the processing step of a stage including the failing data, 
+ * the driver state at the time of failure, and any exceptions encountered.
+ */
+public class ProcessingException extends StageException {
+    private final Object data;
+    private final State driverState;
+    
+    /**
+     * Creates a new instance of ProcessingException
+     * 
+     * @param data The object which was not able to be processed.
+     * @param throwable The exception that occurred.
+     */
+    public ProcessingException(Stage stage, Throwable cause, Object data, State driverState) {
+        super(stage, cause);
+        this.data = data;
+        this.driverState = driverState;
+    }            
+    
+    /** Returns the data
+     *@return The object which was not able to be processed.
+     */
+    public Object getData(){
+        return this.data;
+    }    
+    
+    public State getDriverState() {
+        return this.driverState;
+    }
+}
\ No newline at end of file

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java Thu Sep  7 15:39:56 2006
@@ -16,194 +16,97 @@
 
 package org.apache.commons.pipeline;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
  * This interface is used to define how processing for a stage is started,
  * stopped, and run. StageDriver implementations may run stages in one or
- * more threads, and use the {@link StageMonitor} interface to provide communication
- * between the stage, the driver, and the enclosing pipeline.
+ * more threads, and use the {@link StageContext} interface to provide
+ * communication between the stage and the context it is run, usually
+ * a pipeline.
+ *
+ *
  */
-public abstract class StageDriver {
+public interface StageDriver {
     /**
      * This enumeration represents possible states of the a stage driver during
      * processing.
      */
-    public enum State { 
+    public enum State {
+        /** Resources have been released and all stage activity has stopped, or
+         * the stage has never been started. This is the default state. */
+        STOPPED,
         /** The stage driver has started and the preprocess() method is being run. */
-        STARTED, 
+        STARTED,
         /** Preprocessing is complete and objects are being processed.*/
-        RUNNING, 
-        /** A stop has been requested - the stage will finish processing, 
+        RUNNING,
+        /** A stop has been requested - the stage will finish processing,
          * then postprocess and shut down. */
-        STOP_REQUESTED, 
+        STOP_REQUESTED,
         /** Postprocessing tasks are complete; the stage is shutting down. */
-        FINISHED, 
-        /** Resources have been released and all stage activity has stopped. 
-         * This is the default state. */
-        STOPPED, 
-        /** A fatal error has occurred that has caused the driver to stop in an 
-         * inconsistent state. The driver cannot be restarted from the error state. 
+        FINISHED,
+        /** A fatal error has occurred that has caused the driver to stop in an
+         * inconsistent state. The driver cannot be restarted from the error state.
          * The error(s) can be obtained using the getFatalErrors method. */
-        ERROR 
-            }
-    
-    /**
-     * The stage to run.
-     */
-    protected Stage stage;
+        ERROR
+    }
     
     /**
-     * The context for the stage being run
+     * This method is used to start the driver, run the
+     * {@link Stage#preprocess() preprocess()} method of the attached stage
+     * and to then begin processing any objects fed to this driver's Feeder.
+     *
+     * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during stage startup. In most cases, such errors
+     * will be handled internally by the driver.
      */
-    protected StageContext context;
+    public void start() throws StageException;
     
     /**
-     * List of processing failures that have occurred.
-     */
-    protected List<ProcessingFailure> processingFailures = new ArrayList<ProcessingFailure>();
-
-    /**
-     * List of errors that have occurred.
+     * This method waits for the stage(s) queue(s) to empty and any processor thread(s) to exit
+     * cleanly and then calls release() to release any resources acquired during processing, if possible.
+     *
+     * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during driver shutdown. Ordinarily such
+     * exceptions will be handled internally.
      */
-    protected List<Throwable> errors = new ArrayList<Throwable>();
+    public void finish() throws StageException;
     
     /**
-     * Creates a StageDriver for the specified stage.
-     * @param stage The stage for which the driver will be created
-     * @param context The context in which to run the stage
+     * This method is used to provide a communication channel between the context
+     * in which the driver is being run and the managed stage.
+     *
+     * @return the Feeder used to feed objects to the managed stage for processing.
      */
-    public StageDriver(Stage stage, StageContext context) {
-        if (stage == null) throw new IllegalArgumentException("Stage may not be null.");
-        if (context == null) throw new IllegalArgumentException("Context may not be null.");
-        this.stage = stage;
-        this.context = context;
-        }
+    public Feeder getFeeder();
     
     /**
      * Returns the Stage being run by this StageDriver.
+     *
      * @return The stage being run by this StageDriver instance
      */
-    public Stage getStage() {
-        return this.stage;
-    }    
-    
-    /**
-     * This method is used to provide a communication channel between the context 
-     * in which the driver is being run and the managed stage.
-     * @return the Feeder used to feed objects to the managed stage for processing.
-     */
-    public abstract Feeder getFeeder();
+    public Stage getStage();
     
     /**
      * Returns the current state of stage processing.
-     * @return The current state
-     */
-    public abstract State getState();
-
-    /**
-     * This method is used to start the driver, run the 
-     * {@link Stage#preprocess() preprocess()} method of the attached stage
-     * and to then begin processing any objects fed to this driver's Feeder.
      *
-     * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during stage startup. In most cases, such errors
-     * will be handled internally by the driver.
+     * @return The current state
      */
-    public abstract void start() throws StageException;
+    public State getState();
     
     /**
-     * This method waits for the stage(s) queue(s) to empty and any processor thread(s) to exit
-     * cleanly and then calls release() to release any resources acquired during processing, if possible.
-     * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during driver shutdown. Ordinarily such 
-     * exceptions will be handled internally.
-     */
-    public abstract void finish() throws StageException;
-
-    /**
      * Returns a list of unrecoverable errors that occurred during stage
      * processing.
+     *
      * @return A list of unrecoverable errors that occurred during stage processing.
      */
-    public List<Throwable> getFatalErrors() {
-        return this.errors;
-            }
-    
-    /**
-     * Store a fatal error.
-     * @param error The error to be stored for later analysis
-     */
-    protected void recordFatalError(Throwable error) {
-        this.errors.add(error);
-        }
+    public List<Throwable> getFatalErrors();
     
     /**
      * Returns a list of errors that occurred while processing data objects,
      * along with the objects that were being processed when the errors
      * were generated.
+     *
      * @return The list of non-fatal processing errors.
      */
-    public List<ProcessingFailure> getProcessingFailures() {
-        return this.processingFailures;
-    }
-    
-    /**
-     * Store processing failure information for the specified data object.
-     * @param data The data being processed at the time of the error
-     * @param error The error encountered
-     */
-    protected void recordProcessingFailure(Object data, Throwable error) {
-        this.processingFailures.add(new ProcessingFailure(data, error));
-    }
-    
-    /**
-     * FailedProcess objects are used to store detailed information about
-     * processing failures including the failing data, the driver state
-     * at the time of failure
-     */
-    public class ProcessingFailure {
-        private Object data;
-        private Throwable throwable;
-        private State driverState;
-        
-        /** Creates a new instance of FailedProcess
-         *@param data The object which was not able to be processed.
-         *@param throwable The exception that occurred.
-         */
-        protected ProcessingFailure(Object data, Throwable throwable){
-            this.data = data;
-            this.throwable = throwable;
-            this.driverState = StageDriver.this.getState();
-        }
-        
-        /** Returns the data
-         *@return The object which was not able to be processed.
-     */
-        public Object getData(){
-            return this.data;
-        }
-        
-        /** Returns the exception
-         *@return The throwable
-         */
-        public Throwable getThrowable(){
-            return this.throwable;
-        }
-        
-        /** Returns the stage which threw the exception.
-         *@return Stage
-         */
-        public Stage getStage() {
-            return StageDriver.this.getStage();
-        }
-        
-        /**
-         * This method is used to determine what stage driver handled a particular error.
-         * @return A reference to the stage driver that encountered the error.
-         */
-        public StageDriver getStageDriver() {
-            return StageDriver.this;
-        }
-    }
+    public List<ProcessingException> getProcessingExceptions();
     
 }

Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java?view=auto&rev=441279
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java Thu Sep  7 15:39:56 2006
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.pipeline.*;
+
+/**
+ * This interface is used to define how processing for a stage is started,
+ * stopped, and run. AbstractStageDriver implementations may run stages in one or
+ * more threads, and use the {@link StageMonitor} interface to provide communication
+ * between the stage, the driver, and the enclosing pipeline.
+ */
+public abstract class AbstractStageDriver implements StageDriver {
+    
+    /**
+     * The stage to run.
+     */
+    protected Stage stage;
+    
+    /**
+     * The context for the stage being run
+     */
+    protected StageContext context;
+    
+    /**
+     * List of processing failures that have occurred.
+     */
+    protected List<ProcessingException> processingExceptions = new ArrayList<ProcessingException>();
+
+    /**
+     * List of errors that have occurred.
+     */
+    protected List<Throwable> errors = new ArrayList<Throwable>();
+    
+    /**
+     * Creates a StageDriver for the specified stage.
+     * 
+     * @param stage The stage for which the driver will be created
+     * @param context The context in which to run the stage
+     */
+    public AbstractStageDriver(Stage stage, StageContext context) {
+        if (stage == null) throw new IllegalArgumentException("Stage may not be null.");
+        if (context == null) throw new IllegalArgumentException("Context may not be null.");
+        this.stage = stage;
+        this.context = context;
+    }
+    
+    /**
+     * Returns the Stage being run by this StageDriver.
+     * 
+     * @return The stage being run by this StageDriver instance
+     */
+    public Stage getStage() {
+        return this.stage;
+    }
+    
+    /**
+     * This method is used to provide a communication channel between the context 
+     * in which the driver is being run and the managed stage.
+     * @return the Feeder used to feed objects to the managed stage for processing.
+     */
+    public abstract Feeder getFeeder();
+    
+    /**
+     * Returns the current state of stage processing.
+     * @return The current state
+     */
+    public abstract State getState();
+
+    /**
+     * This method is used to start the driver, run the 
+     * {@link Stage#preprocess() preprocess()} method of the attached stage
+     * and to then begin processing any objects fed to this driver's Feeder.
+     *
+     * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during stage startup. In most cases, such errors
+     * will be handled internally by the driver.
+     */
+    public abstract void start() throws StageException;
+    
+    /**
+     * This method waits for the stage(s) queue(s) to empty and any processor thread(s) to exit
+     * cleanly and then calls release() to release any resources acquired during processing, if possible.
+     * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during driver shutdown. Ordinarily such 
+     * exceptions will be handled internally.
+     */
+    public abstract void finish() throws StageException;
+
+    /**
+     * Returns a list of unrecoverable errors that occurred during stage
+     * processing.
+     * @return A list of unrecoverable errors that occurred during stage processing.
+     */
+    public List<Throwable> getFatalErrors() {
+        return this.errors;
+    }
+    
+    /**
+     * Store a fatal error.
+     * @param error The error to be stored for later analysis
+     */
+    protected void recordFatalError(Throwable error) {
+        this.errors.add(error);
+    }
+    
+    /**
+     * Returns a list of errors that occurred while processing data objects,
+     * along with the objects that were being processed when the errors
+     * were generated.
+     * @return The list of non-fatal processing errors.
+     */
+    public List<ProcessingException> getProcessingExceptions() {
+        return this.processingExceptions;
+    }
+    
+    /**
+     * Store processing failure information for the specified data object.
+     * @param data The data being processed at the time of the error
+     * @param error The error encountered
+     */
+    protected void recordProcessingException(Object data, Throwable error) {
+        ProcessingException ex = new ProcessingException(this.stage, error, data, this.getState());
+        this.processingExceptions.add(ex);
+    }    
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java Thu Sep  7 15:39:56 2006
@@ -21,20 +21,21 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.driver.AbstractStageDriver;
 import org.apache.commons.pipeline.Feeder;
 import org.apache.commons.pipeline.StageDriver;
 import org.apache.commons.pipeline.Stage;
 import org.apache.commons.pipeline.StageContext;
 import org.apache.commons.pipeline.StageException;
-import org.apache.commons.pipeline.StageDriver.State;
 import static org.apache.commons.pipeline.StageDriver.State.*;
+import org.apache.commons.pipeline.StageDriver.State;
 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
 
 /**
- * This is a very simple implementation of a StageDriver which spawns
+ * This is a very simple implementation of a AbstractStageDriver which spawns
  * a single thread to process a stage.
  */
-public class DedicatedThreadStageDriver extends StageDriver {
+public class DedicatedThreadStageDriver extends AbstractStageDriver {
     private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class);
     
     //wait timeout to ensure deadlock cannot occur on thread termination
@@ -52,6 +53,24 @@
     //current state of thread processing
     private volatile State currentState = State.STOPPED;
     
+    //feeder used to feed data to this stage's queue
+    private final Feeder feeder = new Feeder() {
+        public void feed(Object obj) {
+            if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
+                    + " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
+            try {
+                DedicatedThreadStageDriver.this.queue.put(obj);
+            } catch (InterruptedException e) {
+                throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
+                        + obj + " in queue for stage " + stage, e);
+            }
+            
+            synchronized(DedicatedThreadStageDriver.this) {
+                DedicatedThreadStageDriver.this.notifyAll();
+            }
+        }
+    };
+    
     /**
      * Creates a new DedicatedThreadStageDriver with the specified thread wait
      * timeout and fault tolerance values.
@@ -62,7 +81,7 @@
      * @param timeout The amount of time, in milliseconds, that the worker thread
      * will wait before checking the processing state if no objects are available
      * in the thread's queue.
-     * @param faultTolerant Flag determining the behavior of the driver when
+     * @param faultTolerance Flag determining the behavior of the driver when
      * an error is encountered in execution of {@link Stage#process(Object)}.
      * If this is set to false, any exception thrown during {@link Stage#process(Object)}
      * will cause the worker thread to halt without executing {@link Stage#postprocess()}
@@ -80,15 +99,7 @@
      * @return The feeder for objects processed by this driver's stage.
      */
     public Feeder getFeeder() {
-        return new Feeder() {
-            public void feed(Object obj) {
-                if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage);
-                DedicatedThreadStageDriver.this.queue.add(obj);
-                synchronized(DedicatedThreadStageDriver.this) {
-                    DedicatedThreadStageDriver.this.notifyAll();
-                }
-            }
-        };
+        return this.feeder;
     }
     
     /**
@@ -106,7 +117,7 @@
             try {
                 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
             } catch (InterruptedException e) {
-                throw new StageException("Worker thread unexpectedly interrupted while waiting for thread startup.", e);
+                throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
             }
         } else {
             throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
@@ -135,7 +146,7 @@
             log.debug("Worker thread for stage " + stage + " halted");
             
         } catch (InterruptedException e) {
-            throw new StageException("Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e);
+            throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e);
         }
         
         setState(STOPPED);
@@ -268,11 +279,11 @@
                             try {
                                 stage.process(obj);
                             } catch (StageException e) {
-                                recordProcessingFailure(obj, e);
+                                recordProcessingException(obj, e);
                                 if (faultTolerance == NONE) throw e;
                             } catch (RuntimeException e) {
-                                recordProcessingFailure(obj, e);
-                                if (faultTolerance == CHECKED || faultTolerance == NONE)
throw e;                                        
+                                recordProcessingException(obj, e);
+                                if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
                             }
                         }
                     } catch (InterruptedException e) {
@@ -298,5 +309,22 @@
             //do not transition into finished state if an error has occurred
             testAndSetState(STOP_REQUESTED, FINISHED);
         }
+    }
+    
+    /**
+     * Get the size of the queue used by this StageDriver.
+     * @return the queue capacity
+     */
+    public int getQueueSize() {
+        return this.queue.size() + this.queue.remainingCapacity();
+    }
+    
+    /**
+     * Get the timeout value (in milliseconds) used by this StageDriver on
+     * thread termination.
+     * @return the timeout setting in milliseconds
+     */
+    public long getTimeout() {
+        return this.timeout;
     }
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java Thu Sep  7 15:39:56 2006
@@ -22,16 +22,15 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.pipeline.Feeder;
 import org.apache.commons.pipeline.Stage;
-import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.driver.AbstractStageDriver;
 import org.apache.commons.pipeline.StageException;
 import org.apache.commons.pipeline.StageContext;
-import org.apache.commons.pipeline.StageDriver.State;
 import static org.apache.commons.pipeline.StageDriver.State.*;
 
 /**
- * This is a non-threaded version of the StageDriver.
+ * This is a non-threaded version of the AbstractStageDriver.
  */
-public class SynchronousStageDriver extends StageDriver {
+public class SynchronousStageDriver extends AbstractStageDriver {
     private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
     
     //flag describing whether or not the driver is fault tolerant
@@ -44,6 +43,25 @@
     //when it is not in a running state
     private Queue<Object> queue = new LinkedList<Object>();
     
+    //Feeder used to feed objects to this stage
+    private final Feeder feeder = new Feeder() {
+        public void feed(Object obj) {
+            synchronized (SynchronousStageDriver.this) {
+                if (currentState != RUNNING) { //enqueue objects if stage has not been started
+                    if (currentState == ERROR) throw new IllegalStateException("Unable to process data: driver in fatal error state.");
+                    queue.add(obj);
+                } else {
+                    try {
+                        stage.process(obj);
+                    } catch (StageException e) {
+                        recordProcessingException(obj, e);
+                        if (!faultTolerant) throw fatalError(e);
+                    }
+                }
+            }
+        }
+    };
+    
     /**
      * Creates a new instance of SimpleStageDriver
      * @param stage The stage to be run
@@ -61,23 +79,7 @@
      * @return The Feeder instance for the stage.
      */
     public Feeder getFeeder() {
-        return new Feeder() {
-            public void feed(Object obj) {
-                synchronized (SynchronousStageDriver.this) {
-                    if (currentState != RUNNING) { //enqueue objects if stage has not been started
-                        if (currentState == ERROR) throw new IllegalStateException("Unable to process data: driver in fatal error state.");
-                        queue.add(obj);
-                    } else {                        
-                        try {
-                            stage.process(obj);
-                        } catch (StageException e) {
-                            recordProcessingFailure(obj, e);
-                            if (!faultTolerant) throw fatalError(e);
-                        }
-                    }
-                }
-            }
-        };
+        return this.feeder;
     }
     
     /**
@@ -95,7 +97,7 @@
             
             this.currentState = RUNNING;
             this.notifyAll();
-
+            
             // feed any queued values before returning control
             while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
         } else {

Modified: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java Thu Sep  7 15:39:56 2006
@@ -17,8 +17,8 @@
 package org.apache.commons.pipeline.driver;
 
 import junit.framework.TestCase;
-import org.apache.commons.pipeline.testFramework.FaultingTestStage;
 import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.testFramework.FaultingTestStage;
 import org.apache.commons.pipeline.StageDriverFactory;
 import org.apache.commons.pipeline.testFramework.TestFeeder;
 import org.apache.commons.pipeline.testFramework.TestStage;
@@ -139,7 +139,7 @@
         
         test.assertEquals("Incorrect processed object count from stage 0.", 3, stage0.processedObjects.size());
         test.assertEquals("Incorrect processed object count from stage 1.", 2, stage1.processedObjects.size());
-        test.assertEquals("Incorrect number of processing failures recorded by driver 2.", 1, d1.getProcessingFailures().size());
+        test.assertEquals("Incorrect number of processing failures recorded by driver 2.", 1, d1.getProcessingExceptions().size());
         test.assertEquals("Incorrect processed object count from stage 2.", 2, stage2.processedObjects.size());
         
         test.assertEquals("Incorrect final processed object count.", 2, terminalFeeder.receivedValues.size());

Modified: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java Thu Sep  7 15:39:56 2006
@@ -16,13 +16,11 @@
 
 package org.apache.commons.pipeline.driver;
 
-import junit.framework.*;
+import junit.framework.Test;
+import junit.framework.TestSuite;
 import org.apache.commons.pipeline.Feeder;
 import org.apache.commons.pipeline.StageDriver.State;
-import static org.apache.commons.pipeline.StageDriver.State.*;
-import org.apache.commons.pipeline.testFramework.TestFeeder;
-import org.apache.commons.pipeline.testFramework.TestStage;
-import org.apache.commons.pipeline.testFramework.TestStageContext;
+
 
 /**
  *
@@ -32,7 +30,7 @@
     
     public SynchronousStageDriverTest(String testName) {
         super(testName);
-    }    
+    }
     
     public static Test suite() {
         TestSuite suite = new TestSuite(SynchronousStageDriverTest.class);
@@ -47,12 +45,12 @@
         SynchronousStageDriver instance = new SynchronousStageDriver(stage, context);
         
         Feeder feeder = instance.getFeeder();
-        assertNotNull(feeder);        
+        assertNotNull(feeder);
     }
     
     /**
      * Due to the design of the SynchronousStageDriver, it is meaningless
-     * to independently test the start or finish methods; however, testing 
+     * to independently test the start or finish methods; however, testing
      * both together is meaningful. This test also provides verification of
      * proper behavior of the getState() method.
      */
@@ -69,14 +67,14 @@
         
         assertEquals(instance.getState(), State.STOPPED);
     }
-        
+    
     /*********************
      * INTEGRATION TESTS *
      *********************/
     
-    public void testSingleStage() throws Exception {       
+    public void testSingleStage() throws Exception {
         StageDriverTestUtils.testSingleStage(this, new SynchronousStageDriverFactory());
-    }    
+    }
     
     public void testMultiStage() throws Exception {
         StageDriverTestUtils.testMultiStage(this, new SynchronousStageDriverFactory());



---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org


Mime
View raw message