incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r690208 - /incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Date Fri, 29 Aug 2008 12:31:34 GMT
Author: cziegeler
Date: Fri Aug 29 05:31:34 2008
New Revision: 690208

URL: http://svn.apache.org/viewvc?rev=690208&view=rev
Log:
Fix synchronization for own queues.

Modified:
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=690208&r1=690207&r2=690208&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Fri Aug 29 05:31:34 2008
@@ -328,7 +328,7 @@
                                   true);
         // give the system some time to start
         try {
-            Thread.sleep(1000 * 60 * 2); // 2min
+            Thread.sleep(1000 * 60 * 1); // 1min
         } catch (InterruptedException e) {
             this.ignoreException(e);
         }
@@ -414,66 +414,68 @@
             }
 
             if ( info != null && this.running ) {
-                final EventInfo processInfo = info;
-                info = null;
-                if ( this.executeJob(processInfo, jobQueue) ) {
-                    EventInfo newInfo = null;
-                    try {
-                        newInfo = jobQueue.waitForFinish();
-                    } catch (InterruptedException e) {
-                        this.ignoreException(e);
-                    }
-                    // if we have an info, this is a reschedule
-                    if ( newInfo != null ) {
-                        final EventInfo newEventInfo = newInfo;
-                        final Event job = newInfo.event;
-
-                        // is this an ordered queue?
-                        final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED)
!= null;
-
-                        if ( orderedQueue ) {
-                            // we just sleep for the delay time - if none, we continue and
retry
-                            // this job again
-                            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null
) {
-                                final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
-                                try {
-                                    Thread.sleep(delay);
-                                } catch (InterruptedException e) {
-                                    this.ignoreException(e);
-                                }
-                            }
-                            info = newInfo;
-                        } else {
-                            // delay rescheduling?
-                            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null
) {
-                                final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
-                                final Date fireDate = new Date();
-                                fireDate.setTime(System.currentTimeMillis() + delay);
-
-                                final Runnable t = new Runnable() {
-                                    public void run() {
-                                        try {
-                                            jobQueue.put(newEventInfo);
-                                        } catch (InterruptedException e) {
-                                            // this should never happen
-                                            ignoreException(e);
-                                        }
+                synchronized ( jobQueue.getLock()) {
+                    final EventInfo processInfo = info;
+                    info = null;
+                    if ( this.executeJob(processInfo, jobQueue) ) {
+                        EventInfo newInfo = null;
+                        try {
+                            newInfo = jobQueue.waitForFinish();
+                        } catch (InterruptedException e) {
+                            this.ignoreException(e);
+                        }
+                        // if we have an info, this is a reschedule
+                        if ( newInfo != null ) {
+                            final EventInfo newEventInfo = newInfo;
+                            final Event job = newInfo.event;
+
+                            // is this an ordered queue?
+                            final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED)
!= null;
+
+                            if ( orderedQueue ) {
+                                // we just sleep for the delay time - if none, we continue
and retry
+                                // this job again
+                                if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY)
!= null ) {
+                                    final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                                    try {
+                                        Thread.sleep(delay);
+                                    } catch (InterruptedException e) {
+                                        this.ignoreException(e);
                                     }
-                                };
-                                try {
-                                    this.scheduler.fireJobAt(null, t, null, fireDate);
-                                } catch (Exception e) {
-                                    // we ignore the exception and just put back the job
in the queue
-                                    ignoreException(e);
-                                    t.run();
                                 }
+                                info = newInfo;
                             } else {
-                                // put directly into queue
-                                try {
-                                    jobQueue.put(newInfo);
-                                } catch (InterruptedException e) {
-                                    // this should never happen
-                                    this.ignoreException(e);
+                                // delay rescheduling?
+                                if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY)
!= null ) {
+                                    final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                                    final Date fireDate = new Date();
+                                    fireDate.setTime(System.currentTimeMillis() + delay);
+
+                                    final Runnable t = new Runnable() {
+                                        public void run() {
+                                            try {
+                                                jobQueue.put(newEventInfo);
+                                            } catch (InterruptedException e) {
+                                                // this should never happen
+                                                ignoreException(e);
+                                            }
+                                        }
+                                    };
+                                    try {
+                                        this.scheduler.fireJobAt(null, t, null, fireDate);
+                                    } catch (Exception e) {
+                                        // we ignore the exception and just put back the
job in the queue
+                                        ignoreException(e);
+                                        t.run();
+                                    }
+                                } else {
+                                    // put directly into queue
+                                    try {
+                                        jobQueue.put(newInfo);
+                                    } catch (InterruptedException e) {
+                                        // this should never happen
+                                        this.ignoreException(e);
+                                    }
                                 }
                             }
                         }
@@ -1052,7 +1054,9 @@
                         synchronized ( this.jobQueues ) {
                             jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
                         }
-                        jobQueue.notifyFinish(info);
+                        synchronized ( jobQueue.getLock()) {
+                            jobQueue.notifyFinish(info);
+                        }
                     } else {
 
                         // delay rescheduling?
@@ -1097,7 +1101,9 @@
                         synchronized ( this.jobQueues ) {
                             jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
                         }
-                        jobQueue.notifyFinish(null);
+                        synchronized ( jobQueue.getLock()) {
+                            jobQueue.notifyFinish(null);
+                        }
                     }
                 }
                 if ( !shouldReschedule ) {
@@ -1295,9 +1301,7 @@
         private final Object lock = new Object();
 
         public EventInfo waitForFinish() throws InterruptedException {
-            synchronized ( this.lock ) {
-                this.lock.wait();
-            }
+            this.lock.wait();
             final EventInfo object = this.eventInfo;
             this.eventInfo = null;
             return object;
@@ -1305,9 +1309,11 @@
 
         public void notifyFinish(EventInfo i) {
             this.eventInfo = i;
-            synchronized ( this.lock ) {
-                this.lock.notify();
-            }
+            this.lock.notify();
+        }
+
+        public Object getLock() {
+            return lock;
         }
     }
 }



Mime
View raw message