incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r724512 - /incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Date Mon, 08 Dec 2008 22:17:00 GMT
Author: cziegeler
Date: Mon Dec  8 14:16:59 2008
New Revision: 724512

URL: http://svn.apache.org/viewvc?rev=724512&view=rev
Log:
Improve synchronization and thread handling.

Modified:
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=724512&r1=724511&r2=724512&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Mon Dec  8 14:16:59 2008
@@ -172,22 +172,32 @@
         synchronized ( this.jobQueues ) {
             final Iterator<JobBlockingQueue> i = this.jobQueues.values().iterator();
             while ( i.hasNext() ) {
+                final JobBlockingQueue jbq = i.next();
+                // wake up qeue
+                if ( jbq.isWaiting() ) {
+                    synchronized ( jbq.getLock()) {
+                        jbq.notifyFinish(null);
+                    }
+                }
+                // continue queue processing
                 try {
-                    i.next().put(new EventInfo());
+                    jbq.put(new EventInfo());
                 } catch (InterruptedException e) {
                     this.ignoreException(e);
                 }
             }
         }
         if ( this.backgroundSession != null ) {
-            try {
-                this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
-            } catch (RepositoryException e) {
-                // we just ignore it
-                this.logger.warn("Unable to remove event listener.", e);
+            synchronized ( this.backgroundLock ) {
+                try {
+                    this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
+                } catch (RepositoryException e) {
+                    // we just ignore it
+                    this.logger.warn("Unable to remove event listener.", e);
+                }
+                this.backgroundSession.logout();
+                this.backgroundSession = null;
             }
-            this.backgroundSession.logout();
-            this.backgroundSession = null;
         }
         this.componentContext = null;
         JOB_THREAD_POOL = null;
@@ -433,7 +443,13 @@
                                  * @see java.lang.Runnable#run()
                                  */
                                 public void run() {
-                                    runJobQueue(queueName, jq);
+                                    while ( running ) {
+                                        try {
+                                            runJobQueue(queueName, jq);
+                                        } catch (Throwable t) {
+                                            logger.error("Job queue stopped with exception:
" + t.getMessage() + ". Restarting.", t);
+                                        }
+                                    }
                                 }
 
                             });
@@ -1036,6 +1052,11 @@
         EventInfo putback = null;
         // we have to use the same session for unlocking that we used for locking!
         synchronized ( this.backgroundLock ) {
+            // we might get here asnyc while this service has already been shutdown!
+            if ( this.backgroundSession == null ) {
+                // we can only return false here
+                return false;
+            }
             try {
                 this.backgroundSession.refresh(false);
                 // check if the job has been cancelled
@@ -1137,7 +1158,7 @@
                     }
                 } else {
                     // if this is an own job queue, we simply signal the queue to continue
-                    // it will pick up the event continue with the next event
+                    // it will pick up the event and continue with the next event
                     if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
                         // we know the queue exists
                         final JobBlockingQueue jobQueue;
@@ -1367,8 +1388,12 @@
 
         private final Object lock = new Object();
 
+        private boolean isWaiting = false;
+
         public EventInfo waitForFinish() throws InterruptedException {
+            this.isWaiting = true;
             this.lock.wait();
+            this.isWaiting = false;
             final EventInfo object = this.eventInfo;
             this.eventInfo = null;
             return object;
@@ -1382,6 +1407,10 @@
         public Object getLock() {
             return lock;
         }
+
+        public boolean isWaiting() {
+            return this.isWaiting;
+        }
     }
 
     private static final class StartedJobInfo {



Mime
View raw message