Subject: svn commit: r1610599 - /manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Date: Tue, 15 Jul 2014 07:13:42 -0000
To: commits@manifoldcf.apache.org
From: kwright@apache.org

Author: kwright
Date: Tue Jul 15 07:13:42 2014
New Revision: 1610599

URL: http://svn.apache.org/r1610599
Log:
Revamp how service interruptions are handled

Modified:
    manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1610599&r1=1610598&r2=1610599&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Tue Jul 15 07:13:42 2014
@@ -299,9 +299,6 @@ public class WorkerThread extends Thread
               job.getID()+" connection '"+job.getConnectionName()+"': "+
               e.getMessage());
 
-          if (!e.jobInactiveAbort() && e.isAbortOnFail())
-            abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
-
           // All documents get requeued, because we never got far enough to make distinctions.  All we have to decide
           // is whether to requeue or abort.
          List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
@@ -317,6 +314,7 @@ public class WorkerThread extends Thread
             if (e.isAbortOnFail())
             {
               rescanList.add(qd);
+              abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
             }
             else
             {
@@ -380,119 +378,103 @@ public class WorkerThread extends Thread
             Logging.threads.debug("Worker thread about to process "+Integer.toString(documentIDs.length)+" documents");
 
           // Now, process in bulk -- catching and handling ServiceInterruptions
+          ServiceInterruption serviceInterruption = null;
           try
           {
             connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
-
-            for (QueuedDocument qd : activeDocuments)
-            {
-              // If this document was aborted, then treat it specially.
-              if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier()))
-              {
-                // Special treatment for aborted documents.
-                // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
-                // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
-                // Add to the finish list, so it gets requeued.  Because the document is already marked as aborted, this should be enough to cause an
-                // unconditional requeue.
-                finishList.add(qd);
-              }
-              else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
-              {
-                deleteList.add(qd);
-              }
-              else if (activity.wasDocumentUnchanged(qd.getDocumentDescription().getDocumentIdentifier()))
-              {
-                finishList.add(qd);
-                ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
-              }
-              else
-              {
-                // All documents not specifically called out above are simply finished, since we know they haven't been deleted.
-                finishList.add(qd);
-              }
-            }
-
-            // Flush remaining references into the database!
-            activity.flush();
-
-            // "Finish" the documents (removing unneeded carrydown info, etc.)
-            DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
-
-            ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
-              requeueCandidates,connector,connection,rt,currentTime);
-
-            if (Logging.threads.isDebugEnabled())
-              Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents");
           }
           catch (ServiceInterruption e)
           {
-            // This service interruption could have resulted
-            // after some or all of the documents ingested.  But we can figure out what
-            // documents were processed and which weren't.
-            // The processed ones will need to go into the PENDINGPURGATORY
-            // state.
-
+            serviceInterruption = e;
             if (!e.jobInactiveAbort())
               Logging.jobs.warn("Service interruption reported for job "+
                 job.getID()+" connection '"+job.getConnectionName()+"': "+
                 e.getMessage());
+          }
 
-            if (!e.jobInactiveAbort() && e.isAbortOnFail())
-              abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+          // Flush remaining references into the database!
+          activity.flush();
 
-            // Mark the current documents to be recrawled in the
-            // time specified, except for the ones beyond their limits.
-            // Those will either be deleted, or an exception will be thrown that
-            // will abort the current job.
+          // "Finish" the documents (removing unneeded carrydown info, etc.)
+          // ??? documentIDHashes is ALL documents; shouldn't we just be doing the ones successfully processed?
+          // Old code basically only called this on successful completion of ALL documents in the set, but is this
+          // right?  Does carrydown and hopcount handling recover from being incomplete?
+          DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
 
-            deleteList.clear();
-            List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
+          ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+            requeueCandidates,connector,connection,rt,currentTime);
+
+          if (Logging.threads.isDebugEnabled())
+            Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents");
+
+          // Either way, handle the documents we were supposed to process.  But if there was a service interruption,
+          // and the disposition of the document was unclear, then the document will need to be requeued instead of handled normally.
+          List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
 
-            Set<String> fetchDocuments = new HashSet<String>();
-            for (QueuedDocument qd : activeDocuments)
+          for (QueuedDocument qd : activeDocuments)
+          {
+            // If this document was aborted, then treat it specially.
+            if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier()))
             {
-              fetchDocuments.add(qd.getDocumentDescription().getDocumentIdentifierHash());
+              // Special treatment for aborted documents.
+              // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
+              // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
+              // Add to the finish list, so it gets requeued.  Because the document is already marked as aborted, this should be enough to cause an
+              // unconditional requeue.
+              finishList.add(qd);
             }
-            List<QueuedDocument> newFinishList = new ArrayList<QueuedDocument>();
-            for (int i = 0; i < finishList.size(); i++)
+            else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
             {
-              QueuedDocument qd = finishList.get(i);
-              if (fetchDocuments.contains(qd.getDocumentDescription().getDocumentIdentifierHash()))
+              deleteList.add(qd);
+            }
+            else if (serviceInterruption != null)
+            {
+              // Service interruption has precedence over unchanged, because we might have been interrupted while scanning the document
+              // for references
+              DocumentDescription dd = qd.getDocumentDescription();
+              // Check for hard failure.  But no hard failure possible if it's a job inactive abort.
+              if (!serviceInterruption.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < serviceInterruption.getRetryTime() ||
+                dd.getFailRetryCount() == 0))
               {
-                DocumentDescription dd = qd.getDocumentDescription();
-                // Check for hard failure.  But no hard failure possible if it's a job inactive abort.
-                if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
-                  dd.getFailRetryCount() == 0))
+                // Treat this as a hard failure.
+                if (serviceInterruption.isAbortOnFail())
                 {
-                  // Treat this as a hard failure.
-                  if (e.isAbortOnFail())
-                  {
-                    rescanList.add(qd);
-                  }
-                  else
-                  {
-                    // We want this particular document to be not included in the
-                    // reprocessing.  Therefore, we do the same thing as we would
-                    // if we got back a null version.
-                    deleteList.add(qd);
-                  }
+                  // Make sure that the job aborts.
+                  abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((serviceInterruption.getCause()!=null)?": "+serviceInterruption.getCause().getMessage():""),serviceInterruption.getCause());
+                  rescanList.add(qd);
                 }
                 else
                 {
-                  // Not a hard failure.  Requeue.
-                  requeueList.add(qd);
+                  // Skip the document, rather than failing.
+                  // We want this particular document to be not included in the
+                  // reprocessing.  Therefore, we do the same thing as we would
+                  // if we got back a null version.
+                  deleteList.add(qd);
                 }
               }
               else
-                newFinishList.add(qd);
+              {
+                // Not a hard failure.  Requeue.
+                requeueList.add(qd);
+              }
             }
-
-            // Requeue the documents we've identified
-            requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
-              e.getFailRetryCount());
-
-            // We've disposed of all the documents, so finishlist is now clear
-            finishList = newFinishList;
+            else if (activity.wasDocumentUnchanged(qd.getDocumentDescription().getDocumentIdentifier()))
+            {
+              finishList.add(qd);
+              ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
+            }
+            else
+            {
+              // All documents not specifically called out above are simply finished, since we know they haven't been deleted.
+              finishList.add(qd);
+            }
+          }
+
+          if (serviceInterruption != null)
+          {
+            // Requeue the documents we've identified as needing to be repeated
+            requeueDocuments(jobManager,requeueList,serviceInterruption.getRetryTime(),serviceInterruption.getFailTime(),
+              serviceInterruption.getFailRetryCount());
          }
 
          // Note the documents that have been checked but not reingested.  This should happen BEFORE we need
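
The control-flow change in this commit follows a capture-and-fall-through pattern: instead of duplicating the per-document disposition logic inside the catch (ServiceInterruption e) block, the exception is stashed in a local (serviceInterruption) and a single loop after the try/catch decides every document's fate on both the success path and the interruption path. Below is a minimal, self-contained sketch of that pattern; all names here (RetryableException, processAll, the finish/requeue/delete lists) are illustrative stand-ins, not the ManifoldCF API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CaptureAndDisposeSketch
{
  // Stand-in for ServiceInterruption: carries just enough state to
  // distinguish a hard failure from a transient one.
  static class RetryableException extends Exception
  {
    final boolean hardFailure;

    RetryableException(String message, boolean hardFailure)
    {
      super(message);
      this.hardFailure = hardFailure;
    }
  }

  public static void main(String[] args)
  {
    List<String> documents = Arrays.asList("doc1", "doc2", "doc3");
    List<String> finishList = new ArrayList<String>();
    List<String> requeueList = new ArrayList<String>();
    List<String> deleteList = new ArrayList<String>();

    // Step 1: capture the interruption instead of disposing of documents
    // inside the catch block.
    RetryableException serviceInterruption = null;
    try
    {
      processAll(documents);
    }
    catch (RetryableException e)
    {
      serviceInterruption = e;
    }

    // Step 2: one disposition loop runs whether or not processing was
    // interrupted, so the requeue/delete/finish rules live in one place.
    for (String doc : documents)
    {
      if (serviceInterruption != null)
      {
        if (serviceInterruption.hardFailure)
          deleteList.add(doc);   // give up on this document entirely
        else
          requeueList.add(doc);  // transient problem: retry later
      }
      else
        finishList.add(doc);     // normal completion
    }

    System.out.println("finish=" + finishList +
      " requeue=" + requeueList + " delete=" + deleteList);
  }

  // Bulk processor that may be interrupted part-way through the set.
  static void processAll(List<String> documents) throws RetryableException
  {
    for (String doc : documents)
    {
      if (doc.equals("doc2"))
        throw new RetryableException("transient backend outage", false);
    }
  }
}

One consequence of the unified path, which the committer flags in the "??? documentIDHashes is ALL documents" comment above, is that steps formerly run only on full success (finishDocuments, carrydown requeueing) now also run after an interruption, raising the open question of whether carrydown and hopcount handling tolerate incomplete batches.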