Return-Path: Delivered-To: apmail-incubator-sling-commits-archive@minotaur.apache.org Received: (qmail 75305 invoked from network); 6 Apr 2009 15:16:16 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 6 Apr 2009 15:16:16 -0000 Received: (qmail 63626 invoked by uid 500); 6 Apr 2009 15:16:15 -0000 Delivered-To: apmail-incubator-sling-commits-archive@incubator.apache.org Received: (qmail 63568 invoked by uid 500); 6 Apr 2009 15:16:15 -0000 Mailing-List: contact sling-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: sling-dev@incubator.apache.org Delivered-To: mailing list sling-commits@incubator.apache.org Received: (qmail 63559 invoked by uid 99); 6 Apr 2009 15:16:15 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Apr 2009 15:16:15 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Apr 2009 15:16:07 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id AB8C02388A50; Mon, 6 Apr 2009 15:15:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r762385 - in /incubator/sling/trunk/bundles/extensions/event/src/main: java/org/apache/sling/event/impl/JobEventHandler.java resources/OSGI-INF/metatype/metatype.properties Date: Mon, 06 Apr 2009 15:15:45 -0000 To: sling-commits@incubator.apache.org From: cziegeler@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090406151546.AB8C02388A50@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cziegeler Date: Mon Apr 6 15:15:42 2009 New Revision: 762385 URL: http://svn.apache.org/viewvc?rev=762385&view=rev Log: Add new configuration property to 
handle maximum number of parallel processes for the main queue. Optimize acknowledge handling. Modified: incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Modified: incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java URL: http://svn.apache.org/viewvc/incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=762385&r1=762384&r2=762385&view=diff ============================================================================== --- incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original) +++ incubator/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Mon Apr 6 15:15:42 2009 @@ -98,6 +98,12 @@ /** Default number of seconds to wait for an ack. */ private static final long DEFAULT_WAIT_FOR_ACK = 90; // by default we wait 90 secs + /** @scr.property valueRef="DEFAULT_MAXIMUM_PARALLEL_JOBS" */ + private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS = "max.parallel.jobs"; + + /** Default number of parallel jobs. */ + private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15; + /** @scr.property valueRef="DEFAULT_WAIT_FOR_ACK" */ private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack"; @@ -110,6 +116,9 @@ /** How long do we wait for an ack (in ms) */ private long waitForAckMs; + /** Maximum parallel running jobs for a single queue. */ + private long maximumParallelJobs; + /** Background session. */ private Session backgroundSession; @@ -145,6 +154,9 @@ /** Sync lock */ private final Object backgroundLock = new Object(); + /** Number of parallel jobs for the main queue. */ + private long parallelJobCount; + /** * Activate this component. 
* @param context @@ -158,6 +170,7 @@ this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME); this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES); this.waitForAckMs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_WAIT_FOR_ACK), DEFAULT_WAIT_FOR_ACK) * 1000; + this.maximumParallelJobs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS), DEFAULT_MAXIMUM_PARALLEL_JOBS); this.componentContext = context; super.activate(context); JOB_THREAD_POOL = this.threadPool; @@ -240,17 +253,9 @@ final Map.Entry entry = i.next(); if ( entry.getValue().started <= tooOld ) { restartJobs.add(entry.getValue()); - i.remove(); } } } - final Iterator jobIter = restartJobs.iterator(); - while ( jobIter.hasNext() ) { - final StartedJobInfo info = jobIter.next(); - this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", info.event, info.nodePath); - this.finishedJob(info.event, info.nodePath, true); - } - // remove obsolete jobs from the repository if ( this.cleanupPeriod > 0 ) { this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod); @@ -282,6 +287,29 @@ } } } + // restart jobs is now a list of potential candidates, we now have to check + // each candidate separately again! + if ( restartJobs.size() > 0 ) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // we just ignore this + e.printStackTrace(); + } + } + final Iterator jobIter = restartJobs.iterator(); + while ( jobIter.hasNext() ) { + final StartedJobInfo info = jobIter.next(); + boolean process = false; + synchronized ( this.processingEventsList ) { + process = this.processingEventsList.remove(info.nodePath) != null; + } + if ( process ) { + this.logger.info("No acknowledge received for job {} stored at {}. 
Requeueing job.", info.event, info.nodePath); + this.finishedJob(info.event, info.nodePath, true); + } + } + // check for idle threads synchronized ( this.jobQueues ) { final Iterator> i = this.jobQueues.entrySet().iterator(); @@ -600,6 +628,7 @@ */ private boolean executeJob(final EventInfo info, final BlockingQueue jobQueue) { boolean putback = false; + boolean wait = false; synchronized (this.backgroundLock) { try { this.backgroundSession.refresh(false); @@ -624,6 +653,12 @@ } } + } else { + // check number of parallel jobs for main queue + if ( jobQueue == null && this.parallelJobCount >= this.maximumParallelJobs ) { + process = false; + wait = true; + } } if ( process ) { boolean unlock = true; @@ -639,7 +674,7 @@ } if ( process ) { unlock = false; - this.processJob(info.event, eventNode); + this.processJob(info.event, eventNode, jobQueue == null); return true; } } @@ -671,6 +706,17 @@ } } + // if this is the main queue and we have reached the max number of parallel jobs + // we wait a little bit before continuing + if ( wait ) { + try { + Thread.sleep(sleepTime * 1000); + } catch (InterruptedException ie) { + // ignore + ignoreException(ie); + } + } + // if we have to put back the job, we do it now if ( putback ) { final EventInfo eInfo = info; final Date fireDate = new Date(); @@ -823,13 +869,17 @@ * Process a job and unlock the node in the repository. * @param event The original event. * @param eventNode The node in the repository where the job is stored. + * @param isMainQueue Is this the main queue? 
*/ - private void processJob(Event event, Node eventNode) { + private void processJob(Event event, Node eventNode, boolean isMainQueue) { final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null || event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null; final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC); boolean unlock = true; try { + if ( isMainQueue && !parallelProcessing ) { + this.parallelJobCount++; + } final String nodePath = eventNode.getPath(); final Event jobEvent = this.getJobEvent(event, nodePath); eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId); @@ -856,6 +906,9 @@ this.logger.error("Exception during job processing.", re); } finally { if ( unlock ) { + if ( isMainQueue && !parallelProcessing ) { + this.parallelJobCount--; + } if ( !parallelProcessing ) { synchronized ( this.processingMap ) { this.processingMap.put(jobTopic, Boolean.FALSE); @@ -1130,6 +1183,10 @@ synchronized ( this.processingMap ) { this.processingMap.put(jobTopic, Boolean.FALSE); } + } else { + if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) { + this.parallelJobCount--; + } } if ( unlock ) { synchronized ( this.deletedJobs ) { Modified: incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties URL: http://svn.apache.org/viewvc/incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=762385&r1=762384&r2=762385&view=diff ============================================================================== --- incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original) +++ incubator/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Mon Apr 6 15:15:42 2009 @@ -70,6 +70,11 @@ receive such a message in the configured time, it reschedules the job. 
The configured \ time is in seconds (default is 90 secs). +max.parallel.jobs.name = Maximum Parallel Jobs +max.parallel.jobs.description = The maximum number of parallel jobs started for the main \ + queue. + + # # Event Pool event.pool.name = Apache Sling Event Thread Pool