manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1543421 - in /manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler: interfaces/ jobs/
Date Tue, 19 Nov 2013 13:18:45 GMT
Author: kwright
Date: Tue Nov 19 13:18:45 2013
New Revision: 1543421

URL: http://svn.apache.org/r1543421
Log:
Add hooks for individual process cleanup and full cluster cleanup.

Modified:
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Carrydown.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/EventManager.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1543421&r1=1543420&r2=1543421&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
Tue Nov 19 13:18:45 2013
@@ -136,6 +136,22 @@ public interface IJobManager
   // The job queue is maintained underneath this interface, and all threads that perform
   // job activities need to go through this layer.
 
+  /** Reset the job queue for an individual process ID.
+  * If a node was shut down in the middle of doing something, sufficient information should
+  * be around in the database to allow the node's activities to be cleaned up.
+  *@param processID is the process ID of the node we want to clean up after.
+  */
+  public void cleanupProcessData(String processID)
+    throws ManifoldCFException;
+    
+  /** Prepare to start the entire cluster.
+  * If there are no other nodes alive, then at the time the first node comes up, we need
to
+  * reset the job queue for ALL processes that had been running before.  This method can
+  * be called in lieu of prepareForStart().
+  */
+  public void prepareForClusterStart()
+    throws ManifoldCFException;
+
   /** Reset the job queue immediately before starting up.
   * If the system was shut down in the middle of a job, sufficient information should
   * be around in the database to allow it to restart.  However, BEFORE all the job threads

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Carrydown.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Carrydown.java?rev=1543421&r1=1543420&r2=1543421&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Carrydown.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Carrydown.java
Tue Nov 19 13:18:45 2013
@@ -128,7 +128,7 @@ public class Carrydown extends org.apach
 
       IndexDescription uniqueIndex = new IndexDescription(true,new String[]{jobIDField,parentIDHashField,childIDHashField,dataNameField,dataValueHashField});
       IndexDescription jobChildDataIndex = new IndexDescription(false,new String[]{jobIDField,childIDHashField,dataNameField});
-      IndexDescription newIndex = new IndexDescription(false,new String[]{newField});
+      IndexDescription newIndex = new IndexDescription(false,new String[]{newField,processIDField});
 
       Map indexes = getTableIndexes(null,null);
       Iterator iter = indexes.keySet().iterator();
@@ -207,8 +207,9 @@ public class Carrydown extends org.apach
   //
 
   /** Reset, at startup time.
+  *@param processID is the process ID.
   */
-  public void reset()
+  public void restart(String processID)
     throws ManifoldCFException
   {
     // Delete "new" rows
@@ -216,7 +217,7 @@ public class Carrydown extends org.apach
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(newField,statusToString(ISNEW_NEW)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     performDelete("WHERE "+query,list,null);
 
     // Convert "existing" rows to base
@@ -224,7 +225,27 @@ public class Carrydown extends org.apach
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(newField,statusToString(ISNEW_EXISTING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
+    performUpdate(map,"WHERE "+query,list,null);
+  }
+
+  /** Reset, at startup time, entire cluster
+  */
+  public void restartCluster()
+    throws ManifoldCFException
+  {
+    // Delete "new" rows
+    HashMap map = new HashMap();
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(newField,statusToString(ISNEW_NEW))});
+    performDelete("WHERE "+query,list,null);
+
+    // Convert "existing" rows to base
+    map.put(newField,statusToString(ISNEW_BASE));
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(newField,statusToString(ISNEW_EXISTING))});
     performUpdate(map,"WHERE "+query,list,null);
   }
 

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/EventManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/EventManager.java?rev=1543421&r1=1543420&r2=1543421&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/EventManager.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/EventManager.java
Tue Nov 19 13:18:45 2013
@@ -21,6 +21,7 @@ package org.apache.manifoldcf.crawler.jo
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.crawler.interfaces.*;
 import org.apache.manifoldcf.crawler.interfaces.CacheKeyFactory;
