commons-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nutty...@apache.org
Subject svn commit: r436914 [1/4] - in /jakarta/commons/sandbox/pipeline/trunk/src: main/java/org/apache/commons/pipeline/ main/java/org/apache/commons/pipeline/config/ main/java/org/apache/commons/pipeline/driver/ main/java/org/apache/commons/pipeline/event/ ...
Date Fri, 25 Aug 2006 19:30:48 GMT
Author: nuttycom
Date: Fri Aug 25 12:30:39 2006
New Revision: 436914

URL: http://svn.apache.org/viewvc?rev=436914&view=rev
Log:
Changed to use Maven 2 for build; added test framework, documentation; refactored drivers.

Added:
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageContext.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriverFactory.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriverFactory.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/FaultTolerance.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/package.html
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/ObjectProcessedEvent.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/PipelineShutdownRequest.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/package.html
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/listener/
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/listener/ObjectProcessedEventCounter.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/listener/package.html
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/server/
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/server/PipelineContext.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/server/PipelineService.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/server/package.html
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/BaseStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InvokeMethodStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/PipelineShutdownStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseEventStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStageContext.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/package.html
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/Branches.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/ConsumedTypes.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/ProducedTypes.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/ProducesConsumed.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/ProductionOnBranch.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/ValidationException.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/ValidationFailure.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/ValidationUtils.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/package.html
    jakarta/commons/sandbox/pipeline/trunk/src/site/
    jakarta/commons/sandbox/pipeline/trunk/src/site/apt/
    jakarta/commons/sandbox/pipeline/trunk/src/site/apt/config_digester.apt
    jakarta/commons/sandbox/pipeline/trunk/src/site/apt/index.apt
    jakarta/commons/sandbox/pipeline/trunk/src/site/fml/
    jakarta/commons/sandbox/pipeline/trunk/src/site/fml/faq.fml
    jakarta/commons/sandbox/pipeline/trunk/src/site/site.xml
    jakarta/commons/sandbox/pipeline/trunk/src/test/filters/
    jakarta/commons/sandbox/pipeline/trunk/src/test/filters/test.properties
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriverTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AbstractStageTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java
Removed:
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/BaseStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageMonitor.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java
Modified:
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/PipelineFactory.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Stage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageEventListener.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageException.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/PipelineRuleSet.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/config/DigesterPipelineFactoryTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/resources/TestResources.properties
    jakarta/commons/sandbox/pipeline/trunk/src/test/resources/log4j_conf.xml
    jakarta/commons/sandbox/pipeline/trunk/src/test/resources/test_conf.xml

Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java?rev=436914&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java Fri Aug 25 12:30:39 2006
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline;
+
+import java.util.EventObject;
+
+/**
+ * This interface represents a data channel into which objects can be fed.
+ * Feeders act as intermediaries between stages in a pipeline and the drivers
+ * for subsequent stages. Each {@link StageDriver} implementation will 
+ * ordinarily provide a custom Feeder implementation that integrates receiving
+ * objects with its internal stage processing workflow.
+ *
+ * @author kjn
+ */
+public interface Feeder {
+    /**
+     * This Feeder implementation provides a standard, no-op sink for objects.
+     * It is useful for situations like the terminus of a pipeline, where
+     * there is nothing to be done with a generated object.
+     */
+    public static final Feeder VOID = new Feeder() {
+        public void feed(Object obj) { }
+    };
+    
+    /**
+     * Feeds the specified object to an underlying receiver.
+     * @param obj The object being fed to the receiver.
+     */
+    public void feed(Object obj);
+}

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java?rev=436914&r1=436913&r2=436914&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java Fri Aug 25 12:30:39 2006
@@ -17,11 +17,18 @@
 package org.apache.commons.pipeline;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
-import org.apache.commons.pipeline.driver.SimpleStageDriver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.validation.PipelineValidator;
+import org.apache.commons.pipeline.validation.ValidationException;
+import org.apache.commons.pipeline.validation.ValidationFailure;
 
 /**
  * This class represents a processing system consisting of a number of stages
@@ -33,131 +40,258 @@
  * with methods to start and stop processing for all stages, as well as
  * a simple framework for asynchronous event-based communication between stages.
  */
