manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1604177 - in /manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler: interfaces/IJobManager.java jobs/JobManager.java jobs/Jobs.java system/JobNotificationThread.java system/StartDeleteThread.java
Date Fri, 20 Jun 2014 14:33:28 GMT
Author: kwright
Date: Fri Jun 20 14:33:27 2014
New Revision: 1604177

URL: http://svn.apache.org/r1604177
Log:
More rearrangement

Modified:
    manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
    manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java

Modified: manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1604177&r1=1604176&r2=1604177&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Fri Jun 20 14:33:27 2014
@@ -548,7 +548,15 @@ public interface IJobManager
   */
   public void retryNotification(JobNotifyRecord jobNotifyRecord, long failTime, int failRetryCount)
     throws ManifoldCFException;
-  
+
+  /** Retry delete notification.
+  *@param jnr is the current job notification record.
+  *@param failTime is the new fail time (-1L if none).
+  *@param failCount is the new fail retry count (-1 if none).
+  */
+  public void retryDeleteNotification(JobNotifyRecord jnr, long failTime, int failCount)
+    throws ManifoldCFException;
+
   /** Add an initial set of documents to the queue.
   * This method is called during job startup, when the queue is being loaded.
   * A set of document references is passed to this method, which updates the status of the document
@@ -825,11 +833,11 @@ public interface IJobManager
   public void resetSeedJob(Long jobID)
     throws ManifoldCFException;
 
-  /** Get the list of jobs that are ready for deletion.
+  /** Get the list of jobs that are ready for delete cleanup.
   *@param processID is the current process ID.
   *@return jobs that were in the "readyfordelete" state.
   */
-  public JobDeleteRecord[] getJobsReadyForDelete(String processID)
+  public JobDeleteRecord[] getJobsReadyForDeleteCleanup(String processID)
     throws ManifoldCFException;
     
   /** Get the list of jobs that are ready for startup.
@@ -846,12 +854,25 @@ public interface IJobManager
   public JobNotifyRecord[] getJobsReadyForInactivity(String processID)
     throws ManifoldCFException;
 
+  /** Find the list of jobs that need to have their connectors notified of job deletion.
+  *@param processID is the process ID.
+  *@return the ID's of jobs that need their output connectors notified in order to be removed.
+  */
+  public JobNotifyRecord[] getJobsReadyForDelete(String processID)
+    throws ManifoldCFException;
+
   /** Inactivate a job, from the notification state.
   *@param jobID is the ID of the job to inactivate.
   */
   public void inactivateJob(Long jobID)
     throws ManifoldCFException;
 
+  /** Remove a job, from the notification state.
+  *@param jobID is the ID of the job to remove.
+  */
+  public void removeJob(Long jobID)
+    throws ManifoldCFException;
+
   /** Reset a job starting for delete back to "ready for delete"
   * state.
   *@param jobID is the job id.
@@ -866,6 +887,13 @@ public interface IJobManager
   public void resetNotifyJob(Long jobID)
     throws ManifoldCFException;
 
+  /** Reset a job that is delete notifying back to "ready for delete notify"
+  * state.
+  *@param jobID is the job id.
+  */
+  public void resetDeleteNotifyJob(Long jobID)
+    throws ManifoldCFException;
+
   /** Reset a starting job back to "ready for startup" state.
   *@param jobID is the job id.
   */

Modified: manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1604177&r1=1604176&r2=1604177&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Fri Jun 20 14:33:27 2014
@@ -4299,7 +4299,68 @@ public class JobManager implements IJobM
     }
 
   }
