incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r919431 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event: EventUtil.java impl/JobEventHandler.java impl/job/JobBlockingQueue.java
Date Fri, 05 Mar 2010 14:40:46 GMT
Author: cziegeler
Date: Fri Mar  5 14:40:45 2010
New Revision: 919431

URL: http://svn.apache.org/viewvc?rev=919431&view=rev
Log:
SLING-1426 : Spurious wakeups are not handled correctly

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=919431&r1=919430&r2=919431&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
Fri Mar  5 14:40:45 2010
@@ -280,7 +280,7 @@
     public static boolean rescheduleJob(Event job) {
         final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
         if ( ctx != null ) {
-           return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
+            return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
         }
         return false;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=919431&r1=919430&r2=919431&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Fri Mar  5 14:40:45 2010
@@ -383,7 +383,6 @@
                 Session s = null;
                 try {
                     s = this.createSession();
-                    final Node parentNode = (Node)s.getItem(this.repositoryPath);
                     logger.debug("Executing query {}", queryString);
                     final Query q = s.getWorkspace().getQueryManager().createQuery(queryString,
Query.XPATH);
                     final NodeIterator iter = q.execute().getNodes();
@@ -393,7 +392,7 @@
                         eventNode.remove();
                         count++;
                     }
-                    parentNode.save();
+                    s.save();
                     logger.debug("Removed {} entries from the repository.", count);
 
                 } catch (RepositoryException e) {
@@ -816,6 +815,8 @@
                                     EventUtil.toString(info.event), this.maximumParallelJobs);
                         }
                         try {
+                            // we don't check in a loop here, if this is a spurious wakeup
+                            // we just process anyway
                             this.backgroundLock.wait();
                         } catch (InterruptedException e) {
                             this.ignoreException(e);
@@ -1223,7 +1224,7 @@
     }
 
     /**
-     * @see org.apache.sling.event.EventUtil.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event,
java.lang.String)
+     * @see org.apache.sling.event.impl.job.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event,
java.lang.String)
      */
     public boolean sendAcknowledge(Event job, String eventNodePath) {
         synchronized ( this.processingEventsList ) {
@@ -1237,23 +1238,15 @@
 
     }
 
-    /**
-     * This is a notification from the component which processed the job.
-     *
-     * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event,
String, boolean)
-     */
-    public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule)
{
-        if ( this.logger.isDebugEnabled() ) {
-            this.logger.debug("Received finish for job {}, shouldReschedule={}", EventUtil.toString(job),
shouldReschedule);
-        }
-        // let's remove the event from our processing list
-        // this is just a sanity check, as usually the job should have been
-        // removed during sendAcknowledge.
-        synchronized ( this.processingEventsList ) {
-            this.processingEventsList.remove(eventNodePath);
-        }
+    public static final class RescheduleInfo {
+        public Event job;
+        public boolean reschedule;
+    }
 
-        boolean reschedule = shouldReschedule;
+    private RescheduleInfo handleReschedule(final Event job, final boolean shouldReschedule)
{
+        final RescheduleInfo info = new RescheduleInfo();
+        info.job = job;
+        info.reschedule = shouldReschedule;
         if ( shouldReschedule ) {
             // check if we exceeded the number of retries
             int retries = this.maxJobRetries;
@@ -1266,14 +1259,14 @@
             }
             retryCount++;
             if ( retries != -1 && retryCount > retries ) {
-                reschedule = false;
+                info.reschedule = false;
             }
-            if ( reschedule ) {
+            if ( info.reschedule ) {
                 // update event with retry count and retries
                 final Dictionary<String, Object> newProperties = new EventPropertiesMap(job);
                 newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
                 newProperties.put(EventUtil.PROPERTY_JOB_RETRIES, retries);
-                job = new Event(job.getTopic(), newProperties);
+                info.job = new Event(job.getTopic(), newProperties);
                 if ( this.logger.isDebugEnabled() ) {
                     this.logger.debug("Failed job {}", EventUtil.toString(job));
                 }
@@ -1290,115 +1283,127 @@
             }
             this.sendNotification(EventUtil.TOPIC_JOB_FINISHED, job);
         }
-        final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job);
-        EventInfo putback = null;
-        // we have to use the same session for unlocking that we used for locking!
+        return info;
+    }
+
+    /**
+     * This is a notification from the component which processed the job.
+     *
+     * @see org.apache.sling.event.impl.job.JobStatusNotifier#finishedJob(org.osgi.service.event.Event,
String, boolean)
+     */
+    public boolean finishedJob(Event job, final String eventNodePath, final boolean shouldReschedule)
{
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Received finish for job {}, shouldReschedule={}", EventUtil.toString(job),
shouldReschedule);
+        }
+        // let's remove the event from our processing list
+        // this is just a sanity check, as usually the job should have been
+        // removed during sendAcknowledge.
+        synchronized ( this.processingEventsList ) {
+            this.processingEventsList.remove(eventNodePath);
+        }
+
+        // handle the reschedule, a new job might be returned with updated reschedule info!
+        final RescheduleInfo rescheduleInfo = this.handleReschedule(job, shouldReschedule);
+        job = rescheduleInfo.job;
+        final boolean reschedule = rescheduleInfo.reschedule;
+
+        // if this is set after the synchronized block we have an error
+        Boolean errorOccured = null;
         synchronized ( this.backgroundLock ) {
-            // we might get here asnyc while this service has already been shutdown!
-            if ( this.backgroundSession == null ) {
-                checkForNotify(job, null);
-                // we can only return false here
-                return false;
+            // get the parallel info and unlock
+            final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job);
+            final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+            this.unlockState(parInfo, jobTopic);
+
+            if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
+                this.parallelJobCount--;
+                this.backgroundLock.notify();
             }
