manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1610987 - in /manifoldcf/trunk: ./ connectors/documentfilter/connector/src/main/java/org/apache/manifoldcf/agents/transformation/documentfilter/ connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/...
Date Wed, 16 Jul 2014 12:54:53 GMT
Author: kwright
Date: Wed Jul 16 12:54:53 2014
New Revision: 1610987

URL: http://svn.apache.org/r1610987
Log:
Fix for CONNECTORS-993.  Pipeline code now has an explicit path for handling the no-document
case.

Modified:
    manifoldcf/trunk/CHANGES.txt
    manifoldcf/trunk/connectors/documentfilter/connector/src/main/java/org/apache/manifoldcf/agents/transformation/documentfilter/DocumentFilter.java
    manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java
    manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
    manifoldcf/trunk/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tika/TikaExtractor.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1610987&r1=1610986&r2=1610987&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Wed Jul 16 12:54:53 2014
@@ -3,6 +3,16 @@ $Id$
 
 ======================= 1.7-dev =====================
 
+CONNECTORS-993: Pipeline code not handling "no document" case properly.
+This problem was an oversight of the new pipeline code.  Essentially,
+transformation connectors could choose not to send a document onward
+to the next stage.  If this happened, then the document version information
+for the corresponding output connection would never get written, and
+no incremental-ness would be possible.
+I fixed this by introducing a "noDocument()" IOutputAddActivity method,
+which transformation connectors should call when they reject a document.
+(Karl Wright)
+
 CONNECTORS-992: Add a test to exercise ServiceInterruption within
 a connector.
 (Karl Wright)

Modified: manifoldcf/trunk/connectors/documentfilter/connector/src/main/java/org/apache/manifoldcf/agents/transformation/documentfilter/DocumentFilter.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/documentfilter/connector/src/main/java/org/apache/manifoldcf/agents/transformation/documentfilter/DocumentFilter.java?rev=1610987&r1=1610986&r2=1610987&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/documentfilter/connector/src/main/java/org/apache/manifoldcf/agents/transformation/documentfilter/DocumentFilter.java (original)
+++ manifoldcf/trunk/connectors/documentfilter/connector/src/main/java/org/apache/manifoldcf/agents/transformation/documentfilter/DocumentFilter.java Wed Jul 16 12:54:53 2014
@@ -117,7 +117,7 @@ public class DocumentFilter extends org.
   public int addOrReplaceDocumentWithException(String documentURI, VersionContext outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
     throws ManifoldCFException, ServiceInterruption, IOException
   {
-    return activities.sendDocument(documentURI, document, authorityNameString);
+    return activities.sendDocument(documentURI, document);
   }
   
   protected static void fillInContentsSpecificationMap(Map<String,Object> paramMap,
Specification os)

Modified: manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java?rev=1610987&r1=1610986&r2=1610987&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java (original)
+++ manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java Wed Jul 16 12:54:53 2014
@@ -132,7 +132,7 @@ public class ForcedMetadataConnector ext
     // Finally, send the modified repository document onward to the next pipeline stage.
     // If we'd done anything to the stream, we'd have needed to create a new RepositoryDocument
object and copied the
     // data into it, and closed the new stream after sendDocument() was called.
-    return activities.sendDocument(documentURI,docCopy,authorityNameString);
+    return activities.sendDocument(documentURI,docCopy);
   }
 
   // UI support methods.

Modified: manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java?rev=1610987&r1=1610986&r2=1610987&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java (original)
+++ manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java Wed Jul 16 12:54:53 2014
@@ -72,7 +72,7 @@ public class NullConnector extends org.a
     try
     {
       long binaryLength = document.getBinaryLength();
-      int rval = activities.sendDocument(documentURI,document,authorityNameString);
+      int rval = activities.sendDocument(documentURI,document);
       length =  new Long(binaryLength);
       resultCode = (rval == DOCUMENTSTATUS_ACCEPTED)?"ACCEPTED":"REJECTED";
       return rval;

Modified: manifoldcf/trunk/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tika/TikaExtractor.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tika/TikaExtractor.java?rev=1610987&r1=1610986&r2=1610987&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tika/TikaExtractor.java (original)
+++ manifoldcf/trunk/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tika/TikaExtractor.java Wed Jul 16 12:54:53 2014
@@ -153,7 +153,10 @@ public class TikaExtractor extends org.a
   {
     // First, make sure downstream pipeline will now accept text/plain;charset=utf-8
     if (!activities.checkMimeTypeIndexable("text/plain;charset=utf-8"))
+    {
+      activities.noDocument();
       return DOCUMENTSTATUS_REJECTED;
+    }
 
     SpecPacker sp = new SpecPacker(pipelineDescription.getVersionString());
 
@@ -245,7 +248,10 @@ public class TikaExtractor extends org.a
       
       // Check to be sure downstream pipeline will accept document of specified length
       if (!activities.checkLengthIndexable(ds.getBinaryLength()))
+      {
+        activities.noDocument();
         return DOCUMENTSTATUS_REJECTED;
+      }
         
       // Parsing complete!
       // Create a copy of Repository Document
@@ -278,7 +284,7 @@ public class TikaExtractor extends org.a
         }
 
         // Send new document downstream
-        int rval = activities.sendDocument(documentURI,docCopy,authorityNameString);
+        int rval = activities.sendDocument(documentURI,docCopy);
         length =  new Long(newBinaryLength);
         resultCode = (rval == DOCUMENTSTATUS_ACCEPTED)?"ACCEPTED":"REJECTED";
         return rval;

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1610987&r1=1610986&r2=1610987&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Wed Jul 16 12:54:53 2014
@@ -653,6 +653,55 @@ public class IncrementalIngester extends
     }
   }
 
