commons-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From craig...@apache.org
Subject cvs commit: jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl FileFinderStage.java FtpFileDownloadStage.java HttpFileDownloadStage.java LogStage.java SingleThreadStageQueue.java package.html
Date Sat, 02 Oct 2004 19:01:20 GMT
craigmcc    2004/10/02 12:01:20

  Added:       pipeline .cvsignore LICENSE-2.0.txt NOTICE.txt build.xml
                        maven.xml project.xml
               pipeline/src/java/org/apache/commons/pipeline Pipeline.java
                        StageEvent.java StageEventListener.java
                        StageException.java StageHandler.java
                        StageQueue.java package.html
               pipeline/src/java/org/apache/commons/pipeline/config
                        PipelineFactory.java PipelineRuleSet.java
                        package.html
               pipeline/src/java/org/apache/commons/pipeline/impl
                        FileFinderStage.java FtpFileDownloadStage.java
                        HttpFileDownloadStage.java LogStage.java
                        SingleThreadStageQueue.java package.html
  Log:
  Staged pipeline implementation package, per discussion on the COMMONS-DEV
  list.  This code was proposed and contributed by Kris Nuttycombe of NOAA
  (CLA is on file with the ASF).  Beginning tonight, nightly builds of this
  package will be available at:
  
    http://cvs.apache.org/builds/jakarta-commons/nightly/commons-pipeline/
  
  Revision  Changes    Path
  1.1                  jakarta-commons-sandbox/pipeline/.cvsignore
  
  Index: .cvsignore
  ===================================================================
  .nbattrs
  dist
  target
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/LICENSE-2.0.txt
  
  Index: LICENSE-2.0.txt
  ===================================================================
  
                                   Apache License
                             Version 2.0, January 2004
                          http://www.apache.org/licenses/
  
     TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
  
     1. Definitions.
  
        "License" shall mean the terms and conditions for use, reproduction,
        and distribution as defined by Sections 1 through 9 of this document.
  
        "Licensor" shall mean the copyright owner or entity authorized by
        the copyright owner that is granting the License.
  
        "Legal Entity" shall mean the union of the acting entity and all
        other entities that control, are controlled by, or are under common
        control with that entity. For the purposes of this definition,
        "control" means (i) the power, direct or indirect, to cause the
        direction or management of such entity, whether by contract or
        otherwise, or (ii) ownership of fifty percent (50%) or more of the
        outstanding shares, or (iii) beneficial ownership of such entity.
  
        "You" (or "Your") shall mean an individual or Legal Entity
        exercising permissions granted by this License.
  
        "Source" form shall mean the preferred form for making modifications,
        including but not limited to software source code, documentation
        source, and configuration files.
  
        "Object" form shall mean any form resulting from mechanical
        transformation or translation of a Source form, including but
        not limited to compiled object code, generated documentation,
        and conversions to other media types.
  
        "Work" shall mean the work of authorship, whether in Source or
        Object form, made available under the License, as indicated by a
        copyright notice that is included in or attached to the work
        (an example is provided in the Appendix below).
  
        "Derivative Works" shall mean any work, whether in Source or Object
        form, that is based on (or derived from) the Work and for which the
        editorial revisions, annotations, elaborations, or other modifications
        represent, as a whole, an original work of authorship. For the purposes
        of this License, Derivative Works shall not include works that remain
        separable from, or merely link (or bind by name) to the interfaces of,
        the Work and Derivative Works thereof.
  
        "Contribution" shall mean any work of authorship, including
        the original version of the Work and any modifications or additions
        to that Work or Derivative Works thereof, that is intentionally
        submitted to Licensor for inclusion in the Work by the copyright owner
        or by an individual or Legal Entity authorized to submit on behalf of
        the copyright owner. For the purposes of this definition, "submitted"
        means any form of electronic, verbal, or written communication sent
        to the Licensor or its representatives, including but not limited to
        communication on electronic mailing lists, source code control systems,
        and issue tracking systems that are managed by, or on behalf of, the
        Licensor for the purpose of discussing and improving the Work, but
        excluding communication that is conspicuously marked or otherwise
        designated in writing by the copyright owner as "Not a Contribution."
  
        "Contributor" shall mean Licensor and any individual or Legal Entity
        on behalf of whom a Contribution has been received by Licensor and
        subsequently incorporated within the Work.
  
     2. Grant of Copyright License. Subject to the terms and conditions of
        this License, each Contributor hereby grants to You a perpetual,
        worldwide, non-exclusive, no-charge, royalty-free, irrevocable
        copyright license to reproduce, prepare Derivative Works of,
        publicly display, publicly perform, sublicense, and distribute the
        Work and such Derivative Works in Source or Object form.
  
     3. Grant of Patent License. Subject to the terms and conditions of
        this License, each Contributor hereby grants to You a perpetual,
        worldwide, non-exclusive, no-charge, royalty-free, irrevocable
        (except as stated in this section) patent license to make, have made,
        use, offer to sell, sell, import, and otherwise transfer the Work,
        where such license applies only to those patent claims licensable
        by such Contributor that are necessarily infringed by their
        Contribution(s) alone or by combination of their Contribution(s)
        with the Work to which such Contribution(s) was submitted. If You
        institute patent litigation against any entity (including a
        cross-claim or counterclaim in a lawsuit) alleging that the Work
        or a Contribution incorporated within the Work constitutes direct
        or contributory patent infringement, then any patent licenses
        granted to You under this License for that Work shall terminate
        as of the date such litigation is filed.
  
     4. Redistribution. You may reproduce and distribute copies of the
        Work or Derivative Works thereof in any medium, with or without
        modifications, and in Source or Object form, provided that You
        meet the following conditions:
  
        (a) You must give any other recipients of the Work or
            Derivative Works a copy of this License; and
  
        (b) You must cause any modified files to carry prominent notices
            stating that You changed the files; and
  
        (c) You must retain, in the Source form of any Derivative Works
            that You distribute, all copyright, patent, trademark, and
            attribution notices from the Source form of the Work,
            excluding those notices that do not pertain to any part of
            the Derivative Works; and
  
        (d) If the Work includes a "NOTICE" text file as part of its
            distribution, then any Derivative Works that You distribute must
            include a readable copy of the attribution notices contained
            within such NOTICE file, excluding those notices that do not
            pertain to any part of the Derivative Works, in at least one
            of the following places: within a NOTICE text file distributed
            as part of the Derivative Works; within the Source form or
            documentation, if provided along with the Derivative Works; or,
            within a display generated by the Derivative Works, if and
            wherever such third-party notices normally appear. The contents
            of the NOTICE file are for informational purposes only and
            do not modify the License. You may add Your own attribution
            notices within Derivative Works that You distribute, alongside
            or as an addendum to the NOTICE text from the Work, provided
            that such additional attribution notices cannot be construed
            as modifying the License.
  
        You may add Your own copyright statement to Your modifications and
        may provide additional or different license terms and conditions
        for use, reproduction, or distribution of Your modifications, or
        for any such Derivative Works as a whole, provided Your use,
        reproduction, and distribution of the Work otherwise complies with
        the conditions stated in this License.
  
     5. Submission of Contributions. Unless You explicitly state otherwise,
        any Contribution intentionally submitted for inclusion in the Work
        by You to the Licensor shall be under the terms and conditions of
        this License, without any additional terms or conditions.
        Notwithstanding the above, nothing herein shall supersede or modify
        the terms of any separate license agreement you may have executed
        with Licensor regarding such Contributions.
  
     6. Trademarks. This License does not grant permission to use the trade
        names, trademarks, service marks, or product names of the Licensor,
        except as required for reasonable and customary use in describing the
        origin of the Work and reproducing the content of the NOTICE file.
  
     7. Disclaimer of Warranty. Unless required by applicable law or
        agreed to in writing, Licensor provides the Work (and each
        Contributor provides its Contributions) on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
        implied, including, without limitation, any warranties or conditions
        of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
        PARTICULAR PURPOSE. You are solely responsible for determining the
        appropriateness of using or redistributing the Work and assume any
        risks associated with Your exercise of permissions under this License.
  
     8. Limitation of Liability. In no event and under no legal theory,
        whether in tort (including negligence), contract, or otherwise,
        unless required by applicable law (such as deliberate and grossly
        negligent acts) or agreed to in writing, shall any Contributor be
        liable to You for damages, including any direct, indirect, special,
        incidental, or consequential damages of any character arising as a
        result of this License or out of the use or inability to use the
        Work (including but not limited to damages for loss of goodwill,
        work stoppage, computer failure or malfunction, or any and all
        other commercial damages or losses), even if such Contributor
        has been advised of the possibility of such damages.
  
     9. Accepting Warranty or Additional Liability. While redistributing
        the Work or Derivative Works thereof, You may choose to offer,
        and charge a fee for, acceptance of support, warranty, indemnity,
        or other liability obligations and/or rights consistent with this
        License. However, in accepting such obligations, You may act only
        on Your own behalf and on Your sole responsibility, not on behalf
        of any other Contributor, and only if You agree to indemnify,
        defend, and hold each Contributor harmless for any liability
        incurred by, or claims asserted against, such Contributor by reason
        of your accepting any such warranty or additional liability.
  
     END OF TERMS AND CONDITIONS
  
     APPENDIX: How to apply the Apache License to your work.
  
        To apply the Apache License to your work, attach the following
        boilerplate notice, with the fields enclosed by brackets "[]"
        replaced with your own identifying information. (Don't include
        the brackets!)  The text should be enclosed in the appropriate
        comment syntax for the file format. We also recommend that a
        file or class name and description of purpose be included on the
        same "printed page" as the copyright notice for easier
        identification within third-party archives.
  
     Copyright [yyyy] [name of copyright owner]
  
     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.
     You may obtain a copy of the License at
  
         http://www.apache.org/licenses/LICENSE-2.0
  
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/NOTICE.txt
  
  Index: NOTICE.txt
  ===================================================================
  This product includes software developed by
  The Apache Software Foundation (http://www.apache.org/).
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/build.xml
  
  Index: build.xml
  ===================================================================
  <?xml version="1.0" encoding="UTF-8"?>
  
  <!--build.xml generated by maven from project.xml version 0.0.1
    on date October 2 2004, time 1145-->
  
  <project default="jar" name="commons-pipeline" basedir=".">
    <property name="defaulttargetdir" value="/home/craigmcc/Apache/jakarta-commons-sandbox/pipeline/target">
    </property>
    <property name="libdir" value="/home/craigmcc/Apache/jakarta-commons-sandbox/pipeline/target/lib">
    </property>
    <property name="classesdir" value="/home/craigmcc/Apache/jakarta-commons-sandbox/pipeline/target/classes">
    </property>
    <property name="testclassesdir" value="/home/craigmcc/Apache/jakarta-commons-sandbox/pipeline/target/test-classes">
    </property>
    <property name="distdir" value="dist">
    </property>
    <property name="javadocdir" value="dist/docs/api">
    </property>
    <property name="final.name" value="commons-pipeline-0.0.1">
    </property>
    <target name="init" description="o Initializes some properties">
      <mkdir dir="${libdir}">
      </mkdir>
      <condition property="noget">
        <equals arg2="only" arg1="${build.sysclasspath}">
        </equals>
      </condition>
    </target>
    <target name="compile" description="o Compile the code" depends="get-deps">
      <mkdir dir="${classesdir}">
      </mkdir>
      <javac destdir="${classesdir}" deprecation="true" debug="true" optimize="false" excludes="**/package.html">
        <src>
          <pathelement location="src/java">
          </pathelement>
        </src>
        <classpath>
          <fileset dir="${libdir}">
            <include name="*.jar">
            </include>
          </fileset>
        </classpath>
      </javac>
    </target>
    <target name="jar" description="o Create the jar" depends="compile,test">
      <jar jarfile="${defaulttargetdir}/${final.name}.jar" excludes="**/package.html" basedir="${classesdir}">
      </jar>
    </target>
    <target name="clean" description="o Clean up the generated directories">
      <delete dir="${defaulttargetdir}">
      </delete>
      <delete dir="${distdir}">
      </delete>
    </target>
    <target name="dist" description="o Create a distribution" depends="jar, javadoc">
      <mkdir dir="dist">
      </mkdir>
      <copy todir="dist">
        <fileset dir="${defaulttargetdir}" includes="*.jar">
        </fileset>
        <fileset dir="${basedir}" includes="LICENSE*, README*">
        </fileset>
      </copy>
    </target>
    <target name="test" description="o Run the test cases" if="test.failure" depends="internal-test">
      <fail message="There were test failures.">
      </fail>
    </target>
    <target name="internal-test" depends="compile-tests">
    </target>
    <target name="compile-tests" depends="compile">
    </target>
    <target name="javadoc" description="o Generate javadoc">
      <mkdir dir="${javadocdir}">
      </mkdir>
      <tstamp>
        <format pattern="2004-yyyy" property="year">
        </format>
      </tstamp>
      <property name="copyright" value="Copyright &amp;copy;  The Apache Software Foundation. All Rights Reserved.">
      </property>
      <property name="title" value="commons-pipeline 0.0.1 API">
      </property>
      <javadoc use="true" private="true" destdir="${javadocdir}" author="true" version="true" sourcepath="src/java" packagenames="org.apache.commons.pipeline.*">
        <classpath>
          <fileset dir="${libdir}">
            <include name="*.jar">
            </include>
          </fileset>
          <pathelement location="${defaulttargetdir}/${final.name}.jar">
          </pathelement>
        </classpath>
      </javadoc>
    </target>
    <target name="get-deps" unless="noget" depends="init">
      <get dest="${libdir}/commons-digester-1.6.jar" usetimestamp="true" ignoreerrors="true" src="http://www.ibiblio.org/maven/commons-digester/jars/commons-digester-1.6.jar">
      </get>
      <get dest="${libdir}/commons-net-1.2.1.jar" usetimestamp="true" ignoreerrors="true" src="http://www.ibiblio.org/maven/commons-net/jars/commons-net-1.2.1.jar">
      </get>
      <get dest="${libdir}/log4j-1.2.8.jar" usetimestamp="true" ignoreerrors="true" src="http://www.ibiblio.org/maven/log4j/jars/log4j-1.2.8.jar">
      </get>
      <get dest="${libdir}/junit-3.8.1.jar" usetimestamp="true" ignoreerrors="true" src="http://www.ibiblio.org/maven/junit/jars/junit-3.8.1.jar">
      </get>
      <get dest="${libdir}/ant-1.5.jar" usetimestamp="true" ignoreerrors="true" src="http://www.ibiblio.org/maven/ant/jars/ant-1.5.jar">
      </get>
      <get dest="${libdir}/ant-optional-1.5.jar" usetimestamp="true" ignoreerrors="true" src="http://www.ibiblio.org/maven/ant/jars/ant-optional-1.5.jar">
      </get>
    </target>
    <target name="install-maven">
      <get dest="${user.home}/maven-install-latest.jar" usetimestamp="true" src="${repo}/maven/maven-install-latest.jar">
      </get>
      <unjar dest="${maven.home}" src="${user.home}/maven-install-latest.jar">
      </unjar>
    </target>
  </project>
  
  
  1.1                  jakarta-commons-sandbox/pipeline/maven.xml
  
  Index: maven.xml
  ===================================================================
  <project default="war"
     xmlns:j="jelly:core"
     xmlns:u="jelly:util"
     xmlns:ant="jelly:ant"
     xmlns:m="jelly:maven">
  
      <postGoal name="xdoc:register-reports">
          <attainGoal name= "maven-changes-plugin:deregister"/>
          <attainGoal name= "maven-checkstyle-plugin:deregister"/>
          <attainGoal name= "maven-license-plugin:deregister"/>
          <attainGoal name= "maven-linkcheck-plugin:deregister"/>
      </postGoal>
  
  </project>
  
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/project.xml
  
  Index: project.xml
  ===================================================================
  <?xml version="1.0"?>
  <project>
    <pomVersion>3</pomVersion>
    <name>commons-pipeline</name>
    <id>commons-pipeline</id>
    <currentVersion>0.0.1</currentVersion>
    <organization>
      <name>The Apache Software Foundation</name>
      <url>http://www.apache.org/</url>
    </organization>
    <inceptionYear>2004</inceptionYear>
    <package>org.apache.commons.pipeline</package>
  
    <shortDescription>A simple pipeline processing framework.</shortDescription>
  
    <description>
      
    </description>
  
    <url>http://jakarta.apache.org/commons/sandbox/pipeline/</url>
    <siteAddress></siteAddress>
    <siteDirectory></siteDirectory>
    <distributionDirectory></distributionDirectory>
  
    <repository>
      <connection></connection>
      <url></url>
    </repository>
  
    <developers>
      <developer>
        <name>Kris Nuttycombe</name>
        <id>kjn</id>
        <email>kris.nuttycombe@noaa.gov</email>
        <organization>National Geophysical Data Center</organization>
      </developer>
    </developers>
  
    <dependencies>
      <dependency>
        <groupId>commons-digester</groupId>
        <artifactId>commons-digester</artifactId>
        <version>1.6</version>
        <url>http://jakarta.apache.org/commons/digester</url>
      </dependency>
  
      <dependency>
        <groupId>commons-net</groupId>
        <artifactId>commons-net</artifactId>
        <version>1.2.1</version>
        <url>http://jakarta.apache.org/commons/net</url>
      </dependency>
  
      <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.8</version>
        <url>http://logging.apache.org/</url>
      </dependency>
    </dependencies>
  
    <build>
      <nagEmailAddress>kris.nuttycombe@noaa.gov </nagEmailAddress>
      <sourceDirectory>src/java</sourceDirectory>
      <unitTestSourceDirectory>src/test</unitTestSourceDirectory>
      <aspectSourceDirectory/>
  
      <!-- Unit test cases -->
      <unitTest>
        <includes>
          <include>**/*Test.java</include>
        </includes>
      </unitTest>
                
      <!-- J A R  R E S O U R C E S -->
      <!-- Resources that are packaged up inside the JAR file -->
      <resources></resources>
  
      <!-- Integration unit test cases -->
      <integrationUnitTest/>
  
      <jars></jars>
    </build>
  </project>
  
  
  
  
  
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/Pipeline.java
  
  Index: Pipeline.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   */
  
  package org.apache.commons.pipeline;
  
  import java.util.*;
  
  
  /**
   * This class represents a processing system consisting of a number of stages
   * and branches. Each stage contains a queue and manages one or more threads
   * that process data in that stage's queue and allow processed data to be
   * passed along to subsequent stages and onto branches of the pipeline.<P>
   *
   * This class allows all stages in the pipeline to be managed collectively
   * with methods to start and stop processing for all stages, as well as
   * a simple framework for asynchronous event-based communication between stages.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public final class Pipeline {
      private List stages = new LinkedList();
      private List listeners = new ArrayList();
      private Map branches = new HashMap();
      
      
      /** Creates a new Pipeline */
      public Pipeline() {  }
      
      
      /**
       * Adds a Stage object to the end of this Pipeline.
       *
       * @todo throw IllegalStateException if the stage is being used in a different pipeline
       */
      public final void addStage(Stage stage) {
          if (!stages.isEmpty()) ((Stage) stages.get(stages.size() - 1)).next = stage;
          stage.pipeline = this;
          stages.add(stage);
      }
      
      
      /**
       * Adds a branch to the pipeline.
       */
      public final void addBranch(String key, Pipeline pipeline) {
          if (key == null) throw new IllegalArgumentException("Branch key may not be null.");
          if (pipeline == null) throw new IllegalArgumentException("Illegal attempt to set reference to null branch.");
          if (pipeline == this) throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)");
          
          this.branches.put(key, pipeline);
      }
      
      
      /**
       * Calls {@link StageQueue#start() start()} on
       * the {@link StageQueue} of each stage in the pipeline in the order they were added.
       */
      public final void start() {
          if (!stages.isEmpty()) ((Stage) stages.get(0)).startAll();
          for (Iterator iter = branches.values().iterator(); iter.hasNext();) ((Pipeline) iter.next()).start();
      }
      
      
      /**
       * Calls {@link StageQueue#finish() finish()} 
       * on the {@link StageQueue} of each stage in the order they were added to the pipeline. 
       * The {@link StageQueue#finish() finish()}
       * method blocks until the stage's queue is exhausted, so this method
       * may be used to safely finalize all stages without the risk of
       * losing data in the queues.
       *
       * @throws InterruptedException if a worker thread was interrupted at the time
       * a stage was asked to finish execution.
       */
      public final void finish() throws InterruptedException {
          if (!stages.isEmpty()) ((Stage) stages.get(0)).finishAll();
          for (Iterator iter = branches.values().iterator(); iter.hasNext();) ((Pipeline) iter.next()).finish();
      }
      
      
      /**
       * Enqueues an object on the first stage if the pipeline is not empty
       * @param o the object to enque
       */
      public final void enqueue(Object o){
          if (!stages.isEmpty()) ((Stage) stages.get(0)).enqueue(o);
      }
      
      
      /**
       * Adds an EventListener to the pipline that will be notified by calls
       * to {@link Stage#raise(StageEvent)}.
       */
      public final void addEventListener(StageEventListener listener) {
          listeners.add(listener);
      }
      
      
      /**
       * Sequentially notifies each listener in the list of an event, and propagates
       * the event to any attached branches
       */
      private void notifyListeners(final StageEvent ev) {
          new Thread() {
              public void run() {
                  for (Iterator iter = listeners.iterator(); iter.hasNext();) {
                      ((StageEventListener) iter.next()).notify(ev);
                  }
                  
                  for (Iterator iter = branches.values().iterator(); iter.hasNext();) {
                      ((Pipeline) iter.next()).notifyListeners(ev);
                  }
              }
          }.start();
      }
      
          
      /**
       * This abstract base class provides a foundation for processing stages in 
       * the pipeline.
       *
       * @todo This should probably be a non-static inner class so that
       * we can avoid the absolute reference to the enclosing Pipeline if somebody
       * can figure out how to properly handle the constructor using Digester.
       */
      public static abstract class Stage implements StageHandler {
          private StageQueue queue;
          private Pipeline pipeline;
          private Stage next;
          
          
          /** Builds a new stage that wraps the specified StageQueue */
          public Stage(StageQueue queue) {
              queue.setStageHandler(this);
              this.queue = queue;
          }
          
          
          /**
           * This method recursively starts each element in the process chain in sequence.
           */
          private final void startAll() throws IllegalThreadStateException {
              System.out.println("Starting " + this.getClass().getName());
              queue.start();
              if (next != null) next.startAll();
          }
          
          
          /**
           * Calls the finish() method on the wrapped worker queue (which waits for the
           * worker thread(s) to die) then calls the next chain element's finishAll() method.
           * This method attempts to finish all threads even if exceptions are thrown.
           */
          private final void finishAll() throws InterruptedException {
              try {
                  queue.finish();
              }
              finally {
                  if (next != null) next.finishAll();
              }
          }
          
          
          /**
           * Delegate method of the wrapped {@link StageQueue}.
           */
          public void enqueue(Object obj) {
              queue.enqueue(obj);
          }
          
          
          /**
           * Enqueues the specified object onto the next stage in the pipeline
           * if such a stage exists.
           */
          public void exqueue(Object obj) {
              if (this.next != null) this.next.enqueue(obj);
          }
          
          
          /**
           * Enqueues the specified object onto the first stage in the pipeline 
           * branch corresponding to the specified key, if such a brach exists.
           */
          public void exqueue(String key, Object obj) {
              Pipeline branch = (Pipeline) this.pipeline.branches.get(key);
              if (branch != null && !branch.stages.isEmpty()) {
                  ((Stage) branch.stages.get(0)).enqueue(obj);
              }
          }
          
          
          /**
           * Raises an event on the pipeline. Any listeners registered with the pipeline
           * will be notified.
           */
          public final void raise(StageEvent ev) {
              this.pipeline.notifyListeners(ev);
          }
          
          /** Do nothing default implementation */
          public void process(Object obj) {
          }
          
          /** Do nothing default implementation */
          public void release() {
          }
          
          /** Do nothing default implementation */
          public void postprocess() {
          }
          
          /** Do nothing default implementation */
          public void preprocess() {
          }        
      }
  }
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/StageEvent.java
  
  Index: StageEvent.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   */
  
  package org.apache.commons.pipeline;
  
  import java.util.EventObject;
  import org.apache.commons.pipeline.Pipeline.Stage;
  
  /**
   * This is a simple base class for stage events.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class StageEvent extends EventObject {
      private Object eventData;
      
      /** Creates a new instance of ProcessEvent */
      public StageEvent(Stage source, Object eventData) {
          super(source);
          this.eventData = eventData;
      }
      
      public Object getEventData() {
          return this.eventData;
      }    
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/StageEventListener.java
  
  Index: StageEventListener.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   */
  
  package org.apache.commons.pipeline;
  
  import java.util.EventListener;
  
  /**
   * Listener interface for {@link StageEvent}s
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public interface StageEventListener extends EventListener {
      
      /**
       * Notify this listener of a {@link StageEvent}
       */
      public abstract void notify(StageEvent ev);
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/StageException.java
  
  Index: StageException.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   *
   * Created on December 9, 2003, 4:24 PM
   */
  
  package org.apache.commons.pipeline;
  
  /**
   * Exception wrapper class for exceptions that occur while processing a stage.
   *  
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class StageException extends java.lang.RuntimeException {
      
      /**
       * Creates a new instance of <code>StageException</code> without detail message.
       */
      public StageException() {
      }
      
      
      /**
       * Constructs an instance of <code>StageException</code> with the specified detail message.
       * @param msg the detail message.
       */
      public StageException(String msg) {
          super(msg);
      }
      
      
      /**
       * Constructs an instance of <code>StageException</code> with the specified detail message and cause
       * @param msg the detail message.
       * @param cause Throwable that caused this exception.
       */
      public StageException(String msg, Throwable cause) {
          super(msg, cause);
      }
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/StageHandler.java
  
  Index: StageHandler.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   *
   * Created on September 20, 2004, 12:30 PM
   */
  
  package org.apache.commons.pipeline;
  
  /**
   * Defines an interface for methods available to a StageQueue for execution
   * by individual worker threads. Handlers should not implement this interface
   * directly, but should extend {@link org.apache.commons.pipeline.Pipeline$Stage Stage} instead.
   *  
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public interface StageHandler {
      /**
       * Implementations of this method should perform any necessary setup that
       * needs to be done before any data is processed from the {@link org.apache.commons.pipeline.Pipeline$Stage Stage}'s queue.
       *
       * @throws StageException an Exception thrown by the implementation should
       * be wrapped in a {@link StageException}
       */
      public void preprocess() throws StageException;    
      
      /**
       * Implementations of this method should atomically process a single data
       * object. 
       *
       * @throws ClassCastException if the object is of an incorrect type
       * for the processing operation
       * @throws StageException an Exception thrown by the implementation should
       * be wrapped in a {@link StageException}
       */
      public abstract void process(Object obj) throws StageException;
      
      
      /**
       * Implementations of this method should do any additional processing or
       * finalization necessary after all data has been processed. This method
       * usually runs following a call to the implementing {@link org.apache.commons.pipeline.Pipeline$Stage Stage}'s
       * {@link StageQueue#finish()} method.
       *
       * @throws StageException an Exception thrown by the implementation should
       * be wrapped in a {@link StageException}
       */
      public abstract void postprocess() throws StageException;   
      
      
      /**
       * Implementations of this method should clean up any lingering resources
       * that might otherwise be left allocated if an exception is thrown during
       * processing.
       */
      public abstract void release();
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/StageQueue.java
  
  Index: StageQueue.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   */
  
  package org.apache.commons.pipeline;
  
  /**
   * Defines an abstract base class that connects a {@link StageHandler} to
   * a work queue implementation, with methods to start and stop the
   * execution of the worker threads.
   *  
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public abstract class StageQueue { 
      
      /**
       * Holds value of property stageHandler.
       */
      protected StageHandler stageHandler;
      
      /**
       * Default constructor (StageHandler uninitialized)
       */
      public StageQueue() { }
      
      /**
       * Creates a StageQueue with the specified StageHandler.
       */
      public StageQueue(StageHandler stageHandler){
          this.stageHandler = stageHandler;
      }
          
      /**
       * Adds an object to the tail of the queue.
       */
      public abstract void enqueue(Object obj);
      
      /**
       * Creates and starts new worker thread(s) to process items in the queue.
       */
      public abstract void start() throws IllegalThreadStateException;
      
      /**
       * This method waits for the queue to empty and any processor thread(s) to exit
       * cleanly and then calls release() to release any resources acquired during processing, if possible.
       * Implementations of this method must block until all items have been removed
       * from the queue.
       */
      public abstract void finish() throws InterruptedException;
      
      
      /**
       * Returns the status of the thread(s) that process data from the queue.
       */
      public abstract boolean isRunning(); 
      
      /**
       * Getter for property stageHandler.
       * @return Value of property stageHandler.
       */
      public StageHandler getStageHandler() {
          return this.stageHandler;
      }    
      
      /**
       * Setter for property stageHandler.
       * @param stageHandler New value of property stageHandler.
       */
      public void setStageHandler(StageHandler stageHandler) {
          this.stageHandler = stageHandler;
      }  
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/package.html
  
  Index: package.html
  ===================================================================
  <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
  
  <html>
    <head>
      <title></title>
    </head>
    <body>
    This package provides a set of pipeline utilities designed around 
    work queues that run in parallel to sequentially process data objects 
    data objects.
    </body>
  </html>
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/config/PipelineFactory.java
  
  Index: PipelineFactory.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   *
   * Created on February 12, 2004, 3:42 PM
   */
  
  package org.apache.commons.pipeline.config;
  
  import java.io.File;
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;
  import javax.xml.parsers.ParserConfigurationException;
  import org.apache.commons.digester.Digester;
  import org.apache.commons.digester.RuleSet;
  import org.apache.commons.pipeline.Pipeline;
  import org.xml.sax.SAXException;
  
  
  
  /**
   * This factory is designed to simplify creating a pipeline using Digester.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class PipelineFactory {
      
      /** Digester rule sets used to configure the Digester instance. */
      private List ruleSets = new ArrayList();
      
      
      /** Creates a new instance of PipelineFactory */
      public PipelineFactory() {
          //PipelineRuleSet needs a reference to {@link org.apache.commons.digester.RuleSet RuleSet}s 
          //used to parse the configuration file in case configuration is split up between multiple
          //files.
          ruleSets.add(new PipelineRuleSet(ruleSets));
      }            
      
      
      /**
       * Creates a pipeline from the specified Digester configuration file.
       */
      public Pipeline getPipeline(String configFile) throws IOException, SAXException, ParserConfigurationException  {
          Digester digester = new Digester();
          this.initDigester(digester);
          
          File conf = new File(configFile);
          return (Pipeline) digester.parse(conf);
      }
      
      
      /**
       * Creates a pipeline from the specified input stream. Useful for instances where
       * a pipeline configuration is being read from inside a jarfile.
       */
      public Pipeline getPipeline(InputStream configStream) throws IOException, SAXException, ParserConfigurationException  {
          Digester digester = new Digester();
          this.initDigester(digester);
          
          return (Pipeline) digester.parse(configStream);
      }
      
      
      /**
       * Initialize a Digester instance with the rule sets provided to this factory.
       */
      public void initDigester(Digester digester) {
          for (Iterator iter = ruleSets.iterator(); iter.hasNext();) {
              digester.addRuleSet((RuleSet) iter.next());
          }
      }
      
      
      /**
       * Adds a RuleSet to the list of rules available to Digester for parsing 
       * the configuration file.
       */
      public void addRuleSet(RuleSet ruleSet) {
          this.ruleSets.add(ruleSet);
      }
      
      
      /**
       * The simplest possible main method that creates a pipeline from a configuration file,
       * then runs the pipeline processing from start to finish.
       *
       * @param argv the command line arguments
       */
      public static void main(String[] argv) {
          try {
              PipelineFactory factory = new PipelineFactory();
              Pipeline pipeline = factory.getPipeline(argv[0]);
              for (int i = 1; i < argv.length; i++) {
                  pipeline.enqueue(argv[i]);
              }
              
              System.out.println("Pipeline created, about to begin processing...");
              
              pipeline.start();
              pipeline.finish();
              
              System.out.println("Pipeline successfully finished processing. See logs for details.");
          }
          catch (Exception e) {
              e.printStackTrace(System.err);
          }
      }
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java
  
  Index: PipelineRuleSet.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   *
   * Created on February 12, 2004, 1:48 PM
   */
  
  package org.apache.commons.pipeline.config;
  
  import java.lang.reflect.Constructor;
  import java.util.Iterator;
  import java.util.List;
  import org.apache.commons.digester.*;
  import org.apache.commons.pipeline.Pipeline;
  import org.apache.commons.pipeline.StageQueue;
  import org.apache.commons.pipeline.impl.SingleThreadStageQueue;
  import org.xml.sax.Attributes;
  
  
  
  /**
   * <P>This is a Digester RuleSet that provides rules for parsing a process pipeline
   * XML file.</P>
   *
   * The rules defined by this object are used for parsing the following tags:
   * <ul>
   *  <li>&lt;pipeline&gt;&lt;/pipeline&gt; - The root element of the
   *  XML configuration file for a pipeline. This tag supports two attributes
   *  that are for use only when configuring branch pipelines, <code>key</code>
   *  and <code>configURI</code>. These attributes are described more fully
   *  below in the %lt;branch&gt; documentation.</li>
   *
   *  <li>&lt;stage className="<i>name</i>" queueClass="<i>name</i>" ... &gt;&lt;/stage&gt; - A single stage is
   *  created and configured using this tag. It is a child of &lt;pipeline&gt;. Stages
   *  created in this manner are added to the pipeline in the order that they
   *  occur in the configuration file. The class of the stage is specified by the
   *  <i>className</i> attribute; all other attributes are used by Digester to set bean
   *  properties on the newly created Stage object. At present, Stages configured using
   *  this tag must provide a one-argument constructor that takes a StageQueue instance. 
   *  By default, the stage will be constructed with a 
   * {@link org.apache.commons.pipeline.impl.SingleThreadStageQueue SingleThreadStageQueue}
   *  instance if the queueClass attribute is not set; otherwise the stage will be
   *  constructed with a new instance of the specified class, which should provide
   *  a no-arguments constructor.</li>
   *
   *  <li>&lt;enqueue/&gt; - Enqueue an object onto the first stage in the pipeline.</li>
   *  <li>&lt;branch/%gt; - Add a branch to a pipeline. The contents of this tag should
   *  be one or more &lt;pipeline/&gt;s. Branch pipelines added in this fashion must
   *  be configured with an attribute named <code>key</code> that holds the name by
   *  which the pipeline will be referred to by {@link org.apache.commons.pipeline.StageHandler StageHandler}s.
   *  Branch pipelines may be configured either inline in the main configuration
   *  file or in a separate file referred to by the <code>configURI</code> pipeline
   *  attribute.
   * </ul>
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   * @todo Add support for more complicated StageQueue construction and configuration as part of the Stage 
   * tag processing.
   */
  public class PipelineRuleSet extends RuleSetBase {
      private List nestedRuleSets;
      
      /** Creates a new instance of ChainRuleSet */
      public PipelineRuleSet() {
      }
      
      /** Creates a new instance of ChainRuleSet */
      public PipelineRuleSet(List nestedRuleSets) {
          this.nestedRuleSets = nestedRuleSets;
      }
      
      /**
       * Adds the rule instances for pipeline, stage, and enqueue
       * tasks to the Digester instance supplied.
       */
      public void addRuleInstances(Digester digester) {
          ObjectCreationFactory factory = new PipelineFactory();
          
          digester.addFactoryCreate("pipeline", factory);
          digester.addSetProperties("pipeline");
          
          // these rules are used to add subchains to the main pipeline
          digester.addFactoryCreate("*/branch/pipeline", factory);
          digester.addRule("*/branch/pipeline", new CallMethodRule(1, "addBranch", 2, new Class[] { String.class, Pipeline.class }));
          digester.addCallParam("*/branch/pipeline", 0, "key");
          digester.addCallParam("*/branch/pipeline", 1, 0);
          
          //this rule is intended to be used to add a pipeline element. the ChainLogger is
          //simply the default if no pipeline element class is specified
          digester.addFactoryCreate("*/pipeline/stage", StageFactory.class, "stageFactory", false);
          digester.addSetProperties("*/pipeline/stage");
          digester.addSetNext("*/pipeline/stage", "addStage", "org.apache.commons.pipeline.Pipeline$Stage");
          
          //rule for enqueuing string
          digester.addCallMethod("*/stage/enqueue/value", "enqueue", 0);
      }
      
      
      public static class StageFactory extends AbstractObjectCreationFactory {
          private static final Class[] DEFAULT_STAGE_CONSTRUCTOR_PARAMCLASSES = { StageQueue.class };
          
          public Object createObject(Attributes attributes) throws java.lang.Exception {
              String queueClassName = attributes.getValue("queueClass");
              Class queueClass = (queueClassName == null) ? SingleThreadStageQueue.class : Class.forName(queueClassName);
              
              String stageClassName = attributes.getValue("className");
              if (stageClassName == null) throw new IllegalArgumentException("className attribute may not be null for element <stage>");
              Class stageClass = Class.forName(stageClassName);
              
              Constructor constructor = stageClass.getConstructor(DEFAULT_STAGE_CONSTRUCTOR_PARAMCLASSES);
              return constructor.newInstance(new Object[] {queueClass.newInstance()});
          }       
      }
      
      
      public class PipelineFactory extends AbstractObjectCreationFactory {        
          public Object createObject(Attributes attributes) throws java.lang.Exception {
              String configURI = attributes.getValue("configURI");
              if (configURI == null) {
                  return new Pipeline();
              }
              else {
                  Digester subDigester = new Digester();
                  if (nestedRuleSets != null) {
                      for (Iterator iter = nestedRuleSets.iterator(); iter.hasNext();) {
                          subDigester.addRuleSet((RuleSet) iter.next());
                      }
                      
                      Pipeline pipeline = (Pipeline) subDigester.parse(configURI);
                      return pipeline;
                  }
                  else {
                      throw new IllegalStateException("Unable to parse branch configuration file: No parsing rules provided to PipelineRuleSet constructor.");
                  }
              }
          }
      }
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/config/package.html
  
  Index: package.html
  ===================================================================
  <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
  
  <html>
    <head>
      <title></title>
    </head>
    <body>
    This package provides utilities for creating executing a pipeline using Digester.
    </body>
  </html>
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/FileFinderStage.java
  
  Index: FileFinderStage.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   */
  
  package org.apache.commons.pipeline.impl;
  
  import java.io.File;
  import java.util.regex.Pattern;
  import org.apache.commons.pipeline.StageQueue;
  import org.apache.commons.pipeline.Pipeline.Stage;
  import org.apache.log4j.Logger;
  
  /**
   * This {@link org.apache.commons.pipeline.Pipeline$Stage Stage} is used
   * to recursively find (non-directory) files that match the specified regex.
   *
   * File elements in the stage's queue will be recursively searched with the
   * resulting File objects placed on the subsequent stage's queue.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class FileFinderStage extends Stage {
      private static final Logger log = Logger.getLogger(FileFinderStage.class);
      private String filePattern = ".*";
      Pattern pattern;
      
      /** Creates a new instance of FileFinder */
      public FileFinderStage(StageQueue queue) {
          super(queue);
      };
      
      
      /**
       * Precompiles the regex pattern for matching against filenames
       */
      public void preprocess() {
          this.pattern = Pattern.compile(this.filePattern);
      }
      
      
      /**
       * This method inspects a File object to determine if
       * it matches this FileFinder's filePattern property. If the File
       * represents a directory, it recursively searches that directory and
       * all subdirectories for matching files. Matched files are placed
       * on the next stage's queue.
       */
      public void process(Object obj) {
          File file = (obj instanceof String) ? new File((String) obj) : (File) obj;        
          log.debug("Examining file " + file.getAbsolutePath());
          
          if (!file.exists()) {
              log.debug("File does not exist.");
          }
          else if (file.isDirectory()) {
              File[] files = file.listFiles();
              log.debug(file.getName() + " is a directory, processing " + files.length + " files within.");
              for (int i = 0; i < files.length; i++) {
                  process(files[i]);
              }
          }
          else if (this.pattern.matcher(file.getName()).matches()){
              this.exqueue(file);
              log.debug("Enqueueing file: " + file.getName());
          }
      }    
      
      
      /** Getter for property filePattern.
       * @return Value of property filePattern.
       *
       */
      public String getFilePattern() {
          return this.filePattern;
      }
      
      
      /** Setter for property filePattern.
       * @param pattern Value of property filePattern.
       *
       */
      public void setFilePattern(String pattern) {
          this.filePattern = pattern;
      }    
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/FtpFileDownloadStage.java
  
  Index: FtpFileDownloadStage.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   */
  
  package org.apache.commons.pipeline.impl;
  
  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.IOException;
  import java.io.OutputStream;
  import java.util.regex.Pattern;
  import org.apache.commons.net.ftp.FTPClient;
  import org.apache.commons.net.ftp.FTPReply;
  import org.apache.commons.pipeline.StageException;
  import org.apache.commons.pipeline.StageQueue;
  import org.apache.commons.pipeline.Pipeline.Stage;
  import org.apache.log4j.Logger;
  
  
  /**
   * This {@link org.apache.commons.pipeline.Pipeline$Stage Stage} provides the 
   * functionality needed to retrieve data from an FTP URL. Multipart responses 
   * are not yet supported.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class FtpFileDownloadStage extends Stage {
      private static Logger log = Logger.getLogger(FtpFileDownloadStage.class);
      
      private String workDir;
      private File fworkDir;
      private FTPClient client = new FTPClient();
      
      /** Holds value of property host. */
      private String host;
      
      /** Holds value of property user. */
      private String user;
      
      /** Holds value of property password. */
      private String password;
      
      
      /**
       * Default constructor - creates work directory in /tmp
       */
      public FtpFileDownloadStage(StageQueue queue) {
          super(queue);
          this.workDir = "/tmp";
      }
      
      /**
       * Creates a new instance of HttpFileDownload with the specified work directory
       * into which to download files.
       */
      public FtpFileDownloadStage(StageQueue queue, String workDir) {
          super(queue);
          this.workDir = workDir;
      }
      
      /**
       * Creates the download directory {@link #setWorkDir(String) workDir} uf it does
       * not exist.
       */
      public void preprocess() throws StageException {
          if (fworkDir == null) fworkDir = new File(workDir);
          if (!this.fworkDir.exists()) fworkDir.mkdirs();
          
          try {
              //connect to the ftp site
              client.connect(host);
              log.debug(client.getReplyString());
              if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                  throw new IOException("FTP server at host " + host + " refused connection.");
              }
              
              client.login(user, password);
              log.debug(client.getReplyString());
              if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                  throw new IOException("FTP login failed for user " + user + ": " + client.getReplyString());
              }
          }
          catch (IOException e) {
              throw new StageException(e.getMessage(), e);
          }
      }
      
      /**
       * Removes a java.net.URL (an HTTP URL) from the input queue, follows any redirects
       * specified by that URL, and then retrieves the data to a file over an HTTP
       * connection. The file name for download is the
       * last element of the URL path for download appended with a timestamp
       * value, and it is stored in the directory specified by {@link #setWorkDir(String) setWorkDir()}, or to
       * /tmp if no work directory is set.
       *
       * @param obj The URL from which to download data.
       * @throws ClassCastException if the parameter obj is not an instance of java.net.URL
       */
      public void process(Object obj) throws StageException {
          if (!this.fworkDir.exists()) throw new StageException("The work directory for file download " + workDir.toString() + " does not exist.");
          
          FileSpec spec = (FileSpec) obj;
          
          try {
              client.changeWorkingDirectory(spec.path);
              log.debug(client.getReplyString());
              if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                  throw new IOException("FTP client could not change to remote directory " + spec.path + ": " + client.getReplyString());
              }
              
              log.debug("FTP connection successfully established to " + host + spec.path);
              
              //get the list of files
              client.enterLocalPassiveMode();
              String[] dirs = client.listNames();
              if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                  throw new IOException("FTP client could not obtain file list : " + client.getReplyString());
              }
              //client.enterLocalActiveMode();
                         
              log.debug("FTP file list successfully obtained.");
              
              Pattern pattern = Pattern.compile(spec.pattern);
              
              log.debug("File pattern is " + spec.pattern);
              
              //create the list of netcdf track files to get
              for (int i = 0; i < dirs.length; i++){
                  log.debug("Obtaining files in directory " + dirs[i]);
                  String[] files = client.listNames(dirs[i]);
                  
                  for (int j = 0; j < files.length; j++) {
                      if (pattern.matcher(files[j]).matches()) {
                          log.debug("Matched file name " + files[j] + " against pattern " + spec.pattern);
                          File f = new File(workDir + File.separatorChar + files[j]);
                          if (! f.getParentFile().exists()) f.getParentFile().mkdir();
                          
                          OutputStream out = new FileOutputStream(f);
                          client.retrieveFile(files[j], out);
                          this.exqueue(f);
                      }
                  }
              }
          }
          catch (IOException e) {
              throw new StageException(e.getMessage(), e);
          }
      }
      
      
      /**
       * Disconnects from FTP server. Errors are logged.
       */
      public void release() {
          try {
              client.disconnect(); //close ftp connection
          }
          catch (IOException e) {
              log.error(e.getMessage(), e);
          }
      }
      
      
      /**
       * Sets the working directory for the file download. If the directory does
       * not already exist, it will be created during the preprocess() step.
       */
      public void setWorkDir(String workDir) {
          this.workDir = workDir;
      }
      
      /**
       * Returns the name of the file download directory.
       */
      public String getWorkDir() {
          return this.workDir;
      }
      
      /** Getter for property host.
       * @return Value of property host.
       *
       */
      public String getHost() {
          return this.host;
      }
      
      /** Setter for property host.
       * @param host New value of property host.
       *
       */
      public void setHost(String host) {
          this.host = host;
      }
      
      /** Getter for property user.
       * @return Value of property user.
       *
       */
      public String getUser() {
          return this.user;
      }
      
      /** Setter for property user.
       * @param user New value of property user.
       *
       */
      public void setUser(String user) {
          this.user = user;
      }
      
      /** Setter for property password.
       * @param password New value of property password.
       *
       */
      public void setPassword(String password) {
          this.password = password;
      }
      
      
      /**
       * This class is used to specify a path and pattern of file for the FtpFileDownload
       * to retrieve.
       */
      public static class FileSpec {
          
          /** Holds value of property path. */
          private String path = "/";
          
          /** Holds value of property pattern. */
          private String pattern = ".*";
          
          /** Getter for property path.
           * @return Value of property path.
           *
           */
          public String getPath() {
              return this.path;
          }
          
          /** Setter for property path.
           * @param path New value of property path.
           *
           */
          public void setPath(String path) {
              this.path = path;
          }
          
          /** Getter for property pattern.
           * @return Value of property pattern.
           *
           */
          public String getPattern() {
              return this.pattern;
          }
          
          /** Setter for property pattern.
           * @param pattern New value of property pattern.
           *
           */
          public void setPattern(String pattern) {
              this.pattern = pattern;
          }
      }
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/HttpFileDownloadStage.java
  
  Index: HttpFileDownloadStage.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   */
  
  package org.apache.commons.pipeline.impl;
  
  import java.io.*;
  import java.net.MalformedURLException;
  import java.net.URL;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.commons.pipeline.StageException;
  import org.apache.commons.pipeline.StageQueue;
  import org.apache.commons.pipeline.Pipeline.Stage;
  import org.apache.log4j.Logger;
  import sun.net.www.protocol.http.HttpURLConnection;
  
  /**
   * This {@link org.apache.commons.pipeline.Pipeline$Stage Stage} provides the functionality 
   * needed to retrieve data from an HTTP URL. Multipart responses are not yet supported.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class HttpFileDownloadStage extends Stage {
      private static final int BUFFER_SIZE = 10000;
      private static Logger log = Logger.getLogger(HttpFileDownloadStage.class);
      private String workDir;
      private File fworkDir;
      
      /**
       * Default constructor - creates work directory in /tmp
       */
      public HttpFileDownloadStage(StageQueue queue) {
          super(queue);
          this.workDir = "/tmp";
      }
      
      /**
       * Creates a new instance of HttpFileDownload with the specified work directory
       * into which to download files.
       */
      public HttpFileDownloadStage(StageQueue queue, String workDir) {
          super(queue);
          this.workDir = workDir;
      }
      
      /**
       * Creates the directory {@link #setWorkDir(String) workDir} if it does
       * not exist.
       */
      public void preprocess() throws StageException {
          if (fworkDir == null) fworkDir = new File(workDir);
          if (!this.fworkDir.exists()) fworkDir.mkdirs();
      }
      
      /**
       * Removes a java.net.URL (an HTTP URL) from the input queue, follows any redirects
       * specified by that URL, and then retrieves the data to a file over an HTTP
       * connection. The file name for download is the
       * last element of the URL path for download appended with a timestamp
       * value, and it is stored in the directory specified by {@link #setWorkDir(String) setWorkDir()}, or to
       * /tmp if no work directory is set.
       *
       * @param obj The URL from which to download data.
       * @throws ClassCastException if the parameter obj is not an instance of java.net.URL
       */
      public void process(Object obj) throws StageException {
          if (!this.fworkDir.exists()) throw new StageException("The work directory for file download " + workDir.toString() + " does not exist.");
          Map params = new HashMap();
          
          URL url;
          try {
              if (obj instanceof String) {
                  String loc = (String) obj;
                  /*int paramIndex = loc.indexOf('?');
                  if (paramIndex > 0) {
                      url = new URL(loc.substring(0, paramIndex));
                      for (StringTokenizer st = new StringTokenizer(loc.substring(paramIndex + 1), "&"); st.hasMoreTokens();) {
                          String tok = st.nextToken();
                          int eqIndex = tok.indexOf('=');
                          if (eqIndex > 0) {
                              params.put(tok.substring(0, eqIndex), tok.substring(eqIndex + 1));
                          }
                          else {
                              params.put(tok, null);
                          }
                      }
                  }
                  else {*/
                      url = new URL((String) obj);
                  //}
              }
              else if (obj instanceof URL) {
                  url = (URL) obj;
              }
              else {
                  throw new IllegalArgumentException("Unrecognized parameter class to process() for HttpFileDownload: " + obj.getClass().getName() + "; must be URL or String");
              }
          }
          catch (MalformedURLException e) {
              throw new StageException("Malformed URL: " + obj.toString(), e);
          }
          
          log.debug("Retrieving data from " + url.toString());
          
          /*
          try {
              url = handleRedirects(url);
          }
          catch (Exception e) { //catches MalformedURLException, IOException
              throw new StageException("An error was encountered attempting to follow URL redirects from " + url.toString(), e);
          }
           */
          
          java.net.HttpURLConnection con = null;
          try {
              con = (java.net.HttpURLConnection) url.openConnection();
              /*if (!params.isEmpty()) {
                  con.setRequestMethod("GET");
                  for (Iterator iter = params.entrySet().iterator(); iter.hasNext();) {
                      Map.Entry entry = (Map.Entry) iter.next();
                      con.setRequestProperty((String) entry.getKey(), (String) entry.getValue());
                  }
              }*/
          }
          catch (IOException e) {
              throw new StageException(e.getMessage(), e);
          }
          
          long time = System.currentTimeMillis();
          String path = url.getPath();
          String fileName = path.substring(path.lastIndexOf('/')) + "." + time; //tag the downloaded file with the time of retrieval.
          File workFile = new File(workDir, fileName);
          
          try {
              //log.debug("About to connect.");
              //con.connect();
              //log.debug("Connection status: " + con.getResponseCode());
              InputStream in = new BufferedInputStream(con.getInputStream());
              OutputStream out = new BufferedOutputStream(new FileOutputStream(workFile, false));
              byte[] buffer = new byte[BUFFER_SIZE]; //attempt to read 10k at a time
              for (int results = 0; (results = in.read(buffer)) != -1;) {
                  out.write(buffer, 0, results);
              }
          }
          catch (IOException e) {
              throw new StageException("An error occurred downloading a data file from " + url.toString() + ": " + e.getMessage(), e);
          }
          finally {
              con.disconnect();
          }
          
          this.exqueue(workFile);
      }
      
  
      /**
       * Sets the working directory for the file download. If the directory does
       * not already exist, it will be created during the preprocess() step.
       */
      public void setWorkDir(String workDir) {
          this.workDir = workDir;
      }
      
      /**
       * Returns the name of the file download directory.
       */
      public String getWorkDir() {
          return this.workDir;
      }
      
      /**
       * Follows redirects from the specified URL and recursively returns the destination
       * URL. This method does not check for circular redirects, so it is possible that a malicious
       * site could force this method into infinite recursion.
       *
       * TODO: Add a max_hops parameterized version
       */
      public static URL handleRedirects(URL url) throws IOException, MalformedURLException {
          java.net.HttpURLConnection.setFollowRedirects(false);
          HttpURLConnection con = new HttpURLConnection(url, url.getHost(), url.getPort());
          int response = con.getResponseCode();
          log.debug("Response code for " + url + " = " + response);
          
          if (response == java.net.HttpURLConnection.HTTP_MOVED_PERM || response == java.net.HttpURLConnection.HTTP_MOVED_TEMP) {
              String location = con.getHeaderField("Location");
              log.debug("Handling redirect to location: " + location);
              
              if (location.startsWith("http:")) {
                  url = new URL(location);
              }
              else if (location.startsWith("/")) {
                  url = new URL("http://" + url.getHost() + location);
              }
              else {
                  url = new URL(con.getURL(), location);
              }
              
              url = handleRedirects(url); // to handle nested redirections
          }
          
          return url;
      }
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/LogStage.java
  
  Index: LogStage.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   */
  
  package org.apache.commons.pipeline.impl;
  
  import org.apache.commons.pipeline.StageException;
  import org.apache.commons.pipeline.StageQueue;
  import org.apache.commons.pipeline.Pipeline.Stage;
  import org.apache.log4j.Logger;
  
  
  /**
   * A do-nothing implementation of Stage with Log4j logging. Useful for debugging purposes.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class LogStage extends Stage {
      private Logger log = Logger.getLogger(this.getClass());
      
      /**
       * Creates a new LogStage with the specified {@link StageQueue}.
       */
      public LogStage(StageQueue queue) {
          super(queue);
      }
      
      /**
       * Logs the point at which preprocessing runs.
       */
      public void preprocess() throws StageException {
          log.info("Stage " + this.getClass().getName() + " preprocessing.");
      }
      
      /**
       * Logs the current state of an object on the queue and passes the
       * object unchanged to the next stage in the pipeline.
       */
      public void process(Object obj) throws StageException {
          log.info("Processing object " + obj);
          this.exqueue(obj);
      }
          
      /**
       * Logs tht point at which postprocessing runs
       */
      public void postprocess() throws StageException {
          log.info("Stage " + this.getClass().getName() + " postprocessing.");
      }
      
      /**
       * Logs the point at which stage resources are released.
       */
      public void release() {
          log.info("Stage " + this.getClass().getName() + " released.");
      }
  
      /**
       * Default toString implementation.
       */
      public String toString() {
          return this.getClass().getName();
      }
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/SingleThreadStageQueue.java
  
  Index: SingleThreadStageQueue.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License. 
   */
  
  package org.apache.commons.pipeline.impl;
  
  import java.util.LinkedList;
  import java.util.List;
  import org.apache.commons.pipeline.StageException;
  import org.apache.commons.pipeline.StageQueue;
  import org.apache.log4j.Logger;
  
  /**
   * This is a simple implementation of a work queue, based upon the example given 
   * in Effective Java, by Joshua Bloch &copyright; Sun Microsystems 2001, Pg 196.
   *
   * @author Kris Nuttycombe, National Geophysical Data Center
   * @version $Revision: 1.1 $
   */
  public class SingleThreadStageQueue extends StageQueue {
      private Logger log = Logger.getLogger(this.getClass());
      
      private final List queue = new LinkedList();
      private volatile boolean failureTolerant = true;
      private volatile boolean running = false;
      private WorkerThread workerThread;
      
      
      /**
       * Default constructor
       */
      public SingleThreadStageQueue() {
      }
      
      
      /**
       * Add an object to the tail of the queue.
       */
      public final void enqueue(Object obj) {
          synchronized (queue) {
              queue.add(obj);
              queue.notify();
          }
      }
      
      
      /**
       * Creates and starts a new worker thread to process items in the queue.
       */
      public final void start() throws IllegalThreadStateException {
          if (running) {
              throw new IllegalThreadStateException("Processor thread has already been started.");
          }
          else {
              this.running = true;
              this.workerThread = new WorkerThread();
              
              log.debug("Starting worker thread.");
              this.workerThread.start();
          }
      }
      
      
      /**
       * This method waits for the queue to empty and the processor thread to exit 
       * cleanly and release any resources acquired during processing, if possible.
       */
      public void finish() throws InterruptedException {
          log.debug("Finishing processing...");
          synchronized (queue) {
              this.running = false;
              queue.notify();
          }
          
          this.workerThread.join();
          log.debug("Finished; worker thread stopped.");
      }
      
      
      /**
       * Returns the status of the loop that writes data from the queue.
       */
      public final boolean isRunning() {
          return this.running;
      }
      
      
      /**
       * This worker thread removes and processes data objects from the incoming
       * queue. It first calls preprocess(), then begins a loop that calls the process()
       * method to process data from the queue. This loop runs as long as the
       * {@link getRunning() running} property is true or the queue is not empty. To break the loop the
       * calling code must run the writer's finish() method to set the running property to false.
       * At this point the loop will continue to run until the queue is empty, then the loop will
       * exit and the postprocess() method is called.<P>
       *
       * @throws StageException if an error is encountered during data processing
       * and failureTolerant is set to false.
       */
      private class WorkerThread extends Thread {
          public final void run() {
              try {
                  log.debug("preprocessing...");
                  SingleThreadStageQueue.this.stageHandler.preprocess();
                  
                  while (true) {
                      Object obj = null;
                      synchronized (queue) {
                          while (running && queue.isEmpty()) queue.wait();
                          
                          if (!running && queue.isEmpty()) break;
                          obj = queue.remove(0);
                      }
                      
                      try {
                          //if (log.isDebugEnabled()) log.debug("Running method \"process()\" on object " + obj.toString());
                          SingleThreadStageQueue.this.stageHandler.process(obj);
                      }
                      catch (StageException e) {
                          if (!failureTolerant) throw e;
                          log.error(e.getMessage(), e);
                      }
                  }
                  
                  log.debug("postprocessing...");
                  SingleThreadStageQueue.this.stageHandler.postprocess();
              }
              catch (InterruptedException e) {
                  log.fatal(e.getMessage(), e);
              }
              catch (StageException e) {
                  log.fatal(e.getMessage(), e);
                  throw e;
              }
              finally {
                  running = false;
                  SingleThreadStageQueue.this.stageHandler.release();
              }
          }
      }
          
      
      /**
       * Sets the failure tolerance flag for the worker thread. If failureTolerant
       * is set to true, {@link StageException StageException}s thrown by 
       * the process() method will not interrupt queue processing, but will simply 
       * be logged with a severity of ERROR. 
       */
      public final void setFailureTolerant(boolean failureTolerant) {
          this.failureTolerant = failureTolerant;
      }
      
      
      /**
       * Getter for property failureTolerant.
       * @return Value of property failureTolerant.
       */
      public boolean isFailureTolerant() {
          return this.failureTolerant;
      }
  }
  
  
  
  1.1                  jakarta-commons-sandbox/pipeline/src/java/org/apache/commons/pipeline/impl/package.html
  
  Index: package.html
  ===================================================================
  <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
  
  <html>
    <head>
      <title></title>
    </head>
    <body>
    A few simple Stage and StageQueue implementations for common use cases.
    </body>
  </html>
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org


Mime
View raw message