commons-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rwins...@apache.org
Subject cvs commit: jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl SingleThreadStageDriver.java FileFinderStage.java FtpFileDownloadStage.java HttpFileDownloadStage.java LogStage.java SingleThreadStageQueue.java
Date Fri, 26 Nov 2004 10:53:55 GMT
rwinston    2004/11/26 02:53:55

  Modified:    pipeline/src/java/org/apache/commons/pipeline Pipeline.java
                        StageEventListener.java
               pipeline/src/java/org/apache/commons/pipeline/config
                        PipelineRuleSet.java
               pipeline/src/java/org/apache/commons/pipeline/impl
                        FileFinderStage.java FtpFileDownloadStage.java
                        HttpFileDownloadStage.java LogStage.java
  Added:       pipeline/src/java/org/apache/commons/pipeline BaseStage.java
                        PipelineCreationException.java PipelineFactory.java
                        Stage.java StageDriver.java StageMonitor.java
               pipeline/src/java/org/apache/commons/pipeline/config
                        DigesterPipelineFactory.java
               pipeline/src/java/org/apache/commons/pipeline/impl
                        SingleThreadStageDriver.java
  Removed:     pipeline/src/java/org/apache/commons/pipeline
                        StageHandler.java StageQueue.java
               pipeline/src/java/org/apache/commons/pipeline/config
                        PipelineFactory.java
               pipeline/src/java/org/apache/commons/pipeline/impl
                        SingleThreadStageQueue.java
  Log:
  Updated pipeline repository as per Kris Nuttycombe's changes (PR 32299)
  
  Revision  Changes    Path
  1.2       +133 -133  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/Pipeline.java
  
  Index: Pipeline.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/Pipeline.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Pipeline.java	2 Oct 2004 19:01:19 -0000	1.1
  +++ Pipeline.java	26 Nov 2004 10:53:53 -0000	1.2
  @@ -4,19 +4,24 @@
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
  - * 
  + *
    *     http://www.apache.org/licenses/LICENSE-2.0
  - * 
  + *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
  - * limitations under the License. 
  + * limitations under the License.
    */
   
   package org.apache.commons.pipeline;
   
  +import java.lang.Iterable;
   import java.util.*;
  +import org.apache.commons.collections.OrderedMap;
  +import org.apache.commons.collections.OrderedMapIterator;
  +import org.apache.commons.collections.map.ListOrderedMap;
  +
   
   
   /**
  @@ -32,64 +37,137 @@
    * @author Kris Nuttycombe, National Geophysical Data Center
    * @version $Revision$
    */
  -public final class Pipeline {
  -    private List stages = new LinkedList();
  -    private List listeners = new ArrayList();
  -    private Map branches = new HashMap();
  +public final class Pipeline implements Iterable<Stage>, Runnable {
  +    private List<StageEventListener> listeners = new ArrayList<StageEventListener>();
  +
  +    /**
  +     * Ordered map of stages in the pipeline where the keys are stages and the
  +     * values are the associated StageDrivers.
  +     */
  +    protected OrderedMap stages = new ListOrderedMap();
  +    //private OrderedMap<Stage,StageDriver> stages = new ListOrderedMap<Stage,StageDriver>();
  +    
  +    /**
  +     * Map of pipeline branches where the keys are branch names.
  +     */
  +    protected Map<String,Pipeline> branches = new HashMap<String,Pipeline>();
       
       
  -    /** Creates a new Pipeline */
  +    /**
  +     * Creates a new Pipeline
  +     */
       public Pipeline() {  }
       
       
       /**
  -     * Adds a Stage object to the end of this Pipeline.
  +     * Adds a Stage object to the end of this Pipeline. The pipeline will use
  +     * the specified StageDriver to run the stage.
  +     *
  +     * It is critical that all stages added to a pipeline have distinct hash codes
  +     * to maintain stage ordering integrity. For this reason, it is
  +     * strongly suggested that Stage implementations <i>do not</i> override
  +     * the default {@link java.lang.Object#hashCode() hashCode()} implementation
  +     * in java.lang.Object.
        *
        * @todo throw IllegalStateException if the stage is being used in a different pipeline
        */
  -    public final void addStage(Stage stage) {
  -        if (!stages.isEmpty()) ((Stage) stages.get(stages.size() - 1)).next = stage;
  -        stage.pipeline = this;
  -        stages.add(stage);
  +    public void addStage(Stage stage, StageDriver driver) {
  +        if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
  +        if (driver == null) throw new IllegalArgumentException("Argument \"driver\" for call to Pipeline.addStage() may not be null.");
  +        
  +        stage.setPipeline(this);        
  +        this.stages.put(stage, driver);
       }
       
       
       /**
  +     * Returns the first stage in the pipeline
  +     */
  +    public Stage head() {
  +        return (Stage) stages.firstKey();
  +    }
  +    
  +    /**
  +     * Returns the stage after the specified stage in the pipeline
  +     */
  +    public Stage getNextStage(Stage stage) {
  +        return (Stage) stages.nextKey(stage);
  +    }
  +    
  +    /**
  +     * Returns an Iterator for stages in the pipeline.
  +     */
  +    public Iterator<Stage> iterator() {
  +        return (Iterator<Stage>) stages.mapIterator();
  +    }
  +    
  +    /**
        * Adds a branch to the pipeline.
        */
  -    public final void addBranch(String key, Pipeline pipeline) {
  +    public void addBranch(String key, Pipeline pipeline) {
           if (key == null) throw new IllegalArgumentException("Branch key may not be null.");
           if (pipeline == null) throw new IllegalArgumentException("Illegal attempt to set reference to null branch.");
  -        if (pipeline == this) throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)");
  +        if (pipeline == this || this.hasBranch(pipeline))
  +            throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)");
           
           this.branches.put(key, pipeline);
       }
       
       
       /**
  -     * Calls {@link StageQueue#start() start()} on
  -     * the {@link StageQueue} of each stage in the pipeline in the order they were added.
  +     * Runs the pipeline from start to finish.
        */
  -    public final void start() {
  -        if (!stages.isEmpty()) ((Stage) stages.get(0)).startAll();
  -        for (Iterator iter = branches.values().iterator(); iter.hasNext();) ((Pipeline) iter.next()).start();
  +    public void run() {
  +        try {
  +            start();
  +            finish();
  +        }
  +        catch (InterruptedException e) {
  +            throw new RuntimeException(e.getMessage(), e);
  +        }
       }
       
       
       /**
  -     * Calls {@link StageQueue#finish() finish()} 
  -     * on the {@link StageQueue} of each stage in the order they were added to the pipeline. 
  -     * The {@link StageQueue#finish() finish()}
  -     * method blocks until the stage's queue is exhausted, so this method
  +     * This method iterates over the stages in the pipeline, looking up a {@link StageDriver}
  +     * for each stage and using that driver to start the stage. Startups
  +     * may occur sequentially or in parallel, depending upon the stage driver
  +     * used.
  +     */
  +    public void start() {
  +        for (OrderedMapIterator iter = stages.orderedMapIterator(); iter.hasNext();) {
  +            Stage stage = (Stage) iter.next();
  +            StageDriver driver = (StageDriver) iter.getValue();
  +            driver.start(stage);
  +        }
  +        
  +        for (Pipeline branch : branches.values()) {
  +            branch.start();
  +        }
  +    }
  +    
  +    
  +    /**
  +     * This method iterates over the stages in the pipeline, looking up a {@link StageDriver}
  +     * for each stage and using that driver to request that the stage finish
  +     * execution. The {@link StageDriver#finish(Stage)}
  +     * method will block until the stage's queue is exhausted, so this method
        * may be used to safely finalize all stages without the risk of
        * losing data in the queues.
        *
        * @throws InterruptedException if a worker thread was interrupted at the time
        * a stage was asked to finish execution.
        */
  -    public final void finish() throws InterruptedException {
  -        if (!stages.isEmpty()) ((Stage) stages.get(0)).finishAll();
  -        for (Iterator iter = branches.values().iterator(); iter.hasNext();) ((Pipeline) iter.next()).finish();
  +    public void finish() throws InterruptedException {
  +        for (OrderedMapIterator iter = stages.orderedMapIterator(); iter.hasNext();) {
  +            Stage stage = (Stage) iter.next();
  +            StageDriver driver = (StageDriver) iter.getValue();
  +            driver.finish(stage);
  +        }
  +        
  +        for (Pipeline pipeline : branches.values()) {
  +            pipeline.finish();
  +        }
       }
       
       
  @@ -97,8 +175,31 @@
        * Enqueues an object on the first stage if the pipeline is not empty
        * @param o the object to enque
        */
  -    public final void enqueue(Object o){
  -        if (!stages.isEmpty()) ((Stage) stages.get(0)).enqueue(o);
  +    public void enqueue(Object o){
  +        if (!stages.isEmpty()) ((Stage) stages.firstKey()).enqueue(o);
  +    }
  +    
  +    
  +    /**
  +     * This method is used by stages to pass data from one stage to the next.
  +     */
  +    public void pass(Stage source, Object data) {
  +        Stage next = (Stage) this.stages.nextKey(source);
  +        if (next != null) next.enqueue(data);
  +    }
  +    
  +    
  +    /**
  +     * Simple method that recursively checks whether the specified
  +     * pipeline is a branch of this pipeline.
  +     */
  +    private boolean hasBranch(Pipeline pipeline) {
  +        if (branches.containsValue(pipeline)) return true;
  +        for (Pipeline branch : branches.values()) {
  +            if (branch.hasBranch(pipeline)) return true;
  +        }
  +        
  +        return false;
       }
       
       
  @@ -106,7 +207,7 @@
        * Adds an EventListener to the pipline that will be notified by calls
        * to {@link Stage#raise(StageEvent)}.
        */
  -    public final void addEventListener(StageEventListener listener) {
  +    public void addEventListener(StageEventListener listener) {
           listeners.add(listener);
       }
       
  @@ -115,7 +216,7 @@
        * Sequentially notifies each listener in the list of an event, and propagates
        * the event to any attached branches
        */
  -    private void notifyListeners(final StageEvent ev) {
  +    public void notifyListeners(final java.util.EventObject ev) {
           new Thread() {
               public void run() {
                   for (Iterator iter = listeners.iterator(); iter.hasNext();) {
  @@ -127,106 +228,5 @@
                   }
               }
           }.start();
  -    }
  -    
  -        
  -    /**
  -     * This abstract base class provides a foundation for processing stages in 
  -     * the pipeline.
  -     *
  -     * @todo This should probably be a non-static inner class so that
  -     * we can avoid the absolute reference to the enclosing Pipeline if somebody
  -     * can figure out how to properly handle the constructor using Digester.
  -     */
  -    public static abstract class Stage implements StageHandler {
  -        private StageQueue queue;
  -        private Pipeline pipeline;
  -        private Stage next;
  -        
  -        
  -        /** Builds a new stage that wraps the specified StageQueue */
  -        public Stage(StageQueue queue) {
  -            queue.setStageHandler(this);
  -            this.queue = queue;
  -        }
  -        
  -        
  -        /**
  -         * This method recursively starts each element in the process chain in sequence.
  -         */
  -        private final void startAll() throws IllegalThreadStateException {
  -            System.out.println("Starting " + this.getClass().getName());
  -            queue.start();
  -            if (next != null) next.startAll();
  -        }
  -        
  -        
  -        /**
  -         * Calls the finish() method on the wrapped worker queue (which waits for the
  -         * worker thread(s) to die) then calls the next chain element's finishAll() method.
  -         * This method attempts to finish all threads even if exceptions are thrown.
  -         */
  -        private final void finishAll() throws InterruptedException {
  -            try {
  -                queue.finish();
  -            }
  -            finally {
  -                if (next != null) next.finishAll();
  -            }
  -        }
  -        
  -        
  -        /**
  -         * Delegate method of the wrapped {@link StageQueue}.
  -         */
  -        public void enqueue(Object obj) {
  -            queue.enqueue(obj);
  -        }
  -        
  -        
  -        /**
  -         * Enqueues the specified object onto the next stage in the pipeline
  -         * if such a stage exists.
  -         */
  -        public void exqueue(Object obj) {
  -            if (this.next != null) this.next.enqueue(obj);
  -        }
  -        
  -        
  -        /**
  -         * Enqueues the specified object onto the first stage in the pipeline 
  -         * branch corresponding to the specified key, if such a brach exists.
  -         */
  -        public void exqueue(String key, Object obj) {
  -            Pipeline branch = (Pipeline) this.pipeline.branches.get(key);
  -            if (branch != null && !branch.stages.isEmpty()) {
  -                ((Stage) branch.stages.get(0)).enqueue(obj);
  -            }
  -        }
  -        
  -        
  -        /**
  -         * Raises an event on the pipeline. Any listeners registered with the pipeline
  -         * will be notified.
  -         */
  -        public final void raise(StageEvent ev) {
  -            this.pipeline.notifyListeners(ev);
  -        }
  -        
  -        /** Do nothing default implementation */
  -        public void process(Object obj) {
  -        }
  -        
  -        /** Do nothing default implementation */
  -        public void release() {
  -        }
  -        
  -        /** Do nothing default implementation */
  -        public void postprocess() {
  -        }
  -        
  -        /** Do nothing default implementation */
  -        public void preprocess() {
  -        }        
       }
   }
  
  
  
  1.2       +3 -2      jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/StageEventListener.java
  
  Index: StageEventListener.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/StageEventListener.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- StageEventListener.java	2 Oct 2004 19:01:19 -0000	1.1
  +++ StageEventListener.java	26 Nov 2004 10:53:53 -0000	1.2
  @@ -17,6 +17,7 @@
   package org.apache.commons.pipeline;
   
   import java.util.EventListener;
  +import java.util.EventObject;
   
   /**
    * Listener interface for {@link StageEvent}s
  @@ -29,5 +30,5 @@
       /**
        * Notify this listener of a {@link StageEvent}
        */
  -    public abstract void notify(StageEvent ev);
  +    public abstract void notify(EventObject ev);
   }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/BaseStage.java
  
  Index: BaseStage.java
  ===================================================================
  /*
   * BaseStage.java
   *
   * Created on October 12, 2004, 11:31 AM
   */
  
  package org.apache.commons.pipeline;
  
  import java.util.Queue;
  import java.util.concurrent.LinkedBlockingQueue;
  
  /**
   *
   * @author  kjn
   */
  public class BaseStage extends Stage {
      
      /** Creates a new instance of BaseStage using an unbounded BlockingQueue. */
      public BaseStage() {
          this(new LinkedBlockingQueue<Object>());
      }
      
      
      /** Creates a new instance of BaseStage with the specified queue. */
      public BaseStage(Queue<Object> queue) {
          super(queue);
      }
      
      /**
       * No-op implementation. This method should be overridden to provide
       * preprocessing capability for the stage.
       *
       * @throws StageException an Exception thrown by the implementation should
       * be wrapped in a {@link StageException}
       * @see Stage#preprocess()
       */
      public void preprocess() throws StageException {
      }
      
      /**
       * The only operation performed by this implementation of process()
       * is to exqueue the specified object, passing it to the subsequent stage.
       * This method should be overridden to provide
       * processing capability for the stage.
       *
       * @throws ClassCastException if the object is of an incorrect type
       * for the processing operation
       * @throws StageException an Exception thrown by the implementation should
       * be wrapped in a {@link StageException}
       */
      public void process(Object obj) throws StageException {
          this.exqueue(obj);
      }
      
      /**
       * No-op implementation. This method should be overridden to provide
       * postprocessing capability for the stage.
       *
       * @throws StageException an Exception thrown by the implementation should
       * be wrapped in a {@link StageException}
       */
      public void postprocess() throws StageException {
      }
      
      /**
       * No-op implementation. This method should be overridden to provide
       * resource release capability for the stage.
       *
       * Implementations overriding this method should clean up any lingering resources
       * that might otherwise be left allocated if an exception is thrown during
       * processing.
       * @see Stage#release()
       */
      public void release() {
      }
      
      
      
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/PipelineCreationException.java
  
  Index: PipelineCreationException.java
  ===================================================================
  /*
   * PipelineCreationException.java
   *
   * Created on October 4, 2004, 1:35 PM
   */
  
  package org.apache.commons.pipeline;
  
  /**
   *
   * @author  kjn
   */
  public class PipelineCreationException extends java.lang.Exception {
      
      /**
       * Creates a new instance of <code>PipelineCreationException</code> without detail message.
       */
      public PipelineCreationException() {
      }
      
      
      /**
       * Constructs an instance of <code>PipelineCreationException</code> with the specified detail message.
       * @param msg the detail message.
       */
      public PipelineCreationException(String msg) {
          super(msg);
      }
      
      /**
       * Constructs an instance of <code>PipelineCreationException</code> with the specified detail message.
       * @param msg the detail message.
       */
      public PipelineCreationException(String msg, Throwable cause) {
          super(msg, cause);
      }
      
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/PipelineFactory.java
  
  Index: PipelineFactory.java
  ===================================================================
  /*
   * PipelineFactory.java
   *
   * Created on October 4, 2004, 1:22 PM
   */
  
  package org.apache.commons.pipeline;
  
  import java.util.Map;
  
  /**
   * Simple factory interface for creating pipelines.
   *
   * @author  kjn
   */
  public interface PipelineFactory {
      /** Returns a Pipeline created by the factory. */
      public Pipeline createPipeline() throws PipelineCreationException;
      
      /** Configure the factory */
      public void configure(Map<String,?> context);
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/Stage.java
  
  Index: Stage.java
  ===================================================================
  /*
   * Stage.java
   *
   * Created on November 18, 2004, 10:34 AM
   */
  
  package org.apache.commons.pipeline;
  
  import java.util.*;
  
  /**
   * <P>
   * A Stage represents a set of tasks that can be performed on objects
   * in a queue, and methods used to communicate with other stages
   * in a {@link Pipeline}.
   * </P>
   * <P>
   * A Stage must provide a unique {@link StageMonitor} object to allow for
   * proper handling of multiple processing threads to the {@link StageDriver}
   * that runs the stage. Because Stage does not specify the exact behavior of the
   * queue (whether it is capacity-bounded or automatically synchronizes accesses,
   * etc) the monitor is necessary to provide proper synchronization.
   * </P>
   * <P>
   * Stages extending this abstract base class automatically establish a relationship
   * with a pipeline when added to that pipeline.
   * </P>
   *
   * @author  Kris Nuttycombe
   */
  public abstract class Stage {
      private Queue<Object> queue;
      protected Pipeline pipeline;
      protected StageMonitor monitor;
      
      /**
       * Builds a new stage with the specified queue.
       */
      public Stage(Queue<Object> queue) {
          this.queue = queue;
          this.monitor =  new StageMonitor();
      }
      
      /**
       * Default implementation of setPipeline. This method may be overridden
       * to provide additional initialization functions for when the stage
       * is added to a pipeline.
       */
      protected void setPipeline(Pipeline pipeline) {
          if (this.pipeline != null) throw new IllegalStateException("A pipeline has already been associated with this stage.");
          this.pipeline = pipeline;
      }
      
      /**
       * Enqueues an object on the wrapped queue. Classes that override this
       * method must also override {@link #poll()}.
       */
      public void enqueue(Object obj) {
          queue.add(obj);
          this.monitor.enqueueOccurred();
      }
      
      /**
       * Retrieves an object from the head of the wrapped queue, or null
       * if the queue is empty. Classes that override this method must also
       * override {@link #enqueue(Object)}
       */
      public Object poll() {
          synchronized (queue) {
              return queue.poll();
          }
      }
      
      /**
       * Enqueues the specified object onto the next stage in the pipeline
       * if such a stage exists.
       */
      public final void exqueue(Object obj) {
          this.pipeline.pass(this, obj);
      }
      
      /**
       * Enqueues the specified object onto the first stage in the pipeline
       * branch corresponding to the specified key, if such a brach exists.
       */
      public final void exqueue(String key, Object obj) {
          Pipeline branch = (Pipeline) this.pipeline.branches.get(key);
          if (branch != null && !branch.stages.isEmpty()) {
              ((Stage) branch.stages.firstKey()).enqueue(obj);
          }
      }
      
      /**
       * Raises an event on the pipeline. Any listeners registered with the pipeline
       * will be notified.
       */
      public final void raise(java.util.EventObject ev) {
          this.pipeline.notifyListeners(ev);
      }
      
      /**
       * Getter for wrapped queue.
       * @return Value of property queue.
       */
      public Queue getQueue() {
          return this.queue;
      }
      
      /**
       * Setter for wrapped queue.
       * @param queue New value of property queue.
       */
      public void setQueue(Queue<Object> queue) {
          this.queue = queue;
      }
      
      /**
       * Returns the monitor for this stage.
       */
      public StageMonitor getMonitor() {
          return this.monitor;
      }
      
      /**
       * Implementations of this method should perform any necessary setup that
       * needs to be done before any data is processed from this Stage's queue.
       *
       * @throws StageException an Exception thrown by the implementation should
       * be wrapped in a {@link StageException}
       */
      public abstract void preprocess() throws StageException;
      
      /**
       * Implementations of this method should atomically process a single data
       * object.
       *
       * @throws ClassCastException if the object is of an incorrect type
       * for the processing operation
       * @throws StageException an Exception thrown by the implementation should
       * be wrapped in a {@link StageException}
       */
      public abstract void process(Object obj) throws StageException;
      
      /**
       * Implementations of this method should do any additional processing or
       * finalization necessary after all data has been processed. This method
       * usually runs following a call to the implementing {@link org.apache.commons.pipeline.Pipeline$Stage Stage}'s
       * {@link StageQueue#finish()} method.
       *
       * @throws StageException an Exception thrown by the implementation should
       * be wrapped in a {@link StageException}
       */
      public abstract void postprocess() throws StageException;
      
      /**
       * Implementations of this method should clean up any lingering resources
       * that might otherwise be left allocated if an exception is thrown during
       * processing.
       */
      public abstract void release();
      
      /**
       * Stages may not further override hashCode(). This is necessary to maintain stage
       * ordering integrity within the pipeline.
       */
      public final int hashCode() {
          int retValue;
          
          retValue = super.hashCode();
          return retValue;
      }
  }
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/StageDriver.java
  
  Index: StageDriver.java
  ===================================================================
  /*
   * StageRunner.java
   *
   * Created on October 6, 2004, 4:30 PM
   */
  
  package org.apache.commons.pipeline;
  
  import org.apache.commons.pipeline.Stage;
  
  
  /**
   *
   * @author  kjn
   */
  public interface StageDriver {
      /**
       * Creates and starts new worker thread(s) to process items in the queue.
       * Implementations of this method must call {@link StageMonitor#driverStarting()}
       * on the specified stage's monitor.
       */
      public void start(Stage stage) throws IllegalThreadStateException;
      
      
      /**
       * This method waits for the queue to empty and any processor thread(s) to exit
       * cleanly and then calls release() to release any resources acquired during processing, if possible.
       * Implementations of this method must call {@link StageMonitor#driverStopped()}
       * on the specified stage's monitor upon completion.
       */
      public void finish(Stage stage) throws InterruptedException;    
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/StageMonitor.java
  
  Index: StageMonitor.java
  ===================================================================
  /*
   *   Copyright 2004 The Apache Software Foundation
   *
   *   Licensed under the Apache License, Version 2.0 (the "License");
   *   you may not use this file except in compliance with the License.
   *   You may obtain a copy of the License at
   *
   *       http://www.apache.org/licenses/LICENSE-2.0
   *
   *   Unless required by applicable law or agreed to in writing, software
   *   distributed under the License is distributed on an "AS IS" BASIS,
   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *   See the License for the specific language governing permissions and
   *   limitations under the License.
   *
   */
  
  package org.apache.commons.pipeline;
  
  
  import java.util.*;
  
  /**
   * A monitor used to control concurrent processing of data in a stage.
   *
   * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
   * @version $Rev$
   */
  public class StageMonitor {
      /** Enumeration of possible states for the stage. */
      public enum Status { STARTING, RUNNING, STOP_REQUESTED, STOPPED }
      private Status status = Status.STOPPED;
      private List<Throwable> errors = new ArrayList<Throwable>();
      
      /**
       * StageDriver has been requested to start stage processing.
       * Implementations of this method should change the monitor's status to
       * {@link Status#STARTING}.
       */
      public synchronized void startRequested() {
          if (this.status == Status.STOPPED) this.status = Status.STARTING;
      }
      
      /**
       * StageDriver has started execution.
       * Implementations of this method should change the monitor's status to
       * {@link Status#RUNNING}.
       */
      public synchronized void driverStarted() {
          if (this.status == Status.STOPPED || this.status == Status.STARTING) this.status = Status.RUNNING;
      }
      
      /**
       * StageDriver has been requested to halt stage processing.
       * Implementations of this method should change the monitor's status to
       * {@link Status#STOPPING}.
       */
      public synchronized void stopRequested() {
          this.status = Status.STOP_REQUESTED;
          this.notifyAll();
      }
      
      /**
       * StageDriver has finished execution.
       * Implementations of this method should change the monitor's status to
       * {@link Status#STOPPED}.
       */
      public synchronized void driverStopped() {
          this.status = Status.STOPPED;
      }
      
      /**
       * Monitor for successful enqueue operations on the stage. Implementations
       * overriding this method must call {@link Object#notifyAll() this.notifyAll()} to
       * ensure that any threads waiting on this monitor are notified.
       */
      public synchronized void enqueueOccurred() {
          this.notifyAll();
      }
      
      /**
       * Monitors driver thread interruption failures.
       *
       * @param fault the faulting exception
       */
      public void driverFailed( InterruptedException fault ) {
          this.errors.add(fault);
      }
      
      /**
       * Monitors handler failures.
       *
       * @param data the data that was being processed as the fault occurred
       * @param fault the faulting exception
       */
      public void processingFailed( Object data, Throwable fault ) {
          this.errors.add(fault);
      }
      
      /**
       * Returns the current status of stage processing.
       */
      public synchronized Status status() {
          return this.status;
      }
      
      /**
       * Returns a list of errors recorded by this monitor
       */
      public List<Throwable> getErrors() {
          return errors;
      }
  }
  
  
  
  
  1.2       +34 -33    jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java
  
  Index: PipelineRuleSet.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- PipelineRuleSet.java	2 Oct 2004 19:01:19 -0000	1.1
  +++ PipelineRuleSet.java	26 Nov 2004 10:53:54 -0000	1.2
  @@ -18,13 +18,9 @@
   
   package org.apache.commons.pipeline.config;
   
  -import java.lang.reflect.Constructor;
  -import java.util.Iterator;
   import java.util.List;
   import org.apache.commons.digester.*;
  -import org.apache.commons.pipeline.Pipeline;
  -import org.apache.commons.pipeline.StageQueue;
  -import org.apache.commons.pipeline.impl.SingleThreadStageQueue;
  +import org.apache.commons.pipeline.*;
   import org.xml.sax.Attributes;
   
   
  @@ -70,14 +66,15 @@
    * tag processing.
    */
   public class PipelineRuleSet extends RuleSetBase {
  -    private List nestedRuleSets;
  +    private static Class[] addBranchTypes = { String.class, Pipeline.class };
  +    private List<RuleSet> nestedRuleSets;
       
       /** Creates a new instance of ChainRuleSet */
       public PipelineRuleSet() {
       }
       
       /** Creates a new instance of ChainRuleSet */
  -    public PipelineRuleSet(List nestedRuleSets) {
  +    public PipelineRuleSet(List<RuleSet> nestedRuleSets) {
           this.nestedRuleSets = nestedRuleSets;
       }
       
  @@ -88,44 +85,39 @@
       public void addRuleInstances(Digester digester) {
           ObjectCreationFactory factory = new PipelineFactory();
           
  +        //rules to create pipeline
           digester.addFactoryCreate("pipeline", factory);
           digester.addSetProperties("pipeline");
  -        
  +                
           // these rules are used to add subchains to the main pipeline
           digester.addFactoryCreate("*/branch/pipeline", factory);
  -        digester.addRule("*/branch/pipeline", new CallMethodRule(1, "addBranch", 2, new Class[] { String.class, Pipeline.class }));
  +        digester.addRule("*/branch/pipeline", new CallMethodRule(1, "addBranch", 2, addBranchTypes));
           digester.addCallParam("*/branch/pipeline", 0, "key");
           digester.addCallParam("*/branch/pipeline", 1, 0);
           
           //this rule is intended to be used to add a pipeline element. the ChainLogger is
           //simply the default if no pipeline element class is specified
  -        digester.addFactoryCreate("*/pipeline/stage", StageFactory.class, "stageFactory", false);
  +        digester.addObjectCreate("*/pipeline/stage", "org.apache.commons.pipeline.BaseStage", "className");
           digester.addSetProperties("*/pipeline/stage");
  -        digester.addSetNext("*/pipeline/stage", "addStage", "org.apache.commons.pipeline.Pipeline$Stage");
  +        digester.addRule("*/pipeline/stage", new CallMethodRule(1, "addStage", 2, new Class[] { Stage.class, StageDriver.class }));
  +        digester.addCallParam("*/pipeline/stage", 0, true);
  +        
  +        //this rule is used to create a stage driver for a specific stage
  +        digester.addObjectCreate("*/pipeline/stage/stageDriver", "org.apache.commons.pipeline.impl.SingleThreadStageDriver", "className");
  +        digester.addSetProperties("*/pipeline/stage/stageDriver");
  +        digester.addCallParam("*/pipeline/stage/stageDriver", 1, true);
           
  -        //rule for enqueuing string
  +        //rule for enqueuing string onto the first stage in a pipeline
           digester.addCallMethod("*/stage/enqueue/value", "enqueue", 0);
  -    }
  -    
  -    
  -    public static class StageFactory extends AbstractObjectCreationFactory {
  -        private static final Class[] DEFAULT_STAGE_CONSTRUCTOR_PARAMCLASSES = { StageQueue.class };
           
  -        public Object createObject(Attributes attributes) throws java.lang.Exception {
  -            String queueClassName = attributes.getValue("queueClass");
  -            Class queueClass = (queueClassName == null) ? SingleThreadStageQueue.class : Class.forName(queueClassName);
  -            
  -            String stageClassName = attributes.getValue("className");
  -            if (stageClassName == null) throw new IllegalArgumentException("className attribute may not be null for element <stage>");
  -            Class stageClass = Class.forName(stageClassName);
  -            
  -            Constructor constructor = stageClass.getConstructor(DEFAULT_STAGE_CONSTRUCTOR_PARAMCLASSES);
  -            return constructor.newInstance(new Object[] {queueClass.newInstance()});
  -        }       
  +        //rules for enqueueing an object
  +        digester.addObjectCreate("*/stage/enqueue/object", "java.lang.Object", "className");
  +        digester.addSetProperties("*/stage/enqueue/object");
  +        digester.addSetNext("*/stage/enqueue/object", "enqueue", "java.lang.Object");
       }
  +            
       
  -    
  -    public class PipelineFactory extends AbstractObjectCreationFactory {        
  +    private class PipelineFactory extends AbstractObjectCreationFactory {        
           public Object createObject(Attributes attributes) throws java.lang.Exception {
               String configURI = attributes.getValue("configURI");
               if (configURI == null) {
  @@ -134,8 +126,8 @@
               else {
                   Digester subDigester = new Digester();
                   if (nestedRuleSets != null) {
  -                    for (Iterator iter = nestedRuleSets.iterator(); iter.hasNext();) {
  -                        subDigester.addRuleSet((RuleSet) iter.next());
  +                    for (RuleSet ruleset : nestedRuleSets) {
  +                        subDigester.addRuleSet(ruleset);
                       }
                       
                       Pipeline pipeline = (Pipeline) subDigester.parse(configURI);
  @@ -146,5 +138,14 @@
                   }
               }
           }
  +    }
  +    
  +    
  +    private class StageCompletionRule extends Rule {
  +        public void end(String namespace, String name) throws Exception {
  +
  +            super.end(namespace, name);
  +        }
  +        
       }
   }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
  
  Index: DigesterPipelineFactory.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   *
   * Created on February 12, 2004, 3:42 PM
   */
  
  package org.apache.commons.pipeline.config;
  
  import java.io.File;
  import java.io.InputStream;
  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;
  import org.apache.commons.digester.Digester;
  import org.apache.commons.digester.RuleSet;
  import org.apache.commons.pipeline.PipelineCreationException;
  import org.apache.commons.pipeline.Pipeline;
  
  
  /**
   * This factory is designed to simplify creating a pipeline using Digester.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class DigesterPipelineFactory implements org.apache.commons.pipeline.PipelineFactory {
      
      /** Digester rule sets used to configure the Digester instance. */
      private List<RuleSet> ruleSets = new ArrayList<RuleSet>();
      
      /**
       * Factory will create a pipeline from the specified Digester configuration file
       * if this filename is not null.
       */
      private String configFile;
      
      /**
       * Factory will create a pipeline from this input stream if it is not null. Useful for instances where
       * a pipeline configuration is being read from inside a jarfile.
       */
      private InputStream configStream;
          
      /** 
       * A factory created by this constructor will create a pipeline from the specified
       * Digester creation file.
       */
      public DigesterPipelineFactory(String configFile) {
          this.init();
          this.configFile = configFile;
      }
      
      
      /**
       * A factory created by this constructor will create a pipeline from the specified
       * input stream. Useful for instances where a pipeline configuration is being read from inside a jarfile.
       */
      public DigesterPipelineFactory(InputStream configStream) {
          this.init();
          this.configStream = configStream;
      }
      
      
      /**
       * Adds the base RuleSets to the digester configuration.
       */
      private void init() {
          //PipelineRuleSet needs a reference to {@link org.apache.commons.digester.RuleSet RuleSet}s
          //used to parse the configuration file in case configuration is split up between multiple
          //files.
          ruleSets.add(new PipelineRuleSet(ruleSets));        
      }
      
      
      /** Creates a new pipeline */
      public Pipeline createPipeline() throws PipelineCreationException {
          try {
              if (this.configFile != null) {
                  Digester digester = new Digester();
                  this.initDigester(digester);
                  
                  File conf = new File(configFile);
                  return (Pipeline) digester.parse(conf);
              }
              else if (this.configStream != null) {
                  Digester digester = new Digester();
                  this.initDigester(digester);
                  
                  return (Pipeline) digester.parse(configStream);
              }
              else {
                  throw new IllegalStateException("No configuration file or stream found.");
              }
          }
          catch (Exception e) {
              throw new PipelineCreationException(e.getMessage(), e);
          }
      }
      
      
      /**
       * Initialize a Digester instance with the rule sets provided to this factory.
       */
      public void initDigester(Digester digester) {
          for (Iterator iter = ruleSets.iterator(); iter.hasNext();) {
              digester.addRuleSet((RuleSet) iter.next());
          }
      }
      
      
      /**
       * Adds a RuleSet to the list of rules available to Digester for parsing
       * the configuration file.
       */
      public void addRuleSet(RuleSet ruleSet) {
          this.ruleSets.add(ruleSet);
      }
      
      
      /**
       * No-op implementation - all configuration information exists in the XML file.
       */
      public void configure(java.util.Map<String,?> context) {
      }
      
      
      /**
       * The simplest possible main method that creates a pipeline from a configuration file,
       * then runs the pipeline processing from start to finish.
       *
       * @param argv the command line arguments
       */
      public static void main(String[] argv) {
          try {
              DigesterPipelineFactory factory = new DigesterPipelineFactory(argv[0]);
              Pipeline pipeline = factory.createPipeline();
              for (int i = 1; i < argv.length; i++) {
                  pipeline.enqueue(argv[i]);
              }
              
              System.out.println("Pipeline created, about to begin processing...");
              
              pipeline.start();
              pipeline.finish();
              
              System.out.println("Pipeline successfully finished processing. See logs for details.");
          }
          catch (Exception e) {
              e.printStackTrace(System.err);
          }
      }
  }
  
  
  
  1.2       +9 -7      jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/FileFinderStage.java
  
  Index: FileFinderStage.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/FileFinderStage.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- FileFinderStage.java	2 Oct 2004 19:01:19 -0000	1.1
  +++ FileFinderStage.java	26 Nov 2004 10:53:54 -0000	1.2
  @@ -18,8 +18,8 @@
   
   import java.io.File;
   import java.util.regex.Pattern;
  -import org.apache.commons.pipeline.StageQueue;
  -import org.apache.commons.pipeline.Pipeline.Stage;
  +import java.util.Queue;
  +import org.apache.commons.pipeline.BaseStage;
   import org.apache.log4j.Logger;
   
   /**
  @@ -32,16 +32,18 @@
    * @author Kris Nuttycombe, National Geophysical Data Center
    * @version $Revision$
    */
  -public class FileFinderStage extends Stage {
  +public class FileFinderStage extends BaseStage {
       private static final Logger log = Logger.getLogger(FileFinderStage.class);
       private String filePattern = ".*";
       Pattern pattern;
       
       /** Creates a new instance of FileFinder */
  -    public FileFinderStage(StageQueue queue) {
  -        super(queue);
  -    };
  +    public FileFinderStage() { }
       
  +    /** Creates a new instance of FileFinder that uses the specified queue. */     
  +    public FileFinderStage(Queue<Object> queue) {
  +        super(queue);
  +    }
       
       /**
        * Precompiles the regex pattern for matching against filenames
  
  
  
  1.2       +20 -9     jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/FtpFileDownloadStage.java
  
  Index: FtpFileDownloadStage.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/FtpFileDownloadStage.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- FtpFileDownloadStage.java	2 Oct 2004 19:01:19 -0000	1.1
  +++ FtpFileDownloadStage.java	26 Nov 2004 10:53:54 -0000	1.2
  @@ -21,14 +21,13 @@
   import java.io.IOException;
   import java.io.OutputStream;
   import java.util.regex.Pattern;
  +import java.util.Queue;
   import org.apache.commons.net.ftp.FTPClient;
   import org.apache.commons.net.ftp.FTPReply;
  +import org.apache.commons.pipeline.BaseStage;
   import org.apache.commons.pipeline.StageException;
  -import org.apache.commons.pipeline.StageQueue;
  -import org.apache.commons.pipeline.Pipeline.Stage;
   import org.apache.log4j.Logger;
   
  -
   /**
    * This {@link org.apache.commons.pipeline.Pipeline$Stage Stage} provides the 
    * functionality needed to retrieve data from an FTP URL. Multipart responses 
  @@ -37,10 +36,10 @@
    * @author Kris Nuttycombe, National Geophysical Data Center
    * @version $Revision$
    */
  -public class FtpFileDownloadStage extends Stage {
  +public class FtpFileDownloadStage extends BaseStage {
       private static Logger log = Logger.getLogger(FtpFileDownloadStage.class);
       
  -    private String workDir;
  +    private String workDir = "/tmp";
       private File fworkDir;
       private FTPClient client = new FTPClient();
       
  @@ -57,16 +56,28 @@
       /**
        * Default constructor - creates work directory in /tmp
        */
  -    public FtpFileDownloadStage(StageQueue queue) {
  +    public FtpFileDownloadStage() {
  +    }
  +   
  +    /**
  +     * Constructor specifying work directory.
  +     */
  +    public FtpFileDownloadStage(String workDir) {
  +        this.workDir = workDir;
  +    }
  +   
  +    /**
  +     * Default constructor - creates work directory in /tmp
  +     */
  +    public FtpFileDownloadStage(Queue<Object>  queue) {
           super(queue);
  -        this.workDir = "/tmp";
       }
       
       /**
        * Creates a new instance of HttpFileDownload with the specified work directory
        * into which to download files.
        */
  -    public FtpFileDownloadStage(StageQueue queue, String workDir) {
  +    public FtpFileDownloadStage(Queue<Object> queue, String workDir) {
           super(queue);
           this.workDir = workDir;
       }
  
  
  
  1.2       +19 -10    jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/HttpFileDownloadStage.java
  
  Index: HttpFileDownloadStage.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/HttpFileDownloadStage.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- HttpFileDownloadStage.java	2 Oct 2004 19:01:19 -0000	1.1
  +++ HttpFileDownloadStage.java	26 Nov 2004 10:53:54 -0000	1.2
  @@ -17,15 +17,16 @@
   package org.apache.commons.pipeline.impl;
   
   import java.io.*;
  +import java.net.HttpURLConnection;
   import java.net.MalformedURLException;
   import java.net.URL;
   import java.util.HashMap;
   import java.util.Map;
  +import java.util.Queue;
   import org.apache.commons.pipeline.StageException;
  -import org.apache.commons.pipeline.StageQueue;
  -import org.apache.commons.pipeline.Pipeline.Stage;
  +import org.apache.commons.pipeline.BaseStage;
   import org.apache.log4j.Logger;
  -import sun.net.www.protocol.http.HttpURLConnection;
  +
   
   /**
    * This {@link org.apache.commons.pipeline.Pipeline$Stage Stage} provides the functionality 
  @@ -34,25 +35,33 @@
    * @author Kris Nuttycombe, National Geophysical Data Center
    * @version $Revision$
    */
  -public class HttpFileDownloadStage extends Stage {
  +public class HttpFileDownloadStage extends BaseStage {
       private static final int BUFFER_SIZE = 10000;
       private static Logger log = Logger.getLogger(HttpFileDownloadStage.class);
  -    private String workDir;
  +    private String workDir = "/tmp";
       private File fworkDir;
       
  +    public HttpFileDownloadStage() { }
  +    
  +    /**
  +     * Creates a new HttpFileDownloadStage with the specified work directory.
  +     */
  +    public HttpFileDownloadStage(String workDir) {
  +        this.workDir = workDir;
  +    }
  +    
       /**
        * Default constructor - creates work directory in /tmp
        */
  -    public HttpFileDownloadStage(StageQueue queue) {
  +    public HttpFileDownloadStage(Queue<Object> queue) {
           super(queue);
  -        this.workDir = "/tmp";
       }
       
       /**
        * Creates a new instance of HttpFileDownload with the specified work directory
        * into which to download files.
        */
  -    public HttpFileDownloadStage(StageQueue queue, String workDir) {
  +    public HttpFileDownloadStage(Queue<Object> queue, String workDir) {
           super(queue);
           this.workDir = workDir;
       }
  @@ -191,7 +200,7 @@
        */
       public static URL handleRedirects(URL url) throws IOException, MalformedURLException {
           java.net.HttpURLConnection.setFollowRedirects(false);
  -        HttpURLConnection con = new HttpURLConnection(url, url.getHost(), url.getPort());
  +        HttpURLConnection con = (HttpURLConnection) url.openConnection();
           int response = con.getResponseCode();
           log.debug("Response code for " + url + " = " + response);
           
  
  
  
  1.2       +11 -5     jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/LogStage.java
  
  Index: LogStage.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/LogStage.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- LogStage.java	2 Oct 2004 19:01:19 -0000	1.1
  +++ LogStage.java	26 Nov 2004 10:53:54 -0000	1.2
  @@ -17,9 +17,9 @@
   package org.apache.commons.pipeline.impl;
   
   import org.apache.commons.pipeline.StageException;
  -import org.apache.commons.pipeline.StageQueue;
  -import org.apache.commons.pipeline.Pipeline.Stage;
  +import org.apache.commons.pipeline.BaseStage;
   import org.apache.log4j.Logger;
  +import java.util.Queue;
   
   
   /**
  @@ -28,13 +28,19 @@
    * @author Kris Nuttycombe, National Geophysical Data Center
    * @version $Revision$
    */
  -public class LogStage extends Stage {
  +public class LogStage extends BaseStage {
       private Logger log = Logger.getLogger(this.getClass());
       
       /**
  +     * Creates a new LogStage.
  +     */
  +    public LogStage() {
  +    }
  +    
  +    /**
        * Creates a new LogStage with the specified {@link StageQueue}.
        */
  -    public LogStage(StageQueue queue) {
  +    public LogStage(Queue<Object> queue) {
           super(queue);
       }
       
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/SingleThreadStageDriver.java
  
  Index: SingleThreadStageDriver.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.commons.pipeline.impl;
  
  import org.apache.commons.pipeline.StageException;
  import org.apache.commons.pipeline.StageDriver;
  import org.apache.commons.pipeline.StageMonitor;
  import org.apache.commons.pipeline.Stage;
  import org.apache.log4j.Logger;
  import java.util.*;
  
  /**
   * This is a very simple implementation of a StageDriver which spawns
   * only a single thread to process a stage. A SingleThreadStageDriver
   * may not be used to manipulate more than a single stage.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class SingleThreadStageDriver implements StageDriver {
      private Logger log = Logger.getLogger(this.getClass());
      
      //wait timeout to ensure deadlock cannot occur on thread termination
      private long timeout = 500;
      
      //flag describing whether or not the driver is fault tolerant
      private boolean faultTolerant = false;
      
      //map of stages to worker threads
      private Map<Stage, Thread> threadMap = new HashMap<Stage,Thread>();
          
      /**
       * Default constructor
       */
      public SingleThreadStageDriver() {
      }
      
      /**
       *
       */
      public SingleThreadStageDriver(long timeout, boolean faultTolerant) {
          this.timeout = timeout;
          this.faultTolerant = faultTolerant;
      }
          
      /**
       * Creates and starts a new worker thread to process items in the stage's queue.
       */
      public final void start(Stage stage) throws IllegalThreadStateException {
          StageMonitor monitor = stage.getMonitor();
          synchronized (monitor) {
              if (monitor.status() != StageMonitor.Status.STOPPED) {
                  throw new IllegalThreadStateException("Processor thread has already been started.");
              }
          }
          
          monitor.startRequested();
          
          log.debug("Starting worker thread.");
          Thread workerThread = new WorkerThread(stage);
          workerThread.start();
          this.threadMap.put(stage, workerThread);
          log.debug("Worker thread started.");
      }
      
      
      /**
       * This method waits for the queue to empty and the processor thread to exit
       * cleanly and release any resources acquired during processing, if possible.
       */
      public void finish(Stage stage) throws InterruptedException {
          StageMonitor monitor = stage.getMonitor();
          
          log.debug("Requesting worker thread stop.");
          monitor.stopRequested();
          
          synchronized (monitor) {
              if (monitor.status() == StageMonitor.Status.STOPPED) return;
          }
          
          log.debug("Waiting for worker thread stop.");
          this.threadMap.remove(stage).join();
          log.debug("Worker thread has finished.");
          
          monitor.driverStopped();
      }
      
      
      /**
       * Sets the failure tolerance flag for the worker thread. If faultTolerant
       * is set to true, {@link StageException StageException}s thrown by
       * the process() method will not interrupt queue processing, but will simply
       * be logged with a severity of ERROR.
       */
      public final void setFaultTolerant(boolean faultTolerant) {
          this.faultTolerant = faultTolerant;
      }
      
      /**
       * Getter for property faultTolerant.
       * @return Value of property faultTolerant.
       */
      public boolean isFaultTolerant() {
          return this.faultTolerant;
      }
      
      
      /**
       * This worker thread removes and processes data objects from the incoming
       * queue. It first calls preprocess(), then begins a loop that calls the process()
       * method to process data from the queue. This loop runs as long as the
       * {@link getRunning() running} property is true or the queue is not empty. To break the loop the
       * calling code must run the writer's finish() method to set the running property to false.
       * At this point the loop will continue to run until the queue is empty, then the loop will
       * exit and the postprocess() method is called.<P>
       *
       * @throws StageException if an error is encountered during data processing
       * and faultTolerant is set to false.
       */
      private class WorkerThread extends Thread {
          /** The Stage this thread will work on */
          private Stage stage;               
          
          public WorkerThread(Stage stage) {
              this.stage = stage;
          }
          
          public final void run() {
              StageMonitor monitor = stage.getMonitor();
              try {
                  log.debug("preprocessing...");
                  stage.preprocess();
                  
                  monitor.driverStarted();
                  
                  running: while (true) {
                      Object obj = null;
                      try {
                          obj = stage.poll();
                          if (obj == null) {
                              synchronized (monitor) {
                                  log.debug("Monitor status is: " + monitor.status());
                                  if (monitor.status() == StageMonitor.Status.STOP_REQUESTED) break running;
                                  monitor.wait(timeout);
                                  continue running;
                              }
                          }
                          else {
                              stage.process(obj);
                          }
                      }
                      catch (InterruptedException e) {
                          monitor.driverFailed(e);
                          throw new RuntimeException("Driver thread unexpectedly interrupted.", e);
                      }
                      catch (Throwable t) {
                          monitor.processingFailed(obj, t);
                          if (!faultTolerant) {
                              log.error("Aborting due to error.", t);
                              throw new RuntimeException("Stage processing failed, check monitor for details.", t);
                          }
                      }
                  }
                  
                  log.debug("postprocessing...");
                  stage.postprocess();
              }
              finally {
                  stage.release();
              }
          }
      }
  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org


Mime
View raw message