Return-Path: Delivered-To: apmail-incubator-sling-commits-archive@locus.apache.org Received: (qmail 67791 invoked from network); 29 Aug 2008 12:31:55 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 29 Aug 2008 12:31:55 -0000 Received: (qmail 83555 invoked by uid 500); 29 Aug 2008 12:31:54 -0000 Delivered-To: apmail-incubator-sling-commits-archive@incubator.apache.org Received: (qmail 83524 invoked by uid 500); 29 Aug 2008 12:31:53 -0000 Mailing-List: contact sling-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: sling-dev@incubator.apache.org Delivered-To: mailing list sling-commits@incubator.apache.org Received: (qmail 83515 invoked by uid 99); 29 Aug 2008 12:31:53 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Aug 2008 05:31:53 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Aug 2008 12:31:04 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CAE202388988; Fri, 29 Aug 2008 05:31:34 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r690208 - /incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Date: Fri, 29 Aug 2008 12:31:34 -0000 To: sling-commits@incubator.apache.org From: cziegeler@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080829123134.CAE202388988@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cziegeler Date: Fri Aug 29 05:31:34 2008 New Revision: 690208 URL: http://svn.apache.org/viewvc?rev=690208&view=rev Log: Fix synchronization for own queues. Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=690208&r1=690207&r2=690208&view=diff ============================================================================== --- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original) +++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Fri Aug 29 05:31:34 2008 @@ -328,7 +328,7 @@ true); // give the system some time to start try { - Thread.sleep(1000 * 60 * 2); // 2min + Thread.sleep(1000 * 60 * 1); // 1min } catch (InterruptedException e) { this.ignoreException(e); } @@ -414,66 +414,68 @@ } if ( info != null && this.running ) { - final EventInfo processInfo = info; - info = null; - if ( this.executeJob(processInfo, jobQueue) ) { - EventInfo newInfo = null; - try { - newInfo = jobQueue.waitForFinish(); - } catch (InterruptedException e) { - this.ignoreException(e); - } - // if we have an info, this is a reschedule - if ( newInfo != null ) { - final EventInfo newEventInfo = newInfo; - final Event job = newInfo.event; - - // is this an ordered queue? - final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null; - - if ( orderedQueue ) { - // we just sleep for the delay time - if none, we continue and retry - // this job again - if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) { - final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY); - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - this.ignoreException(e); - } - } - info = newInfo; - } else { - // delay rescheduling? - if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) { - final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY); - final Date fireDate = new Date(); - fireDate.setTime(System.currentTimeMillis() + delay); - - final Runnable t = new Runnable() { - public void run() { - try { - jobQueue.put(newEventInfo); - } catch (InterruptedException e) { - // this should never happen - ignoreException(e); - } + synchronized ( jobQueue.getLock()) { + final EventInfo processInfo = info; + info = null; + if ( this.executeJob(processInfo, jobQueue) ) { + EventInfo newInfo = null; + try { + newInfo = jobQueue.waitForFinish(); + } catch (InterruptedException e) { + this.ignoreException(e); + } + // if we have an info, this is a reschedule + if ( newInfo != null ) { + final EventInfo newEventInfo = newInfo; + final Event job = newInfo.event; + + // is this an ordered queue? + final boolean orderedQueue = job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null; + + if ( orderedQueue ) { + // we just sleep for the delay time - if none, we continue and retry + // this job again + if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) { + final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY); + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + this.ignoreException(e); } - }; - try { - this.scheduler.fireJobAt(null, t, null, fireDate); - } catch (Exception e) { - // we ignore the exception and just put back the job in the queue - ignoreException(e); - t.run(); } + info = newInfo; } else { - // put directly into queue - try { - jobQueue.put(newInfo); - } catch (InterruptedException e) { - // this should never happen - this.ignoreException(e); + // delay rescheduling? + if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) { + final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY); + final Date fireDate = new Date(); + fireDate.setTime(System.currentTimeMillis() + delay); + + final Runnable t = new Runnable() { + public void run() { + try { + jobQueue.put(newEventInfo); + } catch (InterruptedException e) { + // this should never happen + ignoreException(e); + } + } + }; + try { + this.scheduler.fireJobAt(null, t, null, fireDate); + } catch (Exception e) { + // we ignore the exception and just put back the job in the queue + ignoreException(e); + t.run(); + } + } else { + // put directly into queue + try { + jobQueue.put(newInfo); + } catch (InterruptedException e) { + // this should never happen + this.ignoreException(e); + } } } } @@ -1052,7 +1054,9 @@ synchronized ( this.jobQueues ) { jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME)); } - jobQueue.notifyFinish(info); + synchronized ( jobQueue.getLock()) { + jobQueue.notifyFinish(info); + } } else { // delay rescheduling? @@ -1097,7 +1101,9 @@ synchronized ( this.jobQueues ) { jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME)); } - jobQueue.notifyFinish(null); + synchronized ( jobQueue.getLock()) { + jobQueue.notifyFinish(null); + } } } if ( !shouldReschedule ) { @@ -1295,9 +1301,7 @@ private final Object lock = new Object(); public EventInfo waitForFinish() throws InterruptedException { - synchronized ( this.lock ) { - this.lock.wait(); - } + this.lock.wait(); final EventInfo object = this.eventInfo; this.eventInfo = null; return object; @@ -1305,9 +1309,11 @@ public void notifyFinish(EventInfo i) { this.eventInfo = i; - synchronized ( this.lock ) { - this.lock.notify(); - } + this.lock.notify(); + } + + public Object getLock() { + return lock; } } }