manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1612101 - in /manifoldcf/branches/CONNECTORS-989/framework: agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ pull-agent/src/main/java/org/apache/manifoldcf/c...
Date Sun, 20 Jul 2014 15:17:33 GMT
Author: kwright
Date: Sun Jul 20 15:17:33 2014
New Revision: 1612101

URL: http://svn.apache.org/r1612101
Log:
Finish the changes

Modified:
    manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatusSet.java
    manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
    manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java
    manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
    manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java
    manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1612101&r1=1612100&r2=1612101&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sun Jul 20 15:17:33 2014
@@ -64,6 +64,7 @@ public class IncrementalIngester extends
   protected final static String idField = "id";
   protected final static String outputConnNameField = "connectionname";
   protected final static String docKeyField = "dockey";
+  protected final static String componentHashField = "componenthash";
   protected final static String docURIField = "docuri";
   protected final static String uriHashField = "urihash";
   protected final static String lastVersionField = "lastversion";
@@ -122,6 +123,7 @@ public class IncrementalIngester extends
         map.put(idField,new ColumnDescription("BIGINT",true,false,null,null,false));
         map.put(outputConnNameField,new ColumnDescription("VARCHAR(32)",false,false,outputConnectionTableName,outputConnectionNameField,false));
         map.put(docKeyField,new ColumnDescription("VARCHAR(73)",false,false,null,null,false));
