manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1602680 [2/4] - in /manifoldcf/trunk: ./ framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ framework/pull-agent/src/main/java/org/apache/m...
Date Sun, 15 Jun 2014 11:49:19 GMT
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1602680&r1=1602679&r2=1602680&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sun Jun 15 11:49:19 2014
@@ -216,6 +216,21 @@ public class IncrementalIngester extends
     performDelete("",null,null);
   }
 
+  /** From a pipeline specification, get the name of the output connection that will be indexed last
+  * in the pipeline.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@return the last indexed output connection name, or null if the pipeline has no outputs.
+  */
+  @Override
+  public String getLastIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic)
+  {
+    // It's always the last output in the sequence; map its output index to a stage number before the lookup.
+    int count = pipelineSpecificationBasic.getOutputCount();
+    if (count == 0)
+      return null;
+    return pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(count-1));
+  }
+
   /** Check if a mime type is indexable.
   *@param outputConnectionName is the name of the output connection associated with this action.
   *@param outputDescription is the output description string.
@@ -223,35 +238,31 @@ public class IncrementalIngester extends
   *@return true if the mimeType is indexable.
   */
   @Override
+  @Deprecated
   public boolean checkMimeTypeIndexable(String outputConnectionName, String outputDescription, String mimeType)
     throws ManifoldCFException, ServiceInterruption
   {
-    return checkMimeTypeIndexable(new String[0], new String[0],
-      outputConnectionName, outputDescription,
+    return checkMimeTypeIndexable(
+      new RuntPipelineSpecification(outputConnectionName,outputDescription),
       mimeType,null);
   }
 
+  
   /** Check if a mime type is indexable.
-  *@param transformationConnectionNames is the ordered list of transformation connection names.
-  *@param transformationDescriptions is the ordered list of transformation description strings.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param outputDescription is the output description string.
+  *@param pipelineSpecification is the pipeline specification.
   *@param mimeType is the mime type to check.
   *@param activity are the activities available to this method.
   *@return true if the mimeType is indexable.
   */
   @Override
   public boolean checkMimeTypeIndexable(
-    String[] transformationConnectionNames, String[] transformationDescriptions,
-    String outputConnectionName, String outputDescription,
+    IPipelineSpecification pipelineSpecification,
     String mimeType,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
     PipelineObject pipeline = pipelineGrab(
-      transformationConnectionManager.loadMultiple(transformationConnectionNames),
-      connectionManager.load(outputConnectionName),
-      transformationDescriptions,outputDescription);
+      new PipelineConnections(pipelineSpecification));
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -265,8 +276,6 @@ public class IncrementalIngester extends
     }
   }
 
-  
-  
   /** Check if a file is indexable.
   *@param outputConnectionName is the name of the output connection associated with this action.
   *@param outputDescription is the output description string.
@@ -274,35 +283,30 @@ public class IncrementalIngester extends
   *@return true if the local file is indexable.
   */
   @Override
+  @Deprecated
   public boolean checkDocumentIndexable(String outputConnectionName, String outputDescription, File localFile)
     throws ManifoldCFException, ServiceInterruption
   {
-    return checkDocumentIndexable(new String[0], new String[0],
-      outputConnectionName, outputDescription,
+    return checkDocumentIndexable(
+      new RuntPipelineSpecification(outputConnectionName,outputDescription),
       localFile,null);
   }
   
   /** Check if a file is indexable.
-  *@param transformationConnectionNames is the ordered list of transformation connection names.
-  *@param transformationDescriptions is the ordered list of transformation description strings.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param outputDescription is the output description string.
+  *@param pipelineSpecification is the pipeline specification.
   *@param localFile is the local file to check.
   *@param activity are the activities available to this method.
   *@return true if the local file is indexable.
   */
   @Override
   public boolean checkDocumentIndexable(
-    String[] transformationConnectionNames, String[] transformationDescriptions,
-    String outputConnectionName, String outputDescription,
+    IPipelineSpecification pipelineSpecification,
     File localFile,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
     PipelineObject pipeline = pipelineGrab(
-      transformationConnectionManager.loadMultiple(transformationConnectionNames),
-      connectionManager.load(outputConnectionName),
-      transformationDescriptions,outputDescription);
+      new PipelineConnections(pipelineSpecification));
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -324,36 +328,31 @@ public class IncrementalIngester extends
   *@return true if the file is indexable.
   */
   @Override
+  @Deprecated
   public boolean checkLengthIndexable(String outputConnectionName, String outputDescription, long length)
     throws ManifoldCFException, ServiceInterruption
   {
-    return checkLengthIndexable(new String[0], new String[0],
-      outputConnectionName, outputDescription,
+    return checkLengthIndexable(
+      new RuntPipelineSpecification(outputConnectionName,outputDescription),
       length,null);
   }
   
   /** Pre-determine whether a document's length is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that are too long to be indexable.
-  *@param transformationConnectionNames is the ordered list of transformation connection names.
-  *@param transformationDescriptions is the ordered list of transformation description strings.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param outputDescription is the output description string.
+  *@param pipelineSpecification is the pipeline specification.
   *@param length is the length of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   @Override
   public boolean checkLengthIndexable(
-    String[] transformationConnectionNames, String[] transformationDescriptions,
-    String outputConnectionName, String outputDescription,
+    IPipelineSpecification pipelineSpecification,
     long length,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
     PipelineObject pipeline = pipelineGrab(
-      transformationConnectionManager.loadMultiple(transformationConnectionNames),
-      connectionManager.load(outputConnectionName),
-      transformationDescriptions,outputDescription);
+      new PipelineConnections(pipelineSpecification));
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -375,36 +374,31 @@ public class IncrementalIngester extends
   *@return true if the file is indexable.
   */
   @Override
+  @Deprecated
   public boolean checkURLIndexable(String outputConnectionName, String outputDescription, String url)
     throws ManifoldCFException, ServiceInterruption
   {
-    return checkURLIndexable(new String[0], new String[0],
-      outputConnectionName, outputDescription,
+    return checkURLIndexable(
+      new RuntPipelineSpecification(outputConnectionName,outputDescription),
       url,null);
   }
   
   /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that not indexable.
-  *@param transformationConnectionNames is the ordered list of transformation connection names.
-  *@param transformationDescriptions is the ordered list of transformation description strings.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param outputDescription is the output description string.
+  *@param pipelineSpecification is the pipeline specification.
   *@param url is the url of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   @Override
   public boolean checkURLIndexable(
-    String[] transformationConnectionNames, String[] transformationDescriptions,
-    String outputConnectionName, String outputDescription,
+    IPipelineSpecification pipelineSpecification,
     String url,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
     PipelineObject pipeline = pipelineGrab(
-      transformationConnectionManager.loadMultiple(transformationConnectionNames),
-      connectionManager.load(outputConnectionName),
-      transformationDescriptions,outputDescription);
+      new PipelineConnections(pipelineSpecification));
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -418,7 +412,6 @@ public class IncrementalIngester extends
     }
   }
 
-  
   /** Grab the entire pipeline.
   *@param transformationConnections - the transformation connections, in order
   *@param outputConnection - the output connection
@@ -426,42 +419,88 @@ public class IncrementalIngester extends
   *@param outputDescriptionString - the output description string
   *@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
   */
-  protected PipelineObject pipelineGrab(ITransformationConnection[] transformationConnections, IOutputConnection outputConnection,
-    String[] transformationDescriptionStrings, String outputDescriptionString)
+  protected PipelineObjectWithVersions pipelineGrabWithVersions(PipelineConnectionsWithVersions pipelineConnections)
     throws ManifoldCFException
   {
     // Pick up all needed transformation connectors
-    String[] transformationConnectionNames = new String[transformationConnections.length];
-    for (int i = 0; i < transformationConnections.length; i++)
+    ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(pipelineConnections.getTransformationConnectionNames(),pipelineConnections.getTransformationConnections());
+    for (ITransformationConnector c : transformationConnectors)
     {
-      transformationConnectionNames[i] = transformationConnections[i].getName();
+      if (c == null)
+      {
+        transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+        return null;
+      }
     }
     
-    ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(transformationConnectionNames,transformationConnections);
+    // Pick up all needed output connectors.  If this fails we have to release the transformation connectors.
+    try
+    {
+      IOutputConnector[] outputConnectors = outputConnectorPool.grabMultiple(pipelineConnections.getOutputConnectionNames(),pipelineConnections.getOutputConnections());
+      for (IOutputConnector c : outputConnectors)
+      {
+        if (c == null)
+        {
+          outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors);
+          transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+          return null;
+        }
+      }
+      return new PipelineObjectWithVersions(pipelineConnections,transformationConnectors,outputConnectors);
+    }
+    catch (Throwable e)
+    {
+      transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+      if (e instanceof ManifoldCFException)
+        throw (ManifoldCFException)e;
+      else if (e instanceof RuntimeException)
+        throw (RuntimeException)e;
+      else if (e instanceof Error)
+        throw (Error)e;
+      else
+        throw new RuntimeException("Unexpected exception type: "+e.getClass().getName()+": "+e.getMessage(),e);
+    }
+  }
+
+  /** Grab the entire pipeline.
+  *@param pipelineConnections - the pipeline connections object, which supplies the names and
+  *  connection objects for every transformation connection and every output connection
+  *  in the pipeline.
+  *@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
+  *  When null is returned, every connector that was grabbed has already been released.
+  */
+  protected PipelineObject pipelineGrab(PipelineConnections pipelineConnections)
+    throws ManifoldCFException
+  {
+    // Pick up all needed transformation connectors
+    ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(pipelineConnections.getTransformationConnectionNames(),pipelineConnections.getTransformationConnections());
     for (ITransformationConnector c : transformationConnectors)
     {
       if (c == null)
       {
-        transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+        transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
         return null;
       }
     }
     
-    // Last, pick up output connector.  If it fails we have to release the transformation connectors.
+    // Pick up all needed output connectors.  If this fails we have to release the transformation connectors.
     try
     {
-      IOutputConnector outputConnector = outputConnectorPool.grab(outputConnection);
-      if (outputConnector == null)
+      IOutputConnector[] outputConnectors = outputConnectorPool.grabMultiple(pipelineConnections.getOutputConnectionNames(),pipelineConnections.getOutputConnections());
+      for (IOutputConnector c : outputConnectors)
       {
-        transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
-        return null;
+        if (c == null)
+        {
+          outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors);
+          transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+          return null;
+        }
       }
-      return new PipelineObject(transformationConnections,transformationConnectors,outputConnection,outputConnector,
-        transformationDescriptionStrings,outputDescriptionString);
+      return new PipelineObject(pipelineConnections,transformationConnectors,outputConnectors);
     }
     catch (Throwable e)
     {
-      transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+      transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
       if (e instanceof ManifoldCFException)
         throw (ManifoldCFException)e;
       else if (e instanceof RuntimeException)
@@ -498,35 +537,141 @@ public class IncrementalIngester extends
 
   }
 
-  /** Get transformation version strings for a document.
-  *@param transformationConnectionNames are the names of the transformation connections associated with this action.
-  *@param specs are the transformation specifications.
-  *@return the description strings.
+  /** Get transformation version string for a document.
+  *@param transformationConnectionName is the name of the transformation connection associated with this action.
+  *@param spec is the transformation specification.
+  *@return the description string.
   */
-  @Override
-  public String[] getTransformationDescriptions(String[] transformationConnectionNames, OutputSpecification[] specs)
+  public String getTransformationDescription(String transformationConnectionName, OutputSpecification spec)
     throws ManifoldCFException, ServiceInterruption
   {
-    String[] rval = new String[transformationConnectionNames.length];
-    for (int i = 0; i < rval.length; i++)
+    ITransformationConnection connection = transformationConnectionManager.load(transformationConnectionName);
+    ITransformationConnector connector = transformationConnectorPool.grab(connection);
+    if (connector == null)
+      // The connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("Transformation connector not installed",0L);
+    try
     {
-      String transformationConnectionName = transformationConnectionNames[i];
-      OutputSpecification spec = specs[i];
-      ITransformationConnection connection = transformationConnectionManager.load(transformationConnectionName);
-      ITransformationConnector connector = transformationConnectorPool.grab(connection);
-      if (connector == null)
-        // The connector is not installed; treat this as a service interruption.
-        throw new ServiceInterruption("Transformation connector not installed",0L);
-      try
-      {
-        rval[i] = connector.getPipelineDescription(spec);
-      }
-      finally
+      return connector.getPipelineDescription(spec);
+    }
+    finally
+    {
+      transformationConnectorPool.release(connection,connector);
+    }
+  }
+
+  /** Decide whether a document must be fetched (or refetched) for indexing.
+  * The version strings recorded for each output are compared against the newly-determined values.
+  *@param pipelineSpecificationWithVersions is the pipeline specification plus the version strings already recorded per output.
+  *@param newDocumentVersion is the newly-determined document version.
+  *@param newParameterVersion is the newly-determined parameter version.
+  *@param newAuthorityNameString is the newly-determined authority name.
+  *@return true if any output needs updating; false if all recorded versions match.
+  */
+  @Override
+  public boolean checkFetchDocument(
+    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    String newDocumentVersion,
+    String newParameterVersion,
+    String newAuthorityNameString)
+  {
+    IPipelineSpecification fullSpec = pipelineSpecificationWithVersions.getPipelineSpecification();
+    IPipelineSpecificationBasic basicSpec = fullSpec.getBasicPipelineSpecification();
+    // An empty document version always forces a fetch.
+    if (newDocumentVersion.length() == 0)
+      return true;
+    // Examine each output in turn; the first mismatch decides the answer.
+    for (int outputIndex = 0; outputIndex < basicSpec.getOutputCount(); outputIndex++)
+    {
+      int outputStage = basicSpec.getOutputStage(outputIndex);
+      String priorDocumentVersion = pipelineSpecificationWithVersions.getOutputDocumentVersionString(outputIndex);
+      String priorParameterVersion = pipelineSpecificationWithVersions.getOutputParameterVersionString(outputIndex);
+      String priorOutputVersion = pipelineSpecificationWithVersions.getOutputVersionString(outputIndex);
+      String priorAuthorityName = pipelineSpecificationWithVersions.getAuthorityNameString(outputIndex);
+      // No recorded document version is treated as "never indexed for this output", so fetch now.
+      if (priorDocumentVersion == null)
+        return true;
+      // First the version strings that do not depend on the transformation path.
+      if (!priorDocumentVersion.equals(newDocumentVersion))
+        return true;
+      if (!priorParameterVersion.equals(newParameterVersion))
+        return true;
+      if (!priorAuthorityName.equals(newAuthorityNameString))
+        return true;
+      if (!priorOutputVersion.equals(fullSpec.getStageDescriptionString(outputStage)))
+        return true;
+      // Everything matches so far; finally compare the packed transformation version string.
+      String packedVersion = computePackedTransformationVersion(fullSpec,outputStage);
+      if (!pipelineSpecificationWithVersions.getOutputTransformationVersionString(outputIndex).equals(packedVersion))
+        return true;
+    }
+    // Everything matches, so no reindexing is needed.
+    return false;
+  }
+
+  /** Build the packed transformation version string for one output stage.
+  * Starting at the given stage, follows the chain of parent stages until a stage with no parent
+  * (parent value -1) is reached, and packs the connection name and description string of every
+  * ancestor stage visited into a single string.
+  *@param pipelineSpecification is the pipeline specification.
+  *@param stage is the stage number of the output stage.
+  *@return the transformation version string, a composite of all the transformations applied.
+  */
+  protected static String computePackedTransformationVersion(IPipelineSpecification pipelineSpecification, int stage)
+  {
+    IPipelineSpecificationBasic basicSpec = pipelineSpecification.getBasicPipelineSpecification();
+    // Pass one: count the ancestors of the starting stage so the arrays can be sized exactly.
+    int ancestorCount = 0;
+    int cursor = basicSpec.getStageParent(stage);
+    while (cursor != -1)
+    {
+      ancestorCount++;
+      cursor = basicSpec.getStageParent(cursor);
+    }
+
+    // Pass two: record each ancestor's connection name and description string.
+    // Entries are stored nearest ancestor first.
+    String[] ancestorNames = new String[ancestorCount];
+    String[] ancestorDescriptions = new String[ancestorCount];
+    int slot = 0;
+    cursor = basicSpec.getStageParent(stage);
+    while (cursor != -1)
+    {
+      ancestorNames[slot] = basicSpec.getStageConnectionName(cursor);
+      ancestorDescriptions[slot] = pipelineSpecification.getStageDescriptionString(cursor);
+      slot++;
+      cursor = basicSpec.getStageParent(cursor);
+    }
+
+    // Pack the names with '+' as the delimiter, then the descriptions with '!' as the delimiter.
+    StringBuilder packed = new StringBuilder();
+    packList(packed,ancestorNames,'+');
+    packList(packed,ancestorDescriptions,'!');
+    return packed.toString();
+  }
+  
+  /** Pack a count-prefixed list: first the number of values, then each value in order, via pack(). */
+  protected static void packList(StringBuilder output, String[] values, char delimiter)
+  {
+    pack(output,Integer.toString(values.length),delimiter);
+    for (String value : values)
+    {
+      pack(output,value,delimiter);
+    }
+  }
+
+  protected static void pack(StringBuilder sb, String value, char delim)
+  {
+    for (int i = 0; i < value.length(); i++)
+    {
+      char x = value.charAt(i);
+      if (x == delim || x == '\\')
       {
-        transformationConnectorPool.release(connection,connector);
+        sb.append('\\');
       }
+      sb.append(x);
     }
-    return rval;
+    sb.append(delim);
   }
 
   /** Record a document version, but don't ingest it.
@@ -540,40 +685,145 @@ public class IncrementalIngester extends
   *@param activities is the object used in case a document needs to be removed from the output index as the result of this operation.
   */
   @Override
+  @Deprecated
   public void documentRecord(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
     long recordTime, IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
+    documentRecord(
+      new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClass, identifierHash,
+      documentVersion,
+      recordTime, activities);
+  }
+  
+  /** Record a document version, but don't ingest it.
+  * The purpose of this method is to keep track of the frequency at which ingestion "attempts" take place.
+  * ServiceInterruption is thrown if this action must be rescheduled.
+  *@param pipelineSpecificationBasic is the basic pipeline specification needed.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hashed document identifier.
+  *@param documentVersion is the document version.
+  *@param recordTime is the time at which the recording took place, in milliseconds since epoch.
+  *@param activities is the object used in case a document needs to be removed from the output index as the result of this operation.
+  */
+  @Override
+  public void documentRecord(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash,
+    String documentVersion, long recordTime,
+    IOutputActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
     String docKey = makeKey(identifierClass,identifierHash);
 
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+    IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
+      Logging.ingest.debug("Recording document '"+docKey+"' for output connections '"+java.util.Arrays.toString(outputConnectionNames)+"'");
     }
 
-    // With a null document URI, this can't throw either ServiceInterruption or IOException
-    try
-    {
-      performIngestion(new ITransformationConnection[0],new String[0],
-        connectionManager.load(outputConnectionName),null,
-        docKey,documentVersion,null,null,null,
-        null,
-        null,
-        recordTime,
-        null,
-        activities);
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException("Unexpected IOException thrown: "+e.getMessage(),e);
-    }
-    catch (ServiceInterruption e)
+    for (int k = 0; k < outputConnectionNames.length; k++)
     {
-      throw new RuntimeException("Unexpected ServiceInterruption thrown: "+e.getMessage(),e);
-    }
+      String outputConnectionName = outputConnectionNames[k];
+      IOutputConnection connection = outputConnections[k];
+
+      String oldURI = null;
+      String oldURIHash = null;
+      String oldOutputVersion = null;
+
+      // Repeat if needed
+      while (true)
+      {
+        long sleepAmt = 0L;
+        try
+        {
+          // See what uri was used before for this doc, if any
+          ArrayList list = new ArrayList();
+          String query = buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(docKeyField,docKey),
+            new UnitaryClause(outputConnNameField,outputConnectionName)});
+            
+          IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
+            " WHERE "+query,list,null,null);
+
+          if (set.getRowCount() > 0)
+          {
+            IResultRow row = set.getRow(0);
+            oldURI = (String)row.getValue(docURIField);
+            oldURIHash = (String)row.getValue(uriHashField);
+            oldOutputVersion = (String)row.getValue(lastOutputVersionField);
+          }
+          
+          break;
+        }
+        catch (ManifoldCFException e)
+        {
+          // Look for deadlock and retry if so
+          if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+          {
+            if (Logging.perf.isDebugEnabled())
+              Logging.perf.debug("Aborted select looking for status: "+e.getMessage());
+            sleepAmt = getSleepAmt();
+            continue;
+          }
+          throw e;
+        }
+        finally
+        {
+          sleepFor(sleepAmt);
+        }
+      }
+
+      // If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave
+      // dangling documents around.  So, all uri searches and comparisons MUST compare the actual uri as well.
+
+      // But, since we need to insure that any given URI is only worked on by one thread at a time, use critical sections
+      // to block the rare case that multiple threads try to work on the same URI.
+      
+      String[] lockArray = computeLockArray(null,oldURI,outputConnectionName);
+      lockManager.enterLocks(null,null,lockArray);
+      try
+      {
+
+        ArrayList list = new ArrayList();
+        
+        if (oldURI != null)
+        {
+          IOutputConnector connector = outputConnectorPool.grab(connection);
+          if (connector == null)
+            // The connector is not installed; treat this as a service interruption.
+            throw new ServiceInterruption("Output connector not installed",0L);
+          try
+          {
+            connector.removeDocument(oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
+          }
+          finally
+          {
+            outputConnectorPool.release(connection,connector);
+          }
+          // Delete all records from the database that match the old URI, except for THIS record.
+          list.clear();
+          String query = buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(uriHashField,"=",oldURIHash),
+            new UnitaryClause(outputConnNameField,outputConnectionName)});
+          list.add(docKey);
+          performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
+        }
 
+        // If we get here, it means we are noting that the document was examined, but that no change was required.  This is signaled
+        // to noteDocumentIngest by having the null documentURI.
+        noteDocumentIngest(outputConnectionName,docKey,documentVersion,null,null,null,null,recordTime,null,null);
+      }
+      finally
+      {
+        lockManager.leaveLocks(null,null,lockArray);
+      }
+    }
   }
 
   /** Ingest a document.
@@ -651,14 +901,11 @@ public class IncrementalIngester extends
   {
     try
     {
-      return documentIngest(new String[0],
-        new String[0],
-        outputConnectionName,
-        outputVersion,
+      return documentIngest(
+        new RuntPipelineSpecificationWithVersions(outputConnectionName,outputVersion,
+          "","","","",""),
         identifierClass, identifierHash,
         documentVersion,
-        "",
-        outputVersion,
         parameterVersion,
         authorityName,
         data,
@@ -763,15 +1010,10 @@ public class IncrementalIngester extends
   * method also REMOVES ALL OLD METADATA.  When complete, the index will contain only the metadata
   * described by the RepositoryDocument object passed to this method.
   * ServiceInterruption is thrown if the document ingestion must be rescheduled.
-  *@param transformationConnectionNames are the names of the transformation connections associated with this action.
-  *@param transformationDescriptionStrings are the description strings corresponding to the transformation connection names.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param otuputDescriptionString is the description string corresponding to the output connection.
+  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param documentVersion is the document version.
-  *@param transformationVersion is the version string for the transformations to be performed on the document.
-  *@param outputVersion is the output version string for the output connection.
   *@param parameterVersion is the version string for the forced parameters.
   *@param authorityName is the name of the authority associated with the document, if any.
   *@param data is the document data.  The data is closed after ingestion is complete.
@@ -783,189 +1025,40 @@ public class IncrementalIngester extends
   */
   @Override
   public boolean documentIngest(
-    String[] transformationConnectionNames,
-    String[] transformationDescriptionStrings,
-    String outputConnectionName,
-    String outputDescriptionString,
+    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
     String identifierClass, String identifierHash,
     String documentVersion,
-    String transformationVersion,
-    String outputVersion,
     String parameterVersion,
     String authorityName,
-    RepositoryDocument data,
+    RepositoryDocument document,
     long ingestTime, String documentURI,
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption, IOException
   {
-    IOutputConnection outputConnection = connectionManager.load(outputConnectionName);
-    ITransformationConnection[] transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
+    PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
     
     String docKey = makeKey(identifierClass,identifierHash);
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
+      Logging.ingest.debug("Ingesting document '"+docKey+"' into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
     }
-    return performIngestion(transformationConnections,transformationDescriptionStrings,
-      outputConnection,outputDescriptionString,
-      docKey,documentVersion,outputVersion,transformationVersion,parameterVersion,
-      authorityName,
-      data,
-      ingestTime,documentURI,
-      activities);
-  }
-
-  /** Do the actual ingestion, or just record it if there's nothing to ingest. */
-  protected boolean performIngestion(
-    ITransformationConnection[] transformationConnections, String[] transformationDescriptionStrings,
-    IOutputConnection outputConnection, String outputDescriptionString,
-    String docKey, String documentVersion, String outputVersion, String transformationVersion, String parameterVersion,
-    String authorityNameString,
-    RepositoryDocument data,
-    long ingestTime, String documentURI,
-    IOutputActivity activities)
-    throws ManifoldCFException, ServiceInterruption, IOException
-  {
-    String outputConnectionName = outputConnection.getName();
-    
-    // No transactions; not safe because post may take too much time
-
-    // First, calculate a document uri hash value
-    String documentURIHash = null;
-    if (documentURI != null)
-      documentURIHash = ManifoldCF.hash(documentURI);
-
-    String oldURI = null;
-    String oldURIHash = null;
-    String oldOutputVersion = null;
 
+    // Set indexing date
+    document.setIndexingDate(new Date());
     
-    while (true)
-    {
-      long sleepAmt = 0L;
-      try
-      {
-        // See what uri was used before for this doc, if any
-        ArrayList list = new ArrayList();
-        String query = buildConjunctionClause(list,new ClauseDescription[]{
-          new UnitaryClause(docKeyField,docKey),
-          new UnitaryClause(outputConnNameField,outputConnectionName)});
-          
-        IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
-          " WHERE "+query,list,null,null);
-
-        if (set.getRowCount() > 0)
-        {
-          IResultRow row = set.getRow(0);
-          oldURI = (String)row.getValue(docURIField);
-          oldURIHash = (String)row.getValue(uriHashField);
-          oldOutputVersion = (String)row.getValue(lastOutputVersionField);
-        }
-        
-        break;
-      }
-      catch (ManifoldCFException e)
-      {
-        // Look for deadlock and retry if so
-        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
-        {
-          if (Logging.perf.isDebugEnabled())
-            Logging.perf.debug("Aborted select looking for status: "+e.getMessage());
-          sleepAmt = getSleepAmt();
-          continue;
-        }
-        throw e;
-      }
-      finally
-      {
-        sleepFor(sleepAmt);
-      }
-    }
-
-    // If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave
-    // dangling documents around.  So, all uri searches and comparisons MUST compare the actual uri as well.
-
-    // But, since we need to insure that any given URI is only worked on by one thread at a time, use critical sections
-    // to block the rare case that multiple threads try to work on the same URI.
-    int uriCount = 0;
-    if (documentURI != null)
-      uriCount++;
-    if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
-      uriCount++;
-    String[] lockArray = new String[uriCount];
-    uriCount = 0;
-    if (documentURI != null)
-      lockArray[uriCount++] = outputConnectionName+":"+documentURI;
-    if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
-      lockArray[uriCount++] = outputConnectionName+":"+oldURI;
-
-    lockManager.enterCriticalSections(null,null,lockArray);
+    // Set up a pipeline
+    PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineConnectionsWithVersions);
+    if (pipeline == null)
+      // A connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("Pipeline connector not installed",0L);
     try
     {
-
-      ArrayList list = new ArrayList();
-      
-      if (oldURI != null && (documentURI == null || !oldURI.equals(documentURI)))
-      {
-        // Delete all records from the database that match the old URI, except for THIS record.
-        list.clear();
-        String query = buildConjunctionClause(list,new ClauseDescription[]{
-          new UnitaryClause(uriHashField,"=",oldURIHash),
-          new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
-        list.add(docKey);
-        performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
-        removeDocument(outputConnection,oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
-      }
-
-      if (documentURI != null)
-      {
-        // Get rid of all records that match the NEW uri, except for this record.
-        list.clear();
-        String query = buildConjunctionClause(list,new ClauseDescription[]{
-          new UnitaryClause(uriHashField,"=",documentURIHash),
-          new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
-        list.add(docKey);
-        performDelete("WHERE "+query+" AND "+ docKeyField+"!=?",list,null);
-      }
-
-      // Now, we know we are ready for the ingest.
-      if (documentURI != null)
-      {
-        // Here are the cases:
-        // 1) There was a service interruption before the upload started.
-        // (In that case, we don't need to log anything, just reschedule).
-        // 2) There was a service interruption after the document was transmitted.
-        // (In that case, we should presume that the document was ingested, but
-        //  reschedule another import anyway.)
-        // 3) Everything went OK
-        // (need to log the ingestion.)
-        // 4) Everything went OK, but we were told we have an illegal document.
-        // (We note the ingestion because if we don't we will be forced to repeat ourselves.
-        //  In theory, document doesn't need to be deleted, but there is no way to signal
-        //  that at the moment.)
-
-        // Note an ingestion before we actually try it.
-        // This is a marker that says "something is there"; it has an empty version, which indicates
-        // that we don't know anything about it.  That means it will be reingested when the
-        // next version comes along, and will be deleted if called for also.
-        noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,null,ingestTime,documentURI,documentURIHash);
-        int result = addOrReplaceDocument(transformationConnections,transformationDescriptionStrings,
-          outputConnection,outputDescriptionString,
-          documentURI,data,authorityNameString,
-          activities);
-        noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion, outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
-        return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
-      }
-
-      // If we get here, it means we are noting that the document was examined, but that no change was required.  This is signaled
-      // to noteDocumentIngest by having the null documentURI.
-      noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
-      return true;
+      return pipeline.addOrReplaceDocumentWithException(docKey,documentURI,document,documentVersion,parameterVersion,authorityName,activities,ingestTime) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
     }
     finally
     {
-      lockManager.leaveCriticalSections(null,null,lockArray);
+      pipeline.release();
     }
   }
 
@@ -977,37 +1070,66 @@ public class IncrementalIngester extends
   *@param checkTime is the time at which the check took place, in milliseconds since epoch.
   */
   @Override
+  @Deprecated
   public void documentCheckMultiple(String outputConnectionName,
     String[] identifierClasses, String[] identifierHashes,
     long checkTime)
     throws ManifoldCFException
   {
+    documentCheckMultiple(new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClasses,identifierHashes,checkTime);
+  }
+  
+  protected static String[] extractOutputConnectionNames(IPipelineSpecificationBasic pipelineSpecificationBasic)
+  {
+    String[] rval = new String[pipelineSpecificationBasic.getOutputCount()];
+    for (int i = 0; i < rval.length; i++)
+    {
+      rval[i] = pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(i));
+    }
+    return rval;
+  }
+  
+  /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
+  * versions agreed).
+  *@param pipelineSpecificationBasic is a pipeline specification.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes are the set of document identifier hashes.
+  *@param checkTime is the time at which the check took place, in milliseconds since epoch.
+  */
+  @Override
+  public void documentCheckMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes,
+    long checkTime)
+    throws ManifoldCFException
+  {
+    // Extract output connection names from pipeline spec
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
     beginTransaction();
     try
     {
       int maxClauses;
       
-      HashMap docIDValues = new HashMap();
-      int j = 0;
-      while (j < identifierHashes.length)
+      Set<String> docIDValues = new HashSet<String>();
+      for (int j = 0; j < identifierHashes.length; j++)
       {
         String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
-        docIDValues.put(docDBString,docDBString);
-        j++;
+        docIDValues.add(docDBString);
       }
 
       // Now, perform n queries, each of them no larger the maxInClause in length.
       // Create a list of row id's from this.
-      HashMap rowIDSet = new HashMap();
-      Iterator iter = docIDValues.keySet().iterator();
-      j = 0;
-      ArrayList list = new ArrayList();
-      maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
+      Set<Long> rowIDSet = new HashSet<Long>();
+      Iterator<String> iter = docIDValues.iterator();
+      int j = 0;
+      List<String> list = new ArrayList<String>();
+      maxClauses = maxClausesRowIdsForDocIds(outputConnectionNames);
       while (iter.hasNext())
       {
         if (j == maxClauses)
         {
-          findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
+          findRowIdsForDocIds(outputConnectionNames,rowIDSet,list);
           list.clear();
           j = 0;
         }
@@ -1016,27 +1138,27 @@ public class IncrementalIngester extends
       }
 
       if (j > 0)
-        findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
+        findRowIdsForDocIds(outputConnectionNames,rowIDSet,list);
 
       // Now, break row id's into chunks too; submit one chunk at a time
       j = 0;
-      list.clear();
-      iter = rowIDSet.keySet().iterator();
+      List<Long> list2 = new ArrayList<Long>();
+      Iterator<Long> iter2 = rowIDSet.iterator();
       maxClauses = maxClausesUpdateRowIds();
-      while (iter.hasNext())
+      while (iter2.hasNext())
       {
         if (j == maxClauses)
         {
-          updateRowIds(list,checkTime);
-          list.clear();
+          updateRowIds(list2,checkTime);
+          list2.clear();
           j = 0;
         }
-        list.add(iter.next());
+        list2.add(iter2.next());
         j++;
       }
 
       if (j > 0)
-        updateRowIds(list,checkTime);
+        updateRowIds(list2,checkTime);
     }
     catch (ManifoldCFException e)
     {
@@ -1062,12 +1184,31 @@ public class IncrementalIngester extends
   *@param checkTime is the time at which the check took place, in milliseconds since epoch.
   */
   @Override
+  @Deprecated
   public void documentCheck(String outputConnectionName,
     String identifierClass, String identifierHash,
     long checkTime)
     throws ManifoldCFException
   {
-    documentCheckMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},checkTime);
+    documentCheck(new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClass,identifierHash,checkTime);
+  }
+  
+  /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
+  * versions agreed).
+  *@param pipelineSpecificationBasic is a basic pipeline specification.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hashed document identifier.
+  *@param checkTime is the time at which the check took place, in milliseconds since epoch.
+  */
+  @Override
+  public void documentCheck(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash,
+    long checkTime)
+    throws ManifoldCFException
+  {
+    documentCheckMultiple(pipelineSpecificationBasic,new String[]{identifierClass},new String[]{identifierHash},checkTime);
   }
 
   /** Calculate the number of clauses.
@@ -1079,7 +1220,7 @@ public class IncrementalIngester extends
   
   /** Update a chunk of row ids.
   */
-  protected void updateRowIds(ArrayList list, long checkTime)
+  protected void updateRowIds(List<Long> list, long checkTime)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -1091,6 +1232,7 @@ public class IncrementalIngester extends
     performUpdate(map,"WHERE "+query,newList,null);
   }
 
+
   /** Delete multiple documents from the search engine index.
   *@param outputConnectionNames are the names of the output connections associated with this action.
   *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
@@ -1098,45 +1240,64 @@ public class IncrementalIngester extends
   *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
   */
   @Override
+  @Deprecated
   public void documentDeleteMultiple(String[] outputConnectionNames,
     String[] identifierClasses, String[] identifierHashes,
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-
-    // Segregate request by connection names
-    HashMap keyMap = new HashMap();
-    int i = 0;
-    while (i < outputConnectionNames.length)
+    IPipelineSpecificationBasic[] pipelineSpecs = new IPipelineSpecificationBasic[outputConnectionNames.length];
+    for (int i = 0; i < pipelineSpecs.length; i++)
     {
-      String outputConnectionName = outputConnectionNames[i];
-      ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+      pipelineSpecs[i] = new RuntPipelineSpecificationBasic(outputConnectionNames[i]);
+    }
+    documentDeleteMultiple(pipelineSpecs,
+      identifierClasses,identifierHashes,activities);
+  }
+  
+  /** Delete multiple documents from the search engine index.
+  *@param pipelineSpecificationBasics are the pipeline specifications associated with the documents.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes of the documents.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  @Override
+  public void documentDeleteMultiple(
+    IPipelineSpecificationBasic[] pipelineSpecificationBasics,
+    String[] identifierClasses, String[] identifierHashes,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    // Segregate request by pipeline spec instance address.  Not perfect but works in the
+    // environment it is used in.
+    Map<IPipelineSpecificationBasic,List<Integer>> keyMap = new HashMap<IPipelineSpecificationBasic,List<Integer>>();
+    for (int i = 0; i < pipelineSpecificationBasics.length; i++)
+    {
+      IPipelineSpecificationBasic spec = pipelineSpecificationBasics[i];
+      List<Integer> list = keyMap.get(spec);
       if (list == null)
       {
-        list = new ArrayList();
-        keyMap.put(outputConnectionName,list);
+        list = new ArrayList<Integer>();
+        keyMap.put(spec,list);
       }
       list.add(new Integer(i));
-      i++;
     }
 
     // Create the return array.
-    Iterator iter = keyMap.keySet().iterator();
+    Iterator<IPipelineSpecificationBasic> iter = keyMap.keySet().iterator();
     while (iter.hasNext())
     {
-      String outputConnectionName = (String)iter.next();
-      ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+      IPipelineSpecificationBasic spec = iter.next();
+      List<Integer> list = keyMap.get(spec);
       String[] localIdentifierClasses = new String[list.size()];
       String[] localIdentifierHashes = new String[list.size()];
-      i = 0;
-      while (i < localIdentifierClasses.length)
+      for (int i = 0; i < localIdentifierClasses.length; i++)
       {
-        int index = ((Integer)list.get(i)).intValue();
+        int index = list.get(i).intValue();
         localIdentifierClasses[i] = identifierClasses[index];
         localIdentifierHashes[i] = identifierHashes[index];
-        i++;
       }
-      documentDeleteMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes,activities);
+      documentDeleteMultiple(spec,localIdentifierClasses,localIdentifierHashes,activities);
     }
   }
 
@@ -1147,210 +1308,225 @@ public class IncrementalIngester extends
   *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
   */
   @Override
+  @Deprecated
   public void documentDeleteMultiple(String outputConnectionName,
     String[] identifierClasses, String[] identifierHashes,
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
+    documentDeleteMultiple(new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClasses,identifierHashes,activities);
+  }
+  
+  /** Delete multiple documents from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes of the documents.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  @Override
+  public void documentDeleteMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+    // Load the output connections up front to save time
+    IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+    
+    // No transactions here, so we can cycle through the connection names one at a time
+    for (int z = 0; z < outputConnectionNames.length; z++)
+    {
+      String outputConnectionName = outputConnectionNames[z];
+      IOutputConnection connection = outputConnections[z];
 
-    IOutputConnection connection = connectionManager.load(outputConnectionName);
+      activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
 
-    if (Logging.ingest.isDebugEnabled())
-    {
-      int i = 0;
-      while (i < identifierHashes.length)
+      if (Logging.ingest.isDebugEnabled())
       {
-        Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
-        i++;
+        for (int i = 0; i < identifierHashes.length; i++)
+        {
+          Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
+        }
       }
-    }
 
-    // No transactions.  Time for the operation may exceed transaction timeout.
+      // No transactions.  Time for the operation may exceed transaction timeout.
 
-    // Obtain the current URIs of all of these.
-    DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);
+      // Obtain the current URIs of all of these.
+      DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);
 
-    // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
-    // (This guarantees that when this operation is complete the database reflects reality.)
-    int validURIcount = 0;
-    int i = 0;
-    while (i < uris.length)
-    {
-      if (uris[i] != null && uris[i].getURI() != null)
-        validURIcount++;
-      i++;
-    }
-    String[] lockArray = new String[validURIcount];
-    String[] validURIArray = new String[validURIcount];
-    validURIcount = 0;
-    i = 0;
-    while (i < uris.length)
-    {
-      if (uris[i] != null && uris[i].getURI() != null)
+      // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
+      // (This guarantees that when this operation is complete the database reflects reality.)
+      int validURIcount = 0;
+      for (int i = 0; i < uris.length; i++)
       {
-        validURIArray[validURIcount] = uris[i].getURI();
-        lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
-        validURIcount++;
+        if (uris[i] != null && uris[i].getURI() != null)
+          validURIcount++;
       }
-      i++;
-    }
-
-    lockManager.enterCriticalSections(null,null,lockArray);
-    try
-    {
-      // Fetch the document URIs for the listed documents
-      int j = 0;
-      while (j < uris.length)
+      String[] lockArray = new String[validURIcount];
+      String[] validURIArray = new String[validURIcount];
+      validURIcount = 0;
+      for (int i = 0; i < uris.length; i++)
       {
-        if (uris[j] != null && uris[j].getURI() != null)
-          removeDocument(connection,uris[j].getURI(),uris[j].getOutputVersion(),activities);
-        j++;
+        if (uris[i] != null && uris[i].getURI() != null)
+        {
+          validURIArray[validURIcount] = uris[i].getURI();
+          lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
+          validURIcount++;
+        }
       }
 
-      // Now, get rid of all rows that match the given uris.
-      // Do the queries together, then the deletes
-      beginTransaction();
+      lockManager.enterLocks(null,null,lockArray);
       try
       {
-        // The basic process is this:
-        // 1) Come up with a set of urihash values
-        // 2) Find the matching, corresponding id values
-        // 3) Delete the rows corresponding to the id values, in sequence
-
-        // Process (1 & 2) has to be broken down into chunks that contain the maximum
-        // number of doc hash values each.  We need to avoid repeating doc hash values,
-        // so the first step is to come up with ALL the doc hash values before looping
-        // over them.
-
-        int maxClauses;
-        
-        // Find all the documents that match this set of URIs
-        HashMap docURIHashValues = new HashMap();
-        HashMap docURIValues = new HashMap();
-        j = 0;
-        while (j < validURIArray.length)
+        // Remove each listed document that has a known URI from the output connection
+        for (int i = 0; i < uris.length; i++)
         {
-          String docDBString = validURIArray[j++];
-          String docDBHashString = ManifoldCF.hash(docDBString);
-          docURIValues.put(docDBString,docDBString);
-          docURIHashValues.put(docDBHashString,docDBHashString);
+          if (uris[i] != null && uris[i].getURI() != null)
+            removeDocument(connection,uris[i].getURI(),uris[i].getOutputVersion(),activities);
         }
 
-        // Now, perform n queries, each of them no larger the maxInClause in length.
-        // Create a list of row id's from this.
-        HashMap rowIDSet = new HashMap();
-        Iterator iter = docURIHashValues.keySet().iterator();
-        j = 0;
-        ArrayList hashList = new ArrayList();
-        maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
-        while (iter.hasNext())
+        // Now, get rid of all rows that match the given uris.
+        // Do the queries together, then the deletes
+        beginTransaction();
+        try
         {
-          if (j == maxClauses)
+          // The basic process is this:
+          // 1) Come up with a set of urihash values
+          // 2) Find the matching, corresponding id values
+          // 3) Delete the rows corresponding to the id values, in sequence
+
+          // Process (1 & 2) has to be broken down into chunks that contain the maximum
+          // number of doc hash values each.  We need to avoid repeating doc hash values,
+          // so the first step is to come up with ALL the doc hash values before looping
+          // over them.
+
+          int maxClauses;
+          
+          // Find all the documents that match this set of URIs
+          Set<String> docURIHashValues = new HashSet<String>();
+          Set<String> docURIValues = new HashSet<String>();
+          for (String docDBString : validURIArray)
           {
-            findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
-            hashList.clear();
-            j = 0;
+            String docDBHashString = ManifoldCF.hash(docDBString);
+            docURIValues.add(docDBString);
+            docURIHashValues.add(docDBHashString);
           }
-          hashList.add(iter.next());
-          j++;
-        }
 
-        if (j > 0)
-          findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+          // Now, perform n queries, each of them no larger than maxClauses in length.
+          // Create a list of row id's from this.
+          Set<Long> rowIDSet = new HashSet<Long>();
+          Iterator<String> iter = docURIHashValues.iterator();
+          int j = 0;
+          List<String> hashList = new ArrayList<String>();
+          maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
+          while (iter.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+              hashList.clear();
+              j = 0;
+            }
+            hashList.add(iter.next());
+            j++;
+          }
 
-        // Next, go through the list of row IDs, and delete them in chunks
-        j = 0;
-        ArrayList list = new ArrayList();
-        iter = rowIDSet.keySet().iterator();
-        maxClauses = maxClausesDeleteRowIds();
-        while (iter.hasNext())
-        {
-          if (j == maxClauses)
+          if (j > 0)
+            findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+
+          // Next, go through the list of row IDs, and delete them in chunks
+          j = 0;
+          List<Long> list = new ArrayList<Long>();
+          Iterator<Long> iter2 = rowIDSet.iterator();
+          maxClauses = maxClausesDeleteRowIds();
+          while (iter2.hasNext())
           {
-            deleteRowIds(list);
-            list.clear();
-            j = 0;
+            if (j == maxClauses)
+            {
+              deleteRowIds(list);
+              list.clear();
+              j = 0;
+            }
+            list.add(iter2.next());
+            j++;
           }
-          list.add(iter.next());
-          j++;
-        }
 
-        if (j > 0)
-          deleteRowIds(list);
+          if (j > 0)
+            deleteRowIds(list);
 
-        // Now, find the set of documents that remain that match the document identifiers.
-        HashMap docIdValues = new HashMap();
-        j = 0;
-        while (j < identifierHashes.length)
-        {
-          String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
-          docIdValues.put(docDBString,docDBString);
-          j++;
-        }
+          // Now, find the set of documents that remain that match the document identifiers.
+          Set<String> docIdValues = new HashSet<String>();
+          for (int i = 0; i < identifierHashes.length; i++)
+          {
+            String docDBString = makeKey(identifierClasses[i],identifierHashes[i]);
+            docIdValues.add(docDBString);
+          }
 
-        // Now, perform n queries, each of them no larger the maxInClause in length.
-        // Create a list of row id's from this.
-        rowIDSet.clear();
-        iter = docIdValues.keySet().iterator();
-        j = 0;
-        list.clear();
-        maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
-        while (iter.hasNext())
-        {
-          if (j == maxClauses)
+          // Now, perform n queries, each of them no larger than maxClauses in length.
+          // Create a list of row id's from this.
+          rowIDSet.clear();
+          iter = docIdValues.iterator();
+          j = 0;
+          List<String> list2 = new ArrayList<String>();
+          maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
+          while (iter.hasNext())
           {
-            findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
-            list.clear();
-            j = 0;
+            if (j == maxClauses)
+            {
+              findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
+              list2.clear();
+              j = 0;
+            }
+            list2.add(iter.next());
+            j++;
           }
-          list.add(iter.next());
-          j++;
-        }
 
-        if (j > 0)
-          findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
+          if (j > 0)
+            findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
 
-        // Next, go through the list of row IDs, and delete them in chunks
-        j = 0;
-        list.clear();
-        iter = rowIDSet.keySet().iterator();
-        maxClauses = maxClausesDeleteRowIds();
-        while (iter.hasNext())
-        {
-          if (j == maxClauses)
+          // Next, go through the list of row IDs, and delete them in chunks
+          j = 0;
+          list.clear();
+          iter2 = rowIDSet.iterator();
+          maxClauses = maxClausesDeleteRowIds();
+          while (iter2.hasNext())
           {
-            deleteRowIds(list);
-            list.clear();
-            j = 0;
+            if (j == maxClauses)
+            {
+              deleteRowIds(list);
+              list.clear();
+              j = 0;
+            }
+            list.add(iter2.next());
+            j++;
           }
-          list.add(iter.next());
-          j++;
-        }
 
-        if (j > 0)
-          deleteRowIds(list);
+          if (j > 0)
+            deleteRowIds(list);
 
-      }
-      catch (ManifoldCFException e)
-      {
-        signalRollback();
-        throw e;
-      }
-      catch (Error e)
-      {
-        signalRollback();
-        throw e;
+        }
+        catch (ManifoldCFException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (Error e)
+        {
+          signalRollback();
+          throw e;
+        }
+        finally
+        {
+          endTransaction();
+        }
       }
       finally
       {
-        endTransaction();
+        lockManager.leaveLocks(null,null,lockArray);
       }
     }
-    finally
-    {
-      lockManager.leaveCriticalSections(null,null,lockArray);
-    }
   }
 
   /** Calculate the clauses.
@@ -1364,7 +1540,7 @@ public class IncrementalIngester extends
   /** Given values and parameters corresponding to a set of hash values, add corresponding
   * table row id's to the output map.
   */
-  protected void findRowIdsForURIs(String outputConnectionName, HashMap rowIDSet, HashMap uris, ArrayList hashParamValues)
+  protected void findRowIdsForURIs(String outputConnectionName, Set<Long> rowIDSet, Set<String> uris, List<String> hashParamValues)
     throws ManifoldCFException
   {
     ArrayList list = new ArrayList();
@@ -1374,18 +1550,17 @@ public class IncrementalIngester extends
       
     IResultSet set = performQuery("SELECT "+idField+","+docURIField+" FROM "+
       getTableName()+" WHERE "+query,list,null,null);
-      
-    int i = 0;
-    while (i < set.getRowCount())
+    
+    for (int i = 0; i < set.getRowCount(); i++)
     {
-      IResultRow row = set.getRow(i++);
+      IResultRow row = set.getRow(i);
       String docURI = (String)row.getValue(docURIField);
       if (docURI != null && docURI.length() > 0)
       {
-        if (uris.get(docURI) != null)
+        if (uris.contains(docURI))
         {
           Long rowID = (Long)row.getValue(idField);
-          rowIDSet.put(rowID,rowID);
+          rowIDSet.add(rowID);
         }
       }
     }
@@ -1398,11 +1573,19 @@ public class IncrementalIngester extends
     return findConjunctionClauseMax(new ClauseDescription[]{
       new UnitaryClause(outputConnNameField,outputConnectionName)});
   }
+
+  /** Calculate the maximum number of doc ids we should use.
+  */
+  protected int maxClausesRowIdsForDocIds(String[] outputConnectionNames)
+  {
+    return findConjunctionClauseMax(new ClauseDescription[]{
+      new MultiClause(outputConnNameField,outputConnectionNames)});
+  }
   
   /** Given values and parameters corresponding to a set of hash values, add corresponding
   * table row id's to the output map.
   */
-  protected void findRowIdsForDocIds(String outputConnectionName, HashMap rowIDSet, ArrayList paramValues)
+  protected void findRowIdsForDocIds(String outputConnectionName, Set<Long> rowIDSet, List<String> paramValues)
     throws ManifoldCFException
   {
     ArrayList list = new ArrayList();
@@ -1412,13 +1595,34 @@ public class IncrementalIngester extends
       
     IResultSet set = performQuery("SELECT "+idField+" FROM "+
       getTableName()+" WHERE "+query,list,null,null);
+    
+    for (int i = 0; i < set.getRowCount(); i++)
+    {
+      IResultRow row = set.getRow(i);
+      Long rowID = (Long)row.getValue(idField);
+      rowIDSet.add(rowID);
+    }
+  }
+
+  /** Given a list of document keys and a set of output connection names, add the corresponding
+  * table row ids to the output set.
+  */
+  protected void findRowIdsForDocIds(String[] outputConnectionNames, Set<Long> rowIDSet, List<String> paramValues)
+    throws ManifoldCFException
+  {
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(docKeyField,paramValues),
+      new MultiClause(outputConnNameField,outputConnectionNames)});
       
-    int i = 0;
-    while (i < set.getRowCount())
+    IResultSet set = performQuery("SELECT "+idField+" FROM "+
+      getTableName()+" WHERE "+query,list,null,null);
+    
+    for (int i = 0; i < set.getRowCount(); i++)
     {
-      IResultRow row = set.getRow(i++);
+      IResultRow row = set.getRow(i);
       Long rowID = (Long)row.getValue(idField);
-      rowIDSet.put(rowID,rowID);
+      rowIDSet.add(rowID);
     }
   }
 
@@ -1431,7 +1635,7 @@ public class IncrementalIngester extends
     
   /** Delete a chunk of row ids.
   */
-  protected void deleteRowIds(ArrayList list)
+  protected void deleteRowIds(List<Long> list)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -1447,12 +1651,30 @@ public class IncrementalIngester extends
   *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
   */
   @Override
+  @Deprecated
   public void documentDelete(String outputConnectionName,
     String identifierClass, String identifierHash,
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    documentDeleteMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},activities);
+    documentDelete(new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClass,identifierHash,activities);
+  }
+  
+  /** Delete a document from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  @Override
+  public void documentDelete(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    documentDeleteMultiple(pipelineSpecificationBasic,new String[]{identifierClass},new String[]{identifierHash},activities);
   }
 
   /** Find out what URIs a SET of document URIs are currently ingested.
@@ -1464,22 +1686,20 @@ public class IncrementalIngester extends
     throws ManifoldCFException
   {
     DeleteInfo[] rval = new DeleteInfo[identifierHashes.length];
-    HashMap map = new HashMap();
-    int i = 0;
-    while (i < identifierHashes.length)
+    Map<String,Integer> map = new HashMap<String,Integer>();
+    for (int i = 0; i < identifierHashes.length; i++)
     {
       map.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
       rval[i] = null;
-      i++;
     }
 
     beginTransaction();
     try
     {
-      ArrayList list = new ArrayList();
+      List<String> list = new ArrayList<String>();
       int maxCount = maxClauseDocumentURIChunk(outputConnectionName);
       int j = 0;
-      Iterator iter = map.keySet().iterator();
+      Iterator<String> iter = map.keySet().iterator();
       while (iter.hasNext())
       {
         if (j == maxCount)
@@ -1511,100 +1731,85 @@ public class IncrementalIngester extends
     }
   }
 
-  /** Look up ingestion data for a SET of documents.
-  *@param outputConnectionNames are the names of the output connections associated with this action.
+  /** Look up ingestion data for a set of documents.
+  *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+  *@param pipelineSpecificationBasics are the pipeline specifications corresponding to the identifier classes and hashes.
   *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
   *@param identifierHashes is the array of document identifier hashes to look up.
-  *@return the array of document data.  Null will come back for any identifier that doesn't
-  * exist in the index.
   */
   @Override
-  public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
+  public void getPipelineDocumentIngestDataMultiple(
+    Map<OutputKey,DocumentIngestStatus> rval,
+    IPipelineSpecificationBasic[] pipelineSpecificationBasics,
     String[] identifierClasses, String[] identifierHashes)
     throws ManifoldCFException
   {
-    // Segregate request by connection names
-    HashMap keyMap = new HashMap();
-    int i = 0;
-    while (i < outputConnectionNames.length)
+    // Organize by pipeline spec.
+    Map<IPipelineSpecificationBasic,List<Integer>> keyMap = new HashMap<IPipelineSpecificationBasic,List<Integer>>();
+    for (int i = 0; i < pipelineSpecificationBasics.length; i++)
     {
-      String outputConnectionName = outputConnectionNames[i];
-      ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+      IPipelineSpecificationBasic spec = pipelineSpecificationBasics[i];
+      List<Integer> list = keyMap.get(spec);
       if (list == null)
       {
-        list = new ArrayList();
-        keyMap.put(outputConnectionName,list);
+        list = new ArrayList<Integer>();
+        keyMap.put(spec,list);
       }
       list.add(new Integer(i));
-      i++;
     }
 
     // Create the return array.
-    DocumentIngestStatus[] rval = new DocumentIngestStatus[outputConnectionNames.length];
-    Iterator iter = keyMap.keySet().iterator();
+    Iterator<IPipelineSpecificationBasic> iter = keyMap.keySet().iterator();
     while (iter.hasNext())
     {
-      String outputConnectionName = (String)iter.next();
-      ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+      IPipelineSpecificationBasic spec = iter.next();
+      List<Integer> list = keyMap.get(spec);
       String[] localIdentifierClasses = new String[list.size()];
       String[] localIdentifierHashes = new String[list.size()];
-      i = 0;
-      while (i < localIdentifierClasses.length)
+      for (int i = 0; i < localIdentifierClasses.length; i++)
       {
-        int index = ((Integer)list.get(i)).intValue();
+        int index = list.get(i).intValue();
         localIdentifierClasses[i] = identifierClasses[index];
         localIdentifierHashes[i] = identifierHashes[index];
-        i++;
-      }
-      DocumentIngestStatus[] localRval = getDocumentIngestDataMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes);
-      i = 0;
-      while (i < localRval.length)
-      {
-        int index = ((Integer)list.get(i)).intValue();
-        rval[index] = localRval[i];
-        i++;
       }
+      getPipelineDocumentIngestDataMultiple(rval,spec,localIdentifierClasses,localIdentifierHashes);
     }
-    return rval;
   }
 
   /** Look up ingestion data for a SET of documents.
-  *@param outputConnectionName is the names of the output connection associated with this action.
+  *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+  *@param pipelineSpecificationBasic is the pipeline specification for all documents.
   *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
   *@param identifierHashes is the array of document identifier hashes to look up.
-  *@return the array of document data.  Null will come back for any identifier that doesn't
-  * exist in the index.
   */
   @Override
-  public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
+  public void getPipelineDocumentIngestDataMultiple(
+    Map<OutputKey,DocumentIngestStatus> rval,
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
     String[] identifierClasses, String[] identifierHashes)
     throws ManifoldCFException
   {
-    // Build the return array
-    DocumentIngestStatus[] rval = new DocumentIngestStatus[identifierHashes.length];
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
 
     // Build a map, so we can convert an identifier into an array index.
-    HashMap indexMap = new HashMap();
-    int i = 0;
-    while (i < identifierHashes.length)
+    Map<String,Integer> indexMap = new HashMap<String,Integer>();
+    for (int i = 0; i < identifierHashes.length; i++)
     {
       indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
-      rval[i] = null;
-      i++;
     }
 
     beginTransaction();
     try
     {
-      ArrayList list = new ArrayList();
-      int maxCount = maxClauseDocumentIngestDataChunk(outputConnectionName);
+      List<String> list = new ArrayList<String>();
+      int maxCount = maxClausePipelineDocumentIngestDataChunk(outputConnectionNames);
       int j = 0;
-      Iterator iter = indexMap.keySet().iterator();
+      Iterator<String> iter = indexMap.keySet().iterator();
       while (iter.hasNext())
       {
         if (j == maxCount)
         {
-          getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
+          getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes);
           j = 0;
           list.clear();
         }
@@ -1612,8 +1817,7 @@ public class IncrementalIngester extends
         j++;
       }
       if (j > 0)
-        getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
-      return rval;
+        getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes);
     }
     catch (ManifoldCFException e)
     {
@@ -1629,78 +1833,248 @@ public class IncrementalIngester extends
     {
       endTransaction();
     }
+
   }
 
-  /** Look up ingestion data for a documents.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
-  *@param identifierHash is the hash of the id of the document.
-  *@return the current document's ingestion data, or null if the document is not currently ingested.
+  /** Get a chunk of document ingest data records.
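+  *@param rval is the map of output key to document ingest status where the data should be put.
+  *@param map is the map from document key to index into identifierClasses and identifierHashes.
+  *@param outputConnectionNames are the output connection names the query is restricted to.
+  *@param list is the list of document keys to look up.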
   */
-  @Override
-  public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
-    String identifierClass, String identifierHash)
+  protected void getPipelineDocumentIngestDataChunk(Map<OutputKey,DocumentIngestStatus> rval, Map<String,Integer> map, String[] outputConnectionNames, List<String> list,
+    String[] identifierClasses, String[] identifierHashes)
     throws ManifoldCFException
   {
-    return getDocumentIngestDataMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
-  }
-
-  /** Calculate the average time interval between changes for a document.
-  * This is based on the data gathered for the document.
+    ArrayList newList = new ArrayList();
+    String query = buildConjunctionClause(newList,new ClauseDescription[]{
+      new MultiClause(docKeyField,list),
+      new MultiClause(outputConnNameField,outputConnectionNames)});
+      
+    // Get the primary records associated with this hash value
+    IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+","+lastTransformationVersionField+
+      " FROM "+getTableName()+" WHERE "+query,newList,null,null);
+
+    // Now, go through the original request once more, this time building the result
+    for (int i = 0; i < set.getRowCount(); i++)
+    {
+      IResultRow row = set.getRow(i);
+      String docHash = row.getValue(docKeyField).toString();
+      Integer position = map.get(docHash);
+      if (position != null)
+      {
+        Long id = (Long)row.getValue(idField);
+        String outputConnectionName = (String)row.getValue(outputConnNameField);
+        String lastVersion = (String)row.getValue(lastVersionField);
+        if (lastVersion == null)
+          lastVersion = "";
+        String lastTransformationVersion = (String)row.getValue(lastTransformationVersionField);
+        if (lastTransformationVersion == null)
+          lastTransformationVersion = "";
+        String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
+        if (lastOutputVersion == null)
+          lastOutputVersion = "";
+        String paramVersion = (String)row.getValue(forcedParamsField);
+        if (paramVersion == null)
+          paramVersion = "";
+        String authorityName = (String)row.getValue(authorityNameField);
+        if (authorityName == null)
+          authorityName = "";
+        int indexValue = position.intValue();
+        rval.put(new OutputKey(identifierClasses[indexValue],identifierHashes[indexValue],outputConnectionName),
+          new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName));
+      }
+    }
+  }
+  
+  /** Look up ingestion data for a document.
+  *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+  *@param pipelineSpecificationBasic is the pipeline specification for the document.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  */
+  @Override
+  public void getPipelineDocumentIngestData(
+    Map<OutputKey,DocumentIngestStatus> rval,
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash)
+    throws ManifoldCFException
+  {
+    getPipelineDocumentIngestDataMultiple(rval,pipelineSpecificationBasic,
+      new String[]{identifierClass},new String[]{identifierHash});
+  }
+
+  /** Look up ingestion data for a SET of documents.
+  *@param outputConnectionNames are the names of the output connections associated with this action.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes to look up.
+  *@return the array of document data.  Null will come back for any identifier that doesn't
+  * exist in the index.
+  */
+  @Override
+  @Deprecated
+  public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
+    String[] identifierClasses, String[] identifierHashes)
+    throws ManifoldCFException
+  {
+    // Segregate request by connection names
+    Map<String,List<Integer>> keyMap = new HashMap<String,List<Integer>>();
+    for (int i = 0; i < outputConnectionNames.length; i++)
+    {
+      String outputConnectionName = outputConnectionNames[i];
+      List<Integer> list = keyMap.get(outputConnectionName);
+      if (list == null)
+      {
+        list = new ArrayList<Integer>();
+        keyMap.put(outputConnectionName,list);
+      }
+      list.add(new Integer(i));
+    }
+
+    // Create the return array.
+    DocumentIngestStatus[] rval = new DocumentIngestStatus[outputConnectionNames.length];
+    Iterator<String> iter = keyMap.keySet().iterator();
+    while (iter.hasNext())
+    {
+      String outputConnectionName = iter.next();
+      List<Integer> list = keyMap.get(outputConnectionName);
+      String[] localIdentifierClasses = new String[list.size()];
+      String[] localIdentifierHashes = new String[list.size()];
+      for (int i = 0; i < localIdentifierClasses.length; i++)
+      {
+        int index = list.get(i).intValue();
+        localIdentifierClasses[i] = identifierClasses[index];
+        localIdentifierHashes[i] = identifierHashes[index];
+      }
+      DocumentIngestStatus[] localRval = getDocumentIngestDataMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes);
+      for (int i = 0; i < localRval.length; i++)
+      {
+        int index = list.get(i).intValue();
+        rval[index] = localRval[i];
+      }
+    }
+    return rval;
+  }
+
+  /** Look up ingestion data for a SET of documents.
+  *@param outputConnectionName is the name of the output connection associated with this action.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes to look up.
+  *@return the array of document data.  Null will come back for any identifier that doesn't
+  * exist in the index.
+  */
+  @Override
+  @Deprecated
+  public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
+    String[] identifierClasses, String[] identifierHashes)
+    throws ManifoldCFException
+  {
+    // Build the return array
+    DocumentIngestStatus[] rval = new DocumentIngestStatus[identifierHashes.length];
+
+    // Build a map, so we can convert an identifier into an array index.
+    Map<String,Integer> indexMap = new HashMap<String,Integer>();
+    for (int i = 0; i < identifierHashes.length; i++)
+    {
+      indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
+      rval[i] = null;
+    }
+
+    beginTransaction();
+    try
+    {
+      List<String> list = new ArrayList<String>();
+      int maxCount = maxClauseDocumentIngestDataChunk(outputConnectionName);
+      int j = 0;
+      Iterator<String> iter = indexMap.keySet().iterator();
+      while (iter.hasNext())
+      {
+        if (j == maxCount)
+        {
+          getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
+          j = 0;
+          list.clear();
+        }
+        list.add(iter.next());
+        j++;
+      }
+      if (j > 0)
+        getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
+      return rval;
+    }
+    catch (ManifoldCFException e)
+    {
+      signalRollback();
+      throw e;
+    }
+    catch (Error e)
+    {
+      signalRollback();
+      throw e;
+    }
+    finally
+    {
+      endTransaction();
+    }
+  }
+
+  /** Look up ingestion data for a document.
   *@param outputConnectionName is the name of the output connection associated with this action.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hash of the id of the document.
-  *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+  *@return the current document's ingestion data, or null if the document is not currently ingested.
   */
   @Override
-  public long getDocumentUpdateInterval(String outputConnectionName,
+  @Deprecated
+  public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
     String identifierClass, String identifierHash)
     throws ManifoldCFException
   {
-    return getDocumentUpdateIntervalMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
+    return getDocumentIngestDataMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
   }
 
   /** Calculate the average time interval between changes for a document.
   * This is based on the data gathered for the document.
-  *@param outputConnectionName is the name of the output connection associated with this action.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
   *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
   *@param identifierHashes is the hashes of the ids of the documents.
   *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
   */
-  @Override
-  public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
+  public long[] getDocumentUpdateIntervalMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
     String[] identifierClasses, String[] identifierHashes)

[... 1595 lines stripped ...]


Mime
View raw message