From: kwright@apache.org
To: commits@manifoldcf.apache.org
Reply-To: dev@manifoldcf.apache.org
Subject: svn commit: r1542510 - in /manifoldcf/branches/CONNECTORS-781: connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/
Date: Sat, 16 Nov 2013 13:25:42 -0000
Message-Id: <20131116132543.0191623888E4@eris.apache.org>

Author: kwright
Date: Sat Nov 16 13:25:42 2013
New Revision: 1542510

URL: http://svn.apache.org/r1542510
Log:
Install write locks for stuffer threads.

Modified:
    manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java

Modified: manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java?rev=1542510&r1=1542509&r2=1542510&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java (original)
+++ manifoldcf/branches/CONNECTORS-781/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java Sat Nov 16 13:25:42 2013
@@ -110,6 +110,8 @@ public class ThrottledFetcher
   protected static final long TIME_1DAY = 24L * 60L * 60000L;
+ // The following static bin pools correspond to global resources that will be managed via ILockManager.
+ /** This is the static pool of ConnectionBin's, keyed by bin name.
*/ protected static Map connectionBins = new HashMap(); /** This is the static pool of ThrottleBin's, keyed by bin name. */ @@ -464,6 +466,8 @@ public class ThrottledFetcher /** Connection pool for a bin. * An instance of this class tracks the connections that are pooled and that are in use for a specific bin. + * NOTE WELL: This resource must be constrained globally, across all JVMs! + * To do that, we need an ILockManager to handle the global data for each bin. */ protected static class ConnectionBin { @@ -749,6 +753,8 @@ public class ThrottledFetcher * 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when * new chunks from other connections can start. * + * NOTE WELL: This resource must be constrained globally, across all JVMs! + * To do that, we need an ILockManager to handle the global data for each bin. */ protected static class ThrottleBin { Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1542510&r1=1542509&r2=1542510&view=diff ============================================================================== --- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original) +++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Sat Nov 16 13:25:42 2013 @@ -32,6 +32,10 @@ public class JobManager implements IJobM { public static final String _rcsid = "@(#)$Id: JobManager.java 998576 2010-09-19 01:11:02Z kwright $"; + protected static final String stufferLock = "_STUFFER_"; + protected static final String deleteStufferLock = "_DELETESTUFFER_"; + protected static final String expireStufferLock = "_EXPIRESTUFFER_"; + protected static final String cleanStufferLock = "_CLEANSTUFFER_"; protected static final String hopLock = "_HOPLOCK_"; // Member variables @@ -848,198 +852,208 @@ public class JobManager implements IJobM while (true) { long sleepAmt = 0L; - database.beginTransaction(); + + // Enter a write lock. This means we don't need a FOR UPDATE on the query. + lockManager.enterWriteLock(cleanStufferLock); try { - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on cleaning queue"); + database.beginTransaction(); + try + { + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on cleaning queue"); - // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being cleaned". 
- ArrayList list = new ArrayList(); - - StringBuilder sb = new StringBuilder("SELECT "); - sb.append(jobQueue.idField).append(",") - .append(jobQueue.jobIDField).append(",") - .append(jobQueue.docHashField).append(",") - .append(jobQueue.docIDField).append(",") - .append(jobQueue.failTimeField).append(",") - .append(jobQueue.failCountField) - .append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause("t0."+jobQueue.statusField,jobQueue.statusToString(jobQueue.STATUS_PURGATORY))})).append(" AND ") - .append("(t0.").append(jobQueue.checkTimeField).append(" IS NULL OR t0.").append(jobQueue.checkTimeField).append("<=?) AND "); + // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being cleaned". + ArrayList list = new ArrayList(); - list.add(new Long(currentTime)); + StringBuilder sb = new StringBuilder("SELECT "); + sb.append(jobQueue.idField).append(",") + .append(jobQueue.jobIDField).append(",") + .append(jobQueue.docHashField).append(",") + .append(jobQueue.docIDField).append(",") + .append(jobQueue.failTimeField).append(",") + .append(jobQueue.failCountField) + .append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new UnitaryClause("t0."+jobQueue.statusField,jobQueue.statusToString(jobQueue.STATUS_PURGATORY))})).append(" AND ") + .append("(t0.").append(jobQueue.checkTimeField).append(" IS NULL OR t0.").append(jobQueue.checkTimeField).append("<=?) AND "); + + list.add(new Long(currentTime)); - sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause("t1."+jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN)), - new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)})) - .append(") AND "); - - sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new JoinClause("t2."+jobQueue.docHashField,"t0."+jobQueue.docHashField)})).append(" AND ") - .append("t2.").append(jobQueue.statusField).append(" IN (?,?,?,?,?,?) 
AND ") - .append("t2.").append(jobQueue.jobIDField).append("!=t0.").append(jobQueue.jobIDField) - .append(") "); + sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new UnitaryClause("t1."+jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN)), + new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)})) + .append(") AND "); - list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE)); - list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY)); - list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN)); - list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY)); - list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED)); - list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)); - - sb.append(database.constructOffsetLimitClause(0,maxCount)); - - // The checktime is null field check is for backwards compatibility - IResultSet set = database.performQuery(sb.toString(),list,null,null,maxCount,null); + sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new JoinClause("t2."+jobQueue.docHashField,"t0."+jobQueue.docHashField)})).append(" AND ") + .append("t2.").append(jobQueue.statusField).append(" IN (?,?,?,?,?,?) AND ") + .append("t2.").append(jobQueue.jobIDField).append("!=t0.").append(jobQueue.jobIDField) + .append(") "); + + list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE)); + list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY)); + list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN)); + list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY)); + list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED)); + list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)); - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("Done getting docs to cleaning queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms."); + sb.append(database.constructOffsetLimitClause(0,maxCount)); + + // The checktime is null field check is for backwards compatibility + IResultSet set = database.performQuery(sb.toString(),list,null,null,maxCount,null); - // We need to organize the returned set by connection name and output connection name, so that we can efficiently - // use getUnindexableDocumentIdentifiers. - // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription - // objects. 
- HashMap connectionNameMap = new HashMap(); - HashMap documentIDMap = new HashMap(); - int i = 0; - while (i < set.getRowCount()) - { - IResultRow row = set.getRow(i); - Long jobID = (Long)row.getValue(jobQueue.jobIDField); - String documentIDHash = (String)row.getValue(jobQueue.docHashField); - String documentID = (String)row.getValue(jobQueue.docIDField); - Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField); - Long failCountValue = (Long)row.getValue(jobQueue.failCountField); - // Failtime is probably not useful in this context, but we'll bring it along for completeness - long failTime; - if (failTimeValue == null) - failTime = -1L; - else - failTime = failTimeValue.longValue(); - int failCount; - if (failCountValue == null) - failCount = 0; - else - failCount = (int)failCountValue.longValue(); - IJobDescription jobDesc = load(jobID); - String connectionName = jobDesc.getConnectionName(); - String outputConnectionName = jobDesc.getOutputConnectionName(); - DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField), - jobID,documentIDHash,documentID,failTime,failCount); - String compositeDocumentID = makeCompositeID(documentIDHash,connectionName); - documentIDMap.put(compositeDocumentID,dd); - Map y = (Map)connectionNameMap.get(connectionName); - if (y == null) - { - y = new HashMap(); - connectionNameMap.put(connectionName,y); - } - ArrayList x = (ArrayList)y.get(outputConnectionName); - if (x == null) - { - // New entry needed - x = new ArrayList(); - y.put(outputConnectionName,x); - } - x.add(dd); - i++; - } + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("Done getting docs to cleaning queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms."); - // For each bin, obtain a filtered answer, and enter all answers into a hash table. - // We'll then scan the result again to look up the right descriptions for return, - // and delete the ones that are owned multiply. - HashMap allowedDocIds = new HashMap(); - Iterator iter = connectionNameMap.keySet().iterator(); - while (iter.hasNext()) - { - String connectionName = (String)iter.next(); - Map y = (Map)connectionNameMap.get(connectionName); - Iterator outputIter = y.keySet().iterator(); - while (outputIter.hasNext()) + // We need to organize the returned set by connection name and output connection name, so that we can efficiently + // use getUnindexableDocumentIdentifiers. + // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription + // objects. 
+ HashMap connectionNameMap = new HashMap(); + HashMap documentIDMap = new HashMap(); + int i = 0; + while (i < set.getRowCount()) { - String outputConnectionName = (String)outputIter.next(); + IResultRow row = set.getRow(i); + Long jobID = (Long)row.getValue(jobQueue.jobIDField); + String documentIDHash = (String)row.getValue(jobQueue.docHashField); + String documentID = (String)row.getValue(jobQueue.docIDField); + Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField); + Long failCountValue = (Long)row.getValue(jobQueue.failCountField); + // Failtime is probably not useful in this context, but we'll bring it along for completeness + long failTime; + if (failTimeValue == null) + failTime = -1L; + else + failTime = failTimeValue.longValue(); + int failCount; + if (failCountValue == null) + failCount = 0; + else + failCount = (int)failCountValue.longValue(); + IJobDescription jobDesc = load(jobID); + String connectionName = jobDesc.getConnectionName(); + String outputConnectionName = jobDesc.getOutputConnectionName(); + DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField), + jobID,documentIDHash,documentID,failTime,failCount); + String compositeDocumentID = makeCompositeID(documentIDHash,connectionName); + documentIDMap.put(compositeDocumentID,dd); + Map y = (Map)connectionNameMap.get(connectionName); + if (y == null) + { + y = new HashMap(); + connectionNameMap.put(connectionName,y); + } ArrayList x = (ArrayList)y.get(outputConnectionName); - // Do the filter query - DocumentDescription[] descriptions = new DocumentDescription[x.size()]; - int j = 0; - while (j < descriptions.length) + if (x == null) { - descriptions[j] = (DocumentDescription)x.get(j); - j++; + // New entry needed + x = new ArrayList(); + y.put(outputConnectionName,x); } - String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName); - j = 0; - while (j < docIDHashes.length) + x.add(dd); + i++; + } + + // For each bin, obtain a filtered answer, and enter all answers into a hash table. + // We'll then scan the result again to look up the right descriptions for return, + // and delete the ones that are owned multiply. + HashMap allowedDocIds = new HashMap(); + Iterator iter = connectionNameMap.keySet().iterator(); + while (iter.hasNext()) + { + String connectionName = (String)iter.next(); + Map y = (Map)connectionNameMap.get(connectionName); + Iterator outputIter = y.keySet().iterator(); + while (outputIter.hasNext()) { - String docIDHash = docIDHashes[j++]; - String key = makeCompositeID(docIDHash,connectionName); - allowedDocIds.put(key,docIDHash); + String outputConnectionName = (String)outputIter.next(); + ArrayList x = (ArrayList)y.get(outputConnectionName); + // Do the filter query + DocumentDescription[] descriptions = new DocumentDescription[x.size()]; + int j = 0; + while (j < descriptions.length) + { + descriptions[j] = (DocumentDescription)x.get(j); + j++; + } + String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName); + j = 0; + while (j < docIDHashes.length) + { + String docIDHash = docIDHashes[j++]; + String key = makeCompositeID(docIDHash,connectionName); + allowedDocIds.put(key,docIDHash); + } } } - } - // Now, assemble a result, and change the state of the records accordingly - // First thing to do is order by document hash, so we reduce the risk of deadlock. 
- String[] compositeIDArray = new String[documentIDMap.size()]; - i = 0; - iter = documentIDMap.keySet().iterator(); - while (iter.hasNext()) + // Now, assemble a result, and change the state of the records accordingly + // First thing to do is order by document hash, so we reduce the risk of deadlock. + String[] compositeIDArray = new String[documentIDMap.size()]; + i = 0; + iter = documentIDMap.keySet().iterator(); + while (iter.hasNext()) + { + compositeIDArray[i++] = (String)iter.next(); + } + + java.util.Arrays.sort(compositeIDArray); + + DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()]; + boolean[] rvalBoolean = new boolean[documentIDMap.size()]; + i = 0; + while (i < compositeIDArray.length) + { + String compositeDocID = compositeIDArray[i]; + DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID); + // Determine whether we can delete it from the index or not + rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null); + // Set the record status to "being cleaned" and return it + rval[i++] = dd; + jobQueue.setCleaningStatus(dd.getID()); + } + + TrackerClass.notePrecommit(); + database.performCommit(); + TrackerClass.noteCommit(); + + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms."); + + return new DocumentSetAndFlags(rval,rvalBoolean); + + } + catch (Error e) { - compositeIDArray[i++] = (String)iter.next(); + database.signalRollback(); + TrackerClass.noteRollback(); + throw e; } - - java.util.Arrays.sort(compositeIDArray); - - DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()]; - boolean[] rvalBoolean = new boolean[documentIDMap.size()]; - i = 0; - while (i < compositeIDArray.length) + catch (ManifoldCFException e) { - String compositeDocID = compositeIDArray[i]; - DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID); - // Determine whether we can delete it from the index or not - rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null); - // Set the record status to "being cleaned" and return it - rval[i++] = dd; - jobQueue.setCleaningStatus(dd.getID()); + database.signalRollback(); + TrackerClass.noteRollback(); + if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) + { + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage()); + sleepAmt = getRandomAmount(); + continue; + } + throw e; } - - TrackerClass.notePrecommit(); - database.performCommit(); - TrackerClass.noteCommit(); - - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms."); - - return new DocumentSetAndFlags(rval,rvalBoolean); - - } - catch (Error e) - { - database.signalRollback(); - TrackerClass.noteRollback(); - throw e; - } - catch (ManifoldCFException e) - { - database.signalRollback(); - TrackerClass.noteRollback(); - if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) + finally { - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage()); - sleepAmt = getRandomAmount(); - continue; + database.endTransaction(); } - throw e; } finally { - database.endTransaction(); + lockManager.leaveWriteLock(cleanStufferLock); sleepFor(sleepAmt); } } @@ -1100,211 +1114,221 @@ public class JobManager implements IJobM while (true) { long sleepAmt = 0L; - database.beginTransaction(); 
+ + // Enter a write lock so that multiple threads can't be in here at the same time + lockManager.enterWriteLock(deleteStufferLock); try { - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on delete queue"); + database.beginTransaction(); + try + { + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on delete queue"); - // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being deleted". - // If FOR UPDATE was included, deadlock happened a lot. - ArrayList list = new ArrayList(); - StringBuilder sb = new StringBuilder("SELECT "); - sb.append(jobQueue.idField).append(",") - .append(jobQueue.jobIDField).append(",") - .append(jobQueue.docHashField).append(",") - .append(jobQueue.docIDField).append(",") - .append(jobQueue.failTimeField).append(",") - .append(jobQueue.failCountField).append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause("t0."+jobQueue.statusField,jobQueue.statusToString(jobQueue.STATUS_ELIGIBLEFORDELETE))})).append(" AND ") - .append("t0.").append(jobQueue.checkTimeField).append("<=? AND "); - - list.add(new Long(currentTime)); - - sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause("t1."+jobs.statusField,jobs.statusToString(jobs.STATUS_DELETING)), - new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)})).append(") AND "); + // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being deleted". + // If FOR UPDATE was included, deadlock happened a lot. + ArrayList list = new ArrayList(); + StringBuilder sb = new StringBuilder("SELECT "); + sb.append(jobQueue.idField).append(",") + .append(jobQueue.jobIDField).append(",") + .append(jobQueue.docHashField).append(",") + .append(jobQueue.docIDField).append(",") + .append(jobQueue.failTimeField).append(",") + .append(jobQueue.failCountField).append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new UnitaryClause("t0."+jobQueue.statusField,jobQueue.statusToString(jobQueue.STATUS_ELIGIBLEFORDELETE))})).append(" AND ") + .append("t0.").append(jobQueue.checkTimeField).append("<=? AND "); - sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new JoinClause("t2."+jobQueue.docHashField,"t0."+jobQueue.docHashField)})).append(" AND ") - .append("t2.").append(jobQueue.statusField).append(" IN (?,?,?,?,?,?) 
AND ") - .append("t2.").append(jobQueue.jobIDField).append("!=t0.").append(jobQueue.jobIDField) - .append(") "); + list.add(new Long(currentTime)); - list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE)); - list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY)); - list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN)); - list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY)); - list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED)); - list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)); - - sb.append(database.constructOffsetLimitClause(0,maxCount)); - - // The checktime is null field check is for backwards compatibility - IResultSet set = database.performQuery(sb.toString(),list,null,null,maxCount,null); + sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new UnitaryClause("t1."+jobs.statusField,jobs.statusToString(jobs.STATUS_DELETING)), + new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)})).append(") AND "); + + sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new JoinClause("t2."+jobQueue.docHashField,"t0."+jobQueue.docHashField)})).append(" AND ") + .append("t2.").append(jobQueue.statusField).append(" IN (?,?,?,?,?,?) AND ") + .append("t2.").append(jobQueue.jobIDField).append("!=t0.").append(jobQueue.jobIDField) + .append(") "); + + list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE)); + list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY)); + list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN)); + list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY)); + list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED)); + list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)); + + sb.append(database.constructOffsetLimitClause(0,maxCount)); + + // The checktime is null field check is for backwards compatibility + IResultSet set = database.performQuery(sb.toString(),list,null,null,maxCount,null); - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("Done getting docs to delete queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms."); + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("Done getting docs to delete queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms."); - // We need to organize the returned set by connection name, so that we can efficiently - // use getUnindexableDocumentIdentifiers. - // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription - // objects. 
- HashMap connectionNameMap = new HashMap(); - HashMap documentIDMap = new HashMap(); - int i = 0; - while (i < set.getRowCount()) - { - IResultRow row = set.getRow(i); - Long jobID = (Long)row.getValue(jobQueue.jobIDField); - String documentIDHash = (String)row.getValue(jobQueue.docHashField); - String documentID = (String)row.getValue(jobQueue.docIDField); - Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField); - Long failCountValue = (Long)row.getValue(jobQueue.failCountField); - // Failtime is probably not useful in this context, but we'll bring it along for completeness - long failTime; - if (failTimeValue == null) - failTime = -1L; - else - failTime = failTimeValue.longValue(); - int failCount; - if (failCountValue == null) - failCount = 0; - else - failCount = (int)failCountValue.longValue(); - IJobDescription jobDesc = load(jobID); - String connectionName = jobDesc.getConnectionName(); - String outputConnectionName = jobDesc.getOutputConnectionName(); - DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField), - jobID,documentIDHash,documentID,failTime,failCount); - String compositeDocumentID = makeCompositeID(documentIDHash,connectionName); - documentIDMap.put(compositeDocumentID,dd); - Map y = (Map)connectionNameMap.get(connectionName); - if (y == null) + // We need to organize the returned set by connection name, so that we can efficiently + // use getUnindexableDocumentIdentifiers. + // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription + // objects. + HashMap connectionNameMap = new HashMap(); + HashMap documentIDMap = new HashMap(); + int i = 0; + while (i < set.getRowCount()) { - y = new HashMap(); - connectionNameMap.put(connectionName,y); + IResultRow row = set.getRow(i); + Long jobID = (Long)row.getValue(jobQueue.jobIDField); + String documentIDHash = (String)row.getValue(jobQueue.docHashField); + String documentID = (String)row.getValue(jobQueue.docIDField); + Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField); + Long failCountValue = (Long)row.getValue(jobQueue.failCountField); + // Failtime is probably not useful in this context, but we'll bring it along for completeness + long failTime; + if (failTimeValue == null) + failTime = -1L; + else + failTime = failTimeValue.longValue(); + int failCount; + if (failCountValue == null) + failCount = 0; + else + failCount = (int)failCountValue.longValue(); + IJobDescription jobDesc = load(jobID); + String connectionName = jobDesc.getConnectionName(); + String outputConnectionName = jobDesc.getOutputConnectionName(); + DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField), + jobID,documentIDHash,documentID,failTime,failCount); + String compositeDocumentID = makeCompositeID(documentIDHash,connectionName); + documentIDMap.put(compositeDocumentID,dd); + Map y = (Map)connectionNameMap.get(connectionName); + if (y == null) + { + y = new HashMap(); + connectionNameMap.put(connectionName,y); + } + ArrayList x = (ArrayList)y.get(outputConnectionName); + if (x == null) + { + // New entry needed + x = new ArrayList(); + y.put(outputConnectionName,x); + } + x.add(dd); + i++; } - ArrayList x = (ArrayList)y.get(outputConnectionName); - if (x == null) - { - // New entry needed - x = new ArrayList(); - y.put(outputConnectionName,x); + + // For each bin, obtain a filtered answer, and enter all answers into a hash table. 
+ // We'll then scan the result again to look up the right descriptions for return, + // and delete the ones that are owned multiply. + HashMap allowedDocIds = new HashMap(); + Iterator iter = connectionNameMap.keySet().iterator(); + while (iter.hasNext()) + { + String connectionName = (String)iter.next(); + Map y = (Map)connectionNameMap.get(connectionName); + Iterator outputIter = y.keySet().iterator(); + while (outputIter.hasNext()) + { + String outputConnectionName = (String)outputIter.next(); + ArrayList x = (ArrayList)y.get(outputConnectionName); + // Do the filter query + DocumentDescription[] descriptions = new DocumentDescription[x.size()]; + int j = 0; + while (j < descriptions.length) + { + descriptions[j] = (DocumentDescription)x.get(j); + j++; + } + String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName); + j = 0; + while (j < docIDHashes.length) + { + String docIDHash = docIDHashes[j++]; + String key = makeCompositeID(docIDHash,connectionName); + allowedDocIds.put(key,docIDHash); + } + } } - x.add(dd); - i++; - } - // For each bin, obtain a filtered answer, and enter all answers into a hash table. - // We'll then scan the result again to look up the right descriptions for return, - // and delete the ones that are owned multiply. - HashMap allowedDocIds = new HashMap(); - Iterator iter = connectionNameMap.keySet().iterator(); - while (iter.hasNext()) - { - String connectionName = (String)iter.next(); - Map y = (Map)connectionNameMap.get(connectionName); - Iterator outputIter = y.keySet().iterator(); - while (outputIter.hasNext()) + // Now, assemble a result, and change the state of the records accordingly + // First thing to do is order by document hash to reduce chances of deadlock. + String[] compositeIDArray = new String[documentIDMap.size()]; + i = 0; + iter = documentIDMap.keySet().iterator(); + while (iter.hasNext()) { - String outputConnectionName = (String)outputIter.next(); - ArrayList x = (ArrayList)y.get(outputConnectionName); - // Do the filter query - DocumentDescription[] descriptions = new DocumentDescription[x.size()]; - int j = 0; - while (j < descriptions.length) + compositeIDArray[i++] = (String)iter.next(); + } + + java.util.Arrays.sort(compositeIDArray); + + DocumentDescription[] rval = new DocumentDescription[allowedDocIds.size()]; + int j = 0; + i = 0; + while (i < compositeIDArray.length) + { + String compositeDocumentID = compositeIDArray[i]; + DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocumentID); + if (allowedDocIds.get(compositeDocumentID) == null) { - descriptions[j] = (DocumentDescription)x.get(j); - j++; + // Delete this record and do NOT return it. + jobQueue.deleteRecord(dd.getID()); + // What should we do about hopcount here? + // We are deleting a record which belongs to a job that is being + // cleaned up. The job itself will go away when this is done, + // and so will all the hopcount stuff pertaining to it. So, the + // treatment I've chosen here is to leave the hopcount alone and + // let the job cleanup get rid of it at the right time. + // Note: carrydown records handled in the same manner... 
+ //carryDown.deleteRecords(dd.getJobID(),new String[]{dd.getDocumentIdentifier()}); } - String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName); - j = 0; - while (j < docIDHashes.length) + else { - String docIDHash = docIDHashes[j++]; - String key = makeCompositeID(docIDHash,connectionName); - allowedDocIds.put(key,docIDHash); + // Set the record status to "being deleted" and return it + rval[j++] = dd; + jobQueue.setDeletingStatus(dd.getID()); } + i++; } - } - // Now, assemble a result, and change the state of the records accordingly - // First thing to do is order by document hash to reduce chances of deadlock. - String[] compositeIDArray = new String[documentIDMap.size()]; - i = 0; - iter = documentIDMap.keySet().iterator(); - while (iter.hasNext()) + TrackerClass.notePrecommit(); + database.performCommit(); + TrackerClass.noteCommit(); + + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms."); + + return rval; + + } + catch (Error e) { - compositeIDArray[i++] = (String)iter.next(); + database.signalRollback(); + TrackerClass.noteRollback(); + throw e; } - - java.util.Arrays.sort(compositeIDArray); - - DocumentDescription[] rval = new DocumentDescription[allowedDocIds.size()]; - int j = 0; - i = 0; - while (i < compositeIDArray.length) + catch (ManifoldCFException e) { - String compositeDocumentID = compositeIDArray[i]; - DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocumentID); - if (allowedDocIds.get(compositeDocumentID) == null) - { - // Delete this record and do NOT return it. - jobQueue.deleteRecord(dd.getID()); - // What should we do about hopcount here? - // We are deleting a record which belongs to a job that is being - // cleaned up. The job itself will go away when this is done, - // and so will all the hopcount stuff pertaining to it. So, the - // treatment I've chosen here is to leave the hopcount alone and - // let the job cleanup get rid of it at the right time. - // Note: carrydown records handled in the same manner... 
- //carryDown.deleteRecords(dd.getJobID(),new String[]{dd.getDocumentIdentifier()}); - } - else + database.signalRollback(); + TrackerClass.noteRollback(); + if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) { - // Set the record status to "being deleted" and return it - rval[j++] = dd; - jobQueue.setDeletingStatus(dd.getID()); + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage()); + sleepAmt = getRandomAmount(); + continue; } - i++; + throw e; } - - TrackerClass.notePrecommit(); - database.performCommit(); - TrackerClass.noteCommit(); - - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms."); - - return rval; - - } - catch (Error e) - { - database.signalRollback(); - TrackerClass.noteRollback(); - throw e; - } - catch (ManifoldCFException e) - { - database.signalRollback(); - TrackerClass.noteRollback(); - if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) + finally { - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage()); - sleepAmt = getRandomAmount(); - continue; + database.endTransaction(); } - throw e; } finally { - database.endTransaction(); + lockManager.leaveWriteLock(deleteStufferLock); sleepFor(sleepAmt); } } @@ -1714,164 +1738,172 @@ public class JobManager implements IJobM { long sleepAmt = 0L; - if (Logging.perf.isDebugEnabled()) - { - repeatCount++; - Logging.perf.debug(" Attempt "+Integer.toString(repeatCount)+" to expire documents, after "+ - new Long(System.currentTimeMillis() - startTime)+" ms"); - } - - database.beginTransaction(); + // Enter a write lock, so only one thread can be doing this. That makes FOR UPDATE unnecessary. + lockManager.enterWriteLock(expireStufferLock); try { - IResultSet set = database.performQuery(query,list,null,null,n,null); - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug(" Expiring "+Integer.toString(set.getRowCount())+" documents"); - - // To avoid deadlock, we want to update the document id hashes in order. This means reading into a structure I can sort by docid hash, - // before updating any rows in jobqueue. 
- HashMap connectionNameMap = new HashMap(); - HashMap documentIDMap = new HashMap(); - Map statusMap = new HashMap(); - - int i = 0; - while (i < set.getRowCount()) { - IResultRow row = set.getRow(i); - Long jobID = (Long)row.getValue(jobQueue.jobIDField); - String documentIDHash = (String)row.getValue(jobQueue.docHashField); - String documentID = (String)row.getValue(jobQueue.docIDField); - int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString()); - Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField); - Long failCountValue = (Long)row.getValue(jobQueue.failCountField); - // Failtime is probably not useful in this context, but we'll bring it along for completeness - long failTime; - if (failTimeValue == null) - failTime = -1L; - else - failTime = failTimeValue.longValue(); - int failCount; - if (failCountValue == null) - failCount = 0; - else - failCount = (int)failCountValue.longValue(); - IJobDescription jobDesc = load(jobID); - String connectionName = jobDesc.getConnectionName(); - String outputConnectionName = jobDesc.getOutputConnectionName(); - DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField), - jobID,documentIDHash,documentID,failTime,failCount); - String compositeDocumentID = makeCompositeID(documentIDHash,connectionName); - documentIDMap.put(compositeDocumentID,dd); - statusMap.put(compositeDocumentID,new Integer(status)); - Map y = (Map)connectionNameMap.get(connectionName); - if (y == null) - { - y = new HashMap(); - connectionNameMap.put(connectionName,y); - } - ArrayList x = (ArrayList)y.get(outputConnectionName); - if (x == null) - { - // New entry needed - x = new ArrayList(); - y.put(outputConnectionName,x); - } - x.add(dd); - i++; + repeatCount++; + Logging.perf.debug(" Attempt "+Integer.toString(repeatCount)+" to expire documents, after "+ + new Long(System.currentTimeMillis() - startTime)+" ms"); } - // For each bin, obtain a filtered answer, and enter all answers into a hash table. - // We'll then scan the result again to look up the right descriptions for return, - // and delete the ones that are owned multiply. - HashMap allowedDocIds = new HashMap(); - Iterator iter = connectionNameMap.keySet().iterator(); - while (iter.hasNext()) - { - String connectionName = (String)iter.next(); - Map y = (Map)connectionNameMap.get(connectionName); - Iterator outputIter = y.keySet().iterator(); - while (outputIter.hasNext()) + database.beginTransaction(); + try + { + IResultSet set = database.performQuery(query,list,null,null,n,null); + + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug(" Expiring "+Integer.toString(set.getRowCount())+" documents"); + + // To avoid deadlock, we want to update the document id hashes in order. This means reading into a structure I can sort by docid hash, + // before updating any rows in jobqueue. 
+ HashMap connectionNameMap = new HashMap(); + HashMap documentIDMap = new HashMap(); + Map statusMap = new HashMap(); + + int i = 0; + while (i < set.getRowCount()) { - String outputConnectionName = (String)outputIter.next(); + IResultRow row = set.getRow(i); + Long jobID = (Long)row.getValue(jobQueue.jobIDField); + String documentIDHash = (String)row.getValue(jobQueue.docHashField); + String documentID = (String)row.getValue(jobQueue.docIDField); + int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString()); + Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField); + Long failCountValue = (Long)row.getValue(jobQueue.failCountField); + // Failtime is probably not useful in this context, but we'll bring it along for completeness + long failTime; + if (failTimeValue == null) + failTime = -1L; + else + failTime = failTimeValue.longValue(); + int failCount; + if (failCountValue == null) + failCount = 0; + else + failCount = (int)failCountValue.longValue(); + IJobDescription jobDesc = load(jobID); + String connectionName = jobDesc.getConnectionName(); + String outputConnectionName = jobDesc.getOutputConnectionName(); + DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField), + jobID,documentIDHash,documentID,failTime,failCount); + String compositeDocumentID = makeCompositeID(documentIDHash,connectionName); + documentIDMap.put(compositeDocumentID,dd); + statusMap.put(compositeDocumentID,new Integer(status)); + Map y = (Map)connectionNameMap.get(connectionName); + if (y == null) + { + y = new HashMap(); + connectionNameMap.put(connectionName,y); + } ArrayList x = (ArrayList)y.get(outputConnectionName); - // Do the filter query - DocumentDescription[] descriptions = new DocumentDescription[x.size()]; - int j = 0; - while (j < descriptions.length) + if (x == null) { - descriptions[j] = (DocumentDescription)x.get(j); - j++; + // New entry needed + x = new ArrayList(); + y.put(outputConnectionName,x); } - String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName); - j = 0; - while (j < docIDHashes.length) + x.add(dd); + i++; + } + + // For each bin, obtain a filtered answer, and enter all answers into a hash table. + // We'll then scan the result again to look up the right descriptions for return, + // and delete the ones that are owned multiply. 
+ HashMap allowedDocIds = new HashMap(); + Iterator iter = connectionNameMap.keySet().iterator(); + while (iter.hasNext()) + { + String connectionName = (String)iter.next(); + Map y = (Map)connectionNameMap.get(connectionName); + Iterator outputIter = y.keySet().iterator(); + while (outputIter.hasNext()) { - String docIDHash = docIDHashes[j++]; - String key = makeCompositeID(docIDHash,connectionName); - allowedDocIds.put(key,docIDHash); + String outputConnectionName = (String)outputIter.next(); + ArrayList x = (ArrayList)y.get(outputConnectionName); + // Do the filter query + DocumentDescription[] descriptions = new DocumentDescription[x.size()]; + int j = 0; + while (j < descriptions.length) + { + descriptions[j] = (DocumentDescription)x.get(j); + j++; + } + String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName); + j = 0; + while (j < docIDHashes.length) + { + String docIDHash = docIDHashes[j++]; + String key = makeCompositeID(docIDHash,connectionName); + allowedDocIds.put(key,docIDHash); + } } } - } - // Now, assemble a result, and change the state of the records accordingly - // First thing to do is order by document hash, so we reduce the risk of deadlock. - String[] compositeIDArray = new String[documentIDMap.size()]; - i = 0; - iter = documentIDMap.keySet().iterator(); - while (iter.hasNext()) + // Now, assemble a result, and change the state of the records accordingly + // First thing to do is order by document hash, so we reduce the risk of deadlock. + String[] compositeIDArray = new String[documentIDMap.size()]; + i = 0; + iter = documentIDMap.keySet().iterator(); + while (iter.hasNext()) + { + compositeIDArray[i++] = (String)iter.next(); + } + + java.util.Arrays.sort(compositeIDArray); + + DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()]; + boolean[] rvalBoolean = new boolean[documentIDMap.size()]; + i = 0; + while (i < compositeIDArray.length) + { + String compositeDocID = compositeIDArray[i]; + DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID); + // Determine whether we can delete it from the index or not + rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null); + // Set the record status to "being cleaned" and return it + rval[i++] = dd; + jobQueue.updateActiveRecord(dd.getID(),((Integer)statusMap.get(compositeDocID)).intValue()); + } + + TrackerClass.notePrecommit(); + database.performCommit(); + TrackerClass.noteCommit(); + + return new DocumentSetAndFlags(rval, rvalBoolean); + + } + catch (ManifoldCFException e) { - compositeIDArray[i++] = (String)iter.next(); + database.signalRollback(); + TrackerClass.noteRollback(); + if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) + { + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("Aborted transaction finding docs to expire: "+e.getMessage()); + sleepAmt = getRandomAmount(); + continue; + } + throw e; } - - java.util.Arrays.sort(compositeIDArray); - - DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()]; - boolean[] rvalBoolean = new boolean[documentIDMap.size()]; - i = 0; - while (i < compositeIDArray.length) + catch (Error e) { - String compositeDocID = compositeIDArray[i]; - DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID); - // Determine whether we can delete it from the index or not - rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null); - // Set the record status to "being cleaned" and return it - rval[i++] = dd; - 
jobQueue.updateActiveRecord(dd.getID(),((Integer)statusMap.get(compositeDocID)).intValue()); + database.signalRollback(); + TrackerClass.noteRollback(); + throw e; } - - TrackerClass.notePrecommit(); - database.performCommit(); - TrackerClass.noteCommit(); - - return new DocumentSetAndFlags(rval, rvalBoolean); - - } - catch (ManifoldCFException e) - { - database.signalRollback(); - TrackerClass.noteRollback(); - if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) + finally { - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("Aborted transaction finding docs to expire: "+e.getMessage()); - sleepAmt = getRandomAmount(); - continue; + database.endTransaction(); } - throw e; - } - catch (Error e) - { - database.signalRollback(); - TrackerClass.noteRollback(); - throw e; } finally { - database.endTransaction(); + lockManager.leaveWriteLock(expireStufferLock); sleepFor(sleepAmt); } - } } @@ -2179,6 +2211,7 @@ public class JobManager implements IJobM // Note well: This query does not do "FOR UPDATE". The reason is that only one thread can possibly change the document's state to active. // When FOR UPDATE was included, deadlock conditions were common because of the complexity of this query. + // So, instead, as part of CONNECTORS-781, I've introduced a write lock for the pertinent section. ArrayList list = new ArrayList(); @@ -2259,128 +2292,141 @@ public class JobManager implements IJobM maxConnections[k] = connection.getMaxConnections(); k++; } - IRepositoryConnector[] connectors = RepositoryConnectorFactory.grabMultiple(threadContext,orderingKeys,classNames,configParams,maxConnections); - try + + // Never sleep with a resource locked! + while (true) { - // Hand the connectors off to the ThrottleLimit instance - k = 0; - while (k < connections.length) - { - vList.addConnectionName(connections[k].getName(),connectors[k]); - k++; - } - - // Now we can tack the limit onto the query. Before this point, remainingDocuments would be crap - int limitValue = vList.getRemainingDocuments(); - sb.append(database.constructOffsetLimitClause(0,limitValue,true)); - - if (Logging.perf.isDebugEnabled()) - { - Logging.perf.debug("Queuing documents from time "+currentTimeValue.toString()+" job priority "+currentPriorityValue.toString()+ - " (up to "+Integer.toString(vList.getRemainingDocuments())+" documents)"); - } + long sleepAmt = 0L; - while (true) + // Write lock insures that only one thread cluster-wide can be doing this at a given time, so FOR UPDATE is unneeded. + lockManager.enterWriteLock(stufferLock); + try { - long sleepAmt = 0L; - database.beginTransaction(); + + IRepositoryConnector[] connectors = RepositoryConnectorFactory.grabMultiple(threadContext,orderingKeys,classNames,configParams,maxConnections); try { - IResultSet set = database.performQuery(sb.toString(),list,null,null,-1,vList); + // Hand the connectors off to the ThrottleLimit instance + k = 0; + while (k < connections.length) + { + vList.addConnectionName(connections[k].getName(),connectors[k]); + k++; + } - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug(" Queuing "+Integer.toString(set.getRowCount())+" documents"); + // Now we can tack the limit onto the query. Before this point, remainingDocuments would be crap + int limitValue = vList.getRemainingDocuments(); + sb.append(database.constructOffsetLimitClause(0,limitValue,true)); - // To avoid deadlock, we want to update the document id hashes in order. This means reading into a structure I can sort by docid hash, - // before updating any rows in jobqueue. 
- String[] docIDHashes = new String[set.getRowCount()]; - Map storageMap = new HashMap(); - Map statusMap = new HashMap(); + if (Logging.perf.isDebugEnabled()) + { + Logging.perf.debug("Queuing documents from time "+currentTimeValue.toString()+" job priority "+currentPriorityValue.toString()+ + " (up to "+Integer.toString(vList.getRemainingDocuments())+" documents)"); + } - int i = 0; - while (i < set.getRowCount()) + database.beginTransaction(); + try { - IResultRow row = set.getRow(i); - Long id = (Long)row.getValue(jobQueue.idField); - Long jobID = (Long)row.getValue(jobQueue.jobIDField); - String docIDHash = (String)row.getValue(jobQueue.docHashField); - String docID = (String)row.getValue(jobQueue.docIDField); - int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString()); - Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField); - Long failCountValue = (Long)row.getValue(jobQueue.failCountField); - long failTime; - if (failTimeValue == null) - failTime = -1L; - else - failTime = failTimeValue.longValue(); - int failCount; - if (failCountValue == null) - failCount = -1; - else - failCount = (int)failCountValue.longValue(); + IResultSet set = database.performQuery(sb.toString(),list,null,null,-1,vList); + + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug(" Queuing "+Integer.toString(set.getRowCount())+" documents"); - DocumentDescription dd = new DocumentDescription(id,jobID,docIDHash,docID,failTime,failCount); - docIDHashes[i] = docIDHash + ":" + jobID; - storageMap.put(docIDHashes[i],dd); - statusMap.put(docIDHashes[i],new Integer(status)); - if (Logging.scheduling.isDebugEnabled()) + // To avoid deadlock, we want to update the document id hashes in order. This means reading into a structure I can sort by docid hash, + // before updating any rows in jobqueue. 
+ String[] docIDHashes = new String[set.getRowCount()]; + Map storageMap = new HashMap(); + Map statusMap = new HashMap(); + + int i = 0; + while (i < set.getRowCount()) { - Double docPriority = (Double)row.getValue(jobQueue.docPriorityField); - Logging.scheduling.debug("Stuffing document '"+docID+"' that has priority "+docPriority.toString()+" onto active list"); + IResultRow row = set.getRow(i); + Long id = (Long)row.getValue(jobQueue.idField); + Long jobID = (Long)row.getValue(jobQueue.jobIDField); + String docIDHash = (String)row.getValue(jobQueue.docHashField); + String docID = (String)row.getValue(jobQueue.docIDField); + int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString()); + Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField); + Long failCountValue = (Long)row.getValue(jobQueue.failCountField); + long failTime; + if (failTimeValue == null) + failTime = -1L; + else + failTime = failTimeValue.longValue(); + int failCount; + if (failCountValue == null) + failCount = -1; + else + failCount = (int)failCountValue.longValue(); + + DocumentDescription dd = new DocumentDescription(id,jobID,docIDHash,docID,failTime,failCount); + docIDHashes[i] = docIDHash + ":" + jobID; + storageMap.put(docIDHashes[i],dd); + statusMap.put(docIDHashes[i],new Integer(status)); + if (Logging.scheduling.isDebugEnabled()) + { + Double docPriority = (Double)row.getValue(jobQueue.docPriorityField); + Logging.scheduling.debug("Stuffing document '"+docID+"' that has priority "+docPriority.toString()+" onto active list"); + } + i++; } - i++; - } - // No duplicates are possible here - java.util.Arrays.sort(docIDHashes); + // No duplicates are possible here + java.util.Arrays.sort(docIDHashes); - i = 0; - while (i < docIDHashes.length) - { - String docIDHash = docIDHashes[i]; - DocumentDescription dd = (DocumentDescription)storageMap.get(docIDHash); - Long id = dd.getID(); - int status = ((Integer)statusMap.get(docIDHash)).intValue(); + i = 0; + while (i < docIDHashes.length) + { + String docIDHash = docIDHashes[i]; + DocumentDescription dd = (DocumentDescription)storageMap.get(docIDHash); + Long id = dd.getID(); + int status = ((Integer)statusMap.get(docIDHash)).intValue(); - // Set status to "ACTIVE". - jobQueue.updateActiveRecord(id,status); + // Set status to "ACTIVE". 
+ jobQueue.updateActiveRecord(id,status); - answers.add(dd); + answers.add(dd); - i++; + i++; + } + TrackerClass.notePrecommit(); + database.performCommit(); + TrackerClass.noteCommit(); + break; } - TrackerClass.notePrecommit(); - database.performCommit(); - TrackerClass.noteCommit(); - break; - } - catch (ManifoldCFException e) - { - database.signalRollback(); - if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) + catch (ManifoldCFException e) { - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("Aborted transaction finding docs to queue: "+e.getMessage()); - sleepAmt = getRandomAmount(); - continue; + database.signalRollback(); + if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) + { + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("Aborted transaction finding docs to queue: "+e.getMessage()); + sleepAmt = getRandomAmount(); + continue; + } + throw e; + } + catch (Error e) + { + database.signalRollback(); + throw e; + } + finally + { + database.endTransaction(); } - throw e; - } - catch (Error e) - { - database.signalRollback(); - throw e; } finally { - database.endTransaction(); - sleepFor(sleepAmt); + RepositoryConnectorFactory.releaseMultiple(connectors); } } - } - finally - { - RepositoryConnectorFactory.releaseMultiple(connectors); + finally + { + lockManager.leaveWriteLock(stufferLock); + sleepFor(sleepAmt); + } } }
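
For reviewers: the pattern this commit installs around each of the stuffer queries above (clean, delete, expire, and stuff) condenses to the sketch below. The lockManager and database calls, the stuffer lock names, and the abort-retry handling are taken from the diff itself; the standalone wrapper method is illustrative only and is not part of the committed code.

  // Condensed sketch of the committed locking pattern (illustrative wrapper, not real code).
  protected void runOneStufferPass() throws ManifoldCFException
  {
    while (true)
    {
      long sleepAmt = 0L;
      // Cluster-wide write lock: only one stuffer thread in any JVM can run this
      // section at a time, so the SELECT below no longer needs FOR UPDATE.
      lockManager.enterWriteLock(stufferLock);
      try
      {
        database.beginTransaction();
        try
        {
          // ... perform the stuffing query and update jobqueue rows here ...
          database.performCommit();
          return;
        }
        catch (ManifoldCFException e)
        {
          database.signalRollback();
          if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
          {
            // Transaction was aborted (e.g. deadlock): back off a random amount and retry.
            sleepAmt = getRandomAmount();
            continue;
          }
          throw e;
        }
        catch (Error e)
        {
          database.signalRollback();
          throw e;
        }
        finally
        {
          database.endTransaction();
        }
      }
      finally
      {
        // Never sleep with the resource locked: release first, then back off.
        lockManager.leaveWriteLock(stufferLock);
        sleepFor(sleepAmt);
      }
    }
  }

The two points to note are that the cluster-wide write lock replaces FOR UPDATE on the query, and that the lock is always released (in the outer finally) before any retry sleep.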
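The NOTE WELL comments added to ThrottledFetcher only flag the problem: the static ConnectionBin/ThrottleBin pools are per-JVM, while the limits they enforce are global. A minimal sketch of the direction those comments point at, using only the enterWriteLock/leaveWriteLock calls already relied on in JobManager above; the lock-name prefix, method name, and body here are hypothetical and not part of this commit.

  // Hypothetical helper, not committed code: serialize cross-JVM access to one bin's
  // shared state before consulting or changing its connection count.
  protected void modifyBinGlobally(ILockManager lockManager, String binName)
    throws ManifoldCFException
  {
    String binLockName = "_CONNECTIONBIN_" + binName;  // hypothetical lock key
    lockManager.enterWriteLock(binLockName);
    try
    {
      // Read and update the bin's global connection count here, so the
      // per-bin connection limit holds across every cluster member.
    }
    finally
    {
      lockManager.leaveWriteLock(binLockName);
    }
  }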