+        map.put(componentHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
         // The document URI field, if null, indicates that the document was not actually ingested!
         // This happens when a connector wishes to keep track of a version string, but not actually ingest the doc.
         map.put(docURIField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
@@ -156,10 +158,18 @@ public class IncrementalIngester extends
           performAlter(addMap,null,null,null);
         }
 
+        cd = (ColumnDescription)existing.get(componentHashField);
+        if (cd == null)
+        {
+          Map<String,ColumnDescription> addMap = new HashMap<String,ColumnDescription>();
+          addMap.put(componentHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
+          performAlter(addMap,null,null,null);
+        }
+
       }
 
       // Now, do indexes
-      IndexDescription keyIndex = new IndexDescription(true,new String[]{docKeyField,outputConnNameField});
+      IndexDescription keyIndex = new IndexDescription(true,new String[]{docKeyField,outputConnNameField,componentHashField});
       IndexDescription uriHashIndex = new IndexDescription(false,new String[]{uriHashField,outputConnNameField});
       IndexDescription outputConnIndex = new IndexDescription(false,new String[]{outputConnNameField});
 
@@ -228,7 +238,20 @@ public class IncrementalIngester extends
     int count = pipelineSpecificationBasic.getOutputCount();
     if (count == 0)
       return null;
-    return pipelineSpecificationBasic.getStageConnectionName(count-1);
+    return pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(count-1));
+  }
+
+  /** From a pipeline specification, get the name of the output connection that will be indexed first
+  * in the pipeline.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@return the first indexed output connection name.
+  */
+  @Override
+  public String getFirstIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic)
+  {
+    if (pipelineSpecificationBasic.getOutputCount() == 0)
+      return null;
+    return pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(0));
   }
 
   /** Check if a mime type is indexable.
@@ -623,7 +646,6 @@ public class IncrementalIngester extends
     String documentVersion, long recordTime)
     throws ManifoldCFException
   {
-    // MHL
     // This method is called when a connector decides that the last indexed version of the document is in fact just fine,
     // but the document version information should be updated.
     // The code pathway is therefore similar to that of document indexing, EXCEPT that no indexing will ever
@@ -642,7 +664,7 @@ public class IncrementalIngester extends
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Recording document '"+docKey+"' for output connections '"+outputConnectionNames+"'");
+      Logging.ingest.debug("Recording document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" for output connections '"+outputConnectionNames+"'");
     }
 
     for (int k = 0; k < outputConnectionNames.length; k++)
@@ -651,7 +673,7 @@ public class IncrementalIngester extends
 
       // If we get here, it means we are noting that the document was examined, but that no change was required.  This is signaled
       // to noteDocumentIngest by having the null documentURI.
-      noteDocumentIngest(outputConnectionName,docKey,documentVersion,null,null,null,null,recordTime,null,null);
+      noteDocumentIngest(outputConnectionName,docKey,componentHash,documentVersion,null,null,null,null,recordTime,null,null);
     }
   }
 
@@ -681,14 +703,13 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    // MHL
     PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
     
     String docKey = makeKey(identifierClass,identifierHash);
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Logging empty document '"+docKey+"' into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+      Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
     }
 
     // Set up a pipeline
@@ -698,7 +719,7 @@ public class IncrementalIngester extends
       throw new ServiceInterruption("Pipeline connector not installed",0L);
     try
     {
-      pipeline.noDocument(docKey,documentVersion,parameterVersion,authorityName,activities,recordTime);
+      pipeline.noDocument(docKey,componentHash,documentVersion,parameterVersion,authorityName,activities,recordTime);
     }
     finally
     {
@@ -737,14 +758,13 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption, IOException
   {
-    // MHL
     PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
     
     String docKey = makeKey(identifierClass,identifierHash);
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Ingesting document '"+docKey+"' into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+      Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
     }
 
     // Set indexing date
@@ -757,7 +777,7 @@ public class IncrementalIngester extends
       throw new ServiceInterruption("Pipeline connector not installed",0L);
     try
     {
-      return pipeline.addOrReplaceDocumentWithException(docKey,documentURI,data,documentVersion,parameterVersion,authorityName,activities,ingestTime) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
+      return pipeline.addOrReplaceDocumentWithException(docKey,componentHash,documentURI,data,documentVersion,parameterVersion,authorityName,activities,ingestTime) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
     }
     finally
     {
@@ -779,9 +799,11 @@ public class IncrementalIngester extends
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    // MHL
-    documentDelete(pipelineSpecificationBasic,
-      identifierClass,identifierHash,activities);
+    documentRemoveMultiple(pipelineSpecificationBasic,
+      new String[]{identifierClass},
+      new String[]{identifierHash},
+      componentHash,
+      activities);
   }
 
   protected static String[] extractOutputConnectionNames(IPipelineSpecificationBasic pipelineSpecificationBasic)
@@ -1176,6 +1198,218 @@ public class IncrementalIngester extends
     }
   }
 
+  /** Remove multiple document components from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClasses are the names of the spaces in which the identifier hash should be interpreted.
+  *@param identifierHashes are the hashes of the ids of the documents.
+  *@param componentHash is the hashed component identifier, if any.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  @Override
+  public void documentRemoveMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes, String componentHash,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+    // Load connection managers up front to save time
+    IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+    
+    // No transactions here, so we can cycle through the connection names one at a time
+    for (int z = 0; z < outputConnectionNames.length; z++)
+    {
+      String outputConnectionName = outputConnectionNames[z];
+      IOutputConnection connection = outputConnections[z];
+
+      activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
+
+      if (Logging.ingest.isDebugEnabled())
+      {
+        for (int i = 0; i < identifierHashes.length; i++)
+        {
+          Logging.ingest.debug("Request to remove document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" from output connection '"+outputConnectionName+"'");
+        }
+      }
+
+      // No transactions.  Time for the operation may exceed transaction timeout.
+
+      // Obtain the current URIs of all of these.
+      DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes,componentHash);
+
+      // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
+      // (This guarantees that when this operation is complete the database reflects reality.)
+      int validURIcount = 0;
+      for (int i = 0; i < uris.length; i++)
+      {
+        if (uris[i] != null && uris[i].getURI() != null)
+          validURIcount++;
+      }
+      String[] lockArray = new String[validURIcount];
+      String[] validURIArray = new String[validURIcount];
+      validURIcount = 0;
+      for (int i = 0; i < uris.length; i++)
+      {
+        if (uris[i] != null && uris[i].getURI() != null)
+        {
+          validURIArray[validURIcount] = uris[i].getURI();
+          lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
+          validURIcount++;
+        }
+      }
+
+      lockManager.enterLocks(null,null,lockArray);
+      try
+      {
+        // Fetch the document URIs for the listed documents
+        for (int i = 0; i < uris.length; i++)
+        {
+          if (uris[i] != null && uris[i].getURI() != null)
+            removeDocument(connection,uris[i].getURI(),uris[i].getOutputVersion(),activities);
+        }
+
+        // Now, get rid of all rows that match the given uris.
+        // Do the queries together, then the deletes
+        beginTransaction();
+        try
+        {
+          // The basic process is this:
+          // 1) Come up with a set of urihash values
+          // 2) Find the matching, corresponding id values
+          // 3) Delete the rows corresponding to the id values, in sequence
+
+          // Process (1 & 2) has to be broken down into chunks that contain the maximum
+          // number of doc hash values each.  We need to avoid repeating doc hash values,
+          // so the first step is to come up with ALL the doc hash values before looping
+          // over them.
+
+          int maxClauses;
+          
+          // Find all the documents that match this set of URIs
+          Set<String> docURIHashValues = new HashSet<String>();
+          Set<String> docURIValues = new HashSet<String>();
+          for (String docDBString : validURIArray)
+          {
+            String docDBHashString = ManifoldCF.hash(docDBString);
+            docURIValues.add(docDBString);
+            docURIHashValues.add(docDBHashString);
+          }
+
+          // Now, perform n queries, each of them no larger the maxInClause in length.
+          // Create a list of row id's from this.
+          Set<Long> rowIDSet = new HashSet<Long>();
+          Iterator<String> iter = docURIHashValues.iterator();
+          int j = 0;
+          List<String> hashList = new ArrayList<String>();
+          maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
+          while (iter.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+              hashList.clear();
+              j = 0;
+            }
+            hashList.add(iter.next());
+            j++;
+          }
+
+          if (j > 0)
+            findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+
+          // Next, go through the list of row IDs, and delete them in chunks
+          j = 0;
+          List<Long> list = new ArrayList<Long>();
+          Iterator<Long> iter2 = rowIDSet.iterator();
+          maxClauses = maxClausesDeleteRowIds();
+          while (iter2.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              deleteRowIds(list);
+              list.clear();
+              j = 0;
+            }
+            list.add(iter2.next());
+            j++;
+          }
+
+          if (j > 0)
+            deleteRowIds(list);
+
+          // Now, find the set of documents that remain that match the document identifiers.
+          Set<String> docIdValues = new HashSet<String>();
+          for (int i = 0; i < identifierHashes.length; i++)
+          {
+            String docDBString = makeKey(identifierClasses[i],identifierHashes[i]);
+            docIdValues.add(docDBString);
+          }
+
+          // Now, perform n queries, each of them no larger the maxInClause in length.
+          // Create a list of row id's from this.
+          rowIDSet.clear();
+          iter = docIdValues.iterator();
+          j = 0;
+          List<String> list2 = new ArrayList<String>();
+          maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
+          while (iter.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
+              list2.clear();
+              j = 0;
+            }
+            list2.add(iter.next());
+            j++;
+          }
+
+          if (j > 0)
+            findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
+
+          // Next, go through the list of row IDs, and delete them in chunks
+          j = 0;
+          list.clear();
+          iter2 = rowIDSet.iterator();
+          maxClauses = maxClausesDeleteRowIds();
+          while (iter2.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              deleteRowIds(list);
+              list.clear();
+              j = 0;
+            }
+            list.add(iter2.next());
+            j++;
+          }
+
+          if (j > 0)
+            deleteRowIds(list);
+
+        }
+        catch (ManifoldCFException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (Error e)
+        {
+          signalRollback();
+          throw e;
+        }
+        finally
+        {
+          endTransaction();
+        }
+      }
+      finally
+      {
+        lockManager.leaveLocks(null,null,lockArray);
+      }
+    }
+  }
+
   /** Calculate the clauses.
   */
   protected int maxClausesRowIdsForURIs(String outputConnectionName)
