sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r908544 - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/job/ src/main/resources/META-INF/
Date Wed, 10 Feb 2010 15:38:15 GMT
Author: cziegeler
Date: Wed Feb 10 15:38:14 2010
New Revision: 908544

URL: http://svn.apache.org/viewvc?rev=908544&view=rev
Log:
SLING-1365 : Limit the number of parallel jobs
First implementation for job queues.

Modified:
    sling/trunk/bundles/extensions/event/NOTICE
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
    sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE

Modified: sling/trunk/bundles/extensions/event/NOTICE
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/NOTICE?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/NOTICE (original)
+++ sling/trunk/bundles/extensions/event/NOTICE Wed Feb 10 15:38:14 2010
@@ -1,5 +1,5 @@
 Apache Sling Event
-Copyright 2008-2009 The Apache Software Foundation
+Copyright 2008-2010 The Apache Software Foundation
 
 Apache Sling is based on source code originally developed 
 by Day Software (http://www.day.com/).

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Wed Feb 10 15:38:14 2010
@@ -72,8 +72,12 @@
     public static final String PROPERTY_JOB_ID = "event.job.id";
 
     /** The property to set if a job can be run parallel to any other job.
-     * For now the property should only contain the values <code>true</code>
-     * or <code>false</code> as a string value.
+     * The following values are supported:
+     * - boolean value <code>true</code> and <code>false</code>
+     * - string value <code>true</code> and <code>false</code>
+     * - integer value higher than 1 - if this is specified jobs are run in
+     * parallel but never more than the specified number.
+     *
      * We might want to use different values in the future for enhanced
      * parallel job handling. */
     public static final String PROPERTY_JOB_PARALLEL = "event.job.parallel";
@@ -93,13 +97,18 @@
     /** The property to set to put the jobs into a separate job queue. This property
      * specifies the name of the job queue. If the job queue does not exists yet
      * a new queue is created.
-     * If a job queue is used, the jobs are never executed in parallel from this queue!
+     * If a ordered job queue is used, the jobs are never executed in parallel
+     * from this queue! For non ordered queues the {@link #PROPERTY_JOB_PARALLEL}
+     * with an integer value higher than 1 can be used to specify the maximum number
+     * of parallel jobs for this queue.
      */
     public static final String PROPERTY_JOB_QUEUE_NAME = "event.job.queuename";
 
     /** If this property is set with any value, the queue processes the jobs in the same
      * order as they have arrived.
-     * This property has only an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified.
+     * This property has only an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified
+     * and starting with version 2.2 this value is only checked in the first job for this
+     * queue.
      */
     public static final String PROPERTY_JOB_QUEUE_ORDERED = "event.job.queueordered";
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Feb 10 15:38:14 2010
@@ -599,7 +599,7 @@
                                  */
                                 public void run() {
                                     while ( running && !jq.isFinished() ) {
-                                        logger.info("Starting job queue {}", queueName);
+                                        logger.info("Starting {}job queue {}", (orderedQueue ? "ordered " : ""), queueName);
                                         try {
                                             runJobQueue(queueName, jq);
                                         } catch (Throwable t) {
@@ -650,18 +650,36 @@
             }
 
             if ( info != null && this.running && !jobQueue.isFinished() ) {
-                synchronized ( jobQueue.getLock()) {
-                    final EventInfo processInfo = info;
-                    info = null;
-                    final Status status = this.executeJob(processInfo, jobQueue);
-                    if ( status == Status.SUCCESS ) {
+                final EventInfo processInfo = info;
+                info = null;
+                if ( jobQueue.isOrdered() ) {
+                    // if we are ordered we simply wait for the finish
+                    synchronized ( jobQueue.getLock()) {
+                        final Status status = this.executeJob(processInfo, jobQueue);
+                        if ( status == Status.SUCCESS ) {
+                            try {
+                                info = jobQueue.waitForFinish();
+                            } catch (InterruptedException e) {
+                                this.ignoreException(e);
+                            }
+                        } else if ( status == Status.RESCHEDULE ) {
+                            info = jobQueue.reschedule(processInfo, this.scheduler);
+                        }
+                    }
+                } else {
+                    final int maxJobs = ParallelInfo.getMaxNumberOfParallelJobs(processInfo.event);
+                    synchronized ( jobQueue.getLock() ) {
                         try {
-                            info = jobQueue.waitForFinish();
+                            jobQueue.acquireSlot(maxJobs);
                         } catch (InterruptedException e) {
                             this.ignoreException(e);
                         }
-                    } else if ( status == Status.RESCHEDULE ) {
-                        info = jobQueue.reschedule(processInfo, this.scheduler);
+                    }
+                    if ( this.running && !jobQueue.isFinished() ) {
+                        final Status status = this.executeJob(processInfo, jobQueue);
+                        if ( status == Status.RESCHEDULE ) {
+                            jobQueue.reschedule(processInfo, this.scheduler);
+                        }
                     }
                 }
             }
@@ -1432,7 +1450,11 @@
                 if ( info != null ) {
                     reprocessInfo = jobQueue.reschedule(info, this.scheduler);
                 }
-                jobQueue.notifyFinish(reprocessInfo);
+                if ( jobQueue.isOrdered() ) {
+                    jobQueue.notifyFinish(reprocessInfo);
+                } else {
+                    jobQueue.freeSlot();
+                }
             }
         }
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java Wed Feb 10 15:38:14 2010
@@ -57,6 +57,8 @@
     /** Ordered Queue? */
     private final boolean orderedQueue;
 
+    private volatile int jobCount;
+
     public JobBlockingQueue(final String name,
                             final boolean orderedQueue,
                             final Logger logger) {
@@ -65,6 +67,10 @@
         this.logger = logger;
     }
 
+    /**
+     * Wait for the job to be finished.
+     * This is called if the queue is ordered.
+     */
     public EventInfo waitForFinish() throws InterruptedException {
         this.isWaiting = true;
         this.markForCleanUp = false;
@@ -75,21 +81,56 @@
         return object;
     }
 
+    /**
+     * Mark this queue for cleanup.
+     */
     public void markForCleanUp() {
         if ( !this.isWaiting ) {
             this.markForCleanUp = true;
         }
     }
 
+    /**
+     * Acquire a processing slot.
+     * This method is called if the queue is not ordered.
+     * @param maxJobs
+     */
+    public void acquireSlot(final int maxJobs) throws InterruptedException {
+        if ( jobCount >= maxJobs ) {
+            this.isWaiting = true;
+            this.markForCleanUp = false;
+            this.lock.wait();
+            this.isWaiting = false;
+        }
+    }
+
+    /**
+     * Free a slot when a job processing is finished.
+     */
+    public void freeSlot() {
+        if ( this.isWaiting ) {
+            this.lock.notify();
+        }
+    }
+
+    /**
+     * Check if this queue is marked for cleanup
+     */
     public boolean isMarkedForCleanUp() {
         return !this.isWaiting && this.markForCleanUp;
     }
 
+    /**
+     * Notify a finished job - for ordered queues
+     */
     public void notifyFinish(EventInfo i) {
         this.eventInfo = i;
         this.lock.notify();
     }
 
+    /**
+     * Return the lock for this queue.
+     */
     public Object getLock() {
         return lock;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java Wed Feb 10 15:38:14 2010
@@ -92,4 +92,27 @@
         }
         return ParallelInfo.SERIAL;
     }
+
+    /**
+     * Return the maximum number of parallel jobs for named queues.
+     * @param job The job
+     */
+    public static int getMaxNumberOfParallelJobs(final Event job) {
+        Object value = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL);
+        if ( value instanceof String ) {
+            // try to parse the value
+            try {
+                value = Integer.valueOf((String)value);
+            } catch (NumberFormatException ne) {
+                // we ignore this
+            }
+        }
+        if ( value instanceof Number ) {
+            final int result = ((Number)value).intValue();
+            if ( result > 1 ) {
+                return result;
+            }
+        }
+        return 1;
+    }
 }
\ No newline at end of file

Modified: sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE Wed Feb 10 15:38:14 2010
@@ -1,5 +1,5 @@
 Apache Sling Event
-Copyright 2008-2009 The Apache Software Foundation
+Copyright 2008-2010 The Apache Software Foundation
 
 Apache Sling is based on source code originally developed 
 by Day Software (http://www.day.com/).



Mime
View raw message