manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1567304 - /manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
Date Tue, 11 Feb 2014 19:16:04 GMT
Author: kwright
Date: Tue Feb 11 19:16:04 2014
New Revision: 1567304

URL: http://svn.apache.org/r1567304
Log:
More fixes for CONNECTORS-888.

Modified:
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1567304&r1=1567303&r2=1567304&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Tue Feb 11 19:16:04 2014
@@ -37,6 +37,8 @@ public class JobManager implements IJobM
   protected static final String expireStufferLock = "_EXPIRESTUFFER_";
   protected static final String cleanStufferLock = "_CLEANSTUFFER_";
   protected static final String jobStopLock = "_JOBSTOP_";
+  protected static final String jobResumeLock = "_JOBRESUME_";
+  protected static final String jobResetLock = "_JOBRESET_";
   protected static final String hopLock = "_HOPLOCK_";
 
   // Member variables
@@ -7531,37 +7533,45 @@ public class JobManager implements IJobM
   public void finishJobResumes(long timestamp, ArrayList modifiedJobs)
     throws ManifoldCFException
   {
+    lockManager.enterWriteLock(jobResumeLock);
+    try
+    {
     // Do the first query, getting the candidate jobs to be considered
-    StringBuilder sb = new StringBuilder("SELECT ");
-    ArrayList list = new ArrayList();
-        
-    sb.append(jobs.idField)
-      .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
-      .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-        new MultiClause(jobs.statusField,new Object[]{
-          jobs.statusToString(jobs.STATUS_RESUMING),
-          jobs.statusToString(jobs.STATUS_RESUMINGSEEDING)
-          })}));
-        
-    IResultSet set = database.performQuery(sb.toString(),list,null,null);
+      StringBuilder sb = new StringBuilder("SELECT ");
+      ArrayList list = new ArrayList();
+          
+      sb.append(jobs.idField)
+        .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+        .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+          new MultiClause(jobs.statusField,new Object[]{
+            jobs.statusToString(jobs.STATUS_RESUMING),
+            jobs.statusToString(jobs.STATUS_RESUMINGSEEDING)
+            })}));
+          
+      IResultSet set = database.performQuery(sb.toString(),list,null,null);
 