@@ -1361,6 +1595,63 @@ public class IncrementalIngester extends
     }
   }
 
+  /** Find out what URIs a SET of document URIs are currently ingested.
+  *@param outputConnectionName is the output connection name.
+  *@param identifierClasses is the array of identifier classes.
+  *@param identifierHashes is the array of document id's to check.
+  *@param componentHash is the component hash to check.
+  *@return the array of current document uri's.  Null returned for identifiers
+  * that don't exist in the index.
+  */
+  protected DeleteInfo[] getDocumentURIMultiple(String outputConnectionName, String[] identifierClasses, String[] identifierHashes, String componentHash)
+    throws ManifoldCFException
+  {
+    DeleteInfo[] rval = new DeleteInfo[identifierHashes.length];
+    Map<String,Integer> map = new HashMap<String,Integer>();
+    for (int i = 0; i < identifierHashes.length; i++)
+    {
+      map.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
+      rval[i] = null;
+    }
+
+    beginTransaction();
+    try
+    {
+      List<String> list = new ArrayList<String>();
+      int maxCount = maxClauseDocumentURIChunk(outputConnectionName,componentHash);
+      int j = 0;
+      Iterator<String> iter = map.keySet().iterator();
+      while (iter.hasNext())
+      {
+        if (j == maxCount)
+        {
+          getDocumentURIChunk(rval,map,outputConnectionName,list,componentHash);
+          j = 0;
+          list.clear();
+        }
+        list.add(iter.next());
+        j++;
+      }
+      if (j > 0)
+        getDocumentURIChunk(rval,map,outputConnectionName,list,componentHash);
+      return rval;
+    }
+    catch (ManifoldCFException e)
+    {
+      signalRollback();
+      throw e;
+    }
+    catch (Error e)
+    {
+      signalRollback();
+      throw e;
+    }
+    finally
+    {
+      endTransaction();
+    }
+  }
+
   /** Look up ingestion data for a set of documents.
   *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
   *@param pipelineSpecificationBasics are the pipeline specifications corresponding to the identifier classes and hashes.
@@ -1482,7 +1773,7 @@ public class IncrementalIngester extends
       new MultiClause(outputConnNameField,outputConnectionNames)});
       
     // Get the primary records associated with this hash value
-    IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+","+lastTransformationVersionField+
+    IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+componentHashField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+","+lastTransformationVersionField+
       " FROM "+getTableName()+" WHERE "+query,newList,null,null);
 
     // Now, go through the original request once more, this time building the result
@@ -1495,6 +1786,7 @@ public class IncrementalIngester extends
       {
         Long id = (Long)row.getValue(idField);
         String outputConnectionName = (String)row.getValue(outputConnNameField);
+        String componentHash = (String)row.getValue(componentHashField);
         String lastVersion = (String)row.getValue(lastVersionField);
         if (lastVersion == null)
           lastVersion = "";
@@ -1511,9 +1803,8 @@ public class IncrementalIngester extends
         if (authorityName == null)
           authorityName = "";
         int indexValue = position.intValue();
-        // MHL
         rval.addStatus(identifierClasses[indexValue],identifierHashes[indexValue],outputConnectionName,
-          null,new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName));
+          componentHash,new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName));
       }
     }
   }
@@ -1714,6 +2005,7 @@ public class IncrementalIngester extends
   /** Note the ingestion of a document, or the "update" of a document.
   *@param outputConnectionName is the name of the output connection.
   *@param docKey is the key string describing the document.
+  *@param componentHash is the component identifier hash for this document.
   *@param documentVersion is a string describing the new version of the document.
   *@param transformationVersion is a string describing all current transformations for the document.
   *@param outputVersion is the version string calculated for the output connection.
@@ -1725,7 +2017,7 @@ public class IncrementalIngester extends
   *@param documentURIHash is the hash of the document uri.
   */
   protected void noteDocumentIngest(String outputConnectionName,
-    String docKey, String documentVersion, String transformationVersion,
+    String docKey, String componentHash, String documentVersion, String transformationVersion,
     String outputVersion, String packedForcedParameters,
     String authorityNameString,
     long ingestTime, String documentURI, String documentURIHash)
@@ -1754,6 +2046,8 @@ public class IncrementalIngester extends
 
       // Try the update first.  Typically this succeeds except in the case where a doc is indexed for the first time.
       map.clear();
+      if (componentHash != null)
+        map.put(componentHashField,componentHash);
       map.put(lastVersionField,documentVersion);
       map.put(lastTransformationVersionField,transformationVersion);
       map.put(lastOutputVersionField,outputVersion);
@@ -1781,7 +2075,8 @@ public class IncrementalIngester extends
           ArrayList list = new ArrayList();
           String query = buildConjunctionClause(list,new ClauseDescription[]{
             new UnitaryClause(docKeyField,docKey),
-            new UnitaryClause(outputConnNameField,outputConnectionName)});
+            new UnitaryClause(outputConnNameField,outputConnectionName),
+            ((componentHash==null)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash))});
           IResultSet set = performQuery("SELECT "+idField+","+changeCountField+" FROM "+getTableName()+" WHERE "+
             query+" FOR UPDATE",list,null,null);
           IResultRow row = null;
@@ -1833,6 +2128,8 @@ public class IncrementalIngester extends
 
       // Set up for insert
       map.clear();
+      if (componentHash != null)
+        map.put(componentHashField,componentHash);
       map.put(lastVersionField,documentVersion);
       map.put(lastTransformationVersionField,transformationVersion);
       map.put(lastOutputVersionField,outputVersion);
@@ -1896,9 +2193,10 @@ public class IncrementalIngester extends
   *@param rval is the string array where the uris should be put.
   *@param map is the map from id to index.
   *@param clause is the in clause for the query.
