manifoldcf-commits mailing list archives

From kwri...@apache.org
Subject svn commit: r1602428 [2/3] - in /manifoldcf/branches/CONNECTORS-962/framework: agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ pull-agent/src/main/java/org/apache/manifo...
Date Fri, 13 Jun 2014 13:51:11 GMT

Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1602428&r1=1602427&r2=1602428&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Fri Jun 13 13:51:11 2014
@@ -226,32 +226,27 @@ public class IncrementalIngester extends
   public boolean checkMimeTypeIndexable(String outputConnectionName, String outputDescription, String mimeType)
     throws ManifoldCFException, ServiceInterruption
   {
-    return checkMimeTypeIndexable(new String[0], new String[0],
-      outputConnectionName, outputDescription,
+    return checkMimeTypeIndexable(
+      new RuntPipelineSpecification(outputConnectionName,outputDescription),
       mimeType,null);
   }
 
+  
   /** Check if a mime type is indexable.
-  *@param transformationConnectionNames is the ordered list of transformation connection names.
-  *@param transformationDescriptions is the ordered list of transformation description strings.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param outputDescription is the output description string.
+  *@param pipelineSpecification is the pipeline specification.
   *@param mimeType is the mime type to check.
   *@param activity are the activities available to this method.
   *@return true if the mimeType is indexable.
   */
   @Override
   public boolean checkMimeTypeIndexable(
-    String[] transformationConnectionNames, String[] transformationDescriptions,
-    String outputConnectionName, String outputDescription,
+    IPipelineSpecification pipelineSpecification,
     String mimeType,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
     PipelineObject pipeline = pipelineGrab(
-      transformationConnectionManager.loadMultiple(transformationConnectionNames),
-      connectionManager.load(outputConnectionName),
-      transformationDescriptions,outputDescription);
+      new PipelineConnections(pipelineSpecification));
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -265,8 +260,6 @@ public class IncrementalIngester extends
     }
   }
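
For illustration, callers migrate from the deprecated per-output form to the pipeline form along these lines; a minimal sketch, assuming an IIncrementalIngester handle named ingester and a pipeline specification already built for the job (both names are assumptions, not part of this patch):

    boolean indexable = ingester.checkMimeTypeIndexable(
      pipelineSpec,        // IPipelineSpecification describing transformations + outputs
      "text/html",         // mime type to test
      checkActivity);      // IOutputCheckActivity, or null as in the deprecated wrapper

The deprecated overload keeps working by wrapping its output connection name and description in a RuntPipelineSpecification, as the hunk above shows.
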
 
-  
-  
   /** Check if a file is indexable.
   *@param outputConnectionName is the name of the output connection associated with this action.
   *@param outputDescription is the output description string.
@@ -277,32 +270,26 @@ public class IncrementalIngester extends
   public boolean checkDocumentIndexable(String outputConnectionName, String outputDescription, File localFile)
     throws ManifoldCFException, ServiceInterruption
   {
-    return checkDocumentIndexable(new String[0], new String[0],
-      outputConnectionName, outputDescription,
+    return checkDocumentIndexable(
+      new RuntPipelineSpecification(outputConnectionName,outputDescription),
       localFile,null);
   }
   
   /** Check if a file is indexable.
-  *@param transformationConnectionNames is the ordered list of transformation connection names.
-  *@param transformationDescriptions is the ordered list of transformation description strings.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param outputDescription is the output description string.
+  *@param pipelineSpecification is the pipeline specification.
   *@param localFile is the local file to check.
   *@param activity are the activities available to this method.
   *@return true if the local file is indexable.
   */
   @Override
   public boolean checkDocumentIndexable(
-    String[] transformationConnectionNames, String[] transformationDescriptions,
-    String outputConnectionName, String outputDescription,
+    IPipelineSpecification pipelineSpecification,
     File localFile,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
     PipelineObject pipeline = pipelineGrab(
-      transformationConnectionManager.loadMultiple(transformationConnectionNames),
-      connectionManager.load(outputConnectionName),
-      transformationDescriptions,outputDescription);
+      new PipelineConnections(pipelineSpecification));
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -327,33 +314,27 @@ public class IncrementalIngester extends
   public boolean checkLengthIndexable(String outputConnectionName, String outputDescription, long length)
     throws ManifoldCFException, ServiceInterruption
   {
-    return checkLengthIndexable(new String[0], new String[0],
-      outputConnectionName, outputDescription,
+    return checkLengthIndexable(
+      new RuntPipelineSpecification(outputConnectionName,outputDescription),
       length,null);
   }
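
The Runt* adapter classes used by these wrappers are defined elsewhere in this changeset. A rough sketch of what a single-output pipeline specification has to answer, restricted to the accessors that appear later in this file (hypothetical class, not the committed RuntPipelineSpecification):

    // Sketch only: a one-stage pipeline whose sole stage is the output.
    public class SingleOutputSpec
    {
      protected final String outputConnectionName;
      protected final String outputDescription;

      public SingleOutputSpec(String outputConnectionName, String outputDescription)
      {
        this.outputConnectionName = outputConnectionName;
        this.outputDescription = outputDescription;
      }

      public int getOutputCount() { return 1; }             // exactly one output...
      public int getOutputStage(int index) { return 0; }    // ...living at stage 0
      public int getStageParent(int stage) { return -1; }   // no transformation stages above it
      public String getStageConnectionName(int stage) { return outputConnectionName; }
      public String getStageDescriptionString(int stage) { return outputDescription; }
    }
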
   
   /** Pre-determine whether a document's length is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that are too long to be indexable.
-  *@param transformationConnectionNames is the ordered list of transformation connection names.
-  *@param transformationDescriptions is the ordered list of transformation description strings.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param outputDescription is the output description string.
+  *@param pipelineSpecification is the pipeline specification.
   *@param length is the length of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   @Override
   public boolean checkLengthIndexable(
-    String[] transformationConnectionNames, String[] transformationDescriptions,
-    String outputConnectionName, String outputDescription,
+    IPipelineSpecification pipelineSpecification,
     long length,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
     PipelineObject pipeline = pipelineGrab(
-      transformationConnectionManager.loadMultiple(transformationConnectionNames),
-      connectionManager.load(outputConnectionName),
-      transformationDescriptions,outputDescription);
+      new PipelineConnections(pipelineSpecification));
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -378,33 +359,27 @@ public class IncrementalIngester extends
   public boolean checkURLIndexable(String outputConnectionName, String outputDescription, String url)
     throws ManifoldCFException, ServiceInterruption
   {
-    return checkURLIndexable(new String[0], new String[0],
-      outputConnectionName, outputDescription,
+    return checkURLIndexable(
+      new RuntPipelineSpecification(outputConnectionName,outputDescription),
       url,null);
   }
   
   /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
  * to help filter out documents that are not indexable.
-  *@param transformationConnectionNames is the ordered list of transformation connection names.
-  *@param transformationDescriptions is the ordered list of transformation description strings.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param outputDescription is the output description string.
+  *@param pipelineSpecification is the pipeline specification.
   *@param url is the url of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   @Override
   public boolean checkURLIndexable(
-    String[] transformationConnectionNames, String[] transformationDescriptions,
-    String outputConnectionName, String outputDescription,
+    IPipelineSpecification pipelineSpecification,
     String url,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
     PipelineObject pipeline = pipelineGrab(
-      transformationConnectionManager.loadMultiple(transformationConnectionNames),
-      connectionManager.load(outputConnectionName),
-      transformationDescriptions,outputDescription);
+      new PipelineConnections(pipelineSpecification));
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -418,7 +393,6 @@ public class IncrementalIngester extends
     }
   }
 
-  
   /** Grab the entire pipeline.
-  *@param transformationConnections - the transformation connections, in order
-  *@param outputConnection - the output connection
@@ -426,42 +400,88 @@ public class IncrementalIngester extends
-  *@param outputDescriptionString - the output description string
+  *@param pipelineConnections - the precomputed pipeline connections, with version information
   *@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
   */
-  protected PipelineObject pipelineGrab(ITransformationConnection[] transformationConnections, IOutputConnection outputConnection,
-    String[] transformationDescriptionStrings, String outputDescriptionString)
+  protected PipelineObjectWithVersions pipelineGrabWithVersions(PipelineConnectionsWithVersions pipelineConnections)
     throws ManifoldCFException
   {
     // Pick up all needed transformation connectors
-    String[] transformationConnectionNames = new String[transformationConnections.length];
-    for (int i = 0; i < transformationConnections.length; i++)
+    ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(pipelineConnections.getTransformationConnectionNames(),pipelineConnections.getTransformationConnections());
+    for (ITransformationConnector c : transformationConnectors)
     {
-      transformationConnectionNames[i] = transformationConnections[i].getName();
+      if (c == null)
+      {
+        transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+        return null;
+      }
     }
     
-    ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(transformationConnectionNames,transformationConnections);
+    // Pick up all needed output connectors.  If this fails we have to release the transformation connectors.
+    try
+    {
+      IOutputConnector[] outputConnectors = outputConnectorPool.grabMultiple(pipelineConnections.getOutputConnectionNames(),pipelineConnections.getOutputConnections());
+      for (IOutputConnector c : outputConnectors)
+      {
+        if (c == null)
+        {
+          outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors);
+          transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+          return null;
+        }
+      }
+      return new PipelineObjectWithVersions(pipelineConnections,transformationConnectors,outputConnectors);
+    }
+    catch (Throwable e)
+    {
+      transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+      if (e instanceof ManifoldCFException)
+        throw (ManifoldCFException)e;
+      else if (e instanceof RuntimeException)
+        throw (RuntimeException)e;
+      else if (e instanceof Error)
+        throw (Error)e;
+      else
+        throw new RuntimeException("Unexpected exception type: "+e.getClass().getName()+": "+e.getMessage(),e);
+    }
+  }
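
Both grab methods follow the same all-or-nothing discipline: any null connector, or any throwable during the second acquisition, causes everything already grabbed to be released before returning or rethrowing. The same pattern in self-contained form (the generic Pool type is hypothetical, not ManifoldCF API):

    import java.util.ArrayList;
    import java.util.List;

    class AllOrNothing
    {
      interface Pool<T> { T grab() throws Exception; void release(T item); }

      /** Grab n resources; on any failure, release what is already held. */
      static <T> List<T> grabAll(Pool<T> pool, int n) throws Exception
      {
        List<T> held = new ArrayList<T>();
        try
        {
          for (int i = 0; i < n; i++)
          {
            T item = pool.grab();
            if (item == null)
            {
              for (T h : held) pool.release(h);   // roll back partial acquisition
              return null;
            }
            held.add(item);
          }
          return held;
        }
        catch (Exception e)
        {
          for (T h : held) pool.release(h);       // never leak on exception either
          throw e;
        }
      }
    }
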
+
+  /** Grab the entire pipeline.
+  *@param pipelineConnections - the precomputed pipeline connections object
+  *@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
+  */
+  protected PipelineObject pipelineGrab(PipelineConnections pipelineConnections)
+    throws ManifoldCFException
+  {
+    // Pick up all needed transformation connectors
+    ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(pipelineConnections.getTransformationConnectionNames(),pipelineConnections.getTransformationConnections());
     for (ITransformationConnector c : transformationConnectors)
     {
       if (c == null)
       {
-        transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+        transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
         return null;
       }
     }
     
-    // Last, pick up output connector.  If it fails we have to release the transformation connectors.
+    // Pick up all needed output connectors.  If this fails we have to release the transformation connectors.
     try
     {
-      IOutputConnector outputConnector = outputConnectorPool.grab(outputConnection);
-      if (outputConnector == null)
+      IOutputConnector[] outputConnectors = outputConnectorPool.grabMultiple(pipelineConnections.getOutputConnectionNames(),pipelineConnections.getOutputConnections());
+      for (IOutputConnector c : outputConnectors)
       {
-        transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
-        return null;
+        if (c == null)
+        {
+          outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors);
+          transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+          return null;
+        }
       }
-      return new PipelineObject(transformationConnections,transformationConnectors,outputConnection,outputConnector,
-        transformationDescriptionStrings,outputDescriptionString);
+      return new PipelineObject(pipelineConnections,transformationConnectors,outputConnectors);
     }
     catch (Throwable e)
     {
-      transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+      transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
       if (e instanceof ManifoldCFException)
         throw (ManifoldCFException)e;
       else if (e instanceof RuntimeException)
@@ -498,35 +518,138 @@ public class IncrementalIngester extends
 
   }
 
-  /** Get transformation version strings for a document.
-  *@param transformationConnectionNames are the names of the transformation connections associated with this action.
-  *@param specs are the transformation specifications.
-  *@return the description strings.
+  /** Get transformation version string for a document.
+  *@param transformationConnectionName is the name of the transformation connection associated with this action.
+  *@param spec is the transformation specification.
+  *@return the description string.
   */
-  @Override
-  public String[] getTransformationDescriptions(String[] transformationConnectionNames, OutputSpecification[] specs)
+  public String getTransformationDescription(String transformationConnectionName, OutputSpecification spec)
     throws ManifoldCFException, ServiceInterruption
   {
-    String[] rval = new String[transformationConnectionNames.length];
-    for (int i = 0; i < rval.length; i++)
+    ITransformationConnection connection = transformationConnectionManager.load(transformationConnectionName);
+    ITransformationConnector connector = transformationConnectorPool.grab(connection);
+    if (connector == null)
+      // The connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("Transformation connector not installed",0L);
+    try
     {
-      String transformationConnectionName = transformationConnectionNames[i];
-      OutputSpecification spec = specs[i];
-      ITransformationConnection connection = transformationConnectionManager.load(transformationConnectionName);
-      ITransformationConnector connector = transformationConnectorPool.grab(connection);
-      if (connector == null)
-        // The connector is not installed; treat this as a service interruption.
-        throw new ServiceInterruption("Transformation connector not installed",0L);
-      try
-      {
-        rval[i] = connector.getPipelineDescription(spec);
-      }
-      finally
+      return connector.getPipelineDescription(spec);
+    }
+    finally
+    {
+      transformationConnectorPool.release(connection,connector);
+    }
+  }
+
+  /** Determine whether we need to fetch or refetch a document.
+  * Pass in information including the pipeline specification with existing version info, plus new document and parameter version strings.
+  * If no outputs need to be updated, then this method will return false.  If any outputs need updating, then true is returned.
+  *@param pipelineSpecificationWithVersions is the pipeline specification including new version info for all transformation and output
+  *  connections.
+  *@param newDocumentVersion is the newly-determined document version.
+  *@param newParameterVersion is the newly-determined parameter version.
+  *@param newAuthorityNameString is the newly-determined authority name.
+  *@return true if the document needs to be refetched.
+  */
+  @Override
+  public boolean checkFetchDocument(
+    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    String newDocumentVersion,
+    String newParameterVersion,
+    String newAuthorityNameString)
+  {
+    // Empty document version has a special meaning....
+    if (newDocumentVersion.length() == 0)
+      return true;
+    // Otherwise, cycle through the outputs
+    for (int i = 0; i < pipelineSpecificationWithVersions.getOutputCount(); i++)
+    {
+      int stage = pipelineSpecificationWithVersions.getOutputStage(i);
+      String oldDocumentVersion = pipelineSpecificationWithVersions.getOutputDocumentVersionString(i);
+      String oldParameterVersion = pipelineSpecificationWithVersions.getOutputParameterVersionString(i);
+      String oldOutputVersion = pipelineSpecificationWithVersions.getOutputVersionString(i);
+      String oldAuthorityName = pipelineSpecificationWithVersions.getAuthorityNameString(i);
+      // If it looks like we never indexed this output before, we need to do it now.
+      if (oldDocumentVersion == null)
+        return true;
+      // Look first at the version strings that aren't pipeline dependent
+      if (!oldDocumentVersion.equals(newDocumentVersion) ||
+        !oldParameterVersion.equals(newParameterVersion) ||
+        !oldAuthorityName.equals(newAuthorityNameString) ||
+        !oldOutputVersion.equals(pipelineSpecificationWithVersions.getStageDescriptionString(stage)))
+        return true;
+      
+      // Everything matches so far.  Next step is to compute the transformation path and its corresponding version string.
+      String newTransformationVersion = computePackedTransformationVersion(pipelineSpecificationWithVersions,stage);
+      if (!pipelineSpecificationWithVersions.getOutputTransformationVersionString(i).equals(newTransformationVersion))
+        return true;
+    }
+    // Everything matches, so no reindexing is needed.
+    return false;
+  }
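
The refetch decision thus reduces to per-output string comparisons, with two short-circuits: an empty new document version always forces a fetch, and a missing old version means the output was never indexed. A standalone illustration of the per-output test (simplified to two version strings):

    import java.util.Objects;

    public class RefetchDemo
    {
      static boolean needsRefetch(String newDocVersion, String oldDocVersion,
        String newOutputVersion, String oldOutputVersion)
      {
        if (newDocVersion.length() == 0)
          return true;                      // empty version has a special meaning: always fetch
        if (oldDocVersion == null)
          return true;                      // never indexed on this output before
        return !oldDocVersion.equals(newDocVersion) ||
          !Objects.equals(oldOutputVersion, newOutputVersion);
      }

      public static void main(String[] args)
      {
        System.out.println(needsRefetch("v2", null, "o1", null));  // true: first index
        System.out.println(needsRefetch("v2", "v1", "o1", "o1"));  // true: document changed
        System.out.println(needsRefetch("v1", "v1", "o1", "o1"));  // false: up to date
      }
    }
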
+
+  /** Compute a transformation version given a pipeline specification and starting output stage.
+  *@param pipelineSpecification is the pipeline specification.
+  *@param stage is the stage number of the output stage.
+  *@return the transformation version string, which will be a composite of all the transformations applied.
+  */
+  protected static String computePackedTransformationVersion(IPipelineSpecification pipelineSpecification, int stage)
+  {
+    // First, count the stages we need to represent
+    int stageCount = 0;
+    int currentStage = stage;
+    while (true)
+    {
+      int newStage = pipelineSpecification.getStageParent(currentStage);
+      if (newStage == -1)
+        break;
+      stageCount++;
+      currentStage = newStage;
+    }
+    // Doesn't matter how we pack it; I've chosen to do it in reverse for convenience
+    String[] stageNames = new String[stageCount];
+    String[] stageDescriptions = new String[stageCount];
+    stageCount = 0;
+    currentStage = stage;
+    while (true)
+    {
+      int newStage = pipelineSpecification.getStageParent(currentStage);
+      if (newStage == -1)
+        break;
+      stageNames[stageCount] = pipelineSpecification.getStageConnectionName(newStage);
+      stageDescriptions[stageCount] = pipelineSpecification.getStageDescriptionString(newStage);
+      stageCount++;
+      currentStage = newStage;
+    }
+    // Finally, do the packing.
+    StringBuilder sb = new StringBuilder();
+    packList(sb,stageNames,'+');
+    packList(sb,stageDescriptions,'!');
+    return sb.toString();
+  }
+  
+  protected static void packList(StringBuilder output, String[] values, char delimiter)
+  {
+    pack(output,Integer.toString(values.length),delimiter);
+    int i = 0;
+    while (i < values.length)
+    {
+      pack(output,values[i++],delimiter);
+    }
+  }
+
+  protected static void pack(StringBuilder sb, String value, char delim)
+  {
+    for (int i = 0; i < value.length(); i++)
+    {
+      char x = value.charAt(i);
+      if (x == delim || x == '\\')
       {
-        transformationConnectorPool.release(connection,connector);
+        sb.append('\\');
       }
+      sb.append(x);
     }
-    return rval;
+    sb.append(delim);
   }
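
packList length-prefixes the values, and pack backslash-escapes the delimiter inside each value, so the packed string is unambiguous and mechanically reversible. For stage names {"A","B"} and descriptions {"d1","d2"}, the packed transformation version comes out as 2+A+B+2!d1!d2!. A standalone demo of the committed packing logic:

    public class PackDemo
    {
      static void pack(StringBuilder sb, String value, char delim)
      {
        for (int i = 0; i < value.length(); i++)
        {
          char x = value.charAt(i);
          if (x == delim || x == '\\')
            sb.append('\\');                // escape delimiter and escape char
          sb.append(x);
        }
        sb.append(delim);                   // terminate the value
      }

      static void packList(StringBuilder sb, String[] values, char delim)
      {
        pack(sb, Integer.toString(values.length), delim);  // length prefix first
        for (String v : values)
          pack(sb, v, delim);
      }

      public static void main(String[] args)
      {
        StringBuilder sb = new StringBuilder();
        packList(sb, new String[]{"A","B"}, '+');
        packList(sb, new String[]{"d1","d2"}, '!');
        System.out.println(sb);             // prints: 2+A+B+2!d1!d2!
      }
    }
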
 
   /** Record a document version, but don't ingest it.
@@ -540,19 +663,48 @@ public class IncrementalIngester extends
   *@param activities is the object used in case a document needs to be removed from the output index as the result of this operation.
   */
   @Override
+  @Deprecated
   public void documentRecord(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
     long recordTime, IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
+    documentRecord(
+      new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClass, identifierHash,
+      documentVersion,
+      recordTime, activities);
+  }
+  
+  /** Record a document version, but don't ingest it.
+  * The purpose of this method is to keep track of the frequency at which ingestion "attempts" take place.
+  * ServiceInterruption is thrown if this action must be rescheduled.
+  *@param pipelineSpecificationBasic is the basic pipeline specification needed.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hashed document identifier.
+  *@param documentVersion is the document version.
+  *@param recordTime is the time at which the recording took place, in milliseconds since epoch.
+  *@param activities is the object used in case a document needs to be removed from the output index as the result of this operation.
+  */
+  @Override
+  public void documentRecord(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash,
+    String documentVersion, long recordTime,
+    IOutputActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
     String docKey = makeKey(identifierClass,identifierHash);
 
-    if (Logging.ingest.isDebugEnabled())
-    {
-      Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
-    }
+    // MHL - move to somewhere else
+    //if (Logging.ingest.isDebugEnabled())
+    //{
+    //  Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
+    //}
 
+    /* This should invoke the pipeline to do the recording...
+    MHL
     // With a null document URI, this can't throw either ServiceInterruption or IOException
     try
     {
@@ -569,11 +721,7 @@ public class IncrementalIngester extends
     {
       throw new RuntimeException("Unexpected IOException thrown: "+e.getMessage(),e);
     }
-    catch (ServiceInterruption e)
-    {
-      throw new RuntimeException("Unexpected ServiceInterruption thrown: "+e.getMessage(),e);
-    }
-
+    */
   }
 
   /** Ingest a document.
@@ -651,10 +799,9 @@ public class IncrementalIngester extends
   {
     try
     {
-      return documentIngest(new String[0],
-        new String[0],
-        outputConnectionName,
-        outputVersion,
+      return documentIngest(
+        new RuntPipelineSpecificationWithVersions(outputConnectionName,outputVersion,
+          "","","","",""),
         identifierClass, identifierHash,
         documentVersion,
         "",
@@ -763,15 +910,10 @@ public class IncrementalIngester extends
   * method also REMOVES ALL OLD METADATA.  When complete, the index will contain only the metadata
   * described by the RepositoryDocument object passed to this method.
   * ServiceInterruption is thrown if the document ingestion must be rescheduled.
-  *@param transformationConnectionNames are the names of the transformation connections associated with this action.
-  *@param transformationDescriptionStrings are the description strings corresponding to the transformation connection names.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param otuputDescriptionString is the description string corresponding to the output connection.
+  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param documentVersion is the document version.
-  *@param transformationVersion is the version string for the transformations to be performed on the document.
-  *@param outputVersion is the output version string for the output connection.
   *@param parameterVersion is the version string for the forced parameters.
   *@param authorityName is the name of the authority associated with the document, if any.
-  *@param data is the document data.  The data is closed after ingestion is complete.
+  *@param document is the document data.  The data is closed after ingestion is complete.
@@ -783,192 +925,46 @@ public class IncrementalIngester extends
   */
   @Override
   public boolean documentIngest(
-    String[] transformationConnectionNames,
-    String[] transformationDescriptionStrings,
-    String outputConnectionName,
-    String outputDescriptionString,
+    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
     String identifierClass, String identifierHash,
     String documentVersion,
-    String transformationVersion,
-    String outputVersion,
     String parameterVersion,
     String authorityName,
-    RepositoryDocument data,
+    RepositoryDocument document,
     long ingestTime, String documentURI,
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption, IOException
   {
-    IOutputConnection outputConnection = connectionManager.load(outputConnectionName);
-    ITransformationConnection[] transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
+    PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
     
     String docKey = makeKey(identifierClass,identifierHash);
 
-    if (Logging.ingest.isDebugEnabled())
-    {
-      Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
-    }
-    return performIngestion(transformationConnections,transformationDescriptionStrings,
-      outputConnection,outputDescriptionString,
-      docKey,documentVersion,outputVersion,transformationVersion,parameterVersion,
-      authorityName,
-      data,
-      ingestTime,documentURI,
-      activities);
-  }
-
-  /** Do the actual ingestion, or just record it if there's nothing to ingest. */
-  protected boolean performIngestion(
-    ITransformationConnection[] transformationConnections, String[] transformationDescriptionStrings,
-    IOutputConnection outputConnection, String outputDescriptionString,
-    String docKey, String documentVersion, String outputVersion, String transformationVersion, String parameterVersion,
-    String authorityNameString,
-    RepositoryDocument data,
-    long ingestTime, String documentURI,
-    IOutputActivity activities)
-    throws ManifoldCFException, ServiceInterruption, IOException
-  {
-    String outputConnectionName = outputConnection.getName();
-    
-    // No transactions; not safe because post may take too much time
-
-    // First, calculate a document uri hash value
-    String documentURIHash = null;
-    if (documentURI != null)
-      documentURIHash = ManifoldCF.hash(documentURI);
-
-    String oldURI = null;
-    String oldURIHash = null;
-    String oldOutputVersion = null;
+    // MHL - move to the appropriate place
+    //if (Logging.ingest.isDebugEnabled())
+    //{
+    //  Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
+    //}
 
+    // Set indexing date
+    document.setIndexingDate(new Date());
     
-    while (true)
-    {
-      long sleepAmt = 0L;
-      try
-      {
-        // See what uri was used before for this doc, if any
-        ArrayList list = new ArrayList();
-        String query = buildConjunctionClause(list,new ClauseDescription[]{
-          new UnitaryClause(docKeyField,docKey),
-          new UnitaryClause(outputConnNameField,outputConnectionName)});
-          
-        IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
-          " WHERE "+query,list,null,null);
-
-        if (set.getRowCount() > 0)
-        {
-          IResultRow row = set.getRow(0);
-          oldURI = (String)row.getValue(docURIField);
-          oldURIHash = (String)row.getValue(uriHashField);
-          oldOutputVersion = (String)row.getValue(lastOutputVersionField);
-        }
-        
-        break;
-      }
-      catch (ManifoldCFException e)
-      {
-        // Look for deadlock and retry if so
-        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
-        {
-          if (Logging.perf.isDebugEnabled())
-            Logging.perf.debug("Aborted select looking for status: "+e.getMessage());
-          sleepAmt = getSleepAmt();
-          continue;
-        }
-        throw e;
-      }
-      finally
-      {
-        sleepFor(sleepAmt);
-      }
-    }
-
-    // If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave
-    // dangling documents around.  So, all uri searches and comparisons MUST compare the actual uri as well.
-
-    // But, since we need to insure that any given URI is only worked on by one thread at a time, use critical sections
-    // to block the rare case that multiple threads try to work on the same URI.
-    int uriCount = 0;
-    if (documentURI != null)
-      uriCount++;
-    if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
-      uriCount++;
-    String[] lockArray = new String[uriCount];
-    uriCount = 0;
-    if (documentURI != null)
-      lockArray[uriCount++] = outputConnectionName+":"+documentURI;
-    if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
-      lockArray[uriCount++] = outputConnectionName+":"+oldURI;
-
-    lockManager.enterCriticalSections(null,null,lockArray);
+    // Set up a pipeline
+    PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineConnectionsWithVersions);
+    if (pipeline == null)
+      // A connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("Pipeline connector not installed",0L);
     try
     {
-
-      ArrayList list = new ArrayList();
-      
-      if (oldURI != null && (documentURI == null || !oldURI.equals(documentURI)))
-      {
-        // Delete all records from the database that match the old URI, except for THIS record.
-        list.clear();
-        String query = buildConjunctionClause(list,new ClauseDescription[]{
-          new UnitaryClause(uriHashField,"=",oldURIHash),
-          new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
-        list.add(docKey);
-        performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
-        removeDocument(outputConnection,oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
-      }
-
-      if (documentURI != null)
-      {
-        // Get rid of all records that match the NEW uri, except for this record.
-        list.clear();
-        String query = buildConjunctionClause(list,new ClauseDescription[]{
-          new UnitaryClause(uriHashField,"=",documentURIHash),
-          new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
-        list.add(docKey);
-        performDelete("WHERE "+query+" AND "+ docKeyField+"!=?",list,null);
-      }
-
-      // Now, we know we are ready for the ingest.
-      if (documentURI != null)
-      {
-        // Here are the cases:
-        // 1) There was a service interruption before the upload started.
-        // (In that case, we don't need to log anything, just reschedule).
-        // 2) There was a service interruption after the document was transmitted.
-        // (In that case, we should presume that the document was ingested, but
-        //  reschedule another import anyway.)
-        // 3) Everything went OK
-        // (need to log the ingestion.)
-        // 4) Everything went OK, but we were told we have an illegal document.
-        // (We note the ingestion because if we don't we will be forced to repeat ourselves.
-        //  In theory, document doesn't need to be deleted, but there is no way to signal
-        //  that at the moment.)
-
-        // Note an ingestion before we actually try it.
-        // This is a marker that says "something is there"; it has an empty version, which indicates
-        // that we don't know anything about it.  That means it will be reingested when the
-        // next version comes along, and will be deleted if called for also.
-        noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,null,ingestTime,documentURI,documentURIHash);
-        int result = addOrReplaceDocument(transformationConnections,transformationDescriptionStrings,
-          outputConnection,outputDescriptionString,
-          documentURI,data,authorityNameString,
-          activities);
-        noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion, outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
-        return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
-      }
-
-      // If we get here, it means we are noting that the document was examined, but that no change was required.  This is signaled
-      // to noteDocumentIngest by having the null documentURI.
-      noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
-      return true;
+      return pipeline.addOrReplaceDocumentWithException(documentURI,document,documentVersion,parameterVersion,authorityName,activities,ingestTime);
     }
     finally
     {
-      lockManager.leaveCriticalSections(null,null,lockArray);
+      pipeline.release();
     }
   }
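
A hedged usage sketch of the new entry point; the variable names are assumptions, not part of this patch, and the spec object must already carry the per-output version strings previously read back from the ingest status table:

    boolean accepted = ingester.documentIngest(
      specWithVersions,                  // IPipelineSpecificationWithVersions for this job
      identifierClass, identifierHash,   // document identity
      documentVersion,
      parameterVersion,
      authorityName,
      repositoryDocument,                // RepositoryDocument; closed when ingestion completes
      System.currentTimeMillis(), documentURI,
      outputActivities);                 // IOutputActivity callbacks
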
 
+
+
   /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
   * versions agreed).
   *@param outputConnectionName is the name of the output connection associated with this action.
@@ -977,37 +973,66 @@ public class IncrementalIngester extends
   *@param checkTime is the time at which the check took place, in milliseconds since epoch.
   */
   @Override
+  @Deprecated
   public void documentCheckMultiple(String outputConnectionName,
     String[] identifierClasses, String[] identifierHashes,
     long checkTime)
     throws ManifoldCFException
   {
+    documentCheckMultiple(new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClasses,identifierHashes,checkTime);
+  }
+  
+  protected static String[] extractOutputConnectionNames(IPipelineSpecificationBasic pipelineSpecificationBasic)
+  {
+    String[] rval = new String[pipelineSpecificationBasic.getOutputCount()];
+    for (int i = 0; i < rval.length; i++)
+    {
+      rval[i] = pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(i));
+    }
+    return rval;
+  }
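
extractOutputConnectionNames resolves each output index to its stage, then to that stage's connection name. The same walk over a toy model, with arrays standing in for the IPipelineSpecificationBasic accessors (assumed data, illustration only):

    import java.util.Arrays;

    public class StageModel
    {
      // Stage i's connection name; stages 0-1 are transformations, stage 2 is the output.
      static final String[] stageConnectionNames = {"xform1","xform2","solr"};
      // Indices of the stages that are outputs.
      static final int[] outputStages = {2};

      public static void main(String[] args)
      {
        String[] rval = new String[outputStages.length];
        for (int i = 0; i < rval.length; i++)
          rval[i] = stageConnectionNames[outputStages[i]];  // output index -> stage -> name
        System.out.println(Arrays.toString(rval));          // prints: [solr]
      }
    }
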
+  
+  /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
+  * versions agreed).
+  *@param pipelineSpecificationBasic is a pipeline specification.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes are the set of document identifier hashes.
+  *@param checkTime is the time at which the check took place, in milliseconds since epoch.
+  */
+  @Override
+  public void documentCheckMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes,
+    long checkTime)
+    throws ManifoldCFException
+  {
+    // Extract output connection names from pipeline spec
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
     beginTransaction();
     try
     {
       int maxClauses;
       
-      HashMap docIDValues = new HashMap();
-      int j = 0;
-      while (j < identifierHashes.length)
+      Set<String> docIDValues = new HashSet<String>();
+      for (int j = 0; j < identifierHashes.length; j++)
       {
         String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
-        docIDValues.put(docDBString,docDBString);
-        j++;
+        docIDValues.add(docDBString);
       }
 
       // Now, perform n queries, each of them no larger the maxInClause in length.
       // Create a list of row id's from this.
-      HashMap rowIDSet = new HashMap();
-      Iterator iter = docIDValues.keySet().iterator();
-      j = 0;
-      ArrayList list = new ArrayList();
-      maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
+      Set<Long> rowIDSet = new HashSet<Long>();
+      Iterator<String> iter = docIDValues.iterator();
+      int j = 0;
+      List<String> list = new ArrayList<String>();
+      maxClauses = maxClausesRowIdsForDocIds(outputConnectionNames);
       while (iter.hasNext())
       {
         if (j == maxClauses)
         {
-          findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
+          findRowIdsForDocIds(outputConnectionNames,rowIDSet,list);
           list.clear();
           j = 0;
         }
@@ -1016,27 +1041,27 @@ public class IncrementalIngester extends
       }
 
       if (j > 0)
-        findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
+        findRowIdsForDocIds(outputConnectionNames,rowIDSet,list);
 
       // Now, break row id's into chunks too; submit one chunk at a time
       j = 0;
-      list.clear();
-      iter = rowIDSet.keySet().iterator();
+      List<Long> list2 = new ArrayList<Long>();
+      Iterator<Long> iter2 = rowIDSet.iterator();
       maxClauses = maxClausesUpdateRowIds();
-      while (iter.hasNext())
+      while (iter2.hasNext())
       {
         if (j == maxClauses)
         {
-          updateRowIds(list,checkTime);
-          list.clear();
+          updateRowIds(list2,checkTime);
+          list2.clear();
           j = 0;
         }
-        list.add(iter.next());
+        list2.add(iter2.next());
         j++;
       }
 
       if (j > 0)
-        updateRowIds(list,checkTime);
+        updateRowIds(list2,checkTime);
     }
     catch (ManifoldCFException e)
     {
@@ -1062,12 +1087,31 @@ public class IncrementalIngester extends
   *@param checkTime is the time at which the check took place, in milliseconds since epoch.
   */
   @Override
+  @Deprecated
   public void documentCheck(String outputConnectionName,
     String identifierClass, String identifierHash,
     long checkTime)
     throws ManifoldCFException
   {
-    documentCheckMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},checkTime);
+    documentCheck(new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClass,identifierHash,checkTime);
+  }
+  
+  /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
+  * versions agreed).
+  *@param pipelineSpecificationBasic is a basic pipeline specification.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hashed document identifier.
+  *@param checkTime is the time at which the check took place, in milliseconds since epoch.
+  */
+  @Override
+  public void documentCheck(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash,
+    long checkTime)
+    throws ManifoldCFException
+  {
+    documentCheckMultiple(pipelineSpecificationBasic,new String[]{identifierClass},new String[]{identifierHash},checkTime);
   }
 
   /** Calculate the number of clauses.
@@ -1079,7 +1123,7 @@ public class IncrementalIngester extends
   
   /** Update a chunk of row ids.
   */
-  protected void updateRowIds(ArrayList list, long checkTime)
+  protected void updateRowIds(List<Long> list, long checkTime)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -1091,6 +1135,7 @@ public class IncrementalIngester extends
     performUpdate(map,"WHERE "+query,newList,null);
   }
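
The check and delete paths share one batching idiom: accumulate keys until maxClauses is reached, flush the chunk, and flush any remainder after the loop. The idiom in isolation (the flush body is a stub standing in for the IN-clause query):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class BatchDemo
    {
      static void flush(List<String> batch)
      {
        System.out.println("IN-clause query over " + batch);  // stands in for performQuery
      }

      public static void main(String[] args)
      {
        final int maxClauses = 2;
        List<String> batch = new ArrayList<String>();
        for (String key : Arrays.asList("a","b","c","d","e"))
        {
          if (batch.size() == maxClauses)
          {
            flush(batch);                 // flush a full chunk
            batch.clear();
          }
          batch.add(key);
        }
        if (!batch.isEmpty())
          flush(batch);                   // flush the remainder
      }
    }
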
 
+
   /** Delete multiple documents from the search engine index.
   *@param outputConnectionNames are the names of the output connections associated with this action.
   *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
@@ -1098,45 +1143,64 @@ public class IncrementalIngester extends
   *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
   */
   @Override
+  @Deprecated
   public void documentDeleteMultiple(String[] outputConnectionNames,
     String[] identifierClasses, String[] identifierHashes,
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-
-    // Segregate request by connection names
-    HashMap keyMap = new HashMap();
-    int i = 0;
-    while (i < outputConnectionNames.length)
+    IPipelineSpecificationBasic[] pipelineSpecs = new IPipelineSpecificationBasic[outputConnectionNames.length];
+    for (int i = 0; i < pipelineSpecs.length; i++)
     {
-      String outputConnectionName = outputConnectionNames[i];
-      ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
-      if (list == null)
+      pipelineSpecs[i] = new RuntPipelineSpecificationBasic(outputConnectionNames[i]);
+    }
+    documentDeleteMultiple(pipelineSpecs,
+      identifierClasses,identifierHashes,activities);
+  }
+  
+  /** Delete multiple documents from the search engine index.
+  *@param pipelineSpecificationBasics are the pipeline specifications associated with the documents.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes of the documents.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  @Override
+  public void documentDeleteMultiple(
+    IPipelineSpecificationBasic[] pipelineSpecificationBasics,
+    String[] identifierClasses, String[] identifierHashes,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    // Segregate request by pipeline spec instance address.  Not perfect, but works in the
+    // environment in which it is used.
+    Map<IPipelineSpecificationBasic,List<Integer>> keyMap = new HashMap<IPipelineSpecificationBasic,List<Integer>>();
+    for (int i = 0; i < pipelineSpecificationBasics.length; i++)
+    {
+      IPipelineSpecificationBasic spec = pipelineSpecificationBasics[i];
+      List<Integer> list = keyMap.get(spec);
+      if (list == null)
       {
-        list = new ArrayList();
-        keyMap.put(outputConnectionName,list);
+        list = new ArrayList<Integer>();
+        keyMap.put(spec,list);
       }
       list.add(new Integer(i));
-      i++;
     }
 
     // Create the return array.
-    Iterator iter = keyMap.keySet().iterator();
+    Iterator<IPipelineSpecificationBasic> iter = keyMap.keySet().iterator();
     while (iter.hasNext())
     {
-      String outputConnectionName = (String)iter.next();
-      ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+      IPipelineSpecificationBasic spec = iter.next();
+      List<Integer> list = keyMap.get(spec);
       String[] localIdentifierClasses = new String[list.size()];
       String[] localIdentifierHashes = new String[list.size()];
-      i = 0;
-      while (i < localIdentifierClasses.length)
+      for (int i = 0; i < localIdentifierClasses.length; i++)
       {
-        int index = ((Integer)list.get(i)).intValue();
+        int index = list.get(i).intValue();
         localIdentifierClasses[i] = identifierClasses[index];
         localIdentifierHashes[i] = identifierHashes[index];
-        i++;
       }
-      documentDeleteMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes,activities);
+      documentDeleteMultiple(spec,localIdentifierClasses,localIdentifierHashes,activities);
     }
   }
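
The bucketing above preserves original indices so the parallel identifier arrays can be re-sliced per specification. The same grouping in miniature (plain strings stand in for the pipeline spec keys):

    import java.util.*;

    public class GroupDemo
    {
      public static void main(String[] args)
      {
        String[] specs  = {"specA","specB","specA"};
        String[] hashes = {"h0","h1","h2"};

        Map<String,List<Integer>> keyMap = new HashMap<String,List<Integer>>();
        for (int i = 0; i < specs.length; i++)
        {
          List<Integer> list = keyMap.get(specs[i]);
          if (list == null)
          {
            list = new ArrayList<Integer>();
            keyMap.put(specs[i], list);
          }
          list.add(i);                    // remember the original index
        }
        for (Map.Entry<String,List<Integer>> e : keyMap.entrySet())
        {
          List<String> local = new ArrayList<String>();
          for (int idx : e.getValue())
            local.add(hashes[idx]);
          System.out.println(e.getKey() + " -> " + local);  // specA -> [h0, h2], specB -> [h1]
        }
      }
    }
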
 
@@ -1147,210 +1211,225 @@ public class IncrementalIngester extends
   *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
   */
   @Override
+  @Deprecated
   public void documentDeleteMultiple(String outputConnectionName,
     String[] identifierClasses, String[] identifierHashes,
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
+    documentDeleteMultiple(new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClasses,identifierHashes,activities);
+  }
+  
+  /** Delete multiple documents from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes of the documents.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  @Override
+  public void documentDeleteMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+    // Load connection managers up front to save time
+    IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+    
+    // No transactions here, so we can cycle through the connection names one at a time
+    for (int z = 0; z < outputConnectionNames.length; z++)
+    {
+      String outputConnectionName = outputConnectionNames[z];
+      IOutputConnection connection = outputConnections[z];
 
-    IOutputConnection connection = connectionManager.load(outputConnectionName);
+      activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
 
-    if (Logging.ingest.isDebugEnabled())
-    {
-      int i = 0;
-      while (i < identifierHashes.length)
+      if (Logging.ingest.isDebugEnabled())
       {
-        Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
-        i++;
+        for (int i = 0; i < identifierHashes.length; i++)
+        {
+          Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
+        }
       }
-    }
 
-    // No transactions.  Time for the operation may exceed transaction timeout.
+      // No transactions.  Time for the operation may exceed transaction timeout.
 
-    // Obtain the current URIs of all of these.
-    DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);
+      // Obtain the current URIs of all of these.
+      DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);
 
-    // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
-    // (This guarantees that when this operation is complete the database reflects reality.)
-    int validURIcount = 0;
-    int i = 0;
-    while (i < uris.length)
-    {
-      if (uris[i] != null && uris[i].getURI() != null)
-        validURIcount++;
-      i++;
-    }
-    String[] lockArray = new String[validURIcount];
-    String[] validURIArray = new String[validURIcount];
-    validURIcount = 0;
-    i = 0;
-    while (i < uris.length)
-    {
-      if (uris[i] != null && uris[i].getURI() != null)
-      {
-        validURIArray[validURIcount] = uris[i].getURI();
-        lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
-        validURIcount++;
+      // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
+      // (This guarantees that when this operation is complete the database reflects reality.)
+      int validURIcount = 0;
+      for (int i = 0; i < uris.length; i++)
+      {
+        if (uris[i] != null && uris[i].getURI() != null)
+          validURIcount++;
       }
-      i++;
-    }
-
-    lockManager.enterCriticalSections(null,null,lockArray);
-    try
-    {
-      // Fetch the document URIs for the listed documents
-      int j = 0;
-      while (j < uris.length)
+      String[] lockArray = new String[validURIcount];
+      String[] validURIArray = new String[validURIcount];
+      validURIcount = 0;
+      for (int i = 0; i < uris.length; i++)
       {
-        if (uris[j] != null && uris[j].getURI() != null)
-          removeDocument(connection,uris[j].getURI(),uris[j].getOutputVersion(),activities);
-        j++;
+        if (uris[i] != null && uris[i].getURI() != null)
+        {
+          validURIArray[validURIcount] = uris[i].getURI();
+          lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
+          validURIcount++;
+        }
       }
 
-      // Now, get rid of all rows that match the given uris.
-      // Do the queries together, then the deletes
-      beginTransaction();
+      lockManager.enterLocks(null,null,lockArray);
       try
       {
-        // The basic process is this:
-        // 1) Come up with a set of urihash values
-        // 2) Find the matching, corresponding id values
-        // 3) Delete the rows corresponding to the id values, in sequence
-
-        // Process (1 & 2) has to be broken down into chunks that contain the maximum
-        // number of doc hash values each.  We need to avoid repeating doc hash values,
-        // so the first step is to come up with ALL the doc hash values before looping
-        // over them.
-
-        int maxClauses;
-        
-        // Find all the documents that match this set of URIs
-        HashMap docURIHashValues = new HashMap();
-        HashMap docURIValues = new HashMap();
-        j = 0;
-        while (j < validURIArray.length)
+        // Fetch the document URIs for the listed documents
+        for (int i = 0; i < uris.length; i++)
         {
-          String docDBString = validURIArray[j++];
-          String docDBHashString = ManifoldCF.hash(docDBString);
-          docURIValues.put(docDBString,docDBString);
-          docURIHashValues.put(docDBHashString,docDBHashString);
+          if (uris[i] != null && uris[i].getURI() != null)
+            removeDocument(connection,uris[i].getURI(),uris[i].getOutputVersion(),activities);
         }
 
-        // Now, perform n queries, each of them no larger the maxInClause in length.
-        // Create a list of row id's from this.
-        HashMap rowIDSet = new HashMap();
-        Iterator iter = docURIHashValues.keySet().iterator();
-        j = 0;
-        ArrayList hashList = new ArrayList();
-        maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
-        while (iter.hasNext())
+        // Now, get rid of all rows that match the given uris.
+        // Do the queries together, then the deletes
+        beginTransaction();
+        try
         {
-          if (j == maxClauses)
+          // The basic process is this:
+          // 1) Come up with a set of urihash values
+          // 2) Find the matching, corresponding id values
+          // 3) Delete the rows corresponding to the id values, in sequence
+
+          // Process (1 & 2) has to be broken down into chunks that contain the maximum
+          // number of doc hash values each.  We need to avoid repeating doc hash values,
+          // so the first step is to come up with ALL the doc hash values before looping
+          // over them.
+
+          int maxClauses;
+          
+          // Find all the documents that match this set of URIs
+          Set<String> docURIHashValues = new HashSet<String>();
+          Set<String> docURIValues = new HashSet<String>();
+          for (String docDBString : validURIArray)
           {
-            findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
-            hashList.clear();
-            j = 0;
+            String docDBHashString = ManifoldCF.hash(docDBString);
+            docURIValues.add(docDBString);
+            docURIHashValues.add(docDBHashString);
+          }
+
+          // Now, perform n queries, each of them no larger the maxInClause in length.
+          // Create a list of row id's from this.
+          Set<Long> rowIDSet = new HashSet<Long>();
+          Iterator<String> iter = docURIHashValues.iterator();
+          int j = 0;
+          List<String> hashList = new ArrayList<String>();
+          maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
+          while (iter.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+              hashList.clear();
+              j = 0;
+            }
+            hashList.add(iter.next());
+            j++;
           }
-          hashList.add(iter.next());
-          j++;
-        }
 
-        if (j > 0)
-          findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+          if (j > 0)
+            findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
 
-        // Next, go through the list of row IDs, and delete them in chunks
-        j = 0;
-        ArrayList list = new ArrayList();
-        iter = rowIDSet.keySet().iterator();
-        maxClauses = maxClausesDeleteRowIds();
-        while (iter.hasNext())
-        {
-          if (j == maxClauses)
+          // Next, go through the list of row IDs, and delete them in chunks
+          j = 0;
+          List<Long> list = new ArrayList<Long>();
+          Iterator<Long> iter2 = rowIDSet.iterator();
+          maxClauses = maxClausesDeleteRowIds();
+          while (iter2.hasNext())
           {
-            deleteRowIds(list);
-            list.clear();
-            j = 0;
+            if (j == maxClauses)
+            {
+              deleteRowIds(list);
+              list.clear();
+              j = 0;
+            }
+            list.add(iter2.next());
+            j++;
           }
-          list.add(iter.next());
-          j++;
-        }
 
-        if (j > 0)
-          deleteRowIds(list);
+          if (j > 0)
+            deleteRowIds(list);
 
-        // Now, find the set of documents that remain that match the document identifiers.
-        HashMap docIdValues = new HashMap();
-        j = 0;
-        while (j < identifierHashes.length)
-        {
-          String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
-          docIdValues.put(docDBString,docDBString);
-          j++;
-        }
+          // Now, find the set of documents that remain that match the document identifiers.
+          Set<String> docIdValues = new HashSet<String>();
+          for (int i = 0; i < identifierHashes.length; i++)
+          {
+            String docDBString = makeKey(identifierClasses[i],identifierHashes[i]);
+            docIdValues.add(docDBString);
+          }
 
-        // Now, perform n queries, each of them no larger the maxInClause in length.
-        // Create a list of row id's from this.
-        rowIDSet.clear();
-        iter = docIdValues.keySet().iterator();
-        j = 0;
-        list.clear();
-        maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
-        while (iter.hasNext())
-        {
-          if (j == maxClauses)
+          // Now, perform n queries, each of them no larger than maxInClause in length.
+          // Create a list of row ids from this.
+          rowIDSet.clear();
+          iter = docIdValues.iterator();
+          j = 0;
+          List<String> list2 = new ArrayList<String>();
+          maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
+          while (iter.hasNext())
           {
-            findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
-            list.clear();
-            j = 0;
+            if (j == maxClauses)
+            {
+              findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
+              list2.clear();
+              j = 0;
+            }
+            list2.add(iter.next());
+            j++;
           }
-          list.add(iter.next());
-          j++;
-        }
 
-        if (j > 0)
-          findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
+          if (j > 0)
+            findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
 
-        // Next, go through the list of row IDs, and delete them in chunks
-        j = 0;
-        list.clear();
-        iter = rowIDSet.keySet().iterator();
-        maxClauses = maxClausesDeleteRowIds();
-        while (iter.hasNext())
-        {
-          if (j == maxClauses)
+          // Next, go through the list of row IDs, and delete them in chunks
+          j = 0;
+          list.clear();
+          iter2 = rowIDSet.iterator();
+          maxClauses = maxClausesDeleteRowIds();
+          while (iter2.hasNext())
           {
-            deleteRowIds(list);
-            list.clear();
-            j = 0;
+            if (j == maxClauses)
+            {
+              deleteRowIds(list);
+              list.clear();
+              j = 0;
+            }
+            list.add(iter2.next());
+            j++;
           }
-          list.add(iter.next());
-          j++;
-        }
 
-        if (j > 0)
-          deleteRowIds(list);
+          if (j > 0)
+            deleteRowIds(list);
 
-      }
-      catch (ManifoldCFException e)
-      {
-        signalRollback();
-        throw e;
-      }
-      catch (Error e)
-      {
-        signalRollback();
-        throw e;
+        }
+        catch (ManifoldCFException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (Error e)
+        {
+          signalRollback();
+          throw e;
+        }
+        finally
+        {
+          endTransaction();
+        }
       }
       finally
       {
-        endTransaction();
+        lockManager.leaveLocks(null,null,lockArray);
       }
     }
-    finally
-    {
-      lockManager.leaveCriticalSections(null,null,lockArray);
-    }
   }
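
The delete path above repeats one batching pattern twice: values accumulate in a working list until the per-query clause limit is hit, the full chunk is handed to a query or delete, and any final partial chunk is flushed after the loop ends. Below is a minimal, self-contained sketch of that pattern; the ChunkProcessor callback is hypothetical and stands in for findRowIdsForURIs()/deleteRowIds(), and the maxClauses argument stands in for the maxClausesRowIdsForURIs()/maxClausesDeleteRowIds() calls.

    import java.util.ArrayList;
    import java.util.List;

    public class ChunkedInClauseSketch
    {
      // Callback applied to each full (or final partial) chunk.
      public interface ChunkProcessor<T>
      {
        void process(List<T> chunk);
      }

      public static <T> void processInChunks(Iterable<T> values, int maxClauses,
        ChunkProcessor<T> processor)
      {
        List<T> chunk = new ArrayList<T>();
        for (T value : values)
        {
          if (chunk.size() == maxClauses)
          {
            processor.process(chunk);
            chunk.clear();
          }
          chunk.add(value);
        }
        if (!chunk.isEmpty())
          processor.process(chunk);  // flush the final partial chunk
      }
    }

Deduplicating into a Set before chunking, as the rewritten code does, guarantees that no hash value is issued in more than one chunk.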
 
   /** Calculate the clauses.
@@ -1364,7 +1443,7 @@ public class IncrementalIngester extends
   /** Given values and parameters corresponding to a set of hash values, add corresponding
   * table row id's to the output map.
   */
-  protected void findRowIdsForURIs(String outputConnectionName, HashMap rowIDSet, HashMap uris, ArrayList hashParamValues)
+  protected void findRowIdsForURIs(String outputConnectionName, Set<Long> rowIDSet, Set<String> uris, List<String> hashParamValues)
     throws ManifoldCFException
   {
     ArrayList list = new ArrayList();
@@ -1374,18 +1453,17 @@ public class IncrementalIngester extends
       
     IResultSet set = performQuery("SELECT "+idField+","+docURIField+" FROM "+
       getTableName()+" WHERE "+query,list,null,null);
-      
-    int i = 0;
-    while (i < set.getRowCount())
+    
+    for (int i = 0; i < set.getRowCount(); i++)
     {
-      IResultRow row = set.getRow(i++);
+      IResultRow row = set.getRow(i);
       String docURI = (String)row.getValue(docURIField);
       if (docURI != null && docURI.length() > 0)
       {
-        if (uris.get(docURI) != null)
+        if (uris.contains(docURI))
         {
           Long rowID = (Long)row.getValue(idField);
-          rowIDSet.put(rowID,rowID);
+          rowIDSet.add(rowID);
         }
       }
     }
@@ -1402,7 +1480,7 @@ public class IncrementalIngester extends
   /** Given values and parameters corresponding to a set of hash values, add corresponding
   * table row id's to the output map.
   */
-  protected void findRowIdsForDocIds(String outputConnectionName, HashMap rowIDSet, ArrayList paramValues)
+  protected void findRowIdsForDocIds(String outputConnectionName, Set<Long> rowIDSet, List<String> paramValues)
     throws ManifoldCFException
   {
     ArrayList list = new ArrayList();
@@ -1412,13 +1490,12 @@ public class IncrementalIngester extends
       
     IResultSet set = performQuery("SELECT "+idField+" FROM "+
       getTableName()+" WHERE "+query,list,null,null);
-      
-    int i = 0;
-    while (i < set.getRowCount())
+    
+    for (int i = 0; i < set.getRowCount(); i++)
     {
-      IResultRow row = set.getRow(i++);
+      IResultRow row = set.getRow(i);
       Long rowID = (Long)row.getValue(idField);
-      rowIDSet.put(rowID,rowID);
+      rowIDSet.add(rowID);
     }
   }
 
@@ -1447,12 +1524,30 @@ public class IncrementalIngester extends
   *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
   */
   @Override
+  @Deprecated
   public void documentDelete(String outputConnectionName,
     String identifierClass, String identifierHash,
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    documentDeleteMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},activities);
+    documentDelete(new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClass,identifierHash,activities);
+  }
+  
+  /** Delete a document from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  @Override
+  public void documentDelete(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    documentDeleteMultiple(pipelineSpecificationBasic,new String[]{identifierClass},new String[]{identifierHash},activities);
   }
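
Caller-side, migrating off the deprecated documentDelete signature is mechanical. The following is a hedged sketch rather than code from this commit: IncrementalIngesterFactory.make() and the documentDelete overloads exist in this codebase, but the IPipelineSpecificationBasic instance is assumed to be supplied by the caller, since the RuntPipelineSpecificationBasic used internally above appears to be internal to IncrementalIngester.

    import org.apache.manifoldcf.agents.interfaces.*;
    import org.apache.manifoldcf.core.interfaces.*;

    public class DeleteMigrationSketch
    {
      public static void deleteOne(IThreadContext threadContext,
        IPipelineSpecificationBasic spec,
        String identifierClass, String identifierHash,
        IOutputRemoveActivity activities)
        throws ManifoldCFException, ServiceInterruption
      {
        IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
        // Old, deprecated form took a bare output connection name:
        //   ingester.documentDelete(outputConnectionName, identifierClass, identifierHash, activities);
        // New form takes the basic pipeline specification:
        ingester.documentDelete(spec, identifierClass, identifierHash, activities);
      }
    }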
 
   /** Find out what URIs a SET of document URIs are currently ingested.
@@ -2081,34 +2176,6 @@ public class IncrementalIngester extends
 
   // Protected methods
 
-  /** Add or replace document, using the specified output connection, via the standard pool.
-  */
-  protected int addOrReplaceDocument(
-    ITransformationConnection[] transformationConnections, String[] transformationDescriptionStrings,
-    IOutputConnection outputConnection, String outputDescriptionString,
-    String documentURI, RepositoryDocument document, String authorityNameString,
-    IOutputAddActivity finalActivities)
-    throws ManifoldCFException, ServiceInterruption, IOException
-  {
-    // Set indexing date
-    document.setIndexingDate(new Date());
-    
-    // Set up a pipeline
-    PipelineObject pipeline = pipelineGrab(transformationConnections,outputConnection,
-      transformationDescriptionStrings,outputDescriptionString);
-    if (pipeline == null)
-      // A connector is not installed; treat this as a service interruption.
-      throw new ServiceInterruption("Pipeline connector not installed",0L);
-    try
-    {
-      return pipeline.addOrReplaceDocumentWithException(documentURI,document,authorityNameString,finalActivities);
-    }
-    finally
-    {
-      pipeline.release();
-    }
-  }
-
   /** Remove document, using the specified output connection, via the standard pool.
   */
   protected void removeDocument(IOutputConnection connection, String documentURI, String outputDescription, IOutputRemoveActivity activities)
@@ -2347,303 +2414,443 @@ public class IncrementalIngester extends
   
   protected class PipelineObject
   {
-    public final IOutputConnection outputConnection;
-    public final IOutputConnector outputConnector;
-    public final ITransformationConnection[] transformationConnections;
+    public final PipelineConnections pipelineConnections;
+    public final IOutputConnector[] outputConnectors;
     public final ITransformationConnector[] transformationConnectors;
-    public final String outputDescription;
-    public final String[] transformationDescriptions;
     
     public PipelineObject(
-      ITransformationConnection[] transformationConnections, ITransformationConnector[] transformationConnectors,
-      IOutputConnection outputConnection, IOutputConnector outputConnector,
-      String[] transformationDescriptions, String outputDescription)
+      PipelineConnections pipelineConnections,
+      ITransformationConnector[] transformationConnectors,
+      IOutputConnector[] outputConnectors)
     {
-      this.transformationConnections = transformationConnections;
+      this.pipelineConnections = pipelineConnections;
       this.transformationConnectors = transformationConnectors;
-      this.outputConnection = outputConnection;
-      this.outputConnector = outputConnector;
-      this.outputDescription = outputDescription;
-      this.transformationDescriptions = transformationDescriptions;
+      this.outputConnectors = outputConnectors;
     }
     
     public boolean checkMimeTypeIndexable(String mimeType, IOutputCheckActivity finalActivity)
       throws ManifoldCFException, ServiceInterruption
     {
-      PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
-      return entryPoint.getPipelineConnector().checkMimeTypeIndexable(entryPoint.getPipelineDescriptionString(),mimeType,entryPoint.getPipelineCheckActivity());
+      PipelineCheckFanout entryPoint = buildCheckPipeline(finalActivity);
+      return entryPoint.checkMimeTypeIndexable(mimeType);
     }
 
     public boolean checkDocumentIndexable(File localFile, IOutputCheckActivity finalActivity)
       throws ManifoldCFException, ServiceInterruption
     {
-      PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
-      return entryPoint.getPipelineConnector().checkDocumentIndexable(entryPoint.getPipelineDescriptionString(),localFile,entryPoint.getPipelineCheckActivity());
+      PipelineCheckFanout entryPoint = buildCheckPipeline(finalActivity);
+      return entryPoint.checkDocumentIndexable(localFile);
     }
 
     public boolean checkLengthIndexable(long length, IOutputCheckActivity finalActivity)
       throws ManifoldCFException, ServiceInterruption
     {
-      PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
-      return entryPoint.getPipelineConnector().checkLengthIndexable(entryPoint.getPipelineDescriptionString(),length,entryPoint.getPipelineCheckActivity());
+      PipelineCheckFanout entryPoint = buildCheckPipeline(finalActivity);
+      return entryPoint.checkLengthIndexable(length);
     }
     
     public boolean checkURLIndexable(String uri, IOutputCheckActivity finalActivity)
       throws ManifoldCFException, ServiceInterruption
     {
-      PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
-      return entryPoint.getPipelineConnector().checkURLIndexable(entryPoint.getPipelineDescriptionString(),uri,entryPoint.getPipelineCheckActivity());
+      PipelineCheckFanout entryPoint = buildCheckPipeline(finalActivity);
+      return entryPoint.checkURLIndexable(uri);
     }
 
-    public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document, String authorityNameString, IOutputAddActivity finalActivity)
-      throws ManifoldCFException, ServiceInterruption, IOException
-    {
-      PipelineAddEntryPoint entryPoint = buildAddPipeline(finalActivity);
-      return entryPoint.getPipelineConnector().addOrReplaceDocumentWithException(documentURI,entryPoint.getPipelineDescriptionString(),
-        document,authorityNameString,entryPoint.getPipelineAddActivity());
-    }
-    
     public void release()
       throws ManifoldCFException
     {
-      outputConnectorPool.release(outputConnection,outputConnector);
-      transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+      outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors);
+      transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
     }
     
-    protected PipelineCheckEntryPoint buildCheckPipeline(IOutputCheckActivity finalActivity)
+    protected PipelineCheckFanout buildCheckPipeline(IOutputCheckActivity finalActivity)
     {
-      // Build output stage first
-      PipelineCheckEntryPoint currentStage = new PipelineCheckEntryPoint(outputConnector,outputDescription,finalActivity);
-      // Go through transformations backwards
-      int i = transformationConnectors.length;
-      while (i > 0)
+      // Algorithm for building a pipeline:
+      // (1) We start with the set of final output connection stages, and build an entry point for each one.  That's our "current set".
+      // (2) We cycle through the "current set".  For each member, we attempt to go upstream a level.
+      // (3) Before we can build the pipeline activity class for the next upstream stage, we need to have present ALL of the children that share that
+      //   parent.  If we don't have that yet, we throw the stage back into the list.
+      // (4) We continue until there is one stage left that has no parent, and that's what we return.
+      
+      // Create the current set
+      Map<Integer,PipelineCheckEntryPoint> currentSet = new HashMap<Integer,PipelineCheckEntryPoint>();
+      // First, locate all the output stages, and enter them into the set
+      IPipelineSpecification spec = pipelineConnections.getSpecification();
+      int count = spec.getOutputCount();
+      for (int i = 0; i < count; i++)
+      {
+        int outputStage = spec.getOutputStage(i);
+        PipelineCheckEntryPoint outputStageEntryPoint = new PipelineCheckEntryPoint(
+          outputConnectors[pipelineConnections.getOutputConnectionIndex(outputStage).intValue()],
+          spec.getStageDescriptionString(outputStage),finalActivity);
+        currentSet.put(new Integer(outputStage), outputStageEntryPoint);
+      }
+      // Cycle through the "current set"
+      while (true)
       {
-        i--;
-        currentStage = new PipelineCheckEntryPoint(transformationConnectors[i],transformationDescriptions[i],
-          new PipelineCheckActivity(currentStage.getPipelineConnector(),currentStage.getPipelineDescriptionString(),currentStage.getPipelineCheckActivity()));
+        int parent = -1;
+        int[] siblings = null;
+        for (Integer outputStage : currentSet.keySet())
+        {
+          parent = spec.getStageParent(outputStage.intValue());
+          // Look up the children
+          siblings = spec.getStageChildren(parent);
+          // Are all the siblings in the current set yet?  If not, we can't proceed with this entry.
+          boolean skipToNext = false;
+          for (int sibling : siblings)
+          {
+            if (!currentSet.containsKey(new Integer(sibling)))
+            {
+              skipToNext = true;
+              break;
+            }
+          }
+          if (skipToNext)
+          {
+            siblings = null;
+            continue;
+          }
+          // All siblings are present!
+          break;
+        }
+        
+        // Siblings will be non-null if we found a group we can collapse; a null here means the stage graph is inconsistent.
+        if (siblings == null)
+          throw new IllegalStateException("Not at root but can't progress");
+        
+        PipelineCheckEntryPoint[] siblingEntryPoints = new PipelineCheckEntryPoint[siblings.length];
+        for (int j = 0; j < siblings.length; j++)
+        {
+          siblingEntryPoints[j] = currentSet.remove(new Integer(siblings[j]));
+        }
+        // Wrap the entry points in a fan-out class, which has pipe connector-like methods that fire across all the connectors.
+        PipelineCheckFanout pcf = new PipelineCheckFanout(siblingEntryPoints);
+        if (parent == -1)
+          return pcf;
+        PipelineCheckEntryPoint newEntry = new PipelineCheckEntryPoint(
+          transformationConnectors[pipelineConnections.getTransformationConnectionIndex(parent).intValue()],
+          spec.getStageDescriptionString(parent),pcf);
+        currentSet.put(new Integer(parent), newEntry);
       }
-      return currentStage;
+    }
+  }
+  
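
The four-step comment above compresses a subtle construction: the stage graph is collapsed bottom-up, and a sibling group can only be collapsed into its parent once every member already has an entry point. Below is a simplified, self-contained sketch of that walk over a plain parent-array topology; plain strings stand in for the connector-wrapping entry points, and none of these names are ManifoldCF API.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FanInSketch
    {
      public static String build(int[] parentOf, int[] outputStages)
      {
        // Seed the working set with the output (leaf) stages
        Map<Integer,String> currentSet = new HashMap<Integer,String>();
        for (int stage : outputStages)
          currentSet.put(Integer.valueOf(stage), "out:" + stage);
        while (true)
        {
          int parent = -1;
          List<Integer> siblings = null;
          // Find a member whose ENTIRE sibling group is already present
          for (Integer stage : currentSet.keySet())
          {
            parent = parentOf[stage.intValue()];
            List<Integer> candidate = childrenOf(parentOf, parent);
            if (currentSet.keySet().containsAll(candidate))
            {
              siblings = candidate;
              break;
            }
          }
          if (siblings == null)
            throw new IllegalStateException("Not at root but can't progress");
          // Collapse the sibling group into a single fan-out node
          StringBuilder fanout = new StringBuilder("fan(");
          for (Integer s : siblings)
            fanout.append(currentSet.remove(s)).append(';');
          fanout.append(')');
          if (parent == -1)
            return fanout.toString();  // parentless root: construction complete
          currentSet.put(Integer.valueOf(parent), parent + "<-" + fanout);
        }
      }

      private static List<Integer> childrenOf(int[] parentOf, int parent)
      {
        List<Integer> children = new ArrayList<Integer>();
        for (int s = 0; s < parentOf.length; s++)
          if (parentOf[s] == parent)
            children.add(Integer.valueOf(s));
        return children;
      }
    }

For a two-stage chain in which transformation stage 0 feeds output stage 1 (parentOf = {-1, 0}, outputStages = {1}), the sketch collapses out:1 under stage 0 and then returns the root fan-out wrapping stage 0.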
+  protected class PipelineObjectWithVersions extends PipelineObject
+  {
+    protected final PipelineConnectionsWithVersions pipelineConnectionsWithVersions;
+    
+    public PipelineObjectWithVersions(
+      PipelineConnectionsWithVersions pipelineConnectionsWithVersions,
+      ITransformationConnector[] transformationConnectors,
+      IOutputConnector[] outputConnectors)
+    {
+      super(pipelineConnectionsWithVersions,transformationConnectors,outputConnectors);
+      this.pipelineConnectionsWithVersions = pipelineConnectionsWithVersions;
     }
 
-    protected PipelineAddEntryPoint buildAddPipeline(IOutputAddActivity finalActivity)
+    public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputAddActivity finalActivity, long ingestTime)
+      throws ManifoldCFException, ServiceInterruption, IOException
     {
-      // Build output stage first
-      PipelineAddEntryPoint currentStage = new PipelineAddEntryPoint(outputConnector,outputDescription,
-        new OutputAddActivitiesWrapper(finalActivity,outputConnection.getName()));
-      // Go through transformations backwards
-      int i = transformationConnectors.length;
-      while (i > 0)
+      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime);
+      return entryPoint.addOrReplaceDocumentWithException(documentURI,document,authorityNameString);
+    }
+    
+    protected PipelineAddFanout buildAddPipeline(IOutputAddActivity finalActivity,
+      String newDocumentVersion, String newParameterVersion, String newAuthorityNameString,
+      long ingestTime)
+    {
+      // Algorithm for building a pipeline:
+      // (1) We start with the set of final output connection stages, and build an entry point for each one.  That's our "current set".
+      // (2) We cycle through the "current set".  For each member, we attempt to go upstream a level.
+      // (3) Before we can build the pipeline activity class for the next upstream stage, we need to have present ALL of the children that share that
+      //   parent.  If we don't have that yet, we throw the stage back into the list.
+      // (4) We continue until there is one stage left that has no parent, and that's what we return.
+      
+      // Create the current set
+      Map<Integer,PipelineAddEntryPoint> currentSet = new HashMap<Integer,PipelineAddEntryPoint>();
+      // First, locate all the output stages, and enter them into the set
+      IPipelineSpecificationWithVersions spec = pipelineConnectionsWithVersions.getSpecificationWithVersions();
+      int outputCount = spec.getOutputCount();
+      for (int i = 0; i < outputCount; i++)
+      {
+        int outputStage = spec.getOutputStage(i);
+        
+        // Compute whether we need to reindex this record to this output or not, based on spec.
+        String oldDocumentVersion = spec.getOutputDocumentVersionString(i);
+        String oldParameterVersion = spec.getOutputParameterVersionString(i);
+        String oldOutputVersion = spec.getOutputVersionString(i);
+        String oldTransformationVersion = spec.getOutputTransformationVersionString(i);
+        String oldAuthorityName = spec.getAuthorityNameString(i);
+
+        String newTransformationVersion = null;
+        
+        boolean needToReindex = (oldDocumentVersion == null);
+        if (needToReindex == false)
+        {
+          needToReindex = (!oldDocumentVersion.equals(newDocumentVersion) ||
+            !oldParameterVersion.equals(newParameterVersion) ||
+            !oldOutputVersion.equals(spec.getStageDescriptionString(outputStage)) ||
+            !oldAuthorityName.equals(newAuthorityNameString));
+        }
+        if (needToReindex == false)
+        {
+          // Compute the transformation version string
+          newTransformationVersion = computePackedTransformationVersion(spec,outputStage);
+          needToReindex = (!oldTransformationVersion.equals(newTransformationVersion));
+        }
+
+        int connectionIndex = pipelineConnectionsWithVersions.getOutputConnectionIndex(outputStage).intValue();
+        PipelineAddEntryPoint outputStageEntryPoint = new OutputAddEntryPoint(
+          outputConnectors[connectionIndex],
+          spec.getStageDescriptionString(outputStage),
+          new OutputAddActivitiesWrapper(finalActivity,spec.getStageConnectionName(outputStage)),
+          needToReindex,
+          spec.getStageConnectionName(outputStage),
+          newTransformationVersion,
+          ingestTime);
+        currentSet.put(new Integer(outputStage), outputStageEntryPoint);
+      }
+      // Cycle through the "current set"
+      while (true)
       {
-        i--;
-        currentStage = new PipelineAddEntryPoint(transformationConnectors[i],transformationDescriptions[i],
-          new PipelineAddActivity(currentStage.getPipelineConnector(),currentStage.getPipelineDescriptionString(),currentStage.getPipelineAddActivity(),
-            new TransformationRecordingActivity(finalActivity,transformationConnections[i].getName()),finalActivity));
+        int parent = -1;
+        int[] siblings = null;
+        for (Integer outputStage : currentSet.keySet())
+        {
+          parent = spec.getStageParent(outputStage.intValue());
+          // Look up the children
+          siblings = spec.getStageChildren(parent);
+          // Are all the siblings in the current set yet?  If not, we can't proceed with this entry.
+          boolean skipToNext = false;
+          for (int sibling : siblings)
+          {
+            if (!currentSet.containsKey(new Integer(sibling)))
+            {
+              skipToNext = true;
+              break;
+            }
+          }
+          if (skipToNext)
+          {
+            siblings = null;
+            continue;
+          }
+          // All siblings are present!
+          break;
+        }
+        
+        // Siblings will be non-null if we found a group we can collapse; a null here means the stage graph is inconsistent.
+        if (siblings == null)
+          throw new IllegalStateException("Not at root but can't progress");
+        
+        PipelineAddEntryPoint[] siblingEntryPoints = new PipelineAddEntryPoint[siblings.length];
+        for (int j = 0; j < siblings.length; j++)
+        {
+          siblingEntryPoints[j] = currentSet.remove(new Integer(siblings[j]));
+        }
+        // Wrap the entry points in a fan-out class, which has pipe connector-like methods that fire across all the connectors.
+        PipelineAddFanout pcf = new PipelineAddFanout(siblingEntryPoints,
+          (parent==-1)?null:new TransformationRecordingActivity(finalActivity,
+            spec.getStageConnectionName(parent)),
+          finalActivity);
+        if (parent == -1)
+          return pcf;
+        PipelineAddEntryPoint newEntry = new PipelineAddEntryPoint(
+          transformationConnectors[pipelineConnections.getTransformationConnectionIndex(parent).intValue()],
+          spec.getStageDescriptionString(parent),pcf,pcf.checkNeedToReindex());
+        currentSet.put(new Integer(parent), newEntry);
       }
-      return currentStage;
+
     }
 
   }
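
The needToReindex computation above is easier to follow stripped of the spec plumbing. Here is a condensed sketch with plain strings standing in for the stored-version lookups; the behavioral point it preserves is that the packed transformation version is computed and compared only after the cheaper equality checks all pass.

    public class ReindexDecisionSketch
    {
      // Arguments stand in for the spec.getOutput*VersionString() and
      // getAuthorityNameString() lookups on the pipeline specification.
      public static boolean needToReindex(
        String oldDocVersion, String newDocVersion,
        String oldParamVersion, String newParamVersion,
        String oldOutputVersion, String newOutputVersion,
        String oldAuthority, String newAuthority,
        String oldTransformVersion, String newTransformVersion)
      {
        if (oldDocVersion == null)
          return true;  // never indexed to this output before
        if (!oldDocVersion.equals(newDocVersion) ||
          !oldParamVersion.equals(newParamVersion) ||
          !oldOutputVersion.equals(newOutputVersion) ||
          !oldAuthority.equals(newAuthority))
          return true;
        // Only now is the (packed) transformation version worth comparing.
        return !oldTransformVersion.equals(newTransformVersion);
      }
    }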
 
-  /** This class describes the entry stage of a check pipeline.
+  /** This class describes the entry stage of multiple siblings in a check pipeline.
   */
-  public static class PipelineCheckEntryPoint
+  public static class PipelineCheckFanout implements IOutputCheckActivity
   {
-    protected final IPipelineConnector pipelineConnector;
-    protected final String pipelineDescriptionString;
-    protected final IOutputCheckActivity checkActivity;
+    protected final PipelineCheckEntryPoint[] entryPoints;
     
-    public PipelineCheckEntryPoint(IPipelineConnector pipelineConnector,
-      String pipelineDescriptionString,
-      IOutputCheckActivity checkActivity)
+    public PipelineCheckFanout(PipelineCheckEntryPoint[] entryPoints)
     {
-      this.pipelineConnector = pipelineConnector;
-      this.pipelineDescriptionString = pipelineDescriptionString;
-      this.checkActivity = checkActivity;
+      this.entryPoints = entryPoints;
     }
     
-    public IPipelineConnector getPipelineConnector()
+    @Override
+    public boolean checkMimeTypeIndexable(String mimeType)
+      throws ManifoldCFException, ServiceInterruption
     {
-      return pipelineConnector;
+      // OR all results
+      for (PipelineCheckEntryPoint p : entryPoints)
+      {
+        if (p.checkMimeTypeIndexable(mimeType))
+          return true;
+      }
+      return false;
     }
     
-    public String getPipelineDescriptionString()
+    @Override
+    public boolean checkDocumentIndexable(File localFile)
+      throws ManifoldCFException, ServiceInterruption
     {
-      return pipelineDescriptionString;
+      // OR all results
+      for (PipelineCheckEntryPoint p : entryPoints)
+      {
+        if (p.checkDocumentIndexable(localFile))
+          return true;
+      }
+      return false;
     }
-    
-    public IOutputCheckActivity getPipelineCheckActivity()
+
+    @Override
+    public boolean checkLengthIndexable(long length)
+      throws ManifoldCFException, ServiceInterruption
     {
-      return checkActivity;
-    }
+      // OR all results
+      for (PipelineCheckEntryPoint p : entryPoints)
+      {
+        if (p.checkLengthIndexable(length))
+          return true;
+      }
+      return false;
+    }
+
+    @Override
+    public boolean checkURLIndexable(String uri)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      // OR all results
+      for (PipelineCheckEntryPoint p : entryPoints)
+      {
+        if (p.checkURLIndexable(uri))
+          return true;
+      }
+      return false;
+    }
   }
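
Every check method in the fanout has the same OR-with-short-circuit shape: a document is worth fetching if any downstream branch will accept it. A tiny self-contained illustration of that semantics follows; the Check interface is hypothetical, not this codebase's API.

    public class FanoutOrDemo
    {
      interface Check { boolean accepts(String mimeType); }

      public static void main(String[] args)
      {
        Check[] branches = new Check[] {
          new Check() { public boolean accepts(String m) { return m.equals("text/plain"); } },
          new Check() { public boolean accepts(String m) { return m.equals("application/pdf"); } }
        };
        for (String m : new String[]{"text/plain","application/pdf","image/bmp"})
        {
          boolean any = false;
          for (Check c : branches)
          {
            if (c.accepts(m)) { any = true; break; }  // short-circuit on first acceptance
          }
          System.out.println(m + " -> " + any);
        }
      }
    }

Only a document rejected by every branch is skipped, which matches the intent of the per-branch connectors' pre-fetch filtering.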
   
-  /** This class is used to join together pipeline stages for check operations */
-  public static class PipelineCheckActivity implements IOutputCheckActivity
+  /** This class describes the entry stage of a check pipeline.
+  */
+  public static class PipelineCheckEntryPoint
   {
     protected final IPipelineConnector pipelineConnector;
     protected final String pipelineDescriptionString;
     protected final IOutputCheckActivity checkActivity;
-
-    public PipelineCheckActivity(IPipelineConnector pipelineConnector, String pipelineDescriptionString, IOutputCheckActivity checkActivity)
+    
+    public PipelineCheckEntryPoint(
+      IPipelineConnector pipelineConnector,
+      String pipelineDescriptionString,
+      IOutputCheckActivity checkActivity)
     {
-      this.pipelineConnector = pipelineConnector;
+      this.pipelineConnector = pipelineConnector;
       this.pipelineDescriptionString = pipelineDescriptionString;
       this.checkActivity = checkActivity;
     }
-
-    /** Detect if a mime type is acceptable downstream or not.  This method is used to determine whether it makes sense to fetch a document
-    * in the first place.
-    *@param mimeType is the mime type of the document.
-    *@return true if the mime type can be accepted by the downstream connection.
-    */
-    @Override
+    
     public boolean checkMimeTypeIndexable(String mimeType)
       throws ManifoldCFException, ServiceInterruption
     {
       return pipelineConnector.checkMimeTypeIndexable(pipelineDescriptionString,mimeType,checkActivity);
     }
-
-    /** Pre-determine whether a document (passed here as a File object) is acceptable downstream.  This method is
-    * used to determine whether a document needs to be actually transferred.  This hook is provided mainly to support
-    * search engines that only handle a small set of accepted file types.
-    *@param localFile is the local file to check.
-    *@return true if the file is acceptable by the downstream connection.
-    */
-    @Override
+    
     public boolean checkDocumentIndexable(File localFile)
       throws ManifoldCFException, ServiceInterruption
     {
       return pipelineConnector.checkDocumentIndexable(pipelineDescriptionString,localFile,checkActivity);
     }
-
-    /** Pre-determine whether a document's length is acceptable downstream.  This method is used
-    * to determine whether to fetch a document in the first place.
-    *@param length is the length of the document.
-    *@return true if the file is acceptable by the downstream connection.
-    */
-    @Override
+    
     public boolean checkLengthIndexable(long length)
       throws ManifoldCFException, ServiceInterruption
     {
       return pipelineConnector.checkLengthIndexable(pipelineDescriptionString,length,checkActivity);
     }
 
-    /** Pre-determine whether a document's URL is acceptable downstream.  This method is used
-    * to help filter out documents that cannot be indexed in advance.
-    *@param url is the URL of the document.
-    *@return true if the file is acceptable by the downstream connection.
-    */
-    @Override
-    public boolean checkURLIndexable(String url)
+    public boolean checkURLIndexable(String uri)
       throws ManifoldCFException, ServiceInterruption
     {
-      return pipelineConnector.checkURLIndexable(pipelineDescriptionString,url,checkActivity);
-    }
-
-  }
-
-  /** This class describes the entry stage of an add pipeline.
-  */
-  public static class PipelineAddEntryPoint
-  {
-    protected final IPipelineConnector pipelineConnector;
-    protected final String pipelineDescriptionString;
-    protected final IOutputAddActivity addActivity;
-    
-    public PipelineAddEntryPoint(IPipelineConnector pipelineConnector,
-      String pipelineDescriptionString,
-      IOutputAddActivity addActivity)
-    {
-      this.pipelineConnector = pipelineConnector;
-      this.pipelineDescriptionString = pipelineDescriptionString;
-      this.addActivity = addActivity;
+      return pipelineConnector.checkURLIndexable(pipelineDescriptionString,uri,checkActivity);
     }
     
-    public IPipelineConnector getPipelineConnector()
-    {
-      return pipelineConnector;
-    }
-    
-    public String getPipelineDescriptionString()
-    {
-      return pipelineDescriptionString;
-    }
-    
-    public IOutputAddActivity getPipelineAddActivity()
-    {
-      return addActivity;
-    }
   }
-
-  /** This class is used to join together pipeline stages for add operations */
-  public static class PipelineAddActivity implements IOutputAddActivity
+  
+  /** This class describes the entry stage of multiple siblings in an add pipeline.
+  */

[... 687 lines stripped ...]