-  
+
+  /** Retry delete notification.
+  *@param jnr is the current job notification record.
+  *@param failTime is the new fail time (-1L if none).
+  *@param failCount is the new fail retry count (-1 if none).
+  */
+  @Override
+  public void retryDeleteNotification(JobNotifyRecord jnr, long failTime, int failCount)
+    throws ManifoldCFException
+  {
+    Long jobID = jnr.getJobID();
+    long oldFailTime = jnr.getFailTime();
+    if (oldFailTime == -1L)
+      oldFailTime = failTime;
+    failTime = oldFailTime;
+    int oldFailCount = jnr.getFailRetryCount();
+    if (oldFailCount == -1)
+      oldFailCount = failCount;
+    else
+    {
+      oldFailCount--;
+      if (failCount != -1 && oldFailCount > failCount)
+        oldFailCount = failCount;
+    }
+    failCount = oldFailCount;
+
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        jobs.retryDeleteNotification(jobID,failTime,failCount);
+        database.performCommit();
+        break;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction resetting job notification: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+
+  }
+
   // Add documents methods
   
   /** Add an initial set of documents to the queue.
@@ -6862,12 +6923,12 @@ public class JobManager implements IJobM
     }
   }
 
-  /** Get the list of jobs that are ready for deletion.
+  /** Get the list of jobs that are ready for delete cleanup.
   *@param processID is the current process ID.
   *@return jobs that were in the "readyfordelete" state.
   */
   @Override
-  public JobDeleteRecord[] getJobsReadyForDelete(String processID)
+  public JobDeleteRecord[] getJobsReadyForDeleteCleanup(String processID)
     throws ManifoldCFException
   {
     while (true)
@@ -7091,6 +7152,84 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Remove a job, from the notification state.
+  *@param jobID is the ID of the job to remove.
+  */
+  @Override
+  public void removeJob(Long jobID)
+    throws ManifoldCFException
+  {
+    // While there is no flow that can cause a job to be in the wrong state when this gets called, as a precaution
+    // it might be a good idea to put this in a transaction and have the state get checked first.
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Check job status
+        StringBuilder sb = new StringBuilder("SELECT ");
+        ArrayList list = new ArrayList();
+        
+        sb.append(jobs.statusField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobs.idField,jobID)}))
+          .append(" FOR UPDATE");
+            
+        IResultSet set = database.performQuery(sb.toString(),list,null,null);
+        if (set.getRowCount() == 0)
+          // Presume already removed!
+          return;
+        IResultRow row = set.getRow(0);
+        int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+
+        switch (status)
+        {
+        case Jobs.STATUS_NOTIFYINGOFDELETION:
+          ManifoldCF.noteConfigurationChange();
+          // Remove documents from job queue
+          jobQueue.deleteAllJobRecords(jobID);
+          // Remove carrydowns for the job
+          carryDown.deleteOwner(jobID);
+          // Nothing is in a critical section - so this should be OK.
+          hopCount.deleteOwner(jobID);
+          jobs.delete(jobID);
+          if (Logging.jobs.isDebugEnabled())
+          {
+            Logging.jobs.debug("Removed job "+jobID);
+          }
+          break;
+        default:
+          throw new ManifoldCFException("Unexpected job status: "+Integer.toString(status));
+        }
+        database.performCommit();
+        return;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted clearing delete notification state for job: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Reset a job starting for delete back to "ready for delete"
   * state.
   *@param jobID is the job id.
@@ -7229,6 +7368,75 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Reset a job that is delete notifying back to "ready for delete notify"
+  * state.
+  *@param jobID is the job id.
+  */
+  @Override
+  public void resetDeleteNotifyJob(Long jobID)
+    throws ManifoldCFException
+  {
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Check job status
+        StringBuilder sb = new StringBuilder("SELECT ");
+        ArrayList list = new ArrayList();
+        
+        sb.append(jobs.statusField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobs.idField,jobID)}))
+          .append(" FOR UPDATE");
+            
+        IResultSet set = database.performQuery(sb.toString(),list,null,null);
+        if (set.getRowCount() == 0)
+          throw new ManifoldCFException("No such job: "+jobID);
+        IResultRow row = set.getRow(0);
+        int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+
+        switch (status)
+        {
+        case Jobs.STATUS_NOTIFYINGOFDELETION:
+          if (Logging.jobs.isDebugEnabled())
+            Logging.jobs.debug("Setting job "+jobID+" back to 'ReadyForDeleteNotify' state");
+
+          // Set the state of the job back to "ReadyForNotify"
+          jobs.writePermanentStatus(jobID,jobs.STATUS_READYFORDELETENOTIFY,true);
+          break;
+        default:
+          throw new ManifoldCFException("Unexpected job status: "+Integer.toString(status));
+        }
+        database.performCommit();
+        return;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted resetting delete notify job: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Reset a starting job back to "ready for startup" state.
   *@param jobID is the job id.
   */
