manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1545624 [4/5] - in /manifoldcf/trunk: ./ framework/agents/src/main/java/org/apache/manifoldcf/agents/ framework/agents/src/main/java/org/apache/manifoldcf/agents/agentmanager/ framework/agents/src/main/java/org/apache/manifoldcf/agents/int...
Date Tue, 26 Nov 2013 11:45:55 GMT
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1545624&r1=1545623&r2=1545624&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Tue Nov 26 11:45:53 2013
@@ -21,6 +21,7 @@ package org.apache.manifoldcf.crawler.jo
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.agents.interfaces.*;
 import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.crawler.system.ManifoldCF;
 import org.apache.manifoldcf.crawler.interfaces.CacheKeyFactory;
 import java.util.*;
 
@@ -52,6 +53,7 @@ import java.util.*;
  * <tr><td>reseedinterval</td><td>BIGINT</td><td></td></tr>
  * <tr><td>reseedtime</td><td>BIGINT</td><td></td></tr>
  * <tr><td>hopcountmode</td><td>CHAR(1)</td><td></td></tr>
+ * <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
  * </table>
  * <br><br>
  * 
@@ -181,12 +183,13 @@ public class Jobs extends org.apache.man
   public final static String reseedTimeField = "reseedtime";
   /** For a job whose connector supports hopcounts, this describes how those hopcounts are handled. */
   public final static String hopcountModeField = "hopcountmode";
-
+  /** Process id field, for keeping track of which process owns transient state */
+  public final static String processIDField = "processid";
+  
   protected static Map statusMap;
   protected static Map typeMap;
   protected static Map startMap;
   protected static Map hopmodeMap;
-
   static
   {
     statusMap = new HashMap();
@@ -254,6 +257,36 @@ public class Jobs extends org.apache.man
     hopmodeMap.put("V",new Integer(HOPCOUNT_NEVERDELETE));
   }
 
+  /*
+  protected static Set<Integer> transientStates;
+  static
+  {
+    transientStates = new HashSet<Integer>();
+    transientStates.add(new Integer(STATUS_DELETESTARTINGUP));
+    transientStates.add(new Integer(STATUS_NOTIFYINGOFCOMPLETION));
+    transientStates.add(new Integer(STATUS_STARTINGUP));
+    transientStates.add(new Integer(STATUS_ABORTINGSTARTINGUP));
+    transientStates.add(new Integer(STATUS_STARTINGUPMINIMAL));
+    transientStates.add(new Integer(STATUS_ABORTINGSTARTINGUPMINIMAL));
+    transientStates.add(new Integer(STATUS_ABORTINGSTARTINGUPFORRESTART));
+    transientStates.add(new Integer(STATUS_ABORTINGSTARTINGUPFORRESTARTMINIMAL));
+    transientStates.add(new Integer(STATUS_ACTIVESEEDING));
+    transientStates.add(new Integer(STATUS_PAUSINGSEEDING));
+    transientStates.add(new Integer(STATUS_ACTIVEWAITINGSEEDING));
+    transientStates.add(new Integer(STATUS_PAUSINGWAITINGSEEDING));
+    transientStates.add(new Integer(STATUS_RESUMINGSEEDING));
+    transientStates.add(new Integer(STATUS_ABORTINGSEEDING));
+    transientStates.add(new Integer(STATUS_ABORTINGFORRESTARTSEEDING));
+    transientStates.add(new Integer(STATUS_ABORTINGFORRESTARTSEEDINGMINIMAL));
+    transientStates.add(new Integer(STATUS_PAUSEDSEEDING));
+    transientStates.add(new Integer(STATUS_ACTIVEWAITSEEDING));
+    transientStates.add(new Integer(STATUS_PAUSEDWAITSEEDING));
+    transientStates.add(new Integer(STATUS_ACTIVESEEDING_UNINSTALLED));
+    transientStates.add(new Integer(STATUS_ACTIVESEEDING_NOOUTPUT));
+    transientStates.add(new Integer(STATUS_ACTIVESEEDING_NEITHER));
+  }
+  */
+  
   // Local variables
   protected ICacheManager cacheManager;
   protected ScheduleManager scheduleManager;
