manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1205432 - in /incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler: interfaces/ jobs/ system/
Date Wed, 23 Nov 2011 15:17:34 GMT
Author: kwright
Date: Wed Nov 23 15:17:33 2011
New Revision: 1205432

URL: http://svn.apache.org/viewvc?rev=1205432&view=rev
Log:
Finish the job state rework.  This should complete the basic work for CONNECTORS-290, other
than debugging.

Modified:
    incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java
    incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java
    incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobStartThread.java

Modified: incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1205432&r1=1205431&r2=1205432&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Wed Nov 23 15:17:33 2011
@@ -835,6 +835,15 @@ public interface IJobManager
   public void finishJobStops(long timestamp, ArrayList modifiedJobs)
     throws ManifoldCFException;
 
+  /** Complete the sequence that resumes jobs, either from a pause or from a scheduling window
+  * wait.  The logic will restore the job to an active state (many possibilities depending on
+  * connector status), and will record the jobs that have been so modified.
+  *@param timestamp is the current time in milliseconds since epoch.
+  *@param modifiedJobs is filled in with the set of IJobDescription objects that were resumed.
+  */
+  public void finishJobResumes(long timestamp, ArrayList modifiedJobs)
+    throws ManifoldCFException;
+    
   /** Put all eligible jobs in the "shutting down" state.
   */
   public void finishJobs()

Modified: incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java?rev=1205432&r1=1205431&r2=1205432&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java (original)
+++ incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java Wed Nov 23 15:17:33 2011
@@ -125,19 +125,26 @@ public interface IRepositoryConnectionMa
 
   /** Start a job */
   public static final String ACTIVITY_JOBSTART = "job start";
-  /** Wait a job */
-  public static final String ACTIVITY_JOBWAIT = "job wait";
-  /** Continue a job */
-  public static final String ACTIVITY_JOBCONTINUE = "job continue";
   /** Finish a job */
   public static final String ACTIVITY_JOBEND = "job end";