-  *@param list is the parameter list for the query.
+  *@param list are the doc keys for the query.
   */
-  protected void getDocumentURIChunk(DeleteInfo[] rval, Map<String,Integer> map, String outputConnectionName, List<String> list)
+  protected void getDocumentURIChunk(DeleteInfo[] rval, Map<String,Integer> map, String outputConnectionName,
+    List<String> list)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -1926,6 +2224,52 @@ public class IncrementalIngester extends
     }
   }
 
+  /** Calculate how many clauses at a time
+  */
+  protected int maxClauseDocumentURIChunk(String outputConnectionName, String componentHash)
+  {
+    return findConjunctionClauseMax(new ClauseDescription[]{
+      new UnitaryClause(outputConnNameField,outputConnectionName),
+      ((componentHash==null)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash))});
+  }
+
+  /** Get a chunk of document uris.
+  *@param rval is the string array where the uris should be put.
+  *@param map is the map from id to index.
+  *@param clause is the in clause for the query.
+  *@param list are the doc keys for the query.
+  *@param componentHash is the component hash, if any, for the query.
+  */
+  protected void getDocumentURIChunk(DeleteInfo[] rval, Map<String,Integer> map, String outputConnectionName,
+    List<String> list, String componentHash)
+    throws ManifoldCFException
+  {
+    ArrayList newList = new ArrayList();
+    String query = buildConjunctionClause(newList,new ClauseDescription[]{
+      new MultiClause(docKeyField,list),
+      new UnitaryClause(outputConnNameField,outputConnectionName),
+      ((componentHash==null)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash))});
+      
+    IResultSet set = performQuery("SELECT "+docKeyField+","+docURIField+","+lastOutputVersionField+" FROM "+getTableName()+" WHERE "+
+      query,newList,null,null);
+
+    // Go through list and put into buckets.
+    for (int i = 0; i < set.getRowCount(); i++)
+    {
+      IResultRow row = set.getRow(i);
+      String docHash = row.getValue(docKeyField).toString();
+      Integer position = (Integer)map.get(docHash);
+      if (position != null)
+      {
+        String lastURI = (String)row.getValue(docURIField);
+        if (lastURI != null && lastURI.length() == 0)
+          lastURI = null;
+        String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
+        rval[position.intValue()] = new DeleteInfo(lastURI,lastOutputVersion);
+      }
+    }
+  }
+
   /** Count the clauses
   */
   protected int maxClauseDocumentIngestDataChunk(String outputConnectionName)
@@ -2328,23 +2672,23 @@ public class IncrementalIngester extends
       this.pipelineConnectionsWithVersions = pipelineConnectionsWithVersions;
     }
 
-    public int addOrReplaceDocumentWithException(String docKey, String documentURI, RepositoryDocument document, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
+    public int addOrReplaceDocumentWithException(String docKey, String componentHash, String documentURI, RepositoryDocument document, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
-      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey);
+      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey,componentHash);
       return entryPoint.sendDocument(documentURI,document);
     }
 
-    public void noDocument(String docKey, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
+    public void noDocument(String docKey, String componentHash, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
       throws ManifoldCFException, ServiceInterruption
     {
-      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey);
+      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey,componentHash);
       entryPoint.noDocument();
     }
 
     protected PipelineAddFanout buildAddPipeline(IOutputActivity finalActivity,
       String newDocumentVersion, String newParameterVersion, String newAuthorityNameString,
-      long ingestTime, String docKey)
+      long ingestTime, String docKey, String componentHash)
     {
       // Algorithm for building a pipeline:
       // (1) We start with the set of final output connection stages, and build an entry point for each one.  That's our "current set".
@@ -2400,6 +2744,7 @@ public class IncrementalIngester extends
           newDocumentVersion,
           newParameterVersion,
           docKey,
+          componentHash,
           newAuthorityNameString);
         currentSet.put(new Integer(outputStage), outputStageEntryPoint);
       }
@@ -2833,6 +3178,7 @@ public class IncrementalIngester extends
     protected final String documentVersion;
     protected final String parameterVersion;
     protected final String docKey;
+    protected final String componentHash;
     protected final IOutputActivity activity;
     
     public OutputAddEntryPoint(IOutputConnector outputConnector,
@@ -2845,6 +3191,7 @@ public class IncrementalIngester extends
       String documentVersion,
       String parameterVersion,
       String docKey,
+      String componentHash,
       String authorityNameString)
     {
       super(outputConnector,outputDescriptionString,authorityNameString,activity,isActive);
@@ -2855,6 +3202,7 @@ public class IncrementalIngester extends
       this.documentVersion = documentVersion;
       this.parameterVersion = parameterVersion;
       this.docKey = docKey;
+      this.componentHash = componentHash;
       this.activity = activity;
     }
     
@@ -2984,15 +3332,15 @@ public class IncrementalIngester extends
           // This is a marker that says "something is there"; it has an empty version, which indicates
           // that we don't know anything about it.  That means it will be reingested when the
           // next version comes along, and will be deleted if called for also.
-          noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,null,ingestTime,documentURI,documentURIHash);
+          noteDocumentIngest(outputConnectionName,docKey,componentHash,null,null,null,null,null,ingestTime,documentURI,documentURIHash);
           int result = super.addOrReplaceDocumentWithException(documentURI, document);
-          noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
+          noteDocumentIngest(outputConnectionName,docKey,componentHash,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
           return result;
         }
 
         // If we get here, it means we are noting that the document was examined, but that no change was required.  This is signaled
         // to noteDocumentIngest by having the null documentURI.
-        noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),parameterVersion,authorityNameString,ingestTime,null,null);
+        noteDocumentIngest(outputConnectionName,docKey,componentHash,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),parameterVersion,authorityNameString,ingestTime,null,null);
         return IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
       }
       finally