@@ -316,11 +349,18 @@ public class Jobs extends org.apache.man
         map.put(reseedIntervalField,new ColumnDescription("BIGINT",false,true,null,null,false));
         map.put(reseedTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
         map.put(hopcountModeField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
+        map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
         performCreate(map,null);
       }
       else
       {
         // Do any needed upgrades
+        if (existing.get(processIDField) == null)
+        {
+          Map insertMap = new HashMap();
+          insertMap.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
+          performAlter(insertMap,null,null,null);
+        }
       }
 
       // Handle related tables
@@ -330,6 +370,7 @@ public class Jobs extends org.apache.man
 
       // Index management
       IndexDescription statusIndex = new IndexDescription(false,new String[]{statusField,idField,priorityField});
+      IndexDescription statusProcessIndex = new IndexDescription(false,new String[]{statusField,processIDField});
       IndexDescription connectionIndex = new IndexDescription(false,new String[]{connectionNameField});
       IndexDescription outputIndex = new IndexDescription(false,new String[]{outputNameField});
 
@@ -343,6 +384,8 @@ public class Jobs extends org.apache.man
 
         if (statusIndex != null && id.equals(statusIndex))
           statusIndex = null;
+        else if (statusProcessIndex != null && id.equals(statusProcessIndex))
+          statusProcessIndex = null;
         else if (connectionIndex != null && id.equals(connectionIndex))
           connectionIndex = null;
         else if (outputIndex != null && id.equals(outputIndex))
@@ -355,6 +398,8 @@ public class Jobs extends org.apache.man
       // Add the ones we didn't find
       if (statusIndex != null)
         performAddIndex(null,statusIndex);
+      if (statusProcessIndex != null)
+        performAddIndex(null,statusProcessIndex);
       if (connectionIndex != null)
         performAddIndex(null,connectionIndex);
       if (outputIndex != null)
@@ -847,152 +892,328 @@ public class Jobs extends org.apache.man
   }
 
   /** This method is called on a restart.
+  *@param processID is the ID of the process whose transient job states are to be reset.
+  */
+  public void restart(String processID)
+    throws ManifoldCFException
+  {
+    StringSet invKey = new StringSet(getJobStatusKey());
+    ArrayList list = new ArrayList();
+    HashMap map = new HashMap();
+    String query;
+      
+    // Starting up delete goes back to just being ready for delete
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_DELETESTARTINGUP)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_READYFORDELETE));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Notifying of completion goes back to just being ready for notify
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Starting up or aborting starting up goes back to just being ready
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_STARTINGUP),
+        statusToString(STATUS_ABORTINGSTARTINGUP)}),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_READYFORSTARTUP));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Starting up (minimal) or aborting starting up (minimal) goes back to just being ready for minimal startup
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_STARTINGUPMINIMAL),
+        statusToString(STATUS_ABORTINGSTARTINGUPMINIMAL)}),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_READYFORSTARTUPMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Aborting starting up for restart state goes to ABORTINGFORRESTART
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTART)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Aborting starting up for restart (minimal) state goes to ABORTINGFORRESTARTMINIMAL
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTARTMINIMAL)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // All seeding values return to pre-seeding values
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVE));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_PAUSING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVEWAITING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSINGWAITINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_PAUSINGWAITING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_RESUMINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_RESUMING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ABORTING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDINGMINIMAL)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSEDSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_PAUSED));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVEWAIT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSEDWAITSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_PAUSEDWAIT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_UNINSTALLED)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+  }
+
+  /** Clean up after all process IDs.
   */
   public void restart()
     throws ManifoldCFException
   {
-    beginTransaction();
-    try
-    {
-      StringSet invKey = new StringSet(getJobStatusKey());
-      ArrayList list = new ArrayList();
-      HashMap map = new HashMap();
-      String query;
+    StringSet invKey = new StringSet(getJobStatusKey());
+    ArrayList list = new ArrayList();
+    HashMap map = new HashMap();
+    String query;
       
-      // Starting up delete goes back to just being ready for delete
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_DELETESTARTINGUP))});
-      map.put(statusField,statusToString(STATUS_READYFORDELETE));
-      performUpdate(map,"WHERE "+query,list,invKey);
-
-      // Notifying of completion goes back to just being ready for notify
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION))});
-      map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
-      performUpdate(map,"WHERE "+query,list,invKey);
-
-      // Starting up or aborting starting up goes back to just being ready
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new MultiClause(statusField,new Object[]{
-          statusToString(STATUS_STARTINGUP),
-          statusToString(STATUS_ABORTINGSTARTINGUP)})});
-      map.put(statusField,statusToString(STATUS_READYFORSTARTUP));
-      performUpdate(map,"WHERE "+query,list,invKey);
-
-      // Starting up or aborting starting up goes back to just being ready
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new MultiClause(statusField,new Object[]{
-          statusToString(STATUS_STARTINGUPMINIMAL),
-          statusToString(STATUS_ABORTINGSTARTINGUPMINIMAL)})});
-      map.put(statusField,statusToString(STATUS_READYFORSTARTUPMINIMAL));
-      performUpdate(map,"WHERE "+query,list,invKey);
-
-      // Aborting starting up for restart state goes to ABORTINGFORRESTART
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTART))});
-      map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
-      performUpdate(map,"WHERE "+query,list,invKey);
-
-      // Aborting starting up for restart state goes to ABORTINGFORRESTART
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTARTMINIMAL))});
-      map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
-      performUpdate(map,"WHERE "+query,list,invKey);
-
-      // All seeding values return to pre-seeding values
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING))});
-      map.put(statusField,statusToString(STATUS_ACTIVE));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_PAUSINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_PAUSING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_ACTIVEWAITING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_PAUSINGWAITINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_PAUSINGWAITING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_RESUMINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_RESUMING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_ABORTING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDING))});
-      map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDINGMINIMAL))});
-      map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_PAUSEDSEEDING))});
-      map.put(statusField,statusToString(STATUS_PAUSED));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITSEEDING))});
-      map.put(statusField,statusToString(STATUS_ACTIVEWAIT));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_PAUSEDWAITSEEDING))});
-      map.put(statusField,statusToString(STATUS_PAUSEDWAIT));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_UNINSTALLED))});
-      map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT))});
-      map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER))});
-      map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
-      performUpdate(map,"WHERE "+query,list,invKey);
+    // Starting up delete goes back to just being ready for delete
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_DELETESTARTINGUP))});
+    map.put(statusField,statusToString(STATUS_READYFORDELETE));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Notifying of completion goes back to just being ready for notify
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION))});
+    map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Starting up or aborting starting up goes back to just being ready
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_STARTINGUP),
+        statusToString(STATUS_ABORTINGSTARTINGUP)})});
+    map.put(statusField,statusToString(STATUS_READYFORSTARTUP));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Starting up (minimal) or aborting starting up (minimal) goes back to just being ready for minimal startup
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_STARTINGUPMINIMAL),
+        statusToString(STATUS_ABORTINGSTARTINGUPMINIMAL)})});
+    map.put(statusField,statusToString(STATUS_READYFORSTARTUPMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Aborting starting up for restart state goes to ABORTINGFORRESTART
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTART))});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // Aborting starting up for restart (minimal) state goes to ABORTINGFORRESTARTMINIMAL
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTARTMINIMAL))});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
+    // All seeding values return to pre-seeding values
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING))});
+    map.put(statusField,statusToString(STATUS_ACTIVE));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_PAUSING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_ACTIVEWAITING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSINGWAITINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_PAUSINGWAITING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_RESUMINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_RESUMING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSEEDING))});
+    map.put(statusField,statusToString(STATUS_ABORTING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDING))});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDINGMINIMAL))});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSEDSEEDING))});
+    map.put(statusField,statusToString(STATUS_PAUSED));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITSEEDING))});
+    map.put(statusField,statusToString(STATUS_ACTIVEWAIT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSEDWAITSEEDING))});
+    map.put(statusField,statusToString(STATUS_PAUSEDWAIT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_UNINSTALLED))});
+    map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT))});
+    map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER))});
+    map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
 
-      // No need to do anything to the queue; it looks like it can take care of
-      // itself.
-    }
-    catch (ManifoldCFException e)
-    {
-      signalRollback();
-      throw e;
-    }
-    catch (Error e)
-    {
-      signalRollback();
-      throw e;
-    }
-    finally
-    {
-      endTransaction();
-    }
+  }
+
+  public void restartCluster()
+    throws ManifoldCFException
+  {
+    // Does nothing
   }
 
   /** Signal to a job that its underlying output connector has gone away.
@@ -1228,7 +1449,7 @@ public class Jobs extends org.apache.man
 
   /** Reset delete startup worker thread status.
   */
-  public void resetDeleteStartupWorkerStatus()
+  public void resetDeleteStartupWorkerStatus(String processID)
     throws ManifoldCFException
   {
     // This handles everything that the delete startup thread would resolve.
@@ -1236,15 +1457,17 @@ public class Jobs extends org.apache.man
     ArrayList list = new ArrayList();
     HashMap map = new HashMap();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_DELETESTARTINGUP))});
+      new UnitaryClause(statusField,statusToString(STATUS_DELETESTARTINGUP)),
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORDELETE));
+    map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
   }
   
   /** Reset notification worker thread status.
   */
-  public void resetNotificationWorkerStatus()
+  public void resetNotificationWorkerStatus(String processID)
     throws ManifoldCFException
   {
     // This resets everything that the job notification thread would resolve.
@@ -1252,15 +1475,17 @@ public class Jobs extends org.apache.man
     ArrayList list = new ArrayList();
     HashMap map = new HashMap();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION))});
+      new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION)),
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
+    map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
   }
   
   /** Reset startup worker thread status.
   */
