Return-Path: Delivered-To: apmail-sling-commits-archive@www.apache.org Received: (qmail 36350 invoked from network); 10 Feb 2010 15:38:48 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 10 Feb 2010 15:38:48 -0000 Received: (qmail 19488 invoked by uid 500); 10 Feb 2010 15:38:48 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 19434 invoked by uid 500); 10 Feb 2010 15:38:48 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 19425 invoked by uid 99); 10 Feb 2010 15:38:48 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Feb 2010 15:38:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Feb 2010 15:38:37 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C03E82388901; Wed, 10 Feb 2010 15:38:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r908544 - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/job/ src/main/resources/META-INF/ Date: Wed, 10 Feb 2010 15:38:15 -0000 To: commits@sling.apache.org From: cziegeler@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100210153815.C03E82388901@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cziegeler Date: Wed Feb 10 15:38:14 2010 New Revision: 908544 URL: http://svn.apache.org/viewvc?rev=908544&view=rev Log: SLING-1365 : Limit the number of parallel jobs First implementation for job queues. Modified: sling/trunk/bundles/extensions/event/NOTICE sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE Modified: sling/trunk/bundles/extensions/event/NOTICE URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/NOTICE?rev=908544&r1=908543&r2=908544&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/NOTICE (original) +++ sling/trunk/bundles/extensions/event/NOTICE Wed Feb 10 15:38:14 2010 @@ -1,5 +1,5 @@ Apache Sling Event -Copyright 2008-2009 The Apache Software Foundation +Copyright 2008-2010 The Apache Software Foundation Apache Sling is based on source code originally developed by Day Software (http://www.day.com/). Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=908544&r1=908543&r2=908544&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Wed Feb 10 15:38:14 2010 @@ -72,8 +72,12 @@ public static final String PROPERTY_JOB_ID = "event.job.id"; /** The property to set if a job can be run parallel to any other job. - * For now the property should only contain the values true - * or false as a string value. + * The following values are supported: + * - boolean value true and false + * - string value true and false + * - integer value higher than 1 - if this is specified jobs are run in + * parallel but never more than the specified number. + * * We might want to use different values in the future for enhanced * parallel job handling. */ public static final String PROPERTY_JOB_PARALLEL = "event.job.parallel"; @@ -93,13 +97,18 @@ /** The property to set to put the jobs into a separate job queue. This property * specifies the name of the job queue. If the job queue does not exists yet * a new queue is created. - * If a job queue is used, the jobs are never executed in parallel from this queue! + * If a ordered job queue is used, the jobs are never executed in parallel + * from this queue! For non ordered queues the {@link #PROPERTY_JOB_PARALLEL} + * with an integer value higher than 1 can be used to specify the maximum number + * of parallel jobs for this queue. */ public static final String PROPERTY_JOB_QUEUE_NAME = "event.job.queuename"; /** If this property is set with any value, the queue processes the jobs in the same * order as they have arrived. - * This property has only an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified. + * This property has only an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified + * and starting with version 2.2 this value is only checked in the first job for this + * queue. */ public static final String PROPERTY_JOB_QUEUE_ORDERED = "event.job.queueordered"; Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908544&r1=908543&r2=908544&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Feb 10 15:38:14 2010 @@ -599,7 +599,7 @@ */ public void run() { while ( running && !jq.isFinished() ) { - logger.info("Starting job queue {}", queueName); + logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName); try { runJobQueue(queueName, jq); } catch (Throwable t) { @@ -650,18 +650,36 @@ } if ( info != null && this.running && !jobQueue.isFinished() ) { - synchronized ( jobQueue.getLock()) { - final EventInfo processInfo = info; - info = null; - final Status status = this.executeJob(processInfo, jobQueue); - if ( status == Status.SUCCESS ) { + final EventInfo processInfo = info; + info = null; + if ( jobQueue.isOrdered() ) { + // if we are ordered we simply wait for the finish + synchronized ( jobQueue.getLock()) { + final Status status = this.executeJob(processInfo, jobQueue); + if ( status == Status.SUCCESS ) { + try { + info = jobQueue.waitForFinish(); + } catch (InterruptedException e) { + this.ignoreException(e); + } + } else if ( status == Status.RESCHEDULE ) { + info = jobQueue.reschedule(processInfo, this.scheduler); + } + } + } else { + final int maxJobs = ParallelInfo.getMaxNumberOfParallelJobs(processInfo.event); + synchronized ( jobQueue.getLock() ) { try { - info = jobQueue.waitForFinish(); + jobQueue.acquireSlot(maxJobs); } catch (InterruptedException e) { this.ignoreException(e); } - } else if ( status == Status.RESCHEDULE ) { - info = jobQueue.reschedule(processInfo, this.scheduler); + } + if ( this.running && !jobQueue.isFinished() ) { + final Status status = this.executeJob(processInfo, jobQueue); + if ( status == Status.RESCHEDULE ) { + jobQueue.reschedule(processInfo, this.scheduler); + } } } } @@ -1432,7 +1450,11 @@ if ( info != null ) { reprocessInfo = jobQueue.reschedule(info, this.scheduler); } - jobQueue.notifyFinish(reprocessInfo); + if ( jobQueue.isOrdered() ) { + jobQueue.notifyFinish(reprocessInfo); + } else { + jobQueue.freeSlot(); + } } } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=908544&r1=908543&r2=908544&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Wed Feb 10 15:38:14 2010 @@ -57,6 +57,8 @@ /** Ordered Queue? */ private final boolean orderedQueue; + private volatile int jobCount; + public JobBlockingQueue(final String name, final boolean orderedQueue, final Logger logger) { @@ -65,6 +67,10 @@ this.logger = logger; } + /** + * Wait for the job to be finished. + * This is called if the queue is ordered. + */ public EventInfo waitForFinish() throws InterruptedException { this.isWaiting = true; this.markForCleanUp = false; @@ -75,21 +81,56 @@ return object; } + /** + * Mark this queue for cleanup. + */ public void markForCleanUp() { if ( !this.isWaiting ) { this.markForCleanUp = true; } } + /** + * Acquire a processing slot. + * This method is called if the queue is not ordered. + * @param maxJobs + */ + public void acquireSlot(final int maxJobs) throws InterruptedException { + if ( jobCount >= maxJobs ) { + this.isWaiting = true; + this.markForCleanUp = false; + this.lock.wait(); + this.isWaiting = false; + } + } + + /** + * Free a slot when a job processing is finished. + */ + public void freeSlot() { + if ( this.isWaiting ) { + this.lock.notify(); + } + } + + /** + * Check if this queue is marked for cleanup + */ public boolean isMarkedForCleanUp() { return !this.isWaiting && this.markForCleanUp; } + /** + * Notify a finished job - for ordered queues + */ public void notifyFinish(EventInfo i) { this.eventInfo = i; this.lock.notify(); } + /** + * Return the lock for this queue. + */ public Object getLock() { return lock; } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java?rev=908544&r1=908543&r2=908544&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java Wed Feb 10 15:38:14 2010 @@ -92,4 +92,27 @@ } return ParallelInfo.SERIAL; } + + /** + * Return the maximum number of parallel jobs for named queues. + * @param job The job + */ + public static int getMaxNumberOfParallelJobs(final Event job) { + Object value = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL); + if ( value instanceof String ) { + // try to parse the value + try { + value = Integer.valueOf((String)value); + } catch (NumberFormatException ne) { + // we ignore this + } + } + if ( value instanceof Number ) { + final int result = ((Number)value).intValue(); + if ( result > 1 ) { + return result; + } + } + return 1; + } } \ No newline at end of file Modified: sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE?rev=908544&r1=908543&r2=908544&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE (original) +++ sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE Wed Feb 10 15:38:14 2010 @@ -1,5 +1,5 @@ Apache Sling Event -Copyright 2008-2009 The Apache Software Foundation +Copyright 2008-2010 The Apache Software Foundation Apache Sling is based on source code originally developed by Day Software (http://www.day.com/).