manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1565468 - in /manifoldcf/trunk: ./ connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/main/java/org/apache...
Date Thu, 06 Feb 2014 22:56:55 GMT
Author: kwright
Date: Thu Feb  6 22:56:54 2014
New Revision: 1565468

URL: http://svn.apache.org/r1565468
Log:
Fix for CONNECTORS-883.  Warning: schema change

Modified:
    manifoldcf/trunk/   (props changed)
    manifoldcf/trunk/CHANGES.txt
    manifoldcf/trunk/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobNotifyRecord.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java

Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
  Merged /manifoldcf/branches/CONNECTORS-833:r1565133-1565451

Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1565468&r1=1565467&r2=1565468&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Thu Feb  6 22:56:54 2014
@@ -3,6 +3,10 @@ $Id$
 
 ======================= 1.6-dev =====================
 
+CONNECTORS-883: Handle errors from the output connector during
+notification better.
+(Karl Wright)
+
 CONNECTORS-884: Log successful index/delete in Solr connector.
 (Erlend Garåsen, Karl Wright)
 

Modified: manifoldcf/trunk/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java?rev=1565468&r1=1565467&r2=1565468&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
(original)
+++ manifoldcf/trunk/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
Thu Feb  6 22:56:54 2014
@@ -431,6 +431,19 @@ public class HttpPoster
 
     long currentTime = System.currentTimeMillis();
     
+    if (e.getClass().getName().equals("java.net.ConnectException"))
+    {
+      // Server isn't up at all.  Try for a brief time then give up.
+      String message = "Server could not be contacted during "+context+": "+e.getMessage();
+      Logging.ingest.warn(message,e);
+      throw new ServiceInterruption(message,
+        e,
+        currentTime + interruptionRetryTime,
+        -1L,
+        3,
+        false);
+    }
+    
     if (e.getClass().getName().equals("java.net.SocketException"))
     {
       // In the past we would have treated this as a straight document rejection, and

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1565468&r1=1565467&r2=1565468&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
(original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
Thu Feb  6 22:56:54 2014
@@ -512,6 +512,14 @@ public interface IJobManager
   public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions, long
checkTime)
     throws ManifoldCFException;
 
+  /** Retry notification.
+  *@param jobNotifyRecord is the current job notification record.
+  *@param failTime is the new fail time (-1L if none).
+  *@param failRetryCount is the new fail retry count (-1 if none).
+  */
+  public void retryNotification(JobNotifyRecord jnr, long failTime, int failRetryCount)
+    throws ManifoldCFException;
+  
   /** Add an initial set of documents to the queue.
   * This method is called during job startup, when the queue is being loaded.
   * A set of document references is passed to this method, which updates the status of the
document
@@ -961,6 +969,14 @@ public interface IJobManager
   public void deleteIngestedDocumentIdentifiers(DocumentDescription[] identifiers)
     throws ManifoldCFException;
 
+  /** Abort notification.
+  *@param jobID is the job to abort.
+  *@param errorText is the error text.
+  *@return true if this is the first time the job is aborted.
+  */
+  public boolean notifyAbort(Long jobID, String errorText)
+    throws ManifoldCFException;
+
   /** Abort a running job due to a fatal error condition.
   *@param jobID is the job to abort.
   *@param errorText is the error text.

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobNotifyRecord.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobNotifyRecord.java?rev=1565468&r1=1565467&r2=1565468&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobNotifyRecord.java
(original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobNotifyRecord.java
Thu Feb  6 22:56:54 2014
@@ -25,11 +25,34 @@ public class JobNotifyRecord extends Job
 {
   public static final String _rcsid = "@(#)$Id$";
 
+  /** Fail time; -1L if none currently set */
+  protected final long failTime;
+  /** Fail retry count; -1 if none currently set */
+  protected final int failRetryCount;
+  
   /** Constructor.
   */
-  public JobNotifyRecord(Long jobID)
+  public JobNotifyRecord(Long jobID, long failTime, int failRetryCount)
   {
     super(jobID);
+    this.failTime = failTime;
+    this.failRetryCount = failRetryCount;
+  }
+
+  /** Get the hard fail time.
+  *@return the fail time in ms since epoch, or -1L if none.
+  */
+  public long getFailTime()
+  {
+    return failTime;
+  }
+
+  /** Get the hard fail retry count.
+  *@return the fail retry count, or -1 if none.
+  */
+  public int getFailRetryCount()
+  {
+    return failRetryCount;
   }
 
 }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1565468&r1=1565467&r2=1565468&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