-public final class Pipeline implements Iterable<Stage>, Runnable {
-    private List<StageEventListener> listeners = new ArrayList<StageEventListener>();
+public class Pipeline implements Runnable, StageContext {
+    /**
+     * The branch key for the main line of production. This value is reserved
+     * and may not be used as a key for other branch pipelines.
+     */
+    public static final String MAIN_BRANCH = "main";
+    
+    //The logger used for reporting by this pipeline
+    private final Log log = LogFactory.getLog(Pipeline.class);
+    
+    // List of stages in the pipeline, encapsulated in the drivers
+    // that will be used to run them.
+    private final LinkedList<StageDriver> drivers;
+    private final Map<Stage, StageDriver> driverMap;
+    
+    // The list of stages in the pipeline.
+    private final LinkedList<Stage> stages;
+    
+    // Map of pipeline branches where the keys are branch names.
+    private final Map<String,Pipeline> branches;
+    
+    // The list of listeners registered with the pipeline.
+    private final List<StageEventListener> listeners;
+    
+    // Holds value of property validator.
+    private PipelineValidator validator;
+    
+    // Feeder used to handle output of final stage
+    private Feeder terminalFeeder = Feeder.VOID;
     
     /**
-     * List of stages in the pipeline
+     * Creates and initializes a new Pipeline.
      */
-    protected List<Stage> stages = new ArrayList<Stage>();;
+    public Pipeline() {
+        this.drivers = new LinkedList<StageDriver>();
+        this.driverMap = new HashMap<Stage, StageDriver>();
+        this.stages = new LinkedList<Stage>();
+        this.branches = new HashMap<String,Pipeline>();
+        this.listeners = Collections.synchronizedList(new ArrayList<StageEventListener>());
+    }
     
     /**
-     * Map of pipeline branches where the keys are branch names.
+     * Adds a {@link StageEventListener} to the pipeline that will be notified by calls
+     * to {@link Stage#raise(StageEvent)}.
+     * @param listener The listener to be notified.
      */
-    protected Map<String,Pipeline> branches = new HashMap<String,Pipeline>();
+    public void registerListener(StageEventListener listener) {
+        listeners.add(listener);
+    }
     
     /**
-     * Creates a new Pipeline
+     * Returns the collection of {@link StageEventListener}s registered with the
+     * context.
+     * @return The collection of registered listeners.
      */
-    public Pipeline() {
-        stages = new ArrayList<Stage>();
+    public Collection<StageEventListener> getRegisteredListeners() {
+        return this.listeners;
     }
     
     /**
-     * Creates a new Pipeline with the List of Stages
+     * Notifies each registered listener of an event and propagates
+     * the event to any attached branches
+     * @param ev The event to be sent to registered listeners
      */
-    public Pipeline(List<Stage> stages){
-        for (Stage stage: stages){
-            this.addStage(stage);
+    public void raise(final java.util.EventObject ev) {
+        new Thread() {
+            public void run() {
+                for (StageEventListener listener : listeners) {
+                    listener.notify(ev);
+                }
+                
+                for (Pipeline branch : branches.values()) {
+                    if (branch != Pipeline.this) branch.raise(ev);
+                }
         }
+        }.start();
     }
     
     /**
-     * Adds a {@link Stage} object to the end of this Pipeline. The pipeline will use
-     * the specified {@link StageDriver} to run the stage.
-     *
-     * @todo throw IllegalStateException if the stage is being used in a different pipeline
-     */
-    public void addStage(Stage stage, StageDriver driver) {
-        if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage(Stage, StageDriver) may not be null.");
-        if (driver == null) throw new IllegalArgumentException("Argument \"driver\" for call to Pipeline.addStage(Stage, StageDriver) may not be null.");
+     * This method is used by a stage driver to pass data from one stage to the next.
+     * @return the feeder for the downstream stage, or the pipeline's terminal
+     * feeder if the specified stage is the last stage in the pipeline.
+     * @param stage the stage for which the downstream feeder will be retrieved
+     */
+    public Feeder getDownstreamFeeder(Stage stage) {
+        if (stage == null) throw new IllegalArgumentException("Unable to look up downstream feeder for null stage.");
+        if (stage == drivers.getLast().getStage()) {
+            return this.terminalFeeder;
+        } else {
+            //Iterate backwards over the list until the stage is found, then return
+            //the feeder for the subsequent stage. Comparisons are done using reference
+            //equality.
+            for (int i = drivers.size() - 2; i >= 0; i--) {
+                if (stage == drivers.get(i).getStage()) return drivers.get(i+1).getFeeder();
+            }
         
-        stage.setStageDriver(driver);
-        this.addStage(stage);
+            throw new IllegalStateException("Unable to find stage " + stage + " in pipeline.");
+        }
     }
     
     /**
-     * Adds a {@link Stage} object to the end of this Pipeline.
+     * Look up and return the source feeder for the specified pipeline branch.
+     * @param branch the string identifier of the branch for which a feeder will be returned
+     * @return the feeder for the specified branch
+     */
+    public Feeder getBranchFeeder(String branch) {
+        return branches.get(branch).getSourceFeeder();
+    }
+    
+    /**
+     * Adds a {@link Stage} object to the end of this Pipeline. If a
+     * {@link PipelineValidator} has been set using {@link #setValidator}, it will
+     * be used to validate that the appended Stage can consume the output of the
+     * previous stage of the pipeline. It does NOT validate the ability or availability
+     * of branches to consume data produced by the appended stage.
+     * @param stage the stage to be added to the pipeline
+     * @param driverFactory the factory that will be used to create a {@link StageDriver} that will run the stage
+     * @throws ValidationException if there is a non-null validator set for this pipeline and an error is
+     * encountered validating the addition of the stage to the pipeline.
      */
-    public void addStage(Stage stage){
+    public final void addStage(Stage stage, StageDriverFactory driverFactory) throws ValidationException {
         if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
-        stage.setPipeline(this);
+        
+        if (validator != null) {
+            List<ValidationFailure> errors = validator.validateAddStage(this, stage, driverFactory);
+            if (errors != null && !errors.isEmpty()) {
+                throw new ValidationException("An error occurred adding stage " + stage.toString(), errors);
+            }
+        }
+        
+        stage.init(this);
         this.stages.add(stage);
+        
+        StageDriver driver = driverFactory.createStageDriver(stage, this);
+        this.driverMap.put(stage, driver);
+        this.drivers.add(driver);
     }
     
     /**
-     * Returns the first stage in the pipeline, or null if there are no stages
+     * Returns an unmodifiable list of stages that have been added to this
+     * pipeline.
+     * @return A list of the stages that have been added to the pipeline
      */
-    public Stage head() {
-        if (stages.size() > 0){
-            return (Stage) stages.get(0);
-        } else {
-            return null;
-        }
+    public final List<Stage> getStages() {
+        return Collections.unmodifiableList(this.stages);
     }
     
     /**
-     * Returns the stage after the specified stage in the pipeline.
+     * Return the StageDriver for the specified Stage.
+     * @return the StageDriver for the specified Stage.
      */
-    public Stage getNextStage(Stage stage) {
-        int nextIndex = stages.indexOf(stage) + 1;
-        return (stages.size() > nextIndex) ? stages.get(nextIndex) : null;
+    public final StageDriver getStageDriver(Stage stage) {
+        return this.driverMap.get(stage);
     }
     
     /**
-     * Returns an Iterator for stages in the pipeline.
+     * Returns an unmodifiable list of stage drivers that have been added
+     * to the pipeline.
+     * @return the list of drivers for stages in the pipeline
      */
-    public Iterator<Stage> iterator() {
-        return stages.iterator();
+    public final List<StageDriver> getStageDrivers() {
+        return Collections.unmodifiableList(this.drivers);
     }
     
     /**
      * Adds a branch to the pipeline.
-     */
-    public void addBranch(String key, Pipeline pipeline) {
-        if (key == null) throw new IllegalArgumentException("Branch key may not be null.");
-        if (pipeline == null) throw new IllegalArgumentException("Illegal attempt to set reference to null branch.");
-        if (pipeline == this || this.hasBranch(pipeline))
+     * @param key the string identifier that will be used to refer to the added branch
+     * @param pipeline the branch pipeline
+     * @throws org.apache.commons.pipeline.validation.ValidationException if the pipeline has a non-null {@link PipelineValidator} and the branch
+     * cannot consume the data produced for the branch by stages in the pipeline.
+     */
+    public void addBranch(String key, Pipeline pipeline) throws ValidationException {
+        if (key == null)
+            throw new IllegalArgumentException("Branch key may not be null.");
+        if (MAIN_BRANCH.equalsIgnoreCase(key))
+            throw new IllegalArgumentException("Branch key name \"" + MAIN_BRANCH + "\" is reserved.");
+        if (pipeline == null)
+            throw new IllegalArgumentException("Illegal attempt to set reference to null branch.");
+        if (pipeline == this || pipeline.hasBranch(this))
             throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)");
         
+        if (validator != null) {
+            List<ValidationFailure> errors = validator.validateAddBranch(this, key, pipeline);
+            if (errors != null && !errors.isEmpty()) {
+                throw new ValidationException("An error occurred adding branch pipeline " + pipeline, errors);
+            }
+        }
+        
         this.branches.put(key, pipeline);
     }
     
     /**
-     * Runs the pipeline from start to finish.
+     * Returns an unmodifiable map of branch pipelines, keyed by branch identifier.
+     * @return the map of registered branch pipelines, keyed by branch identifier
      */
-    public void run() {
-        try {
-            start();
-            finish();
-        } catch (StageException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
+    public Map<String,Pipeline> getBranches() {
+        return Collections.unmodifiableMap(branches);
     }
     
     /**
-     * This method iterates over the stages in the pipeline, looking up a
-     * {@link StageDriver} for each stage and using that driver to start the stage.
-     * Startups may occur sequentially or in parallel, depending upon the stage driver
-     * used.  If a the stage has not been configured with a {@link StageDriver},
-     * we will use the default, non-threaded {@link SimpleStageDriver}.
+     * Simple method that recursively checks whether the specified
+     * pipeline is a branch of this pipeline.
+     * @param pipeline the candidate branch
+     * @return true if the specified pipeline is a branch of this pipeline, or recursively
+     * a branch of a branch. Tests are performed using reference equality.
      */
-    public void start() throws StageException {
-        for (Stage stage: this.stages){
-            StageDriver driver = stage.getStageDriver();
-            if (driver == null){
-                driver = new SimpleStageDriver();
-                stage.setStageDriver(driver);
+    private boolean hasBranch(Pipeline pipeline) {
+        if (branches.containsValue(pipeline)) return true;
+        for (Pipeline branch : branches.values()) {
+            if (branch.hasBranch(pipeline)) return true;
             }
             
-            driver.start(stage);
+        return false;
         }
         
-        for (Pipeline branch : branches.values()) {
-            branch.start();
+    /**
+     * Returns a feeder for the first stage if the pipeline is not empty
+     * @return the feeder to feed the first stage of the pipeline
+     */
+    public Feeder getSourceFeeder() {
+        if (drivers.isEmpty()) return null;
+        return drivers.peek().getFeeder();
         }
+    
+    /**
+     * Gets the feeder that receives output from the final stage.
+     * @return the terminal feeder being used to handle any output from the final stage. The default is {@link Feeder#VOID}
+     */
+    public Feeder getTerminalFeeder() {
+        return this.terminalFeeder;
     }
     
+    /**
+     * Sets the terminal feeder used to handle any output from the final stage.
+     * @param terminalFeeder the {@link Feeder} that will receive any output from the final stage
+     */
+    public void setTerminalFeeder(Feeder terminalFeeder) {
+        this.terminalFeeder = terminalFeeder;
+    }
+    
+    /**
+     * This method iterates over the stages in the pipeline, looking up a
+     * {@link StageDriver} for each stage and using that driver to start the stage.
+     * Startups may occur sequentially or in parallel, depending upon the stage driver
+     * used.  If the stage has not been configured with a {@link StageDriver},
+     * we will use the default, non-threaded {@link SynchronousStageDriver}.
+     * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during pipeline startup
+     */
+    public void start() throws StageException {
+        for (StageDriver driver: this.drivers) driver.start();
+        for (Pipeline branch : branches.values()) branch.start();
+    }
     
     /**
      * This method iterates over the stages in the pipeline, looking up a {@link StageDriver}
@@ -166,14 +300,11 @@
      * method will block until the stage's queue is exhausted, so this method
      * may be used to safely finalize all stages without the risk of
      * losing data in the queues.
-     *
-     * @throws InterruptedException if a worker thread was interrupted at the time
-     * a stage was asked to finish execution.
+     * @throws org.apache.commons.pipeline.StageException Thrown if there is an unhandled error during stage shutdown
      */
     public void finish() throws StageException {
-        for (Stage stage: this.stages){
-            StageDriver driver = stage.getStageDriver();
-            driver.finish(stage);
+        for (StageDriver driver: this.drivers){
+            driver.finish();
         }
         
         for (Pipeline pipeline : branches.values()) {
@@ -182,57 +313,32 @@
     }
     
     /**
-     * Enqueues an object on the first stage if the pipeline is not empty
-     * @param o the object to enque
-     */
-    public void enqueue(Object o){
-        if (!stages.isEmpty()) stages.get(0).enqueue(o);
-    }
-    
-    /**
-     * This method is used by stages to pass data from one stage to the next.
-     */
-    public void pass(Stage source, Object data) {
-        Stage next = this.getNextStage(source);
-        if (next != null) next.enqueue(data);
-    }
-    
-    /**
-     * Simple method that recursively checks whether the specified
-     * pipeline is a branch of this pipeline.
+     * Runs the pipeline from start to finish.
      */
-    private boolean hasBranch(Pipeline pipeline) {
-        if (branches.containsValue(pipeline)) return true;
-        for (Pipeline branch : branches.values()) {
-            if (branch.hasBranch(pipeline)) return true;
+    public void run() {
+        try {
+            start();
+            finish();
+        } catch (StageException e) {
+            throw new RuntimeException(e);
         }
-        
-        return false;
     }
     
     /**
-     * Adds an EventListener to the pipline that will be notified by calls
-     * to {@link Stage#raise(StageEvent)}.
+     * Returns the validator being used to validate the pipeline structure,
+     * or null if no validation is being performed.
+     * @return Validator used to validate pipeline structure.
      */
-    public void addEventListener(StageEventListener listener) {
-        listeners.add(listener);
+    public PipelineValidator getValidator() {
+        return this.validator;
     }
     
     /**
-     * Sequentially notifies each listener in the list of an event, and propagates
-     * the event to any attached branches
+     * Sets the validator used to validate the pipeline as it is constructed.
+     * Setting the validator to null disables validation.
+     * @param validator Validator used to validate pipeline structure.
      */
-    public void notifyListeners(final java.util.EventObject ev) {
-        new Thread() {
-            public void run() {
-                for (Iterator iter = listeners.iterator(); iter.hasNext();) {
-                    ((StageEventListener) iter.next()).notify(ev);
-                }
-                
-                for (Iterator iter = branches.values().iterator(); iter.hasNext();) {
-                    ((Pipeline) iter.next()).notifyListeners(ev);
-                }
-            }
-        }.start();
+    public void setValidator(PipelineValidator validator) {
+        this.validator = validator;
     }
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/PipelineFactory.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/PipelineFactory.java?rev=436914&r1=436913&r2=436914&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/PipelineFactory.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/PipelineFactory.java Fri Aug 25 12:30:39 2006
@@ -19,12 +19,15 @@
 import java.util.Map;
 
 /**
- * Simple factory interface for creating pipelines.
+ * Simple factory interface for creating pipelines. This interface is commonly implemented
+ * in different ways to allow creation of a pipeline based upon some external
+ * configuration source instead of in code.
  */
 public interface PipelineFactory {
-    /** Returns a Pipeline created by the factory. */
+    /**
+     * Returns a Pipeline created by the factory.
+     * @throws org.apache.commons.pipeline.PipelineCreationException if there is an error creating the pipeline
+     * @return the newly created pipeline
+     */
     public Pipeline createPipeline() throws PipelineCreationException;
-    
-    /** Configure the factory */
-    public void configure(Map<String,?> context);
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Stage.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Stage.java?rev=436914&r1=436913&r2=436914&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Stage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Stage.java Fri Aug 25 12:30:39 2006
@@ -16,193 +16,62 @@
 
 package org.apache.commons.pipeline;
 
-import java.util.Queue;
-
 /**
- * <P>A Stage represents a set of tasks that can be performed on objects
+ * <p>A Stage represents a set of tasks that can be performed on objects
  * in a queue, and methods used to communicate with other stages
- * in a {@link Pipeline}.</P>
- * <P>A Stage must provide a unique {@link StageMonitor} object to allow for
- * proper handling of multiple processing threads to the {@link StageDriver}
- * that runs the stage. Because Stage does not specify the exact behavior of the
- * queue (whether it is capacity-bounded or automatically synchronizes accesses,
- * etc) the monitor is necessary to provide proper synchronization.</P>
- * <P>Stages extending this abstract base class automatically establish a relationship
- * with a pipeline when added to that pipeline.</P>
- */
-public abstract class Stage {
-    
-    /**
-     * The {@link Pipeline} in which this stage will run.
-     */
-    protected Pipeline pipeline;
-    
-    /**
-     * The {@link StageMonitor} that will monitor stage changes and act
-     * as an intermediary between this stage and its {@link StageDriver}.
-     */
-    protected StageMonitor monitor;
-    
-    // StageDriver that is used to run this stage
-    private StageDriver stageDriver;
-    
-    // Queue for objects this stage is to process
-    private Queue<Object> queue;
-    
-    /**
-     * Builds a new stage with the specified queue.
-     */
-    public Stage(Queue<Object> queue) {
-        this.queue = queue;
-    }
-    
-    /**
-     * Default implementation of setPipeline. This method may be overridden
-     * to provide additional initialization functions for when the stage
-     * is added to a pipeline.
-     */
-    protected void setPipeline(Pipeline pipeline) {
-        if (this.pipeline != null) throw new IllegalStateException("A pipeline has already been associated with this stage.");
-        this.pipeline = pipeline;
-    }
-    
-    /**
-     * Enqueues an object on the wrapped queue. Classes wishing to override this
-     * method should override {@link #innerEnqueue(Object)} instead.
-     */
-    public final void enqueue(Object obj) {
-        this.innerEnqueue(obj);
-        if (this.monitor != null) this.monitor.enqueueOccurred();
-    }
-    
-    /**
-     * This protected method is designed to be overridden in cases where
-     * additional processing should be performed when an object is enqueued.
-     * Classes that override this method must also override {@link #poll()} if
-     * the underlying queue supporting the stage is changed.
-     */
-    protected void innerEnqueue(Object obj) {
-        queue.add(obj);
-    }
-    
-    /**
-     * Retrieves an object from the head of the wrapped queue, or null
-     * if the queue is empty. Classes that override this method must also
-     * override {@link #innerEnqueue(Object)} if the underlying queue supporting the
-     * stage is changed.
-     */
-    public Object poll() {
-        synchronized (queue) {
-            return queue.poll();
-        }
-    }
-    
-    /**
-     * Setter for property stageDriver.
-     * @param stageDriver New value of property stageDriver.
+ * in a {@link Pipeline}.</p>
      */
-    protected final void setStageDriver(StageDriver stageDriver) {
-        this.stageDriver = stageDriver;
-        this.monitor = stageDriver.createStageMonitor(this);
-    }
+public interface Stage {
     
     /**
-     * Enqueues the specified object onto the next stage in the pipeline
-     * if such a stage exists.
-     */
-    public final void exqueue(Object obj) {
-        this.pipeline.pass(this, obj);
-    }
-    
-    /**
-     * Enqueues the specified object onto the first stage in the pipeline
-     * branch corresponding to the specified key, if such a brach exists.
-     */
-    public final void exqueue(String key, Object obj) {
-        Pipeline branch = this.pipeline.branches.get(key);
-        if (branch != null && !branch.stages.isEmpty()) {
-            branch.head().enqueue(obj);
-        }
-    }
-    
-    /**
-     * Raises an event on the pipeline. Any listeners registered with the pipeline
-     * will be notified.
-     */
-    public final void raise(java.util.EventObject ev) {
-        this.pipeline.notifyListeners(ev);
-    }
-    
-    /**
-     * Getter for wrapped queue.
-     * @return Value of property queue.
-     */
-    public final Queue getQueue() {
-        return this.queue;
-    }
-    
-    /**
-     * Getter for property stageDriver.
-     * @return Value of property stageDriver.
-     */
-    public final StageDriver getStageDriver() {
-        return this.stageDriver;
-    }
-    
-    /**
-     * Returns the monitor for this stage.
-     */
-    public final StageMonitor getMonitor() {
-        return this.monitor;
-    }
-    
-    /**
-     * Stages may not further override hashCode(). This is necessary to maintain stage
-     * ordering integrity within the pipeline.
+     * <p>Initialization takes place when the stage is added to a pipeline.
+     * Implementations of this method should perform any necessary setup that
+     * is required for the driver to be able to correctly run the stage.</p>
+     * <p><strong>NOTE:</strong> Since this method is run when the stage is 
+     * added to the pipeline, certain information (such as the downstream
+     * feeder for the stage) may not yet be available until the pipeline is
+     * fully constructed.</p>
+     * @param context the {@link StageContext} within which the stage will be run
      */
-    public final int hashCode() {
-        int retValue;
-        
-        retValue = super.hashCode();
-        return retValue;
-    }
+    public void init(StageContext context);
     
     /**
      * Implementations of this method should perform any necessary setup that
-     * needs to be done before any data is processed from this Stage's queue.
+     * needs to be done before any data is processed.
+     * Preprocessing is performed after initialization.
      *
-     * @throws StageException an Exception thrown by the implementation should
+     * @throws StageException any checked Exception thrown by the implementation should
      * be wrapped in a {@link StageException}
      */
-    public abstract void preprocess() throws StageException;
+    public void preprocess() throws StageException;
     
     /**
      * Implementations of this method should atomically process a single data
-     * object.
+     * object and transfer any feed objects resulting from this processing to
+     * the downstream {@link Feeder}. This {@link Feeder} can be obtained from 
+     * the stage context made available during {@link #init initialization}.
+     *
+     * NOTE: Implementations of this method must be thread-safe!
      *
-     * @throws ClassCastException if the object is of an incorrect type
-     * for the processing operation
-     * @throws StageException an Exception thrown by the implementation should
+     * @param obj an object to be processed
+     * @throws StageException any checked Exception thrown by the implementation should
      * be wrapped in a {@link StageException}
      */
-    public abstract void process(Object obj) throws StageException;
+    public void process(Object obj) throws StageException;
     
     /**
      * Implementations of this method should do any additional processing or
-     * finalization necessary after all data has been processed. This method
-     * usually runs following a call to the implementing {@link org.apache.commons.pipeline.Pipeline$Stage Stage}'s
-     * {@link StageQueue#finish()} method.
-     *
-     * @throws StageException an Exception thrown by the implementation should
+     * finalization necessary after all data objects have been processed
+     * by the stage.
+     * @throws StageException any checked Exception thrown by the implementation should
      * be wrapped in a {@link StageException}
      */
-    public abstract void postprocess() throws StageException;
+    public void postprocess() throws StageException;
     
     /**
      * Implementations of this method should clean up any lingering resources
      * that might otherwise be left allocated if an exception is thrown during
-     * processing.
+     * processing (or pre/postprocessing).
      */
-    public abstract void release();
-    
+    public void release();
 }

Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageContext.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageContext.java?rev=436914&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageContext.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageContext.java Fri Aug 25 12:30:39 2006
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline;
+
+import java.util.Collection;
+import java.util.EventObject;
+
+/**
+ * This interface represents the context in which a stage is run. Ordinarily,
+ * the context will be provided by the pipeline in which the stage is embedded;
+ * however, this interface is also useful for creating isolated test environments
+ * in which a stage can be run.
+ *
+ * @author kjn
+ */
+public interface StageContext {
+    /**
+     * Adds a {@link StageEventListener} to the context that will be notified by calls
+     * to {@link #raise(EventObject)}.
+     * @param listener The listener to be registered with the context.
+     */
+    public void registerListener(StageEventListener listener);
+    
+    /**
+     * Returns the collection of {@link StageEventListener}s registered with the
+     * context.
+     * @return  the collection of {@link StageEventListener}s registered with the
+     * context.
+     */
+    public Collection<StageEventListener> getRegisteredListeners();
+    
+    /**
+     * Notifies each registered listener of an event and propagates
+     * the event to any attached branches
+     * @param ev The event to be passed to registered listeners
+     */
+    public void raise(EventObject ev);
+    
+    /**
+     * Return the source feeder for the specified pipeline branch.
+     * @param branch the string identifier of the branch for which a feeder will be retrieved
+     * @return the {@link Feeder} for the first stage of the specified branch
+     */
+    public Feeder getBranchFeeder(String branch);
+    
+    /**
+     * This method is used by a stage driver to pass data from one stage to the next.
+     * Returns the feeder for the downstream stage, or {@link Feeder#VOID} if no downstream
+     * stage exists.
+     * @param stage The stage from which "downstream" will be determined. Ordinarily a Stage implementation
+     * will call this method with a reference to itself.
+     * @return The {@link Feeder} for the subsequent stage.
+     */
+    public Feeder getDownstreamFeeder(Stage stage);
+}

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java?rev=436914&r1=436913&r2=436914&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java Fri Aug 25 12:30:39 2006
@@ -16,6 +16,7 @@
 
 package org.apache.commons.pipeline;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -25,54 +26,184 @@
  * between the stage, the driver, and the enclosing pipeline.
  */
 public abstract class StageDriver {
-    
     /**
-     * Creates and starts new worker thread(s) to process items in the queue.
+     * This enumeration represents possible states of a stage driver during
+     * processing.
      */
-    public final void start(Stage stage) throws StageException {
-        synchronized (stage) {
-            if (stage.monitor == null || stage.monitor.getState() == StageMonitor.State.STOPPED) {
-                if (stage.monitor == null) stage.setStageDriver(this); // this will set the monitor on the stage using a callback.
-                stage.monitor.startRequested();
-                this.startInternal(stage);
+    public enum State { 
+        /** The stage driver has started and the preprocess() method is being run. */
+        STARTED, 
+        /** Preprocessing is complete and objects are being processed.*/
+        RUNNING, 
+        /** A stop has been requested - the stage will finish processing, 
+         * then postprocess and shut down. */
+        STOP_REQUESTED, 
+        /** Postprocessing tasks are complete; the stage is shutting down. */
+        FINISHED, 
+        /** Resources have been released and all stage activity has stopped. 
+         * This is the default state. */
+        STOPPED, 
+        /** A fatal error has occurred that has caused the driver to stop in an 
+         * inconsistent state. The driver cannot be restarted from the error state. 
+         * The error(s) can be obtained using the getFatalErrors method. */
+        ERROR 
             }
+    
+    /**
+     * The stage to run.
+     */
+    protected Stage stage;
+    
+    /**
+     * The context for the stage being run
+     */
+    protected StageContext context;
+    
+    /**
+     * List of processing failures that have occurred.
+     */
+    protected List<ProcessingFailure> processingFailures = new ArrayList<ProcessingFailure>();
+
+    /**
+     * List of errors that have occurred.
+     */
+    protected List<Throwable> errors = new ArrayList<Throwable>();
+    
+    /**
+     * Creates a StageDriver for the specified stage.
+     * @param stage The stage for which the driver will be created
+     * @param context The context in which to run the stage
+     */
+    public StageDriver(Stage stage, StageContext context) {
+        if (stage == null) throw new IllegalArgumentException("Stage may not be null.");
+        if (context == null) throw new IllegalArgumentException("Context may not be null.");
+        this.stage = stage;
+        this.context = context;
         }
+    
+    /**
+     * Returns the Stage being run by this StageDriver.
+     * @return The stage being run by this StageDriver instance
+     */
+    public Stage getStage() {
+        return this.stage;
     }    
     
     /**
-     * Implementations of this method must guarantee that the
-     * {@link StageMonitor#driverStarted()} method will be called on the
-     * specified stage's {@link Stage#getStageMonitor() monitor} when
-     * preprocessing is complete.
+     * This method is used to provide a communication channel between the context 
+     * in which the driver is being run and the managed stage.
+     * @return the Feeder used to feed objects to the managed stage for processing.
+     */
+    public abstract Feeder getFeeder();
+    
+    /**
+     * Returns the current state of stage processing.
+     * @return The current state
+     */
+    public abstract State getState();
+
+    /**
+     * This method is used to start the driver, run the 
+     * {@link Stage#preprocess() preprocess()} method of the attached stage
+     * and to then begin processing any objects fed to this driver's Feeder.
+     *
+     * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during stage startup. In most cases, such errors
+     * will be handled internally by the driver.
      */
-    protected abstract void startInternal(Stage stage) throws StageException;
+    public abstract void start() throws StageException;
     
     /**
      * This method waits for the stage(s) queue(s) to empty and any processor thread(s) to exit
      * cleanly and then calls release() to release any resources acquired during processing, if possible.
+     * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during driver shutdown. Ordinarily such 
+     * exceptions will be handled internally.
      */
-    public final void finish(Stage stage) throws StageException {
-        synchronized (stage) {
-            if (stage.monitor != null && stage.monitor.getState() != StageMonitor.State.STOPPED) {
-                stage.monitor.stopRequested();
-                this.finishInternal(stage);
+    public abstract void finish() throws StageException;
+
+    /**
+     * Returns a list of unrecoverable errors that occurred during stage
+     * processing.
+     * @return A list of unrecoverable errors that occurred during stage processing.
+     */
+    public List<Throwable> getFatalErrors() {
+        return this.errors;
             }
+    
+    /**
+     * Store a fatal error.
+     * @param error The error to be stored for later analysis
+     */
+    protected void recordFatalError(Throwable error) {
+        this.errors.add(error);
         }
+    
+    /**
+     * Returns a list of errors that occurred while processing data objects,
+     * along with the objects that were being processed when the errors
+     * were generated.
+     * @return The list of non-fatal processing errors.
+     */
+    public List<ProcessingFailure> getProcessingFailures() {
+        return this.processingFailures;
     }
     
     /**
-     * Implementations of this method must guarantee that the 
-     * {@link StageMonitor#driverStopped()} method will be called on the
-     * specified stage's {@link Stage#getStageMonitor() monitor} when
-     * postprocessing is complete and all stage resources have been released.
+     * Store processing failure information for the specified data object.
+     * @param data The data being processed at the time of the error
+     * @param error The error encountered
      */
-    protected abstract void finishInternal(Stage stage) throws StageException;
+    protected void recordProcessingFailure(Object data, Throwable error) {
+        this.processingFailures.add(new ProcessingFailure(data, error));
+    }
     
     /**
-     * This factory method must return a new instance of a {@link StageMonitor}
-     * that can be used to monitor processing for the specified stage
-     * in conjunction with this driver.
+     * ProcessingFailure objects are used to store detailed information about
+     * processing failures, including the failing data, the error encountered,
+     * and the driver state at the time of failure.
+     */
+    public class ProcessingFailure {
+        private Object data;
+        private Throwable throwable;
+        private State driverState;
+        
+        /** Creates a new instance of ProcessingFailure
+         *@param data The object which was not able to be processed.
+         *@param throwable The exception that occurred.
+         */
+        protected ProcessingFailure(Object data, Throwable throwable){
+            this.data = data;
+            this.throwable = throwable;
+            this.driverState = StageDriver.this.getState();
+        }
+        
+        /** Returns the data
+         *@return The object which was not able to be processed.
      */
-    protected abstract StageMonitor createStageMonitor(Stage stage);
+        public Object getData(){
+            return this.data;
+        }
+        
+        /** Returns the exception
+         *@return The throwable
+         */
+        public Throwable getThrowable(){
+            return this.throwable;
+        }
+        
+        /** Returns the stage which threw the exception.
+         *@return Stage
+         */
+        public Stage getStage() {
+            return StageDriver.this.getStage();
+        }
+        
+        /**
+         * This method is used to determine what stage driver handled a particular error.
+         * @return A reference to the stage driver that encountered the error.
+         */
+        public StageDriver getStageDriver() {
+            return StageDriver.this;
+        }
+    }
     
 }

Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriverFactory.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriverFactory.java?rev=436914&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriverFactory.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriverFactory.java Fri Aug 25 12:30:39 2006
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline;
+
+/**
+ * This interface represents a factory that is used by a {@link Pipeline} to create
+ * a driver for a {@link Stage} when that stage is added to the pipeline. The factory
+ * pattern is used here to ensure that each stage is run by a unique driver
+ * instance.
+ * @author kjn
+ */
+public interface StageDriverFactory {
+    /**
+     * This method is used to create a driver that will run the specified stage
+     * in the specified context.
+     * @param stage The stage to be run by the newly created driver.
+     * @param context The context in which the stage will be run
+     * @return The newly created driver
+     */
+    public StageDriver createStageDriver(Stage stage, StageContext context);
+}

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageEventListener.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageEventListener.java?rev=436914&r1=436913&r2=436914&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageEventListener.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageEventListener.java Fri Aug 25 12:30:39 2006
@@ -20,12 +20,15 @@
 import java.util.EventObject;
 
 /**
- * Listener interface for {@link StageEvent}s
+ * Listener interface for {@link EventObject}s. Listeners are notified of
+ * events by the {@link StageContext} with which they are registered.
  */
 public interface StageEventListener extends EventListener {
     
     /**
      * Notify this listener of a {@link StageEvent}
+     * @param ev The event to be handled. Listeners should gracefully ignore any events that they
+     * do not know how to handle.
      */
     public abstract void notify(EventObject ev);
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageException.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageException.java?rev=436914&r1=436913&r2=436914&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageException.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageException.java Fri Aug 25 12:30:39 2006
@@ -32,7 +32,7 @@
     
     /**
      * Constructs an instance of <code>StageException</code> with the specified detail message.
-     * @param msg the detail message.
+     * @param msg the detail message for the error that occurred.
      */
     public StageException(String msg) {
         super(msg);
@@ -40,7 +40,7 @@
     
     /**
      * Constructs an instance of <code>StageException</code> with the specified cause.
-     * @param msg the detail message.
+     * @param thr The underlying exception that caused this exception to be thrown.
      */
     public StageException(Throwable thr) {
         super(thr);
@@ -60,6 +60,7 @@
     
     /**
      * Creates a new instance of <code>StageException</code> without detail message.
+     * @param source the stage that was the source of the exception
      */
     public StageException(Stage source) {
         this.source = source;
@@ -68,6 +69,7 @@
     
     /**
      * Constructs an instance of <code>StageException</code> with the specified detail message.
+     * @param source the stage that was the source of the exception
      * @param msg the detail message.
      */
     public StageException(Stage source, String msg) {
@@ -78,6 +80,7 @@
     
     /**
      * Constructs an instance of <code>StageException</code> with the specified detail message and cause
+     * @param source the stage where the error occurred
      * @param msg the detail message.
      * @param cause Throwable that caused this exception.
      */
@@ -89,6 +92,7 @@
     
     /**
      * Returns a reference to the Stage object where the exception occurred.
+     * @return a reference to the Stage object where the exception occurred.
      */
     public Stage getSource() {
         return this.source;

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java?rev=436914&r1=436913&r2=436914&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java Fri Aug 25 12:30:39 2006
@@ -17,7 +17,9 @@
 package org.apache.commons.pipeline.config;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -25,10 +27,13 @@
 import org.apache.commons.digester.RuleSet;
 import org.apache.commons.pipeline.PipelineCreationException;
 import org.apache.commons.pipeline.Pipeline;
+import org.xml.sax.SAXException;
 
 
 /**
  * This factory is designed to simplify creating a pipeline using Digester.
+ * @see  PipelineRuleSet for additional information on the format of the
+ * XML configuration file.
  */
 public class DigesterPipelineFactory implements org.apache.commons.pipeline.PipelineFactory {
     
@@ -39,73 +44,49 @@
      * Factory will create a pipeline from the specified Digester configuration file
      * if this filename is not null.
      */
-    private String configFile;
-    
-    /**
-     * Factory will create a pipeline from this input stream if it is not null. Useful for instances where
-     * a pipeline configuration is being read from inside a jarfile.
-     */
-    private InputStream configStream;
-        
-    /** 
-     * A factory created by this constructor will create a pipeline from the specified
-     * Digester creation file.
-     */
-    public DigesterPipelineFactory(String configFile) {
-        this.init();
-        this.configFile = configFile;
-    }
-    
+    private URL confURL;
     
     /**
      * A factory created by this constructor will create a pipeline from the specified
-     * input stream. Useful for instances where a pipeline configuration is being read from inside a jarfile.
+     * XML configuration file.
+     * @param confURL the URL of the XML file containing pipeline configuration information
      */
-    public DigesterPipelineFactory(InputStream configStream) {
-        this.init();
-        this.configStream = configStream;
-    }
+    public DigesterPipelineFactory(URL confURL) {
+        if (confURL == null) throw new IllegalArgumentException("Configuration file URL may not be null.");
+        this.confURL = confURL;
     
-    
-    /**
-     * Adds the base RuleSets to the digester configuration.
-     */
-    private void init() {
         //PipelineRuleSet needs a reference to {@link org.apache.commons.digester.RuleSet RuleSet}s
         //used to parse the configuration file in case configuration is split up between multiple
         //files.
         ruleSets.add(new PipelineRuleSet(ruleSets));        
     }
     
-    
-    /** Creates a new pipeline */
+    /**
+     * Creates a new pipeline based upon the configuration of this factory instance.
+     * @throws org.apache.commons.pipeline.PipelineCreationException Thrown if an error is encountered parsing the configuration file.
+     * @return The newly created pipeline instance
+     */
     public Pipeline createPipeline() throws PipelineCreationException {
         try {
-            if (this.configFile != null) {
                 Digester digester = new Digester();
                 this.initDigester(digester);
                 
-                File conf = new File(configFile);
-                return (Pipeline) digester.parse(conf);
+            InputStream in = confURL.openStream();
+            try {
+                return (Pipeline) digester.parse(in);
+            } finally {
+                in.close();
             }
-            else if (this.configStream != null) {
-                Digester digester = new Digester();
-                this.initDigester(digester);
-                
-                return (Pipeline) digester.parse(configStream);
-            }
-            else {
-                throw new IllegalStateException("No configuration file or stream found.");
-            }
-        }
-        catch (Exception e) {
-            throw new PipelineCreationException(e.getMessage(), e);
+        } catch (IOException e) {
+            throw new PipelineCreationException("An IOException occurred reading the configuration file: " + e.getMessage(), e);
+        } catch (SAXException e) {
+            throw new PipelineCreationException("A formatting error exists in the configuration file: " + e.getMessage(), e);
         }
     }
     
-    
     /**
      * Initialize a Digester instance with the rule sets provided to this factory.
+     * @param digester The digester instance to be initialized
      */
     public void initDigester(Digester digester) {
         for (Iterator iter = ruleSets.iterator(); iter.hasNext();) {
@@ -113,35 +94,32 @@
         }
     }
     
-    
     /**
      * Adds a RuleSet to the list of rules available to Digester for parsing
      * the configuration file.
+     * @param ruleSet The rule set to be added to the Digester
      */
     public void addRuleSet(RuleSet ruleSet) {
         this.ruleSets.add(ruleSet);
     }
     
-    
-    /**
-     * No-op implementation - all configuration information exists in the XML file.
-     */
-    public void configure(java.util.Map<String,?> context) {
-    }
-    
-    
     /**
      * The simplest possible main method that creates a pipeline from a configuration file,
      * then runs the pipeline processing from start to finish.
      *
+     * When run from the command line, the first argument should be the path to
+     * the configuration file; any additional arguments are fed, in order, to
+     * the pipeline's source feeder.
+     *
      * @param argv the command line arguments
      */
     public static void main(String[] argv) {
         try {
-            DigesterPipelineFactory factory = new DigesterPipelineFactory(argv[0]);
+            File configFile = new File(argv[0]);
+            
+            DigesterPipelineFactory factory = new DigesterPipelineFactory(configFile.toURL());
             Pipeline pipeline = factory.createPipeline();
             for (int i = 1; i < argv.length; i++) {
-                pipeline.enqueue(argv[i]);
+                pipeline.getSourceFeeder().feed(argv[i]);
             }
             
             System.out.println("Pipeline created, about to begin processing...");
@@ -150,8 +128,7 @@
             pipeline.finish();
             
             System.out.println("Pipeline successfully finished processing. See logs for details.");
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             e.printStackTrace(System.err);
         }
     }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/PipelineRuleSet.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/PipelineRuleSet.java?rev=436914&r1=436913&r2=436914&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/PipelineRuleSet.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/PipelineRuleSet.java Fri Aug 25 12:30:39 2006
@@ -16,49 +16,100 @@
 
 package org.apache.commons.pipeline.config;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.digester.*;
 import org.apache.commons.pipeline.*;
 import org.xml.sax.Attributes;
 
 /**
- * <P>This is a Digester RuleSet that provides rules for parsing a process pipeline
- * XML file.</P>
+ * This is a Digester RuleSet that provides rules for parsing a pipeline
+ * XML file.
  *
  * The rules defined by this object are used for parsing the following tags:
  * <ul>
- *  <li>&lt;pipeline&gt;&lt;/pipeline&gt; - The root element of the
- *  XML configuration file for a pipeline. This tag supports two attributes
- *  that are for use only when configuring branch pipelines, <code>key</code>
- *  and <code>configURI</code>. These attributes are described more fully
- *  below in the %lt;branch&gt; documentation.</li>
- *
- *  <li>&lt;stage className="<i>name</i>" queueClass="<i>name</i>" ... &gt;&lt;/stage&gt; - A single stage is
- *  created and configured using this tag. It is a child of &lt;pipeline&gt;. Stages
- *  created in this manner are added to the pipeline in the order that they
- *  occur in the configuration file. The class of the stage is specified by the
- *  <i>className</i> attribute; all other attributes are used by Digester to set bean
- *  properties on the newly created Stage object. </li>
- *
- *  <li>&lt;enqueue/&gt; - Enqueue an object onto the first stage in the pipeline.</li>
- *  <li>&lt;branch/%gt; - Add a branch to a pipeline. The contents of this tag should
- *  be one or more &lt;pipeline/&gt;s. Branch pipelines added in this fashion must
- *  be configured with an attribute named <code>key</code> that holds the name by
- *  which the pipeline will be referred to by {@link org.apache.commons.pipeline.StageHandler StageHandler}s.
+ *     <li>
+ *         <code>&lt;pipeline&gt;&lt;/pipeline&gt;</code><br/>
+ *         The root element of the XML configuration file for a pipeline. This tag 
+ *         supports two optional attributes that are for use only when configuring 
+ *         branch pipelines, <code>key</code> and <code>configURI</code>. These 
+ *         attributes are described more fully below in the <code>&lt;branch&gt;</code>
+ *         documentation.
+ *     </li>     
+ *     <li>
+ *         <code>&lt;driverFactory className="<em>MyDriverFactory</em>" id="<em>my_id</em>" ... /&gt;</code><br/>
+ *         This tag is used to create and configure a {@link StageDriverFactory} that 
+ *         will be used to create {@link StageDriver} instances for stages
+ *         in the parent pipeline. Each {@link StageDriverFactory} is identified by a unique
+ *         string identifier that is used by the <code>&lt;stage&gt;</code> tag
+ *         to identify the factory used to create the driver for the stage.
+ *         The class of the factory (which must supply a no-argument constructor)
+ *         is specified by the <code>className</code> attribute, and all other
+ *         additional attributes are used by Digester to configure the associated properties
+ *         (using standard Java bean naming conventions) of the driver instance created.
+ *     </li>
+ *     <li>
+ *         <code>&lt;stage className="<em>MyStageClass</em>" driverFactoryId="<i>name</i>" ... &gt;&lt;/stage&gt;</code><br/>
+ *         A single stage is created, configured, and added to the parent pipeline using 
+ *         this tag. Stages created in this manner are added to the pipeline in the order 
+ *         that they occur in the configuration file. The class of the stage (which must
+ *         provide a no-argument constructor) is specified by the <em>className</em> attribute.
+ *         Each stage should be associated with a previously declared driver factory
+ *         by use of the <code>driverFactoryId</code> attribute. All other attributes are 
+ *         used by Digester to set bean properties on the newly created Stage object. 
+ *     </li>
+ *     <li>
+ *         <code>&lt;feed/&gt;</code><br/>
+ *         Enqueue an object onto the first stage in the pipeline. There are two 
+ *         types of usage available, provided by the following subtags:
+ *         <ul>
+ *             <li>
+ *                 <code>&lt;value&gt;my_value&lt;/value&gt;</code><br/>
+ *                 Feed the string value of the body of this tag to the first stage in the
+ *                 pipeline.    
+ *             </li>
+ *             <li>
+ *                 <code>&lt;object className="MyClass" ... /&gt;</code><br/>
+ *                 This tag uses the standard Digester ObjectCreateRule to create an
+ *                 arbitrary object of the specified class (which must provide a
+ *                 no-argument constructor) and feed it to the first stage in the pipeline.
+ *                 All attributes other than <code>className</code> are used by 
+ *                 Digester to set bean properties on the newly created object.
+ *             </li>             
+ *         </ul>
+ *     </li>
+ *     <li>
+ *         <code>&lt;branch/&gt;</code><br/>
+ *         Add a branch to a pipeline. The contents of this tag should
+ *         be one or more <code>&lt;pipeline/&gt;</code> declarations. Branch 
+ *         pipelines added in this fashion must be configured with an attribute 
+ *         named <code>key</code> that holds the name by which the branch pipeline 
+ *         will be referred to.<br/>
  *  Branch pipelines may be configured either inline in the main configuration
  *  file or in a separate file referred to by the <code>configURI</code> pipeline
  *  attribute.
+ *     </li>
  * </ul>
  */
 public class PipelineRuleSet extends RuleSetBase {
     private static Class[] addBranchTypes = { String.class, Pipeline.class };
     private List<RuleSet> nestedRuleSets;
     
-    /** Creates a new instance of ChainRuleSet */
+    /**
+     * Creates a new instance of the rule set used by Digester to configure a pipeline.
+     */
     public PipelineRuleSet() {
     }
     
-    /** Creates a new instance of ChainRuleSet */
+    /**
+     * Creates a new pipeline rule set with the specified collection of additional
+     * rule sets that may be used for recursive creation of branch pipelines.
+     * @param nestedRuleSets A list of other RuleSet instances that are being used in conjunction with the 
+     * PipelineRuleSet. In the case that branch pipelines are referred to by URI, these
+     * rule sets will be added to a new Digester instance (along with a PipelineRuleSet
+     * instance) that is used to parse the branch configuration file.
+     */
     public PipelineRuleSet(List<RuleSet> nestedRuleSets) {
         this.nestedRuleSets = nestedRuleSets;
     }
@@ -66,49 +117,58 @@
     /**
      * Adds the rule instances for pipeline, stage, and enqueue
      * tasks to the Digester instance supplied.
+     * @param digester The Digester instance to which the rules should be added.
      */
     public void addRuleInstances(Digester digester) {
-        ObjectCreationFactory factory = new PipelineFactory();
+        ObjectCreationFactory pipelineFactory = new PipelineFactory();
+        ObjectCreationFactory driverFactoryFactory = new StageDriverFactoryFactory();
         
         //rules to create pipeline
-        digester.addFactoryCreate("pipeline", factory);
+        digester.addFactoryCreate("pipeline", pipelineFactory);
         digester.addSetProperties("pipeline");
+        digester.addRule("pipeline", new PipelineDriverFactoriesRule());
                 
         // these rules are used to add subchains to the main pipeline
-        digester.addFactoryCreate("*/branch/pipeline", factory);
+        digester.addFactoryCreate("*/branch/pipeline", pipelineFactory);
         digester.addRule("*/branch/pipeline", new CallMethodRule(1, "addBranch", 2, addBranchTypes));
         digester.addCallParam("*/branch/pipeline", 0, "key");
         digester.addCallParam("*/branch/pipeline", 1, 0);
+        digester.addRule("*/branch/pipeline", new PipelineDriverFactoriesRule());
         
-        //this rule is intended to be used to add a pipeline element. the ChainLogger is
-        //simply the default if no pipeline element class is specified
+        //this rule is intended to be used to add a StageEventListener to the pipeline.
+        digester.addObjectCreate("*/pipeline/listener", "org.apache.commons.pipeline.StageEventListener", "className");
+        digester.addSetProperties("*/pipeline/listener");
+        digester.addSetNext("*/pipeline/listener", "registerListener", "org.apache.commons.pipeline.StageEventListener");
+        
+        //this rule is intended to be used to add a StageDriverFactory to the pipeline.
+        digester.addFactoryCreate("*/pipeline/driverFactory", driverFactoryFactory);
+        digester.addSetProperties("*/pipeline/driverFactory");
+        
+        //this rule is intended to be used to add a stage to a pipeline
         digester.addObjectCreate("*/pipeline/stage", "org.apache.commons.pipeline.BaseStage", "className");
         digester.addSetProperties("*/pipeline/stage");
-        digester.addRule("*/pipeline/stage", new CallMethodRule(1, "addStage", 2, new Class[] { Stage.class, StageDriver.class }));
-        digester.addCallParam("*/pipeline/stage", 0, true);
-        
-        //this rule is used to create a stage driver for a specific stage
-        digester.addObjectCreate("*/pipeline/stage/stageDriver", "org.apache.commons.pipeline.driver.SingleThreadStageDriver", "className");
-        digester.addSetProperties("*/pipeline/stage/stageDriver");
-        digester.addCallParam("*/pipeline/stage/stageDriver", 1, true);
+        digester.addRule("*/pipeline/stage", new PipelineAddStageRule());
         
-        //rule for enqueuing string onto the first stage in a pipeline
-        digester.addCallMethod("*/stage/enqueue/value", "enqueue", 0);
+        //rule for feeding a string value to the source feeder
+        digester.addRule("*/pipeline/feed/value", new PipelineFeedValueRule());
         
         //rules for enqueueing an object
-        digester.addObjectCreate("*/stage/enqueue/object", "java.lang.Object", "className");
-        digester.addSetProperties("*/stage/enqueue/object");
-        digester.addSetNext("*/stage/enqueue/object", "enqueue", "java.lang.Object");
+        digester.addObjectCreate("*/pipeline/feed/object", "java.lang.Object", "className");
+        digester.addSetProperties("*/pipeline/feed/object");
+        digester.addRule("*/pipeline/feed/object", new PipelineFeedObjectRule());
     }
             
-    
+    /**
+     * This factory is used to create a pipeline. If the "configURI" parameter
+     * is specified, the pipeline is created based upon the configuration file
+     * located at that URI.
+     */
     private class PipelineFactory extends AbstractObjectCreationFactory {        
         public Object createObject(Attributes attributes) throws java.lang.Exception {
             String configURI = attributes.getValue("configURI");
             if (configURI == null) {
                 return new Pipeline();
-            }
-            else {
+            } else {
                 Digester subDigester = new Digester();
                 if (nestedRuleSets != null) {
                     for (RuleSet ruleset : nestedRuleSets) {
@@ -117,20 +177,92 @@
                     
                     Pipeline pipeline = (Pipeline) subDigester.parse(configURI);
                     return pipeline;
-                }
-                else {
+                } else {
                     throw new IllegalStateException("Unable to parse branch configuration file: No parsing rules provided to PipelineRuleSet constructor.");
                 }
             }
         }
     }
     
+    /**
+     * Configure the storage for the map of driver factories for the pipeline.
+     */
+    private class PipelineDriverFactoriesRule extends Rule {
+        public void begin(String namespace, String name, Attributes attributes) throws Exception {
+            digester.push("org.apache.commons.pipeline.config.DriverFactories", new HashMap<String, StageDriverFactory>());
+            super.begin(namespace, name, attributes);
+        }
     
-    private class StageCompletionRule extends Rule {
         public void end(String namespace, String name) throws Exception {
+            super.end(namespace, name);
+            digester.pop("org.apache.commons.pipeline.config.DriverFactories");
+        }
+    }
 
+    /**
+     * This ObjectCreationFactory creates a stage driver factory and stores
+     * it in the scope of the rule set so that it can be retrieved by the stage
+     * creation rule.
+     */
+    private class StageDriverFactoryFactory extends AbstractObjectCreationFactory {
+        public Object createObject(Attributes attributes) throws Exception {
+            Map<String, StageDriverFactory> driverFactories =
+                    (Map<String,StageDriverFactory>) digester.peek("org.apache.commons.pipeline.config.DriverFactories");
+            
+            String className = attributes.getValue("className");
+            String id = attributes.getValue("id");
+            Class clazz = Class.forName(className);
+            if (!StageDriverFactory.class.isAssignableFrom(clazz)) {
+                throw new IllegalArgumentException("Class " + clazz + " does not implement StageDriverFactory.");
+            } else {
+                StageDriverFactory factory = (StageDriverFactory) clazz.newInstance();
+                driverFactories.put(id, factory);
+                return factory;
+            }
+        }
+    }
+    
+    /**
+     * This Rule adds a stage to the pipeline using the factory specified
+     * by the driverFactoryId attribute.
+     */
+    private class PipelineAddStageRule extends Rule {
+        public void begin(String namespace, String name, Attributes attributes) throws Exception {
+            digester.push("org.apache.commons.pipeline.config.DriverFactoryIds", attributes.getValue("driverFactoryId"));
+            super.begin(namespace, name, attributes);
+        }
+        
+        public void end(String namespace, String name) throws Exception {
             super.end(namespace, name);
+            String factoryId = (String) digester.pop("org.apache.commons.pipeline.config.DriverFactoryIds");
+            Map<String, StageDriverFactory> driverFactories =
+                    (Map<String,StageDriverFactory>) digester.peek("org.apache.commons.pipeline.config.DriverFactories");
+            StageDriverFactory factory = driverFactories.get(factoryId);
+            Stage stage = (Stage) digester.peek();
+            Pipeline pipeline = (Pipeline) digester.peek(1);
+            pipeline.addStage(stage, factory);
+        }
         }
         
+    /**
+     * This Rule feeds the text body of the matched element to the pipeline's source feeder.
+     */
+    private class PipelineFeedValueRule extends Rule {
+        public void body(String namespace, String name, String text) throws Exception {
+            Pipeline pipeline = (Pipeline) digester.peek();
+            pipeline.getSourceFeeder().feed(text);
+            super.body(namespace, name, text);
+        }
+    }
+    
+    /**
+     * This Rule allows an object to be fed to the pipeline.
+     */
+    private class PipelineFeedObjectRule extends Rule {
+        public void end(String namespace, String name) throws Exception {
+            super.end(namespace, name);
+            Pipeline pipeline = (Pipeline) digester.peek(1);
+            pipeline.getSourceFeeder().feed(digester.peek());
+        }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org


Mime
View raw message