Return-Path: X-Original-To: apmail-manifoldcf-commits-archive@www.apache.org Delivered-To: apmail-manifoldcf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 62B2210719 for ; Tue, 11 Feb 2014 19:16:43 +0000 (UTC) Received: (qmail 20786 invoked by uid 500); 11 Feb 2014 19:16:33 -0000 Delivered-To: apmail-manifoldcf-commits-archive@manifoldcf.apache.org Received: (qmail 20642 invoked by uid 500); 11 Feb 2014 19:16:30 -0000 Mailing-List: contact commits-help@manifoldcf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@manifoldcf.apache.org Delivered-To: mailing list commits@manifoldcf.apache.org Received: (qmail 20375 invoked by uid 99); 11 Feb 2014 19:16:26 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Feb 2014 19:16:26 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Feb 2014 19:16:24 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id BC20C238889B; Tue, 11 Feb 2014 19:16:04 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1567304 - /manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Date: Tue, 11 Feb 2014 19:16:04 -0000 To: commits@manifoldcf.apache.org From: kwright@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140211191604.BC20C238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kwright Date: Tue Feb 11 19:16:04 2014 New Revision: 1567304 URL: http://svn.apache.org/r1567304 Log: More fixes for CONNECTORS-888. Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1567304&r1=1567303&r2=1567304&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Tue Feb 11 19:16:04 2014 @@ -37,6 +37,8 @@ public class JobManager implements IJobM protected static final String expireStufferLock = "_EXPIRESTUFFER_"; protected static final String cleanStufferLock = "_CLEANSTUFFER_"; protected static final String jobStopLock = "_JOBSTOP_"; + protected static final String jobResumeLock = "_JOBRESUME_"; + protected static final String jobResetLock = "_JOBRESET_"; protected static final String hopLock = "_HOPLOCK_"; // Member variables @@ -7531,37 +7533,45 @@ public class JobManager implements IJobM public void finishJobResumes(long timestamp, ArrayList modifiedJobs) throws ManifoldCFException { + lockManager.enterWriteLock(jobResumeLock); + try + { // Do the first query, getting the candidate jobs to be considered - StringBuilder sb = new StringBuilder("SELECT "); - ArrayList list = new ArrayList(); - - sb.append(jobs.idField) - .append(" FROM ").append(jobs.getTableName()).append(" WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new MultiClause(jobs.statusField,new Object[]{ - jobs.statusToString(jobs.STATUS_RESUMING), - jobs.statusToString(jobs.STATUS_RESUMINGSEEDING) - })})); - - IResultSet set = database.performQuery(sb.toString(),list,null,null); + StringBuilder sb = new StringBuilder("SELECT "); + ArrayList list = new ArrayList(); + + sb.append(jobs.idField) + .append(" FROM ").append(jobs.getTableName()).append(" WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new MultiClause(jobs.statusField,new Object[]{ + jobs.statusToString(jobs.STATUS_RESUMING), + jobs.statusToString(jobs.STATUS_RESUMINGSEEDING) + })})); + + IResultSet set = database.performQuery(sb.toString(),list,null,null); - int i = 0; - while (i < set.getRowCount()) - { - IResultRow row = set.getRow(i++); - Long jobID = (Long)row.getValue(jobs.idField); + int i = 0; + while (i < set.getRowCount()) + { + IResultRow row = set.getRow(i++); + Long jobID = (Long)row.getValue(jobs.idField); - // There are no secondary checks that need to be made; just resume - IJobDescription jobDesc = jobs.load(jobID,true); - modifiedJobs.add(jobDesc); + // There are no secondary checks that need to be made; just resume + IJobDescription jobDesc = jobs.load(jobID,true); + modifiedJobs.add(jobDesc); - jobs.finishResumeJob(jobID,timestamp); - - if (Logging.jobs.isDebugEnabled()) - { - Logging.jobs.debug("Resumed job "+jobID); + jobs.finishResumeJob(jobID,timestamp); + + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Resumed job "+jobID); + } } } + finally + { + lockManager.leaveWriteLock(jobResumeLock); + } } /** Complete the sequence that stops jobs, either for abort, pause, or because of a scheduling @@ -7664,84 +7674,92 @@ public class JobManager implements IJobM public void resetJobs(long currentTime, ArrayList resetJobs) throws ManifoldCFException { - // Query for all jobs that fulfill the criteria - // The query used to look like: - // - // SELECT id FROM jobs t0 WHERE status='D' AND NOT EXISTS(SELECT 'x' FROM jobqueue t1 WHERE - // t0.id=t1.jobid AND t1.status='P') - // - // Now, the query is broken up, for performance - - // Do the first query, getting the candidate jobs to be considered - StringBuilder sb = new StringBuilder("SELECT "); - ArrayList list = new ArrayList(); - - sb.append(jobs.idField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN))})); - - IResultSet set = database.performQuery(sb.toString(),list,null,null); - - int i = 0; - while (i < set.getRowCount()) + lockManager.enterWriteLock(jobResetLock); + try { - IResultRow row = set.getRow(i++); - Long jobID = (Long)row.getValue(jobs.idField); + // Query for all jobs that fulfill the criteria + // The query used to look like: + // + // SELECT id FROM jobs t0 WHERE status='D' AND NOT EXISTS(SELECT 'x' FROM jobqueue t1 WHERE + // t0.id=t1.jobid AND t1.status='P') + // + // Now, the query is broken up, for performance - // Check to be sure the job is a candidate for shutdown - sb = new StringBuilder("SELECT "); - list.clear(); + // Do the first query, getting the candidate jobs to be considered + StringBuilder sb = new StringBuilder("SELECT "); + ArrayList list = new ArrayList(); - sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ") + sb.append(jobs.idField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ") .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause(jobQueue.jobIDField,jobID), - new MultiClause(jobQueue.statusField,new Object[]{ - jobQueue.statusToString(jobQueue.STATUS_PURGATORY), - jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)})})) - .append(" ").append(database.constructOffsetLimitClause(0,1)); + new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN))})); + + IResultSet set = database.performQuery(sb.toString(),list,null,null); - IResultSet confirmSet = database.performQuery(sb.toString(),list,null,null,1,null); + int i = 0; + while (i < set.getRowCount()) + { + IResultRow row = set.getRow(i++); + Long jobID = (Long)row.getValue(jobs.idField); - if (confirmSet.getRowCount() > 0) - continue; + // Check to be sure the job is a candidate for shutdown + sb = new StringBuilder("SELECT "); + list.clear(); + + sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new UnitaryClause(jobQueue.jobIDField,jobID), + new MultiClause(jobQueue.statusField,new Object[]{ + jobQueue.statusToString(jobQueue.STATUS_PURGATORY), + jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)})})) + .append(" ").append(database.constructOffsetLimitClause(0,1)); - // The shutting-down phase is complete. However, we need to check if there are any outstanding - // PENDING or PENDINGPURGATORY records before we can decide what to do. - sb = new StringBuilder("SELECT "); - list.clear(); - - sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause(jobQueue.jobIDField,jobID), - new MultiClause(jobQueue.statusField,new Object[]{ - jobQueue.statusToString(jobQueue.STATUS_PENDING), - jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)})})) - .append(" ").append(database.constructOffsetLimitClause(0,1)); + IResultSet confirmSet = database.performQuery(sb.toString(),list,null,null,1,null); - confirmSet = database.performQuery(sb.toString(),list,null,null,1,null); + if (confirmSet.getRowCount() > 0) + continue; - if (confirmSet.getRowCount() > 0) - { - // This job needs to re-enter the active state. Make that happen. - jobs.returnJobToActive(jobID); - if (Logging.jobs.isDebugEnabled()) + // The shutting-down phase is complete. However, we need to check if there are any outstanding + // PENDING or PENDINGPURGATORY records before we can decide what to do. + sb = new StringBuilder("SELECT "); + list.clear(); + + sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new UnitaryClause(jobQueue.jobIDField,jobID), + new MultiClause(jobQueue.statusField,new Object[]{ + jobQueue.statusToString(jobQueue.STATUS_PENDING), + jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)})})) + .append(" ").append(database.constructOffsetLimitClause(0,1)); + + confirmSet = database.performQuery(sb.toString(),list,null,null,1,null); + + if (confirmSet.getRowCount() > 0) { - Logging.jobs.debug("Job "+jobID+" is re-entering active state"); + // This job needs to re-enter the active state. Make that happen. + jobs.returnJobToActive(jobID); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Job "+jobID+" is re-entering active state"); + } } - } - else - { - // This job should be marked as finished. - IJobDescription jobDesc = jobs.load(jobID,true); - resetJobs.add(jobDesc); - - jobs.finishJob(jobID,currentTime); - if (Logging.jobs.isDebugEnabled()) + else { - Logging.jobs.debug("Job "+jobID+" now completed"); + // This job should be marked as finished. + IJobDescription jobDesc = jobs.load(jobID,true); + resetJobs.add(jobDesc); + + jobs.finishJob(jobID,currentTime); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Job "+jobID+" now completed"); + } } } } + finally + { + lockManager.leaveWriteLock(jobResetLock); + } }