(original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
Thu Feb  6 22:56:54 2014
@@ -3572,6 +3572,68 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Retry notification.
+  *@param jobNotifyRecord is the current job notification record.
+  *@param failTime is the new fail time (-1L if none).
+  *@param failCount is the new fail retry count (-1 if none).
+  */
+  @Override
+  public void retryNotification(JobNotifyRecord jnr, long failTime, int failCount)
+    throws ManifoldCFException
+  {
+    Long jobID = jnr.getJobID();
+    long oldFailTime = jnr.getFailTime();
+    if (oldFailTime == -1L)
+      oldFailTime = failTime;
+    failTime = oldFailTime;
+    int oldFailCount = jnr.getFailRetryCount();
+    if (oldFailCount == -1)
+      oldFailCount = failCount;
+    else
+    {
+      oldFailCount--;
+      if (failCount != -1 && oldFailCount > failCount)
+        oldFailCount = failCount;
+    }
+    failCount = oldFailCount;
+
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        jobs.retryNotification(jobID,failTime,failCount);
+        database.performCommit();
+        break;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction resetting job notification: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+
+  }
+  
+
   /** Reset a set of cleaning documents for further processing in the future.
   * This method is called after some unknown number of the documents were cleaned, but then
an ingestion service interruption occurred.
   * Note well: The logic here basically presumes that we cannot know whether the documents
were indeed cleaned or not.
@@ -6107,17 +6169,73 @@ public class JobManager implements IJobM
   * until the job finishes on its own.
   *@param jobID is the job to abort.
   */
+  @Override
   public void manualAbortRestart(Long jobID)
     throws ManifoldCFException
   {
     manualAbortRestart(jobID,false);
   }
 
+  /** Abort notification.
+  *@param jobID is the job to abort.
+  *@param errorText is the error text.
+  *@return true if this is the first time the job is aborted.
+  */
+  @Override
+  public boolean notifyAbort(Long jobID, String errorText)
+    throws ManifoldCFException
+  {
+    if (Logging.jobs.isDebugEnabled())
+    {
+      Logging.jobs.debug("Aborting notification for "+jobID+" due to error '"+errorText+"'");
+    }
+    boolean rval;
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        rval = jobs.notifyAbort(jobID,errorText);
+        database.performCommit();
+        break;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction aborting job notification: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+    if (rval && Logging.jobs.isDebugEnabled())
+    {
+      Logging.jobs.debug("Job notification job "+jobID+" abort signal successfully sent");
+    }
+    return rval;
+  }
+  
   /** Abort a running job due to a fatal error condition.
   *@param jobID is the job to abort.
   *@param errorText is the error text.
   *@return true if this is the first logged abort request for this job.
   */
+  @Override
   public boolean errorAbort(Long jobID, String errorText)
     throws ManifoldCFException
   {
@@ -7184,7 +7302,8 @@ public class JobManager implements IJobM
         StringBuilder sb = new StringBuilder("SELECT ");
         ArrayList list = new ArrayList();
         
-        sb.append(jobs.idField).append(" FROM ").append(jobs.getTableName()).append(" WHERE
")
+        sb.append(jobs.idField).append(",").append(jobs.failTimeField).append(",").append(jobs.failCountField)
+          .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
           .append(database.buildConjunctionClause(list,new ClauseDescription[]{
             new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_READYFORNOTIFY))}))
           .append(" FOR UPDATE");
@@ -7197,13 +7316,26 @@ public class JobManager implements IJobM
         {
           IResultRow row = set.getRow(i);
           Long jobID = (Long)row.getValue(jobs.idField);
+          Long failTimeLong = (Long)row.getValue(jobs.failTimeField);
+          Long failRetryCountLong = (Long)row.getValue(jobs.failCountField);
+          long failTime;
+          if (failTimeLong == null)
+            failTime = -1L;
+          else
+            failTime = failTimeLong.longValue();
+          int failRetryCount;
+          if (failRetryCountLong == null)
+            failRetryCount = -1;
+          else
+            failRetryCount = (int)failRetryCountLong.longValue();
+      
           // Mark status of job as "starting delete"
           jobs.writeTransientStatus(jobID,jobs.STATUS_NOTIFYINGOFCOMPLETION,processID);
           if (Logging.jobs.isDebugEnabled())
           {
             Logging.jobs.debug("Found job "+jobID+" in need of notification");
           }
-          rval[i++] = new JobNotifyRecord(jobID);
+          rval[i++] = new JobNotifyRecord(jobID,failTime,failRetryCount);
         }
         database.performCommit();
         return rval;

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1565468&r1=1565467&r2=1565468&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
(original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
Thu Feb  6 22:56:54 2014
@@ -55,6 +55,8 @@ import java.util.*;
  * <tr><td>reseedtime</td><td>BIGINT</td><td></td></tr>
  * <tr><td>hopcountmode</td><td>CHAR(1)</td><td></td></tr>
  * <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
+ * <tr><td>failtime</td><td>BIGINT</td><td></td></tr>
+ * <tr><td>failcount</td><td>BIGINT</td><td></td></tr>
  * </table>
  * <br><br>
  * 
@@ -189,7 +191,11 @@ public class Jobs extends org.apache.man
   public final static String hopcountModeField = "hopcountmode";
   /** Process id field, for keeping track of which process owns transient state */
   public final static String processIDField = "processid";
-  
+  /** When non-null, indicates the time that, when a ServiceInterruption occurs, the attempt
will be considered to have actually failed */
+  public static final String failTimeField = "failtime";
+  /** When non-null, indicates the number of retries remaining, after which the attempt will
be considered to have actually failed */
+  public static final String failCountField = "failcount";
+
   protected static Map statusMap;
   protected static Map typeMap;
   protected static Map startMap;
@@ -356,6 +362,8 @@ public class Jobs extends org.apache.man
         map.put(reseedTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
         map.put(hopcountModeField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
         map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
+        map.put(failTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
+        map.put(failCountField,new ColumnDescription("BIGINT",false,true,null,null,false));
         performCreate(map,null);
       }
       else
@@ -373,6 +381,18 @@ public class Jobs extends org.apache.man
           insertMap.put(maxIntervalField,new ColumnDescription("BIGINT",false,true,null,null,false));
           performAlter(insertMap,null,null,null);
         }
+        if (existing.get(failTimeField) == null)
+        {
+          Map insertMap = new HashMap();
+          insertMap.put(failTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
+          performAlter(insertMap,null,null,null);
+        }
+        if (existing.get(failCountField) == null)
+        {
+          Map insertMap = new HashMap();
+          insertMap.put(failCountField,new ColumnDescription("BIGINT",false,true,null,null,false));
+          performAlter(insertMap,null,null,null);
+        }
       }
 
       // Handle related tables
@@ -930,6 +950,8 @@ public class Jobs extends org.apache.man
       new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
     map.put(processIDField,null);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
 
     // Starting up or aborting starting up goes back to just being ready
@@ -1097,6 +1119,8 @@ public class Jobs extends org.apache.man
       new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION))});
     map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
     map.put(processIDField,null);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
 
     // Starting up or aborting starting up goes back to just being ready
