Return-Path: X-Original-To: apmail-sling-commits-archive@www.apache.org Delivered-To: apmail-sling-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 31E29108C4 for ; Thu, 1 Aug 2013 17:00:42 +0000 (UTC) Received: (qmail 97328 invoked by uid 500); 1 Aug 2013 17:00:42 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 97259 invoked by uid 500); 1 Aug 2013 17:00:38 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 97247 invoked by uid 99); 1 Aug 2013 17:00:37 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Aug 2013 17:00:37 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Aug 2013 17:00:35 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 989532388A9B; Thu, 1 Aug 2013 17:00:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1509329 - in /sling/trunk/bundles/commons/scheduler: pom.xml src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java Date: Thu, 01 Aug 2013 17:00:15 -0000 To: commits@sling.apache.org From: cziegeler@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130801170015.989532388A9B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cziegeler Date: Thu Aug 1 17:00:15 2013 New Revision: 1509329 URL: http://svn.apache.org/r1509329 Log: SLING-2990 : race condition in scheduler could cause tasks to be lost Added: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java (with props) Modified: sling/trunk/bundles/commons/scheduler/pom.xml sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java Modified: sling/trunk/bundles/commons/scheduler/pom.xml URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/pom.xml?rev=1509329&r1=1509328&r2=1509329&view=diff ============================================================================== --- sling/trunk/bundles/commons/scheduler/pom.xml (original) +++ sling/trunk/bundles/commons/scheduler/pom.xml Thu Aug 1 17:00:15 2013 @@ -135,6 +135,12 @@ provided + org.apache.sling + org.apache.sling.settings + 1.0.0 + provided + + org.quartz-scheduler quartz 2.2.0 Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java?rev=1509329&r1=1509328&r2=1509329&view=diff ============================================================================== --- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java (original) +++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java Thu Aug 1 17:00:15 2013 @@ -17,31 +17,26 @@ package org.apache.sling.commons.scheduler.impl; import java.io.Serializable; -import java.util.ArrayList; import java.util.Date; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; +import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.ReferencePolicy; -import org.apache.felix.scr.annotations.References; import org.apache.felix.scr.annotations.Service; import org.apache.sling.commons.scheduler.Job; import org.apache.sling.commons.scheduler.ScheduleOptions; import org.apache.sling.commons.scheduler.Scheduler; import org.apache.sling.commons.threads.ThreadPool; import org.apache.sling.commons.threads.ThreadPoolManager; -import org.apache.sling.discovery.DiscoveryService; -import org.osgi.framework.Constants; -import org.osgi.framework.ServiceReference; +import org.apache.sling.settings.SlingSettingsService; +import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; -import org.osgi.service.component.ComponentContext; import org.quartz.CronScheduleBuilder; import org.quartz.JobBuilder; import org.quartz.JobDataMap; @@ -61,12 +56,8 @@ import org.slf4j.LoggerFactory; * The quartz based implementation of the scheduler. * */ -@Component(immediate=true, metatype=true,label="%scheduler.name",description="%scheduler.description") +@Component(metatype=true,label="%scheduler.name",description="%scheduler.description") @Service(value=Scheduler.class) -@References({ - @Reference(name="job", referenceInterface=Job.class, cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE, policy=ReferencePolicy.DYNAMIC), - @Reference(name="task", referenceInterface=Runnable.class, cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE, policy=ReferencePolicy.DYNAMIC) -}) public class QuartzScheduler implements Scheduler { /** Default logger. */ @@ -94,12 +85,6 @@ public class QuartzScheduler implements /** The quartz scheduler. */ private volatile org.quartz.Scheduler scheduler; - /** List of registrations while this service is not activated yet. */ - private final List registeredJobs = new ArrayList(); - - /** The component context. */ - private volatile ComponentContext context; - @Reference private ThreadPoolManager threadPoolManager; @@ -112,55 +97,39 @@ public class QuartzScheduler implements private static final String PROPERTY_POOL_NAME = "poolName"; @Reference - private DiscoveryService discoveryService; + private SlingSettingsService settings; /** * Activate this component. * Start the scheduler. - * @param ctx The component context. * @throws Exception */ - protected void activate(final ComponentContext ctx) throws Exception { - final Object poolNameObj = ctx.getProperties().get(PROPERTY_POOL_NAME); + @Activate + protected void activate(final BundleContext ctx, final Map props) throws Exception { + final Object poolNameObj = props.get(PROPERTY_POOL_NAME); final String poolName; if ( poolNameObj != null && poolNameObj.toString().trim().length() > 0 ) { poolName = poolNameObj.toString().trim(); } else { poolName = null; } - this.context = ctx; + // start scheduler this.scheduler = this.init(poolName); - - final Registration[] regs; - synchronized ( this.registeredJobs ) { - regs = this.registeredJobs.toArray(new Registration[this.registeredJobs.size()]); - this.registeredJobs.clear(); - } - for( final Registration reg : regs ) { - try { - this.register(reg.componentName, reg.reference); - } catch (Exception e) { - // we don't want that one malicious service brings down the scheduler, so we just log - // the exception and continue - this.logger.error("Exception during registering " + reg.componentName + " service " + reg.reference, e); - } - } - this.plugin = WebConsolePrinter.initPlugin(ctx.getBundleContext(), this); + this.plugin = WebConsolePrinter.initPlugin(ctx, this); } /** * Deactivate this component. * Stop the scheduler. - * @param ctx The component context. */ - protected void deactivate(final ComponentContext ctx) { + @Deactivate + protected void deactivate() { WebConsolePrinter.destroyPlugin(this.plugin); this.plugin = null; final org.quartz.Scheduler s = this.scheduler; this.scheduler = null; this.dispose(s); - this.context = null; } /** @@ -398,193 +367,11 @@ public class QuartzScheduler implements } } - /** - * Create unique identifier - * @param type - * @param ref - * @throws Exception - */ - private String getServiceIdentifier(final ServiceReference ref) { - String name = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_NAME); - if ( name == null ) { - name = (String)ref.getProperty(Constants.SERVICE_PID); - if ( name == null ) { - name = "Registered Service"; - } - } - // now append service id to create a unique identifier - name = name + "." + ref.getProperty(Constants.SERVICE_ID); - return name; - } - - /** - * Register a job or task - * @param type The type (job or task) - * @param ref The service reference - */ - private void register(final String type, final ServiceReference ref) { - // we called from bind, it might be that deactivate has been - // called in the meantime - final ComponentContext ctx = this.context; - if ( ctx != null ) { - final Object job = ctx.locateService(type, ref); - if ( job != null ) { - try { - final String name = getServiceIdentifier(ref); - final Boolean concurrent = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_CONCURRENT); - final Object runOn = ref.getProperty(Scheduler.PROPERTY_SCHEDULER_RUN_ON); - String[] runOnOpts = null; - if ( runOn instanceof String ) { - runOnOpts = new String[] {runOn.toString()}; - } else if ( runOn instanceof String[] ) { - runOnOpts = (String[])runOn; - } else { - this.logger.warn("Property {} ignored for scheduler {}", Scheduler.PROPERTY_SCHEDULER_RUN_ON, ref); - } - final String expression = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_EXPRESSION); - if ( expression != null ) { - this.scheduleJob(job, this.EXPR(expression) - .name(name) - .canRunConcurrently((concurrent != null ? concurrent : true)) - .onInstancesOnly(runOnOpts)); - } else { - final Long period = (Long)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_PERIOD); - if ( period != null ) { - if ( period < 1 ) { - this.logger.debug("Ignoring service {} : scheduler period is less than 1.", ref); - } else { - boolean immediate = false; - if ( ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE) != null ) { - immediate = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE); - } - this.scheduleJob(job, this.PERIODIC(period, immediate) - .name(name) - .canRunConcurrently((concurrent != null ? concurrent : true)) - .onInstancesOnly(runOnOpts)); - } - } else { - this.logger.debug("Ignoring servce {} : no scheduling property found.", ref); - } - } - } catch (final IllegalStateException e) { - // this can happen if deactivate has been called or the scheduling expression is invalid - this.logger.warn("Ignoring servce " + ref + " : exception occurred during registering.", e); - } catch (final SchedulerException e) { - // this can happen if deactivate has been called or the scheduling expression is invalid - this.logger.warn("Ignoring servce " + ref + " : exception occurred during registering.", e); - } - } - } - } - - /** - * Unregister a service. - * @param ref The service reference. - */ - private void unregister(final ServiceReference ref) { - try { - final String name = getServiceIdentifier(ref); - this.removeJob(name); - } catch (NoSuchElementException nsee) { - // we ignore this - } - } - - /** - * Bind a new job. - * @param ref - * @throws Exception - */ - protected void bindJob(final ServiceReference ref) { - if ( this.scheduler != null ) { - this.register(Registration.JOB, ref); - } else { - synchronized ( this.registeredJobs ) { - this.registeredJobs.add(new Registration(ref, Registration.JOB)); - } - } - } - - /** - * Unbind a job. - * @param ref - */ - protected void unbindJob(final ServiceReference ref) { - if ( this.scheduler != null ) { - this.unregister(ref); - } else { - synchronized ( this.registeredJobs ) { - this.registeredJobs.remove(new Registration(ref, Registration.JOB)); - } - } - } - - /** - * Bind a new task. - * @param ref - * @throws Exception - */ - protected void bindTask(final ServiceReference ref) { - if ( this.scheduler != null ) { - this.register(Registration.TASK, ref); - } else { - synchronized ( this.registeredJobs ) { - this.registeredJobs.add(new Registration(ref, Registration.TASK)); - } - } - } - - /** - * Unbind a task. - * @param ref - */ - protected void unbindTask(final ServiceReference ref) { - if ( this.scheduler != null ) { - this.unregister(ref); - } else { - synchronized ( this.registeredJobs ) { - this.registeredJobs.remove(new Registration(ref, Registration.TASK)); - } - } - } - + /** Used by the web console plugin. */ org.quartz.Scheduler getScheduler() { return this.scheduler; } - /** - * Helper class holding a registration if this service is not active yet. - */ - private static final class Registration { - public static final String JOB = "job"; - public static final String TASK = "task"; - - public final ServiceReference reference; - public final String componentName; - - public Registration(final ServiceReference r, final String name) { - this.reference = r; - this.componentName = name; - } - - @Override - public boolean equals(Object obj) { - if ( !(obj instanceof Registration) ) { - return false; - } - if ( obj == this ) { - return true; - } - return this.reference.equals(((Registration)obj).reference); - } - - @Override - public int hashCode() { - return this.reference.hashCode(); - } - } - - public static final class QuartzThreadPool implements org.quartz.spi.ThreadPool { /** Our executor thread pool */ @@ -815,7 +602,7 @@ public class QuartzScheduler implements } else if ( opts.runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(opts.runOn[0])) { schedule = true; } else { // sling IDs - final String myId = this.discoveryService.getTopology().getLocalInstance().getSlingId(); + final String myId = this.settings.getSlingId(); for(final String id : opts.runOn ) { if ( myId.equals(id) ) { schedule = true; Added: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java?rev=1509329&view=auto ============================================================================== --- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java (added) +++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java Thu Aug 1 17:00:15 2013 @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sling.commons.scheduler.impl; + +import java.util.Date; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.sling.commons.scheduler.Job; +import org.apache.sling.commons.scheduler.Scheduler; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The quartz based implementation of the scheduler. + * + */ +@Component(immediate=true) +public class WhiteboardHandler { + + /** Default logger. */ + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Reference + private Scheduler scheduler; + + private ServiceTracker serviceTracker; + + /** + * Activate this component. + * @throws InvalidSyntaxException + */ + @Activate + protected void activate(final BundleContext btx) throws InvalidSyntaxException { + this.serviceTracker = new ServiceTracker(btx, + btx.createFilter("(|(" + Constants.OBJECTCLASS + "=" + Runnable.class.getName() + ")" + + "(" + Constants.OBJECTCLASS + "=" + Job.class.getName() + "))"), + new ServiceTrackerCustomizer() { + + public synchronized void removedService(final ServiceReference reference, final Object service) { + btx.ungetService(reference); + unregister(reference, service); + } + + public synchronized void modifiedService(final ServiceReference reference, final Object service) { + unregister(reference, service); + register(reference, service); + } + + public synchronized Object addingService(final ServiceReference reference) { + final Object obj = btx.getService(reference); + if ( obj != null ) { + register(reference, obj); + } + return obj; + } + }); + this.serviceTracker.open(); + } + + /** + * Deactivate this component. + * Stop the scheduler. + * @param ctx The component context. + */ + @Deactivate + protected void deactivate() { + if ( this.serviceTracker != null ) { + this.serviceTracker.close(); + this.serviceTracker = null; + } + } + + + /** + * Create unique identifier + * @param type + * @param ref + * @throws Exception + */ + private String getServiceIdentifier(final ServiceReference ref) { + String name = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_NAME); + if ( name == null ) { + name = (String)ref.getProperty(Constants.SERVICE_PID); + if ( name == null ) { + name = "Registered Service"; + } + } + // now append service id to create a unique identifier + name = name + "." + ref.getProperty(Constants.SERVICE_ID); + return name; + } + + /** + * Register a job or task + * @param type The type (job or task) + * @param ref The service reference + */ + private void register(final ServiceReference ref, final Object job) { + final String name = getServiceIdentifier(ref); + final Boolean concurrent = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_CONCURRENT); + final Object runOn = ref.getProperty(Scheduler.PROPERTY_SCHEDULER_RUN_ON); + String[] runOnOpts = null; + if ( runOn instanceof String ) { + runOnOpts = new String[] {runOn.toString()}; + } else if ( runOn instanceof String[] ) { + runOnOpts = (String[])runOn; + } else { + this.logger.warn("Property {} ignored for scheduler {}", Scheduler.PROPERTY_SCHEDULER_RUN_ON, ref); + } + final String expression = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_EXPRESSION); + if ( expression != null ) { + this.scheduler.schedule(job, this.scheduler.EXPR(expression) + .name(name) + .canRunConcurrently((concurrent != null ? concurrent : true)) + .onInstancesOnly(runOnOpts)); + } else { + final Long period = (Long)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_PERIOD); + if ( period != null ) { + if ( period < 1 ) { + this.logger.debug("Ignoring service {} : scheduler period is less than 1.", ref); + } else { + boolean immediate = false; + if ( ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE) != null ) { + immediate = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE); + } + final Date date = new Date(); + if ( !immediate ) { + date.setTime(System.currentTimeMillis() + period * 1000); + } + this.scheduler.schedule(job, this.scheduler.AT(date, -1, period) + .name(name) + .canRunConcurrently((concurrent != null ? concurrent : true)) + .onInstancesOnly(runOnOpts)); + } + } else { + this.logger.debug("Ignoring servce {} : no scheduling property found.", ref); + } + } + } + + /** + * Unregister a service. + * @param ref The service reference. + */ + private void unregister(final ServiceReference reference, final Object service) { + final String name = getServiceIdentifier(reference); + this.scheduler.unschedule(name); + } +} Propchange: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url