Return-Path: X-Original-To: apmail-manifoldcf-commits-archive@www.apache.org Delivered-To: apmail-manifoldcf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 002E2D609 for ; Wed, 12 Dec 2012 02:29:22 +0000 (UTC) Received: (qmail 51210 invoked by uid 500); 12 Dec 2012 02:29:22 -0000 Delivered-To: apmail-manifoldcf-commits-archive@manifoldcf.apache.org Received: (qmail 51167 invoked by uid 500); 12 Dec 2012 02:29:22 -0000 Mailing-List: contact commits-help@manifoldcf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@manifoldcf.apache.org Delivered-To: mailing list commits@manifoldcf.apache.org Received: (qmail 51157 invoked by uid 99); 12 Dec 2012 02:29:22 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Dec 2012 02:29:22 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Dec 2012 02:29:18 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 220E423889FD; Wed, 12 Dec 2012 02:28:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1420512 - /manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Date: Wed, 12 Dec 2012 02:28:56 -0000 To: commits@manifoldcf.apache.org From: kwright@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121212022857.220E423889FD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kwright Date: Wed Dec 12 02:28:55 2012 New Revision: 1420512 URL: http://svn.apache.org/viewvc?rev=1420512&view=rev Log: Loop on job start, because MySQL has lock timeouts with this query a lot Modified: manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Modified: manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1420512&r1=1420511&r2=1420512&view=diff ============================================================================== --- manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original) +++ manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Wed Dec 12 02:28:55 2012 @@ -4684,223 +4684,237 @@ public class JobManager implements IJobM // Note well: We can't combine locks across both our lock manager and the database unless we do it consistently. The // consistent practice throughout CF is to do the external locks first, then the database locks. This particular method // thus cannot use cached job description information, because it must throw database locks first against the jobs table. - database.beginTransaction(); - try + while (true) { - // First, query the appropriate fields of all jobs. - StringBuilder sb = new StringBuilder("SELECT "); - ArrayList list = new ArrayList(); - - sb.append(jobs.idField).append(",") - .append(jobs.lastTimeField).append(",") - .append(jobs.statusField).append(",") - .append(jobs.startMethodField).append(",") - .append(jobs.outputNameField).append(",") - .append(jobs.connectionNameField) - .append(" FROM ").append(jobs.getTableName()).append(" WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new MultiClause(jobs.statusField,new Object[]{ - jobs.statusToString(jobs.STATUS_INACTIVE), - jobs.statusToString(jobs.STATUS_ACTIVEWAIT), - jobs.statusToString(jobs.STATUS_ACTIVEWAITSEEDING), - jobs.statusToString(jobs.STATUS_PAUSEDWAIT), - jobs.statusToString(jobs.STATUS_PAUSEDWAITSEEDING)})})).append(" AND ") - .append(jobs.startMethodField).append("!=? FOR UPDATE"); - - list.add(jobs.startMethodToString(IJobDescription.START_DISABLE)); - - IResultSet set = database.performQuery(sb.toString(),list,null,null); - - // Next, we query for the schedule information. In order to do that, we amass a list of job identifiers that we want schedule info - // for. - Long[] jobIDSet = new Long[set.getRowCount()]; - int i = 0; - while (i < set.getRowCount()) - { - IResultRow row = set.getRow(i); - jobIDSet[i++] = (Long)row.getValue(jobs.idField); - } - - ScheduleRecord[][] srSet = jobs.readScheduleRecords(jobIDSet); - - i = 0; - while (i < set.getRowCount()) + long sleepAmt = 0L; + database.beginTransaction(); + try { - IResultRow row = set.getRow(i); - - Long jobID = (Long)row.getValue(jobs.idField); - int startMethod = jobs.stringToStartMethod((String)row.getValue(jobs.startMethodField)); - String outputName = (String)row.getValue(jobs.outputNameField); - String connectionName = (String)row.getValue(jobs.connectionNameField); - ScheduleRecord[] thisSchedule = srSet[i++]; - - // Run at specific times + // First, query the appropriate fields of all jobs. + StringBuilder sb = new StringBuilder("SELECT "); + ArrayList list = new ArrayList(); + + sb.append(jobs.idField).append(",") + .append(jobs.lastTimeField).append(",") + .append(jobs.statusField).append(",") + .append(jobs.startMethodField).append(",") + .append(jobs.outputNameField).append(",") + .append(jobs.connectionNameField) + .append(" FROM ").append(jobs.getTableName()).append(" WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new MultiClause(jobs.statusField,new Object[]{ + jobs.statusToString(jobs.STATUS_INACTIVE), + jobs.statusToString(jobs.STATUS_ACTIVEWAIT), + jobs.statusToString(jobs.STATUS_ACTIVEWAITSEEDING), + jobs.statusToString(jobs.STATUS_PAUSEDWAIT), + jobs.statusToString(jobs.STATUS_PAUSEDWAITSEEDING)})})).append(" AND ") + .append(jobs.startMethodField).append("!=? FOR UPDATE"); + + list.add(jobs.startMethodToString(IJobDescription.START_DISABLE)); + + IResultSet set = database.performQuery(sb.toString(),list,null,null); - // We need to start with the start time as given, plus one - long startInterval = ((Long)row.getValue(jobs.lastTimeField)).longValue() + 1; - if (Logging.jobs.isDebugEnabled()) - Logging.jobs.debug("Checking if job "+jobID.toString()+" needs to be started; it was last checked at "+ - new Long(startInterval).toString()+", and now it is "+new Long(currentTime).toString()); + // Next, we query for the schedule information. In order to do that, we amass a list of job identifiers that we want schedule info + // for. + Long[] jobIDSet = new Long[set.getRowCount()]; + int i = 0; + while (i < set.getRowCount()) + { + IResultRow row = set.getRow(i); + jobIDSet[i++] = (Long)row.getValue(jobs.idField); + } - // Proceed to the current time, and find a match if there is one to be found. - // If not -> continue + ScheduleRecord[][] srSet = jobs.readScheduleRecords(jobIDSet); - // We go through *all* the schedule records. The one that matches that has the latest - // end time is the one we take. - int l = 0; - Long matchTime = null; - Long duration = null; - while (l < thisSchedule.length) - { - long trialStartInterval = startInterval; - ScheduleRecord sr = thisSchedule[l++]; - Long thisDuration = sr.getDuration(); - if (startMethod == IJobDescription.START_WINDOWINSIDE && - thisDuration != null) - { - // Bump the start interval back before the beginning of the current interval. - // This will guarantee a start as long as there is time in the window. - long trialStart = currentTime - thisDuration.longValue(); - if (trialStart < trialStartInterval) - trialStartInterval = trialStart; - } + i = 0; + while (i < set.getRowCount()) + { + IResultRow row = set.getRow(i); - Long thisMatchTime = checkTimeMatch(trialStartInterval,currentTime, - sr.getDayOfWeek(), - sr.getDayOfMonth(), - sr.getMonthOfYear(), - sr.getYear(), - sr.getHourOfDay(), - sr.getMinutesOfHour(), - sr.getTimezone(), - thisDuration); + Long jobID = (Long)row.getValue(jobs.idField); + int startMethod = jobs.stringToStartMethod((String)row.getValue(jobs.startMethodField)); + String outputName = (String)row.getValue(jobs.outputNameField); + String connectionName = (String)row.getValue(jobs.connectionNameField); + ScheduleRecord[] thisSchedule = srSet[i++]; + + // Run at specific times + + // We need to start with the start time as given, plus one + long startInterval = ((Long)row.getValue(jobs.lastTimeField)).longValue() + 1; + if (Logging.jobs.isDebugEnabled()) + Logging.jobs.debug("Checking if job "+jobID.toString()+" needs to be started; it was last checked at "+ + new Long(startInterval).toString()+", and now it is "+new Long(currentTime).toString()); + + // Proceed to the current time, and find a match if there is one to be found. + // If not -> continue + + // We go through *all* the schedule records. The one that matches that has the latest + // end time is the one we take. + int l = 0; + Long matchTime = null; + Long duration = null; + while (l < thisSchedule.length) + { + long trialStartInterval = startInterval; + ScheduleRecord sr = thisSchedule[l++]; + Long thisDuration = sr.getDuration(); + if (startMethod == IJobDescription.START_WINDOWINSIDE && + thisDuration != null) + { + // Bump the start interval back before the beginning of the current interval. + // This will guarantee a start as long as there is time in the window. + long trialStart = currentTime - thisDuration.longValue(); + if (trialStart < trialStartInterval) + trialStartInterval = trialStart; + } + + Long thisMatchTime = checkTimeMatch(trialStartInterval,currentTime, + sr.getDayOfWeek(), + sr.getDayOfMonth(), + sr.getMonthOfYear(), + sr.getYear(), + sr.getHourOfDay(), + sr.getMinutesOfHour(), + sr.getTimezone(), + thisDuration); + + if (thisMatchTime == null) + { + if (Logging.jobs.isDebugEnabled()) + Logging.jobs.debug(" No time match found within interval "+new Long(trialStartInterval).toString()+ + " to "+new Long(currentTime).toString()); + continue; + } - if (thisMatchTime == null) - { if (Logging.jobs.isDebugEnabled()) - Logging.jobs.debug(" No time match found within interval "+new Long(trialStartInterval).toString()+ + Logging.jobs.debug(" Time match FOUND within interval "+new Long(trialStartInterval).toString()+ " to "+new Long(currentTime).toString()); - continue; - } - if (Logging.jobs.isDebugEnabled()) - Logging.jobs.debug(" Time match FOUND within interval "+new Long(trialStartInterval).toString()+ - " to "+new Long(currentTime).toString()); + if (matchTime == null || thisDuration == null || + (duration != null && thisMatchTime.longValue() + thisDuration.longValue() > + matchTime.longValue() + duration.longValue())) + { + matchTime = thisMatchTime; + duration = thisDuration; + } + } - if (matchTime == null || thisDuration == null || - (duration != null && thisMatchTime.longValue() + thisDuration.longValue() > - matchTime.longValue() + duration.longValue())) + if (matchTime == null) { - matchTime = thisMatchTime; - duration = thisDuration; + jobs.updateLastTime(jobID,currentTime); + continue; } - } - - if (matchTime == null) - { - jobs.updateLastTime(jobID,currentTime); - continue; - } - - int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString()); + int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString()); - // Calculate the end of the window - Long windowEnd = null; - if (duration != null) - { - windowEnd = new Long(matchTime.longValue()+duration.longValue()); - } - if (Logging.jobs.isDebugEnabled()) - { - Logging.jobs.debug("Job '"+jobID+"' is within run window at "+new Long(currentTime).toString()+" ms. (which starts at "+ - matchTime.toString()+" ms."+((duration==null)?"":(" and goes for "+duration.toString()+" ms."))+")"); - } - - int newJobState; - switch (status) - { - case Jobs.STATUS_INACTIVE: - // If job was formerly "inactive", do the full startup. - // Start this job! but with no end time. - // This does not get logged because the startup thread does the logging. - jobs.startJob(jobID,windowEnd); - jobQueue.clearFailTimes(jobID); - if (Logging.jobs.isDebugEnabled()) + // Calculate the end of the window + Long windowEnd = null; + if (duration != null) { - Logging.jobs.debug("Signalled for job start for job "+jobID); + windowEnd = new Long(matchTime.longValue()+duration.longValue()); } - break; - case Jobs.STATUS_ACTIVEWAIT: - unwaitList.add(jobID); - jobs.unwaitJob(jobID,Jobs.STATUS_RESUMING,windowEnd); - jobQueue.clearFailTimes(jobID); - if (Logging.jobs.isDebugEnabled()) - { - Logging.jobs.debug("Un-waited job "+jobID); - } - break; - case Jobs.STATUS_ACTIVEWAITSEEDING: - unwaitList.add(jobID); - jobs.unwaitJob(jobID,Jobs.STATUS_RESUMINGSEEDING,windowEnd); - jobQueue.clearFailTimes(jobID); - if (Logging.jobs.isDebugEnabled()) - { - Logging.jobs.debug("Un-waited job "+jobID); - } - break; - case Jobs.STATUS_PAUSEDWAIT: - unwaitList.add(jobID); - jobs.unwaitJob(jobID,jobs.STATUS_PAUSED,windowEnd); - if (Logging.jobs.isDebugEnabled()) - { - Logging.jobs.debug("Un-waited (but still paused) job "+jobID); - } - break; - case Jobs.STATUS_PAUSEDWAITSEEDING: - unwaitList.add(jobID); - jobs.unwaitJob(jobID,jobs.STATUS_PAUSEDSEEDING,windowEnd); - if (Logging.jobs.isDebugEnabled()) - { - Logging.jobs.debug("Un-waited (but still paused) job "+jobID); - } - break; - case Jobs.STATUS_PAUSINGWAITING: - unwaitList.add(jobID); - jobs.unwaitJob(jobID,jobs.STATUS_PAUSING,windowEnd); + if (Logging.jobs.isDebugEnabled()) { - Logging.jobs.debug("Un-waited (but still paused) job "+jobID); + Logging.jobs.debug("Job '"+jobID+"' is within run window at "+new Long(currentTime).toString()+" ms. (which starts at "+ + matchTime.toString()+" ms."+((duration==null)?"":(" and goes for "+duration.toString()+" ms."))+")"); } - break; - case Jobs.STATUS_PAUSINGWAITINGSEEDING: - unwaitList.add(jobID); - jobs.unwaitJob(jobID,jobs.STATUS_PAUSINGSEEDING,windowEnd); - if (Logging.jobs.isDebugEnabled()) + + int newJobState; + switch (status) { - Logging.jobs.debug("Un-waited (but still paused) job "+jobID); + case Jobs.STATUS_INACTIVE: + // If job was formerly "inactive", do the full startup. + // Start this job! but with no end time. + // This does not get logged because the startup thread does the logging. + jobs.startJob(jobID,windowEnd); + jobQueue.clearFailTimes(jobID); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Signalled for job start for job "+jobID); + } + break; + case Jobs.STATUS_ACTIVEWAIT: + unwaitList.add(jobID); + jobs.unwaitJob(jobID,Jobs.STATUS_RESUMING,windowEnd); + jobQueue.clearFailTimes(jobID); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Un-waited job "+jobID); + } + break; + case Jobs.STATUS_ACTIVEWAITSEEDING: + unwaitList.add(jobID); + jobs.unwaitJob(jobID,Jobs.STATUS_RESUMINGSEEDING,windowEnd); + jobQueue.clearFailTimes(jobID); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Un-waited job "+jobID); + } + break; + case Jobs.STATUS_PAUSEDWAIT: + unwaitList.add(jobID); + jobs.unwaitJob(jobID,jobs.STATUS_PAUSED,windowEnd); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Un-waited (but still paused) job "+jobID); + } + break; + case Jobs.STATUS_PAUSEDWAITSEEDING: + unwaitList.add(jobID); + jobs.unwaitJob(jobID,jobs.STATUS_PAUSEDSEEDING,windowEnd); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Un-waited (but still paused) job "+jobID); + } + break; + case Jobs.STATUS_PAUSINGWAITING: + unwaitList.add(jobID); + jobs.unwaitJob(jobID,jobs.STATUS_PAUSING,windowEnd); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Un-waited (but still paused) job "+jobID); + } + break; + case Jobs.STATUS_PAUSINGWAITINGSEEDING: + unwaitList.add(jobID); + jobs.unwaitJob(jobID,jobs.STATUS_PAUSINGSEEDING,windowEnd); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Un-waited (but still paused) job "+jobID); + } + break; + default: + break; } - break; - default: - break; - } + } + database.performCommit(); + return; + } + catch (ManifoldCFException e) + { + database.signalRollback(); + if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) + { + if (Logging.perf.isDebugEnabled()) + Logging.perf.debug("Aborted transaction resetting for restart: "+e.getMessage()); + sleepAmt = getRandomAmount(); + continue; + } + throw e; + } + catch (Error e) + { + database.signalRollback(); + throw e; + } + finally + { + database.endTransaction(); + sleepFor(sleepAmt); } - } - catch (ManifoldCFException e) - { - database.signalRollback(); - throw e; - } - catch (Error e) - { - database.signalRollback(); - throw e; - } - finally - { - database.endTransaction(); } }