Modified: manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatusSet.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatusSet.java?rev=1612101&r1=1612100&r2=1612101&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatusSet.java (original)
+++ manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatusSet.java Sun Jul 20 15:17:33 2014
@@ -65,4 +65,12 @@ public class DocumentIngestStatusSet
       return primary;
     return components.get(componentHash);
   }
+  
+  /** Iterate over components.
+  *@return an iterator over component hashes.
+  */
+  public Iterator<String> componentIterator()
+  {
+    return components.keySet().iterator();
+  }
 }

Modified: manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1612101&r1=1612100&r2=1612101&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Sun Jul 20 15:17:33 2014
@@ -66,7 +66,14 @@ public interface IIncrementalIngester
   *@return the last indexed output connection name.
   */
   public String getLastIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic);
-  
+
+  /** From a pipeline specification, get the name of the output connection that will be indexed first
+  * in the pipeline.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@return the first indexed output connection name.
+  */
+  public String getFirstIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic);
+
   /** Get an output version string for a document.
   *@param outputConnectionName is the name of the output connection associated with this action.
   *@param spec is the output specification.
@@ -232,6 +239,19 @@ public interface IIncrementalIngester
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption;
 
+  /** Remove multiple document components from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClasses are the names of the spaces in which the identifier hash should be interpreted.
+  *@param identifierHashes are the hashes of the ids of the documents.
+  *@param componentHash is the hashed component identifier, if any.
+  *@param activities is the object to use to log the details of the removal attempt.  May be null.
+  */
+  public void documentRemoveMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes, String componentHash,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption;
+
   /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
   * versions agreed).
   *@param pipelineSpecificationBasic is a pipeline specification.

Modified: manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java?rev=1612101&r1=1612100&r2=1612101&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java (original)
+++ manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java Sun Jul 20 15:17:33 2014
@@ -28,10 +28,19 @@ public interface IExistingVersions
 {
   public static final String _rcsid = "@(#)$Id$";
 
-  /** Retrieve an existing version string given a document identifier.
+  /** Retrieve the primary existing version string given a document identifier.
   *@param documentIdentifier is the document identifier.
   *@return the document version string, or null if the document was never previously indexed.
   */
-  public String getIndexedVersionString(String documentIdentifier);
+  public String getIndexedVersionString(String documentIdentifier)
+    throws ManifoldCFException;
 
+  /** Retrieve a component existing version string given a document identifier.
+  *@param documentIdentifier is the document identifier.
+  *@param componentIdentifier is the component identifier, if any.
+  *@return the document version string, or null if the document component was never previously indexed.
+  */
+  public String getIndexedVersionString(String documentIdentifier, String componentIdentifier)
+    throws ManifoldCFException;
+  
 }

Modified: manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java?rev=1612101&r1=1612100&r2=1612101&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java (original)
+++ manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java Sun Jul 20 15:17:33 2014
@@ -205,28 +205,24 @@ public interface IProcessActivity extend
     String version)
     throws ManifoldCFException, ServiceInterruption;
 
-  /** Remove the specified document component permanently from the search engine index, and from the status table.
-  * This method does NOT keep track of any document version information for the document and thus can
-  * lead to "churn", whereby the same document is queued, processed,
-  * and removed on subsequent crawls.  It is therefore preferable to use noDocument() instead,
-  * in any case where the same decision will need to be made over and over.
+  /** Remove the specified document primary component permanently from the search engine index,
+  * and from the status table.  Use this method when your document has components and
+  * now also has a primary document, but will not have a primary document again for the foreseeable
+  * future.  This is a rare situation.
   *@param documentIdentifier is the document's identifier.
-  *@param componentIdentifier is the component document identifier, if any.
   */
   public void removeDocument(String documentIdentifier)
     throws ManifoldCFException, ServiceInterruption;
 
-  /** Remove the specified document component permanently from the search engine index, and from the status table.
-  * This method does NOT keep track of any document version information for the document and thus can
-  * lead to "churn", whereby the same document is queued, processed,
-  * and removed on subsequent crawls.  It is therefore preferable to use noDocument() instead,
-  * in any case where the same decision will need to be made over and over.
+  /** Retain existing document component.  Use this method to signal that an already-existing
+  * document component does not need to be reindexed.  The default behavior is to remove
+  * components that are not mentioned during processing.
   *@param documentIdentifier is the document's identifier.
-  *@param componentIdentifier is the component document identifier, if any.
+  *@param componentIdentifier is the component document identifier, which cannot be null.
   */