-  public void resetStartupWorkerStatus()
+  public void resetStartupWorkerStatus(String processID)
     throws ManifoldCFException
   {
     // We have to handle all states that the startup thread would resolve, and change them to something appropriate.
@@ -1271,137 +1496,157 @@ public class Jobs extends org.apache.man
 
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_STARTINGUP))});
+      new UnitaryClause(statusField,statusToString(STATUS_STARTINGUP)),
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORSTARTUP));
+    map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_STARTINGUPMINIMAL))});
+      new UnitaryClause(statusField,statusToString(STATUS_STARTINGUPMINIMAL)),
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_READYFORSTARTUPMINIMAL));
+    map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
       new MultiClause(statusField,new Object[]{
         statusToString(STATUS_ABORTINGSTARTINGUP),
-        statusToString(STATUS_ABORTINGSTARTINGUPMINIMAL)})});
+        statusToString(STATUS_ABORTINGSTARTINGUPMINIMAL)}),
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ABORTING));
+    map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTART))});
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTART)),
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
+    map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTARTMINIMAL))});
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSTARTINGUPFORRESTARTMINIMAL)),
+      new UnitaryClause(processIDField,processID)});
     map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
+    map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
   }
 
   /** Reset as part of restoring seeding worker threads.
   */
-  public void resetSeedingWorkerStatus()
+  public void resetSeedingWorkerStatus(String processID)
     throws ManifoldCFException
   {
-    beginTransaction();
-    try
-    {
-      StringSet invKey = new StringSet(getJobStatusKey());
-      ArrayList list = new ArrayList();
-      HashMap map = new HashMap();
-      String query;
-      // All seeding values return to pre-seeding values
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING))});
-      map.put(statusField,statusToString(STATUS_ACTIVE));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_PAUSINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_PAUSING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_ACTIVEWAITING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_PAUSINGWAITINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_PAUSINGWAITING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_RESUMINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_RESUMING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSEEDING))});
-      map.put(statusField,statusToString(STATUS_ABORTING));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDING))});
-      map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDINGMINIMAL))});
-      map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_PAUSEDSEEDING))});
-      map.put(statusField,statusToString(STATUS_PAUSED));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITSEEDING))});
-      map.put(statusField,statusToString(STATUS_ACTIVEWAIT));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_PAUSEDWAITSEEDING))});
-      map.put(statusField,statusToString(STATUS_PAUSEDWAIT));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_UNINSTALLED))});
-      map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT))});
-      map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
-      performUpdate(map,"WHERE "+query,list,invKey);
-      list.clear();
-      query = buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER))});
-      map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
-      performUpdate(map,"WHERE "+query,list,invKey);
+    StringSet invKey = new StringSet(getJobStatusKey());
+    ArrayList list = new ArrayList();
+    HashMap map = new HashMap();
+    String query;
+    // All seeding values return to pre-seeding values
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVE));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_PAUSING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVEWAITING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSINGWAITINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_PAUSINGWAITING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_RESUMINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_RESUMING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ABORTING));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTART));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ABORTINGFORRESTARTSEEDINGMINIMAL)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ABORTINGFORRESTARTMINIMAL));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSEDSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_PAUSED));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVEWAITSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVEWAIT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_PAUSEDWAITSEEDING)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_PAUSEDWAIT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_UNINSTALLED)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
 
-    }
-    catch (ManifoldCFException e)
-    {
-      signalRollback();
-      throw e;
-    }
-    catch (Error e)
-    {
-      signalRollback();
-      throw e;
-    }
-    finally
-    {
-      endTransaction();
-    }
   }
 
 
@@ -1470,6 +1715,7 @@ public class Jobs extends org.apache.man
 
       HashMap map = new HashMap();
       map.put(statusField,statusToString(newStatus));
+      map.put(processIDField,null);
       performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
     }
     catch (ManifoldCFException e)
@@ -1984,7 +2230,29 @@ public class Jobs extends org.apache.man
   *@param status is the desired status.
   *@param reseedTime is the reseed time.
   */
-  public void writeStatus(Long jobID, int status, Long reseedTime)
+  public void writeTransientStatus(Long jobID, int status, Long reseedTime, String processID)
+    throws ManifoldCFException
+  {
+    writeStatus(jobID, status, reseedTime, processID);
+  }
+
+  /** Update a job's status, and its reseed time.
+  *@param jobID is the job id.
+  *@param status is the desired status.
+  *@param reseedTime is the reseed time.
+  */
+  public void writePermanentStatus(Long jobID, int status, Long reseedTime)
+    throws ManifoldCFException
+  {
+    writeStatus(jobID, status, reseedTime, null);
+  }
+
+  /** Update a job's status, and its reseed time.
+  *@param jobID is the job id.
+  *@param status is the desired status.
+  *@param reseedTime is the reseed time.
+  */
+  protected void writeStatus(Long jobID, int status, Long reseedTime, String processID)
     throws ManifoldCFException
   {
     ArrayList list = new ArrayList();
@@ -1992,6 +2260,7 @@ public class Jobs extends org.apache.man
       new UnitaryClause(idField,jobID)});
     HashMap map = new HashMap();
     map.put(statusField,statusToString(status));
+    map.put(processIDField,processID);
     map.put(reseedTimeField,reseedTime);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }
@@ -2000,7 +2269,27 @@ public class Jobs extends org.apache.man
   *@param jobID is the job id.
   *@param status is the desired status.
   */
-  public void writeStatus(Long jobID, int status)
+  public void writeTransientStatus(Long jobID, int status, String processID)
+    throws ManifoldCFException
+  {
+    writeStatus(jobID, status, processID);
+  }
+  
+  /** Update a job's status.
+  *@param jobID is the job id.
+  *@param status is the desired status.
+  */
+  public void writePermanentStatus(Long jobID, int status)
+    throws ManifoldCFException
+  {
+    writeStatus(jobID, status, null);
+  }
+
+  /** Update a job's status.
+  *@param jobID is the job id.
+  *@param status is the desired status.
+  */
+  protected void writeStatus(Long jobID, int status, String processID)
     throws ManifoldCFException
   {
     ArrayList list = new ArrayList();
@@ -2008,6 +2297,7 @@ public class Jobs extends org.apache.man
       new UnitaryClause(idField,jobID)});
     HashMap map = new HashMap();
     map.put(statusField,statusToString(status));
+    map.put(processIDField,processID);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }
 
@@ -2221,6 +2511,7 @@ public class Jobs extends org.apache.man
       new UnitaryClause(idField,jobID)});
     HashMap map = new HashMap();
     map.put(statusField,statusToString(STATUS_INACTIVE));
+    map.put(processIDField,null);
     // Leave everything else around from the abort/finish.
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java?rev=1545624&r1=1545623&r2=1545624&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java Tue Nov 26 11:45:53 2013
@@ -21,6 +21,7 @@ package org.apache.manifoldcf.crawler.sy
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.agents.interfaces.*;
 import org.apache.manifoldcf.crawler.interfaces.*;
+import java.util.*;
 
 /** This is the main agent class for the crawler.
 */
