incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r1045225 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: ./ jcr/ queues/
Date Mon, 13 Dec 2010 16:37:03 GMT
Author: cziegeler
Date: Mon Dec 13 16:37:02 2010
New Revision: 1045225

URL: http://svn.apache.org/viewvc?rev=1045225&view=rev
Log:
Several fixes: fix timeout handling and clear statistics.
Refactoring: rename *ForCleanUp to *ForRemoval (markForCleanUp, isMarkedForCleanUp, canBeMarkedForCleanUp).

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
Mon Dec 13 16:37:02 2010
@@ -211,15 +211,17 @@ public class DefaultJobManager
      * is idle for two consecutive clean up calls, it is removed.
      * @see java.lang.Runnable#run()
      */
-    public void cleanup() {
+    private void cleanup() {
         // check for idle queue
         // we synchronize to avoid creating a queue which is about to be removed during cleanup
         synchronized ( queuesLock ) {
             final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
             while ( i.hasNext() ) {
                 final Map.Entry<String, AbstractJobQueue> current = i.next();
+                // clean up
                 final AbstractJobQueue jbq = current.getValue();
-                if ( jbq.isMarkedForCleanUp() ) {
+                jbq.cleanUp();
+                if ( jbq.isMarkedForRemoval() ) {
                     // close
                     jbq.close();
                     // copy statistics
@@ -228,7 +230,7 @@ public class DefaultJobManager
                     i.remove();
                 } else {
                     // mark to be removed during next cycle
-                    jbq.markForCleanUp();
+                    jbq.markForRemoval();
                 }
             }
         }
@@ -701,8 +703,8 @@ public class DefaultJobManager
         // remove the queue with the old name
         this.queues.remove(queue.getName());
         // check if we can close or have to rename
-        queue.markForCleanUp();
-        if ( queue.isMarkedForCleanUp() ) {
+        queue.markForRemoval();
+        if ( queue.isMarkedForRemoval() ) {
             // close
             queue.close();
             // copy statistics

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
Mon Dec 13 16:37:02 2010
@@ -273,5 +273,7 @@ public class StatisticsImpl implements S
         this.finishedJobs = 0;
         this.failedJobs = 0;
         this.cancelledJobs = 0;
+        this.activeJobs = 0;
+        this.queuedJobs = 0;
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
Mon Dec 13 16:37:02 2010
@@ -924,8 +924,10 @@ public class PersistenceHandler implemen
         synchronized ( this.backgroundLock ) {
             if ( this.running ) {
                 try {
-                    final Node eventNode = (Node) this.backgroundSession.getItem(path);
-                    this.tryToLoadJob(eventNode, this.unloadedJobs);
+                    if ( this.backgroundSession.itemExists(path) ) {
+                        final Node eventNode = (Node) this.backgroundSession.getItem(path);
+                        this.tryToLoadJob(eventNode, this.unloadedJobs);
+                    }
                 } catch (RepositoryException re) {
                     this.ignoreException(re);
                 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Mon Dec 13 16:37:02 2010
@@ -72,8 +72,8 @@ public abstract class AbstractJobQueue
     /** Are we still running? */
     protected volatile boolean running;
 
-    /** Are we marked for cleanup */
-    private volatile boolean markedForCleanUp = false;
+    /** Are we marked for removal */
+    private volatile boolean markedForRemoval = false;
 
     /** Is the queue currently waiting(sleeping) */
     protected volatile boolean isWaiting = false;
@@ -110,7 +110,7 @@ public abstract class AbstractJobQueue
      * @see org.apache.sling.event.jobs.Queue#getStateInfo()
      */
     public String getStateInfo() {
-        return "isWaiting=" + this.isWaiting + ", markedForCleanUp=" + this.markedForCleanUp + ", suspendedSince=" + this.suspendedSince.longValue();
+        return "isWaiting=" + this.isWaiting + ", markedForRemoval=" + this.markedForRemoval + ", suspendedSince=" + this.suspendedSince.longValue();
     }
 
     /**
@@ -180,7 +180,7 @@ public abstract class AbstractJobQueue
     /**
      * Periodically cleanup.
      */
-    public void cleanup() {
+    public void cleanUp() {
         if ( this.running ) {
             // check for jobs that were started but never got an aknowledge
             final long tooOld = System.currentTimeMillis() - DEFAULT_WAIT_FOR_ACK_IN_MS;
@@ -215,8 +215,13 @@ public abstract class AbstractJobQueue
                     process = this.startedJobsLists.remove(info.uniqueId) != null;
                 }
                 if ( process ) {
-                    this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId);
-                    this.finishedJob(info.event, true);
+                    this.decQueued();
+                    if ( !info.reschedule() ) {
+                        checkForNotify(null);
+                    } else {
+                        this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId);
+                        checkForNotify(info);
+                    }
                 }
             }
         }
@@ -357,27 +362,27 @@ public abstract class AbstractJobQueue
         notifyFinished(reprocessInfo);
     }
 
-    protected boolean canBeMarkedForCleanUp() {
+    protected boolean canBeMarkedForRemoval() {
         return this.isEmpty() && !this.isWaiting;
     }
     /**
-     * Mark this queue for cleanup.
+     * Mark this queue for removal.
      */
-    public void markForCleanUp() {
-        if ( this.canBeMarkedForCleanUp() ) {
-            this.markedForCleanUp = true;
+    public void markForRemoval() {
+        if ( this.canBeMarkedForRemoval() ) {
+            this.markedForRemoval = true;
         }
     }
 
     /**
-     * Check if this queue is marked for cleanup
+     * Check if this queue is marked for removal
      */
-    public boolean isMarkedForCleanUp() {
-        if ( this.markedForCleanUp ) {
-            if ( this.canBeMarkedForCleanUp() ) {
+    public boolean isMarkedForRemoval() {
+        if ( this.markedForRemoval ) {
+            if ( this.canBeMarkedForRemoval() ) {
                 return true;
             }
-            this.markedForCleanUp = false;
+            this.markedForRemoval = false;
         }
         return false;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
Mon Dec 13 16:37:02 2010
@@ -99,8 +99,8 @@ public abstract class AbstractParallelJo
     }
 
     @Override
-    protected boolean canBeMarkedForCleanUp() {
-        boolean result = super.canBeMarkedForCleanUp();
+    protected boolean canBeMarkedForRemoval() {
+        boolean result = super.canBeMarkedForRemoval();
         if ( result ) {
             result = this.jobCount == 0;
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
Mon Dec 13 16:37:02 2010
@@ -65,8 +65,8 @@ public final class TopicRoundRobinJobQue
     }
 
     @Override
-    protected boolean canBeMarkedForCleanUp() {
-        boolean result = super.canBeMarkedForCleanUp();
+    protected boolean canBeMarkedForRemoval() {
+        boolean result = super.canBeMarkedForRemoval();
         if ( result ) {
             result = !this.isWaitingForNext;
         }



Mime
View raw message