manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1610599 - /manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Date Tue, 15 Jul 2014 07:13:42 GMT
Author: kwright
Date: Tue Jul 15 07:13:42 2014
New Revision: 1610599

URL: http://svn.apache.org/r1610599
Log:
Revamp how service interruptions are handled

Modified:
    manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1610599&r1=1610598&r2=1610599&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Tue Jul 15 07:13:42 2014
@@ -299,9 +299,6 @@ public class WorkerThread extends Thread
                       job.getID()+" connection '"+job.getConnectionName()+"': "+
                       e.getMessage());
 
-                    if (!e.jobInactiveAbort() && e.isAbortOnFail())
-                      abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
-
                     // All documents get requeued, because we never got far enough to make
distinctions.  All we have to decide
                     // is whether to requeue or abort.
                     List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
@@ -317,6 +314,7 @@ public class WorkerThread extends Thread
                         if (e.isAbortOnFail())
                         {
                           rescanList.add(qd);
+                          abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
                         }
                         else
                         {
@@ -380,119 +378,103 @@ public class WorkerThread extends Thread
                         Logging.threads.debug("Worker thread about to process "+Integer.toString(documentIDs.length)+" documents");
 
                       // Now, process in bulk -- catching and handling ServiceInterruptions
+                      ServiceInterruption serviceInterruption = null;
                       try
                       {
                         connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
-                        
-                        for (QueuedDocument qd : activeDocuments)
-                        {
-                          // If this document was aborted, then treat it specially.
-                          if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier()))
-                          {
-                            // Special treatment for aborted documents.
-                            // We ignore the returned version string completely, since it's
presumed that processing was not completed for this doc.
-                            // We want to give up immediately on this one, and just requeue
it for immediate reprocessing (pending its prereqs being all met).
-                            // Add to the finish list, so it gets requeued.  Because the
document is already marked as aborted, this should be enough to cause an
-                            // unconditional requeue.
-                            finishList.add(qd);
-                          }
-                          else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
-                          {
-                            deleteList.add(qd);
-                          }
-                          else if (activity.wasDocumentUnchanged(qd.getDocumentDescription().getDocumentIdentifier()))
-                          {
-                            finishList.add(qd);
-                            ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
-                          }
-                          else
-                          {
-                            // All documents not specifically called out above are simply
finished, since we know they haven't been deleted.
-                            finishList.add(qd);
-                          }
-                        }
-                        
-                        // Flush remaining references into the database!
-                        activity.flush();
-
-                        // "Finish" the documents (removing unneeded carrydown info, etc.)
-                        DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
-
-                        ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
-                          requeueCandidates,connector,connection,rt,currentTime);
-
-                        if (Logging.threads.isDebugEnabled())
-                          Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents");
                       }
                       catch (ServiceInterruption e)
                       {
-                        // This service interruption could have resulted
-                        // after some or all of the documents ingested.  But we can figure
out what
-                        // documents were processed and which weren't.
-                        // The processed ones will need to go into the PENDINGPURGATORY
-                        // state.
-
+                        serviceInterruption = e;
                         if (!e.jobInactiveAbort())
                           Logging.jobs.warn("Service interruption reported for job "+
                           job.getID()+" connection '"+job.getConnectionName()+"': "+
                           e.getMessage());
+                      }
 
-                        if (!e.jobInactiveAbort() && e.isAbortOnFail())
-                          abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+                      // Flush remaining references into the database!
+                      activity.flush();
 
-                        // Mark the current documents to be recrawled in the
-                        // time specified, except for the ones beyond their limits.
-                        // Those will either be deleted, or an exception will be thrown that
-                        // will abort the current job.
+                      // "Finish" the documents (removing unneeded carrydown info, etc.)
+                      // ??? documentIDHashes is ALL documents; shouldn't we just be doing
the ones successfully processed?
+                      // Old code basically only called this on successful completion of
ALL documents in the set, but is this
+                      // right?  Does carrydown and hopcount handling recover from being
incomplete?
+                      DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
 
-                        deleteList.clear();
-                        List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
+                      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+                        requeueCandidates,connector,connection,rt,currentTime);
+                      
+                      if (Logging.threads.isDebugEnabled())
+                        Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents");
+                      
+                      // Either way, handle the documents we were supposed to process.  But
if there was a service interruption,
+                      // and the disposition of the document was unclear, then the document
will need to be requeued instead of handled normally.
+                      List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
 