+            // we have to use the same session for unlocking that we used for locking!
             try {
-                this.backgroundSession.refresh(false);
-                // check if the job has been cancelled
-                if ( !this.backgroundSession.itemExists(eventNodePath) ) {
-                    checkForNotify(job, null);
-                    return true;
-                }
-                final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
-                boolean unlock = true;
-                try {
-                    if ( !reschedule ) {
-                        synchronized ( this.deletedJobs ) {
-                            this.deletedJobs.add(eventNodePath);
-                        }
-                        // unlock node
-                        try {
-                            eventNode.unlock();
-                        } catch (RepositoryException e) {
-                            // if unlock fails, we silently ignore this
-                            this.ignoreException(e);
-                        }
-                        unlock = false;
-                        final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
-                        if ( jobId == null ) {
-                            // remove node from repository if no job is set
-                            final Node parentNode = eventNode.getParent();
-                            eventNode.remove();
-                            parentNode.save();
-                        } else {
-                            eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED, Calendar.getInstance());
-                            eventNode.save();
-                        }
-                    }
-                } catch (RepositoryException re) {
-                    // if an exception occurs, we just log
-                    this.logger.error("Exception during job finishing.", re);
-                } finally {
-                    final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-                    this.unlockState(parInfo, jobTopic);
-
-                    if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
-                        this.parallelJobCount--;
-                        this.backgroundLock.notify();
-                    }
-
-                    if ( unlock ) {
-                        synchronized ( this.deletedJobs ) {
-                            this.deletedJobs.add(eventNodePath);
-                        }
-                        // unlock node
-                        try {
-                            eventNode.unlock();
-                        } catch (RepositoryException e) {
-                            // if unlock fails, we silently ignore this
-                            this.ignoreException(e);
-                        }
+                // we might get here asnyc while this service has already been shutdown!
+                if ( this.backgroundSession == null ) {
+                    // we can only return false here
+                    errorOccured = false;
+                } else {
+                    this.backgroundSession.refresh(false);
+                    // check if the job has been cancelled
+                    if ( !this.backgroundSession.itemExists(eventNodePath) ) {
+                        errorOccured = true;
                     }
                 }
-                if ( reschedule ) {
-                    // update retry count and retries in the repository
-                    try {
-                        eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRIES));
-                        eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT));
-                        eventNode.save();
-                    } catch (RepositoryException re) {
-                        // if an exception occurs, we just log
-                        this.logger.error("Exception during job updating job rescheduling
information.", re);
+                if ( errorOccured == null ) {
+                    synchronized ( this.deletedJobs ) {
+                        this.deletedJobs.add(eventNodePath);
                     }
-                    final EventInfo info = new EventInfo();
+                    final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
+                    // unlock node
                     try {
-                        info.event = job;
-                        info.nodePath = eventNode.getPath();
+                        eventNode.unlock();
                     } catch (RepositoryException e) {
-                        // this should never happen
+                        // if unlock fails, we silently ignore this
                         this.ignoreException(e);
                     }
-                    // if this is an own job queue, we simply signal the queue to continue
-                    // it will pick up the event and either reschedule or wait
-                    if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
-                        checkForNotify(job, info);
+                    // update status in repository
+                    if ( !reschedule ) {
+                        try {
+                            final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+                            if ( jobId == null ) {
+                                // remove node from repository if no job id is set
+                                eventNode.remove();
+                            } else {
+                                // set finished date - if job id is set
+                                eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED,
Calendar.getInstance());
+                            }
+                            this.backgroundSession.save();
+                        } catch (RepositoryException re) {
+                            // if an exception occurs, we just log
+                            this.logger.error("Exception during finished job update.", re);
+                        }
                     } else {
-                        putback = info;
+                        // update retry count and retries in the repository
+                        try {
+                            eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRIES));
+                            eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT, (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT));
+                            this.backgroundSession.save();
+                        } catch (RepositoryException re) {
+                            // if an exception occurs, we just log
+                            this.logger.error("Exception during job updating job rescheduling
information.", re);
+                        }
                     }
