manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1610560 - in /manifoldcf/branches/CONNECTORS-990/framework: agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/ pull-agent/src/main/java/org/apache/manifol...
Date Mon, 14 Jul 2014 23:33:47 GMT
Author: kwright
Date: Mon Jul 14 23:33:47 2014
New Revision: 1610560

URL: http://svn.apache.org/r1610560
Log:
Get basic crawling working again in a backwards-compatible way

Modified:
    manifoldcf/branches/CONNECTORS-990/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
    manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-990/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-990/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1610560&r1=1610559&r2=1610560&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-990/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
(original)
+++ manifoldcf/branches/CONNECTORS-990/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
Mon Jul 14 23:33:47 2014
@@ -511,12 +511,11 @@ public class IncrementalIngester extends
     String newParameterVersion,
     String newAuthorityNameString)
   {
+    if (newAuthorityNameString == null)
+      newAuthorityNameString = "";
     IPipelineSpecification pipelineSpecification = pipelineSpecificationWithVersions.getPipelineSpecification();
     IPipelineSpecificationBasic basicSpecification = pipelineSpecification.getBasicPipelineSpecification();
-    // Empty document version has a special meaning....
-    if (newDocumentVersion.length() == 0)
-      return true;
-    // Otherwise, cycle through the outputs
+    // Cycle through the outputs
     for (int i = 0; i < basicSpecification.getOutputCount(); i++)
     {
       int stage = basicSpecification.getOutputStage(i);
@@ -531,7 +530,7 @@ public class IncrementalIngester extends
       if (!oldDocumentVersion.equals(newDocumentVersion) ||
         !oldParameterVersion.equals(newParameterVersion) ||
         !oldAuthorityName.equals(newAuthorityNameString) ||
-        !oldOutputVersion.equals(pipelineSpecification.getStageDescriptionString(stage)))
+        !oldOutputVersion.equals(pipelineSpecification.getStageDescriptionString(stage).getVersionString()))
         return true;
       
       // Everything matches so far.  Next step is to compute a transformation path an corresponding version string.

Modified: manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java?rev=1610560&r1=1610559&r2=1610560&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
(original)
+++ manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
Mon Jul 14 23:33:47 2014
@@ -368,9 +368,24 @@ public abstract class BaseRepositoryConn
         if (vc != null)
         {
           if (dv.isAlwaysRefetch(documentIdentifier) || activities.checkDocumentNeedsReindexing(documentIdentifier,vc.getVersionString()))
+          {
+            System.out.println("Reprocessing "+documentIdentifier+"; computed version string = '"+vc.getVersionString()+"'; old version string= '"+oldVersions[i]+"'; alwaysRefetch="+dv.isAlwaysRefetch(documentIdentifier));
+            // These documents need reprocessing
             fetchDocuments.add(documentIdentifier);
+          }
+          else
+          {
+            // These documents have been checked and found NOT to need reprocessing
+            activities.noteUnchangedDocument(documentIdentifier);
+          }
           scanDocuments.add(documentIdentifier);
         }
+        else
+        {
+          // These documents must go away permanently
+          // MHL to collect these and do them as a group
+          activities.deleteDocument(documentIdentifier);
+        }
       }
 
       // Construct the appropriate data to call processDocuments() with
@@ -388,6 +403,7 @@ public abstract class BaseRepositoryConn
         }
       }
       processDocuments(processIDs,dv,activities,scanOnly,jobMode);
