incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r1509329 - in /sling/trunk/bundles/commons/scheduler: pom.xml src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
Date Thu, 01 Aug 2013 17:00:15 GMT
Author: cziegeler
Date: Thu Aug  1 17:00:15 2013
New Revision: 1509329

URL: http://svn.apache.org/r1509329
Log:
SLING-2990 :  race condition in scheduler could cause tasks to be lost 

Added:
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
  (with props)
Modified:
    sling/trunk/bundles/commons/scheduler/pom.xml
    sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java

Modified: sling/trunk/bundles/commons/scheduler/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/pom.xml?rev=1509329&r1=1509328&r2=1509329&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/pom.xml (original)
+++ sling/trunk/bundles/commons/scheduler/pom.xml Thu Aug  1 17:00:15 2013
@@ -135,6 +135,12 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.settings</artifactId>
+            <version>1.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.quartz-scheduler</groupId>
             <artifactId>quartz</artifactId>
             <version>2.2.0</version>

Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java?rev=1509329&r1=1509328&r2=1509329&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
(original)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
Thu Aug  1 17:00:15 2013
@@ -17,31 +17,26 @@
 package org.apache.sling.commons.scheduler.impl;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 
+import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
-import org.apache.felix.scr.annotations.References;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.commons.scheduler.Job;
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.commons.threads.ThreadPoolManager;
-import org.apache.sling.discovery.DiscoveryService;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceReference;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.component.ComponentContext;
 import org.quartz.CronScheduleBuilder;
 import org.quartz.JobBuilder;
 import org.quartz.JobDataMap;
@@ -61,12 +56,8 @@ import org.slf4j.LoggerFactory;
  * The quartz based implementation of the scheduler.
  *
  */
-@Component(immediate=true, metatype=true,label="%scheduler.name",description="%scheduler.description")
+@Component(metatype=true,label="%scheduler.name",description="%scheduler.description")
 @Service(value=Scheduler.class)
