manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1427449 - /manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
Date Tue, 01 Jan 2013 15:45:21 GMT
Author: kwright
Date: Tue Jan  1 15:45:20 2013
New Revision: 1427449

URL: http://svn.apache.org/viewvc?rev=1427449&view=rev
Log:
Put proper handling in place for all exceptions.

Modified:
    manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java

Modified: manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java?rev=1427449&r1=1427448&r2=1427449&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java (original)
+++ manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java Tue Jan  1 15:45:20 2013
@@ -64,6 +64,7 @@ import org.apache.solr.client.solrj.resp
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.SolrException;
 
 
 /**
@@ -261,73 +262,135 @@ public class HttpPoster
     if (Logging.ingest.isDebugEnabled())
       Logging.ingest.debug("commitPost()");
 
-    int ioErrorRetry = 5;
-    while (true)
+    // Open a socket to ingest, and to the response stream to get the post result
+    try
     {
-      // Open a socket to ingest, and to the response stream to get the post result
+      CommitThread t = new CommitThread();
       try
       {
-        CommitThread t = new CommitThread();
-        try
-        {
-          t.start();
-          t.join();
+        t.start();
+        t.join();
 
-          Throwable thr = t.getException();
-          if (thr != null)
-          {
-            if (thr instanceof SolrServerException)
-              throw (SolrServerException)thr;
-            if (thr instanceof IOException)
-              throw (IOException)thr;
-            if (thr instanceof RuntimeException)
-              throw (RuntimeException)thr;
-            else
-              throw (Error)thr;
-          }
-          return;
-        }
-        catch (InterruptedException e)
-        {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
-        }
-      }
-      catch (SolrServerException e)
-      {
-        throw new ManifoldCFException("Solr exception during commit: "+e.getMessage(),e);
-      }
-      catch (IOException ioe)
-      {
-        if (ioErrorRetry == 0)
+        Throwable thr = t.getException();
+        if (thr != null)
         {
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("IO exception committing: "+ioe.getMessage(),
-            ioe,
-            currentTime + interruptionRetryTime,
-            currentTime + 2L * 60L * 60000L,
-            -1,
-            true);
+          if (thr instanceof SolrServerException)
+            throw (SolrServerException)thr;
+          if (thr instanceof IOException)
+            throw (IOException)thr;
+          if (thr instanceof RuntimeException)
+            throw (RuntimeException)thr;
+          else
+            throw (Error)thr;
         }
-      }
-      
-      // Go back around again!
-      // Sleep for a time, and retry
-      try
-      {
-        ManifoldCF.sleep(10000L);
+        return;
       }
       catch (InterruptedException e)
       {
-        throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+        t.interrupt();
+        throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
       }
-      ioErrorRetry--;
-
     }
+    catch (SolrServerException e)
+    {
+      handleSolrServerException(e, "commit");
+      return;
+    }
+    catch (SolrException e)
+    {
+      handleSolrException(e, "commit");
+      return;
+    }
+    catch (IOException ioe)
+    {
+      handleIOException(ioe, "commit");
+      return;
+    }
+  }
+  
+  /** Handle a SolrServerException.
+  * These exceptions seem to be catch-all exceptions having to do either with misconfiguration or
+  * with underlying IO exceptions.
+  * If this method doesn't throw an exception, it means that the exception should be interpreted
+  * as meaning that the document or action is illegal and should not be repeated.
+  */
+  protected static void handleSolrServerException(SolrServerException e, String context)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    Throwable childException = e.getCause();
+    if (childException instanceof IOException)
+    {
+      handleIOException((IOException)childException, context);
+      return;
+    }
+    throw new ManifoldCFException("Unhandled SolrServerException: "+e.getMessage());
+  }
 
+  /** Handle a SolrException.
+  * These exceptions are mainly Http errors having to do with actual responses from Solr.
+  * If this method doesn't throw an exception, it means that the exception should be interpreted
+  * as meaning that the document or action is illegal and should not be repeated.
+  */
+  protected static void handleSolrException(SolrException e, String context)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    // Use the exception text to determine the proper result.
+    if (e.code() == 500 && e.getMessage().indexOf("org.apache.tika.exception.TikaException") != -1)
+      // Can't process the document, so don't keep trying.
+      return;
+
+    // If the code is in the 400 range, the document will never be accepted, so indicate that.
+    if (e.code() >= 400 && e.code() < 500)
+      return;
+    
+    // The only other kind of return code we know how to handle is 50x.
+    // For these, we should retry for a while.
+    if (e.code() == 500)
+    {
+      long currentTime = System.currentTimeMillis();
+      throw new ServiceInterruption("Solr exception during "+context+" ("+e.code()+"): "+e.getMessage(),
+        e,
+        currentTime + interruptionRetryTime,
+        currentTime + 2L * 60L * 60000L,
+        -1,
+        true);
+    }
+    
+    // Unknown code: end the job.
+    throw new ManifoldCFException("Unhandled Solr exception during "+context+" ("+e.code()+"): "+e.getMessage());
   }
   
