Return-Path: Delivered-To: apmail-sling-commits-archive@www.apache.org Received: (qmail 77519 invoked from network); 13 Dec 2010 16:37:30 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 13 Dec 2010 16:37:30 -0000 Received: (qmail 38642 invoked by uid 500); 13 Dec 2010 16:37:30 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 38575 invoked by uid 500); 13 Dec 2010 16:37:28 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 38567 invoked by uid 99); 13 Dec 2010 16:37:28 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Dec 2010 16:37:28 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Dec 2010 16:37:24 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 29F1323889D7; Mon, 13 Dec 2010 16:37:03 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1045225 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: ./ jcr/ queues/ Date: Mon, 13 Dec 2010 16:37:03 -0000 To: commits@sling.apache.org From: cziegeler@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101213163703.29F1323889D7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cziegeler Date: Mon Dec 13 16:37:02 2010 New Revision: 1045225 URL: http://svn.apache.org/viewvc?rev=1045225&view=rev Log: Several fixes: fix timeout handling and clear statistics. Refactoring: rename *cleanUp to Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1045225&r1=1045224&r2=1045225&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Mon Dec 13 16:37:02 2010 @@ -211,15 +211,17 @@ public class DefaultJobManager * is idle for two consecutive clean up calls, it is removed. * @see java.lang.Runnable#run() */ - public void cleanup() { + private void cleanup() { // check for idle queue // we synchronize to avoid creating a queue which is about to be removed during cleanup synchronized ( queuesLock ) { final Iterator> i = this.queues.entrySet().iterator(); while ( i.hasNext() ) { final Map.Entry current = i.next(); + // clean up final AbstractJobQueue jbq = current.getValue(); - if ( jbq.isMarkedForCleanUp() ) { + jbq.cleanUp(); + if ( jbq.isMarkedForRemoval() ) { // close jbq.close(); // copy statistics @@ -228,7 +230,7 @@ public class DefaultJobManager i.remove(); } else { // mark to be removed during next cycle - jbq.markForCleanUp(); + jbq.markForRemoval(); } } } @@ -701,8 +703,8 @@ public class DefaultJobManager // remove the queue with the old name this.queues.remove(queue.getName()); // check if we can close or have to rename - queue.markForCleanUp(); - if ( queue.isMarkedForCleanUp() ) { + queue.markForRemoval(); + if ( queue.isMarkedForRemoval() ) { // close queue.close(); // copy statistics Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java?rev=1045225&r1=1045224&r2=1045225&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java Mon Dec 13 16:37:02 2010 @@ -273,5 +273,7 @@ public class StatisticsImpl implements S this.finishedJobs = 0; this.failedJobs = 0; this.cancelledJobs = 0; + this.activeJobs = 0; + this.queuedJobs = 0; } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1045225&r1=1045224&r2=1045225&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Mon Dec 13 16:37:02 2010 @@ -924,8 +924,10 @@ public class PersistenceHandler implemen synchronized ( this.backgroundLock ) { if ( this.running ) { try { - final Node eventNode = (Node) this.backgroundSession.getItem(path); - this.tryToLoadJob(eventNode, this.unloadedJobs); + if ( this.backgroundSession.itemExists(path) ) { + final Node eventNode = (Node) this.backgroundSession.getItem(path); + this.tryToLoadJob(eventNode, this.unloadedJobs); + } } catch (RepositoryException re) { this.ignoreException(re); } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1045225&r1=1045224&r2=1045225&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Mon Dec 13 16:37:02 2010 @@ -72,8 +72,8 @@ public abstract class AbstractJobQueue /** Are we still running? */ protected volatile boolean running; - /** Are we marked for cleanup */ - private volatile boolean markedForCleanUp = false; + /** Are we marked for removal */ + private volatile boolean markedForRemoval = false; /** Is the queue currently waiting(sleeping) */ protected volatile boolean isWaiting = false; @@ -110,7 +110,7 @@ public abstract class AbstractJobQueue * @see org.apache.sling.event.jobs.Queue#getStateInfo() */ public String getStateInfo() { - return "isWaiting=" + this.isWaiting + ", markedForCleanUp=" + this.markedForCleanUp + ", suspendedSince=" + this.suspendedSince.longValue(); + return "isWaiting=" + this.isWaiting + ", markedForRemoval=" + this.markedForRemoval + ", suspendedSince=" + this.suspendedSince.longValue(); } /** @@ -180,7 +180,7 @@ public abstract class AbstractJobQueue /** * Periodically cleanup. */ - public void cleanup() { + public void cleanUp() { if ( this.running ) { // check for jobs that were started but never got an aknowledge final long tooOld = System.currentTimeMillis() - DEFAULT_WAIT_FOR_ACK_IN_MS; @@ -215,8 +215,13 @@ public abstract class AbstractJobQueue process = this.startedJobsLists.remove(info.uniqueId) != null; } if ( process ) { - this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId); - this.finishedJob(info.event, true); + this.decQueued(); + if ( !info.reschedule() ) { + checkForNotify(null); + } else { + this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId); + checkForNotify(info); + } } } } @@ -357,27 +362,27 @@ public abstract class AbstractJobQueue notifyFinished(reprocessInfo); } - protected boolean canBeMarkedForCleanUp() { + protected boolean canBeMarkedForRemoval() { return this.isEmpty() && !this.isWaiting; } /** - * Mark this queue for cleanup. + * Mark this queue for removal. */ - public void markForCleanUp() { - if ( this.canBeMarkedForCleanUp() ) { - this.markedForCleanUp = true; + public void markForRemoval() { + if ( this.canBeMarkedForRemoval() ) { + this.markedForRemoval = true; } } /** - * Check if this queue is marked for cleanup + * Check if this queue is marked for removal */ - public boolean isMarkedForCleanUp() { - if ( this.markedForCleanUp ) { - if ( this.canBeMarkedForCleanUp() ) { + public boolean isMarkedForRemoval() { + if ( this.markedForRemoval ) { + if ( this.canBeMarkedForRemoval() ) { return true; } - this.markedForCleanUp = false; + this.markedForRemoval = false; } return false; } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1045225&r1=1045224&r2=1045225&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Mon Dec 13 16:37:02 2010 @@ -99,8 +99,8 @@ public abstract class AbstractParallelJo } @Override - protected boolean canBeMarkedForCleanUp() { - boolean result = super.canBeMarkedForCleanUp(); + protected boolean canBeMarkedForRemoval() { + boolean result = super.canBeMarkedForRemoval(); if ( result ) { result = this.jobCount == 0; } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1045225&r1=1045224&r2=1045225&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Mon Dec 13 16:37:02 2010 @@ -65,8 +65,8 @@ public final class TopicRoundRobinJobQue } @Override - protected boolean canBeMarkedForCleanUp() { - boolean result = super.canBeMarkedForCleanUp(); + protected boolean canBeMarkedForRemoval() { + boolean result = super.canBeMarkedForRemoval(); if ( result ) { result = !this.isWaitingForNext; }