@@ -7508,18 +7716,12 @@ public class JobManager implements IJobM
           if (confirmSet.getRowCount() > 0)
             continue;
 
-          ManifoldCF.noteConfigurationChange();
-          // Remove documents from job queue
-          jobQueue.deleteAllJobRecords(jobID);
-          // Remove carrydowns for the job
-          carryDown.deleteOwner(jobID);
-          // Nothing is in a critical section - so this should be OK.
-          hopCount.deleteOwner(jobID);
-          jobs.delete(jobID);
+          jobs.finishJobCleanup(jobID);
           if (Logging.jobs.isDebugEnabled())
           {
-            Logging.jobs.debug("Removed job "+jobID);
+            Logging.jobs.debug("Job "+jobID+" cleanup is now completed");
           }
+
         }
         database.performCommit();
         return;
@@ -7732,7 +7934,88 @@ public class JobManager implements IJobM
       }
     }
   }
-  
+
+  /** Find the list of jobs that need to have their connectors notified of job deletion.
+  *@param processID is the process ID.
+  *@return the ID's of jobs that need their output connectors notified in order to be removed.
+  */
+  @Override
+  public JobNotifyRecord[] getJobsReadyForDelete(String processID)
+    throws ManifoldCFException
+  {
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Do the query
+        StringBuilder sb = new StringBuilder("SELECT ");
+        ArrayList list = new ArrayList();
+        
+        sb.append(jobs.idField).append(",").append(jobs.failTimeField).append(",").append(jobs.failCountField)
+          .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_READYFORDELETENOTIFY))}))
+          .append(" FOR UPDATE");
+            
+        IResultSet set = database.performQuery(sb.toString(),list,null,null);
+        // Return them all
+        JobNotifyRecord[] rval = new JobNotifyRecord[set.getRowCount()];
+        int i = 0;
+        while (i < rval.length)
+        {
+          IResultRow row = set.getRow(i);
+          Long jobID = (Long)row.getValue(jobs.idField);
+          Long failTimeLong = (Long)row.getValue(jobs.failTimeField);
+          Long failRetryCountLong = (Long)row.getValue(jobs.failCountField);
+          long failTime;
+          if (failTimeLong == null)
+            failTime = -1L;
+          else
+            failTime = failTimeLong.longValue();
+          int failRetryCount;
+          if (failRetryCountLong == null)
+            failRetryCount = -1;
+          else
+            failRetryCount = (int)failRetryCountLong.longValue();
+      
+          // Mark status of job as "starting delete"
+          jobs.writeTransientStatus(jobID,jobs.STATUS_NOTIFYINGOFDELETION,processID);
+          if (Logging.jobs.isDebugEnabled())
+          {
+            Logging.jobs.debug("Found job "+jobID+" in need of delete notification");
+          }
+          rval[i++] = new JobNotifyRecord(jobID,failTime,failRetryCount);
+        }
+        database.performCommit();
+        return rval;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted getting jobs ready for notify: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Complete the sequence that resumes jobs, either from a pause or from a scheduling window
   * wait.  The logic will restore the job to an active state (many possibilities depending on
   * connector status), and will record the jobs that have been so modified.

Modified: manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1604177&r1=1604176&r2=1604177&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Fri Jun 20 14:33:27 2014
@@ -124,9 +124,9 @@ public class Jobs extends org.apache.man
   // But, since there is no indication in the jobs table of an uninstalled connector for such jobs, the code which starts
   // jobs up (or otherwise would enter any state that has a corresponding special state) must check to see if the underlying
   // connector exists before deciding what state to put the job into.