-  public void removeDocument(String documentIdentifier,
+  public void retainDocument(String documentIdentifier,
     String componentIdentifier)
-    throws ManifoldCFException, ServiceInterruption;
+    throws ManifoldCFException;
 
   /** Record a document version, WITHOUT reindexing it, or removing it.  (Other
   * documents with the same URL, however, will still be removed.)  This is

Modified: manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java?rev=1612101&r1=1612100&r2=1612101&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java (original)
+++ manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java Sun Jul 20 15:17:33 2014
@@ -28,13 +28,14 @@ public class PipelineSpecificationWithVe
 {
   protected final IPipelineSpecification pipelineSpecification;
   protected final QueuedDocument queuedDocument;
+  protected final String componentIDHash;
     
   public PipelineSpecificationWithVersions(IPipelineSpecification pipelineSpecification,
-    QueuedDocument queuedDocument)
-    throws ManifoldCFException, ServiceInterruption
+    QueuedDocument queuedDocument, String componentIDHash)
   {
     this.pipelineSpecification = pipelineSpecification;
     this.queuedDocument = queuedDocument;
+    this.componentIDHash = componentIDHash;
   }
   
   /** Get pipeline specification.
@@ -49,11 +50,10 @@ public class PipelineSpecificationWithVe
   protected DocumentIngestStatus getStatus(int index)
   {
     IPipelineSpecificationBasic basic = pipelineSpecification.getBasicPipelineSpecification();
-    // MHL
     DocumentIngestStatusSet set = queuedDocument.getLastIngestedStatus(basic.getStageConnectionName(basic.getOutputStage(index)));
     if (set == null)
       return null;
-    return set.getPrimary();
+    return set.getComponent(componentIDHash);
   }
   
   /** For a given output index, return a document version string.

Modified: manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1612101&r1=1612100&r2=1612101&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-989/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun Jul 20 15:17:33 2014
@@ -346,22 +346,20 @@ public class WorkerThread extends Thread
                     boolean isDefaultAuthority = (aclAuthority.length() == 0);
 
                     // Build the processActivity object
+                    Map<String,QueuedDocument> previousDocuments = new HashMap<String,QueuedDocument>();
                     
-                    
-                    Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications = new HashMap<String,IPipelineSpecificationWithVersions>();
                     String[] documentIDs = new String[activeDocuments.size()];
                     int k = 0;
                     for (QueuedDocument qd : activeDocuments)
                     {
-                      fetchPipelineSpecifications.put(qd.getDocumentDescription().getDocumentIdentifierHash(),
-                        new PipelineSpecificationWithVersions(pipelineSpecification,qd));
+                      previousDocuments.put(qd.getDocumentDescription().getDocumentIdentifierHash(),qd);
                       documentIDs[k++] = qd.getDocumentDescription().getDocumentIdentifier();
                     }
                     
                     ProcessActivity activity = new ProcessActivity(job.getID(),processID,
                       threadContext,rt,jobManager,ingester,
                       connectionName,pipelineSpecification,
-                      fetchPipelineSpecifications,
+                      previousDocuments,
                       currentTime,
                       job.getExpiration(),
                       job.getForcedMetadata(),
@@ -380,6 +378,41 @@ public class WorkerThread extends Thread
                       try
                       {
                         connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
+                        
+                        // Now do everything that the connector might have done if we were not doing it for it.
+
+                        // Right now, that's just getting rid of untouched components.
+                        for (QueuedDocument qd : activeDocuments)
+                        {
+                          String documentIdentifier = qd.getDocumentDescription().getDocumentIdentifier();
+                          if (!activity.wasDocumentAborted(documentIdentifier) && !activity.wasDocumentDeleted(documentIdentifier))
+                          {
+                            String documentIdentifierHash = qd.getDocumentDescription().getDocumentIdentifierHash();
+                            // In order to be able to loop over all the components that the incremental ingester knows about, we need to know
+                            // what the FIRST output is.
+                            DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineSpecificationBasic));
+                            if (set != null)
+                            {
+                              Iterator<String> componentHashes = set.componentIterator();
+                              while (componentHashes.hasNext())
+                              {
+                                String componentHash = componentHashes.next();
+                                // Check whether we've indexed or not
+                                if (!activity.wasDocumentComponentTouched(documentIdentifier,
+                                  componentHash))
+                                {
+                                  // This component must be removed.
+                                  ingester.documentRemove(
+                                    pipelineSpecificationBasic,
+                                    connectionName,documentIdentifierHash,componentHash,
+                                    ingestLogger);
+                                }
+                              }
+                            }
+                          }
+                        }
+
+                        // Done with connector functionality!
                       }
                       catch (ServiceInterruption e)
                       {
@@ -457,6 +490,7 @@ public class WorkerThread extends Thread
                           ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
                         }
                       }
+                      
 
                       if (serviceInterruption != null)
                       {
@@ -1086,7 +1120,7 @@ public class WorkerThread extends Thread
     protected final IIncrementalIngester ingester;
     protected final String connectionName;
     protected final IPipelineSpecification pipelineSpecification;
-    protected final Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications;
+    protected final Map<String,QueuedDocument> previousDocuments;
     protected final long currentTime;
     protected final Long expireInterval;
     protected final Map<String,Set<String>> forcedMetadata;
@@ -1122,6 +1156,10 @@ public class WorkerThread extends Thread
     // Whether document was deleted
     protected final Set<String> documentDeletedSet = new HashSet<String>();
     
+    // Whether a component was touched or not, keyed by document identifier.
+    // This does not include primary document.  The set is keyed by component id hash.
+    protected final Map<String,Set<String>> touchedComponentSet = new HashMap<String,Set<String>>();
+    
     /** Constructor.
     *@param jobManager is the job manager
     *@param ingester is the ingester
@@ -1132,7 +1170,7 @@ public class WorkerThread extends Thread
       IIncrementalIngester ingester,
       String connectionName,
       IPipelineSpecification pipelineSpecification,
-      Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications,
+      Map<String,QueuedDocument> previousDocuments,
       long currentTime,
       Long expireInterval,
       Map<String,Set<String>> forcedMetadata,
@@ -1151,7 +1189,7 @@ public class WorkerThread extends Thread
       this.ingester = ingester;
       this.connectionName = connectionName;
       this.pipelineSpecification = pipelineSpecification;
-      this.fetchPipelineSpecifications = fetchPipelineSpecifications;
+      this.previousDocuments = previousDocuments;
       this.currentTime = currentTime;
       this.expireInterval = expireInterval;
       this.forcedMetadata = forcedMetadata;
@@ -1183,6 +1221,17 @@ public class WorkerThread extends Thread
     {
       return touchedSet.contains(documentIdentifier);
     }
+
+    /** Check whether a document component was touched or not.
+    */
+    public boolean wasDocumentComponentTouched(String documentIdentifier,
+      String componentIdentifierHash)
+    {
+      Set<String> components = touchedComponentSet.get(documentIdentifier);
+      if (components == null)
+        return false;
+      return components.contains(componentIdentifierHash);
+    }
     
     /** Check whether document was deleted or not.
     */
@@ -1198,6 +1247,41 @@ public class WorkerThread extends Thread
       return abortSet.contains(documentIdentifier);
     }
     