@@ -28,20 +29,87 @@ public class CrawlerAgent implements IAg
 {
   public static final String _rcsid = "@(#)$Id: CrawlerAgent.java 988245 2010-08-23 18:39:35Z kwright $";
 
-  protected IThreadContext threadContext;
+  // Thread objects.
+  // These get filled in as threads are created.
+  protected InitializationThread initializationThread = null;
+  protected JobStartThread jobStartThread = null;
+  protected StufferThread stufferThread = null;
+  protected FinisherThread finisherThread = null;
+  protected JobNotificationThread notificationThread = null;
+  protected StartupThread startupThread = null;
+  protected StartDeleteThread startDeleteThread = null;
+  protected JobDeleteThread jobDeleteThread = null;
+  protected WorkerThread[] workerThreads = null;
+  protected ExpireStufferThread expireStufferThread = null;
+  protected ExpireThread[] expireThreads = null;
+  protected DocumentDeleteStufferThread deleteStufferThread = null;
+  protected DocumentDeleteThread[] deleteThreads = null;
+  protected DocumentCleanupStufferThread cleanupStufferThread = null;
+  protected DocumentCleanupThread[] cleanupThreads = null;
+  protected JobResetThread jobResetThread = null;
+  protected SeedingThread seedingThread = null;
+  protected IdleCleanupThread idleCleanupThread = null;
+  protected SetPriorityThread setPriorityThread = null;
+  protected HistoryCleanupThread historyCleanupThread = null;
+
+  // Reset managers
+  /** Worker thread pool reset manager */
+  protected WorkerResetManager workerResetManager = null;
+  /** Delete thread pool reset manager */
+  protected DocDeleteResetManager docDeleteResetManager = null;
+  /** Cleanup thread pool reset manager */
+  protected DocCleanupResetManager docCleanupResetManager = null;
+
+  // Number of worker threads
+  protected int numWorkerThreads = 0;
+  // Number of delete threads
+  protected int numDeleteThreads = 0;
+  // Number of cleanup threads
+  protected int numCleanupThreads = 0;
+  // Number of expiration threads
+  protected int numExpireThreads = 0;
+  // Factor for low water level in queueing
+  protected float lowWaterFactor = 5.0f;
+  // Factor in amount to stuff
+  protected float stuffAmtFactor = 0.5f;
+
+  /** Process identifier for this agent */
+  protected String processID = null;
 
   /** Constructor.
   *@param threadContext is the thread context.
   */
-  public CrawlerAgent(IThreadContext threadContext)
+  public CrawlerAgent()
     throws ManifoldCFException
   {
-    this.threadContext = threadContext;
+  }
+
+  /** Initialize agent environment.
+  * This is called before any of the other operations are called, and is meant to ensure that
+  * the environment is properly initialized.
+  */
+  public void initialize(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    org.apache.manifoldcf.authorities.system.ManifoldCF.localInitialize(threadContext);
+    org.apache.manifoldcf.crawler.system.ManifoldCF.localInitialize(threadContext);
+  }
+  
+  /** Tear down agent environment.
+  * This is called after all the other operations are completed, and is meant to allow
+  * environment resources to be freed.
+  */
+  public void cleanUp(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    org.apache.manifoldcf.crawler.system.ManifoldCF.localCleanup(threadContext);
+    org.apache.manifoldcf.authorities.system.ManifoldCF.localCleanup(threadContext);
   }
 
   /** Install agent.  This usually installs the agent's database tables etc.
   */
-  public void install()
+  @Override
+  public void install(IThreadContext threadContext)
     throws ManifoldCFException
   {
     // Install the system tables for the crawler.
@@ -50,79 +118,644 @@ public class CrawlerAgent implements IAg
 
   /** Uninstall agent.  This must clean up everything the agent is responsible for.
   */
-  public void deinstall()
+  @Override
+  public void deinstall(IThreadContext threadContext)
     throws ManifoldCFException
   {
     ManifoldCF.deinstallSystemTables(threadContext);
   }
 
+  /** Called ONLY when no other active services of this kind are running.  Meant to be
+  * used after the cluster has been down for an indeterminate period of time.
+  */
+  @Override
+  public void clusterInit(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    IJobManager jobManager = JobManagerFactory.make(threadContext);
+    jobManager.prepareForClusterStart();
+  }
+
+  /** Clean up after ALL agents processes.
+  * Call this method to clean up dangling persistent state when a cluster is just starting
+  * to come up.  This method CANNOT be called when there are any active agents
+  * processes at all.
+  */
+  @Override
+  public void cleanUpAgentData(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    IJobManager jobManager = JobManagerFactory.make(threadContext);
+    jobManager.cleanupProcessData();
+  }
+  
+  /** Clean up after an agents process.
+  * Call this method to clean up dangling persistent state after agent has been stopped.
+  * This method CANNOT be called when the agent is active, but it can
+  * be called at any time and by any process in order to guarantee that a terminated
+  * agent does not block other agents from completing their tasks.
+  *@param processID is the process ID of the agent to clean up after.
+  */
+  @Override
+  public void cleanUpAgentData(IThreadContext threadContext, String processID)
+    throws ManifoldCFException
+  {
+    IJobManager jobManager = JobManagerFactory.make(threadContext);
+    jobManager.cleanupProcessData(processID);
+  }
+
   /** Start the agent.  This method should spin up the agent threads, and
   * then return.
   */
-  public void startAgent()
+  @Override
+  public void startAgent(IThreadContext threadContext, String processID)
     throws ManifoldCFException
   {
-    org.apache.manifoldcf.authorities.system.ManifoldCF.localInitialize(threadContext);
-    org.apache.manifoldcf.crawler.system.ManifoldCF.localInitialize(threadContext);
-    ManifoldCF.startSystem(threadContext);
+    this.processID = processID;
+    startSystem(threadContext);
   }
 
-  /** Stop the agent.  This should shut down the agent threads.
+  /** Stop the agent.  This should shut down the agent threads etc.
   */
-  public void stopAgent()
+  @Override
+  public void stopAgent(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    ManifoldCF.stopSystem(threadContext);
-    org.apache.manifoldcf.crawler.system.ManifoldCF.localCleanup(threadContext);
-    org.apache.manifoldcf.authorities.system.ManifoldCF.localCleanup(threadContext);
+    stopSystem(threadContext);
   }
 
   /** Request permission from agent to delete an output connection.
   *@param connName is the name of the output connection.
   *@return true if the connection is in use, false otherwise.
   */
-  public boolean isOutputConnectionInUse(String connName)
+  @Override
+  public boolean isOutputConnectionInUse(IThreadContext threadContext, String connName)
     throws ManifoldCFException
   {
-    return ManifoldCF.isOutputConnectionInUse(threadContext,connName);
+    // Check with job manager.
+    IJobManager jobManager = JobManagerFactory.make(threadContext);
+    return jobManager.checkIfOutputReference(connName);
   }
 
   /** Note the deregistration of a set of output connections.
   *@param connectionNames are the names of the connections being deregistered.
   */
-  public void noteOutputConnectorDeregistration(String[] connectionNames)
+  @Override
+  public void noteOutputConnectorDeregistration(IThreadContext threadContext, String[] connectionNames)
     throws ManifoldCFException
   {
-    ManifoldCF.noteOutputConnectorDeregistration(threadContext,connectionNames);
+    // Notify job manager
+    IJobManager jobManager = JobManagerFactory.make(threadContext);
+    jobManager.noteOutputConnectorDeregistration(connectionNames);
   }
 
   /** Note the registration of a set of output connections.
   *@param connectionNames are the names of the connections being registered.
   */
-  public void noteOutputConnectorRegistration(String[] connectionNames)
+  @Override
+  public void noteOutputConnectorRegistration(IThreadContext threadContext, String[] connectionNames)
     throws ManifoldCFException
   {
-    ManifoldCF.noteOutputConnectorRegistration(threadContext,connectionNames);
+    // Notify job manager
+    IJobManager jobManager = JobManagerFactory.make(threadContext);
+    jobManager.noteOutputConnectorRegistration(connectionNames);
   }
 
   /** Note a change in configuration for an output connection.
   *@param connectionName is the name of the connections being changed.
   */
-  public void noteOutputConnectionChange(String connectionName)
+  @Override
+  public void noteOutputConnectionChange(IThreadContext threadContext, String connectionName)
     throws ManifoldCFException
   {
-    ManifoldCF.noteOutputConnectionChange(threadContext,connectionName);
+    // Notify job manager
+    IJobManager jobManager = JobManagerFactory.make(threadContext);
+    jobManager.noteOutputConnectionChange(connectionName);
+  }
+
+  /** Start everything.
+  */
+  public void startSystem(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    Logging.root.info("Starting up pull-agent...");
+    // Now, start all the threads
+    numWorkerThreads = ManifoldCF.getMaxWorkerThreads(threadContext);
+    if (numWorkerThreads < 1 || numWorkerThreads > 300)
+      throw new ManifoldCFException("Illegal value for the number of worker threads");
+    numDeleteThreads = ManifoldCF.getMaxDeleteThreads(threadContext);
+    numCleanupThreads = ManifoldCF.getMaxCleanupThreads(threadContext);
+    numExpireThreads = ManifoldCF.getMaxExpireThreads(threadContext);
+    if (numDeleteThreads < 1 || numDeleteThreads > 300)
+      throw new ManifoldCFException("Illegal value for the number of delete threads");
+    if (numCleanupThreads < 1 || numCleanupThreads > 300)
+      throw new ManifoldCFException("Illegal value for the number of cleanup threads");
+    if (numExpireThreads < 1 || numExpireThreads > 300)
+      throw new ManifoldCFException("Illegal value for the number of expire threads");
+    lowWaterFactor = (float)LockManagerFactory.getDoubleProperty(threadContext,ManifoldCF.lowWaterFactorProperty,5.0);
+    if (lowWaterFactor < 1.0 || lowWaterFactor > 1000.0)
+      throw new ManifoldCFException("Illegal value for the low water factor");
+    stuffAmtFactor = (float)LockManagerFactory.getDoubleProperty(threadContext,ManifoldCF.stuffAmtFactorProperty,2.0);
+    if (stuffAmtFactor < 0.1 || stuffAmtFactor > 1000.0)
+      throw new ManifoldCFException("Illegal value for the stuffing amount factor");
+
+
+    // Create the threads and objects.  This MUST be completed before there is any chance of "shutdownSystem" getting called.
+
+    QueueTracker queueTracker = new QueueTracker();
+
+
+    DocumentQueue documentQueue = new DocumentQueue();
+    DocumentDeleteQueue documentDeleteQueue = new DocumentDeleteQueue();
+    DocumentCleanupQueue documentCleanupQueue = new DocumentCleanupQueue();
+    DocumentCleanupQueue expireQueue = new DocumentCleanupQueue();
+
+    BlockingDocuments blockingDocuments = new BlockingDocuments();
+
+    workerResetManager = new WorkerResetManager(documentQueue,expireQueue,processID);
+    docDeleteResetManager = new DocDeleteResetManager(documentDeleteQueue,processID);
+    docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue,processID);
+
+    jobStartThread = new JobStartThread(processID);
+    startupThread = new StartupThread(queueTracker,new StartupResetManager(processID),processID);
+    startDeleteThread = new StartDeleteThread(new DeleteStartupResetManager(processID),processID);
+    finisherThread = new FinisherThread(processID);
+    notificationThread = new JobNotificationThread(new NotificationResetManager(processID),processID);
+    jobDeleteThread = new JobDeleteThread(processID);
+    stufferThread = new StufferThread(documentQueue,numWorkerThreads,workerResetManager,queueTracker,blockingDocuments,lowWaterFactor,stuffAmtFactor,processID);
+    expireStufferThread = new ExpireStufferThread(expireQueue,numExpireThreads,workerResetManager,processID);
+    setPriorityThread = new SetPriorityThread(queueTracker,numWorkerThreads,blockingDocuments,processID);
+    historyCleanupThread = new HistoryCleanupThread(processID);
+
+    workerThreads = new WorkerThread[numWorkerThreads];
+    int i = 0;
+    while (i < numWorkerThreads)
+    {
+      workerThreads[i] = new WorkerThread(Integer.toString(i),documentQueue,workerResetManager,queueTracker,processID);
+      i++;
+    }
+
+    expireThreads = new ExpireThread[numExpireThreads];
+    i = 0;
+    while (i < numExpireThreads)
+    {
+      expireThreads[i] = new ExpireThread(Integer.toString(i),expireQueue,queueTracker,workerResetManager,processID);
+      i++;
+    }
+
+    deleteStufferThread = new DocumentDeleteStufferThread(documentDeleteQueue,numDeleteThreads,docDeleteResetManager,processID);
+    deleteThreads = new DocumentDeleteThread[numDeleteThreads];
+    i = 0;
+    while (i < numDeleteThreads)
+    {
+      deleteThreads[i] = new DocumentDeleteThread(Integer.toString(i),documentDeleteQueue,docDeleteResetManager,processID);
+      i++;
+    }
+      
+    cleanupStufferThread = new DocumentCleanupStufferThread(documentCleanupQueue,numCleanupThreads,docCleanupResetManager,processID);
+    cleanupThreads = new DocumentCleanupThread[numCleanupThreads];
+    i = 0;
+    while (i < numCleanupThreads)
+    {
+      cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,queueTracker,docCleanupResetManager,processID);
+      i++;
+    }
+
+    jobResetThread = new JobResetThread(queueTracker,processID);
+    seedingThread = new SeedingThread(queueTracker,new SeedingResetManager(processID),processID);
+    idleCleanupThread = new IdleCleanupThread(processID);
+
+    initializationThread = new InitializationThread(queueTracker);
+    // Start the initialization thread.  This does the initialization work and starts all the other threads when that's done.  It then exits.
+    initializationThread.start();
+    Logging.root.info("Pull-agent started");
+  }
+
+  protected class InitializationThread extends Thread
+  {
+    // One-shot daemon thread: reprioritizes not-yet-processed documents, then starts all the other threads listed in run() and exits.
+    protected final QueueTracker queueTracker;
+    /** Stores the queue tracker, names the thread "Initialization thread", and marks it as a daemon. */
+    public InitializationThread(QueueTracker queueTracker)
+    {
+      super();
+      this.queueTracker = queueTracker;
+      setName("Initialization thread");
+      setDaemon(true);
+    }
+    /** Reprioritizes documents batch by batch until none remain, then starts every other thread; on a non-interrupt failure it logs and calls System.exit(-300). */
+    public void run()
+    {
+      int i;
+
+      try
+      {
+        IThreadContext threadContext = ThreadContextFactory.make();
+
+        // First, get a job manager
+        IJobManager jobManager = JobManagerFactory.make(threadContext);
+        IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
+
+        /* No longer needed, because IAgents specifically initializes/cleans up.
+        
+        Logging.threads.debug("Agents process starting initialization...");
+
+        // Call the database to get it ready
+        jobManager.prepareForStart();
+        */
+        
+        Logging.threads.debug("Agents process reprioritizing documents...");
+
+        Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
+        Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+        // Reprioritize all documents in the jobqueue, 10000 at a time
+        long currentTime = System.currentTimeMillis();
+
+        // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing will be assigned
+        // new priorities.  Already processed documents won't.  This guarantees that our bins are appropriate for current thread
+        // activity.
+        // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
+        // priorities.
+        while (true)
+        {
+          long startTime = System.currentTimeMillis();
+
+          DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime, 10000);
+          if (docs.length == 0)
+            break;
+
+          // Calculate new priorities for all these documents
+          ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,docs,connectionMap,jobDescriptionMap,
+            queueTracker,currentTime);
+
+          Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
+        }
+
+        Logging.threads.debug("Agents process initialization complete!");
+
+        // Start all the threads
+        jobStartThread.start();
+        startupThread.start();
+        startDeleteThread.start();
+        finisherThread.start();
+        notificationThread.start();
+        jobDeleteThread.start();
+        stufferThread.start();
+        expireStufferThread.start();
+        setPriorityThread.start();
+        historyCleanupThread.start();
+
+        i = 0;
+        while (i < numWorkerThreads)
+        {
+          workerThreads[i].start();
+          i++;
+        }
+
+        i = 0;
+        while (i < numExpireThreads)
+        {
+          expireThreads[i].start();
+          i++;
+        }
+
+        cleanupStufferThread.start();
+        i = 0;
+        while (i < numCleanupThreads)
+        {
+          cleanupThreads[i].start();
+          i++;
+        }
+
+        deleteStufferThread.start();
+        i = 0;
+        while (i < numDeleteThreads)
+        {
+          deleteThreads[i].start();
+          i++;
+        }
+
+        jobResetThread.start();
+        seedingThread.start();
+        idleCleanupThread.start();
+        // exit!
+      }
+      catch (Throwable e)
+      {
+        // Severe error on initialization
+        if (e instanceof ManifoldCFException)
+        {
+          // Deal with interrupted exception gracefully, because it means somebody is trying to shut us down before we got started.
+          if (((ManifoldCFException)e).getErrorCode() == ManifoldCFException.INTERRUPTED)
+            return;
+        }
+        System.err.println("agents process could not start - shutting down");
+        Logging.threads.fatal("Startup initialization error tossed: "+e.getMessage(),e);
+        System.exit(-300);
+      }
+    }
   }
 