+import org.apache.manifoldcf.crawler.system.ManifoldCF;
 import java.util.*;
 
 /** This class manages the events table.
@@ -33,6 +34,7 @@ import java.util.*;
 * <tr class="TableHeadingColor">
 * <th>Field</th><th>Type</th><th>Description&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</th>
 * <tr><td>name</td><td>VARCHAR(255)</td><td>Primary Key</td></tr>
+* <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
 * </table>
 * <br><br>
 * 
@@ -43,7 +45,8 @@ public class EventManager extends org.ap
 
   // Field names
   public final static String eventNameField = "name";
-
+  public final static String processIDField = "processid";
+  
   /** Constructor.
   *@param database is the database handle.
   */
@@ -66,14 +69,41 @@ public class EventManager extends org.ap
       {
         HashMap map = new HashMap();
         map.put(eventNameField,new ColumnDescription("VARCHAR(255)",true,false,null,null,false));
+        map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
         performCreate(map,null);
       }
       else
       {
         // Upgrade goes here if needed
+        if (existing.get(processIDField) == null)
+        {
+          Map insertMap = new HashMap();
+          insertMap.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
+          performAlter(insertMap,null,null,null);
+        }
       }
 
       // Index management goes here
+      IndexDescription processIDIndex = new IndexDescription(false,new String[]{processIDField});
+      // Get rid of unused indexes
+      Map indexes = getTableIndexes(null,null);
+      Iterator iter = indexes.keySet().iterator();
+      while (iter.hasNext())
+      {
+        String indexName = (String)iter.next();
+        IndexDescription id = (IndexDescription)indexes.get(indexName);
+
+        if (processIDIndex != null && id.equals(processIDIndex))
+          processIDIndex = null;
+        else if (indexName.indexOf("_pkey") == -1)
+          // This index shouldn't be here; drop it
+          performRemoveIndex(indexName);
+      }
+
+      // Build missing indexes
+
+      if (processIDIndex != null)
+        performAddIndex(null,processIDIndex);
 
       break;
     }
@@ -84,42 +114,37 @@ public class EventManager extends org.ap
   public void deinstall()
     throws ManifoldCFException
   {
-    beginTransaction();
-    try
-    {
-      performDrop(null);
-    }
-    catch (ManifoldCFException e)
-    {
-      signalRollback();
-      throw e;
-    }
-    catch (Error e)
-    {
-      signalRollback();
-      throw e;
-    }
-    finally
-    {
-      endTransaction();
-    }
+    performDrop(null);
   }
 
   /** Prepare for restart.
+  *@param processID is the processID to restart.
   */
-  public void restart()
+  public void restart(String processID)
     throws ManifoldCFException
   {
-    // Delete all rows in this table.
-    performDelete("",null,null);
+    // Delete all rows in this table matching the processID
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(processIDField,processID)});
+    performDelete("WHERE "+query,null,null);
   }
 
+  /** Restart cluster.
+  */
+  public void restartCluster()
+    throws ManifoldCFException
+  {
+    performDelete("",null,null);
+  }
+  
   /** Atomically create an event - and return false if the event already exists */
   public void createEvent(String eventName)
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
     map.put(eventNameField,eventName);
+    map.put(processIDField,ManifoldCF.getProcessID());
     performInsert(map,null);
   }
 

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java?rev=1543421&r1=1543420&r2=1543421&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
Tue Nov 19 13:18:45 2013
@@ -302,13 +302,22 @@ public class HopCount extends org.apache
   }
 
   /** Reset, at startup time.
+  *@param processID is the process ID.
   */
-  public void reset()
+  public void restart(String processID)
     throws ManifoldCFException
   {
-    intrinsicLinkManager.reset();
+    intrinsicLinkManager.restart(processID);
   }
 