-                        Set<String> fetchDocuments = new HashSet<String>();
-                        for (QueuedDocument qd : activeDocuments)
+                      for (QueuedDocument qd : activeDocuments)
+                      {
+                        // If this document was aborted, then treat it specially.
+                        if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier()))
                         {
-                          fetchDocuments.add(qd.getDocumentDescription().getDocumentIdentifierHash());
+                          // Special treatment for aborted documents.
+                          // We ignore the returned version string completely, since it's
presumed that processing was not completed for this doc.
+                          // We want to give up immediately on this one, and just requeue
it for immediate reprocessing (pending its prereqs being all met).
+                          // Add to the finish list, so it gets requeued.  Because the document
is already marked as aborted, this should be enough to cause an
+                          // unconditional requeue.
+                          finishList.add(qd);
                         }
-                        List<QueuedDocument> newFinishList = new ArrayList<QueuedDocument>();
-                        for (int i = 0; i < finishList.size(); i++)
+                        else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
                         {
-                          QueuedDocument qd = finishList.get(i);
-                          if (fetchDocuments.contains(qd.getDocumentDescription().getDocumentIdentifierHash()))
+                          deleteList.add(qd);
+                        }
+                        else if (serviceInterruption != null)
+                        {
+                          // Service interruption has precedence over unchanged, because
we might have been interrupted while scanning the document
+                          // for references
+                          DocumentDescription dd = qd.getDocumentDescription();
+                          // Check for hard failure.  But no hard failure possible of it's
a job inactive abort.
+                          if (!serviceInterruption.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < serviceInterruption.getRetryTime() ||
+                            dd.getFailRetryCount() == 0))
                           {
-                            DocumentDescription dd = qd.getDocumentDescription();
-                            // Check for hard failure.  But no hard failure possible of it's
a job inactive abort.
-                            if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L
&& dd.getFailTime() < e.getRetryTime() ||
-                              dd.getFailRetryCount() == 0))
+                            // Treat this as a hard failure.
+                            if (serviceInterruption.isAbortOnFail())
                             {
-                              // Treat this as a hard failure.
-                              if (e.isAbortOnFail())
-                              {
-                                rescanList.add(qd);
-                              }
-                              else
-                              {
-                                // We want this particular document to be not included in
the
-                                // reprocessing.  Therefore, we do the same thing as we would
-                                // if we got back a null version.
-                                deleteList.add(qd);
-                              }
+                              // Make sure that the job aborts.
+                              abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((serviceInterruption.getCause()!=null)?": "+serviceInterruption.getCause().getMessage():""),serviceInterruption.getCause());
+                              rescanList.add(qd);
                             }
                             else
                             {
-                              // Not a hard failure.  Requeue.
-                              requeueList.add(qd);
+                              // Skip the document, rather than failing.
+                              // We want this particular document to be not included in the
+                              // reprocessing.  Therefore, we do the same thing as we would
+                              // if we got back a null version.
+                              deleteList.add(qd);
                             }
                           }
                           else
-                            newFinishList.add(qd);
+                          {
+                            // Not a hard failure.  Requeue.
+                            requeueList.add(qd);
+                          }
                         }
-
-                        // Requeue the documents we've identified
-                        requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
-                          e.getFailRetryCount());
-
-                        // We've disposed of all the documents, so finishlist is now clear
-                        finishList = newFinishList;
+                        else if (activity.wasDocumentUnchanged(qd.getDocumentDescription().getDocumentIdentifier()))
+                        {
+                          finishList.add(qd);
+                          ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
+                        }
+                        else
+                        {
+                          // All documents not specifically called out above are simply finished,
since we know they haven't been deleted.
+                          finishList.add(qd);
+                        }
+                      }
+                        
+                      if (serviceInterruption != null)
+                      {
+                        // Requeue the documents we've identified as needing to be repeated
+                        requeueDocuments(jobManager,requeueList,serviceInterruption.getRetryTime(),serviceInterruption.getFailTime(),
+                          serviceInterruption.getFailRetryCount());
                       }
                       
                       // Note the documents that have been checked but not reingested.  This
should happen BEFORE we need



Mime
View raw message