-  /** Signal that an output connection needs to be "redone".  This means that all documents sent to that output connection must be sent again,
-  * and the history as to their status must be forgotten.
-  *@param connectionName is the name of the connection being signalled.
+  /** Stop the system.  Interrupts every thread that is still tracked, waits via ManifoldCF.sleep(1000L) between passes until all of them have died, then closes all repository connectors.
   */
-  public void signalOutputConnectionRedo(String connectionName)
+  public void stopSystem(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    ManifoldCF.signalOutputConnectionRedo(threadContext,connectionName);
+    Logging.root.info("Shutting down pull-agent...");
+    while (initializationThread != null || jobDeleteThread != null || startupThread != null || startDeleteThread != null ||
+      jobStartThread != null || stufferThread != null ||
+      finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null || expireThreads != null ||
+      deleteStufferThread != null || deleteThreads != null ||
+      cleanupStufferThread != null || cleanupThreads != null ||
+      jobResetThread != null || seedingThread != null || idleCleanupThread != null || setPriorityThread != null || historyCleanupThread != null)
+    {
+      // Send an interrupt to all threads that are still there.
+      // In theory, this only needs to be done once.  In practice, I have seen cases where the thread loses track of the fact that it has been
+      // interrupted (which may be a JVM bug - who knows?), but in any case there's no harm in doing it again.
+      if (initializationThread != null)
+      {
+        initializationThread.interrupt();
+      }
+      if (historyCleanupThread != null)
+      {
+        historyCleanupThread.interrupt();
+      }
+      if (setPriorityThread != null)
+      {
+        setPriorityThread.interrupt();
+      }
+      if (jobStartThread != null)
+      {
+        jobStartThread.interrupt();
+      }
+      if (jobDeleteThread != null)
+      {
+        jobDeleteThread.interrupt();
+      }
+      if (startupThread != null)
+      {
+        startupThread.interrupt();
+      }
+      if (startDeleteThread != null)
+      {
+        startDeleteThread.interrupt();
+      }
+      if (stufferThread != null)
+      {
+        stufferThread.interrupt();
+      }
+      if (expireStufferThread != null)
+      {
+        expireStufferThread.interrupt();
+      }
+      if (finisherThread != null)
+      {
+        finisherThread.interrupt();
+      }
+      if (notificationThread != null)
+      {
+        notificationThread.interrupt();
+      }
+      if (workerThreads != null)
+      {
+        int i = 0;
+        while (i < workerThreads.length)
+        {
+          Thread workerThread = workerThreads[i++];
+          if (workerThread != null)
+            workerThread.interrupt();
+        }
+      }
+      if (expireThreads != null)
+      {
+        int i = 0;
+        while (i < expireThreads.length)
+        {
+          Thread expireThread = expireThreads[i++];
+          if (expireThread != null)
+            expireThread.interrupt();
+        }
+      }
+      if (cleanupStufferThread != null)
+      {
+        cleanupStufferThread.interrupt();
+      }
+      if (cleanupThreads != null)
+      {
+        int i = 0;
+        while (i < cleanupThreads.length)
+        {
+          Thread cleanupThread = cleanupThreads[i++];
+          if (cleanupThread != null)
+            cleanupThread.interrupt();
+        }
+      }
+      if (deleteStufferThread != null)
+      {
+        deleteStufferThread.interrupt();
+      }
+      if (deleteThreads != null)
+      {
+        int i = 0;
+        while (i < deleteThreads.length)
+        {
+          Thread deleteThread = deleteThreads[i++];
+          if (deleteThread != null)
+            deleteThread.interrupt();
+        }
+      }
+      if (jobResetThread != null)
+      {
+        jobResetThread.interrupt();
+      }
+      if (seedingThread != null)
+      {
+        seedingThread.interrupt();
+      }
+      if (idleCleanupThread != null)
+      {
+        idleCleanupThread.interrupt();
+      }
+
+      // Now, wait for all threads to die.
+      try
+      {
+        ManifoldCF.sleep(1000L);
+      }
+      catch (InterruptedException e)
+      {  // NOTE(review): the interrupt is swallowed (empty body) and the loop keeps polling; the interrupt status is not restored - confirm intentional
+      }
+
+      // Check to see which died.
+      if (initializationThread != null)
+      {
+        if (!initializationThread.isAlive())
+          initializationThread = null;
+      }
+      if (historyCleanupThread != null)
+      {
+        if (!historyCleanupThread.isAlive())
+          historyCleanupThread = null;
+      }
+      if (setPriorityThread != null)
+      {
+        if (!setPriorityThread.isAlive())
+          setPriorityThread = null;
+      }
+      if (jobDeleteThread != null)
+      {
+        if (!jobDeleteThread.isAlive())
+          jobDeleteThread = null;
+      }
+      if (startupThread != null)
+      {
+        if (!startupThread.isAlive())
+          startupThread = null;
+      }
+      if (startDeleteThread != null)
+      {
+        if (!startDeleteThread.isAlive())
+          startDeleteThread = null;
+      }
+      if (jobStartThread != null)
+      {
+        if (!jobStartThread.isAlive())
+          jobStartThread = null;
+      }
+      if (stufferThread != null)
+      {
+        if (!stufferThread.isAlive())
+          stufferThread = null;
+      }
+      if (expireStufferThread != null)
+      {
+        if (!expireStufferThread.isAlive())
+          expireStufferThread = null;
+      }
+      if (finisherThread != null)
+      {
+        if (!finisherThread.isAlive())
+          finisherThread = null;
+      }
+      if (notificationThread != null)
+      {
+        if (!notificationThread.isAlive())
+          notificationThread = null;
+      }
+      if (workerThreads != null)
+      {
+        int i = 0;
+        boolean isAlive = false;
+        while (i < workerThreads.length)
+        {
+          Thread workerThread = workerThreads[i];
+          if (workerThread != null)
+          {
+            if (!workerThread.isAlive())
+              workerThreads[i] = null;
+            else
+              isAlive = true;
+          }
+          i++;
+        }
+        if (!isAlive)
+          workerThreads = null;
+      }
+
+      if (expireThreads != null)
+      {
+        int i = 0;
+        boolean isAlive = false;
+        while (i < expireThreads.length)
+        {
+          Thread expireThread = expireThreads[i];
+          if (expireThread != null)
+          {
+            if (!expireThread.isAlive())
+              expireThreads[i] = null;
+            else
+              isAlive = true;
+          }
+          i++;
+        }
+        if (!isAlive)
+          expireThreads = null;
+      }
+      
+      if (cleanupStufferThread != null)
+      {
+        if (!cleanupStufferThread.isAlive())
+          cleanupStufferThread = null;
+      }
+      if (cleanupThreads != null)
+      {
+        int i = 0;
+        boolean isAlive = false;
+        while (i < cleanupThreads.length)
+        {
+          Thread cleanupThread = cleanupThreads[i];
+          if (cleanupThread != null)
+          {
+            if (!cleanupThread.isAlive())
+              cleanupThreads[i] = null;
+            else
+              isAlive = true;
+          }
+          i++;
+        }
+        if (!isAlive)
+          cleanupThreads = null;
+      }
+
+      if (deleteStufferThread != null)
+      {
+        if (!deleteStufferThread.isAlive())
+          deleteStufferThread = null;
+      }
+      if (deleteThreads != null)
+      {
+        int i = 0;
+        boolean isAlive = false;
+        while (i < deleteThreads.length)
+        {
+          Thread deleteThread = deleteThreads[i];
+          if (deleteThread != null)
+          {
+            if (!deleteThread.isAlive())
+              deleteThreads[i] = null;
+            else
+              isAlive = true;
+          }
+          i++;
+        }
+        if (!isAlive)
+          deleteThreads = null;
+      }
+      if (jobResetThread != null)
+      {
+        if (!jobResetThread.isAlive())
+          jobResetThread = null;
+      }
+      if (seedingThread != null)
+      {
+        if (!seedingThread.isAlive())
+          seedingThread = null;
+      }
+      if (idleCleanupThread != null)
+      {
+        if (!idleCleanupThread.isAlive())
+          idleCleanupThread = null;
+      }
+    }
+
+    // Threads are down; release connectors
+    RepositoryConnectorFactory.closeAllConnectors(threadContext);
+    numWorkerThreads = 0;
+    numDeleteThreads = 0;
+    numExpireThreads = 0;  // NOTE(review): numCleanupThreads is not reset alongside these three counters - confirm whether that is intentional
+    Logging.root.info("Pull-agent successfully shut down");
   }