+  /** Restart entire cluster.
+  */
+  public void restartCluster()
+    throws ManifoldCFException
+  {
+    intrinsicLinkManager.restartCluster();
+  }
+  
   /** Record a references from a set of documents to the root.  These will be marked as "new"
or "existing", and
   * will have a null linktype.
   */

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java?rev=1543421&r1=1543420&r2=1543421&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java
Tue Nov 19 13:18:45 2013
@@ -118,7 +118,7 @@ public class IntrinsicLink extends org.a
       // Indexes
       IndexDescription uniqueIndex = new IndexDescription(true,new String[]{jobIDField,parentIDHashField,linkTypeField,childIDHashField});
       IndexDescription jobChildNewIndex = new IndexDescription(false,new String[]{jobIDField,childIDHashField,newField});
-      IndexDescription newIndex = new IndexDescription(false,new String[]{newField});
+      IndexDescription newIndex = new IndexDescription(false,new String[]{newField,processIDField});
 
       Map indexes = getTableIndexes(null,null);
       Iterator iter = indexes.keySet().iterator();
@@ -188,8 +188,9 @@ public class IntrinsicLink extends org.a
   * of documents, and cached records of hopcount are updated only when requested, it is safest
to simply
   * move any "new" or "new existing" links back to base state on startup.  Then, the next
time that page
   * is processed, the links will be updated properly.
+  *@param processID is the process to restart.
   */
-  public void reset()
+  public void restart(String processID)
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
@@ -199,10 +200,25 @@ public class IntrinsicLink extends org.a
       new MultiClause(newField,new Object[]{
         statusToString(LINKSTATUS_NEW),
         statusToString(LINKSTATUS_EXISTING)}),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     performUpdate(map,"WHERE "+query,list,null);
   }
 
+  /** Restart entire cluster.
+  */
+  public void restartCluster()
+    throws ManifoldCFException
+  {
+    HashMap map = new HashMap();
+    map.put(newField,statusToString(LINKSTATUS_BASE));
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(newField,new Object[]{
+        statusToString(LINKSTATUS_NEW),
+        statusToString(LINKSTATUS_EXISTING)})});
+    performUpdate(map,"WHERE "+query,list,null);
+  }
+  
   /** Record a references from source to targets.  These references will be marked as either
"new" or "existing".
   *@return the target document ID's that are considered "new".
   */

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1543421&r1=1543420&r2=1543421&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
Tue Nov 19 13:18:45 2013
@@ -620,18 +620,15 @@ public class JobManager implements IJobM
   // The job queue is maintained underneath this interface, and all threads that perform
   // job activities need to go through this layer.
 
-  /** Reset the job queue immediately after starting up.
-  * If the system was shut down in the middle of a job, sufficient information should
-  * be around in the database to allow it to restart.  However, BEFORE all the job threads
-  * are spun up, there needs to be a pass over the queue to bring things back to a "normal"
-  * state.
-  * Also, if a job's status is in a state that indicates it was being processed by a thread
-  * (which is now dead), then we have to set that status back to previous value.
+  /** Reset the job queue for an individual process ID.
+  * If a node was shut down in the middle of doing something, sufficient information should
+  * be around in the database to allow the node's activities to be cleaned up.
+  *@param processID is the process ID of the node we want to clean up after.
   */
-  public void prepareForStart()
+  public void cleanupProcessData(String processID)
     throws ManifoldCFException
   {
-    Logging.jobs.debug("Resetting due to restart");
+    Logging.jobs.debug("Cleaning up process data for process '"+processID+"'");
     while (true)
     {
       long sleepAmt = 0L;
@@ -639,19 +636,19 @@ public class JobManager implements IJobM
       try
       {
         // Clean up events
-        eventManager.restart();
+        eventManager.restart(processID);
         // Clean up job queue
-        jobQueue.restart();
+        jobQueue.restart(processID);
         // Clean up jobs
-        jobs.restart();
+        jobs.restart(processID);
         // Clean up hopcount stuff
-        hopCount.reset();
+        hopCount.restart(processID);
         // Clean up carrydown stuff
-        carryDown.reset();
+        carryDown.restart(processID);
         TrackerClass.notePrecommit();
         database.performCommit();
         TrackerClass.noteCommit();
-        Logging.jobs.debug("Reset complete");
+        Logging.jobs.debug("Cleanup complete");
         break;
       }
       catch (ManifoldCFException e)
@@ -680,6 +677,79 @@ public class JobManager implements IJobM
       }
     }
   }
