Subject: svn commit: r1205504 [1/2] - in /incubator/lcf/branches/CONNECTORS-286: ./ framework/crawler-ui/src/main/webapp/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawl...
Date: Wed, 23 Nov 2011 17:37:52 -0000
To: connectors-commits@incubator.apache.org
From: kwright@apache.org
X-Mailer: svnmailer-1.0.8-patched
Message-Id: <20111123173754.23BCD2388978@eris.apache.org>

Author: kwright
Date: Wed Nov 23 17:37:51 2011
New Revision: 1205504

URL: http://svn.apache.org/viewvc?rev=1205504&view=rev
Log:
Pull up changes from trunk for CONNECTORS-290.

Modified:
    incubator/lcf/branches/CONNECTORS-286/   (props changed)
    incubator/lcf/branches/CONNECTORS-286/CHANGES.txt
    incubator/lcf/branches/CONNECTORS-286/framework/crawler-ui/src/main/webapp/showjobstatus.jsp
    incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java
    incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobStatus.java
    incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
    incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java
    incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobStartThread.java
    incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
    incubator/lcf/branches/CONNECTORS-286/site/src/documentation/content/xdocs/programmatic-operation.xml

Propchange: incubator/lcf/branches/CONNECTORS-286/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 23 17:37:51 2011
@@ -10,4 +10,6 @@
 /incubator/lcf/branches/CONNECTORS-256:1172846-1182809
 /incubator/lcf/branches/CONNECTORS-277:1185949-1187036
 /incubator/lcf/branches/CONNECTORS-284:1189305-1190398
+/incubator/lcf/branches/CONNECTORS-290:1204836-1205502
 /incubator/lcf/branches/CONNECTORS-32:1092556-1094216
+/incubator/lcf/trunk:1205503

Modified: incubator/lcf/branches/CONNECTORS-286/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/CHANGES.txt?rev=1205504&r1=1205503&r2=1205504&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/CHANGES.txt (original)
+++ incubator/lcf/branches/CONNECTORS-286/CHANGES.txt Wed Nov 23 17:37:51 2011
@@ -3,6 +3,12 @@ $Id$
 
 ======================= 0.4-dev =====================
 
+CONNECTORS-290: Revamp the way docpriorities are handled so as to avoid
+flooding the jobqueue with documents that have a docpriority value but
+a job or document status incompatible with stuffing. This should help a lot
+in very large crawls with multiple jobs.
+(Karl Wright)
+
 CONNECTORS-285: Provide a way through the IJobManager interface and
 through the API to check on the status of a job without asking for the
 counts of documents, which yields a full table scan.
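The CONNECTORS-290 entry can be illustrated with a small, self-contained Java model. This is a sketch under stated assumptions, not ManifoldCF code: the class `DocPrioritySketch`, its nested types, and its methods are hypothetical, the two job states stand in for the much larger set in `Jobs`, and lower `docPriority` values are assumed to be stuffed first. It mirrors the ordering used by the new `finishJobStops`: a stopped job's queued documents lose their priority before the job changes state, so the stuffing scan never has to step over them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory model of the CONNECTORS-290 idea; not ManifoldCF code.
final class DocPrioritySketch {
  // Simplified job states (assumed; ManifoldCF's Jobs class has many more).
  enum JobState { ACTIVE, PAUSED }

  // A queued document. A null docPriority means "not a stuffing candidate".
  static final class QueuedDoc {
    final long jobId;
    final String docId;
    Double docPriority;

    QueuedDoc(long jobId, String docId, Double docPriority) {
      this.jobId = jobId;
      this.docId = docId;
      this.docPriority = docPriority;
    }
  }

  private final Map<Long, JobState> jobStates = new HashMap<>();
  private final List<QueuedDoc> queue = new ArrayList<>();

  void addJob(long jobId) {
    jobStates.put(jobId, JobState.ACTIVE);
  }

  void enqueue(long jobId, String docId, double priority) {
    queue.add(new QueuedDoc(jobId, docId, priority));
  }

  // Stop a job: null out its documents' priorities first, then change the
  // job's state (the same order the commit uses in finishJobStops).
  void stopJob(long jobId) {
    for (QueuedDoc d : queue) {
      if (d.jobId == jobId) {
        d.docPriority = null;
      }
    }
    jobStates.put(jobId, JobState.PAUSED);
  }

  // Number of documents that still carry a priority (the "index" size).
  long prioritizedCount() {
    return queue.stream().filter(d -> d.docPriority != null).count();
  }

  // Pick up to 'limit' documents to stuff, lowest docPriority first (an
  // assumption of this sketch), skipping null priorities and inactive jobs.
  List<String> stuffCandidates(int limit) {
    return queue.stream()
        .filter(d -> d.docPriority != null)
        .filter(d -> jobStates.get(d.jobId) == JobState.ACTIVE)
        .sorted(Comparator.comparingDouble(d -> d.docPriority))
        .limit(limit)
        .map(d -> d.docId)
        .collect(Collectors.toList());
  }
}
```

With two jobs queued, stopping job 2 drops its document from the prioritized set (three prioritized rows become two) instead of leaving it for the stuffer to filter out by job status. The sketch does not model how priorities are reassigned when a job resumes.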
Modified: incubator/lcf/branches/CONNECTORS-286/framework/crawler-ui/src/main/webapp/showjobstatus.jsp URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/framework/crawler-ui/src/main/webapp/showjobstatus.jsp?rev=1205504&r1=1205503&r2=1205504&view=diff ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/framework/crawler-ui/src/main/webapp/showjobstatus.jsp (original) +++ incubator/lcf/branches/CONNECTORS-286/framework/crawler-ui/src/main/webapp/showjobstatus.jsp Wed Nov 23 17:37:51 2011 @@ -131,6 +131,12 @@ if (maintenanceUnderway == false) case JobStatus.JOBSTATUS_RESTARTING: statusName = "Restarting"; break; + case JobStatus.JOBSTATUS_STOPPING: + statusName = "Stopping"; + break; + case JobStatus.JOBSTATUS_RESUMING: + statusName = "Resuming"; + break; case JobStatus.JOBSTATUS_PAUSED: statusName = "Paused"; break; @@ -180,24 +186,53 @@ if (maintenanceUnderway == false) %> > -<% if (status == JobStatus.JOBSTATUS_NOTYETRUN || status == JobStatus.JOBSTATUS_COMPLETED || status == JobStatus.JOBSTATUS_ERROR) { %> - Start  -<% } %> -<% if (status == JobStatus.JOBSTATUS_RUNNING || status == JobStatus.JOBSTATUS_RUNNING_UNINSTALLED || status == JobStatus.JOBSTATUS_WINDOWWAIT || - status == JobStatus.JOBSTATUS_PAUSED || status == JobStatus.JOBSTATUS_STARTING) { %> - Restart  -<% } %> -<% if (status == JobStatus.JOBSTATUS_RUNNING || status == JobStatus.JOBSTATUS_RUNNING_UNINSTALLED || status == JobStatus.JOBSTATUS_WINDOWWAIT) { %> - Pause  -<% } %> -<% if (status == JobStatus.JOBSTATUS_RUNNING || status == JobStatus.JOBSTATUS_RUNNING_UNINSTALLED || status == JobStatus.JOBSTATUS_WINDOWWAIT || - status == JobStatus.JOBSTATUS_PAUSED || status == JobStatus.JOBSTATUS_STARTING || status == JobStatus.JOBSTATUS_RESTARTING) { %> - Abort  -<% } %> -<% if (status == JobStatus.JOBSTATUS_PAUSED) { %> - Resume  -<% } %> - +<% + if (status == JobStatus.JOBSTATUS_NOTYETRUN || + status == 
JobStatus.JOBSTATUS_COMPLETED || + status == JobStatus.JOBSTATUS_ERROR) + { +%> + Start  +<% + } + if (status == JobStatus.JOBSTATUS_RUNNING || + status == JobStatus.JOBSTATUS_RUNNING_UNINSTALLED || + status == JobStatus.JOBSTATUS_WINDOWWAIT || + status == JobStatus.JOBSTATUS_PAUSED || + status == JobStatus.JOBSTATUS_STARTING) + { +%> + Restart  +<% + } + if (status == JobStatus.JOBSTATUS_RUNNING || + status == JobStatus.JOBSTATUS_RUNNING_UNINSTALLED || + status == JobStatus.JOBSTATUS_WINDOWWAIT) + { +%> + Pause  +<% + } + if (status == JobStatus.JOBSTATUS_RUNNING || + status == JobStatus.JOBSTATUS_RUNNING_UNINSTALLED || + status == JobStatus.JOBSTATUS_STOPPING || + status == JobStatus.JOBSTATUS_RESUMING || + status == JobStatus.JOBSTATUS_WINDOWWAIT || + status == JobStatus.JOBSTATUS_PAUSED || + status == JobStatus.JOBSTATUS_STARTING || + status == JobStatus.JOBSTATUS_RESTARTING) + { +%> + Abort  +<% + } + if (status == JobStatus.JOBSTATUS_PAUSED) + { +%> + Resume  +<% + } +%> <%=""%><%=js.getDescription()%><%=statusName%><%=startTime%><%=endTime%> <%=new Long(js.getDocumentsInQueue()).toString()%> Modified: incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1205504&r1=1205503&r2=1205504&view=diff ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original) +++ incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Wed Nov 23 17:37:51 2011 @@ -826,13 +826,24 @@ public interface IJobManager public boolean errorAbort(Long jobID, String errorText) throws ManifoldCFException; - /** 
Complete the sequence that aborts jobs and makes them runnable again. + /** Complete the sequence that stops jobs, either for abort, pause, or because of a scheduling + * window. The logic will move the job to its next state (INACTIVE, PAUSED, ACTIVEWAIT), + * and will record the jobs that have been so modified. *@param timestamp is the current time in milliseconds since epoch. - *@param abortJobs is filled in with the set of IJobDescription objects that were aborted. + *@param modifiedJobs is filled in with the set of IJobDescription objects that were stopped. */ - public void finishJobAborts(long timestamp, ArrayList abortJobs) + public void finishJobStops(long timestamp, ArrayList modifiedJobs) throws ManifoldCFException; + /** Complete the sequence that resumes jobs, either from a pause or from a scheduling window + * wait. The logic will restore the job to an active state (many possibilities depending on + * connector status), and will record the jobs that have been so modified. + *@param timestamp is the current time in milliseconds since epoch. + *@param modifiedJobs is filled in with the set of IJobDescription objects that were resumed. + */ + public void finishJobResumes(long timestamp, ArrayList modifiedJobs) + throws ManifoldCFException; + /** Put all eligible jobs in the "shutting down" state. 
*/ public void finishJobs() Modified: incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java?rev=1205504&r1=1205503&r2=1205504&view=diff ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java (original) +++ incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IRepositoryConnectionManager.java Wed Nov 23 17:37:51 2011 @@ -125,19 +125,26 @@ public interface IRepositoryConnectionMa /** Start a job */ public static final String ACTIVITY_JOBSTART = "job start"; - /** Wait a job */ - public static final String ACTIVITY_JOBWAIT = "job wait"; - /** Continue a job */ - public static final String ACTIVITY_JOBCONTINUE = "job continue"; /** Finish a job */ public static final String ACTIVITY_JOBEND = "job end"; - /** Abort a job */ - public static final String ACTIVITY_JOBABORT = "job abort"; - + /** Stop a job */ + public static final String ACTIVITY_JOBSTOP = "job stop"; + /** Continue a job */ + public static final String ACTIVITY_JOBCONTINUE = "job continue"; + /** Wait due to schedule */ + public static final String ACTIVITY_JOBWAIT = "job wait"; + /** Unwait due to schedule */ + public static final String ACTIVITY_JOBUNWAIT = "job unwait"; + /** The set of activity records. 
*/ public static final String[] activitySet = new String[] { - ACTIVITY_JOBSTART,ACTIVITY_JOBWAIT,ACTIVITY_JOBCONTINUE,ACTIVITY_JOBEND,ACTIVITY_JOBABORT + ACTIVITY_JOBSTART, + ACTIVITY_JOBSTOP, + ACTIVITY_JOBCONTINUE, + ACTIVITY_JOBWAIT, + ACTIVITY_JOBUNWAIT, + ACTIVITY_JOBEND }; /** Record time-stamped information about the activity of the connection. This information can originate from Modified: incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobStatus.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobStatus.java?rev=1205504&r1=1205503&r2=1205504&view=diff ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobStatus.java (original) +++ incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/JobStatus.java Wed Nov 23 17:37:51 2011 @@ -28,17 +28,19 @@ public class JobStatus // Job status values public final static int JOBSTATUS_NOTYETRUN = 0; public final static int JOBSTATUS_RUNNING = 1; - public final static int JOBSTATUS_PAUSED = 2; - public final static int JOBSTATUS_COMPLETED = 3; - public final static int JOBSTATUS_WINDOWWAIT = 4; - public final static int JOBSTATUS_STARTING = 5; - public final static int JOBSTATUS_DESTRUCTING = 6; - public final static int JOBSTATUS_ERROR = 7; - public final static int JOBSTATUS_ABORTING = 8; - public final static int JOBSTATUS_RESTARTING = 9; - public final static int JOBSTATUS_RUNNING_UNINSTALLED = 10; - public final static int JOBSTATUS_JOBENDCLEANUP = 11; - public final static int JOBSTATUS_JOBENDNOTIFICATION = 12; + public final static int JOBSTATUS_STOPPING = 2; + public final static int JOBSTATUS_PAUSED = 3; + public final static int 
JOBSTATUS_RESUMING = 4; + public final static int JOBSTATUS_COMPLETED = 5; + public final static int JOBSTATUS_WINDOWWAIT = 6; + public final static int JOBSTATUS_STARTING = 7; + public final static int JOBSTATUS_DESTRUCTING = 8; + public final static int JOBSTATUS_ERROR = 9; + public final static int JOBSTATUS_ABORTING = 10; + public final static int JOBSTATUS_RESTARTING = 11; + public final static int JOBSTATUS_RUNNING_UNINSTALLED = 12; + public final static int JOBSTATUS_JOBENDCLEANUP = 13; + public final static int JOBSTATUS_JOBENDNOTIFICATION = 14; // Member variables. Modified: incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1205504&r1=1205503&r2=1205504&view=diff ============================================================================== --- incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original) +++ incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Wed Nov 23 17:37:51 2011 @@ -1470,25 +1470,28 @@ public class JobManager implements IJobM list.add(jobQueue.actionToString(JobQueue.ACTION_RESCAN)); + // Per CONNECTORS-290, we need to be leaving priorities blank for jobs that aren't using them, + // so this will be changed to not include jobs where the priorities have been bashed to null. + // + // I've included ALL states that might have non-null doc priorities. This includes states + // corresponding to uninstalled connectors, since there is no transition that cleans out the + // document priorities in these states. 
The time during which a connector is uninstalled is + // expected to be short, because typically this state is the result of an installation procedure + // rather than willful action on the part of a user. + sb.append("EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ") .append(database.buildConjunctionClause(list,new ClauseDescription[]{ new MultiClause("t1."+jobs.statusField,new Object[]{ - Jobs.statusToString(Jobs.STATUS_ACTIVE), - Jobs.statusToString(Jobs.STATUS_PAUSED), - Jobs.statusToString(Jobs.STATUS_ACTIVEWAIT), - Jobs.statusToString(Jobs.STATUS_PAUSEDWAIT), - Jobs.statusToString(Jobs.STATUS_ACTIVE), - Jobs.statusToString(Jobs.STATUS_READYFORSTARTUP), Jobs.statusToString(Jobs.STATUS_STARTINGUP), - Jobs.statusToString(Jobs.STATUS_ABORTINGSTARTINGUPFORRESTART), - Jobs.statusToString(Jobs.STATUS_ABORTINGSTARTINGUP), + Jobs.statusToString(Jobs.STATUS_ACTIVE), Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING), - Jobs.statusToString(Jobs.STATUS_PAUSEDSEEDING), - Jobs.statusToString(Jobs.STATUS_ACTIVEWAITSEEDING), - Jobs.statusToString(Jobs.STATUS_PAUSEDWAITSEEDING), - Jobs.statusToString(Jobs.STATUS_ABORTING), - Jobs.statusToString(Jobs.STATUS_ABORTINGFORRESTART), - Jobs.statusToString(Jobs.STATUS_ABORTINGFORRESTARTSEEDING)}), + Jobs.statusToString(Jobs.STATUS_ACTIVE_UNINSTALLED), + Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED), + Jobs.statusToString(Jobs.STATUS_ACTIVE_NOOUTPUT), + Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NOOUTPUT), + Jobs.statusToString(Jobs.STATUS_ACTIVE_NEITHER), + Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NEITHER) + }), new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)})) .append(") "); @@ -2121,7 +2124,8 @@ public class JobManager implements IJobM { IResultRow row = set.getRow(0); Double docPriority = (Double)row.getValue(jobQueue.docPriorityField); - scanRecord.addBins(docPriority); + if (docPriority != null) + scanRecord.addBins(docPriority); } return rval; } @@ -4563,21 +4567,7 @@ 
public class JobManager implements IJobM break; case Jobs.STATUS_ACTIVEWAIT: unwaitList.add(jobID); - if (connectionMgr.checkConnectorExists(connectionName)) - { - if (outputMgr.checkConnectorExists(outputName)) - newJobState = jobs.STATUS_ACTIVE; - else - newJobState = jobs.STATUS_ACTIVE_NOOUTPUT; - } - else - { - if (outputMgr.checkConnectorExists(outputName)) - newJobState = jobs.STATUS_ACTIVE_UNINSTALLED; - else - newJobState = jobs.STATUS_ACTIVE_NEITHER; - } - jobs.unwaitJob(jobID,newJobState,windowEnd); + jobs.unwaitJob(jobID,Jobs.STATUS_RESUMING,windowEnd); jobQueue.clearFailTimes(jobID); if (Logging.jobs.isDebugEnabled()) { @@ -4586,21 +4576,7 @@ public class JobManager implements IJobM break; case Jobs.STATUS_ACTIVEWAITSEEDING: unwaitList.add(jobID); - if (connectionMgr.checkConnectorExists(connectionName)) - { - if (outputMgr.checkConnectorExists(outputName)) - newJobState = jobs.STATUS_ACTIVESEEDING; - else - newJobState = jobs.STATUS_ACTIVESEEDING_NOOUTPUT; - } - else - { - if (outputMgr.checkConnectorExists(outputName)) - newJobState = jobs.STATUS_ACTIVESEEDING_UNINSTALLED; - else - newJobState = jobs.STATUS_ACTIVESEEDING_NEITHER; - } - jobs.unwaitJob(jobID,newJobState,windowEnd); + jobs.unwaitJob(jobID,Jobs.STATUS_RESUMINGSEEDING,windowEnd); jobQueue.clearFailTimes(jobID); if (Logging.jobs.isDebugEnabled()) { @@ -4623,6 +4599,22 @@ public class JobManager implements IJobM Logging.jobs.debug("Un-waited (but still paused) job "+jobID); } break; + case Jobs.STATUS_PAUSINGWAITING: + unwaitList.add(jobID); + jobs.unwaitJob(jobID,jobs.STATUS_PAUSING,windowEnd); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Un-waited (but still paused) job "+jobID); + } + break; + case Jobs.STATUS_PAUSINGWAITINGSEEDING: + unwaitList.add(jobID); + jobs.unwaitJob(jobID,jobs.STATUS_PAUSINGSEEDING,windowEnd); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Un-waited (but still paused) job "+jobID); + } + break; default: break; } @@ -4644,7 +4636,7 
@@ public class JobManager implements IJobM database.endTransaction(); } } - + /** Put active or paused jobs in wait state, if they've exceeded their window. *@param currentTime is the current time in milliseconds since epoch. *@param waitList is filled in with the set of job ID's that were put into a wait state. @@ -4701,7 +4693,7 @@ public class JobManager implements IJobM case Jobs.STATUS_ACTIVE_UNINSTALLED: case Jobs.STATUS_ACTIVE_NOOUTPUT: case Jobs.STATUS_ACTIVE_NEITHER: - jobs.waitJob(jobID,Jobs.STATUS_ACTIVEWAIT); + jobs.waitJob(jobID,Jobs.STATUS_ACTIVEWAITING); if (Logging.jobs.isDebugEnabled()) { Logging.jobs.debug("Job "+jobID+" now in 'wait' state due to window end"); @@ -4711,7 +4703,7 @@ public class JobManager implements IJobM case Jobs.STATUS_ACTIVESEEDING_UNINSTALLED: case Jobs.STATUS_ACTIVESEEDING_NOOUTPUT: case Jobs.STATUS_ACTIVESEEDING_NEITHER: - jobs.waitJob(jobID,Jobs.STATUS_ACTIVEWAITSEEDING); + jobs.waitJob(jobID,Jobs.STATUS_ACTIVEWAITINGSEEDING); if (Logging.jobs.isDebugEnabled()) { Logging.jobs.debug("Job "+jobID+" now in 'wait' state due to window end"); @@ -4731,6 +4723,20 @@ public class JobManager implements IJobM Logging.jobs.debug("Job "+jobID+" now in 'wait paused' state due to window end"); } break; + case Jobs.STATUS_PAUSING: + jobs.waitJob(jobID,Jobs.STATUS_PAUSINGWAITING); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Job "+jobID+" now in 'wait paused' state due to window end"); + } + break; + case Jobs.STATUS_PAUSINGSEEDING: + jobs.waitJob(jobID,Jobs.STATUS_PAUSINGWAITINGSEEDING); + if (Logging.jobs.isDebugEnabled()) + { + Logging.jobs.debug("Job "+jobID+" now in 'wait paused' state due to window end"); + } + break; default: break; } @@ -6166,111 +6172,119 @@ public class JobManager implements IJobM } } - /** Complete the sequence that aborts jobs and makes them runnable again. - *@param timestamp is the current time. - *@param abortJobs is the set of IJobDescription objects that were aborted (and stopped). 
+ /** Complete the sequence that resumes jobs, either from a pause or from a scheduling window + * wait. The logic will restore the job to an active state (many possibilities depending on + * connector status), and will record the jobs that have been so modified. + *@param timestamp is the current time in milliseconds since epoch. + *@param modifiedJobs is filled in with the set of IJobDescription objects that were resumed. */ - public void finishJobAborts(long timestamp, ArrayList abortJobs) + public void finishJobResumes(long timestamp, ArrayList modifiedJobs) throws ManifoldCFException { - while (true) + // Do the first query, getting the candidate jobs to be considered + StringBuilder sb = new StringBuilder("SELECT "); + ArrayList list = new ArrayList(); + + sb.append(jobs.idField) + .append(" FROM ").append(jobs.getTableName()).append(" WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new MultiClause(jobs.statusField,new Object[]{ + jobs.statusToString(jobs.STATUS_RESUMING), + jobs.statusToString(jobs.STATUS_RESUMINGSEEDING) + })})); + + IResultSet set = database.performQuery(sb.toString(),list,null,null); + + int i = 0; + while (i < set.getRowCount()) { - long sleepAmt = 0L; - database.beginTransaction(); - try + IResultRow row = set.getRow(i++); + Long jobID = (Long)row.getValue(jobs.idField); + + // There are no secondary checks that need to be made; just resume + IJobDescription jobDesc = jobs.load(jobID,true); + modifiedJobs.add(jobDesc); + + jobs.finishResumeJob(jobID,timestamp); + + if (Logging.jobs.isDebugEnabled()) { - // The query I used to emit was: - // SELECT jobid FROM jobs t0 WHERE t0.status='X' AND NOT EXISTS(SELECT 'x' FROM jobqueue t1 WHERE - // t0.id=t1.jobid AND t1.status IN ('A','F')) - // Now the query is broken up so that Postgresql behaves more efficiently. 
+ Logging.jobs.debug("Resumed job "+jobID); + } + } + } - // Do the first query, getting the candidate jobs to be considered - StringBuilder sb = new StringBuilder("SELECT "); - ArrayList list = new ArrayList(); + /** Complete the sequence that stops jobs, either for abort, pause, or because of a scheduling + * window. The logic will move the job to its next state (INACTIVE, PAUSED, ACTIVEWAIT), + * and will record the jobs that have been so modified. + *@param timestamp is the current time in milliseconds since epoch. + *@param modifiedJobs is filled in with the set of IJobDescription objects that were stopped. + */ + public void finishJobStops(long timestamp, ArrayList modifiedJobs) + throws ManifoldCFException + { + // The query I used to emit was: + // SELECT jobid FROM jobs t0 WHERE t0.status='X' AND NOT EXISTS(SELECT 'x' FROM jobqueue t1 WHERE + // t0.id=t1.jobid AND t1.status IN ('A','F')) + // Now the query is broken up so that Postgresql behaves more efficiently. + + // Do the first query, getting the candidate jobs to be considered + StringBuilder sb = new StringBuilder("SELECT "); + ArrayList list = new ArrayList(); - sb.append(jobs.idField).append(",") - .append(jobs.statusField) - .append(" FROM ").append(jobs.getTableName()).append(" WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new MultiClause(jobs.statusField,new Object[]{ - jobs.statusToString(jobs.STATUS_ABORTING), - jobs.statusToString(jobs.STATUS_ABORTINGFORRESTART)})})) - .append(" FOR UPDATE"); + sb.append(jobs.idField) + .append(" FROM ").append(jobs.getTableName()).append(" WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new MultiClause(jobs.statusField,new Object[]{ + jobs.statusToString(jobs.STATUS_ABORTING), + jobs.statusToString(jobs.STATUS_ABORTINGFORRESTART), + jobs.statusToString(jobs.STATUS_PAUSING), + jobs.statusToString(jobs.STATUS_PAUSINGSEEDING), + jobs.statusToString(jobs.STATUS_ACTIVEWAITING), + 
jobs.statusToString(jobs.STATUS_ACTIVEWAITINGSEEDING), + jobs.statusToString(jobs.STATUS_PAUSINGWAITING), + jobs.statusToString(jobs.STATUS_PAUSINGWAITINGSEEDING) + })})); - IResultSet set = database.performQuery(sb.toString(),list,null,null); + IResultSet set = database.performQuery(sb.toString(),list,null,null); - int i = 0; - while (i < set.getRowCount()) - { - IResultRow row = set.getRow(i++); - Long jobID = (Long)row.getValue(jobs.idField); + int i = 0; + while (i < set.getRowCount()) + { + IResultRow row = set.getRow(i++); + Long jobID = (Long)row.getValue(jobs.idField); - sb = new StringBuilder("SELECT "); - list.clear(); + sb = new StringBuilder("SELECT "); + list.clear(); - sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause(jobQueue.jobIDField,jobID), - new MultiClause(jobQueue.statusField,new Object[]{ - jobQueue.statusToString(jobQueue.STATUS_ACTIVE), - jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY), - jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN), - jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY)})})) - .append(" ").append(database.constructOffsetLimitClause(0,1)); + sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new UnitaryClause(jobQueue.jobIDField,jobID), + new MultiClause(jobQueue.statusField,new Object[]{ + jobQueue.statusToString(jobQueue.STATUS_ACTIVE), + jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY), + jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN), + jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY)})})) + .append(" ").append(database.constructOffsetLimitClause(0,1)); - IResultSet confirmSet = database.performQuery(sb.toString(),list,null,null,1,null); + IResultSet confirmSet = 
database.performQuery(sb.toString(),list,null,null,1,null); - if (confirmSet.getRowCount() > 0) - continue; + if (confirmSet.getRowCount() > 0) + continue; - int status = jobs.stringToStatus((String)row.getValue(jobs.statusField)); - IJobDescription jobDesc = jobs.load(jobID,true); - abortJobs.add(jobDesc); - - switch (status) - { - case Jobs.STATUS_ABORTING: - // Mark status of job as "inactive" - jobs.finishAbortJob(jobID,timestamp); - if (Logging.jobs.isDebugEnabled()) - { - Logging.jobs.debug("Completed abort of job "+jobID); - } - break; - case Jobs.STATUS_ABORTINGFORRESTART: - // Do the restart sequence! Log the abort here; the startup thread will log the start. - jobs.startJob(jobID,null); - { - Logging.jobs.debug("Completed restart of job "+jobID); - } - break; - default: - throw new ManifoldCFException("Unexpected value for job status: "+Integer.toString(status)); - } - } - return; - } - catch (ManifoldCFException e) - { - database.signalRollback(); - if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) - { - if (Logging.perf.isDebugEnabled()) - Logging.perf.debug("Aborted finishing job aborts: "+e.getMessage()); - sleepAmt = getRandomAmount(); - continue; - } - throw e; - } - catch (Error e) - { - database.signalRollback(); - throw e; - } - finally + // All the job's documents need to have their docpriority set to null, to clear dead wood out of the docpriority index. + // See CONNECTORS-290. + // We do this BEFORE updating the job state. 
+ jobQueue.clearDocPriorities(jobID); + + IJobDescription jobDesc = jobs.load(jobID,true); + modifiedJobs.add(jobDesc); + + jobs.finishStopJob(jobID,timestamp); + + if (Logging.jobs.isDebugEnabled()) { - database.endTransaction(); - sleepFor(sleepAmt); + Logging.jobs.debug("Stopped job "+jobID); } } } @@ -6287,115 +6301,83 @@ public class JobManager implements IJobM public void resetJobs(long currentTime, ArrayList resetJobs) throws ManifoldCFException { - while (true) - { - long sleepAmt = 0L; - database.beginTransaction(); - try - { - // Query for all jobs that fulfill the criteria - // The query used to look like: - // - // SELECT id FROM jobs t0 WHERE status='D' AND NOT EXISTS(SELECT 'x' FROM jobqueue t1 WHERE - // t0.id=t1.jobid AND t1.status='P') - // - // Now, the query is broken up, for performance + // Query for all jobs that fulfill the criteria + // The query used to look like: + // + // SELECT id FROM jobs t0 WHERE status='D' AND NOT EXISTS(SELECT 'x' FROM jobqueue t1 WHERE + // t0.id=t1.jobid AND t1.status='P') + // + // Now, the query is broken up, for performance - // Do the first query, getting the candidate jobs to be considered - StringBuilder sb = new StringBuilder("SELECT "); - ArrayList list = new ArrayList(); + // Do the first query, getting the candidate jobs to be considered + StringBuilder sb = new StringBuilder("SELECT "); + ArrayList list = new ArrayList(); - sb.append(jobs.idField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN))})) - .append(" FOR UPDATE"); + sb.append(jobs.idField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_SHUTTINGDOWN))})); - IResultSet set = 
database.performQuery(sb.toString(),list,null,null); + IResultSet set = database.performQuery(sb.toString(),list,null,null); - int i = 0; - while (i < set.getRowCount()) - { - IResultRow row = set.getRow(i++); - Long jobID = (Long)row.getValue(jobs.idField); + int i = 0; + while (i < set.getRowCount()) + { + IResultRow row = set.getRow(i++); + Long jobID = (Long)row.getValue(jobs.idField); - // Check to be sure the job is a candidate for shutdown - sb = new StringBuilder("SELECT "); - list.clear(); + // Check to be sure the job is a candidate for shutdown + sb = new StringBuilder("SELECT "); + list.clear(); - sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ") - .append(database.buildConjunctionClause(list,new ClauseDescription[]{ - new UnitaryClause(jobQueue.jobIDField,jobID), - new MultiClause(jobQueue.statusField,new Object[]{ - jobQueue.statusToString(jobQueue.STATUS_PURGATORY), - jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)})})) - .append(" ").append(database.constructOffsetLimitClause(0,1)); + sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ") + .append(database.buildConjunctionClause(list,new ClauseDescription[]{ + new UnitaryClause(jobQueue.jobIDField,jobID), + new MultiClause(jobQueue.statusField,new Object[]{ + jobQueue.statusToString(jobQueue.STATUS_PURGATORY), + jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED)})})) + .append(" ").append(database.constructOffsetLimitClause(0,1)); - IResultSet confirmSet = database.performQuery(sb.toString(),list,null,null,1,null); + IResultSet confirmSet = database.performQuery(sb.toString(),list,null,null,1,null); - if (confirmSet.getRowCount() > 0) - continue; + if (confirmSet.getRowCount() > 0) + continue; - // The shutting-down phase is complete. However, we need to check if there are any outstanding - // PENDING or PENDINGPURGATORY records before we can decide what to do. 
- sb = new StringBuilder("SELECT ");
- list.clear();
+ // The shutting-down phase is complete. However, we need to check if there are any outstanding
+ // PENDING or PENDINGPURGATORY records before we can decide what to do.
+ sb = new StringBuilder("SELECT ");
+ list.clear();

- sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
- .append(database.buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(jobQueue.jobIDField,jobID),
- new MultiClause(jobQueue.statusField,new Object[]{
- jobQueue.statusToString(jobQueue.STATUS_PENDING),
- jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)})}))
- .append(" ").append(database.constructOffsetLimitClause(0,1));
+ sb.append(jobQueue.idField).append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
+ .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobQueue.jobIDField,jobID),
+ new MultiClause(jobQueue.statusField,new Object[]{
+ jobQueue.statusToString(jobQueue.STATUS_PENDING),
+ jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)})}))
+ .append(" ").append(database.constructOffsetLimitClause(0,1));

- confirmSet = database.performQuery(sb.toString(),list,null,null,1,null);
+ confirmSet = database.performQuery(sb.toString(),list,null,null,1,null);

- if (confirmSet.getRowCount() > 0)
- {
- // This job needs to re-enter the active state. Make that happen.
- jobs.returnJobToActive(jobID);
- if (Logging.jobs.isDebugEnabled())
- {
- Logging.jobs.debug("Job "+jobID+" is re-entering active state");
- }
- }
- else
- {
- // This job should be marked as finished.
- IJobDescription jobDesc = jobs.load(jobID,true);
- resetJobs.add(jobDesc);
-
- // Label the job "finished"
- jobs.finishJob(jobID,currentTime);
- if (Logging.jobs.isDebugEnabled())
- {
- Logging.jobs.debug("Job "+jobID+" now completed");
- }
- }
- }
- return;
- }
- catch (ManifoldCFException e)
+ if (confirmSet.getRowCount() > 0)
  {
- database.signalRollback();
- if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ // This job needs to re-enter the active state. Make that happen.
+ jobs.returnJobToActive(jobID);
+ if (Logging.jobs.isDebugEnabled())
  {
- if (Logging.perf.isDebugEnabled())
- Logging.perf.debug("Aborted resetting jobs: "+e.getMessage());
- sleepAmt = getRandomAmount();
- continue;
+ Logging.jobs.debug("Job "+jobID+" is re-entering active state");
  }
- throw e;
  }
- catch (Error e)
- {
- database.signalRollback();
- throw e;
- }
- finally
+ else
  {
- database.endTransaction();
- sleepFor(sleepAmt);
+ // This job should be marked as finished.
+ IJobDescription jobDesc = jobs.load(jobID,true);
+ resetJobs.add(jobDesc);
+
+ // Label the job "finished"
+ jobs.finishJob(jobID,currentTime);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Job "+jobID+" now completed");
+ }
  }
  }
  }
@@ -6490,7 +6472,16 @@ public class JobManager implements IJobM
  Jobs.statusToString(Jobs.STATUS_ACTIVEWAIT),
  Jobs.statusToString(Jobs.STATUS_ACTIVEWAITSEEDING),
  Jobs.statusToString(Jobs.STATUS_PAUSEDWAIT),
- Jobs.statusToString(Jobs.STATUS_PAUSEDWAITSEEDING)})});
+ Jobs.statusToString(Jobs.STATUS_PAUSEDWAITSEEDING),
+ Jobs.statusToString(Jobs.STATUS_PAUSING),
+ Jobs.statusToString(Jobs.STATUS_PAUSINGSEEDING),
+ Jobs.statusToString(Jobs.STATUS_ACTIVEWAITING),
+ Jobs.statusToString(Jobs.STATUS_ACTIVEWAITINGSEEDING),
+ Jobs.statusToString(Jobs.STATUS_PAUSINGWAITING),
+ Jobs.statusToString(Jobs.STATUS_PAUSINGWAITINGSEEDING),
+ Jobs.statusToString(Jobs.STATUS_RESUMING),
+ Jobs.statusToString(Jobs.STATUS_RESUMINGSEEDING)
+ })});
  return makeJobStatus(whereClause,whereParams,includeCounts);
  }
@@ -6714,6 +6705,18 @@ public class JobManager implements IJobM
  case Jobs.STATUS_ABORTINGSTARTINGUPFORRESTART:
  rstatus = JobStatus.JOBSTATUS_RESTARTING;
  break;
+ case Jobs.STATUS_PAUSING:
+ case Jobs.STATUS_PAUSINGSEEDING:
+ case Jobs.STATUS_ACTIVEWAITING:
+ case Jobs.STATUS_ACTIVEWAITINGSEEDING:
+ case Jobs.STATUS_PAUSINGWAITING:
+ case Jobs.STATUS_PAUSINGWAITINGSEEDING:
+ rstatus = JobStatus.JOBSTATUS_STOPPING;
+ break;
+ case Jobs.STATUS_RESUMING:
+ case Jobs.STATUS_RESUMINGSEEDING:
+ rstatus = JobStatus.JOBSTATUS_RESUMING;
+ break;
  case Jobs.STATUS_PAUSED:
  case Jobs.STATUS_PAUSEDSEEDING:
  rstatus = JobStatus.JOBSTATUS_PAUSED;

Modified: incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1205504&r1=1205503&r2=1205504&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Wed Nov 23 17:37:51 2011
@@ -547,11 +547,7 @@ public class JobQueue extends org.apache
  // Delete PENDING and ACTIVE entries
  HashMap map = new HashMap();
  map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
- // Do not reset priorities. This means, of course, that they may be out of date - but they are probably more accurate in their current form
- // than being set back to some arbitrary value.
- // The alternative, which would be to reprioritize all the documents at this point, is somewhat attractive, but let's see if we can get away
- // without for now.
- //map.put(docPriorityField,new Double(1.0));
+ // Do not reset priorities here! They should all be blank at this point.
  map.put(checkTimeField,new Long(0L));
  map.put(checkActionField,actionToString(ACTION_RESCAN));
  map.put(failTimeField,null);
@@ -633,14 +629,31 @@ public class JobQueue extends org.apache
  noteModifications(0,1,0);
  }

+ /** Clear all document priorities for a job */
+ public void clearDocPriorities(Long jobID)
+ throws ManifoldCFException
+ {
+ HashMap map = new HashMap();
+ map.put(prioritySetField,null);
+ map.put(docPriorityField,null);
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobIDField,jobID)});
+ performUpdate(map,"WHERE "+query,list,null);
+ noteModifications(0,1,0);
+ }
+
  /** Set the "completed" status for a record. */
  public void updateCompletedRecord(Long recID, int currentStatus)
  throws ManifoldCFException
  {
+ HashMap map = new HashMap();
+
  int newStatus;
  String actionFieldValue;
  Long checkTimeValue;
+
  switch (currentStatus)
  {
  case STATUS_ACTIVE:
@@ -648,27 +661,26 @@ public class JobQueue extends org.apache
  newStatus = STATUS_COMPLETE;
  actionFieldValue = null;
  checkTimeValue = null;
+ // Remove document priority; we don't want to pollute the queue. See CONNECTORS-290.
+ map.put(docPriorityField,null);
+ map.put(prioritySetField,null);
  break;
  case STATUS_ACTIVENEEDRESCAN:
  case STATUS_ACTIVENEEDRESCANPURGATORY:
  newStatus = STATUS_PENDINGPURGATORY;
  actionFieldValue = actionToString(ACTION_RESCAN);
  checkTimeValue = new Long(0L);
+ // Leave doc priority unchanged.
  break;
  default:
  throw new ManifoldCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status, saw "+Integer.toString(currentStatus));
  }
- HashMap map = new HashMap();
  map.put(statusField,statusToString(newStatus));
  map.put(checkTimeField,checkTimeValue);
  map.put(checkActionField,actionFieldValue);
  map.put(failTimeField,null);
  map.put(failCountField,null);
- // Don't rejigger document priority, because it is hard to calculate and because what's there is probably better
- // than any arbitrary value I'd use.
- //map.put(docPriorityField,new Double(1.0));
- //map.put(prioritySetField,new Long(0L));
  ArrayList list = new ArrayList();
  String query = buildConjunctionClause(list,new ClauseDescription[]{
  new UnitaryClause(idField,recID)});
@@ -705,6 +717,7 @@ public class JobQueue extends org.apache
  }

  /** Set the status on a record, including check time and priority.
+ * The status set MUST be a PENDING or PENDINGPURGATORY status.
  *@param id is the job queue id.
  *@param status is the desired status
  *@param checkTime is the check time.