+  /** Remove a document from specified indexes, just as if an empty document
+  * was indexed, and record the necessary version information.
+  * This method is conceptually similar to documentIngest(), but does not actually take
+  * a document or allow it to be transformed.  If there is a document already
+  * indexed, it is removed from the index.
+  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hashed document identifier.
+  *@param documentVersion is the document version.
+  *@param parameterVersion is the version string for the forced parameters.
+  *@param authorityName is the name of the authority associated with the document, if any.
+  *@param recordTime is the time at which the recording took place, in milliseconds since epoch.
+  *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
+  */
+  @Override
+  public void documentNoData(
+    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    String identifierClass, String identifierHash,
+    String documentVersion,
+    String parameterVersion,
+    String authorityName,
+    long recordTime,
+    IOutputActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
+    
+    String docKey = makeKey(identifierClass,identifierHash);
+
+    if (Logging.ingest.isDebugEnabled())
+    {
+      Logging.ingest.debug("Logging empty document '"+docKey+"' into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+    }
+
+    // Set up a pipeline
+    PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineConnectionsWithVersions);
+    if (pipeline == null)
+      // A connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("Pipeline connector not installed",0L);
+    try
+    {
+      pipeline.noDocument(docKey,documentVersion,parameterVersion,authorityName,activities,recordTime);
+    }
+    finally
+    {
+      pipeline.release();
+    }
+  }
+
   /** Ingest a document.
   * This ingests the document, and notes it.  If this is a repeat ingestion of the document,
this
   * method also REMOVES ALL OLD METADATA.  When complete, the index will contain only the
metadata
@@ -2034,14 +2083,24 @@ public class IncrementalIngester extends
     /** Send a document via the pipeline to the next output connection.
     *@param documentURI is the document's URI.
     *@param document is the document data to be processed (handed to the output data store).
-    *@param authorityNameString is the authority name string that should be used to qualify
the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed
in IPipelineConnector.
     *@throws IOException only if there's an IO error reading the data from the document.
     */
-    public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
+    @Override
+    public int sendDocument(String documentURI, RepositoryDocument document)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
-      return addActivities.sendDocument(documentURI,document,authorityNameString);
+      return addActivities.sendDocument(documentURI,document);
+    }
+
+    /** Send NO document via the pipeline to the next output connection.  This is equivalent
+    * to sending an empty document placeholder.
+    */
+    @Override
+    public void noDocument()
+      throws ManifoldCFException, ServiceInterruption
+    {
+      addActivities.noDocument();
     }
 
     /** Detect if a mime type is acceptable downstream or not.  This method is used to determine
whether it makes sense to fetch a document
@@ -2247,9 +2306,16 @@ public class IncrementalIngester extends
       throws ManifoldCFException, ServiceInterruption, IOException
     {
       PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey);
-      return entryPoint.sendDocument(documentURI,document,authorityNameString);
+      return entryPoint.sendDocument(documentURI,document);
     }
-    
+
+    public void noDocument(String docKey, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey);
+      entryPoint.noDocument();
+    }
+
     protected PipelineAddFanout buildAddPipeline(IOutputActivity finalActivity,
       String newDocumentVersion, String newParameterVersion, String newAuthorityNameString,
       long ingestTime, String docKey)
@@ -2307,7 +2373,8 @@ public class IncrementalIngester extends
           ingestTime,
           newDocumentVersion,
           newParameterVersion,
-          docKey);
+          docKey,
+          newAuthorityNameString);
         currentSet.put(new Integer(outputStage), outputStageEntryPoint);
       }
       // Cycle through the "current set"
@@ -2357,7 +2424,7 @@ public class IncrementalIngester extends
           return pcf;
         PipelineAddEntryPoint newEntry = new PipelineAddEntryPoint(
           transformationConnectors[pipelineConnections.getTransformationConnectionIndex(parent).intValue()],
-          pipelineSpec.getStageDescriptionString(parent),pcf,pcf.checkNeedToReindex());
+          pipelineSpec.getStageDescriptionString(parent),newAuthorityNameString,pcf,pcf.checkNeedToReindex());
         currentSet.put(new Integer(parent), newEntry);
       }
 
@@ -2555,11 +2622,11 @@ public class IncrementalIngester extends
     /** Send a document via the pipeline to the next output connection.
     *@param documentURI is the document's URI.
     *@param document is the document data to be processed (handed to the output data store).
-    *@param authorityNameString is the authority name string that should be used to qualify
the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed
in IPipelineConnector.
     *@throws IOException only if there's an IO error reading the data from the document.
     */