+  /** Handle an IOException.
+  * I'm not actually sure where these exceptions come from in SolrJ, but we handle them
+  * as real I/O errors, meaning they should be retried.
+  */
+  protected static void handleIOException(IOException e, String context)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    if ((e instanceof InterruptedIOException) && (!(e instanceof java.net.SocketTimeoutException)))
+      throw new ManifoldCFException(e.getMessage(), ManifoldCFException.INTERRUPTED);
 
+    // Intercept "broken pipe" exception, since that seems to be what we get if the ingestion API kills the socket right after a 400 goes out.
+    // Basically, we have no choice but to interpret that in the same manner as a 400, since no matter how we do it, it's a race and the 'broken pipe'
+    // result is always possible.  So we might as well expect it and treat it properly.
+    if (e.getClass().getName().equals("java.net.SocketException") && e.getMessage().toLowerCase().indexOf("broken pipe") != -1)
+    {
+      // We've seen what looks like the ingestion interface forcibly closing the socket.
+      // We *choose* to interpret this just like a 400 response.  However, we log in the history using a different code,
+      // since we really don't know what happened for sure.
+      return;
+    }
+    
+    // Otherwise, presume that retries might fix it.
+    long currentTime = System.currentTimeMillis();
+    throw new ServiceInterruption("IO exception during "+context+": "+e.getMessage(),
+      e,
+      currentTime + interruptionRetryTime,
+      currentTime + 2L * 60L * 60000L,
+      -1,
+      true);
+  }
+  
   /**
   * Post the input stream to ingest
   * @param documentURI is the document's uri.
@@ -360,93 +423,52 @@ public class HttpPoster
     String[] acls = convertACL(document.getACL(),authorityNameString,activities);
     String[] denyAcls = convertACL(document.getDenyACL(),authorityNameString,activities);
     
-    // This flag keeps track of whether we read anything from the input stream yet.
-    // If not, we can retry here.  If so, we have to reschedule.
-    boolean readFromDocumentStreamYet = false;
-    int ioErrorRetry = 3;
-
-    while (true)
+    try
     {
+      IngestThread t = new IngestThread(documentURI,document,arguments,sourceTargets,shareAcls,shareDenyAcls,acls,denyAcls,commitWithin);
       try
       {
-        IngestThread t = new IngestThread(documentURI,document,arguments,sourceTargets,shareAcls,shareDenyAcls,acls,denyAcls,commitWithin);
-        try
-        {
-          t.start();
-          t.join();
+        t.start();
+        t.join();
 
-          // Log the activity, if any, regardless of any exception
-          if (t.getActivityCode() != null)
-            activities.recordActivity(t.getActivityStart(),SolrConnector.INGEST_ACTIVITY,t.getActivityBytes(),documentURI,t.getActivityCode(),t.getActivityDetails());
-
-          readFromDocumentStreamYet = (readFromDocumentStreamYet || t.getReadFromDocumentStreamYet());
-
-          Throwable thr = t.getException();
-          if (thr != null)
-          {
-            if (thr instanceof SolrServerException)
-              throw (SolrServerException)thr;
-            if (thr instanceof IOException)
-              throw (IOException)thr;
-            if (thr instanceof RuntimeException)
-              throw (RuntimeException)thr;
-            else
-              throw (Error)thr;
-          }
-          return t.getRval();
-        }
-        catch (InterruptedException e)
-        {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
-        }
-      }
-      catch (SolrServerException e)
-      {
-        throw new ManifoldCFException("Solr exception during indexing: "+e.getMessage(),e);
-      }
-      catch (java.net.SocketTimeoutException ioe)
-      {
-        if (readFromDocumentStreamYet || ioErrorRetry == 0)
-        {
-          // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("IO error indexing: "+ioe.getMessage()+"; ingestion will be retried again later",
-            ioe,
-            currentTime + interruptionRetryTime,
-            currentTime + 2L * 60L * 60000L,
-            -1,
-            true);
-        }
-      }
-      catch (IOException ioe)
-      {
-        if (readFromDocumentStreamYet || ioErrorRetry == 0)
-        {
-          // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("IO error ingesting document: "+ioe.getMessage()+"; ingestion will be retried again later",
-            ioe,
-            currentTime + interruptionRetryTime,
-            currentTime + 2L * 60L * 60000L,
-            -1,
-            true);
+        // Log the activity, if any, regardless of any exception
+        if (t.getActivityCode() != null)
+          activities.recordActivity(t.getActivityStart(),SolrConnector.INGEST_ACTIVITY,t.getActivityBytes(),documentURI,t.getActivityCode(),t.getActivityDetails());
+
+        Throwable thr = t.getException();
+        if (thr != null)
+        {
+          if (thr instanceof SolrServerException)
+            throw (SolrServerException)thr;
+          if (thr instanceof IOException)
+            throw (IOException)thr;
+          if (thr instanceof RuntimeException)
+            throw (RuntimeException)thr;
+          else
+            throw (Error)thr;
         }
-      }
-
-      // Sleep for a time, and retry
-      try
-      {
-        ManifoldCF.sleep(10000L);
+        return t.getRval();
       }
       catch (InterruptedException e)
       {
+        t.interrupt();
         throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
       }
-      ioErrorRetry--;
-
-      // Go back around again!
-
+    }
+    catch (SolrServerException e)
+    {
+      handleSolrServerException(e, "indexing");
+      return false;
+    }
+    catch (SolrException e)
+    {
+      handleSolrException(e, "indexing");
+      return false;
+    }
+    catch (IOException ioe)
+    {
+      handleIOException(ioe, "indexing");
+      return false;
     }
 
   }
@@ -459,68 +481,49 @@ public class HttpPoster
     if (Logging.ingest.isDebugEnabled())
       Logging.ingest.debug("checkPost()");
 
-    int ioErrorRetry = 5;
-    while (true)
+    // Open a socket to ingest, and to the response stream to get the post result
+    try
     {
-      // Open a socket to ingest, and to the response stream to get the post result
+      StatusThread t = new StatusThread();
       try
       {
-        StatusThread t = new StatusThread();
-        try
-        {
-          t.start();
-          t.join();
+        t.start();
+        t.join();
 
-          Throwable thr = t.getException();
-          if (thr != null)
-          {
-            if (thr instanceof SolrServerException)
-              throw (SolrServerException)thr;
-            if (thr instanceof IOException)
-              throw (IOException)thr;
-            if (thr instanceof RuntimeException)
-              throw (RuntimeException)thr;
-            else
-              throw (Error)thr;
-          }
-          return;
-        }
-        catch (InterruptedException e)
+        Throwable thr = t.getException();
+        if (thr != null)
         {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
+          if (thr instanceof SolrServerException)
+            throw (SolrServerException)thr;
+          if (thr instanceof IOException)
+            throw (IOException)thr;
+          if (thr instanceof RuntimeException)
+            throw (RuntimeException)thr;
+          else
+            throw (Error)thr;
         }
-      }
-      catch (SolrServerException e)
-      {
-        throw new ManifoldCFException("Solr exception during check: "+e.getMessage(),e);
-      }
-      catch (IOException ioe)
-      {
-        if (ioErrorRetry == 0)
-        {
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("IO exception checking: "+ioe.getMessage(),
-            ioe,
-            currentTime + interruptionRetryTime,
-            currentTime + 2L * 60L * 60000L,
-            -1,
-            true);
-        }
-      }
-
-      // Go back around again!
-      // Sleep for a time, and retry
-      try
-      {
-        ManifoldCF.sleep(10000L);
+        return;
       }
       catch (InterruptedException e)
       {
-        throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+        t.interrupt();
+        throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
       }
-      ioErrorRetry--;
-
+    }
+    catch (SolrServerException e)
+    {
+      handleSolrServerException(e, "check");
+      return;
+    }
+    catch (SolrException e)
+    {
+      handleSolrException(e, "check");
+      return;
+    }
+    catch (IOException ioe)
+    {
+      handleIOException(ioe, "check");
+      return;
     }
 
   }
@@ -534,73 +537,52 @@ public class HttpPoster
     if (Logging.ingest.isDebugEnabled())
       Logging.ingest.debug("deletePost(): '" + documentURI + "'");
 
-    int ioErrorRetry = 5;
-    while (true)
+    try
     {
+      DeleteThread t = new DeleteThread(documentURI);
       try
       {
-        DeleteThread t = new DeleteThread(documentURI);
-        try
-        {
-          t.start();
-          t.join();
-
-          // Log the activity, if any, regardless of any exception
-          if (t.getActivityCode() != null)
-            activities.recordActivity(t.getActivityStart(),SolrConnector.REMOVE_ACTIVITY,null,documentURI,t.getActivityCode(),t.getActivityDetails());
+        t.start();
+        t.join();
 
-          Throwable thr = t.getException();
-          if (thr != null)
-          {
-            if (thr instanceof SolrServerException)
-              throw (SolrServerException)thr;
-            if (thr instanceof IOException)
-              throw (IOException)thr;
-            if (thr instanceof RuntimeException)
-              throw (RuntimeException)thr;
-            else
-              throw (Error)thr;
-          }
-          return;
+        // Log the activity, if any, regardless of any exception
+        if (t.getActivityCode() != null)
+          activities.recordActivity(t.getActivityStart(),SolrConnector.REMOVE_ACTIVITY,null,documentURI,t.getActivityCode(),t.getActivityDetails());
+
+        Throwable thr = t.getException();
+        if (thr != null)
+        {
+          if (thr instanceof SolrServerException)
+            throw (SolrServerException)thr;
+          if (thr instanceof IOException)
+            throw (IOException)thr;
+          if (thr instanceof RuntimeException)
+            throw (RuntimeException)thr;
+          else
+            throw (Error)thr;
         }
-        catch (InterruptedException e)
-        {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
-        }
-      }
-      catch (SolrServerException e)
-      {
-        throw new ManifoldCFException("Solr exception during delete: "+e.getMessage(),e);
-      }
-      catch (IOException ioe)
-      {
-        if (ioErrorRetry == 0)
-        {
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("IO exception deleting: "+ioe.getMessage()+"; deletion will be retried again later",
-            ioe,
-            currentTime + interruptionRetryTime,
-            currentTime + 2L * 60L * 60000L,
-            -1,
-            true);
-        }
-        // Fall through and recycle
-      }
-
-      // Go back around again!
-      // Sleep for a time, and retry
-      try
-      {
-        ManifoldCF.sleep(10000L);
+        return;
       }
       catch (InterruptedException e)
       {
-        throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+        t.interrupt();
+        throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
       }
-
-      ioErrorRetry--;
-
+    }
+    catch (SolrServerException e)
+    {
+      handleSolrServerException(e, "delete");
+      return;
+    }
+    catch (SolrException e)
+    {
+      handleSolrException(e, "delete");
+      return;
+    }
+    catch (IOException ioe)
+    {
+      handleIOException(ioe, "delete");
+      return;
     }
 
   }
@@ -776,57 +758,27 @@ public class HttpPoster
             activityCode = "FAILED";
             activityDetails = e.getMessage();
             
-            // Use the exception text to determine the proper result.
-            if (e.getMessage().indexOf("org.apache.tika.exception.TikaException") != -1)
-            {
-              // Can't process the document, so don't keep trying.
-              rval = false;
-              return;
-            }
-
-            // Otherwise, abort the job.  This is not the right thing to do probably, but we
-            // need to know SolrJ's exception cases before we can get this logic right.
-            // MHL
+            // Rethrow; will interpret at a higher level
+            throw e;
+          }
+          catch (SolrException e)
+          {
+            // Log what happened to us
+            activityStart = new Long(fullStartTime);
+            activityBytes = new Long(length);
+            activityCode = Integer.toString(e.code());
+            activityDetails = e.getMessage();
+            
+            // Rethrow; we'll interpret at the next level
             throw e;
           }
-        }
-        catch (java.net.SocketTimeoutException ioe)
-        {
-          // These are just like IO errors, but since they are derived from InterruptedIOException, they have to be caught first.
-          // Log the error
-          Logging.ingest.warn("Error indexing into Solr: "+ioe.getMessage(),ioe);
-
-          activityStart = new Long(fullStartTime);
-          activityCode = "SOCKET TIMEOUT";
-          activityDetails = ioe.getMessage();
-
-          throw ioe;
-        }
-        catch (InterruptedIOException e)
-        {
-          return;
         }
         catch (IOException ioe)
         {
-          activityStart = new Long(fullStartTime);
-
-          // Intercept "broken pipe" exception, since that seems to be what we get if the ingestion API kills the socket right after a 400 goes out.
-          // Basically, we have no choice but to interpret that in the same manner as a 400, since no matter how we do it, it's a race and the 'broken pipe'
-          // result is always possible.  So we might as well expect it and treat it properly.
-          if (ioe.getClass().getName().equals("java.net.SocketException") && ioe.getMessage().toLowerCase().indexOf("broken pipe") != -1)
-          {
-            // We've seen what looks like the ingestion interface forcibly closing the socket.
-            // We *choose* to interpret this just like a 400 response.  However, we log in the history using a different code,
-            // since we really don't know what happened for sure.
-            // Record the attempt
-
-            activityCode = "SOCKET CLOSE";
-            activityDetails = "Presuming an indexing rejection: "+ioe.getMessage();
-            rval = false;
+          if ((ioe instanceof InterruptedIOException) && (!(ioe instanceof java.net.SocketTimeoutException)))
             return;
-          }
-
-          // Record the attempt
+          
+          activityStart = new Long(fullStartTime);
           activityCode = "IO ERROR";
           activityDetails = ioe.getMessage();
 
@@ -929,6 +881,14 @@ public class HttpPoster
 
           throw e;
         }
+        catch (SolrException e)
+        {
+          activityStart = new Long(fullStartTime);
+          activityCode = "FAILED";
+          activityDetails = e.getMessage();
+
+          throw e;
+        }
         catch (IOException ioe)
         {
           // Log the error



Mime
View raw message