incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r762385 - in /incubator/sling/trunk/bundles/extensions/event/src/main: java/org/apache/sling/event/impl/JobEventHandler.java resources/OSGI-INF/metatype/metatype.properties
Date Mon, 06 Apr 2009 15:15:45 GMT
Author: cziegeler
Date: Mon Apr  6 15:15:42 2009
New Revision: 762385

URL: http://svn.apache.org/viewvc?rev=762385&view=rev
Log:
Add new configuration property to handle maximum number of parallel processes for the main queue. Optimize acknowledge handling.

Modified:
    incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties

Modified: incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=762385&r1=762384&r2=762385&view=diff
==============================================================================
--- incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Mon Apr  6 15:15:42 2009
@@ -98,6 +98,12 @@
     /** Default number of seconds to wait for an ack. */
     private static final long DEFAULT_WAIT_FOR_ACK = 90; // by default we wait 90 secs
 
+    /** @scr.property valueRef="DEFAULT_MAXIMUM_PARALLEL_JOBS" */
+    private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS = "max.parallel.jobs";
+
+    /** Default number of parallel jobs. */
+    private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15;
+
     /** @scr.property valueRef="DEFAULT_WAIT_FOR_ACK" */
     private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
 
@@ -110,6 +116,9 @@
     /** How long do we wait for an ack (in ms) */
     private long waitForAckMs;
 
+    /** Maximum parallel running jobs for a single queue. */
+    private long maximumParallelJobs;
+
     /** Background session. */
     private Session backgroundSession;
 
@@ -145,6 +154,9 @@
     /** Sync lock */
     private final Object backgroundLock = new Object();
 
+    /** Number of parallel jobs for the main queue. */
+    private long parallelJobCount;
+
     /**
      * Activate this component.
      * @param context
@@ -158,6 +170,7 @@
         this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
         this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES);
         this.waitForAckMs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_WAIT_FOR_ACK), DEFAULT_WAIT_FOR_ACK) * 1000;
+        this.maximumParallelJobs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS), DEFAULT_MAXIMUM_PARALLEL_JOBS);
         this.componentContext = context;
         super.activate(context);
         JOB_THREAD_POOL = this.threadPool;
@@ -240,17 +253,9 @@
                     final Map.Entry<String, StartedJobInfo> entry = i.next();
                     if ( entry.getValue().started <= tooOld ) {
                         restartJobs.add(entry.getValue());
-                        i.remove();
                     }
                 }
             }
-            final Iterator<StartedJobInfo> jobIter = restartJobs.iterator();
-            while ( jobIter.hasNext() ) {
-                final StartedJobInfo info = jobIter.next();
-                this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", info.event, info.nodePath);
-                this.finishedJob(info.event, info.nodePath, true);
-            }
-
             // remove obsolete jobs from the repository
             if ( this.cleanupPeriod > 0 ) {
                 this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod);
@@ -282,6 +287,29 @@
                     }
                 }
             }
+            // restart jobs is now a list of potential candidates, we now have to check
+            // each candidate separately again!
+            if ( restartJobs.size() > 0 ) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    // we just ignore this
+                    e.printStackTrace();
+                }
+            }
+            final Iterator<StartedJobInfo> jobIter = restartJobs.iterator();
+            while ( jobIter.hasNext() ) {
+                final StartedJobInfo info = jobIter.next();
+                boolean process = false;
+                synchronized ( this.processingEventsList ) {
+                    process = this.processingEventsList.remove(info.nodePath) != null;
+                }
+                if ( process ) {
+                    this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", info.event, info.nodePath);
+                    this.finishedJob(info.event, info.nodePath, true);
+                }
+            }
+
             // check for idle threads
             synchronized ( this.jobQueues ) {
                 final Iterator<Map.Entry<String, JobBlockingQueue>> i = this.jobQueues.entrySet().iterator();
@@ -600,6 +628,7 @@
      */
     private boolean executeJob(final EventInfo info, final BlockingQueue<EventInfo> jobQueue) {
         boolean putback = false;
+        boolean wait = false;
         synchronized (this.backgroundLock) {
             try {
                 this.backgroundSession.refresh(false);
@@ -624,6 +653,12 @@
                             }
                         }
 
+                    } else {
+                        // check number of parallel jobs for main queue
+                            if ( jobQueue == null && this.parallelJobCount >= this.maximumParallelJobs ) {
+                            process = false;
+                            wait = true;
+                        }
                     }
                     if ( process ) {
                         boolean unlock = true;
@@ -639,7 +674,7 @@
                                 }
                                 if ( process ) {
                                     unlock = false;
-                                    this.processJob(info.event, eventNode);
+                                    this.processJob(info.event, eventNode, jobQueue == null);
                                     return true;
                                 }
                             }
@@ -671,6 +706,17 @@
             }
 
         }
+        // if this is the main queue and we have reached the max number of parallel jobs
+        // we wait a little bit before continuing
+        if ( wait ) {
+            try {
+                Thread.sleep(sleepTime * 1000);
+            } catch (InterruptedException ie) {
+                // ignore
+                ignoreException(ie);
+            }
+        }
+        // if we have to put back the job, we do it now
         if ( putback ) {
             final EventInfo eInfo = info;
             final Date fireDate = new Date();
@@ -823,13 +869,17 @@
      * Process a job and unlock the node in the repository.
      * @param event The original event.
      * @param eventNode The node in the repository where the job is stored.
+     * @param isMainQueue Is this the main queue?
      */
-    private void processJob(Event event, Node eventNode)  {
+    private void processJob(Event event, Node eventNode, boolean isMainQueue)  {
         final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null
                                            || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
         final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
         boolean unlock = true;
         try {
+            if ( isMainQueue && !parallelProcessing ) {
+                this.parallelJobCount++;
+            }
             final String nodePath = eventNode.getPath();
             final Event jobEvent = this.getJobEvent(event, nodePath);
             eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId);
@@ -856,6 +906,9 @@
             this.logger.error("Exception during job processing.", re);
         } finally {
             if ( unlock ) {
+                if ( isMainQueue && !parallelProcessing ) {
+                    this.parallelJobCount--;
+                }
                 if ( !parallelProcessing ) {
                     synchronized ( this.processingMap ) {
                         this.processingMap.put(jobTopic, Boolean.FALSE);
@@ -1130,6 +1183,10 @@
                         synchronized ( this.processingMap ) {
                             this.processingMap.put(jobTopic, Boolean.FALSE);
                         }
+                    } else {
+                        if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
+                            this.parallelJobCount--;
+                        }
                     }
                     if ( unlock ) {
                         synchronized ( this.deletedJobs ) {

Modified: incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=762385&r1=762384&r2=762385&view=diff
==============================================================================
--- incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Mon Apr  6 15:15:42 2009
@@ -70,6 +70,11 @@
  receive such a message in the configured time, it reschedules the job. The configured \
  time is in seconds (default is 90 secs).
 
+max.parallel.jobs.name = Maximum Parallel Jobs
+max.parallel.jobs.description = The maximum number of parallel jobs started for the main \
+ queue.
+
+
 #
 # Event Pool
 event.pool.name = Apache Sling Event Thread Pool 



Mime
View raw message