-  public static final int STATUS_ACTIVE_UNINSTALLED = 38;               // Active, but repository connector not installed
-  public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 39;   // Active and seeding, but repository connector not installed
-  public static final int STATUS_DELETING_NOOUTPUT = 40;                // Job is being deleted but there's no output connector installed
+  public static final int STATUS_ACTIVE_UNINSTALLED = 40;               // Active, but repository connector not installed
+  public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 41;   // Active and seeding, but repository connector not installed
+  public static final int STATUS_DELETING_NOOUTPUT = 42;                // Job is being deleted but there's no output connector installed
 
   // Deprecated states.  These states should never be used; they're defined only for upgrade purposes
   public static final int STATUS_ACTIVE_NOOUTPUT = 100;                  // Active, but output connector not installed
@@ -1694,9 +1694,9 @@ public class Jobs extends org.apache.man
     map.put(failCountField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
-    ArrayList list = new ArrayList();
-    HashMap map = new HashMap();
-    String query = buildConjunctionClause(list,new ClauseDescription[]{
+    list.clear();
+    map.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFDELETION)),
       new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));

Modified: manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java?rev=1604177&r1=1604176&r2=1604177&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java (original)
+++ manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java Fri Jun 20 14:33:27 2014
@@ -249,6 +249,184 @@ public class JobNotificationThread exten
               throw exception;
           }
 