+    
+  /** Prepare to start the entire cluster.
+  * If there are no other nodes alive, then at the time the first node comes up, we need
to
+  * reset the job queue for ALL processes that had been running before.  This method can
+  * be called in lieu of prepareForStart().
+  */
+  public void prepareForClusterStart()
+    throws ManifoldCFException
+  {
+    Logging.jobs.debug("Starting cluster");
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Clean up events
+        eventManager.restartCluster();
+        // Clean up job queue
+        jobQueue.restartCluster();
+        // Clean up jobs
+        jobs.restartCluster();
+        // Clean up hopcount stuff
+        hopCount.restartCluster();
+        // Clean up carrydown stuff
+        carryDown.restartCluster();
+        TrackerClass.notePrecommit();
+        database.performCommit();
+        TrackerClass.noteCommit();
+        Logging.jobs.debug("Cluster start complete");
+        break;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        TrackerClass.noteRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction starting cluster: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        TrackerClass.noteRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
+
+  /** Reset the job queue immediately after starting up.
+  * If the system was shut down in the middle of a job, sufficient information should
+  * be around in the database to allow it to restart.  However, BEFORE all the job threads
+  * are spun up, there needs to be a pass over the queue to bring things back to a "normal"
+  * state.
+  * Also, if a job's status is in a state that indicates it was being processed by a thread
+  * (which is now dead), then we have to set that status back to previous value.
+  */
+  public void prepareForStart()
+    throws ManifoldCFException
+  {
+    cleanupProcessData(ManifoldCF.getProcessID());
+  }
 
   /** Reset as part of restoring document worker threads.
   */

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1543421&r1=1543420&r2=1543421&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
Tue Nov 19 13:18:45 2013
@@ -354,8 +354,9 @@ public class JobQueue extends org.apache
   /** Restart.
   * This method should be called at initial startup time.  It resets the status of all documents
to something
   * reasonable, so the jobs can be restarted and work properly to completion.
+  *@param processID is the processID to clean up after.
   */
-  public void restart()
+  public void restart(String processID)
     throws ManifoldCFException
   {
     // Map ACTIVE back to PENDING.
@@ -367,7 +368,7 @@ public class JobQueue extends org.apache
       new MultiClause(statusField,new Object[]{
         statusToString(STATUS_ACTIVE),
         statusToString(STATUS_ACTIVENEEDRESCAN)}),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     performUpdate(map,"WHERE "+query,list,null);
 
     // Map ACTIVEPURGATORY to PENDINGPURGATORY
@@ -378,7 +379,7 @@ public class JobQueue extends org.apache
       new MultiClause(statusField,new Object[]{
         statusToString(STATUS_ACTIVEPURGATORY),
         statusToString(STATUS_ACTIVENEEDRESCANPURGATORY)}),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     performUpdate(map,"WHERE "+query,list,null);
 
     // Map BEINGDELETED to ELIGIBLEFORDELETE
@@ -388,7 +389,7 @@ public class JobQueue extends org.apache
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_BEINGDELETED)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     performUpdate(map,"WHERE "+query,list,null);
 
     // Map BEINGCLEANED to PURGATORY
@@ -398,7 +399,7 @@ public class JobQueue extends org.apache
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_BEINGCLEANED)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     performUpdate(map,"WHERE "+query,list,null);
 
     // Map newseed fields to seed