+  
 
 }
 

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java?rev=1545624&r1=1545623&r2=1545624&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java Tue Nov 26 11:45:53 2013
@@ -31,26 +31,28 @@ public class DocCleanupResetManager exte
 {
   public static final String _rcsid = "@(#)$Id$";
 
-  protected DocumentCleanupQueue ddq;
+  protected final DocumentCleanupQueue ddq;
 
   /** Constructor. */
-  public DocCleanupResetManager(DocumentCleanupQueue ddq)
+  public DocCleanupResetManager(DocumentCleanupQueue ddq, String processID)
   {
-    super();
+    super(processID);
     this.ddq = ddq;
   }
 
   /** Reset */
-  protected void performResetLogic(IThreadContext tc)
+  @Override
+  protected void performResetLogic(IThreadContext tc, String processID)
     throws ManifoldCFException
   {
     IJobManager jobManager = JobManagerFactory.make(tc);
-    jobManager.resetDocCleanupWorkerStatus();
+    jobManager.resetDocCleanupWorkerStatus(processID);
     ddq.clear();
   }
   
   /** Do the wakeup logic.
   */
+  @Override
   protected void performWakeupLogic()
   {
     ddq.reset();

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocDeleteResetManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocDeleteResetManager.java?rev=1545624&r1=1545623&r2=1545624&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocDeleteResetManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocDeleteResetManager.java Tue Nov 26 11:45:53 2013
@@ -31,26 +31,28 @@ public class DocDeleteResetManager exten
 {
   public static final String _rcsid = "@(#)$Id: DocDeleteResetManager.java 988245 2010-08-23 18:39:35Z kwright $";
 
-  protected DocumentDeleteQueue ddq;
+  protected final DocumentDeleteQueue ddq;
 
   /** Constructor. */
-  public DocDeleteResetManager(DocumentDeleteQueue ddq)
+  public DocDeleteResetManager(DocumentDeleteQueue ddq, String processID)
   {
-    super();
+    super(processID);
     this.ddq = ddq;
   }
 
   /** Reset */
-  protected void performResetLogic(IThreadContext tc)
+  @Override
+  protected void performResetLogic(IThreadContext tc, String processID)
     throws ManifoldCFException
   {
     IJobManager jobManager = JobManagerFactory.make(tc);
-    jobManager.resetDocDeleteWorkerStatus();
+    jobManager.resetDocDeleteWorkerStatus(processID);
     ddq.clear();
   }
 
   /** Do the wakeup logic.
   */
+  @Override
   protected void performWakeupLogic()
   {
     ddq.reset();

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java?rev=1545624&r1=1545623&r2=1545624&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java Tue Nov 26 11:45:53 2013
@@ -36,24 +36,27 @@ public class DocumentCleanupStufferThrea
   public static final String _rcsid = "@(#)$Id$";
 
   // Local data
-  // This is a reference to the static main document queue
-  protected DocumentCleanupQueue documentCleanupQueue;
-  // This is the reset manager
-  protected DocCleanupResetManager resetManager;
-  // This is the number of entries we want to stuff at any one time.
-  int n;
+  /** This is a reference to the static main document queue */
+  protected final DocumentCleanupQueue documentCleanupQueue;
+  /** This is the reset manager */
+  protected final DocCleanupResetManager resetManager;
+  /** This is the number of entries we want to stuff at any one time. */
+  protected final int n;
+  /** Process ID */
+  protected final String processID;
 
   /** Constructor.
   *@param documentCleanupQueue is the document queue we'll be stuffing.
   *@param n is the maximum number of threads that will be doing delete processing.
   */
-  public DocumentCleanupStufferThread(DocumentCleanupQueue documentCleanupQueue, int n, DocCleanupResetManager resetManager)
+  public DocumentCleanupStufferThread(DocumentCleanupQueue documentCleanupQueue, int n, DocCleanupResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     this.documentCleanupQueue = documentCleanupQueue;
     this.n = n;
     this.resetManager = resetManager;
+    this.processID = processID;
     setName("Document cleanup stuffer thread");
     setDaemon(true);
   }
@@ -102,7 +105,7 @@ public class DocumentCleanupStufferThrea
           // This method will set the status of the documents in question
           // to "beingcleaned".
 
-          DocumentSetAndFlags documentsToClean = jobManager.getNextCleanableDocuments(deleteChunkSize,currentTime);
+          DocumentSetAndFlags documentsToClean = jobManager.getNextCleanableDocuments(processID,deleteChunkSize,currentTime);
           DocumentDescription[] descs = documentsToClean.getDocumentSet();
           boolean[] removeFromIndex = documentsToClean.getFlags();
           

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1545624&r1=1545623&r2=1545624&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Tue Nov 26 11:45:53 2013
@@ -43,20 +43,23 @@ public class DocumentCleanupThread exten
 {
   public static final String _rcsid = "@(#)$Id$";
 
-
   // Local data
-  protected String id;
-  // This is a reference to the static main document queue
-  protected DocumentCleanupQueue documentCleanupQueue;
+  /** Thread id */
+  protected final String id;
+  /** This is a reference to the static main document queue */
+  protected final DocumentCleanupQueue documentCleanupQueue;
   /** Delete thread pool reset manager */
-  protected DocCleanupResetManager resetManager;
+  protected final DocCleanupResetManager resetManager;
   /** Queue tracker */
-  protected QueueTracker queueTracker;
+  protected final QueueTracker queueTracker;
+  /** Process ID */
+  protected final String processID;
 
   /** Constructor.
   *@param id is the worker thread id.
   */
-  public DocumentCleanupThread(String id, DocumentCleanupQueue documentCleanupQueue, QueueTracker queueTracker, DocCleanupResetManager resetManager)
+  public DocumentCleanupThread(String id, DocumentCleanupQueue documentCleanupQueue,
+    QueueTracker queueTracker, DocCleanupResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
@@ -64,6 +67,7 @@ public class DocumentCleanupThread exten
     this.documentCleanupQueue = documentCleanupQueue;
     this.queueTracker = queueTracker;
     this.resetManager = resetManager;
+    this.processID = processID;
     setName("Document cleanup thread '"+id+"'");
     setDaemon(true);
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java?rev=1545624&r1=1545623&r2=1545624&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java Tue Nov 26 11:45:53 2013
@@ -36,24 +36,27 @@ public class DocumentDeleteStufferThread
   public static final String _rcsid = "@(#)$Id: DocumentDeleteStufferThread.java 988245 2010-08-23 18:39:35Z kwright $";
 
   // Local data
-  // This is a reference to the static main document queue
-  protected DocumentDeleteQueue documentDeleteQueue;
-  // This is the reset manager
-  protected DocDeleteResetManager resetManager;
-  // This is the number of entries we want to stuff at any one time.
-  int n;
-
+  /** This is a reference to the static main document queue */
+  protected final DocumentDeleteQueue documentDeleteQueue;
+  /** This is the reset manager */
+  protected final DocDeleteResetManager resetManager;
+  /** This is the number of entries we want to stuff at any one time. */
+  protected final int n;
+  /** Process ID */
+  protected final String processID;
+  
   /** Constructor.
   *@param documentDeleteQueue is the document queue we'll be stuffing.
   *@param n is the maximum number of threads that will be doing delete processing.
   */
-  public DocumentDeleteStufferThread(DocumentDeleteQueue documentDeleteQueue, int n, DocDeleteResetManager resetManager)
+  public DocumentDeleteStufferThread(DocumentDeleteQueue documentDeleteQueue, int n, DocDeleteResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     this.documentDeleteQueue = documentDeleteQueue;
     this.n = n;
     this.resetManager = resetManager;
+    this.processID = processID;
     setName("Document delete stuffer thread");
     setDaemon(true);
   }
@@ -102,7 +105,7 @@ public class DocumentDeleteStufferThread
           // This method will set the status of the documents in question
           // to "beingdeleted".
 
-          DocumentDescription[] descs = jobManager.getNextDeletableDocuments(deleteChunkSize,currentTime);
+          DocumentDescription[] descs = jobManager.getNextDeletableDocuments(processID,deleteChunkSize,currentTime);
 
           // If there are no chunks at all, then we can sleep for a while.
           // The theory is that we need to allow stuff to accumulate.

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java?rev=1545624&r1=1545623&r2=1545624&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java Tue Nov 26 11:45:53 2013
@@ -44,22 +44,26 @@ public class DocumentDeleteThread extend
 
 
   // Local data
-  protected String id;
-  // This is a reference to the static main document queue
-  protected DocumentDeleteQueue documentDeleteQueue;
+  /** Thread ID */
+  protected final String id;
+  /** This is a reference to the static main document queue */
+  protected final DocumentDeleteQueue documentDeleteQueue;
   /** Delete thread pool reset manager */
-  protected DocDeleteResetManager resetManager;
+  protected final DocDeleteResetManager resetManager;
+  /** Process ID */
+  protected final String processID;
 
   /** Constructor.
   *@param id is the worker thread id.
   */
-  public DocumentDeleteThread(String id, DocumentDeleteQueue documentDeleteQueue, DocDeleteResetManager resetManager)
+  public DocumentDeleteThread(String id, DocumentDeleteQueue documentDeleteQueue, DocDeleteResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     this.id = id;
     this.documentDeleteQueue = documentDeleteQueue;
     this.resetManager = resetManager;
+    this.processID = processID;
     setName("Document delete thread '"+id+"'");
     setDaemon(true);
   }



Mime
View raw message