-  /** Abort a job */
-  public static final String ACTIVITY_JOBABORT = "job abort";
-
+  /** Stop a job */
+  public static final String ACTIVITY_JOBSTOP = "job stop";
+  /** Continue a job */
+  public static final String ACTIVITY_JOBCONTINUE = "job continue";
+  /** Wait due to schedule */
+  public static final String ACTIVITY_JOBWAIT = "job wait";
+  /** Unwait due to schedule */
+  public static final String ACTIVITY_JOBUNWAIT = "job unwait";
+  
   /** The set of activity records. */
   public static final String[] activitySet = new String[]
   {
-    ACTIVITY_JOBSTART,ACTIVITY_JOBWAIT,ACTIVITY_JOBCONTINUE,ACTIVITY_JOBEND,ACTIVITY_JOBABORT
+    ACTIVITY_JOBSTART,
+    ACTIVITY_JOBSTOP,
+    ACTIVITY_JOBCONTINUE,
+    ACTIVITY_JOBWAIT,
+    ACTIVITY_JOBUNWAIT,
+    ACTIVITY_JOBEND
   };
 
   /** Record time-stamped information about the activity of the connection.  This information can originate from

Modified: incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1205432&r1=1205431&r2=1205432&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Wed Nov 23 15:17:33 2011
@@ -1472,26 +1472,26 @@ public class JobManager implements IJobM
 
     // Per CONNECTORS-290, we need to be leaving priorities blank for jobs that aren't using them,
     // so this will be changed to not include jobs where the priorities have been bashed to null.
-    // MHL
+    //
+    // I've included ALL states that might have non-null doc priorities.  This includes states
+    // corresponding to uninstalled connectors, since there is no transition that cleans out the
+    // document priorities in these states.  The time during which a connector is uninstalled is
+    // expected to be short, because typically this state is the result of an installation procedure
+    // rather than willful action on the part of a user.
         
     sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ")
       .append(database.buildConjunctionClause(list,new ClauseDescription[]{
         new MultiClause("t1."+jobs.statusField,new Object[]{
-          Jobs.statusToString(Jobs.STATUS_ACTIVE),
-          Jobs.statusToString(Jobs.STATUS_PAUSED),
-          Jobs.statusToString(Jobs.STATUS_ACTIVEWAIT),
-          Jobs.statusToString(Jobs.STATUS_PAUSEDWAIT),
-          Jobs.statusToString(Jobs.STATUS_READYFORSTARTUP),
           Jobs.statusToString(Jobs.STATUS_STARTINGUP),
-          Jobs.statusToString(Jobs.STATUS_ABORTINGSTARTINGUPFORRESTART),
-          Jobs.statusToString(Jobs.STATUS_ABORTINGSTARTINGUP),
+          Jobs.statusToString(Jobs.STATUS_ACTIVE),
           Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING),
-          Jobs.statusToString(Jobs.STATUS_PAUSEDSEEDING),
-          Jobs.statusToString(Jobs.STATUS_ACTIVEWAITSEEDING),
-          Jobs.statusToString(Jobs.STATUS_PAUSEDWAITSEEDING),
-          Jobs.statusToString(Jobs.STATUS_ABORTING),
-          Jobs.statusToString(Jobs.STATUS_ABORTINGFORRESTART),
-          Jobs.statusToString(Jobs.STATUS_ABORTINGFORRESTARTSEEDING)}),
+          Jobs.statusToString(Jobs.STATUS_ACTIVE_UNINSTALLED),
+          Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED),
+          Jobs.statusToString(Jobs.STATUS_ACTIVE_NOOUTPUT),
+          Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NOOUTPUT),
+          Jobs.statusToString(Jobs.STATUS_ACTIVE_NEITHER),
+          Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NEITHER)
+          }),
         new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)}))
       .append(") ");
 
@@ -6205,6 +6205,48 @@ public class JobManager implements IJobM
     }
   }
   
+  /** Complete the sequence that resumes jobs, either from a pause or from a scheduling window
+  * wait.  The logic will restore the job to an active state (many possibilities depending on
+  * connector status), and will record the jobs that have been so modified.
+  *@param timestamp is the current time in milliseconds since epoch.
+  *@param modifiedJobs is filled in with the set of IJobDescription objects that were resumed.
+  */
+  public void finishJobResumes(long timestamp, ArrayList modifiedJobs)
+    throws ManifoldCFException
+  {
+    // Do the first query, getting the candidate jobs to be considered
+    StringBuilder sb = new StringBuilder("SELECT ");
+    ArrayList list = new ArrayList();
+        
+    sb.append(jobs.idField)
+      .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+      .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+        new MultiClause(jobs.statusField,new Object[]{
+          jobs.statusToString(jobs.STATUS_RESUMING),
+          jobs.statusToString(jobs.STATUS_RESUMINGSEEDING)
+          })}));
+        
+    IResultSet set = database.performQuery(sb.toString(),list,null,null);
+
+    int i = 0;
+    while (i < set.getRowCount())
+    {
+      IResultRow row = set.getRow(i++);
+      Long jobID = (Long)row.getValue(jobs.idField);
+
+      // There are no secondary checks that need to be made; just resume
+      IJobDescription jobDesc = jobs.load(jobID,true);
+      modifiedJobs.add(jobDesc);
+
+      jobs.finishResumeJob(jobID,timestamp);
+          
+      if (Logging.jobs.isDebugEnabled())
+      {
+        Logging.jobs.debug("Resumed job "+jobID);
+      }
+    }
+  }
+
   /** Complete the sequence that stops jobs, either for abort, pause, or because of a scheduling
   * window.  The logic will move the job to its next state (INACTIVE, PAUSED, ACTIVEWAIT),
   * and will record the jobs that have been so modified.

Modified: incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1205432&r1=1205431&r2=1205432&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Wed Nov 23 15:17:33 2011
@@ -1919,49 +1919,6 @@ public class Jobs extends org.apache.man
     }
   }
 
-  /* We will need in some method to transition from RESUMING and RESUMINGSEEDING to an active method.  The code for that,
-  in part, is as follows:
-
-      switch (status)
-      {
-      case STATUS_RESUMING:
-        if (connectionMgr.checkConnectorExists(connectionName))
-        {
-          if (outputMgr.checkConnectorExists(outputName))
-            newStatus = STATUS_ACTIVE;
-          else
-            newStatus = STATUS_ACTIVE_NOOUTPUT;
-        }
-        else
-        {
-          if (outputMgr.checkConnectorExists(outputName))
-            newStatus = STATUS_ACTIVE_UNINSTALLED;
-          else
-            newStatus = STATUS_ACTIVE_NEITHER;
-        }
-        break;
-      case STATUS_RESUMINGSEEDING:
-        if (connectionMgr.checkConnectorExists(connectionName))
-        {
-          if (outputMgr.checkConnectorExists(outputName))
-            newStatus = STATUS_ACTIVESEEDING;
-          else
-            newStatus = STATUS_ACTIVESEEDING_NOOUTPUT;
-        }
-        else
-        {
-          if (outputMgr.checkConnectorExists(outputName))
-            newStatus = STATUS_ACTIVESEEDING_UNINSTALLED;
-          else
-            newStatus = STATUS_ACTIVESEEDING_NEITHER;
-        }
-        break;
-      default:
-        throw new ManifoldCFException("Job "+jobID+" is not paused");
-      }
-
-  */
-  
   /** Update a job's status, and its reseed time.
   *@param jobID is the job id.
   *@param status is the desired status.
@@ -2030,6 +1987,86 @@ public class Jobs extends org.apache.man
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }
 
+  /** Resume a stopped job (from a pause or activewait).
+  * Updates the job record in a manner consistent with the job's state.
+  */
+  public void finishResumeJob(Long jobID, long currentTime)
+    throws ManifoldCFException
+  {
+    beginTransaction();
+    try
+    {
+      // Get the current job status
+      ArrayList list = new ArrayList();
+      String query = buildConjunctionClause(list,new ClauseDescription[]{
+        new UnitaryClause(idField,jobID)});
+      IResultSet set = performQuery("SELECT "+statusField+","+connectionNameField+","+outputNameField+" FROM "+getTableName()+
+        " WHERE "+query+" FOR UPDATE",list,null,null);
+      if (set.getRowCount() == 0)
+        throw new ManifoldCFException("Job does not exist: "+jobID);
+      IResultRow row = set.getRow(0);
+      int status = stringToStatus(row.getValue(statusField).toString());
+      String connectionName = (String)row.getValue(connectionNameField);
+      String outputName = (String)row.getValue(outputNameField);
+      int newStatus;
+      HashMap map = new HashMap();
+      switch (status)
+      {
+      case STATUS_RESUMING:
+        if (connectionMgr.checkConnectorExists(connectionName))
+        {
+          if (outputMgr.checkConnectorExists(outputName))
+            newStatus = STATUS_ACTIVE;
+          else
+            newStatus = STATUS_ACTIVE_NOOUTPUT;
+        }
+        else
+        {
+          if (outputMgr.checkConnectorExists(outputName))
+            newStatus = STATUS_ACTIVE_UNINSTALLED;
+          else
+            newStatus = STATUS_ACTIVE_NEITHER;
+        }
+        map.put(statusField,statusToString(newStatus));
+        break;
+      case STATUS_RESUMINGSEEDING:
+        if (connectionMgr.checkConnectorExists(connectionName))
+        {
+          if (outputMgr.checkConnectorExists(outputName))
+            newStatus = STATUS_ACTIVESEEDING;
+          else
+            newStatus = STATUS_ACTIVESEEDING_NOOUTPUT;
+        }
+        else
+        {
+          if (outputMgr.checkConnectorExists(outputName))
+            newStatus = STATUS_ACTIVESEEDING_UNINSTALLED;
+          else
+            newStatus = STATUS_ACTIVESEEDING_NEITHER;
+        }
+        map.put(statusField,statusToString(newStatus));
+        break;
+      default:
+        throw new ManifoldCFException("Unexpected value for job status: "+Integer.toString(status));
+      }
+      performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+    }
+    catch (ManifoldCFException e)
+    {
+      signalRollback();
+      throw e;
+    }
+    catch (Error e)
+    {
+      signalRollback();
+      throw e;
+    }
+    finally
+    {
+      endTransaction();
+    }
+  }
+  
   /** Stop a job suddenly (abort, pause, activewait).
   * Updates the job record in a manner consistent with the job's state.
   */
