Subject: svn commit: r1602020 - in /manifoldcf/trunk: connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation...
Date: Wed, 11 Jun 2014 21:34:45 -0000
To: commits@manifoldcf.apache.org
From: kwright@apache.org
X-Mailer: svnmailer-1.0.9
Message-Id: <20140611213446.43293238897A@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: kwright
Date: Wed Jun 11 21:34:45 2014
New Revision: 1602020

URL: http://svn.apache.org/r1602020
Log:
Allow IOException to be thrown through the stack wherever RepositoryDocument may be read, so that we can rethrow errors reading from the stream and the Repository connector can catch them.

Modified:
    manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java
    manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java (original)
+++ manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java Wed Jun 11 21:34:45 2014
@@ -121,9 +121,10 @@ public class ForcedMetadataConnector ext
   *@param activities is the handle to an object that the implementer of a pipeline connector may use to perform operations, such as logging processing activity,
   * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
-  public int addOrReplaceDocument(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     // Unpack the forced metadata and add it to the document
     int index = 0;

Modified: manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java (original)
+++ manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java Wed Jun 11 21:34:45 2014
@@ -59,10 +59,11 @@ public class NullConnector extends org.a
   *@param activities is the handle to an object that the implementer of a pipeline connector may use to perform operations, such as logging processing activity,
   * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
   @Override
-  public int addOrReplaceDocument(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     long startTime = System.currentTimeMillis();
     String resultCode = "OK";
@@ -88,6 +89,12 @@ public class NullConnector extends org.a
       description = e.getMessage();
       throw e;
     }
+    catch (IOException e)
+    {
+      resultCode = "IOEXCEPTION";
+      description = e.getMessage();
+      throw e;
+    }
     finally
     {
       activities.recordActivity(new Long(startTime), ACTIVITY_PROCESS, length, documentURI,

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Wed Jun 11 21:34:45 2014
@@ -553,14 +553,27 @@ public class IncrementalIngester extends
       Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
     }

-    performIngestion(new ITransformationConnection[0],new String[0],
-      connectionManager.load(outputConnectionName),null,
-      docKey,documentVersion,null,null,null,
-      null,
-      null,
-      recordTime,
-      null,
-      activities);
+    // With a null document URI, this can't throw either ServiceInterruption or IOException
+    try
+    {
+      performIngestion(new ITransformationConnection[0],new String[0],
+        connectionManager.load(outputConnectionName),null,
+        docKey,documentVersion,null,null,null,
+        null,
+        null,
+        recordTime,
+        null,
+        activities);
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException("Unexpected IOException thrown: "+e.getMessage(),e);
+    }
+    catch (ServiceInterruption e)
+    {
+      throw new RuntimeException("Unexpected ServiceInterruption thrown: "+e.getMessage(),e);
+    }
+
   }

   /** Ingest a document.
@@ -581,6 +594,7 @@ public class IncrementalIngester extends
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
   */
   @Override
+  @Deprecated
   public boolean documentIngest(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
@@ -623,6 +637,7 @@ public class IncrementalIngester extends
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
   */
   @Override
+  @Deprecated
   public boolean documentIngest(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
@@ -634,21 +649,115 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    return documentIngest(new String[0],
-      new String[0],
-      outputConnectionName,
-      outputVersion,
-      identifierClass, identifierHash,
-      documentVersion,
-      "",
-      outputVersion,
-      parameterVersion,
-      authorityName,
-      data,
-      ingestTime, documentURI,
-      activities);
+    try
+    {
+      return documentIngest(new String[0],
+        new String[0],
+        outputConnectionName,
+        outputVersion,
+        identifierClass, identifierHash,
+        documentVersion,
+        "",
+        outputVersion,
+        parameterVersion,
+        authorityName,
+        data,
+        ingestTime, documentURI,
+        activities);
+    }
+    catch (IOException e)
+    {
+      handleIOException(e,"fetching");
+      return false;
+    }
   }

+  // Standard handling for IOExceptions from reading data
+  protected final static long interruptionRetryTime = 5L*60L*1000L;
+  protected static void handleIOException(IOException e, String context)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    if ((e instanceof InterruptedIOException) && (!(e instanceof java.net.SocketTimeoutException)))
+      throw new ManifoldCFException(e.getMessage(), ManifoldCFException.INTERRUPTED);
+
+    long currentTime = System.currentTimeMillis();
+
+    if (e instanceof java.net.ConnectException)
+    {
+      // Server isn't up at all. Try for a brief time then give up.
+      String message = "Server could not be contacted during "+context+": "+e.getMessage();
+      Logging.ingest.warn(message,e);
+      throw new ServiceInterruption(message,
+        e,
+        currentTime + interruptionRetryTime,
+        -1L,
+        3,
+        true);
+    }
+
+    if (e instanceof java.net.SocketTimeoutException)
+    {
+      String message2 = "Socket timeout exception during "+context+": "+e.getMessage();
+      Logging.ingest.warn(message2,e);
+      throw new ServiceInterruption(message2,
+        e,
+        currentTime + interruptionRetryTime,
+        currentTime + 20L * 60000L,
+        -1,
+        false);
+    }
+
+    if (e.getClass().getName().equals("java.net.SocketException"))
+    {
+      // In the past we would have treated this as a straight document rejection, and
+      // treated it in the same manner as a 400. The reasoning is that the server can
+      // perfectly legally send out a 400 and drop the connection immediately thereafter,
+      // this a race condition.
+      // However, Solr 4.0 (or the Jetty version that the example runs on) seems
+      // to have a bug where it drops the connection when two simultaneous documents come in
+      // at the same time. This is the final version of Solr 4.0 so we need to deal with
+      // this.
+      if (e.getMessage().toLowerCase(Locale.ROOT).indexOf("broken pipe") != -1 ||
+        e.getMessage().toLowerCase(Locale.ROOT).indexOf("connection reset") != -1 ||
+        e.getMessage().toLowerCase(Locale.ROOT).indexOf("target server failed to respond") != -1)
+      {
+        // Treat it as a service interruption, but with a limited number of retries.
+        // In that way we won't burden the user with a huge retry interval; it should
+        // give up fairly quickly, and yet NOT give up if the error was merely transient
+        String message = "Server dropped connection during "+context+": "+e.getMessage();
+        Logging.ingest.warn(message,e);
+        throw new ServiceInterruption(message,
+          e,
+          currentTime + interruptionRetryTime,
+          -1L,
+          3,
+          false);
+      }
+
+      // Other socket exceptions are service interruptions - but if we keep getting them, it means
+      // that a socket timeout is probably set too low to accept this particular document. So
+      // we retry for a while, then skip the document.
+      String message2 = "Socket exception during "+context+": "+e.getMessage();
+      Logging.ingest.warn(message2,e);
+      throw new ServiceInterruption(message2,
+        e,
+        currentTime + interruptionRetryTime,
+        currentTime + 20L * 60000L,
+        -1,
+        false);
+    }
+
+    // Otherwise, no idea what the trouble is, so presume that retries might fix it.
+    String message3 = "IO exception during "+context+": "+e.getMessage();
+    Logging.ingest.warn(message3,e);
+    throw new ServiceInterruption(message3,
+      e,
+      currentTime + interruptionRetryTime,
+      currentTime + 2L * 60L * 60000L,
+      -1,
+      true);
+  }
+
   /** Ingest a document.
   * This ingests the document, and notes it. If this is a repeat ingestion of the document, this
   * method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
@@ -670,7 +779,9 @@ public class IncrementalIngester extends
   *@param documentURI is the URI of the document, which will be used as the key of the document in the index.
   *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
+  *@throws IOException only if data stream throws an IOException.
   */
+  @Override
   public boolean documentIngest(
     String[] transformationConnectionNames,
     String[] transformationDescriptionStrings,
@@ -685,7 +796,7 @@ public class IncrementalIngester extends
     RepositoryDocument data,
     long ingestTime, String documentURI,
     IOutputActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     IOutputConnection outputConnection = connectionManager.load(outputConnectionName);
     ITransformationConnection[] transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
@@ -714,7 +825,7 @@ public class IncrementalIngester extends
     RepositoryDocument data,
     long ingestTime, String documentURI,
     IOutputActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     String outputConnectionName = outputConnection.getName();

@@ -1977,7 +2088,7 @@ public class IncrementalIngester extends
     IOutputConnection outputConnection, String outputDescriptionString,
     String documentURI, RepositoryDocument document, String authorityNameString,
     IOutputAddActivity finalActivities)
-    throws ManifoldCFException, ServiceInterruption
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     // Set indexing date
     document.setIndexingDate(new Date());
@@ -1990,7 +2101,7 @@ public class IncrementalIngester extends
       throw new ServiceInterruption("Pipeline connector not installed",0L);
     try
     {
-      return pipeline.addOrReplaceDocument(documentURI,document,authorityNameString,finalActivities);
+      return pipeline.addOrReplaceDocumentWithException(documentURI,document,authorityNameString,finalActivities);
     }
     finally
     {
@@ -2164,9 +2275,10 @@ public class IncrementalIngester extends
     *@param document is the document data to be processed (handed to the output data store).
     *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
+    *@throws IOException only if there's an IO error reading the data from the document.
     */
     public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
-      throws ManifoldCFException, ServiceInterruption
+      throws ManifoldCFException, ServiceInterruption, IOException
     {
       return activities.sendDocument(documentURI,document,authorityNameString);
     }
@@ -2283,11 +2395,11 @@ public class IncrementalIngester extends
       return entryPoint.getPipelineConnector().checkURLIndexable(entryPoint.getPipelineDescriptionString(),uri,entryPoint.getPipelineCheckActivity());
     }

-    public int addOrReplaceDocument(String documentURI, RepositoryDocument document, String authorityNameString, IOutputAddActivity finalActivity)
-      throws ManifoldCFException, ServiceInterruption
+    public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document, String authorityNameString, IOutputAddActivity finalActivity)
+      throws ManifoldCFException, ServiceInterruption, IOException
     {
       PipelineAddEntryPoint entryPoint = buildAddPipeline(finalActivity);
-      return entryPoint.getPipelineConnector().addOrReplaceDocument(documentURI,entryPoint.getPipelineDescriptionString(),
+      return entryPoint.getPipelineConnector().addOrReplaceDocumentWithException(documentURI,entryPoint.getPipelineDescriptionString(),
         document,authorityNameString,entryPoint.getPipelineAddActivity());
     }

@@ -2539,12 +2651,13 @@ public class IncrementalIngester extends
     *@param document is the document data to be processed (handed to the output data store).
     *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
+    *@throws IOException only if there's an IO error reading the data from the document.
     */
     public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
-      throws ManifoldCFException, ServiceInterruption
+      throws ManifoldCFException, ServiceInterruption, IOException
     {
       // This goes to the next pipeline stage.
-      return pipelineConnector.addOrReplaceDocument(documentURI,pipelineDescriptionString,
+      return pipelineConnector.addOrReplaceDocumentWithException(documentURI,pipelineDescriptionString,
         document,authorityNameString,addActivity);
     }


Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Wed Jun 11 21:34:45 2014
@@ -213,6 +213,7 @@ public interface IIncrementalIngester
   *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
   */
+  @Deprecated
   public boolean documentIngest(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
@@ -241,6 +242,7 @@ public interface IIncrementalIngester
   *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
   */
+  @Deprecated
   public boolean documentIngest(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
@@ -273,6 +275,7 @@ public interface IIncrementalIngester
   *@param documentURI is the URI of the document, which will be used as the key of the document in the index.
   *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
+  *@throws IOException only if data stream throws an IOException.
   */
   public boolean documentIngest(
     String[] transformationConnectionNames,
@@ -288,7 +291,7 @@ public interface IIncrementalIngester
     RepositoryDocument data,
     long ingestTime, String documentURI,
     IOutputActivity activities)
-    throws ManifoldCFException, ServiceInterruption;
+    throws ManifoldCFException, ServiceInterruption, IOException;

   /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
   * versions agreed).

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java Wed Jun 11 21:34:45 2014
@@ -18,6 +18,8 @@
 */
 package org.apache.manifoldcf.agents.interfaces;

+import java.io.*;
+
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.agents.interfaces.*;

@@ -33,8 +35,9 @@ public interface IOutputAddActivity exte
   *@param document is the document data to be processed (handed to the output data store).
   *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
   *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
+  *@throws IOException only if there's an IO error reading the data from the document.
   */
   public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
-    throws ManifoldCFException, ServiceInterruption;
+    throws ManifoldCFException, ServiceInterruption, IOException;

 }

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java Wed Jun 11 21:34:45 2014
@@ -112,9 +112,10 @@ public interface IPipelineConnector exte
   *@param activities is the handle to an object that the implementer of a pipeline connector may use to perform operations, such as logging processing activity,
   * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
-  public int addOrReplaceDocument(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
-    throws ManifoldCFException, ServiceInterruption;
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException;

   // UI support methods.
   //

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java Wed Jun 11 21:34:45 2014
@@ -257,10 +257,32 @@ public abstract class BaseOutputConnecto
   *@param outputDescription is the description string that was constructed for this document by the getOutputDescription() method.
   *@param document is the document data to be processed (handed to the output data store).
   *@param authorityNameString is the name of the authority responsible for authorizing any access tokens passed in with the repository document. May be null.
-  *@param activities is the handle to an object that the implementer of an output connector may use to perform operations, such as logging processing activity.
+  *@param activities is the handle to an object that the implementer of a pipeline connector may use to perform operations, such as logging processing activity,
+  * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
   @Override
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException
+  {
+    return addOrReplaceDocument(documentURI, pipelineDescription, document, authorityNameString, activities);
+  }
+
+  /** Add (or replace) a document in the output data store using the connector.
+  * This method presumes that the connector object has been configured, and it is thus able to communicate with the output data store should that be
+  * necessary.
+  * The OutputSpecification is *not* provided to this method, because the goal is consistency, and if output is done it must be consistent with the
+  * output description, since that was what was partly used to determine if output should be taking place. So it may be necessary for this method to decode
+  * an output description string in order to determine what should be done.
+  *@param documentURI is the URI of the document. The URI is presumed to be the unique identifier which the output data store will use to process
+  * and serve the document. This URI is constructed by the repository connector which fetches the document, and is thus universal across all output connectors.
+  *@param outputDescription is the description string that was constructed for this document by the getOutputDescription() method.
+  *@param document is the document data to be processed (handed to the output data store).
+  *@param authorityNameString is the name of the authority responsible for authorizing any access tokens passed in with the repository document. May be null.
+  *@param activities is the handle to an object that the implementer of an output connector may use to perform operations, such as logging processing activity.
+  *@return the document status (accepted or permanently rejected).
+  */
   public int addOrReplaceDocument(String documentURI, String outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java Wed Jun 11 21:34:45 2014
@@ -153,15 +153,17 @@ public abstract class BaseTransformation
   * an output description string in order to determine what should be done.
   *@param documentURI is the URI of the document. The URI is presumed to be the unique identifier which the output data store will use to process
   * and serve the document. This URI is constructed by the repository connector which fetches the document, and is thus universal across all output connectors.
-  *@param pipelineDescription is the description string that was constructed for this document by the getOutputDescription() method.
+  *@param outputDescription is the description string that was constructed for this document by the getOutputDescription() method.
   *@param document is the document data to be processed (handed to the output data store).
   *@param authorityNameString is the name of the authority responsible for authorizing any access tokens passed in with the repository document. May be null.
-  *@param activities is the handle to an object that the implementer of a pipeline connector may use to perform operations, such as logging processing activity.
+  *@param activities is the handle to an object that the implementer of a pipeline connector may use to perform operations, such as logging processing activity,
+  * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
   @Override
-  public int addOrReplaceDocument(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     return DOCUMENTSTATUS_REJECTED;
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java Wed Jun 11 21:34:45 2014
@@ -18,6 +18,8 @@
 */
 package org.apache.manifoldcf.crawler.interfaces;

+import java.io.*;
+
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.agents.interfaces.*;

@@ -118,7 +120,22 @@ public interface IProcessActivity extend
   *@param documentURI is the URI to use to retrieve this document from the search interface (and is
   * also the unique key in the index).
   *@param data is the document data. The data is closed after ingestion is complete.
+  *@throws IOException only when data stream reading fails.
+  */
+  public void ingestDocumentWithException(String localIdentifier, String version, String documentURI, RepositoryDocument data)
+    throws ManifoldCFException, ServiceInterruption, IOException;
+
+  /** Ingest the current document.
+  *@param localIdentifier is the document's local identifier.
+  *@param version is the version of the document, as reported by the getDocumentVersions() method of the
+  * corresponding repository connector.
+  *@param documentURI is the URI to use to retrieve this document from the search interface (and is
+  * also the unique key in the index).
+  *@param data is the document data. The data is closed after ingestion is complete.
+  * NOTE: Any data stream IOExceptions will be converted to ManifoldCFExceptions and ServiceInterruptions
+  * according to standard best practices.
   */
+  @Deprecated
   public void ingestDocument(String localIdentifier, String version, String documentURI, RepositoryDocument data)
     throws ManifoldCFException, ServiceInterruption;


Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Wed Jun 11 21:34:45 2014
@@ -1809,17 +1809,43 @@ public class WorkerThread extends Thread
     }

     /** Ingest the current document.
- *@param documentIdentifier is the document's local identifier. + *@param localIdentifier is the document's local identifier. *@param version is the version of the document, as reported by the getDocumentVersions() method of the * corresponding repository connector. *@param documentURI is the URI to use to retrieve this document from the search interface (and is * also the unique key in the index). *@param data is the document data. The data is closed after ingestion is complete. + * NOTE: Any data stream IOExceptions will be converted to ManifoldCFExceptions and ServiceInterruptions + * according to standard best practices. */ @Override - public void ingestDocument(String documentIdentifier, String version, String documentURI, RepositoryDocument data) + @Deprecated + public void ingestDocument(String localIdentifier, String version, String documentURI, RepositoryDocument data) throws ManifoldCFException, ServiceInterruption { + try + { + ingestDocumentWithException(localIdentifier,version,documentURI,data); + } + catch (IOException e) + { + handleIOException(e,"fetching"); + } + } + + + /** Ingest the current document. + *@param documentIdentifier is the document's local identifier. + *@param version is the version of the document, as reported by the getDocumentVersions() method of the + * corresponding repository connector. + *@param documentURI is the URI to use to retrieve this document from the search interface (and is + * also the unique key in the index). + *@param data is the document data. The data is closed after ingestion is complete. + *@throws IOException only when data stream reading fails. 
+ */ + public void ingestDocumentWithException(String documentIdentifier, String version, String documentURI, RepositoryDocument data) + throws ManifoldCFException, ServiceInterruption, IOException + { // We should not get called here if versions agree, unless the repository // connector cannot distinguish between versions - in which case it must // always ingest (essentially) @@ -2663,9 +2689,10 @@ public class WorkerThread extends Thread *@param document is the document data to be processed (handed to the output data store). *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens. *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector. + *@throws IOException only if there's an IO error reading the data from the document. */ public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString) - throws ManifoldCFException, ServiceInterruption + throws ManifoldCFException, ServiceInterruption, IOException { // No downstream connection at output connection level. return IPipelineConnector.DOCUMENTSTATUS_REJECTED; @@ -2673,4 +2700,89 @@ public class WorkerThread extends Thread } + protected final static long interruptionRetryTime = 5L*60L*1000L; + protected static void handleIOException(IOException e, String context) + throws ManifoldCFException, ServiceInterruption + { + if ((e instanceof InterruptedIOException) && (!(e instanceof java.net.SocketTimeoutException))) + throw new ManifoldCFException(e.getMessage(), ManifoldCFException.INTERRUPTED); + + long currentTime = System.currentTimeMillis(); + + if (e instanceof java.net.ConnectException) + { + // Server isn't up at all. Try for a brief time then give up. 
+ String message = "Server could not be contacted during "+context+": "+e.getMessage(); + Logging.connectors.warn(message,e); + throw new ServiceInterruption(message, + e, + currentTime + interruptionRetryTime, + -1L, + 3, + true); + } + + if (e instanceof java.net.SocketTimeoutException) + { + String message2 = "Socket timeout exception during "+context+": "+e.getMessage(); + Logging.connectors.warn(message2,e); + throw new ServiceInterruption(message2, + e, + currentTime + interruptionRetryTime, + currentTime + 20L * 60000L, + -1, + false); + } + + if (e.getClass().getName().equals("java.net.SocketException")) + { + // In the past we would have treated this as a straight document rejection, and + // treated it in the same manner as a 400. The reasoning is that the server can + // perfectly legally send out a 400 and drop the connection immediately thereafter, + // this a race condition. + // However, Solr 4.0 (or the Jetty version that the example runs on) seems + // to have a bug where it drops the connection when two simultaneous documents come in + // at the same time. This is the final version of Solr 4.0 so we need to deal with + // this. + if (e.getMessage().toLowerCase(Locale.ROOT).indexOf("broken pipe") != -1 || + e.getMessage().toLowerCase(Locale.ROOT).indexOf("connection reset") != -1 || + e.getMessage().toLowerCase(Locale.ROOT).indexOf("target server failed to respond") != -1) + { + // Treat it as a service interruption, but with a limited number of retries. 
+ // In that way we won't burden the user with a huge retry interval; it should + // give up fairly quickly, and yet NOT give up if the error was merely transient + String message = "Server dropped connection during "+context+": "+e.getMessage(); + Logging.connectors.warn(message,e); + throw new ServiceInterruption(message, + e, + currentTime + interruptionRetryTime, + -1L, + 3, + false); + } + + // Other socket exceptions are service interruptions - but if we keep getting them, it means + // that a socket timeout is probably set too low to accept this particular document. So + // we retry for a while, then skip the document. + String message2 = "Socket exception during "+context+": "+e.getMessage(); + Logging.connectors.warn(message2,e); + throw new ServiceInterruption(message2, + e, + currentTime + interruptionRetryTime, + currentTime + 20L * 60000L, + -1, + false); + } + + // Otherwise, no idea what the trouble is, so presume that retries might fix it. + String message3 = "IO exception during "+context+": "+e.getMessage(); + Logging.connectors.warn(message3,e); + throw new ServiceInterruption(message3, + e, + currentTime + interruptionRetryTime, + currentTime + 2L * 60L * 60000L, + -1, + true); + } + }
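
The order of the checks in handleIOException matters, because java.net.SocketTimeoutException is a subclass of java.io.InterruptedIOException and java.net.ConnectException is a subclass of java.net.SocketException. The following is a minimal standalone sketch of that decision order only. IoErrorClassifier and its Kind enum are hypothetical names used for illustration and are not part of ManifoldCF; the sketch returns a label instead of throwing ManifoldCFException or ServiceInterruption, and unlike the committed code it guards against a null exception message.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Locale;

/** Hypothetical illustration of the check order used by handleIOException. */
final class IoErrorClassifier {

  /** Labels for the branches; these names are assumptions made for this sketch. */
  enum Kind { INTERRUPTED, CONNECT_FAILED, SOCKET_TIMEOUT, CONNECTION_DROPPED, SOCKET_ERROR, UNKNOWN_IO }

  static Kind classify(IOException e) {
    // SocketTimeoutException extends InterruptedIOException, so it must be
    // excluded here or a timeout would be reported as a thread interruption.
    if (e instanceof InterruptedIOException && !(e instanceof SocketTimeoutException)) {
      return Kind.INTERRUPTED;
    }
    // The commit retries this case a limited number of times (retry count 3).
    if (e instanceof ConnectException) {
      return Kind.CONNECT_FAILED;
    }
    if (e instanceof SocketTimeoutException) {
      return Kind.SOCKET_TIMEOUT;
    }
    // Exact class match, as in the commit: subclasses of SocketException
    // other than ConnectException fall through to the generic branch.
    if (e.getClass() == SocketException.class) {
      // Null guard is an addition of this sketch; the committed code calls
      // e.getMessage().toLowerCase(...) directly.
      String msg = e.getMessage() == null ? "" : e.getMessage().toLowerCase(Locale.ROOT);
      if (msg.contains("broken pipe")
          || msg.contains("connection reset")
          || msg.contains("target server failed to respond")) {
        return Kind.CONNECTION_DROPPED;
      }
      return Kind.SOCKET_ERROR;
    }
    // Anything else: the commit presumes that retries might fix it.
    return Kind.UNKNOWN_IO;
  }
}
```

In the committed code each of these branches logs a warning and throws, so a caller such as the deprecated ingestDocument wrapper never returns normally from its catch block.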