Return-Path: X-Original-To: apmail-sling-commits-archive@www.apache.org Delivered-To: apmail-sling-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 733C0109D9 for ; Wed, 5 Jun 2013 15:56:07 +0000 (UTC) Received: (qmail 6326 invoked by uid 500); 5 Jun 2013 15:56:07 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 6258 invoked by uid 500); 5 Jun 2013 15:56:05 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 6251 invoked by uid 99); 5 Jun 2013 15:56:04 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Jun 2013 15:56:04 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Jun 2013 15:56:00 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C8F282388B46; Wed, 5 Jun 2013 15:55:39 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1489934 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: ./ queues/ stats/ Date: Wed, 05 Jun 2013 15:55:39 -0000 To: commits@sling.apache.org From: cziegeler@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130605155539.C8F282388B46@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cziegeler Date: Wed Jun 5 15:55:39 2013 New Revision: 1489934 URL: http://svn.apache.org/r1489934 Log: SLING-2906 : Queue might be outdated and closed while still processing Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1489934&r1=1489933&r2=1489934&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Jun 5 15:55:39 2013 @@ -205,8 +205,6 @@ public class JobManagerImpl final Iterator i = this.queues.values().iterator(); while ( i.hasNext() ) { final AbstractJobQueue jbq = i.next(); - // update mbeans - ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq)); jbq.close(); // update mbeans ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq)); @@ -244,19 +242,14 @@ public class JobManagerImpl while ( i.hasNext() ) { final Map.Entry current = i.next(); final AbstractJobQueue jbq = current.getValue(); - if ( jbq.isMarkedForRemoval() ) { - logger.debug("Removing idle Job Queue {}", jbq); - // close - jbq.close(); + if ( jbq.tryToClose() ) { + logger.debug("Removing idle job queue {}", jbq); // copy statistics this.baseStatistics.add(jbq); // remove i.remove(); // update mbeans ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq)); - } else { - // mark to be removed during next cycle - jbq.markForRemoval(); } } } @@ -377,10 +370,7 @@ public class JobManagerImpl // remove the queue with the old name this.queues.remove(queue.getName()); // check if we can close or have to rename - queue.markForRemoval(); - if ( queue.isMarkedForRemoval() ) { - // close - queue.close(); + if ( queue.tryToClose() ) { // copy statistics this.baseStatistics.add(queue); // update mbeans @@ -388,7 +378,12 @@ public class JobManagerImpl } else { queue.outdate(); // readd with new name - this.queues.put(queue.getName(), queue); + String newName = queue.getName(); + int index = 0; + while ( this.queues.containsKey(newName) ) { + newName = queue.getName() + '$' + String.valueOf(index++); + } + this.queues.put(newName, queue); // update mbeans ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, queue)); } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Jun 5 15:55:39 2013 @@ -82,9 +82,6 @@ public abstract class AbstractJobQueue /** Are we still running? */ protected volatile boolean running; - /** Are we marked for removal */ - private volatile boolean markedForRemoval = false; - /** Is the queue currently waiting(sleeping) */ protected volatile boolean isWaiting = false; @@ -131,7 +128,6 @@ public abstract class AbstractJobQueue public String getStateInfo() { synchronized ( this.suspendLock ) { return "isWaiting=" + this.isWaiting + - ", markedForRemoval=" + this.markedForRemoval + ", suspendedSince=" + this.suspendedSince + ", asyncJobs=" + this.asyncCounter.get(); } @@ -151,7 +147,7 @@ public abstract class AbstractJobQueue try { runJobQueue(); - } catch (Throwable t) { //NOSONAR + } catch (final Throwable t) { //NOSONAR logger.error("Job queue " + queueName + " stopped with exception: " + t.getMessage() + ". Restarting.", t); } } @@ -176,7 +172,6 @@ public abstract class AbstractJobQueue public void close() { this.running = false; this.logger.debug("Shutting down job queue {}", queueName); - this.logger.debug("Waking up sleeping queue {}", queueName); this.resume(); if ( this.isWaiting ) { this.logger.debug("Waking up waiting queue {}", this.queueName); @@ -195,6 +190,24 @@ public abstract class AbstractJobQueue } /** + * Check if the queue can be closed + */ + public boolean tryToClose() { + // resume the queue as we want to close it! + this.resume(); + // check if possible + if ( this.canBeClosed() ) { + this.close(); + return true; + } + return false; + } + + protected boolean canBeClosed() { + return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0; + } + + /** * Periodically check for started jobs without an acknowledge. */ public void checkForUnprocessedJobs() { @@ -400,32 +413,6 @@ public abstract class AbstractJobQueue notifyFinished(reprocessInfo); } - protected boolean canBeMarkedForRemoval() { - return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0; - } - - /** - * Mark this queue for removal. - */ - public void markForRemoval() { - if ( this.canBeMarkedForRemoval() ) { - this.markedForRemoval = true; - } - } - - /** - * Check if this queue is marked for removal - */ - public boolean isMarkedForRemoval() { - if ( this.markedForRemoval ) { - if ( this.canBeMarkedForRemoval() ) { - return true; - } - this.markedForRemoval = false; - } - return false; - } - /** * Get the name of the job queue. */ @@ -691,6 +678,7 @@ public abstract class AbstractJobQueue public void resume() { synchronized ( this.suspendLock ) { if ( this.suspendedSince != -1 ) { + this.logger.debug("Waking up suspended queue {}", queueName); this.suspendedSince = -1; this.suspendLock.notify(); } @@ -704,6 +692,7 @@ public abstract class AbstractJobQueue public void suspend() { synchronized ( this.suspendLock ) { if ( this.suspendedSince == -1 ) { + this.logger.debug("Suspending queue {}", queueName); this.suspendedSince = System.currentTimeMillis(); } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Wed Jun 5 15:55:39 2013 @@ -106,8 +106,8 @@ public abstract class AbstractParallelJo } @Override - protected boolean canBeMarkedForRemoval() { - boolean result = super.canBeMarkedForRemoval(); + protected boolean canBeClosed() { + boolean result = super.canBeClosed(); if ( result ) { result = this.jobCount == 0; } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Wed Jun 5 15:55:39 2013 @@ -46,6 +46,8 @@ public final class OrderedJobQueue exten /** Lock and status object for handling the sleep phase. */ private final SleepLock sleepLock = new SleepLock(); + private boolean isWaitingForNext = false; + /** The queue - we use a set which is sorted by job creation date. */ private final Set queue = new TreeSet(new Comparator() { @@ -75,7 +77,16 @@ public final class OrderedJobQueue exten @Override public String getStateInfo() { - return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince; + return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince + ", isWaitingForNext=" + this.isWaitingForNext; + } + + @Override + protected boolean canBeClosed() { + boolean result = super.canBeClosed(); + if ( result ) { + result = this.isWaitingForNext; + } + return result; } @Override @@ -124,6 +135,7 @@ public final class OrderedJobQueue exten synchronized ( this.queue ) { this.queue.add(handler); this.queue.notify(); + this.isWaitingForNext = false; } } @@ -131,6 +143,7 @@ public final class OrderedJobQueue exten protected JobHandler take() { synchronized ( this.queue ) { while ( this.queue.isEmpty() ) { + this.isWaitingForNext = true; try { this.queue.wait(); } catch (final InterruptedException e) { Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Wed Jun 5 15:55:39 2013 @@ -39,6 +39,8 @@ public final class ParallelJobQueue exte /** The queue. */ private final BlockingQueue queue = new LinkedBlockingQueue(); + private boolean isWaitingForNext = false; + public ParallelJobQueue(final String name, final InternalQueueConfiguration config, final JobConsumerManager jobConsumerManager, @@ -48,8 +50,23 @@ public final class ParallelJobQueue exte } @Override + public String getStateInfo() { + return super.getStateInfo() + ", isWaitingForNext=" + this.isWaitingForNext; + } + + @Override + protected boolean canBeClosed() { + boolean result = super.canBeClosed(); + if ( result ) { + result = this.isWaitingForNext; + } + return result; + } + + @Override protected void put(final JobHandler event) { try { + this.isWaitingForNext = false; this.queue.put(event); } catch (final InterruptedException e) { // this should never happen @@ -60,10 +77,13 @@ public final class ParallelJobQueue exte @Override protected JobHandler take() { try { + this.isWaitingForNext = true; return this.queue.take(); } catch (final InterruptedException e) { // this should never happen this.ignoreException(e); + } finally { + this.isWaitingForNext = false; } return null; } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Wed Jun 5 15:55:39 2013 @@ -66,8 +66,8 @@ public final class TopicRoundRobinJobQue } @Override - protected boolean canBeMarkedForRemoval() { - boolean result = super.canBeMarkedForRemoval(); + protected boolean canBeClosed() { + boolean result = super.canBeClosed(); if ( result ) { result = this.isWaitingForNext; } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java?rev=1489934&r1=1489933&r2=1489934&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java Wed Jun 5 15:55:39 2013 @@ -188,7 +188,7 @@ public class StatisticsImpl implements S } /** - * New job in the qeue + * New job in the queue */ public synchronized void incQueued() { this.queuedJobs++;