-    int i = 0;
-    while (i < set.getRowCount())
-    {
-      IResultRow row = set.getRow(i++);
-      Long jobID = (Long)row.getValue(jobs.idField);
+      int i = 0;
+      while (i < set.getRowCount())
+      {
+        IResultRow row = set.getRow(i++);
+        Long jobID = (Long)row.getValue(jobs.idField);
 
-      // There are no secondary checks that need to be made; just resume
-      IJobDescription jobDesc = jobs.load(jobID,true);
-      modifiedJobs.add(jobDesc);
+        // There are no secondary checks that need to be made; just resume
+        IJobDescription jobDesc = jobs.load(jobID,true);
+        modifiedJobs.add(jobDesc);
 
-      jobs.finishResumeJob(jobID,timestamp);
-          
-      if (Logging.jobs.isDebugEnabled())
-      {
-        Logging.jobs.debug("Resumed job "+jobID);
+        jobs.finishResumeJob(jobID,timestamp);
+            
+        if (Logging.jobs.isDebugEnabled())
+        {
+          Logging.jobs.debug("Resumed job "+jobID);
+        }
       }
     }
+    finally
+    {
+      lockManager.leaveWriteLock(jobResumeLock);
+    }
   }
 
   /** Complete the sequence that stops jobs, either for abort, pause, or because of a scheduling
@@ -7664,84 +7674,92 @@ public class JobManager implements IJobM
   public void resetJobs(long currentTime, ArrayList resetJobs)
     throws ManifoldCFException
   {
-    // Query for all jobs that fulfill the criteria
-    // The query used to look like:
-    //
-    // SELECT id FROM jobs t0 WHERE status='D' AND NOT EXISTS(SELECT 'x' FROM jobqueue t1 WHERE
-    //      t0.id=t1.jobid AND t1.status='P')
-    //
-    // Now, the query is broken up, for performance
-
-    // Do the first query, getting the candidate jobs to be considered
-    StringBuilder sb = new StringBuilder("SELECT ");
-    ArrayList list = new ArrayList();
-        
-    sb.append(jobs.idField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
-      .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN))}));
-            
-    IResultSet set = database.performQuery(sb.toString(),list,null,null);
-
-    int i = 0;
-    while (i < set.getRowCount())
+    lockManager.enterWriteLock(jobResetLock);
+    try
     {
-      IResultRow row = set.getRow(i++);
-      Long jobID = (Long)row.getValue(jobs.idField);
+      // Query for all jobs that fulfill the criteria
+      // The query used to look like:
+      //
+      // SELECT id FROM jobs t0 WHERE status='D' AND NOT EXISTS(SELECT 'x' FROM jobqueue t1 WHERE
+      //      t0.id=t1.jobid AND t1.status='P')
+      //
+      // Now, the query is broken up, for performance
 
-      // Check to be sure the job is a candidate for shutdown
-      sb = new StringBuilder("SELECT ");
-      list.clear();
+      // Do the first query, getting the candidate jobs to be considered
+      StringBuilder sb = new StringBuilder("SELECT ");
+      ArrayList list = new ArrayList();
           
-      sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
+      sb.append(jobs.idField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
         .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-          new UnitaryClause(jobQueue.jobIDField,jobID),
-          new MultiClause(jobQueue.statusField,new Object[]{
-            jobQueue.statusToString(jobQueue.STATUS_PURGATORY),
-            jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)})}))
-        .append(" ").append(database.constructOffsetLimitClause(0,1));
+          new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN))}));
+              
+      IResultSet set = database.performQuery(sb.toString(),list,null,null);
 
-      IResultSet confirmSet = database.performQuery(sb.toString(),list,null,null,1,null);
+      int i = 0;
+      while (i < set.getRowCount())
+      {
+        IResultRow row = set.getRow(i++);
+        Long jobID = (Long)row.getValue(jobs.idField);
 
-      if (confirmSet.getRowCount() > 0)
-        continue;
+        // Check to be sure the job is a candidate for shutdown
+        sb = new StringBuilder("SELECT ");
+        list.clear();
+            
+        sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
+          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobQueue.jobIDField,jobID),
+            new MultiClause(jobQueue.statusField,new Object[]{
+              jobQueue.statusToString(jobQueue.STATUS_PURGATORY),
+              jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)})}))
+          .append(" ").append(database.constructOffsetLimitClause(0,1));
 
-      // The shutting-down phase is complete.  However, we need to check if there are any outstanding
-      // PENDING or PENDINGPURGATORY records before we can decide what to do.
-      sb = new StringBuilder("SELECT ");
-      list.clear();
-          
-      sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
-        .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-          new UnitaryClause(jobQueue.jobIDField,jobID),
-          new MultiClause(jobQueue.statusField,new Object[]{
-            jobQueue.statusToString(jobQueue.STATUS_PENDING),
-            jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)})}))
-        .append(" ").append(database.constructOffsetLimitClause(0,1));
+        IResultSet confirmSet = database.performQuery(sb.toString(),list,null,null,1,null);
 
-      confirmSet = database.performQuery(sb.toString(),list,null,null,1,null);
+        if (confirmSet.getRowCount() > 0)
+          continue;
 
-      if (confirmSet.getRowCount() > 0)
-      {
-        // This job needs to re-enter the active state.  Make that happen.
-        jobs.returnJobToActive(jobID);
-        if (Logging.jobs.isDebugEnabled())
+        // The shutting-down phase is complete.  However, we need to check if there are any outstanding
+        // PENDING or PENDINGPURGATORY records before we can decide what to do.
+        sb = new StringBuilder("SELECT ");
+        list.clear();
+            
+        sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
+          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobQueue.jobIDField,jobID),
+            new MultiClause(jobQueue.statusField,new Object[]{
+              jobQueue.statusToString(jobQueue.STATUS_PENDING),
+              jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)})}))
+          .append(" ").append(database.constructOffsetLimitClause(0,1));
+
+        confirmSet = database.performQuery(sb.toString(),list,null,null,1,null);
+
+        if (confirmSet.getRowCount() > 0)
         {
-          Logging.jobs.debug("Job "+jobID+" is re-entering active state");
+          // This job needs to re-enter the active state.  Make that happen.
+          jobs.returnJobToActive(jobID);
+          if (Logging.jobs.isDebugEnabled())
+          {
+            Logging.jobs.debug("Job "+jobID+" is re-entering active state");
+          }
         }
-      }
-      else
-      {
-        // This job should be marked as finished.
-        IJobDescription jobDesc = jobs.load(jobID,true);
-        resetJobs.add(jobDesc);
-            
-        jobs.finishJob(jobID,currentTime);
-        if (Logging.jobs.isDebugEnabled())
+        else
         {
-          Logging.jobs.debug("Job "+jobID+" now completed");
+          // This job should be marked as finished.
+          IJobDescription jobDesc = jobs.load(jobID,true);
+          resetJobs.add(jobDesc);
+              
+          jobs.finishJob(jobID,currentTime);
+          if (Logging.jobs.isDebugEnabled())
+          {
+            Logging.jobs.debug("Job "+jobID+" now completed");
+          }
         }
       }
     }
+    finally
+    {
+      lockManager.leaveWriteLock(jobResetLock);
+    }
   }
 
   



Mime
View raw message