@@ -408,25 +409,85 @@ public class JobQueue extends org.apache
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED)),
-      new UnitaryClause(seedingProcessIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(seedingProcessIDField,processID)});
+    performUpdate(map,"WHERE "+query,list,null);
+
+    // Reindex the jobqueue table, since we've probably made lots of bad tuples doing the
above operations.
+    reindexTable();
+    unconditionallyAnalyzeTables();
+
+    TrackerClass.noteGlobalChange("Restart");
+  }
+
+  /** Restart for entire cluster.
+  */
+  public void restartCluster()
+    throws ManifoldCFException
+  {
+    // Map ACTIVE back to PENDING.
+    HashMap map = new HashMap();
+    map.put(statusField,statusToString(STATUS_PENDING));
+    map.put(processIDField,null);
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_ACTIVE),
+        statusToString(STATUS_ACTIVENEEDRESCAN)})});
+    performUpdate(map,"WHERE "+query,list,null);
+
+    // Map ACTIVEPURGATORY to PENDINGPURGATORY
+    map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
+    map.put(processIDField,null);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_ACTIVEPURGATORY),
+        statusToString(STATUS_ACTIVENEEDRESCANPURGATORY)})});
+    performUpdate(map,"WHERE "+query,list,null);
+
+    // Map BEINGDELETED to ELIGIBLEFORDELETE
+    map.put(statusField,statusToString(STATUS_ELIGIBLEFORDELETE));
+    map.put(processIDField,null);
+    map.put(checkTimeField,new Long(0L));
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_BEINGDELETED))});
+    performUpdate(map,"WHERE "+query,list,null);
+
+    // Map BEINGCLEANED to PURGATORY
+    map.put(statusField,statusToString(STATUS_PURGATORY));
+    map.put(processIDField,null);
+    map.put(checkTimeField,new Long(0L));
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_BEINGCLEANED))});
+    performUpdate(map,"WHERE "+query,list,null);
+
+    // Map newseed fields to seed
+    map.clear();
+    map.put(isSeedField,seedstatusToString(SEEDSTATUS_SEED));
+    map.put(seedingProcessIDField,null);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED))});
     performUpdate(map,"WHERE "+query,list,null);
 
     // Clear out all failtime fields (since we obviously haven't been retrying whilst we
were not
     // running)
-    // ??? Figure out how to handle this in a multi-agents environment
     map.clear();
     map.put(failTimeField,null);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new NullCheckClause(failTimeField,false)});
     performUpdate(map,"WHERE "+query,list,null);
+
     // Reindex the jobqueue table, since we've probably made lots of bad tuples doing the
above operations.
     reindexTable();
     unconditionallyAnalyzeTables();
 
-    TrackerClass.noteGlobalChange("Restart");
+    TrackerClass.noteGlobalChange("Restart cluster");
   }
-
+  
   /** Flip all records for a job that have status HOPCOUNTREMOVED back to PENDING.
   * NOTE: We need to actually schedule these!!!  so the following can't really work.  ???
   */

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1543421&r1=1543420&r2=1543421&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
Tue Nov 19 13:18:45 2013
@@ -368,6 +368,7 @@ public class Jobs extends org.apache.man
 
       // Index management
       IndexDescription statusIndex = new IndexDescription(false,new String[]{statusField,idField,priorityField});
+      IndexDescription statusProcessIndex = new IndexDescription(false,new String[]{statusField,processIDField});
       IndexDescription connectionIndex = new IndexDescription(false,new String[]{connectionNameField});
       IndexDescription outputIndex = new IndexDescription(false,new String[]{outputNameField});
 
@@ -381,6 +382,8 @@ public class Jobs extends org.apache.man
 
         if (statusIndex != null && id.equals(statusIndex))
           statusIndex = null;
+        else if (statusProcessIndex != null && id.equals(statusProcessIndex))
+          statusProcessIndex = null;
         else if (connectionIndex != null && id.equals(connectionIndex))
           connectionIndex = null;
         else if (outputIndex != null && id.equals(outputIndex))
@@ -393,6 +396,8 @@ public class Jobs extends org.apache.man
       // Add the ones we didn't find
       if (statusIndex != null)
         performAddIndex(null,statusIndex);
+      if (statusProcessIndex != null)
+        performAddIndex(null,statusProcessIndex);
       if (connectionIndex != null)
         performAddIndex(null,connectionIndex);
       if (outputIndex != null)