+          // ???
+          JobNotifyRecord[] jobsNeedingDeleteNotification = jobManager.getJobsReadyForDelete(processID);
+          try
+          {
+            Set<OutputAndRepositoryConnection> connectionNames = new HashSet<OutputAndRepositoryConnection>();
+            
+            int k = 0;
+            while (k < jobsNeedingDeleteNotification.length)
+            {
+              JobNotifyRecord jsr = jobsNeedingDeleteNotification[k++];
+              Long jobID = jsr.getJobID();
+              IJobDescription job = jobManager.load(jobID,true);
+              if (job != null)
+              {
+                // Get the connection name
+                String repositoryConnectionName = job.getConnectionName();
+                IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
+                for (int i = 0; i < basicSpec.getOutputCount(); i++)
+                {
+                  String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
+                  OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
+                  connectionNames.add(c);
+                }
+              }
+            }
+            
+            // Attempt to notify the specified connections
+            Map<OutputAndRepositoryConnection,Disposition> notifiedConnections = new HashMap<OutputAndRepositoryConnection,Disposition>();
+            
+            for (OutputAndRepositoryConnection connections : connectionNames)
+            {
+              String outputConnectionName = connections.getOutputConnectionName();
+              String repositoryConnectionName = connections.getRepositoryConnectionName();
+              
+              OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
+              
+              IOutputConnection connection = connectionManager.load(outputConnectionName);
+              if (connection != null)
+              {
+                // Grab an appropriate connection instance
+                IOutputConnector connector = outputConnectorPool.grab(connection);
+                if (connector != null)
+                {
+                  try
+                  {
+                    // Do the notification itself
+                    try
+                    {
+                      connector.noteJobComplete(activity);
+                      notifiedConnections.put(connections,new Disposition());
+                    }
+                    catch (ServiceInterruption e)
+                    {
+                      notifiedConnections.put(connections,new Disposition(e));
+                    }
+                    catch (ManifoldCFException e)
+                    {
+                      if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+                        throw e;
+                      if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+                        throw e;
+                      if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+                        throw e;
+                      // Nothing special; report the error and keep going.
+                      Logging.threads.error(e.getMessage(),e);
+                    }
+                  }
+                  finally
+                  {
+                    outputConnectorPool.release(connection,connector);
+                  }
+                }
+              }
+            }
+            
+            // Go through jobs again, and put the notified ones into the inactive state.
+            k = 0;
+            while (k < jobsNeedingDeleteNotification.length)
+            {
+              JobNotifyRecord jsr = jobsNeedingDeleteNotification[k++];
+              Long jobID = jsr.getJobID();
+              IJobDescription job = jobManager.load(jobID,true);
+              if (job != null)
+              {
+                // Get the connection name
+                String repositoryConnectionName = job.getConnectionName();
+                IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
+                boolean allOK = true;
+                for (int i = 0; i < basicSpec.getOutputCount(); i++)
+                {
+                  String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
+
+                  OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
+                  
+                  Disposition d = notifiedConnections.get(c);
+                  if (d != null)
+                  {
+                    ServiceInterruption e = d.getServiceInterruption();
+                    if (e == null)
+                    {
+                      break;
+                    }
+                    else
+                    {
+                      if (!e.jobInactiveAbort())
+                      {
+                        Logging.jobs.warn("Delete notification service interruption reported for job "+
+                          jobID+" output connection '"+outputConnectionName+"': "+
+                          e.getMessage(),e);
+                      }
+
+                      // If either we are going to be requeuing beyond the fail time, OR
+                      // the number of retries available has hit 0, THEN we treat this
+                      // as either an "ignore" or a hard error.
+                      if (!e.jobInactiveAbort() && (jsr.getFailTime() != -1L && jsr.getFailTime() < e.getRetryTime() ||
+                        jsr.getFailRetryCount() == 0))
+                      {
+                        // Treat this as a hard failure.
+                        if (e.isAbortOnFail())
+                        {
+                          // Note the error in the job, and transition to inactive state
+                          String message = e.jobInactiveAbort()?"":"Repeated service interruptions during notification"+((e.getCause()!=null)?": "+e.getCause().getMessage():"");
+                          if (jobManager.errorAbort(jobID,message) && message.length() > 0)
+                            Logging.jobs.error(message,e.getCause());
+                          jsr.noteStarted();
+                        }
+                        else
+                        {
+                          // Not sure this can happen -- but just transition silently to inactive state
+                          jobManager.removeJob(jobID);
+                          jsr.noteStarted();
+                        }
+                      }
+                      else
+                      {
+                        // Reset the job to the READYFORDELETENOTIFY state, updating the failtime and failcount fields
+                        jobManager.retryDeleteNotification(jsr,e.getFailTime(),e.getFailRetryCount());
+                        jsr.noteStarted();
+                      }
+                      allOK = false;
+                      break;
+                    }
+                  }
+                }
+                if (allOK)
+                {
+                  jobManager.removeJob(jobID);
+                  jsr.noteStarted();
+                }
+
+              }
+            }
+          }
+          finally
+          {
+            // Clean up all jobs that did not start
+            ManifoldCFException exception = null;
+            int i = 0;
+            while (i < jobsNeedingDeleteNotification.length)
+            {
+              JobNotifyRecord jsr = jobsNeedingDeleteNotification[i++];
+              if (!jsr.wasStarted())
+              {
+                // Clean up from failed start.
+                try
+                {
+                  jobManager.resetDeleteNotifyJob(jsr.getJobID());
+                }
+                catch (ManifoldCFException e)
+                {
+                  exception = e;
+                }
+              }
+            }
+            if (exception != null)
+              throw exception;
+          }
+
           ManifoldCF.sleep(10000L);
         }
         catch (ManifoldCFException e)

Modified: manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java?rev=1604177&r1=1604176&r2=1604177&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java (original)
+++ manifoldcf/branches/CONNECTORS-980/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java Fri Jun 20 14:33:27 2014
@@ -84,7 +84,7 @@ public class StartDeleteThread extends T
 
           // See if there are any starting jobs.
           // Note: Since this following call changes the job state, we must be careful to reset it on any kind of failure.
-          JobDeleteRecord[] deleteJobs = jobManager.getJobsReadyForDelete(processID);
+          JobDeleteRecord[] deleteJobs = jobManager.getJobsReadyForDeleteCleanup(processID);
           try
           {
 



Mime
View raw message