-@References({
-    @Reference(name="job", referenceInterface=Job.class, cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
policy=ReferencePolicy.DYNAMIC),
-    @Reference(name="task", referenceInterface=Runnable.class, cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
policy=ReferencePolicy.DYNAMIC)
-})
 public class QuartzScheduler implements Scheduler {
 
     /** Default logger. */
@@ -94,12 +85,6 @@ public class QuartzScheduler implements 
     /** The quartz scheduler. */
     private volatile org.quartz.Scheduler scheduler;
 
-    /** List of registrations while this service is not activated yet. */
-    private final List<Registration> registeredJobs = new ArrayList<Registration>();
-
-    /** The component context. */
-    private volatile ComponentContext context;
-
     @Reference
     private ThreadPoolManager threadPoolManager;
 
@@ -112,55 +97,39 @@ public class QuartzScheduler implements 
     private static final String PROPERTY_POOL_NAME = "poolName";
 
     @Reference
-    private DiscoveryService discoveryService;
+    private SlingSettingsService settings;
 
     /**
      * Activate this component.
      * Start the scheduler.
-     * @param ctx The component context.
      * @throws Exception
      */
-    protected void activate(final ComponentContext ctx) throws Exception {
-        final Object poolNameObj = ctx.getProperties().get(PROPERTY_POOL_NAME);
+    @Activate
+    protected void activate(final BundleContext ctx, final Map<String, Object> props)
throws Exception {
+        final Object poolNameObj = props.get(PROPERTY_POOL_NAME);
         final String poolName;
         if ( poolNameObj != null && poolNameObj.toString().trim().length() > 0
) {
             poolName = poolNameObj.toString().trim();
         } else {
             poolName = null;
         }
-        this.context = ctx;
+
         // start scheduler
         this.scheduler = this.init(poolName);
-
-        final Registration[] regs;
-        synchronized ( this.registeredJobs ) {
-            regs = this.registeredJobs.toArray(new Registration[this.registeredJobs.size()]);
-            this.registeredJobs.clear();
-        }
-        for( final Registration reg : regs ) {
-            try {
-                this.register(reg.componentName, reg.reference);
-            } catch (Exception e) {
-                // we don't want that one malicious service brings down the scheduler, so
we just log
-                // the exception and continue
-                this.logger.error("Exception during registering " + reg.componentName + "
service " + reg.reference, e);
-            }
-        }
-        this.plugin = WebConsolePrinter.initPlugin(ctx.getBundleContext(), this);
+        this.plugin = WebConsolePrinter.initPlugin(ctx, this);
     }
 
     /**
      * Deactivate this component.
      * Stop the scheduler.
-     * @param ctx The component context.
      */
-    protected void deactivate(final ComponentContext ctx) {
+    @Deactivate
+    protected void deactivate() {
         WebConsolePrinter.destroyPlugin(this.plugin);
         this.plugin = null;
         final org.quartz.Scheduler s = this.scheduler;
         this.scheduler = null;
         this.dispose(s);
-        this.context = null;
     }
 
     /**
@@ -398,193 +367,11 @@ public class QuartzScheduler implements 
         }
     }
 
-    /**
-     * Create unique identifier
-     * @param type
-     * @param ref
-     * @throws Exception
-     */
-    private String getServiceIdentifier(final ServiceReference ref) {
-        String name = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_NAME);
-        if ( name == null ) {
-            name = (String)ref.getProperty(Constants.SERVICE_PID);
-            if ( name == null ) {
-                name = "Registered Service";
-            }
-        }
-        // now append service id to create a unique identifier
-        name = name + "." + ref.getProperty(Constants.SERVICE_ID);
-        return name;
-    }
-
-    /**
-     * Register a job or task
-     * @param type The type (job or task)
-     * @param ref The service reference
-     */
-    private void register(final String type, final ServiceReference ref) {
-        // we called from bind, it might be that deactivate has been
-        // called in the meantime
-        final ComponentContext ctx = this.context;
-        if ( ctx != null ) {
-            final Object job = ctx.locateService(type, ref);
-            if ( job != null ) {
-                try {
-                    final String name = getServiceIdentifier(ref);
-                    final Boolean concurrent = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_CONCURRENT);
-                    final Object runOn = ref.getProperty(Scheduler.PROPERTY_SCHEDULER_RUN_ON);
-                    String[] runOnOpts = null;
-                    if ( runOn instanceof String ) {
-                        runOnOpts = new String[] {runOn.toString()};
-                    } else if ( runOn instanceof String[] ) {
-                        runOnOpts = (String[])runOn;
-                    } else {
-                        this.logger.warn("Property {} ignored for scheduler {}", Scheduler.PROPERTY_SCHEDULER_RUN_ON,
ref);
-                    }
-                    final String expression = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_EXPRESSION);
-                    if ( expression != null ) {
-                        this.scheduleJob(job, this.EXPR(expression)
-                                .name(name)
-                                .canRunConcurrently((concurrent != null ? concurrent : true))
-                                .onInstancesOnly(runOnOpts));
-                    } else {
-                        final Long period = (Long)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_PERIOD);
-                        if ( period != null ) {
-                            if ( period < 1 ) {
-                                this.logger.debug("Ignoring service {} : scheduler period
is less than 1.", ref);
-                            } else {
-                                boolean immediate = false;
-                                if ( ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE)
!= null ) {
-                                    immediate = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE);
-                                }
-                                this.scheduleJob(job, this.PERIODIC(period, immediate)
-                                        .name(name)
-                                        .canRunConcurrently((concurrent != null ? concurrent
: true))
-                                        .onInstancesOnly(runOnOpts));
-                            }
-                        } else {
-                            this.logger.debug("Ignoring servce {} : no scheduling property
found.", ref);
-                        }
-                    }
-                } catch (final IllegalStateException e) {
-                    // this can happen if deactivate has been called or the scheduling expression
is invalid
-                    this.logger.warn("Ignoring servce " + ref + " : exception occurred during
registering.", e);
-                } catch (final SchedulerException e) {
-                    // this can happen if deactivate has been called or the scheduling expression
is invalid
-                    this.logger.warn("Ignoring servce " + ref + " : exception occurred during
registering.", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Unregister a service.
-     * @param ref The service reference.
-     */
-    private void unregister(final ServiceReference ref) {
-        try {
-            final String name = getServiceIdentifier(ref);
-            this.removeJob(name);
-        } catch (NoSuchElementException nsee) {
-            // we ignore this
-        }
-    }
-
-    /**
-     * Bind a new job.
-     * @param ref
-     * @throws Exception
-     */
-    protected void bindJob(final ServiceReference ref) {
-        if ( this.scheduler != null ) {
-            this.register(Registration.JOB, ref);
-        } else {
-            synchronized ( this.registeredJobs ) {
-                this.registeredJobs.add(new Registration(ref, Registration.JOB));
-            }
-        }
-    }
-
-    /**
-     * Unbind a job.
-     * @param ref
-     */
-    protected void unbindJob(final ServiceReference ref) {
-        if ( this.scheduler != null ) {
-            this.unregister(ref);
-        } else {
-            synchronized ( this.registeredJobs ) {
-                this.registeredJobs.remove(new Registration(ref, Registration.JOB));
-            }
-        }
-    }
-
-    /**
-     * Bind a new task.
-     * @param ref
-     * @throws Exception
-     */
-    protected void bindTask(final ServiceReference ref) {
-        if ( this.scheduler != null ) {
-            this.register(Registration.TASK, ref);
-        } else {
-            synchronized ( this.registeredJobs ) {
-                this.registeredJobs.add(new Registration(ref, Registration.TASK));
-            }
-        }
-    }
-
-    /**
-     * Unbind a task.
-     * @param ref
-     */
-    protected void unbindTask(final ServiceReference ref) {
-        if ( this.scheduler != null ) {
-            this.unregister(ref);
-        } else {
-            synchronized ( this.registeredJobs ) {
-                this.registeredJobs.remove(new Registration(ref, Registration.TASK));
-            }
-        }
-    }
-
+    /** Used by the web console plugin. */
     org.quartz.Scheduler getScheduler() {
         return this.scheduler;
     }
 
-    /**
-     * Helper class holding a registration if this service is not active yet.
-     */
-    private static final class Registration {
-        public static final String JOB = "job";
-        public static final String TASK = "task";
-
-        public final ServiceReference reference;
-        public final String componentName;
-
-        public Registration(final ServiceReference r, final String name) {
-            this.reference = r;
-            this.componentName = name;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if ( !(obj instanceof Registration) ) {
-                return false;
-            }
-            if ( obj == this ) {
-                return true;
-            }
-            return this.reference.equals(((Registration)obj).reference);
-        }
-
-        @Override
-        public int hashCode() {
-            return this.reference.hashCode();
-        }
-    }
-
-
     public static final class QuartzThreadPool implements org.quartz.spi.ThreadPool {
 
         /** Our executor thread pool */
@@ -815,7 +602,7 @@ public class QuartzScheduler implements 
             } else if ( opts.runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(opts.runOn[0]))
{
                 schedule = true;
             } else { // sling IDs
-                final String myId = this.discoveryService.getTopology().getLocalInstance().getSlingId();
+                final String myId = this.settings.getSlingId();
                 for(final String id : opts.runOn ) {
                     if ( myId.equals(id) ) {
                         schedule = true;

Added: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java?rev=1509329&view=auto
==============================================================================
--- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
(added)
+++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
Thu Aug  1 17:00:15 2013
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.commons.scheduler.impl;
+
+import java.util.Date;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.commons.scheduler.Job;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The quartz based implementation of the scheduler.
+ *
+ */
+@Component(immediate=true)
+public class WhiteboardHandler {
+
+    /** Default logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    @Reference
+    private Scheduler scheduler;
+
+    private ServiceTracker serviceTracker;
+
+    /**
+     * Activate this component.
+     * @throws InvalidSyntaxException
+     */
+    @Activate
+    protected void activate(final BundleContext btx) throws InvalidSyntaxException {
+        this.serviceTracker = new ServiceTracker(btx,
+                btx.createFilter("(|(" + Constants.OBJECTCLASS + "=" + Runnable.class.getName()
+ ")" +
+                 "(" + Constants.OBJECTCLASS + "=" + Job.class.getName() + "))"),
+                new ServiceTrackerCustomizer() {
+
+            public synchronized void  removedService(final ServiceReference reference, final
Object service) {
+                btx.ungetService(reference);
+                unregister(reference, service);
+            }
+
+            public synchronized void modifiedService(final ServiceReference reference, final
Object service) {
+                unregister(reference, service);
+                register(reference, service);
+            }
+
+            public synchronized Object addingService(final ServiceReference reference) {
+                final Object obj = btx.getService(reference);
+                if ( obj != null ) {
+                    register(reference, obj);
+                }
+                return obj;
+            }
+        });
+        this.serviceTracker.open();
+    }
+
+    /**
+     * Deactivate this component.
+     * Stop the scheduler.
+     * @param ctx The component context.
+     */
+    @Deactivate
+    protected void deactivate() {
+        if ( this.serviceTracker != null ) {
+            this.serviceTracker.close();
+            this.serviceTracker = null;
+        }
+    }
+
+
+    /**
+     * Create unique identifier
+     * @param type
+     * @param ref
+     * @throws Exception
+     */
+    private String getServiceIdentifier(final ServiceReference ref) {
+        String name = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_NAME);
+        if ( name == null ) {
+            name = (String)ref.getProperty(Constants.SERVICE_PID);
+            if ( name == null ) {
+                name = "Registered Service";
+            }
+        }
+        // now append service id to create a unique identifier
+        name = name + "." + ref.getProperty(Constants.SERVICE_ID);
+        return name;
+    }
+
+    /**
+     * Register a job or task
+     * @param type The type (job or task)
+     * @param ref The service reference
+     */
+    private void register(final ServiceReference ref, final Object job) {
+        final String name = getServiceIdentifier(ref);
+        final Boolean concurrent = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_CONCURRENT);
+        final Object runOn = ref.getProperty(Scheduler.PROPERTY_SCHEDULER_RUN_ON);
+        String[] runOnOpts = null;
+        if ( runOn instanceof String ) {
+            runOnOpts = new String[] {runOn.toString()};
+        } else if ( runOn instanceof String[] ) {
+            runOnOpts = (String[])runOn;
+        } else {
+            this.logger.warn("Property {} ignored for scheduler {}", Scheduler.PROPERTY_SCHEDULER_RUN_ON,
ref);
+        }
+        final String expression = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_EXPRESSION);
+        if ( expression != null ) {
+            this.scheduler.schedule(job, this.scheduler.EXPR(expression)
+                    .name(name)
+                    .canRunConcurrently((concurrent != null ? concurrent : true))
+                    .onInstancesOnly(runOnOpts));
+        } else {
+            final Long period = (Long)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_PERIOD);
+            if ( period != null ) {
+                if ( period < 1 ) {
+                    this.logger.debug("Ignoring service {} : scheduler period is less than
1.", ref);
+                } else {
+                    boolean immediate = false;
+                    if ( ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE) != null
) {
+                        immediate = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE);
+                    }
+                    final Date date = new Date();
+                    if ( !immediate ) {
+                        date.setTime(System.currentTimeMillis() + period * 1000);
+                    }
+                    this.scheduler.schedule(job, this.scheduler.AT(date, -1, period)
+                            .name(name)
+                            .canRunConcurrently((concurrent != null ? concurrent : true))
+                            .onInstancesOnly(runOnOpts));
+                }
+            } else {
+                this.logger.debug("Ignoring servce {} : no scheduling property found.", ref);
+            }
+        }
+    }
+
+    /**
+     * Unregister a service.
+     * @param ref The service reference.
+     */
+    private void unregister(final ServiceReference reference, final Object service) {
+        final String name = getServiceIdentifier(reference);
+        this.scheduler.unschedule(name);
+    }
+}

Propchange: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/WhiteboardHandler.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url



Mime
View raw message