@@ -2049,7 +2086,6 @@ public class Jobs extends org.apache.man
         throw new ManifoldCFException("Job does not exist: "+jobID);
       IResultRow row = set.getRow(0);
       int status = stringToStatus(row.getValue(statusField).toString());
-      int newStatus;
       HashMap map = new HashMap();
       switch (status)
       {

Modified: incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java?rev=1205432&r1=1205431&r2=1205432&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java (original)
+++ incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java Wed Nov 23 15:17:33 2011
@@ -63,6 +63,7 @@ public class JobResetThread extends Thre
         {
           // See if there are any completed jobs
           long currentTime = System.currentTimeMillis();
+          
           ArrayList jobStops = new ArrayList();
           jobManager.finishJobStops(currentTime,jobStops);
           int k = 0;
@@ -70,9 +71,21 @@ public class JobResetThread extends Thre
           {
             IJobDescription desc = (IJobDescription)jobStops.get(k++);
             connectionManager.recordHistory(desc.getConnectionName(),
-              null,connectionManager.ACTIVITY_JOBABORT,null,
+              null,connectionManager.ACTIVITY_JOBSTOP,null,
               desc.getID().toString()+"("+desc.getDescription()+")",null,null,null);
           }
+
+          ArrayList jobResumes = new ArrayList();
+          jobManager.finishJobResumes(currentTime,jobResumes);
+          k = 0;
+          while (k < jobResumes.size())
+          {
+            IJobDescription desc = (IJobDescription)jobResumes.get(k++);
+            connectionManager.recordHistory(desc.getConnectionName(),
+              null,connectionManager.ACTIVITY_JOBCONTINUE,null,
+              desc.getID().toString()+"("+desc.getDescription()+")",null,null,null);
+          }
+
           ArrayList jobCompletions = new ArrayList();
           jobManager.resetJobs(currentTime,jobCompletions);
           k = 0;
@@ -88,7 +101,7 @@ public class JobResetThread extends Thre
           // not predicted by the algorithm that assigned those priorities.  This is, of course, quite expensive,
           // but it cannot be helped (at least, I cannot find a way to avoid it).
           //
-          if (jobStops.size() > 0)
+          if (jobStops.size() > 0 || jobResumes.size() > 0)
           {
             Logging.threads.debug("Job reset thread reprioritizing documents...");
 
@@ -97,8 +110,8 @@ public class JobResetThread extends Thre
             Logging.threads.debug("Job reset thread done reprioritizing documents.");
 
           }
-
-          ManifoldCF.sleep(10000L);
+          else
+            ManifoldCF.sleep(10000L);
         }
         catch (ManifoldCFException e)
         {

Modified: incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobStartThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobStartThread.java?rev=1205432&r1=1205431&r2=1205432&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobStartThread.java (original)
+++ incubator/lcf/branches/CONNECTORS-290/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobStartThread.java Wed Nov 23 15:17:33 2011
@@ -73,7 +73,7 @@ public class JobStartThread extends Thre
             Long jobID = (Long)unwaitJobs.get(k++);
             IJobDescription desc = jobManager.load(jobID);
             connectionManager.recordHistory(desc.getConnectionName(),
-              null,connectionManager.ACTIVITY_JOBCONTINUE,null,
+              null,connectionManager.ACTIVITY_JOBUNWAIT,null,
               desc.getID().toString()+"("+desc.getDescription()+")",null,null,null);
           }
           // Cause jobs out of window to stop.



Mime
View raw message