+    /** Check if a document needs to be reindexed, based on a computed version string.
+    * Call this method to determine whether reindexing is necessary.  Pass in a newly-computed version
+    * string.  This method will return "true" if the document needs to be re-indexed.
+    *@param documentIdentifier is the document identifier.
+    *@param newVersionString is the newly-computed version string.
+    *@return true if the document needs to be reindexed.
+    */
+    @Override
+    public boolean checkDocumentNeedsReindexing(String documentIdentifier,
+      String newVersionString)
+      throws ManifoldCFException
+    {
+      return checkDocumentNeedsReindexing(documentIdentifier,null,newVersionString);
+    }
+
+    /** Check if a document needs to be reindexed, based on a computed version string.
+    * Call this method to determine whether reindexing is necessary.  Pass in a newly-computed version
+    * string.  This method will return "true" if the document needs to be re-indexed.
+    *@param documentIdentifier is the document identifier.
+    *@param componentIdentifier is the component document identifier, if any.
+    *@param newVersionString is the newly-computed version string.
+    *@return true if the document needs to be reindexed.
+    */
+    @Override
+    public boolean checkDocumentNeedsReindexing(String documentIdentifier,
+      String componentIdentifier,
+      String newVersionString)
+      throws ManifoldCFException
+    {
+      String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+      String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
+      IPipelineSpecificationWithVersions spec = computePipelineSpecification(documentIdentifierHash,componentIdentifierHash);
+      return ingester.checkFetchDocument(spec,newVersionString,parameterVersion,connection.getACLAuthority());
+    }
+
     /** Add a document description to the current job's queue.
     *@param localIdentifier is the local document identifier to add (for the connector that
     * fetched the document).
@@ -1299,41 +1383,6 @@ public class WorkerThread extends Thread
       existingDr.addPrerequisiteEvents(prereqEventNames);
     }
 
-    /** Check if a document needs to be reindexed, based on a computed version string.
-    * Call this method to determine whether reindexing is necessary.  Pass in a newly-computed version
-    * string.  This method will return "true" if the document needs to be re-indexed.
-    *@param documentIdentifier is the document identifier.
-    *@param newVersionString is the newly-computed version string.
-    *@return true if the document needs to be reindexed.
-    */
-    @Override
-    public boolean checkDocumentNeedsReindexing(String documentIdentifier,
-      String newVersionString)
-      throws ManifoldCFException
-    {
-      String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
-      IPipelineSpecificationWithVersions spec = fetchPipelineSpecifications.get(documentIdentifierHash);
-      return ingester.checkFetchDocument(spec,newVersionString,parameterVersion,connection.getACLAuthority());
-    }
-
-    /** Check if a document needs to be reindexed, based on a computed version string.
-    * Call this method to determine whether reindexing is necessary.  Pass in a newly-computed version
-    * string.  This method will return "true" if the document needs to be re-indexed.
-    *@param documentIdentifier is the document identifier.
-    *@param componentIdentifier is the component document identifier, if any.
-    *@param newVersionString is the newly-computed version string.
-    *@return true if the document needs to be reindexed.
-    */
-    @Override
-    public boolean checkDocumentNeedsReindexing(String documentIdentifier,
-      String componentIdentifier,
-      String newVersionString)
-      throws ManifoldCFException
-    {
-      // MHL
-      return false;
-    }
-
     /** Add a document description to the current job's queue.
     *@param localIdentifier is the local document identifier to add (for the connector that
     * fetched the document).
@@ -1451,16 +1500,13 @@ public class WorkerThread extends Thread
       throws ManifoldCFException
     {
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
-      String componentIdentifierHash;
-      if (componentIdentifier != null)
-        componentIdentifierHash = ManifoldCF.hash(componentIdentifier);
-      else
-        componentIdentifierHash = null;
+      String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
       ingester.documentRecord(
         pipelineSpecification.getBasicPipelineSpecification(),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,currentTime);
       touchedSet.add(documentIdentifier);
+      touchComponentSet(documentIdentifier,componentIdentifierHash);
     }
 
     /** Ingest the current document.
@@ -1502,6 +1548,7 @@ public class WorkerThread extends Thread
     public void ingestDocumentWithException(String documentIdentifier, String version, String documentURI, RepositoryDocument data)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
+      ingestDocumentWithException(documentIdentifier,null,version,documentURI,data);
     }
 
     /** Ingest the current document.
@@ -1525,11 +1572,7 @@ public class WorkerThread extends Thread
       // always ingest (essentially)
 
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
-      String componentIdentifierHash;
-      if (componentIdentifier != null)
-        componentIdentifierHash = ManifoldCF.hash(componentIdentifier);
-      else
-        componentIdentifierHash = null;
+      String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
 
       if (data != null)
       {
@@ -1551,7 +1594,7 @@ public class WorkerThread extends Thread
         
       // First, we need to add into the metadata the stuff from the job description.
       ingester.documentIngest(
-        fetchPipelineSpecifications.get(documentIdentifierHash),
+        computePipelineSpecification(documentIdentifierHash,componentIdentifierHash),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,parameterVersion,
         connection.getACLAuthority(),
@@ -1560,6 +1603,7 @@ public class WorkerThread extends Thread
         ingestLogger);
       
       touchedSet.add(documentIdentifier);
+      touchComponentSet(documentIdentifier,componentIdentifierHash);
     }
 
     /** Remove the specified document from the search engine index, while keeping track of the version information
@@ -1589,13 +1633,10 @@ public class WorkerThread extends Thread
       // Special interpretation for empty version string; treat as if the document doesn't exist
       // (by ignoring it and allowing it to be deleted later)
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
-      String componentIdentifierHash;
-      if (componentIdentifier != null)
-        componentIdentifierHash = ManifoldCF.hash(componentIdentifier);
-      else
-        componentIdentifierHash = null;
+      String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
+
       ingester.documentNoData(
-        fetchPipelineSpecifications.get(documentIdentifierHash),
+        computePipelineSpecification(documentIdentifierHash,componentIdentifierHash),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,parameterVersion,
         connection.getACLAuthority(),
@@ -1603,54 +1644,44 @@ public class WorkerThread extends Thread
         ingestLogger);
       
       touchedSet.add(documentIdentifier);
+      touchComponentSet(documentIdentifier,componentIdentifierHash);
     }
 
-    /** Remove the specified document component permanently from the search engine index, and from the status table.
-    * This method does NOT keep track of any document version information for the document and thus can
-    * lead to "churn", whereby the same document is queued, processed,
-    * and removed on subsequent crawls.  It is therefore preferable to use noDocument() instead,
-    * in any case where the same decision will need to be made over and over.
+    /** Remove the specified document primary component permanently from the search engine index,
+    * and from the status table.  Use this method when your document has components and
+    * now also has a primary document, but will not have a primary document again for the foreseeable
+    * future.  This is a rare situation.
     *@param documentIdentifier is the document's identifier.
-    *@param componentIdentifier is the component document identifier, if any.
     */
     @Override
     public void removeDocument(String documentIdentifier)
       throws ManifoldCFException, ServiceInterruption
     {
-      removeDocument(documentIdentifier,null);
-    }
-
-    /** Remove the specified document component permanently from the search engine index, and from the status table.
-    * This method does NOT keep track of any document version information for the document and thus can
-    * lead to "churn", whereby the same document is queued, processed,
-    * and removed on subsequent crawls.  It is therefore preferable to use noDocument() instead,
-    * in any case where the same decision will need to be made over and over.
-    *@param documentIdentifier is the document's identifier.
-    *@param componentIdentifier is the component document identifier, if any.
-    */
-    @Override
-    public void removeDocument(String documentIdentifier,
-      String componentIdentifier)
-      throws ManifoldCFException, ServiceInterruption
-    {
-      // Remove from incremental ingester ONLY.
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
-      String componentIdentifierHash;
-      if (componentIdentifier != null)
-        componentIdentifierHash = ManifoldCF.hash(componentIdentifier);
-      else
-        componentIdentifierHash = null;
-
       ingester.documentRemove(
         pipelineSpecification.getBasicPipelineSpecification(),
-        connectionName,documentIdentifierHash,componentIdentifierHash,
+        connectionName,documentIdentifierHash,null,
         ingestLogger);
-      
+        
       // Note that we touched it, so it won't get checked
       touchedSet.add(documentIdentifier);
     }
 
