manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1602020 - in /manifoldcf/trunk: connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation...
Date Wed, 11 Jun 2014 21:34:45 GMT
Author: kwright
Date: Wed Jun 11 21:34:45 2014
New Revision: 1602020

URL: http://svn.apache.org/r1602020
Log:
Allow IOException to be thrown through the stack wherever RepositoryDocument may be read,
so that we can rethrow errors reading from the stream and the Repository connector can catch
them.

Modified:
    manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java
    manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java
(original)
+++ manifoldcf/trunk/connectors/forcedmetadata/connector/src/main/java/org/apache/manifoldcf/agents/transformation/forcedmetadata/ForcedMetadataConnector.java
Wed Jun 11 21:34:45 2014
@@ -121,9 +121,10 @@ public class ForcedMetadataConnector ext
   *@param activities is the handle to an object that the implementer of a pipeline connector
may use to perform operations, such as logging processing activity,
   * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
-  public int addOrReplaceDocument(String documentURI, String pipelineDescription, RepositoryDocument
document, String authorityNameString, IOutputAddActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription,
RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     // Unpack the forced metadata and add it to the document
     int index = 0;

Modified: manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
(original)
+++ manifoldcf/trunk/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
Wed Jun 11 21:34:45 2014
@@ -59,10 +59,11 @@ public class NullConnector extends org.a
   *@param activities is the handle to an object that the implementer of a pipeline connector
may use to perform operations, such as logging processing activity,
   * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
   @Override
-  public int addOrReplaceDocument(String documentURI, String pipelineDescription, RepositoryDocument
document, String authorityNameString, IOutputAddActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription,
RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     long startTime = System.currentTimeMillis();
     String resultCode = "OK";
@@ -88,6 +89,12 @@ public class NullConnector extends org.a
       description = e.getMessage();
       throw e;
     }
+    catch (IOException e)
+    {
+      resultCode = "IOEXCEPTION";
+      description = e.getMessage();
+      throw e;
+    }
     finally
     {
       activities.recordActivity(new Long(startTime), ACTIVITY_PROCESS, length, documentURI,

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
(original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
Wed Jun 11 21:34:45 2014
@@ -553,14 +553,27 @@ public class IncrementalIngester extends
       Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
     }
 
-    performIngestion(new ITransformationConnection[0],new String[0],
-      connectionManager.load(outputConnectionName),null,
-      docKey,documentVersion,null,null,null,
-      null,
-      null,
-      recordTime,
-      null,
-      activities);
+    // With a null document URI, this can't throw either ServiceInterruption or IOException
+    try
+    {
+      performIngestion(new ITransformationConnection[0],new String[0],
+        connectionManager.load(outputConnectionName),null,
+        docKey,documentVersion,null,null,null,
+        null,
+        null,
+        recordTime,
+        null,
+        activities);
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException("Unexpected IOException thrown: "+e.getMessage(),e);
+    }
+    catch (ServiceInterruption e)
+    {
+      throw new RuntimeException("Unexpected ServiceInterruption thrown: "+e.getMessage(),e);
+    }
+
   }
 
   /** Ingest a document.
@@ -581,6 +594,7 @@ public class IncrementalIngester extends
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
   */
   @Override
+  @Deprecated
   public boolean documentIngest(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
@@ -623,6 +637,7 @@ public class IncrementalIngester extends
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
   */
   @Override
+  @Deprecated
   public boolean documentIngest(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
@@ -634,21 +649,115 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    return documentIngest(new String[0],
-      new String[0],
-      outputConnectionName,
-      outputVersion,
-      identifierClass, identifierHash,
-      documentVersion,
-      "",
-      outputVersion,
-      parameterVersion,
-      authorityName,
-      data,
-      ingestTime, documentURI,
-      activities);
+    try
+    {
+      return documentIngest(new String[0],
+        new String[0],
+        outputConnectionName,
+        outputVersion,
+        identifierClass, identifierHash,
+        documentVersion,
+        "",
+        outputVersion,
+        parameterVersion,
+        authorityName,
+        data,
+        ingestTime, documentURI,
+        activities);
+    }
+    catch (IOException e)
+    {
+      handleIOException(e,"fetching");
+      return false;
+    }
   }
   
+  // Standard handling for IOExceptions from reading data
+  protected final static long interruptionRetryTime = 5L*60L*1000L;
+  protected static void handleIOException(IOException e, String context)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    if ((e instanceof InterruptedIOException) && (!(e instanceof java.net.SocketTimeoutException)))
+      throw new ManifoldCFException(e.getMessage(), ManifoldCFException.INTERRUPTED);
+
+    long currentTime = System.currentTimeMillis();
+    
+    if (e instanceof java.net.ConnectException)
+    {
+      // Server isn't up at all.  Try for a brief time then give up.
+      String message = "Server could not be contacted during "+context+": "+e.getMessage();
+      Logging.ingest.warn(message,e);
+      throw new ServiceInterruption(message,
+        e,
+        currentTime + interruptionRetryTime,
+        -1L,
+        3,
+        true);
+    }
+    
+    if (e instanceof java.net.SocketTimeoutException)
+    {
+      String message2 = "Socket timeout exception during "+context+": "+e.getMessage();
+      Logging.ingest.warn(message2,e);
+      throw new ServiceInterruption(message2,
+        e,
+        currentTime + interruptionRetryTime,
+        currentTime + 20L * 60000L,
+        -1,
+        false);
+    }
+      
+    if (e.getClass().getName().equals("java.net.SocketException"))
+    {
+      // In the past we would have treated this as a straight document rejection, and
+      // treated it in the same manner as a 400.  The reasoning is that the server can
+      // perfectly legally send out a 400 and drop the connection immediately thereafter,
+      // this is a race condition.
+      // However, Solr 4.0 (or the Jetty version that the example runs on) seems
+      // to have a bug where it drops the connection when two simultaneous documents come in
+      // at the same time.  This is the final version of Solr 4.0 so we need to deal with
+      // this.
+      if (e.getMessage().toLowerCase(Locale.ROOT).indexOf("broken pipe") != -1 ||
+        e.getMessage().toLowerCase(Locale.ROOT).indexOf("connection reset") != -1 ||
+        e.getMessage().toLowerCase(Locale.ROOT).indexOf("target server failed to respond")
!= -1)
+      {
+        // Treat it as a service interruption, but with a limited number of retries.
+        // In that way we won't burden the user with a huge retry interval; it should
+        // give up fairly quickly, and yet NOT give up if the error was merely transient
+        String message = "Server dropped connection during "+context+": "+e.getMessage();
+        Logging.ingest.warn(message,e);
+        throw new ServiceInterruption(message,
+          e,
+          currentTime + interruptionRetryTime,
+          -1L,
+          3,
+          false);
+      }
+      
+      // Other socket exceptions are service interruptions - but if we keep getting them, it means
+      // that a socket timeout is probably set too low to accept this particular document.  So
+      // we retry for a while, then skip the document.
+      String message2 = "Socket exception during "+context+": "+e.getMessage();
+      Logging.ingest.warn(message2,e);
+      throw new ServiceInterruption(message2,
+        e,
+        currentTime + interruptionRetryTime,
+        currentTime + 20L * 60000L,
+        -1,
+        false);
+    }
+
+    // Otherwise, no idea what the trouble is, so presume that retries might fix it.
+    String message3 = "IO exception during "+context+": "+e.getMessage();
+    Logging.ingest.warn(message3,e);
+    throw new ServiceInterruption(message3,
+      e,
+      currentTime + interruptionRetryTime,
+      currentTime + 2L * 60L * 60000L,
+      -1,
+      true);
+  }
+
   /** Ingest a document.
   * This ingests the document, and notes it.  If this is a repeat ingestion of the document, this
   * method also REMOVES ALL OLD METADATA.  When complete, the index will contain only the metadata
@@ -670,7 +779,9 @@ public class IncrementalIngester extends
   *@param documentURI is the URI of the document, which will be used as the key of the document
in the index.
   *@param activities is an object providing a set of methods that the implementer can use
to perform the operation.
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
+  *@throws IOException only if data stream throws an IOException.
   */
+  @Override
   public boolean documentIngest(
     String[] transformationConnectionNames,
     String[] transformationDescriptionStrings,
@@ -685,7 +796,7 @@ public class IncrementalIngester extends
     RepositoryDocument data,
     long ingestTime, String documentURI,
     IOutputActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     IOutputConnection outputConnection = connectionManager.load(outputConnectionName);
     ITransformationConnection[] transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
@@ -714,7 +825,7 @@ public class IncrementalIngester extends
     RepositoryDocument data,
     long ingestTime, String documentURI,
     IOutputActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     String outputConnectionName = outputConnection.getName();
     
@@ -1977,7 +2088,7 @@ public class IncrementalIngester extends
     IOutputConnection outputConnection, String outputDescriptionString,
     String documentURI, RepositoryDocument document, String authorityNameString,
     IOutputAddActivity finalActivities)
-    throws ManifoldCFException, ServiceInterruption
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     // Set indexing date
     document.setIndexingDate(new Date());
@@ -1990,7 +2101,7 @@ public class IncrementalIngester extends
       throw new ServiceInterruption("Pipeline connector not installed",0L);
     try
     {
-      return pipeline.addOrReplaceDocument(documentURI,document,authorityNameString,finalActivities);
+      return pipeline.addOrReplaceDocumentWithException(documentURI,document,authorityNameString,finalActivities);
     }
     finally
     {
@@ -2164,9 +2275,10 @@ public class IncrementalIngester extends
     *@param document is the document data to be processed (handed to the output data store).
     *@param authorityNameString is the authority name string that should be used to qualify
the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed
in IPipelineConnector.
+    *@throws IOException only if there's an IO error reading the data from the document.
     */
     public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
-      throws ManifoldCFException, ServiceInterruption
+      throws ManifoldCFException, ServiceInterruption, IOException
     {
       return activities.sendDocument(documentURI,document,authorityNameString);
     }
@@ -2283,11 +2395,11 @@ public class IncrementalIngester extends
       return entryPoint.getPipelineConnector().checkURLIndexable(entryPoint.getPipelineDescriptionString(),uri,entryPoint.getPipelineCheckActivity());
     }
 
-    public int addOrReplaceDocument(String documentURI, RepositoryDocument document, String
authorityNameString, IOutputAddActivity finalActivity)
-      throws ManifoldCFException, ServiceInterruption
+    public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document,
String authorityNameString, IOutputAddActivity finalActivity)
+      throws ManifoldCFException, ServiceInterruption, IOException
     {
       PipelineAddEntryPoint entryPoint = buildAddPipeline(finalActivity);
-      return entryPoint.getPipelineConnector().addOrReplaceDocument(documentURI,entryPoint.getPipelineDescriptionString(),
+      return entryPoint.getPipelineConnector().addOrReplaceDocumentWithException(documentURI,entryPoint.getPipelineDescriptionString(),
         document,authorityNameString,entryPoint.getPipelineAddActivity());
     }
     
@@ -2539,12 +2651,13 @@ public class IncrementalIngester extends
     *@param document is the document data to be processed (handed to the output data store).
     *@param authorityNameString is the authority name string that should be used to qualify
the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed
in IPipelineConnector.
+    *@throws IOException only if there's an IO error reading the data from the document.
     */
     public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
-      throws ManifoldCFException, ServiceInterruption
+      throws ManifoldCFException, ServiceInterruption, IOException
     {
       // This goes to the next pipeline stage.
-      return pipelineConnector.addOrReplaceDocument(documentURI,pipelineDescriptionString,
+      return pipelineConnector.addOrReplaceDocumentWithException(documentURI,pipelineDescriptionString,
         document,authorityNameString,addActivity);
     }
 

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
(original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
Wed Jun 11 21:34:45 2014
@@ -213,6 +213,7 @@ public interface IIncrementalIngester
   *@param activities is an object providing a set of methods that the implementer can use
to perform the operation.
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
   */
+  @Deprecated
   public boolean documentIngest(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
@@ -241,6 +242,7 @@ public interface IIncrementalIngester
   *@param activities is an object providing a set of methods that the implementer can use
to perform the operation.
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
   */
+  @Deprecated
   public boolean documentIngest(String outputConnectionName,
     String identifierClass, String identifierHash,
     String documentVersion,
@@ -273,6 +275,7 @@ public interface IIncrementalIngester
   *@param documentURI is the URI of the document, which will be used as the key of the document
in the index.
   *@param activities is an object providing a set of methods that the implementer can use
to perform the operation.
   *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
+  *@throws IOException only if data stream throws an IOException.
   */
   public boolean documentIngest(
     String[] transformationConnectionNames,
@@ -288,7 +291,7 @@ public interface IIncrementalIngester
     RepositoryDocument data,
     long ingestTime, String documentURI,
     IOutputActivity activities)
-    throws ManifoldCFException, ServiceInterruption;
+    throws ManifoldCFException, ServiceInterruption, IOException;
 
   /** Note the fact that we checked a document (and found that it did not need to be ingested,
because the
   * versions agreed).

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
(original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
Wed Jun 11 21:34:45 2014
@@ -18,6 +18,8 @@
 */
 package org.apache.manifoldcf.agents.interfaces;
 
+import java.io.*;
+
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.agents.interfaces.*;
 
@@ -33,8 +35,9 @@ public interface IOutputAddActivity exte
   *@param document is the document data to be processed (handed to the output data store).
   *@param authorityNameString is the authority name string that should be used to qualify
the document's access tokens.
   *@return the document status (accepted or permanently rejected); return codes are listed
in IPipelineConnector.
+  *@throws IOException only if there's an IO error reading the data from the document.
   */
   public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
-    throws ManifoldCFException, ServiceInterruption;
+    throws ManifoldCFException, ServiceInterruption, IOException;
 
 }

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java
(original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnector.java
Wed Jun 11 21:34:45 2014
@@ -112,9 +112,10 @@ public interface IPipelineConnector exte
   *@param activities is the handle to an object that the implementer of a pipeline connector
may use to perform operations, such as logging processing activity,
   * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
-  public int addOrReplaceDocument(String documentURI, String pipelineDescription, RepositoryDocument
document, String authorityNameString, IOutputAddActivity activities)
-    throws ManifoldCFException, ServiceInterruption;
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription,
RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException;
 
   // UI support methods.
   //

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java
(original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/output/BaseOutputConnector.java
Wed Jun 11 21:34:45 2014
@@ -257,10 +257,32 @@ public abstract class BaseOutputConnecto
   *@param outputDescription is the description string that was constructed for this document
by the getOutputDescription() method.
   *@param document is the document data to be processed (handed to the output data store).
   *@param authorityNameString is the name of the authority responsible for authorizing any
access tokens passed in with the repository document.  May be null.
-  *@param activities is the handle to an object that the implementer of an output connector
may use to perform operations, such as logging processing activity.
+  *@param activities is the handle to an object that the implementer of a pipeline connector
may use to perform operations, such as logging processing activity,
+  * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
   @Override
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription,
RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException
+  {
+    return addOrReplaceDocument(documentURI, pipelineDescription, document, authorityNameString,
activities);
+  }
+
+  /** Add (or replace) a document in the output data store using the connector.
+  * This method presumes that the connector object has been configured, and it is thus able
to communicate with the output data store should that be
+  * necessary.
+  * The OutputSpecification is *not* provided to this method, because the goal is consistency,
and if output is done it must be consistent with the
+  * output description, since that was what was partly used to determine if output should
be taking place.  So it may be necessary for this method to decode
+  * an output description string in order to determine what should be done.
+  *@param documentURI is the URI of the document.  The URI is presumed to be the unique identifier
which the output data store will use to process
+  * and serve the document.  This URI is constructed by the repository connector which fetches
the document, and is thus universal across all output connectors.
+  *@param outputDescription is the description string that was constructed for this document
by the getOutputDescription() method.
+  *@param document is the document data to be processed (handed to the output data store).
+  *@param authorityNameString is the name of the authority responsible for authorizing any
access tokens passed in with the repository document.  May be null.
+  *@param activities is the handle to an object that the implementer of an output connector
may use to perform operations, such as logging processing activity.
+  *@return the document status (accepted or permanently rejected).
+  */
   public int addOrReplaceDocument(String documentURI, String outputDescription, RepositoryDocument
document, String authorityNameString, IOutputAddActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java
(original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformation/BaseTransformationConnector.java
Wed Jun 11 21:34:45 2014
@@ -153,15 +153,17 @@ public abstract class BaseTransformation
   * an output description string in order to determine what should be done.
   *@param documentURI is the URI of the document.  The URI is presumed to be the unique identifier
which the output data store will use to process
   * and serve the document.  This URI is constructed by the repository connector which fetches
the document, and is thus universal across all output connectors.
-  *@param pipelineDescription is the description string that was constructed for this document
by the getOutputDescription() method.
+  *@param outputDescription is the description string that was constructed for this document
by the getOutputDescription() method.
   *@param document is the document data to be processed (handed to the output data store).
   *@param authorityNameString is the name of the authority responsible for authorizing any
access tokens passed in with the repository document.  May be null.
-  *@param activities is the handle to an object that the implementer of a pipeline connector
may use to perform operations, such as logging processing activity.
+  *@param activities is the handle to an object that the implementer of a pipeline connector
may use to perform operations, such as logging processing activity,
+  * or sending a modified document to the next stage in the pipeline.
   *@return the document status (accepted or permanently rejected).
+  *@throws IOException only if there's a stream error reading the document data.
   */
   @Override
-  public int addOrReplaceDocument(String documentURI, String pipelineDescription, RepositoryDocument
document, String authorityNameString, IOutputAddActivity activities)
-    throws ManifoldCFException, ServiceInterruption
+  public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription,
RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException
   {
     return DOCUMENTSTATUS_REJECTED;
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
(original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
Wed Jun 11 21:34:45 2014
@@ -18,6 +18,8 @@
 */
 package org.apache.manifoldcf.crawler.interfaces;
 
+import java.io.*;
+
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.agents.interfaces.*;
 
@@ -118,7 +120,22 @@ public interface IProcessActivity extend
   *@param documentURI is the URI to use to retrieve this document from the search interface
(and is
   *       also the unique key in the index).
   *@param data is the document data.  The data is closed after ingestion is complete.
+  *@throws IOException only when data stream reading fails.
+  */
+  public void ingestDocumentWithException(String localIdentifier, String version, String
documentURI, RepositoryDocument data)
+    throws ManifoldCFException, ServiceInterruption, IOException;
+
+  /** Ingest the current document.
+  *@param localIdentifier is the document's local identifier.
+  *@param version is the version of the document, as reported by the getDocumentVersions()
method of the
+  *       corresponding repository connector.
+  *@param documentURI is the URI to use to retrieve this document from the search interface
(and is
+  *       also the unique key in the index).
+  *@param data is the document data.  The data is closed after ingestion is complete.
+  * NOTE: Any data stream IOExceptions will be converted to ManifoldCFExceptions and ServiceInterruptions
+  * according to standard best practices.
   */
+  @Deprecated
   public void ingestDocument(String localIdentifier, String version, String documentURI,
RepositoryDocument data)
     throws ManifoldCFException, ServiceInterruption;
 

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1602020&r1=1602019&r2=1602020&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
(original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Wed Jun 11 21:34:45 2014
@@ -1809,17 +1809,43 @@ public class WorkerThread extends Thread
     }
 
     /** Ingest the current document.
-    *@param documentIdentifier is the document's local identifier.
+    *@param localIdentifier is the document's local identifier.
     *@param version is the version of the document, as reported by the getDocumentVersions()
method of the
     *       corresponding repository connector.
     *@param documentURI is the URI to use to retrieve this document from the search interface
(and is
     *       also the unique key in the index).
     *@param data is the document data.  The data is closed after ingestion is complete.
+    * NOTE: Any data stream IOExceptions will be converted to ManifoldCFExceptions and ServiceInterruptions
+    * according to standard best practices.
     */
     @Override
-    public void ingestDocument(String documentIdentifier, String version, String documentURI,
RepositoryDocument data)
+    @Deprecated
+    public void ingestDocument(String localIdentifier, String version, String documentURI,
RepositoryDocument data)
       throws ManifoldCFException, ServiceInterruption
     {
+      try
+      {
+        ingestDocumentWithException(localIdentifier,version,documentURI,data);
+      }
+      catch (IOException e)
+      {
+        handleIOException(e,"fetching");
+      }
+    }
+
+
+    /** Ingest the current document.
+    *@param documentIdentifier is the document's local identifier.
+    *@param version is the version of the document, as reported by the getDocumentVersions()
method of the
+    *       corresponding repository connector.
+    *@param documentURI is the URI to use to retrieve this document from the search interface
(and is
+    *       also the unique key in the index).
+    *@param data is the document data.  The data is closed after ingestion is complete.
+    *@throws IOException only when data stream reading fails.
+    */
+    public void ingestDocumentWithException(String documentIdentifier, String version, String
documentURI, RepositoryDocument data)
+      throws ManifoldCFException, ServiceInterruption, IOException
+    {
       // We should not get called here if versions agree, unless the repository
       // connector cannot distinguish between versions - in which case it must
       // always ingest (essentially)
@@ -2663,9 +2689,10 @@ public class WorkerThread extends Thread
     *@param document is the document data to be processed (handed to the output data store).
     *@param authorityNameString is the authority name string that should be used to qualify
the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed
in IPipelineConnector.
+    *@throws IOException only if there's an IO error reading the data from the document.
     */
     public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
-      throws ManifoldCFException, ServiceInterruption
+      throws ManifoldCFException, ServiceInterruption, IOException
     {
       // No downstream connection at output connection level.
       return IPipelineConnector.DOCUMENTSTATUS_REJECTED;
@@ -2673,4 +2700,89 @@ public class WorkerThread extends Thread
 
   }
 
+  protected final static long interruptionRetryTime = 5L*60L*1000L;
+  protected static void handleIOException(IOException e, String context)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    if ((e instanceof InterruptedIOException) && (!(e instanceof java.net.SocketTimeoutException)))
+      throw new ManifoldCFException(e.getMessage(), ManifoldCFException.INTERRUPTED);
+
+    long currentTime = System.currentTimeMillis();
+    
+    if (e instanceof java.net.ConnectException)
+    {
+      // Server isn't up at all.  Try for a brief time then give up.
+      String message = "Server could not be contacted during "+context+": "+e.getMessage();
+      Logging.connectors.warn(message,e);
+      throw new ServiceInterruption(message,
+        e,
+        currentTime + interruptionRetryTime,
+        -1L,
+        3,
+        true);
+    }
+    
+    if (e instanceof java.net.SocketTimeoutException)
+    {
+      String message2 = "Socket timeout exception during "+context+": "+e.getMessage();
+      Logging.connectors.warn(message2,e);
+      throw new ServiceInterruption(message2,
+        e,
+        currentTime + interruptionRetryTime,
+        currentTime + 20L * 60000L,
+        -1,
+        false);
+    }
+      
+    if (e.getClass().getName().equals("java.net.SocketException"))
+    {
+      // In the past we would have treated this as a straight document rejection, and
+      // treated it in the same manner as a 400.  The reasoning is that the server can
+      // perfectly legally send out a 400 and drop the connection immediately thereafter;
+      // this is a race condition.
+      // However, Solr 4.0 (or the Jetty version that the example runs on) seems
+      // to have a bug where it drops the connection when two simultaneous documents come in
+      // at the same time.  This is the final version of Solr 4.0 so we need to deal with
+      // this.
+      if (e.getMessage().toLowerCase(Locale.ROOT).indexOf("broken pipe") != -1 ||
+        e.getMessage().toLowerCase(Locale.ROOT).indexOf("connection reset") != -1 ||
+        e.getMessage().toLowerCase(Locale.ROOT).indexOf("target server failed to respond")
!= -1)
+      {
+        // Treat it as a service interruption, but with a limited number of retries.
+        // In that way we won't burden the user with a huge retry interval; it should
+        // give up fairly quickly, and yet NOT give up if the error was merely transient
+        String message = "Server dropped connection during "+context+": "+e.getMessage();
+        Logging.connectors.warn(message,e);
+        throw new ServiceInterruption(message,
+          e,
+          currentTime + interruptionRetryTime,
+          -1L,
+          3,
+          false);
+      }
+      
+      // Other socket exceptions are service interruptions - but if we keep getting them, it means
+      // that a socket timeout is probably set too low to accept this particular document.  So
+      // we retry for a while, then skip the document.
+      String message2 = "Socket exception during "+context+": "+e.getMessage();
+      Logging.connectors.warn(message2,e);
+      throw new ServiceInterruption(message2,
+        e,
+        currentTime + interruptionRetryTime,
+        currentTime + 20L * 60000L,
+        -1,
+        false);
+    }
+
+    // Otherwise, no idea what the trouble is, so presume that retries might fix it.
+    String message3 = "IO exception during "+context+": "+e.getMessage();
+    Logging.connectors.warn(message3,e);
+    throw new ServiceInterruption(message3,
+      e,
+      currentTime + interruptionRetryTime,
+      currentTime + 2L * 60L * 60000L,
+      -1,
+      true);
+  }
+
 }



Mime
View raw message