@@ -885,8 +890,9 @@ public class Jobs extends org.apache.man
   }
 
   /** This method is called on a restart.
+  *@param processID is the process to be restarting.
   */
-  public void restart()
+  public void restart(String processID)
     throws ManifoldCFException
   {
     StringSet invKey = new StringSet(getJobStatusKey());
@@ -897,7 +903,7 @@ public class Jobs extends org.apache.man
     // Starting up delete goes back to just being ready for delete
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_DELETESTARTINGUP)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORDELETE));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
@@ -906,7 +912,7 @@ public class Jobs extends org.apache.man
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
@@ -917,7 +923,7 @@ public class Jobs extends org.apache.man
       new MultiClause(statusField,new Object[]{
         statusToString(STATUS_STARTINGUP),
         statusToString(STATUS_ABORTINGSTARTINGUP)}),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORSTARTUP));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
@@ -928,7 +934,7 @@ public class Jobs extends org.apache.man
       new MultiClause(statusField,new Object[]{
         statusToString(STATUS_STARTINGUPMINIMAL),
         statusToString(STATUS_ABORTINGSTARTINGUPMINIMAL)}),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORSTARTUPMINIMAL));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
@@ -937,7 +943,7 @@ public class Jobs extends org.apache.man
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTART)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
@@ -946,7 +952,7 @@ public class Jobs extends org.apache.man
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTARTMINIMAL)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
@@ -955,98 +961,247 @@ public class Jobs extends org.apache.man
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ACTIVE));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_PAUSINGSEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_PAUSING));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITINGSEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ACTIVEWAITING));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_PAUSINGWAITINGSEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_PAUSINGWAITING));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_RESUMINGSEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_RESUMING));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ABORTING));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDINGMINIMAL)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_PAUSEDSEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_PAUSED));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITSEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ACTIVEWAIT));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_PAUSEDWAITSEEDING)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_PAUSEDWAIT));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_UNINSTALLED)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER)),
-      new UnitaryClause(processIDField,ManifoldCF.getProcessID())});
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+  }
+
+  /** This method is called on a restart of the entire cluster.
+  */
+  public void restartCluster()
+    throws ManifoldCFException
+  {
+    StringSet invKey = new StringSet(getJobStatusKey());
+    ArrayList list = new ArrayList();
+    HashMap map = new HashMap();
+    String query;
+      
+    // Starting up delete goes back to just being ready for delete
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_DELETESTARTINGUP))});
+    map.put(statusField,statusToString(STATUS_READYFORDELETE));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Notifying of completion goes back to just being ready for notify
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION))});
+    map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Starting up or aborting starting up goes back to just being ready
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_STARTINGUP),
+        statusToString(STATUS_ABORTINGSTARTINGUP)})});
+    map.put(statusField,statusToString(STATUS_READYFORSTARTUP));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Starting up or aborting starting up goes back to just being ready
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_STARTINGUPMINIMAL),
+        statusToString(STATUS_ABORTINGSTARTINGUPMINIMAL)})});
+    map.put(statusField,statusToString(STATUS_READYFORSTARTUPMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Aborting starting up for restart state goes to ABORTINGFORRESTART
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTART))});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Aborting starting up for restart state goes to ABORTINGFORRESTART
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTARTMINIMAL))});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // All seeding values return to pre-seeding values
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING))});
+    map.put(statusField,statusToString(STATUS_ACTIVE));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_PAUSING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_ACTIVEWAITING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSINGWAITINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_PAUSINGWAITING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_RESUMINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_RESUMING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_ABORTING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDING))});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDINGMINIMAL))});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSEDSEEDING))});
+    map.put(statusField,statusToString(STATUS_PAUSED));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITSEEDING))});
+    map.put(statusField,statusToString(STATUS_ACTIVEWAIT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSEDWAITSEEDING))});
+    map.put(statusField,statusToString(STATUS_PAUSEDWAIT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_UNINSTALLED))});
+    map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT))});
+    map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER))});
     map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);



Mime
View raw message