-                } else {
-                    // if this is an own job queue, we simply signal the queue to continue
-                    // it will pick up the event and continue with the next event
-                    checkForNotify(job, null);
                 }
             } catch (RepositoryException re) {
-                this.logger.error("Unable to create new session.", re);
-                return false;
+                this.logger.error("Unable to access repository to check job node.", re);
+                errorOccured = false;
             }
         }
-        if ( putback != null ) {
-            this.putBackIntoMainQueue(putback, false);
+        // check for error
+        if ( errorOccured != null ) {
+            checkForNotify(job, null);
+            return errorOccured;
+        }
+        // reschedule
+        if ( reschedule ) {
+            final EventInfo putback = new EventInfo();
+            putback.event = job;
+            putback.nodePath = eventNodePath;
+            // if this is an own job queue, we simply signal the queue to continue
+            // it will pick up the event and either reschedule or wait
+            if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+                checkForNotify(job, putback);
+            } else {
+                this.putBackIntoMainQueue(putback, false);
+            }
+        } else {
+            // if this is an own job queue, we simply signal the queue to continue
+            // it will pick up the event and continue with the next event
+            checkForNotify(job, null);
         }
+        // if we shouldn't reschedule - we always return true as everything went fine
         if ( !shouldReschedule ) {
             return true;
         }
+        // if we should reschedule, we return the reschedule status
         return reschedule;
     }
 
@@ -1641,9 +1646,8 @@
                 try {
                     if ( this.writerSession.itemExists(jobId) ) {
                         final Item item = this.writerSession.getItem(jobId);
-                        final Node parentNode = item.getParent();
                         item.remove();
-                        parentNode.save();
+                        this.writerSession.save();
                     }
                 } catch (RepositoryException e) {
                     this.logger.error("Error during cancelling job at " + jobId, e);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=919431&r1=919430&r2=919431&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
Fri Mar  5 14:40:45 2010
@@ -75,9 +75,10 @@
         this.isWaiting = true;
         this.markForCleanUp = false;
         this.logger.debug("Job queue {} is waiting for finish.", this.queueName);
-        this.lock.wait();
+        while ( this.isWaiting ) {
+            this.lock.wait();
+        }
         this.logger.debug("Job queue {} is continuing.", this.queueName);
-        this.isWaiting = false;
         final EventInfo object = this.eventInfo;
         this.eventInfo = null;
         return object;
@@ -109,9 +110,10 @@
             this.isWaiting = true;
             this.markForCleanUp = false;
             this.logger.debug("Job queue {} is processing {} job - waiting for a free slot.",
this.queueName, jobCount);
-            this.lock.wait();
+            while ( this.isWaiting ) {
+                this.lock.wait();
+            }
             this.logger.debug("Job queue {} is continuing.", this.queueName);
-            this.isWaiting = false;
         }
         jobCount++;
     }
@@ -123,6 +125,7 @@
         jobCount--;
         if ( this.isWaiting ) {
             this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+            this.isWaiting = false;
             this.lock.notify();
         }
     }
@@ -140,6 +143,7 @@
     public void notifyFinish(EventInfo i) {
         this.eventInfo = i;
         this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+        this.isWaiting = false;
         this.lock.notify();
     }
 



Mime
View raw message