-    public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
+    @Override
+    public int sendDocument(String documentURI, RepositoryDocument document)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
       // First, count the number of active entry points.
@@ -2577,7 +2644,7 @@ public class IncrementalIngester extends
         {
           if (!p.isActive())
             continue;
-          if (p.addOrReplaceDocumentWithException(documentURI,document,authorityNameString) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED)
+          if (p.addOrReplaceDocumentWithException(documentURI,document) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED)
             rval = IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
         }
         return rval;
@@ -2594,7 +2661,7 @@ public class IncrementalIngester extends
           {
             if (!p.isActive())
               continue;
-            if (p.addOrReplaceDocumentWithException(documentURI,factory.createDocument(),authorityNameString) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED)
+            if (p.addOrReplaceDocumentWithException(documentURI,factory.createDocument()) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED)
               rval = IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
           }
           return rval;
@@ -2606,6 +2673,23 @@ public class IncrementalIngester extends
       }
     }
 
+    /** Send NO document via the pipeline to the next output connection.  This is equivalent
+    * to sending an empty document placeholder.
+    */
+    @Override
+    public void noDocument()
+      throws ManifoldCFException, ServiceInterruption
+    {
+      for (PipelineAddEntryPoint p : entryPoints)
+      {
+        if (p.isActive())
+        {
+          // Invoke the addEntryPoint method for handling "noDocument"
+          p.noDocument();
+        }
+      }
+    }
+
     /** Qualify an access token appropriately, to match access tokens as returned by mod_aa.
 This method
     * includes the authority name with the access token, if any, so that each authority may
establish its own token space.
     *@param authorityNameString is the name of the authority to use to qualify the access
token.
@@ -2652,16 +2736,19 @@ public class IncrementalIngester extends
   {
     protected final IPipelineConnector pipelineConnector;
     protected final VersionContext pipelineDescriptionString;
+    protected final String authorityNameString;
     protected final IOutputAddActivity addActivity;
     protected final boolean isActive;
     
     public PipelineAddEntryPoint(IPipelineConnector pipelineConnector,
       VersionContext pipelineDescriptionString,
+      String authorityNameString,
       IOutputAddActivity addActivity,
       boolean isActive)
     {
       this.pipelineConnector = pipelineConnector;
       this.pipelineDescriptionString = pipelineDescriptionString;
+      this.authorityNameString = authorityNameString;
       this.addActivity = addActivity;
       this.isActive = isActive;
     }
@@ -2695,13 +2782,20 @@ public class IncrementalIngester extends
       return pipelineConnector.checkURLIndexable(pipelineDescriptionString,uri,addActivity);
     }
 
-    public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document, String authorityNameString)
+    public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
       return pipelineConnector.addOrReplaceDocumentWithException(
         documentURI,pipelineDescriptionString,
         document,authorityNameString,addActivity);
     }
+    
+    public void noDocument()
+      throws ManifoldCFException, ServiceInterruption
+    {
+      // Call the addActivity method for handling no document
+      addActivity.noDocument();
+    }
   }
   
   public class OutputAddEntryPoint extends PipelineAddEntryPoint
@@ -2724,9 +2818,10 @@ public class IncrementalIngester extends
       long ingestTime,
       String documentVersion,
       String parameterVersion,
-      String docKey)
+      String docKey,
+      String authorityNameString)
     {
-      super(outputConnector,outputDescriptionString,activity,isActive);
+      super(outputConnector,outputDescriptionString,authorityNameString,activity,isActive);
       this.outputConnector = outputConnector;
       this.outputConnectionName = outputConnectionName;
       this.transformationVersion = transformationVersion;
@@ -2738,7 +2833,21 @@ public class IncrementalIngester extends
     }
     
     @Override
-    public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document, String authorityNameString)
+    public void noDocument()
+      throws ManifoldCFException, ServiceInterruption
+    {
+      try
+      {
+        addOrReplaceDocumentWithException(null,null);
+      }
+      catch (IOException e)
+      {
+        throw new RuntimeException("Unexpected IOException: "+e.getMessage(),e);
+      }
+    }
+    
+    @Override
+    public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
       // No transactions; not safe because post may take too much time
@@ -2850,7 +2959,7 @@ public class IncrementalIngester extends
           // that we don't know anything about it.  That means it will be reingested when
the
           // next version comes along, and will be deleted if called for also.
           noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,null,ingestTime,documentURI,documentURIHash);
-          int result = super.addOrReplaceDocumentWithException(documentURI, document, authorityNameString);
+          int result = super.addOrReplaceDocumentWithException(documentURI, document);
           noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
           return result;
         }

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1610987&r1=1610986&r2=1610987&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Wed Jul 16 12:54:53 2014
@@ -163,6 +163,30 @@ public interface IIncrementalIngester
     String documentVersion, long recordTime)
     throws ManifoldCFException;
 
+  /** Remove a document from specified indexes, just as if an empty document
+  * was indexed, and record the necessary version information.
+  * This method is conceptually similar to documentIngest(), but does not actually take
+  * a document or allow it to be transformed.  If there is a document already
+  * indexed, it is removed from the index.
+  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hashed document identifier.
+  *@param documentVersion is the document version.
+  *@param parameterVersion is the version string for the forced parameters.
+  *@param authorityName is the name of the authority associated with the document, if any.
+  *@param recordTime is the time at which the recording took place, in milliseconds since epoch.
+  *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
+  */
+  public void documentNoData(
+    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    String identifierClass, String identifierHash,
+    String documentVersion,
+    String parameterVersion,
+    String authorityName,
+    long recordTime,
+    IOutputActivity activities)
+    throws ManifoldCFException, ServiceInterruption;
+
   /** Ingest a document.
   * This ingests the document, and notes it.  If this is a repeat ingestion of the document,
this
   * method also REMOVES ALL OLD METADATA.  When complete, the index will contain only the
metadata

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java?rev=1610987&r1=1610986&r2=1610987&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java Wed Jul 16 12:54:53 2014
@@ -33,11 +33,16 @@ public interface IOutputAddActivity exte
   /** Send a document via the pipeline to the next output connection.
   *@param documentURI is the document's URI.
   *@param document is the document data to be processed (handed to the output data store).
-  *@param authorityNameString is the authority name string that should be used to qualify
the document's access tokens.
   *@return the document status (accepted or permanently rejected); return codes are listed
in IPipelineConnector.
   *@throws IOException only if there's an IO error reading the data from the document.
   */
