incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r1492779 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues: AbstractJobQueue.java OrderedJobQueue.java ParallelJobQueue.java TopicRoundRobinJobQueue.java
Date Thu, 13 Jun 2013 17:48:04 GMT
Author: cziegeler
Date: Thu Jun 13 17:48:04 2013
New Revision: 1492779

URL: http://svn.apache.org/r1492779
Log:
SLING-2906 :  Queue might be outdated and closed while still processing 

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1492779&r1=1492778&r2=1492779&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Thu Jun 13 17:48:04 2013
@@ -103,6 +103,11 @@ public abstract class AbstractJobQueue
     /** Flag for outdated. */
     private final AtomicBoolean isOutdated = new AtomicBoolean(false);
 
+    /** Marker flag if the queue is waiting for another element (= empty) */
+    protected boolean isWaitingForNext = false;
+
+    private final AtomicBoolean closeMarker = new AtomicBoolean(false);
+
     /**
      * Start this queue
      * @param name The queue name
@@ -129,6 +134,7 @@ public abstract class AbstractJobQueue
         synchronized ( this.suspendLock ) {
             return "isWaiting=" + this.isWaiting +
                     ", suspendedSince=" + this.suspendedSince +
+                    ", isWaitingForNext=" + this.isWaitingForNext +
                     ", asyncJobs=" + this.asyncCounter.get();
         }
     }
@@ -197,14 +203,17 @@ public abstract class AbstractJobQueue
         this.resume();
         // check if possible
         if ( this.canBeClosed() ) {
-            this.close();
-            return true;
+            if ( this.closeMarker.get() ) {
+                this.close();
+                return true;
+            }
+            this.closeMarker.set(true);
         }
         return false;
     }
 
-    protected boolean canBeClosed() {
-        return this.isEmpty() && !this.isWaiting && !this.isSuspended() &&
this.asyncCounter.get() == 0;
+    private boolean canBeClosed() {
+        return this.isEmpty() && !this.isWaiting && !this.isSuspended() &&
this.asyncCounter.get() == 0 && this.isWaitingForNext;
     }
 
     /**
@@ -426,6 +435,7 @@ public abstract class AbstractJobQueue
      * Add a new job to the queue.
      */
     public void process(final JobHandler event) {
+        this.closeMarker.set(false);
         this.put(event);
         event.queued = System.currentTimeMillis();
         this.incQueued();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1492779&r1=1492778&r2=1492779&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
Thu Jun 13 17:48:04 2013
@@ -46,8 +46,6 @@ public final class OrderedJobQueue exten
     /** Lock and status object for handling the sleep phase. */
     private final SleepLock sleepLock = new SleepLock();
 
-    private boolean isWaitingForNext = false;
-
     /** The queue - we use a set which is sorted by job creation date. */
     private final Set<JobHandler> queue = new TreeSet<JobHandler>(new Comparator<JobHandler>()
{
 
@@ -77,16 +75,7 @@ public final class OrderedJobQueue exten
 
     @Override
     public String getStateInfo() {
-        return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince
+ ", isWaitingForNext=" + this.isWaitingForNext;
-    }
-
-    @Override
-    protected boolean canBeClosed() {
-        boolean result = super.canBeClosed();
-        if ( result ) {
-            result = this.isWaitingForNext;
-        }
-        return result;
+        return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince;
     }
 
     @Override

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1492779&r1=1492778&r2=1492779&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
Thu Jun 13 17:48:04 2013
@@ -39,8 +39,6 @@ public final class ParallelJobQueue exte
     /** The queue. */
     private final BlockingQueue<JobHandler> queue = new LinkedBlockingQueue<JobHandler>();
 
-    private boolean isWaitingForNext = false;
-
     public ParallelJobQueue(final String name,
                            final InternalQueueConfiguration config,
                            final JobConsumerManager jobConsumerManager,
@@ -50,20 +48,6 @@ public final class ParallelJobQueue exte
     }
 
     @Override
-    public String getStateInfo() {
-        return super.getStateInfo() + ", isWaitingForNext=" + this.isWaitingForNext;
-    }
-
-    @Override
-    protected boolean canBeClosed() {
-        boolean result = super.canBeClosed();
-        if ( result ) {
-            result = this.isWaitingForNext;
-        }
-        return result;
-    }
-
-    @Override
     protected void put(final JobHandler event) {
         try {
             this.isWaitingForNext = false;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1492779&r1=1492778&r2=1492779&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
Thu Jun 13 17:48:04 2013
@@ -50,8 +50,6 @@ public final class TopicRoundRobinJobQue
     /** Event count. */
     private int eventCount;
 
-    private boolean isWaitingForNext = false;
-
     public TopicRoundRobinJobQueue(final String name,
                            final InternalQueueConfiguration config,
                            final JobConsumerManager jobConsumerManager,
@@ -62,16 +60,7 @@ public final class TopicRoundRobinJobQue
 
     @Override
     public String getStateInfo() {
-        return super.getStateInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext="
+ this.isWaitingForNext;
-    }
-
-    @Override
-    protected boolean canBeClosed() {
-        boolean result = super.canBeClosed();
-        if ( result ) {
-            result = this.isWaitingForNext;
-        }
-        return result;
+        return super.getStateInfo() + ", eventCount=" + this.eventCount;
     }
 
     @Override



Mime
View raw message