manifoldcf-commits mailing list archives

From: kwri...@apache.org
Subject: svn commit: r1542510 - in /manifoldcf/branches/CONNECTORS-781: connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/
Date: Sat, 16 Nov 2013 13:25:42 GMT
Author: kwright
Date: Sat Nov 16 13:25:42 2013
New Revision: 1542510

URL: http://svn.apache.org/r1542510
Log:
Install write locks for stuffer threads.

Modified:
    manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
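
The change below gives every stuffer method in JobManager the same shape: a cluster-wide write lock is taken before the database transaction begins, which lets the candidate-row SELECT run without FOR UPDATE, and the lock is released in a finally clause before any retry back-off sleep. What follows is a minimal sketch of that shape, not ManifoldCF code; the LockManager and Database interfaces and the TransactionAbortedException are stand-ins assumed for illustration, although the method names mirror the ILockManager and database calls that appear in the diff.

public class StufferLockSketch
{
  // Stand-in interfaces assumed for illustration; the real classes are
  // ManifoldCF's ILockManager and database interface, whose method names
  // (enterWriteLock, leaveWriteLock, beginTransaction, performCommit,
  // signalRollback, endTransaction) are the ones used in the diff below.
  interface LockManager
  {
    void enterWriteLock(String lockName) throws Exception;
    void leaveWriteLock(String lockName) throws Exception;
  }

  interface Database
  {
    void beginTransaction() throws Exception;
    void performCommit() throws Exception;
    void signalRollback() throws Exception;
    void endTransaction() throws Exception;
  }

  // Hypothetical marker for "the database aborted the transaction; retry".
  static class TransactionAbortedException extends Exception {}

  protected static final String stufferLock = "_STUFFER_";

  protected final LockManager lockManager;
  protected final Database database;

  public StufferLockSketch(LockManager lockManager, Database database)
  {
    this.lockManager = lockManager;
    this.database = database;
  }

  public void stuffDocuments() throws Exception
  {
    while (true)
    {
      long sleepAmt = 0L;
      // Cluster-wide write lock taken OUTSIDE the transaction: only one
      // stuffer of this kind runs at a time, so the SELECT needs no FOR UPDATE.
      lockManager.enterWriteLock(stufferLock);
      try
      {
        database.beginTransaction();
        try
        {
          // ... SELECT candidate rows, mark their new status, build results ...
          database.performCommit();
          return;
        }
        catch (TransactionAbortedException e)
        {
          database.signalRollback();
          sleepAmt = 1000L;  // back off, then retry the whole iteration
          continue;
        }
        catch (Exception e)
        {
          database.signalRollback();
          throw e;
        }
        finally
        {
          database.endTransaction();
        }
      }
      finally
      {
        // Release the lock first; the back-off sleep must never happen while
        // the lock is still held.
        lockManager.leaveWriteLock(stufferLock);
        if (sleepAmt > 0L)
          Thread.sleep(sleepAmt);
      }
    }
  }
}

The real methods return the selected documents rather than void, and use getRandomAmount() and sleepFor() for the back-off, but the lock/transaction nesting and the release-before-sleep ordering are exactly what the hunks below introduce.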

Modified: manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java?rev=1542510&r1=1542509&r2=1542510&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java (original)
+++ manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java Sat Nov 16 13:25:42 2013
@@ -110,6 +110,8 @@ public class ThrottledFetcher
   protected static final long TIME_1DAY = 24L * 60L * 60000L;
 
 
+  // The following static bin pools correspond to global resources that will be managed via ILockManager.
+  
   /** This is the static pool of ConnectionBin's, keyed by bin name. */
   protected static Map<String,ConnectionBin> connectionBins = new HashMap<String,ConnectionBin>();
   /** This is the static pool of ThrottleBin's, keyed by bin name. */
@@ -464,6 +466,8 @@ public class ThrottledFetcher
 
   /** Connection pool for a bin.
   * An instance of this class tracks the connections that are pooled and that are in use for a specific bin.
+  * NOTE WELL: This resource must be constrained globally, across all JVMs!
+  * To do that, we need an ILockManager to handle the global data for each bin.
   */
   protected static class ConnectionBin
   {
@@ -749,6 +753,8 @@ public class ThrottledFetcher
   * 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
   *    new chunks from other connections can start.
   *
+  * NOTE WELL: This resource must be constrained globally, across all JVMs!
+  * To do that, we need an ILockManager to handle the global data for each bin.
   */
   protected static class ThrottleBin
   {
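
The comments added above only flag the requirement: the static ConnectionBin and ThrottleBin pools are global resources that must eventually be constrained across all JVMs through ILockManager. As a purely illustrative sketch of what that could look like (an assumption, not something this revision implements), per-bin bookkeeping could be guarded by a per-bin named write lock over a cluster-shared counter store:

public class GlobalBinSketch
{
  // Assumed stand-ins: a named write lock facility (the real one would be
  // ILockManager) and some cluster-shared counter store.  Both are
  // hypothetical here; this revision only adds the comments above and does
  // not yet implement the cross-JVM accounting.
  interface LockManager
  {
    void enterWriteLock(String lockName) throws Exception;
    void leaveWriteLock(String lockName) throws Exception;
  }

  interface SharedCounters
  {
    int get(String binName) throws Exception;
    void put(String binName, int value) throws Exception;
  }

  protected final LockManager lockManager;
  protected final SharedCounters counters;

  public GlobalBinSketch(LockManager lockManager, SharedCounters counters)
  {
    this.lockManager = lockManager;
    this.counters = counters;
  }

  /** Try to reserve one connection slot in the named bin without ever
  * exceeding the bin's maximum, no matter how many JVMs are running. */
  public boolean reserveConnection(String binName, int maxConnections)
    throws Exception
  {
    String lockName = "_CONNECTIONBIN_" + binName;  // hypothetical lock naming
    lockManager.enterWriteLock(lockName);
    try
    {
      int inUse = counters.get(binName);
      if (inUse >= maxConnections)
        return false;                // bin is full cluster-wide; caller waits
      counters.put(binName, inUse + 1);
      return true;
    }
    finally
    {
      lockManager.leaveWriteLock(lockName);
    }
  }

  /** Give back a previously reserved connection slot. */
  public void releaseConnection(String binName) throws Exception
  {
    String lockName = "_CONNECTIONBIN_" + binName;
    lockManager.enterWriteLock(lockName);
    try
    {
      counters.put(binName, counters.get(binName) - 1);
    }
    finally
    {
      lockManager.leaveWriteLock(lockName);
    }
  }
}

Again, none of this is in the commit; it is only meant to make the NOTE WELL comments concrete.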

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1542510&r1=1542509&r2=1542510&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Sat Nov 16 13:25:42 2013
@@ -32,6 +32,10 @@ public class JobManager implements IJobM
 {
   public static final String _rcsid = "@(#)$Id: JobManager.java 998576 2010-09-19 01:11:02Z kwright $";
 
+  protected static final String stufferLock = "_STUFFER_";
+  protected static final String deleteStufferLock = "_DELETESTUFFER_";
+  protected static final String expireStufferLock = "_EXPIRESTUFFER_";
+  protected static final String cleanStufferLock = "_CLEANSTUFFER_";
   protected static final String hopLock = "_HOPLOCK_";
 
   // Member variables
@@ -848,198 +852,208 @@ public class JobManager implements IJobM
     while (true)
     {
       long sleepAmt = 0L;
-      database.beginTransaction();
+      
+      // Enter a write lock.  This means we don't need a FOR UPDATE on the query.
+      lockManager.enterWriteLock(cleanStufferLock);
       try
       {
-        if (Logging.perf.isDebugEnabled())
-          Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on cleaning queue");
+        database.beginTransaction();
+        try
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on cleaning queue");
 
-        // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being cleaned".
-        ArrayList list = new ArrayList();
-        
-        StringBuilder sb = new StringBuilder("SELECT ");
-        sb.append(jobQueue.idField).append(",")
-          .append(jobQueue.jobIDField).append(",")
-          .append(jobQueue.docHashField).append(",")
-          .append(jobQueue.docIDField).append(",")
-          .append(jobQueue.failTimeField).append(",")
-          .append(jobQueue.failCountField)
-          .append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ")
-          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-            new UnitaryClause("t0."+jobQueue.statusField,jobQueue.statusToString(jobQueue.STATUS_PURGATORY))})).append(" AND ")
-          .append("(t0.").append(jobQueue.checkTimeField).append(" IS NULL OR t0.").append(jobQueue.checkTimeField).append("<=?) AND ");
+          // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being cleaned".
+          ArrayList list = new ArrayList();
           
-        list.add(new Long(currentTime));
+          StringBuilder sb = new StringBuilder("SELECT ");
+          sb.append(jobQueue.idField).append(",")
+            .append(jobQueue.jobIDField).append(",")
+            .append(jobQueue.docHashField).append(",")
+            .append(jobQueue.docIDField).append(",")
+            .append(jobQueue.failTimeField).append(",")
+            .append(jobQueue.failCountField)
+            .append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ")
+            .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+              new UnitaryClause("t0."+jobQueue.statusField,jobQueue.statusToString(jobQueue.STATUS_PURGATORY))})).append(" AND ")
+            .append("(t0.").append(jobQueue.checkTimeField).append(" IS NULL OR t0.").append(jobQueue.checkTimeField).append("<=?) AND ");
+            
+          list.add(new Long(currentTime));
 
-        sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ")
-          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-            new UnitaryClause("t1."+jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN)),
-            new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)}))
-          .append(") AND ");
-        
-        sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE ")
-          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-            new JoinClause("t2."+jobQueue.docHashField,"t0."+jobQueue.docHashField)})).append(" AND ")
-          .append("t2.").append(jobQueue.statusField).append(" IN (?,?,?,?,?,?) AND ")
-          .append("t2.").append(jobQueue.jobIDField).append("!=t0.").append(jobQueue.jobIDField)
-          .append(") ");
+          sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ")
+            .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+              new UnitaryClause("t1."+jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN)),
+              new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)}))
+            .append(") AND ");
           
-        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
-
-        sb.append(database.constructOffsetLimitClause(0,maxCount));
-        
-        // The checktime is null field check is for backwards compatibility
-        IResultSet set = database.performQuery(sb.toString(),list,null,null,maxCount,null);
+          sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE ")
+            .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+              new JoinClause("t2."+jobQueue.docHashField,"t0."+jobQueue.docHashField)})).append(" AND ")
+            .append("t2.").append(jobQueue.statusField).append(" IN (?,?,?,?,?,?) AND ")
+            .append("t2.").append(jobQueue.jobIDField).append("!=t0.").append(jobQueue.jobIDField)
+            .append(") ");
+            
+          list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
 
-        if (Logging.perf.isDebugEnabled())
-          Logging.perf.debug("Done getting docs to cleaning queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+          sb.append(database.constructOffsetLimitClause(0,maxCount));
+          
+          // The checktime is null field check is for backwards compatibility
+          IResultSet set = database.performQuery(sb.toString(),list,null,null,maxCount,null);
 
-        // We need to organize the returned set by connection name and output connection name, so that we can efficiently
-        // use  getUnindexableDocumentIdentifiers.
-        // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
-        // objects.
-        HashMap connectionNameMap = new HashMap();
-        HashMap documentIDMap = new HashMap();
-        int i = 0;
-        while (i < set.getRowCount())
-        {
-          IResultRow row = set.getRow(i);
-          Long jobID = (Long)row.getValue(jobQueue.jobIDField);
-          String documentIDHash = (String)row.getValue(jobQueue.docHashField);
-          String documentID = (String)row.getValue(jobQueue.docIDField);
-          Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
-          Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
-          // Failtime is probably not useful in this context, but we'll bring it along for completeness
-          long failTime;
-          if (failTimeValue == null)
-            failTime = -1L;
-          else
-            failTime = failTimeValue.longValue();
-          int failCount;
-          if (failCountValue == null)
-            failCount = 0;
-          else
-            failCount = (int)failCountValue.longValue();
-          IJobDescription jobDesc = load(jobID);
-          String connectionName = jobDesc.getConnectionName();
-          String outputConnectionName = jobDesc.getOutputConnectionName();
-          DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
-            jobID,documentIDHash,documentID,failTime,failCount);
-          String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
-          documentIDMap.put(compositeDocumentID,dd);
-          Map y = (Map)connectionNameMap.get(connectionName);
-          if (y == null)
-          {
-            y = new HashMap();
-            connectionNameMap.put(connectionName,y);
-          }
-          ArrayList x = (ArrayList)y.get(outputConnectionName);
-          if (x == null)
-          {
-            // New entry needed
-            x = new ArrayList();
-            y.put(outputConnectionName,x);
-          }
-          x.add(dd);
-          i++;
-        }
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Done getting docs to cleaning queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
 
-        // For each bin, obtain a filtered answer, and enter all answers into a hash table.
-        // We'll then scan the result again to look up the right descriptions for return,
-        // and delete the ones that are owned multiply.
-        HashMap allowedDocIds = new HashMap();
-        Iterator iter = connectionNameMap.keySet().iterator();
-        while (iter.hasNext())
-        {
-          String connectionName = (String)iter.next();
-          Map y = (Map)connectionNameMap.get(connectionName);
-          Iterator outputIter = y.keySet().iterator();
-          while (outputIter.hasNext())
+          // We need to organize the returned set by connection name and output connection name, so that we can efficiently
+          // use  getUnindexableDocumentIdentifiers.
+          // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
+          // objects.
+          HashMap connectionNameMap = new HashMap();
+          HashMap documentIDMap = new HashMap();
+          int i = 0;
+          while (i < set.getRowCount())
           {
-            String outputConnectionName = (String)outputIter.next();
+            IResultRow row = set.getRow(i);
+            Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+            String documentIDHash = (String)row.getValue(jobQueue.docHashField);
+            String documentID = (String)row.getValue(jobQueue.docIDField);
+            Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
+            Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+            // Failtime is probably not useful in this context, but we'll bring it along for completeness
+            long failTime;
+            if (failTimeValue == null)
+              failTime = -1L;
+            else
+              failTime = failTimeValue.longValue();
+            int failCount;
+            if (failCountValue == null)
+              failCount = 0;
+            else
+              failCount = (int)failCountValue.longValue();
+            IJobDescription jobDesc = load(jobID);
+            String connectionName = jobDesc.getConnectionName();
+            String outputConnectionName = jobDesc.getOutputConnectionName();
+            DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+              jobID,documentIDHash,documentID,failTime,failCount);
+            String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
+            documentIDMap.put(compositeDocumentID,dd);
+            Map y = (Map)connectionNameMap.get(connectionName);
+            if (y == null)
+            {
+              y = new HashMap();
+              connectionNameMap.put(connectionName,y);
+            }
             ArrayList x = (ArrayList)y.get(outputConnectionName);
-            // Do the filter query
-            DocumentDescription[] descriptions = new DocumentDescription[x.size()];
-            int j = 0;
-            while (j < descriptions.length)
+            if (x == null)
             {
-              descriptions[j] = (DocumentDescription)x.get(j);
-              j++;
+              // New entry needed
+              x = new ArrayList();
+              y.put(outputConnectionName,x);
             }
-            String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
-            j = 0;
-            while (j < docIDHashes.length)
+            x.add(dd);
+            i++;
+          }
+
+          // For each bin, obtain a filtered answer, and enter all answers into a hash table.
+          // We'll then scan the result again to look up the right descriptions for return,
+          // and delete the ones that are owned multiply.
+          HashMap allowedDocIds = new HashMap();
+          Iterator iter = connectionNameMap.keySet().iterator();
+          while (iter.hasNext())
+          {
+            String connectionName = (String)iter.next();
+            Map y = (Map)connectionNameMap.get(connectionName);
+            Iterator outputIter = y.keySet().iterator();
+            while (outputIter.hasNext())
             {
-              String docIDHash = docIDHashes[j++];
-              String key = makeCompositeID(docIDHash,connectionName);
-              allowedDocIds.put(key,docIDHash);
+              String outputConnectionName = (String)outputIter.next();
+              ArrayList x = (ArrayList)y.get(outputConnectionName);
+              // Do the filter query
+              DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+              int j = 0;
+              while (j < descriptions.length)
+              {
+                descriptions[j] = (DocumentDescription)x.get(j);
+                j++;
+              }
+              String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
+              j = 0;
+              while (j < docIDHashes.length)
+              {
+                String docIDHash = docIDHashes[j++];
+                String key = makeCompositeID(docIDHash,connectionName);
+                allowedDocIds.put(key,docIDHash);
+              }
             }
           }
-        }
 
-        // Now, assemble a result, and change the state of the records accordingly
-        // First thing to do is order by document hash, so we reduce the risk of deadlock.
-        String[] compositeIDArray = new String[documentIDMap.size()];
-        i = 0;
-        iter = documentIDMap.keySet().iterator();
-        while (iter.hasNext())
+          // Now, assemble a result, and change the state of the records accordingly
+          // First thing to do is order by document hash, so we reduce the risk of deadlock.
+          String[] compositeIDArray = new String[documentIDMap.size()];
+          i = 0;
+          iter = documentIDMap.keySet().iterator();
+          while (iter.hasNext())
+          {
+            compositeIDArray[i++] = (String)iter.next();
+          }
+          
+          java.util.Arrays.sort(compositeIDArray);
+          
+          DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+          boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+          i = 0;
+          while (i < compositeIDArray.length)
+          {
+            String compositeDocID = compositeIDArray[i];
+            DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID);
+            // Determine whether we can delete it from the index or not
+            rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null);
+            // Set the record status to "being cleaned" and return it
+            rval[i++] = dd;
+            jobQueue.setCleaningStatus(dd.getID());
+          }
+
+          TrackerClass.notePrecommit();
+          database.performCommit();
+          TrackerClass.noteCommit();
+          
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+
+          return new DocumentSetAndFlags(rval,rvalBoolean);
+
+        }
+        catch (Error e)
         {
-          compositeIDArray[i++] = (String)iter.next();
+          database.signalRollback();
+          TrackerClass.noteRollback();
+          throw e;
         }
-        
-        java.util.Arrays.sort(compositeIDArray);
-        
-        DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
-        boolean[] rvalBoolean = new boolean[documentIDMap.size()];
-        i = 0;
-        while (i < compositeIDArray.length)
+        catch (ManifoldCFException e)
         {
-          String compositeDocID = compositeIDArray[i];
-          DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID);
-          // Determine whether we can delete it from the index or not
-          rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null);
-          // Set the record status to "being cleaned" and return it
-          rval[i++] = dd;
-          jobQueue.setCleaningStatus(dd.getID());
+          database.signalRollback();
+          TrackerClass.noteRollback();
+          if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+          {
+            if (Logging.perf.isDebugEnabled())
+              Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage());
+            sleepAmt = getRandomAmount();
+            continue;
+          }
+          throw e;
         }
-
-        TrackerClass.notePrecommit();
-        database.performCommit();
-        TrackerClass.noteCommit();
-        
-        if (Logging.perf.isDebugEnabled())
-          Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
-
-        return new DocumentSetAndFlags(rval,rvalBoolean);
-
-      }
-      catch (Error e)
-      {
-        database.signalRollback();
-        TrackerClass.noteRollback();
-        throw e;
-      }
-      catch (ManifoldCFException e)
-      {
-        database.signalRollback();
-        TrackerClass.noteRollback();
-        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        finally
         {
-          if (Logging.perf.isDebugEnabled())
-            Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage());
-          sleepAmt = getRandomAmount();
-          continue;
+          database.endTransaction();
         }
-        throw e;
       }
       finally
       {
-        database.endTransaction();
+        lockManager.leaveWriteLock(cleanStufferLock);
         sleepFor(sleepAmt);
       }
     }
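
One detail worth noting in the method above: before any row statuses are changed, the composite document IDs are collected and sorted, and the updates are applied in that sorted order to reduce the risk of deadlock. Below is a small self-contained sketch of that idea; the RowUpdater interface is hypothetical, while the real code calls jobQueue.setCleaningStatus on each sorted entry of documentIDMap inside the transaction.

import java.util.Arrays;
import java.util.Map;

public class OrderedUpdateSketch
{
  // Hypothetical stand-in for the jobQueue call made per row in the real code.
  interface RowUpdater
  {
    void setCleaningStatus(Long recordID) throws Exception;
  }

  /** Apply the status updates in a single deterministic order (sorted by
  * composite document ID) so that any two transactions touching the same
  * rows acquire their row locks in the same order and cannot deadlock. */
  public static void updateInSortedOrder(Map<String,Long> idsByCompositeKey,
    RowUpdater updater)
    throws Exception
  {
    String[] keys = idsByCompositeKey.keySet().toArray(new String[0]);
    Arrays.sort(keys);
    for (String key : keys)
    {
      updater.setCleaningStatus(idsByCompositeKey.get(key));
    }
  }
}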
@@ -1100,211 +1114,221 @@ public class JobManager implements IJobM
     while (true)
     {
       long sleepAmt = 0L;
-      database.beginTransaction();
+      
+      // Enter a write lock so that multiple threads can't be in here at the same time
+      lockManager.enterWriteLock(deleteStufferLock);
       try
       {
-        if (Logging.perf.isDebugEnabled())
-          Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on delete queue");
+        database.beginTransaction();
+        try
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on delete queue");
 
-        // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being deleted".
-        // If FOR UPDATE was included, deadlock happened a lot.
-        ArrayList list = new ArrayList();
-        StringBuilder sb = new StringBuilder("SELECT ");
-        sb.append(jobQueue.idField).append(",")
-          .append(jobQueue.jobIDField).append(",")
-          .append(jobQueue.docHashField).append(",")
-          .append(jobQueue.docIDField).append(",")
-          .append(jobQueue.failTimeField).append(",")
-          .append(jobQueue.failCountField).append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ")
-          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-            new UnitaryClause("t0."+jobQueue.statusField,jobQueue.statusToString(jobQueue.STATUS_ELIGIBLEFORDELETE))})).append(" AND ")
-          .append("t0.").append(jobQueue.checkTimeField).append("<=? AND ");
-        
-        list.add(new Long(currentTime));
-        
-        sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ")
-          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-            new UnitaryClause("t1."+jobs.statusField,jobs.statusToString(jobs.STATUS_DELETING)),
-            new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)})).append(") AND ");
+          // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being deleted".
+          // If FOR UPDATE was included, deadlock happened a lot.
+          ArrayList list = new ArrayList();
+          StringBuilder sb = new StringBuilder("SELECT ");
+          sb.append(jobQueue.idField).append(",")
+            .append(jobQueue.jobIDField).append(",")
+            .append(jobQueue.docHashField).append(",")
+            .append(jobQueue.docIDField).append(",")
+            .append(jobQueue.failTimeField).append(",")
+            .append(jobQueue.failCountField).append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ")
+            .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+              new UnitaryClause("t0."+jobQueue.statusField,jobQueue.statusToString(jobQueue.STATUS_ELIGIBLEFORDELETE))})).append(" AND ")
+            .append("t0.").append(jobQueue.checkTimeField).append("<=? AND ");
           
-        sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE ")
-          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-            new JoinClause("t2."+jobQueue.docHashField,"t0."+jobQueue.docHashField)})).append(" AND ")
-          .append("t2.").append(jobQueue.statusField).append(" IN (?,?,?,?,?,?) AND ")
-          .append("t2.").append(jobQueue.jobIDField).append("!=t0.").append(jobQueue.jobIDField)
-          .append(") ");
+          list.add(new Long(currentTime));
           
-        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
-        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
-        
-        sb.append(database.constructOffsetLimitClause(0,maxCount));
-        
-        // The checktime is null field check is for backwards compatibility
-        IResultSet set = database.performQuery(sb.toString(),list,null,null,maxCount,null);
+          sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ")
+            .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+              new UnitaryClause("t1."+jobs.statusField,jobs.statusToString(jobs.STATUS_DELETING)),
+              new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)})).append(") AND ");
+            
+          sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE ")
+            .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+              new JoinClause("t2."+jobQueue.docHashField,"t0."+jobQueue.docHashField)})).append(" AND ")
+            .append("t2.").append(jobQueue.statusField).append(" IN (?,?,?,?,?,?) AND ")
+            .append("t2.").append(jobQueue.jobIDField).append("!=t0.").append(jobQueue.jobIDField)
+            .append(") ");
+            
+          list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+          
+          sb.append(database.constructOffsetLimitClause(0,maxCount));
+          
+          // The checktime is null field check is for backwards compatibility
+          IResultSet set = database.performQuery(sb.toString(),list,null,null,maxCount,null);
 
-        if (Logging.perf.isDebugEnabled())
-          Logging.perf.debug("Done getting docs to delete queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Done getting docs to delete queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
 
-        // We need to organize the returned set by connection name, so that we can efficiently
-        // use  getUnindexableDocumentIdentifiers.
-        // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
-        // objects.
-        HashMap connectionNameMap = new HashMap();
-        HashMap documentIDMap = new HashMap();
-        int i = 0;
-        while (i < set.getRowCount())
-        {
-          IResultRow row = set.getRow(i);
-          Long jobID = (Long)row.getValue(jobQueue.jobIDField);
-          String documentIDHash = (String)row.getValue(jobQueue.docHashField);
-          String documentID = (String)row.getValue(jobQueue.docIDField);
-          Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
-          Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
-          // Failtime is probably not useful in this context, but we'll bring it along for completeness
-          long failTime;
-          if (failTimeValue == null)
-            failTime = -1L;
-          else
-            failTime = failTimeValue.longValue();
-          int failCount;
-          if (failCountValue == null)
-            failCount = 0;
-          else
-            failCount = (int)failCountValue.longValue();
-          IJobDescription jobDesc = load(jobID);
-          String connectionName = jobDesc.getConnectionName();
-          String outputConnectionName = jobDesc.getOutputConnectionName();
-          DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
-            jobID,documentIDHash,documentID,failTime,failCount);
-          String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
-          documentIDMap.put(compositeDocumentID,dd);
-          Map y = (Map)connectionNameMap.get(connectionName);
-          if (y == null)
+          // We need to organize the returned set by connection name, so that we can efficiently
+          // use  getUnindexableDocumentIdentifiers.
+          // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
+          // objects.
+          HashMap connectionNameMap = new HashMap();
+          HashMap documentIDMap = new HashMap();
+          int i = 0;
+          while (i < set.getRowCount())
           {
-            y = new HashMap();
-            connectionNameMap.put(connectionName,y);
+            IResultRow row = set.getRow(i);
+            Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+            String documentIDHash = (String)row.getValue(jobQueue.docHashField);
+            String documentID = (String)row.getValue(jobQueue.docIDField);
+            Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
+            Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+            // Failtime is probably not useful in this context, but we'll bring it along for completeness
+            long failTime;
+            if (failTimeValue == null)
+              failTime = -1L;
+            else
+              failTime = failTimeValue.longValue();
+            int failCount;
+            if (failCountValue == null)
+              failCount = 0;
+            else
+              failCount = (int)failCountValue.longValue();
+            IJobDescription jobDesc = load(jobID);
+            String connectionName = jobDesc.getConnectionName();
+            String outputConnectionName = jobDesc.getOutputConnectionName();
+            DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+              jobID,documentIDHash,documentID,failTime,failCount);
+            String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
+            documentIDMap.put(compositeDocumentID,dd);
+            Map y = (Map)connectionNameMap.get(connectionName);
+            if (y == null)
+            {
+              y = new HashMap();
+              connectionNameMap.put(connectionName,y);
+            }
+            ArrayList x = (ArrayList)y.get(outputConnectionName);
+            if (x == null)
+            {
+              // New entry needed
+              x = new ArrayList();
+              y.put(outputConnectionName,x);
+            }
+            x.add(dd);
+            i++;
           }
-          ArrayList x = (ArrayList)y.get(outputConnectionName);
-          if (x == null)
-          {
-            // New entry needed
-            x = new ArrayList();
-            y.put(outputConnectionName,x);
+
+          // For each bin, obtain a filtered answer, and enter all answers into a hash table.
+          // We'll then scan the result again to look up the right descriptions for return,
+          // and delete the ones that are owned multiply.
+          HashMap allowedDocIds = new HashMap();
+          Iterator iter = connectionNameMap.keySet().iterator();
+          while (iter.hasNext())
+          {
+            String connectionName = (String)iter.next();
+            Map y = (Map)connectionNameMap.get(connectionName);
+            Iterator outputIter = y.keySet().iterator();
+            while (outputIter.hasNext())
+            {
+              String outputConnectionName = (String)outputIter.next();
+              ArrayList x = (ArrayList)y.get(outputConnectionName);
+              // Do the filter query
+              DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+              int j = 0;
+              while (j < descriptions.length)
+              {
+                descriptions[j] = (DocumentDescription)x.get(j);
+                j++;
+              }
+              String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
+              j = 0;
+              while (j < docIDHashes.length)
+              {
+                String docIDHash = docIDHashes[j++];
+                String key = makeCompositeID(docIDHash,connectionName);
+                allowedDocIds.put(key,docIDHash);
+              }
+            }
           }
-          x.add(dd);
-          i++;
-        }
 
-        // For each bin, obtain a filtered answer, and enter all answers into a hash table.
-        // We'll then scan the result again to look up the right descriptions for return,
-        // and delete the ones that are owned multiply.
-        HashMap allowedDocIds = new HashMap();
-        Iterator iter = connectionNameMap.keySet().iterator();
-        while (iter.hasNext())
-        {
-          String connectionName = (String)iter.next();
-          Map y = (Map)connectionNameMap.get(connectionName);
-          Iterator outputIter = y.keySet().iterator();
-          while (outputIter.hasNext())
+          // Now, assemble a result, and change the state of the records accordingly
+          // First thing to do is order by document hash to reduce chances of deadlock.
+          String[] compositeIDArray = new String[documentIDMap.size()];
+          i = 0;
+          iter = documentIDMap.keySet().iterator();
+          while (iter.hasNext())
           {
-            String outputConnectionName = (String)outputIter.next();
-            ArrayList x = (ArrayList)y.get(outputConnectionName);
-            // Do the filter query
-            DocumentDescription[] descriptions = new DocumentDescription[x.size()];
-            int j = 0;
-            while (j < descriptions.length)
+            compositeIDArray[i++] = (String)iter.next();
+          }
+          
+          java.util.Arrays.sort(compositeIDArray);
+          
+          DocumentDescription[] rval = new DocumentDescription[allowedDocIds.size()];
+          int j = 0;
+          i = 0;
+          while (i < compositeIDArray.length)
+          {
+            String compositeDocumentID = compositeIDArray[i];
+            DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocumentID);
+            if (allowedDocIds.get(compositeDocumentID) == null)
             {
-              descriptions[j] = (DocumentDescription)x.get(j);
-              j++;
+              // Delete this record and do NOT return it.
+              jobQueue.deleteRecord(dd.getID());
+              // What should we do about hopcount here?
+              // We are deleting a record which belongs to a job that is being
+              // cleaned up.  The job itself will go away when this is done,
+              // and so will all the hopcount stuff pertaining to it.  So, the
+              // treatment I've chosen here is to leave the hopcount alone and
+              // let the job cleanup get rid of it at the right time.
+              // Note: carrydown records handled in the same manner...
+              //carryDown.deleteRecords(dd.getJobID(),new String[]{dd.getDocumentIdentifier()});
             }
-            String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
-            j = 0;
-            while (j < docIDHashes.length)
+            else
             {
-              String docIDHash = docIDHashes[j++];
-              String key = makeCompositeID(docIDHash,connectionName);
-              allowedDocIds.put(key,docIDHash);
+              // Set the record status to "being deleted" and return it
+              rval[j++] = dd;
+              jobQueue.setDeletingStatus(dd.getID());
             }
+            i++;
           }
-        }
 
-        // Now, assemble a result, and change the state of the records accordingly
-        // First thing to do is order by document hash to reduce chances of deadlock.
-        String[] compositeIDArray = new String[documentIDMap.size()];
-        i = 0;
-        iter = documentIDMap.keySet().iterator();
-        while (iter.hasNext())
+          TrackerClass.notePrecommit();
+          database.performCommit();
+          TrackerClass.noteCommit();
+          
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+
+          return rval;
+
+        }
+        catch (Error e)
         {
-          compositeIDArray[i++] = (String)iter.next();
+          database.signalRollback();
+          TrackerClass.noteRollback();
+          throw e;
         }
-        
-        java.util.Arrays.sort(compositeIDArray);
-        
-        DocumentDescription[] rval = new DocumentDescription[allowedDocIds.size()];
-        int j = 0;
-        i = 0;
-        while (i < compositeIDArray.length)
+        catch (ManifoldCFException e)
         {
-          String compositeDocumentID = compositeIDArray[i];
-          DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocumentID);
-          if (allowedDocIds.get(compositeDocumentID) == null)
-          {
-            // Delete this record and do NOT return it.
-            jobQueue.deleteRecord(dd.getID());
-            // What should we do about hopcount here?
-            // We are deleting a record which belongs to a job that is being
-            // cleaned up.  The job itself will go away when this is done,
-            // and so will all the hopcount stuff pertaining to it.  So, the
-            // treatment I've chosen here is to leave the hopcount alone and
-            // let the job cleanup get rid of it at the right time.
-            // Note: carrydown records handled in the same manner...
-            //carryDown.deleteRecords(dd.getJobID(),new String[]{dd.getDocumentIdentifier()});
-          }
-          else
+          database.signalRollback();
+          TrackerClass.noteRollback();
+          if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
           {
-            // Set the record status to "being deleted" and return it
-            rval[j++] = dd;
-            jobQueue.setDeletingStatus(dd.getID());
+            if (Logging.perf.isDebugEnabled())
+              Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage());
+            sleepAmt = getRandomAmount();
+            continue;
           }
-          i++;
+          throw e;
         }
-
-        TrackerClass.notePrecommit();
-        database.performCommit();
-        TrackerClass.noteCommit();
-        
-        if (Logging.perf.isDebugEnabled())
-          Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
-
-        return rval;
-
-      }
-      catch (Error e)
-      {
-        database.signalRollback();
-        TrackerClass.noteRollback();
-        throw e;
-      }
-      catch (ManifoldCFException e)
-      {
-        database.signalRollback();
-        TrackerClass.noteRollback();
-        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        finally
         {
-          if (Logging.perf.isDebugEnabled())
-            Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage());
-          sleepAmt = getRandomAmount();
-          continue;
+          database.endTransaction();
         }
-        throw e;
       }
       finally
       {
-        database.endTransaction();
+        lockManager.leaveWriteLock(deleteStufferLock);
         sleepFor(sleepAmt);
       }
     }
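
A note on lock granularity in the hunks above and below: each stuffer path takes its own named write lock (_CLEANSTUFFER_, _DELETESTUFFER_, _EXPIRESTUFFER_, and _STUFFER_ for the main stuffer), so instances of the same stuffer are serialized across the cluster while the different stuffer types never wait on one another. In every case the lock is released in the outer finally before sleepFor runs, so the back-off never happens with the lock held.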
@@ -1714,164 +1738,172 @@ public class JobManager implements IJobM
     {
       long sleepAmt = 0L;
 
-      if (Logging.perf.isDebugEnabled())
-      {
-        repeatCount++;
-        Logging.perf.debug(" Attempt "+Integer.toString(repeatCount)+" to expire documents, after "+
-          new Long(System.currentTimeMillis() - startTime)+" ms");
-      }
-
-      database.beginTransaction();
+      // Enter a write lock, so only one thread can be doing this.  That makes FOR UPDATE unnecessary.
+      lockManager.enterWriteLock(expireStufferLock);
       try
       {
-        IResultSet set = database.performQuery(query,list,null,null,n,null);
-
         if (Logging.perf.isDebugEnabled())
-          Logging.perf.debug(" Expiring "+Integer.toString(set.getRowCount())+" documents");
-
-        // To avoid deadlock, we want to update the document id hashes in order.  This means reading into a structure I can sort by docid hash,
-        // before updating any rows in jobqueue.
-        HashMap connectionNameMap = new HashMap();
-        HashMap documentIDMap = new HashMap();
-        Map statusMap = new HashMap();
-
-        int i = 0;
-        while (i < set.getRowCount())
         {
-          IResultRow row = set.getRow(i);
-          Long jobID = (Long)row.getValue(jobQueue.jobIDField);
-          String documentIDHash = (String)row.getValue(jobQueue.docHashField);
-          String documentID = (String)row.getValue(jobQueue.docIDField);
-          int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString());
-          Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
-          Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
-          // Failtime is probably not useful in this context, but we'll bring it along for completeness
-          long failTime;
-          if (failTimeValue == null)
-            failTime = -1L;
-          else
-            failTime = failTimeValue.longValue();
-          int failCount;
-          if (failCountValue == null)
-            failCount = 0;
-          else
-            failCount = (int)failCountValue.longValue();
-          IJobDescription jobDesc = load(jobID);
-          String connectionName = jobDesc.getConnectionName();
-          String outputConnectionName = jobDesc.getOutputConnectionName();
-          DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
-            jobID,documentIDHash,documentID,failTime,failCount);
-          String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
-          documentIDMap.put(compositeDocumentID,dd);
-          statusMap.put(compositeDocumentID,new Integer(status));
-          Map y = (Map)connectionNameMap.get(connectionName);
-          if (y == null)
-          {
-            y = new HashMap();
-            connectionNameMap.put(connectionName,y);
-          }
-          ArrayList x = (ArrayList)y.get(outputConnectionName);
-          if (x == null)
-          {
-            // New entry needed
-            x = new ArrayList();
-            y.put(outputConnectionName,x);
-          }
-          x.add(dd);
-          i++;
+          repeatCount++;
+          Logging.perf.debug(" Attempt "+Integer.toString(repeatCount)+" to expire documents, after "+
+            new Long(System.currentTimeMillis() - startTime)+" ms");
         }
 
-        // For each bin, obtain a filtered answer, and enter all answers into a hash table.
-        // We'll then scan the result again to look up the right descriptions for return,
-        // and delete the ones that are owned multiply.
-        HashMap allowedDocIds = new HashMap();
-        Iterator iter = connectionNameMap.keySet().iterator();
-        while (iter.hasNext())
-        {
-          String connectionName = (String)iter.next();
-          Map y = (Map)connectionNameMap.get(connectionName);
-          Iterator outputIter = y.keySet().iterator();
-          while (outputIter.hasNext())
+        database.beginTransaction();
+        try
+        {
+          IResultSet set = database.performQuery(query,list,null,null,n,null);
+
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug(" Expiring "+Integer.toString(set.getRowCount())+" documents");
+
+          // To avoid deadlock, we want to update the document id hashes in order.  This means reading into a structure I can sort by docid hash,
+          // before updating any rows in jobqueue.
+          HashMap connectionNameMap = new HashMap();
+          HashMap documentIDMap = new HashMap();
+          Map statusMap = new HashMap();
+
+          int i = 0;
+          while (i < set.getRowCount())
           {
-            String outputConnectionName = (String)outputIter.next();
+            IResultRow row = set.getRow(i);
+            Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+            String documentIDHash = (String)row.getValue(jobQueue.docHashField);
+            String documentID = (String)row.getValue(jobQueue.docIDField);
+            int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString());
+            Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
+            Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+            // Failtime is probably not useful in this context, but we'll bring it along for completeness
+            long failTime;
+            if (failTimeValue == null)
+              failTime = -1L;
+            else
+              failTime = failTimeValue.longValue();
+            int failCount;
+            if (failCountValue == null)
+              failCount = 0;
+            else
+              failCount = (int)failCountValue.longValue();
+            IJobDescription jobDesc = load(jobID);
+            String connectionName = jobDesc.getConnectionName();
+            String outputConnectionName = jobDesc.getOutputConnectionName();
+            DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+              jobID,documentIDHash,documentID,failTime,failCount);
+            String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
+            documentIDMap.put(compositeDocumentID,dd);
+            statusMap.put(compositeDocumentID,new Integer(status));
+            Map y = (Map)connectionNameMap.get(connectionName);
+            if (y == null)
+            {
+              y = new HashMap();
+              connectionNameMap.put(connectionName,y);
+            }
             ArrayList x = (ArrayList)y.get(outputConnectionName);
-            // Do the filter query
-            DocumentDescription[] descriptions = new DocumentDescription[x.size()];
-            int j = 0;
-            while (j < descriptions.length)
+            if (x == null)
             {
-              descriptions[j] = (DocumentDescription)x.get(j);
-              j++;
+              // New entry needed
+              x = new ArrayList();
+              y.put(outputConnectionName,x);
             }
-            String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
-            j = 0;
-            while (j < docIDHashes.length)
+            x.add(dd);
+            i++;
+          }
+
+          // For each bin, obtain a filtered answer, and enter all answers into a hash table.
+          // We'll then scan the result again to look up the right descriptions for return,
+          // and delete the ones that are owned multiply.
+          HashMap allowedDocIds = new HashMap();
+          Iterator iter = connectionNameMap.keySet().iterator();
+          while (iter.hasNext())
+          {
+            String connectionName = (String)iter.next();
+            Map y = (Map)connectionNameMap.get(connectionName);
+            Iterator outputIter = y.keySet().iterator();
+            while (outputIter.hasNext())
             {
-              String docIDHash = docIDHashes[j++];
-              String key = makeCompositeID(docIDHash,connectionName);
-              allowedDocIds.put(key,docIDHash);
+              String outputConnectionName = (String)outputIter.next();
+              ArrayList x = (ArrayList)y.get(outputConnectionName);
+              // Do the filter query
+              DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+              int j = 0;
+              while (j < descriptions.length)
+              {
+                descriptions[j] = (DocumentDescription)x.get(j);
+                j++;
+              }
+              String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
+              j = 0;
+              while (j < docIDHashes.length)
+              {
+                String docIDHash = docIDHashes[j++];
+                String key = makeCompositeID(docIDHash,connectionName);
+                allowedDocIds.put(key,docIDHash);
+              }
             }
           }
-        }
 
-        // Now, assemble a result, and change the state of the records accordingly
-        // First thing to do is order by document hash, so we reduce the risk of deadlock.
-        String[] compositeIDArray = new String[documentIDMap.size()];
-        i = 0;
-        iter = documentIDMap.keySet().iterator();
-        while (iter.hasNext())
+          // Now, assemble a result, and change the state of the records accordingly
+          // First thing to do is order by document hash, so we reduce the risk of deadlock.
+          String[] compositeIDArray = new String[documentIDMap.size()];
+          i = 0;
+          iter = documentIDMap.keySet().iterator();
+          while (iter.hasNext())
+          {
+            compositeIDArray[i++] = (String)iter.next();
+          }
+          
+          java.util.Arrays.sort(compositeIDArray);
+          
+          DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+          boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+          i = 0;
+          while (i < compositeIDArray.length)
+          {
+            String compositeDocID = compositeIDArray[i];
+            DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID);
+            // Determine whether we can delete it from the index or not
+            rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null);
+            // Set the record status to "being cleaned" and return it
+            rval[i++] = dd;
+            jobQueue.updateActiveRecord(dd.getID(),((Integer)statusMap.get(compositeDocID)).intValue());
+          }
+
+          TrackerClass.notePrecommit();
+          database.performCommit();
+          TrackerClass.noteCommit();
+          
+          return new DocumentSetAndFlags(rval, rvalBoolean);
+
+        }
+        catch (ManifoldCFException e)
         {
-          compositeIDArray[i++] = (String)iter.next();
+          database.signalRollback();
+          TrackerClass.noteRollback();
+          if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+          {
+            if (Logging.perf.isDebugEnabled())
+              Logging.perf.debug("Aborted transaction finding docs to expire: "+e.getMessage());
+            sleepAmt = getRandomAmount();
+            continue;
+          }
+          throw e;
         }
-        
-        java.util.Arrays.sort(compositeIDArray);
-        
-        DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
-        boolean[] rvalBoolean = new boolean[documentIDMap.size()];
-        i = 0;
-        while (i < compositeIDArray.length)
+        catch (Error e)
         {
-          String compositeDocID = compositeIDArray[i];
-          DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID);
-          // Determine whether we can delete it from the index or not
-          rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null);
-          // Set the record status to "being cleaned" and return it
-          rval[i++] = dd;
-          jobQueue.updateActiveRecord(dd.getID(),((Integer)statusMap.get(compositeDocID)).intValue());
+          database.signalRollback();
+          TrackerClass.noteRollback();
+          throw e;
         }
-
-        TrackerClass.notePrecommit();
-        database.performCommit();
-        TrackerClass.noteCommit();
-        
-        return new DocumentSetAndFlags(rval, rvalBoolean);
-
-      }
-      catch (ManifoldCFException e)
-      {
-        database.signalRollback();
-        TrackerClass.noteRollback();
-        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        finally
         {
-          if (Logging.perf.isDebugEnabled())
-            Logging.perf.debug("Aborted transaction finding docs to expire: "+e.getMessage());
-          sleepAmt = getRandomAmount();
-          continue;
+          database.endTransaction();
         }
-        throw e;
-      }
-      catch (Error e)
-      {
-        database.signalRollback();
-        TrackerClass.noteRollback();
-        throw e;
       }
       finally
       {
-        database.endTransaction();
+        lockManager.leaveWriteLock(expireStufferLock);
         sleepFor(sleepAmt);
       }
-
     }
   }
 
@@ -2179,6 +2211,7 @@ public class JobManager implements IJobM
 
     // Note well: This query does not do "FOR UPDATE".  The reason is that only one thread can possibly change the document's state to active.
     // When FOR UPDATE was included, deadlock conditions were common because of the complexity of this query.
+    // So, instead, as part of CONNECTORS-781, I've introduced a write lock for the pertinent section.
 
     ArrayList list = new ArrayList();
 
@@ -2259,128 +2292,141 @@ public class JobManager implements IJobM
       maxConnections[k] = connection.getMaxConnections();
       k++;
     }
-    IRepositoryConnector[] connectors = RepositoryConnectorFactory.grabMultiple(threadContext,orderingKeys,classNames,configParams,maxConnections);
-    try
+    
+    // Never sleep with a resource locked!
+    while (true)
     {
-      // Hand the connectors off to the ThrottleLimit instance
-      k = 0;
-      while (k < connections.length)
-      {
-        vList.addConnectionName(connections[k].getName(),connectors[k]);
-        k++;
-      }
-
-      // Now we can tack the limit onto the query.  Before this point, remainingDocuments would be crap
-      int limitValue = vList.getRemainingDocuments();
-      sb.append(database.constructOffsetLimitClause(0,limitValue,true));
-
-      if (Logging.perf.isDebugEnabled())
-      {
-        Logging.perf.debug("Queuing documents from time "+currentTimeValue.toString()+" job priority "+currentPriorityValue.toString()+
-          " (up to "+Integer.toString(vList.getRemainingDocuments())+" documents)");
-      }
+      long sleepAmt = 0L;
 
-      while (true)
+      // Write lock ensures that only one thread cluster-wide can be doing this at a given time, so FOR UPDATE is unneeded.
+      lockManager.enterWriteLock(stufferLock);
+      try
       {
-        long sleepAmt = 0L;
-        database.beginTransaction();
+    
+        IRepositoryConnector[] connectors = RepositoryConnectorFactory.grabMultiple(threadContext,orderingKeys,classNames,configParams,maxConnections);
         try
         {
-          IResultSet set = database.performQuery(sb.toString(),list,null,null,-1,vList);
+          // Hand the connectors off to the ThrottleLimit instance
+          k = 0;
+          while (k < connections.length)
+          {
+            vList.addConnectionName(connections[k].getName(),connectors[k]);
+            k++;
+          }
 
-          if (Logging.perf.isDebugEnabled())
-            Logging.perf.debug(" Queuing "+Integer.toString(set.getRowCount())+" documents");
+          // Now we can tack the limit onto the query.  Before this point, remainingDocuments would be crap
+          int limitValue = vList.getRemainingDocuments();
+          sb.append(database.constructOffsetLimitClause(0,limitValue,true));
 
-          // To avoid deadlock, we want to update the document id hashes in order.  This means reading into a structure I can sort by docid hash,
-          // before updating any rows in jobqueue.
-          String[] docIDHashes = new String[set.getRowCount()];
-          Map storageMap = new HashMap();
-          Map statusMap = new HashMap();
+          if (Logging.perf.isDebugEnabled())
+          {
+            Logging.perf.debug("Queuing documents from time "+currentTimeValue.toString()+" job priority "+currentPriorityValue.toString()+
+              " (up to "+Integer.toString(vList.getRemainingDocuments())+" documents)");
+          }
 
-          int i = 0;
-          while (i < set.getRowCount())
+          database.beginTransaction();
+          try
           {
-            IResultRow row = set.getRow(i);
-            Long id = (Long)row.getValue(jobQueue.idField);
-            Long jobID = (Long)row.getValue(jobQueue.jobIDField);
-            String docIDHash = (String)row.getValue(jobQueue.docHashField);
-            String docID = (String)row.getValue(jobQueue.docIDField);
-            int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString());
-            Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
-            Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
-            long failTime;
-            if (failTimeValue == null)
-              failTime = -1L;
-            else
-              failTime = failTimeValue.longValue();
-            int failCount;
-            if (failCountValue == null)
-              failCount = -1;
-            else
-              failCount = (int)failCountValue.longValue();
+            IResultSet set = database.performQuery(sb.toString(),list,null,null,-1,vList);
+
+            if (Logging.perf.isDebugEnabled())
+              Logging.perf.debug(" Queuing "+Integer.toString(set.getRowCount())+" documents");
 
-            DocumentDescription dd = new DocumentDescription(id,jobID,docIDHash,docID,failTime,failCount);
-            docIDHashes[i] = docIDHash + ":" + jobID;
-            storageMap.put(docIDHashes[i],dd);
-            statusMap.put(docIDHashes[i],new Integer(status));
-            if (Logging.scheduling.isDebugEnabled())
+            // To avoid deadlock, we want to update the document id hashes in order.  This means reading into a structure I can sort by docid hash,
+            // before updating any rows in jobqueue.
+            String[] docIDHashes = new String[set.getRowCount()];
+            Map storageMap = new HashMap();
+            Map statusMap = new HashMap();
+
+            int i = 0;
+            while (i < set.getRowCount())
             {
-              Double docPriority = (Double)row.getValue(jobQueue.docPriorityField);
-              Logging.scheduling.debug("Stuffing document '"+docID+"' that has priority "+docPriority.toString()+" onto active list");
+              IResultRow row = set.getRow(i);
+              Long id = (Long)row.getValue(jobQueue.idField);
+              Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+              String docIDHash = (String)row.getValue(jobQueue.docHashField);
+              String docID = (String)row.getValue(jobQueue.docIDField);
+              int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString());
+              Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
+              Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+              long failTime;
+              if (failTimeValue == null)
+                failTime = -1L;
+              else
+                failTime = failTimeValue.longValue();
+              int failCount;
+              if (failCountValue == null)
+                failCount = -1;
+              else
+                failCount = (int)failCountValue.longValue();
+
+              DocumentDescription dd = new DocumentDescription(id,jobID,docIDHash,docID,failTime,failCount);
+              docIDHashes[i] = docIDHash + ":" + jobID;
+              storageMap.put(docIDHashes[i],dd);
+              statusMap.put(docIDHashes[i],new Integer(status));
+              if (Logging.scheduling.isDebugEnabled())
+              {
+                Double docPriority = (Double)row.getValue(jobQueue.docPriorityField);
+                Logging.scheduling.debug("Stuffing document '"+docID+"' that has priority "+docPriority.toString()+" onto active list");
+              }
+              i++;
             }
-            i++;
-          }
 
-          // No duplicates are possible here
-          java.util.Arrays.sort(docIDHashes);
+            // No duplicates are possible here
+            java.util.Arrays.sort(docIDHashes);
 
-          i = 0;
-          while (i < docIDHashes.length)
-          {
-            String docIDHash = docIDHashes[i];
-            DocumentDescription dd = (DocumentDescription)storageMap.get(docIDHash);
-            Long id = dd.getID();
-            int status = ((Integer)statusMap.get(docIDHash)).intValue();
+            i = 0;
+            while (i < docIDHashes.length)
+            {
+              String docIDHash = docIDHashes[i];
+              DocumentDescription dd = (DocumentDescription)storageMap.get(docIDHash);
+              Long id = dd.getID();
+              int status = ((Integer)statusMap.get(docIDHash)).intValue();
 
-            // Set status to "ACTIVE".
-            jobQueue.updateActiveRecord(id,status);
+              // Set status to "ACTIVE".
+              jobQueue.updateActiveRecord(id,status);
 
-            answers.add(dd);
+              answers.add(dd);
 
-            i++;
+              i++;
+            }
+            TrackerClass.notePrecommit();
+            database.performCommit();
+            TrackerClass.noteCommit();
+            break;
           }
-          TrackerClass.notePrecommit();
-          database.performCommit();
-          TrackerClass.noteCommit();
-          break;
-        }
-        catch (ManifoldCFException e)
-        {
-          database.signalRollback();
-          if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+          catch (ManifoldCFException e)
           {
-            if (Logging.perf.isDebugEnabled())
-              Logging.perf.debug("Aborted transaction finding docs to queue: "+e.getMessage());
-            sleepAmt = getRandomAmount();
-            continue;
+            database.signalRollback();
+            if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+            {
+              if (Logging.perf.isDebugEnabled())
+                Logging.perf.debug("Aborted transaction finding docs to queue: "+e.getMessage());
+              sleepAmt = getRandomAmount();
+              continue;
+            }
+            throw e;
+          }
+          catch (Error e)
+          {
+            database.signalRollback();
+            throw e;
+          }
+          finally
+          {
+            database.endTransaction();
           }
-          throw e;
-        }
-        catch (Error e)
-        {
-          database.signalRollback();
-          throw e;
         }
         finally
         {
-          database.endTransaction();
-          sleepFor(sleepAmt);
+          RepositoryConnectorFactory.releaseMultiple(connectors);
         }
       }
-    }
-    finally
-    {
-      RepositoryConnectorFactory.releaseMultiple(connectors);
+      finally
+      {
+        lockManager.leaveWriteLock(stufferLock);
+        sleepFor(sleepAmt);
+      }
     }
   }
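
For readers skimming this hunk, here is a minimal sketch (not the committed code) of the pattern the stuffer path now follows: take a cluster-wide write lock via ILockManager before the queueing transaction, retry on DATABASE_TRANSACTION_ABORT, and sleep only after the lock is released. The calls on lockManager and database (enterWriteLock/leaveWriteLock, beginTransaction/performCommit/signalRollback/endTransaction, getErrorCode) mirror what the diff shows; the class name, STUFFER_LOCK, queueDocuments(), getRandomAmount(), and sleepFor() are hypothetical stand-ins for the real JobManager machinery.

import org.apache.manifoldcf.core.interfaces.*;

/** Sketch only: NOT the committed code.  Illustrates the shape of the change:
* a cluster-wide write lock around the stuffing transaction, retry on
* transaction aborts, and sleeping only after the lock has been released.
* STUFFER_LOCK, queueDocuments(), getRandomAmount(), and sleepFor() are
* hypothetical stand-ins.
*/
public class StufferPassSketch
{
  protected static final String STUFFER_LOCK = "_STUFFERLOCK_";  // hypothetical lock name

  protected final ILockManager lockManager;
  protected final IDBInterface database;

  public StufferPassSketch(ILockManager lockManager, IDBInterface database)
  {
    this.lockManager = lockManager;
    this.database = database;
  }

  public void runStufferPass()
    throws ManifoldCFException
  {
    while (true)
    {
      long sleepAmt = 0L;
      // Only one thread cluster-wide can be stuffing at a time, so the
      // candidate query no longer needs FOR UPDATE.
      lockManager.enterWriteLock(STUFFER_LOCK);
      try
      {
        database.beginTransaction();
        try
        {
          queueDocuments();            // read candidates, sort, mark ACTIVE
          database.performCommit();
          return;                      // done; the finally blocks still run
        }
        catch (ManifoldCFException e)
        {
          database.signalRollback();
          if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
          {
            // Deadlock/serialization failure: back off a random amount and retry.
            sleepAmt = getRandomAmount();
            continue;
          }
          throw e;
        }
        catch (Error e)
        {
          database.signalRollback();
          throw e;
        }
        finally
        {
          database.endTransaction();
        }
      }
      finally
      {
        // Release the global resource BEFORE backing off; never sleep with a lock held.
        lockManager.leaveWriteLock(STUFFER_LOCK);
        sleepFor(sleepAmt);
      }
    }
  }

  // Hypothetical helpers standing in for the real JobManager methods.
  protected void queueDocuments()
    throws ManifoldCFException
  {
    // ... query jobqueue, sort by doc id hash, call updateActiveRecord() per row ...
  }

  protected long getRandomAmount()
  {
    return (long)(Math.random() * 60000.0);
  }

  protected void sleepFor(long amt)
  {
    if (amt == 0L)
      return;
    try { Thread.sleep(amt); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
  }
}

Note that sleepFor(sleepAmt) sits in the outer finally, after leaveWriteLock, which is exactly what the "Never sleep with a resource locked!" comment in the diff is about.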
 



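The other half of the pattern this hunk keeps intact is the existing deadlock-avoidance rule: sort the composite document keys (docIDHash + ":" + jobID) and update the jobqueue rows in that order, so concurrent transactions always take row locks in the same sequence. A compact illustration under stated assumptions follows; Record and markActive() are hypothetical stand-ins for DocumentDescription and jobQueue.updateActiveRecord().

import java.util.*;

/** Sketch only: rows are updated in sorted key order so any two transactions
* touching the same rows acquire their row locks in the same sequence,
* which rules out lock-order deadlocks.  Record/markActive() are hypothetical
* stand-ins for DocumentDescription and jobQueue.updateActiveRecord().
*/
public class SortedUpdateSketch
{
  public interface Record
  {
    void markActive();
  }

  public static void markAllActive(Map<String,Record> byCompositeKey)
  {
    // Composite key follows the diff's convention: docIDHash + ":" + jobID.
    String[] keys = byCompositeKey.keySet().toArray(new String[0]);
    Arrays.sort(keys);
    for (String key : keys)
    {
      byCompositeKey.get(key).markActive();   // one jobqueue update per record, in order
    }
  }
}

The same idea appears twice in this commit: once in the expire/clean path at the top of this message and once in the stuffer path above, both sorting before any updateActiveRecord() calls.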