-  public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
+  public int sendDocument(String documentURI, RepositoryDocument document)
     throws ManifoldCFException, ServiceInterruption, IOException;
 
+  /** Send NO document via the pipeline to the next output connection.  This is equivalent
+  * to sending an empty document placeholder.
+  */
+  public void noDocument()
+    throws ManifoldCFException, ServiceInterruption;
+
 }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1610987&r1=1610986&r2=1610987&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Wed Jul 16 12:54:53 2014
@@ -1522,19 +1522,14 @@ public class WorkerThread extends Thread
     {
       // Special interpretation for empty version string; treat as if the document doesn't
exist
       // (by ignoring it and allowing it to be deleted later)
-      if (version.length() > 0)
-      {
-        try
-        {
-          ingestDocumentWithException(documentIdentifier,version,null,null);
-        }
-        catch (IOException e)
-        {
-          // Should never occur, since we passed in no data
-          throw new IllegalStateException("IngestDocumentWithException threw an illegal IOException: "+e.getMessage(),e);
-        }
-      }
-
+      String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+      ingester.documentNoData(
+        fetchPipelineSpecifications.get(documentIdentifierHash),
+        connectionName,documentIdentifierHash,
+        version,parameterVersion,
+        connection.getACLAuthority(),
+        currentTime,
+        ingestLogger);
     }
 
     /** Delete the current document from the search engine index, while keeping track of
the version information
@@ -2365,17 +2360,26 @@ public class WorkerThread extends Thread
     /** Send a document via the pipeline to the next output connection.
     *@param documentURI is the document's URI.
     *@param document is the document data to be processed (handed to the output data store).
-    *@param authorityNameString is the authority name string that should be used to qualify
the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed
in IPipelineConnector.
     *@throws IOException only if there's an IO error reading the data from the document.
     */
-    public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
+    @Override
+    public int sendDocument(String documentURI, RepositoryDocument document)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
       // No downstream connection at output connection level.
       return IPipelineConnector.DOCUMENTSTATUS_REJECTED;
     }
 
+    /** Send NO document via the pipeline to the next output connection.  This is equivalent
+    * to sending an empty document placeholder.
+    */
+    @Override
+    public void noDocument()
+      throws ManifoldCFException, ServiceInterruption
+    {
+    }
+
   }
 
   protected final static long interruptionRetryTime = 5L*60L*1000L;



Mime
View raw message