+    /** Retain existing document component.  Use this method to signal that an already-existing
+    * document component does not need to be reindexed.  The default behavior is to remove
+    * components that are not mentioned during processing.
+    *@param documentIdentifier is the document's identifier.
+    *@param componentIdentifier is the component document identifier, which cannot be null.
+    */
+    @Override
+    public void retainDocument(String documentIdentifier,
+      String componentIdentifier)
+      throws ManifoldCFException
+    {
+      touchComponentSet(documentIdentifier,computeComponentIDHash(componentIdentifier));
+    }
 
+    
     /** Delete the current document from the search engine index, while keeping track of the version information
     * for it (to reduce churn).
     * Use noDocument() above instead.
@@ -2065,8 +2096,36 @@ public class WorkerThread extends Thread
       return ManifoldCF.createJobSpecificString(jobID,simpleString);
     }
 
+    protected void touchComponentSet(String documentIdentifier, String componentIdentifierHash)
+    {
+      if (componentIdentifierHash == null)
+        return;
+      Set<String> components = touchedComponentSet.get(documentIdentifier);
+      if (components == null)
+      {
+        components = new HashSet<String>();
+        touchedComponentSet.put(documentIdentifier,components);
+      }
+      components.add(componentIdentifierHash);
+    }
+    
+    protected IPipelineSpecificationWithVersions computePipelineSpecification(String documentIdentifierHash,
+      String componentIdentifierHash)
+    {
+      return new PipelineSpecificationWithVersions(pipelineSpecification,previousDocuments.get(documentIdentifierHash),componentIdentifierHash);
+    }
+
   }
 
+  protected static String computeComponentIDHash(String componentIdentifier)
+    throws ManifoldCFException
+  {
+    if (componentIdentifier != null)
+      return ManifoldCF.hash(componentIdentifier);
+    else
+      return null;
+  }
+    
   /** DocumentBin class */
   protected static class DocumentBin
   {
@@ -2121,6 +2180,7 @@ public class WorkerThread extends Thread
       }
       return true;
     }
+    
   }
 
   /** Class describing document reference.
@@ -2411,14 +2471,33 @@ public class WorkerThread extends Thread
     */
     @Override
     public String getIndexedVersionString(String documentIdentifier)
+      throws ManifoldCFException
+    {
+      return getIndexedVersionString(documentIdentifier,null);
+    }
+
+    /** Retrieve a component existing version string given a document identifier.
+    *@param documentIdentifier is the document identifier.
+    *@param componentIdentifier is the component identifier, if any.
+    *@return the document version string, or null if the document component was never previously indexed.
+    */
+    @Override
+    public String getIndexedVersionString(String documentIdentifier, String componentIdentifier)
+      throws ManifoldCFException
     {
       QueuedDocument qd = map.get(documentIdentifier);
       DocumentIngestStatusSet status = qd.getLastIngestedStatus(lastOutputConnectionName);
-      // MHL
-      if (status == null || status.getPrimary() == null)
+      if (status == null)
+        return null;
+      String componentIdentifierHash;
+      if (componentIdentifier == null)
+        componentIdentifierHash = null;
+      else
+        componentIdentifierHash = ManifoldCF.hash(componentIdentifier);
+      DocumentIngestStatus s = status.getComponent(componentIdentifierHash);
+      if (s == null)
         return null;
-      // MHL
-      return status.getPrimary().getDocumentVersion();
+      return s.getDocumentVersion();
     }
 
   }



Mime
View raw message