+      
     }
     finally
     {

Modified: manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1610560&r1=1610559&r2=1610560&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
(original)
+++ manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Mon Jul 14 23:33:47 2014
@@ -290,6 +290,7 @@ public class WorkerThread extends Thread
                   try
                   {
                     pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+                    System.out.println("Obtained pipeline specification");
                   }
                   catch (ServiceInterruption e)
                   {
@@ -342,6 +343,7 @@ public class WorkerThread extends Thread
                     // **** New worker thread code starts here!!! ****
                     
                     IExistingVersions existingVersions = new ExistingVersions(lastIndexedOutputConnectionName,activeDocuments);
+                    System.out.println("Built existingVersions");
                     String aclAuthority = connection.getACLAuthority();
                     if (aclAuthority == null)
                       aclAuthority = "";
@@ -361,7 +363,8 @@ public class WorkerThread extends Thread
                       documentIDs[i] = qd.getDocumentDescription().getDocumentIdentifier();
                       documentIDHashes[i] = qd.getDocumentDescription().getDocumentIdentifierHash();
                     }
-                        
+                    System.out.println("Built fetchPipelineSpecifications");
+                    
                     ProcessActivity activity = new ProcessActivity(job.getID(),processID,
                       threadContext,rt,jobManager,ingester,
                       connectionName,pipelineSpecification,
@@ -382,13 +385,16 @@ public class WorkerThread extends Thread
                       // Now, process in bulk -- catching and handling ServiceInterruptions
                       try
                       {
+                        System.out.println("Processing documents");
                         connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
-
+                        System.out.println("Done processing documents");
+                        
                         for (QueuedDocument qd : activeDocuments)
                         {
                           // If this document was aborted, then treat it specially.
                           if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier()))
                           {
+                            System.out.println("Document was aborted");
                             // Special treatment for aborted documents.
                             // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
                             // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
@@ -401,13 +407,26 @@ public class WorkerThread extends Thread
                             // If the document is not being deleted, add it to the finish set.
                             if (activity.wasDocumentProcessed(qd.getDocumentDescription().getDocumentIdentifier()))
                             {
+                              System.out.println("Document was processed");
                               finishList.add(qd);
                             }
                             else if (activity.wasDocumentUnchanged(qd.getDocumentDescription().getDocumentIdentifier()))
                             {
+                              System.out.println("Document was unchanged");
                               finishList.add(qd);
                               ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
                             }
+                            else
+                            {
+                              // Anything else means that the document was not found and should be deleted, eventually.
+                              System.out.println("Document was neither processed nor unchanged");
+                              // We can't just delete because of connector backwards compatibility.  The case in question
+                              // is handling documents that are not indexed, such as file system directories.  To prevent
+                              // the job from not terminating, we have to add this document to the finish list so that it gets
+                              // marked as being done.
+                              //deleteList.add(qd);
+                              finishList.add(qd);
+                            }
                           }
                         }
                         
@@ -420,6 +439,7 @@ public class WorkerThread extends Thread
                         ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
                           requeueCandidates,connector,connection,rt,currentTime);
 
+                        System.out.println("Done requeuing "+requeueCandidates.length+" documents");
                         if (Logging.threads.isDebugEnabled())
                           Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents");
                       }
@@ -498,6 +518,7 @@ public class WorkerThread extends Thread
                       // the statistics (which are calculated during the finishlist step below)
                       if (ingesterCheckList.size() > 0)
                       {
+                        System.out.println("ingesterCheckList has "+ingesterCheckList.size()+" documents");
                         String[] checkClasses = new String[ingesterCheckList.size()];
                         String[] checkIDs = new String[ingesterCheckList.size()];
                         for (int i = 0; i < checkIDs.length; i++)
@@ -511,6 +532,7 @@ public class WorkerThread extends Thread
                       // Process the finish list!
                       if (finishList.size() > 0)
                       {
+                        System.out.println("Finishing "+finishList.size()+" documents");
                         // In both job types, we have to go through the finishList to figure out what to do with the documents.
                         // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
                         switch (job.getType())
@@ -649,7 +671,8 @@ public class WorkerThread extends Thread
                           throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
                         }
 
-                        // Finally, if we're still alive, mark everything as "processed".
+                        // Finally, if we're still alive, mark everything we finished as "processed".
+                        System.out.println("Marking "+finishList.size()+" documents as processed");
                         for (QueuedDocument qd : finishList)
                         {
                           qd.setProcessed();
@@ -670,11 +693,13 @@ public class WorkerThread extends Thread
                   }
                   
                   // Now, handle the delete list
+                  System.out.println("Deleting "+deleteList.size()+" documents");
                   processDeleteLists(pipelineSpecificationBasic,connector,connection,jobManager,
                     deleteList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
                   // Handle hopcount removal
+                  System.out.println("Doing hopcount removal on "+hopcountremoveList.size()+" documents");
                   processHopcountRemovalLists(pipelineSpecificationBasic,connector,connection,jobManager,
                     hopcountremoveList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);



Mime
View raw message