@@ -1492,6 +1516,8 @@ public class Jobs extends org.apache.man
       new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
     map.put(processIDField,null);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
   }
@@ -1663,6 +1689,30 @@ public class Jobs extends org.apache.man
   }
 
 
+  /** Retry notification.
+  *@param jobID is the job identifier.
+  *@param failTime is the fail time, -1 == none
+  *@param failCount is the fail count to use, -1 == none.
+  */
+  public void retryNotification(Long jobID, long failTime, int failCount)
+    throws ManifoldCFException
+  {
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(idField,jobID)});
+    HashMap map = new HashMap();
+    map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
+    if (failTime == -1L)
+      map.put(failTimeField,null);
+    else
+      map.put(failTimeField,new Long(failTime));
+    if (failCount == -1)
+      map.put(failCountField,null);
+    else
+      map.put(failCountField,failCount);
+    performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+  }
+  
   /** Write job status and window end, and clear the endtime field.  (The start time will
be written
   * when the job enters the "active" state.)
   *@param jobID is the job identifier.
@@ -2007,6 +2057,44 @@ public class Jobs extends org.apache.man
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }
 
+  /** Abort a job notification.
+  *@param jobID is the job id.
+  *@param errorText is the error, or null if none.
+  */
+  public boolean notifyAbort(Long jobID, String errorText)
+    throws ManifoldCFException
+  {
+    // Get the current job status
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(idField,jobID)});
+    IResultSet set = performQuery("SELECT "+statusField+" FROM "+getTableName()+
+      " WHERE "+query+" FOR UPDATE",list,null,null);
+    if (set.getRowCount() == 0)
+      throw new ManifoldCFException("Job does not exist: "+jobID);
+    IResultRow row = set.getRow(0);
+    int status = stringToStatus(row.getValue(statusField).toString());
+    if (status == STATUS_INACTIVE)
+      return false;
+    int newStatus;
+    switch (status)
+    {
+    case STATUS_NOTIFYINGOFCOMPLETION:
+      newStatus = STATUS_INACTIVE;
+      break;
+    default:
+      throw new ManifoldCFException("Job "+jobID+" is not notifying");
+    }
+    // Pause the job
+    HashMap map = new HashMap();
+    map.put(statusField,statusToString(newStatus));
+    map.put(errorField,errorText);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
+    performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+    return true;
+  }
+
   /** Abort a job.
   *@param jobID is the job id.
   *@param errorText is the error, or null if none.
@@ -2529,6 +2617,8 @@ public class Jobs extends org.apache.man
     HashMap map = new HashMap();
     map.put(statusField,statusToString(STATUS_INACTIVE));
     map.put(processIDField,null);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
     // Leave everything else around from the abort/finish.
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java?rev=1565468&r1=1565467&r2=1565468&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
(original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
Thu Feb  6 22:56:54 2014
@@ -75,7 +75,7 @@ public class JobNotificationThread exten
           JobNotifyRecord[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity(processID);
           try
           {
-            HashMap connectionNames = new HashMap();
+            Set<OutputAndRepositoryConnection> connectionNames = new HashSet<OutputAndRepositoryConnection>();
             
             int k = 0;
             while (k < jobsNeedingNotification.length)
@@ -89,18 +89,15 @@ public class JobNotificationThread exten
                 String repositoryConnectionName = job.getConnectionName();
                 String outputConnectionName = job.getOutputConnectionName();
                 OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName,
repositoryConnectionName);
-                connectionNames.put(c,c);
+                connectionNames.add(c);
               }
             }
             
             // Attempt to notify the specified connections
-            HashMap notifiedConnections = new HashMap();
+            Map<OutputAndRepositoryConnection,Disposition> notifiedConnections = new
HashMap<OutputAndRepositoryConnection,Disposition>();
             
-            Iterator iter = connectionNames.keySet().iterator();
-            while (iter.hasNext())
+            for (OutputAndRepositoryConnection connections : connectionNames)
             {
-              OutputAndRepositoryConnection connections = (OutputAndRepositoryConnection)iter.next();
-              
               String outputConnectionName = connections.getOutputConnectionName();
               String repositoryConnectionName = connections.getRepositoryConnectionName();
               
@@ -119,11 +116,11 @@ public class JobNotificationThread exten
                     try
                     {
                       connector.noteJobComplete(activity);
+                      notifiedConnections.put(connections,new Disposition());
                     }
                     catch (ServiceInterruption e)
                     {
-                      Logging.threads.warn("Service interruption notifying connection - retrying:
"+e.getMessage(),e);
-                      continue;
+                      notifiedConnections.put(connections,new Disposition(e));
                     }
                     catch (ManifoldCFException e)
                     {
@@ -136,7 +133,6 @@ public class JobNotificationThread exten
                       // Nothing special; report the error and keep going.
                       Logging.threads.error(e.getMessage(),e);
                     }
-                    notifiedConnections.put(connections,connections);
                   }
                   finally
                   {
@@ -160,11 +156,61 @@ public class JobNotificationThread exten
                 String repositoryConnectionName = job.getConnectionName();
                 OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName,
repositoryConnectionName);
                 
-                if (notifiedConnections.get(c) != null)
+                Disposition d = notifiedConnections.get(c);
+                if (d != null)
                 {
-                  // When done, put the job into the Inactive state.  Otherwise, the notification
will be retried until it succeeds.
-                  jobManager.inactivateJob(jobID);
-                  jsr.noteStarted();
+                  ServiceInterruption e = d.getServiceInterruption();
+                  if (e == null)
+                  {
+                    jobManager.inactivateJob(jobID);
+                    jsr.noteStarted();
+                  }
+                  else
+                  {
+                    if (!e.jobInactiveAbort())
+                    {
+                      Logging.jobs.warn("Notification service interruption reported for job
"+
+                        jobID+" output connection '"+outputConnectionName+"': "+
+                        e.getMessage());
+                    }
+
+                    ManifoldCFException abortOnFail;
+                    if (!e.jobInactiveAbort() && e.isAbortOnFail())
+                      abortOnFail = new ManifoldCFException("Failure performing notification"+((e.getCause()!=null)?":
"+e.getCause().getMessage():""),e.getCause());
+                    else
+                      abortOnFail = null;
+
+                    // If either we are going to be requeuing beyond the fail time, OR
+                    // the number of retries available has hit 0, THEN we treat this
+                    // as either an "ignore" or a hard error.
+                    if (!e.jobInactiveAbort() && (jsr.getFailTime() != -1L &&
jsr.getFailTime() < e.getRetryTime() ||
+                      jsr.getFailRetryCount() == 0))
+                    {
+                      // Treat this as a hard failure.
+                      if (e.isAbortOnFail())
+                      {
+                        // Note the error in the job, and transition to inactive state
+                        if (abortOnFail != null)
+                          Logging.jobs.error(abortOnFail.getMessage(),abortOnFail);
+                        jobManager.notifyAbort(jobID,(abortOnFail==null)?"":"Repeated service
interruptions during notification: "+abortOnFail.getMessage()+": ending job");
+                        jsr.noteStarted();
+                      }
+                      else
+                      {
+                        // Not sure this can happen -- but just transition silently to inactive
state
+                        jobManager.inactivateJob(jobID);
+                        jsr.noteStarted();
+                      }
+                    }
+                    else
+                    {
+                      // Reset the job to the READYFORNOTIFY state, updating the failtime
and failcount fields
+                      if (abortOnFail != null)
+                        Logging.jobs.warn(abortOnFail.getMessage(),abortOnFail);
+                      jobManager.retryNotification(jsr,e.getFailTime(),e.getFailRetryCount());
+                      jsr.noteStarted();
+                    }
+                  }
                 }
               }
             }
@@ -255,11 +301,32 @@ public class JobNotificationThread exten
     }
   }
 
+  /** Disposition of an output/repository connection combination */
+  protected static class Disposition
+  {
+    protected final ServiceInterruption serviceInterruption;
+    
+    public Disposition(ServiceInterruption serviceInterruption)
+    {
+      this.serviceInterruption = serviceInterruption;
+    }
+    
+    public Disposition()
+    {
+      this.serviceInterruption = null;
+    }
+    
+    public ServiceInterruption getServiceInterruption()
+    {
+      return serviceInterruption;
+    }
+  }
+  
   /** Output connection/repository connection pair object */
   protected static class OutputAndRepositoryConnection
   {
-    protected String outputConnectionName;
-    protected String repositoryConnectionName;
+    protected final String outputConnectionName;
+    protected final String repositoryConnectionName;
     
     public OutputAndRepositoryConnection(String outputConnectionName, String repositoryConnectionName)
     {



Mime
View raw message