sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r1633071 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: config/ console/ queues/
Date Mon, 20 Oct 2014 08:05:50 GMT
Author: cziegeler
Date: Mon Oct 20 08:05:49 2014
New Revision: 1633071

URL: http://svn.apache.org/r1633071
Log:
SLING-4048 : Avoid keeping jobs in memory. Simplify queue configuration handling.

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java?rev=1633071&r1=1633070&r2=1633071&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
Mon Oct 20 08:05:49 2014
@@ -112,6 +112,10 @@ public class MainQueueConfiguration {
         this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
     }
 
+    /**
+     * Return the main queue configuration object.
+     * @return The main queue configuration object.
+     */
     public InternalQueueConfiguration getMainConfiguration() {
         return this.mainConfiguration;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1633071&r1=1633070&r2=1633071&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
Mon Oct 20 08:05:49 2014
@@ -22,18 +22,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.event.impl.support.ResourceHelper;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
 
 /**
@@ -41,21 +35,25 @@ import org.osgi.util.tracker.ServiceTrac
  */
 @Component
 @Service(value=QueueConfigurationManager.class)
+@Reference(referenceInterface=InternalQueueConfiguration.class, policy=ReferencePolicy.DYNAMIC,
+           cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
+           bind="bindConfig", unbind="unbindConfig", updated="updateConfig")
 public class QueueConfigurationManager {
 
     public interface QueueConfigurationChangeListener {
         void configChanged();
     }
 
-    /** Configurations - ordered by service ranking. */
-    private volatile InternalQueueConfiguration[] orderedConfigs = new InternalQueueConfiguration[0];
+    /** Empty configuration array. */
+    private static final InternalQueueConfiguration[] EMPTY_CONFIGS = new InternalQueueConfiguration[0];
 
-    /** Service tracker for the configurations. */
-    private ServiceTracker configTracker;
+    /** Configurations - ordered by service ranking. */
+    private volatile InternalQueueConfiguration[] orderedConfigs = EMPTY_CONFIGS;
 
-    /** Tracker count to detect changes. */
-    private volatile int lastTrackerCount = -1;
+    /** All configurations. */
+    private final List<InternalQueueConfiguration> configurations = new ArrayList<InternalQueueConfiguration>();
 
+    /** The main queue configuration. */
     @Reference
     private MainQueueConfiguration mainQueueConfiguration;
 
@@ -63,76 +61,63 @@ public class QueueConfigurationManager {
     private final List<QueueConfigurationChangeListener> listeners = new ArrayList<QueueConfigurationChangeListener>();
 
     /**
-     * Activate this component.
-     * Create the service tracker and start it.
+     * Add a new queue configuration.
+     * @param config A new queue configuration.
      */
-    @Activate
-    protected void activate(final BundleContext bundleContext)
-    throws LoginException, PersistenceException {
-        this.configTracker = new ServiceTracker(bundleContext,
-                InternalQueueConfiguration.class.getName(), new ServiceTrackerCustomizer() {
-
-                    @Override
-                    public void removedService(final ServiceReference reference, final Object service) {
-                        bundleContext.ungetService(reference);
-                        updateListeners();
-                    }
-
-                    @Override
-                    public void modifiedService(ServiceReference reference, Object service) {
-                        // nothing to do
-                    }
-
-                    @Override
-                    public Object addingService(final ServiceReference reference) {
-                        return bundleContext.getService(reference);
-                    }
-                });
-        this.configTracker.open();
+    protected void bindConfig(final InternalQueueConfiguration config) {
+        synchronized ( configurations ) {
+            configurations.add(config);
+            this.createConfigurationCache();
+        }
     }
 
     /**
-     * Deactivate this component.
-     * Stop the service tracker.
-     */
-    @Deactivate
-    protected void deactivate() {
-        if ( this.configTracker != null ) {
-            this.configTracker.close();
-            this.configTracker = null;
+     * Remove a queue configuration.
+     * @param config The queue configuration.
+     */
+    protected void unbindConfig(final InternalQueueConfiguration config) {
+        synchronized ( configurations ) {
+            configurations.remove(config);
+            this.createConfigurationCache();
+        }
+    }
+
+    /**
+     * Update a queue configuration.
+     * @param config The queue configuration.
+     */
+    protected void updateConfig(final InternalQueueConfiguration config) {
+        // InternalQueueConfiguration does not implement modified atm,
+        // but we handle this case anyway
+        synchronized ( configurations ) {
+            this.createConfigurationCache();
+        }
+    }
+
+    /**
+     * Create the configurations cache used by clients.
+     */
+    private void createConfigurationCache() {
+        if ( this.configurations.isEmpty() ) {
+            this.orderedConfigs = EMPTY_CONFIGS;
+        } else {
+            Collections.sort(configurations);
+            orderedConfigs = configurations.toArray(new InternalQueueConfiguration[configurations.size()]);
         }
     }
 
     /**
      * Return all configurations.
+     * @return An array with all queue configurations except the main queue. Array might be empty.
      */
     public InternalQueueConfiguration[] getConfigurations() {
-        final int count = this.configTracker.getTrackingCount();
-        InternalQueueConfiguration[] configurations = this.orderedConfigs;
-        if ( this.lastTrackerCount < count ) {
-            synchronized ( this ) {
-                configurations = this.orderedConfigs;
-                if ( this.lastTrackerCount < count ) {
-                    final Object[] trackedConfigs = this.configTracker.getServices();
-                    if ( trackedConfigs == null || trackedConfigs.length == 0 ) {
-                        configurations = new InternalQueueConfiguration[0];
-                    } else {
-                        final List<InternalQueueConfiguration> configs = new ArrayList<InternalQueueConfiguration>();
-                        for(final Object entry : trackedConfigs) {
-                            final InternalQueueConfiguration config = (InternalQueueConfiguration)entry;
-                            configs.add(config);
-                        }
-                        Collections.sort(configs);
-                        configurations = configs.toArray(new InternalQueueConfiguration[configs.size()]);
-                    }
-                    this.orderedConfigs = configurations;
-                    this.lastTrackerCount = count;
-                }
-            }
-        }
-        return configurations;
+        return orderedConfigs;
     }
 
+    /**
+     * Get the configuration for the main queue.
+     * @return The configuration for the main queue.
+     */
     public InternalQueueConfiguration getMainQueueConfiguration() {
         return this.mainQueueConfiguration.getMainConfiguration();
     }
@@ -146,6 +131,22 @@ public class QueueConfigurationManager {
         public String toString() {
             return queueName;
         }
+
+        @Override
+        public int hashCode() {
+            return queueName.hashCode();
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if ( obj == this ) {
+                return true;
+            }
+            if ( obj instanceof QueueInfo ) {
+                return ((QueueInfo)obj).queueName.equals(this.queueName);
+            }
+            return false;
+        }
     }
 
     /**
@@ -173,10 +174,6 @@ public class QueueConfigurationManager {
         return result;
     }
 
-    public int getChangeCount() {
-        return this.configTracker.getTrackingCount();
-    }
-
     public void addListener(final QueueConfigurationChangeListener listener) {
         synchronized ( this.listeners ) {
             this.listeners.add(listener);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java?rev=1633071&r1=1633070&r2=1633071&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
Mon Oct 20 08:05:49 2014
@@ -40,7 +40,7 @@ import org.apache.sling.discovery.Instan
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobManagerImpl;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.Queue;
@@ -69,7 +69,7 @@ public class InventoryPlugin implements 
     private JobManager jobManager;
 
     @Reference
-    private QueueConfigurationManager queueConfigManager;
+    private JobManagerConfiguration configuration;
 
     @Reference
     private JobConsumerManager jobConsumerManager;
@@ -275,8 +275,8 @@ public class InventoryPlugin implements 
 
         pw.println("Apache Sling Job Handling - Job Queue Configurations");
         pw.println("----------------------------------------------------");
-        this.printQueueConfiguration(pw, this.queueConfigManager.getMainQueueConfiguration());
-        final InternalQueueConfiguration[] configs = this.queueConfigManager.getConfigurations();
+        this.printQueueConfiguration(pw, this.configuration.getQueueConfigurationManager().getMainQueueConfiguration());
+        final InternalQueueConfiguration[] configs = this.configuration.getQueueConfigurationManager().getConfigurations();
         for(final InternalQueueConfiguration c : configs ) {
             this.printQueueConfiguration(pw, c);
         }
@@ -425,8 +425,8 @@ public class InventoryPlugin implements 
 
         pw.println(",");
         pw.println("  \"configurations\" : [");
-        this.printQueueConfigurationJson(pw, this.queueConfigManager.getMainQueueConfiguration());
-        final InternalQueueConfiguration[] configs = this.queueConfigManager.getConfigurations();
+        this.printQueueConfigurationJson(pw, this.configuration.getQueueConfigurationManager().getMainQueueConfiguration());
+        final InternalQueueConfiguration[] configs = this.configuration.getQueueConfigurationManager().getConfigurations();
         for(final InternalQueueConfiguration c : configs ) {
             pw.println(",");
             this.printQueueConfigurationJson(pw, c);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1633071&r1=1633070&r2=1633071&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
Mon Oct 20 08:05:49 2014
@@ -41,7 +41,7 @@ import org.apache.sling.discovery.Instan
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobManagerImpl;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
@@ -80,7 +80,7 @@ public class WebConsolePlugin extends Ht
     private JobManager jobManager;
 
     @Reference
-    private QueueConfigurationManager queueConfigManager;
+    private JobManagerConfiguration configuration;
 
     @Reference
     private JobConsumerManager jobConsumerManager;
@@ -363,8 +363,8 @@ public class WebConsolePlugin extends Ht
         }
 
         pw.println("<p class='statline'>Apache Sling Job Handling - Job Queue Configurations</p>");
-        this.printQueueConfiguration(req, pw, this.queueConfigManager.getMainQueueConfiguration());
-        final InternalQueueConfiguration[] configs = this.queueConfigManager.getConfigurations();
+        this.printQueueConfiguration(req, pw, this.configuration.getQueueConfigurationManager().getMainQueueConfiguration());
+        final InternalQueueConfiguration[] configs = this.configuration.getQueueConfigurationManager().getConfigurations();
         for(final InternalQueueConfiguration c : configs ) {
             this.printQueueConfiguration(req, pw, c);
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1633071&r1=1633070&r2=1633071&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Mon Oct 20 08:05:49 2014
@@ -115,6 +115,12 @@ public abstract class AbstractJobQueue
     /** The job cache. */
     private final QueueJobCache cache;
 
+    /** Flag to mark whether the queue is waiting for the next job. */
+    private volatile boolean isWaitingForNextJob = false;
+
+    /** Sync object for {@link #isWaitingForNextJob}. */
+    private final Object nextJobLock = new Object();
+
     /**
      * Create a new queue
      * @param name The queue name
@@ -225,7 +231,7 @@ public abstract class AbstractJobQueue
      * Check whether this queue can be closed
      */
     protected boolean canBeClosed() {
-        return !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0;
+        return !this.isWaiting && !this.isSuspended() && this.cache.isEmpty() && this.asyncCounter.get() == 0;
     }
 
     /**
@@ -323,9 +329,6 @@ public abstract class AbstractJobQueue
         }
     }
 
-    private volatile boolean isWaitingForNextJob = false;
-    private final Object nextJobLock = new Object();
-
     /**
      * Take a new job for this queue.
      * This method blocks until a job is available or the queue is stopped.
@@ -336,25 +339,25 @@ public abstract class AbstractJobQueue
         logger.debug("Taking new job for {}", queueName);
         JobImpl result = null;
 
-        this.isWaitingForNextJob = true;
-        while ( this.isWaitingForNextJob && !this.isOutdated()) {
+        while ( result == null && !this.isOutdated() && this.running ) {
+            this.isWaitingForNextJob = true;
+
             result = this.cache.getNextJob();
-            if ( result != null ) {
-                isWaitingForNextJob = false;
-            } else {
+            if ( result == null && !this.isOutdated() && this.running ) {
                 // block
                 synchronized ( nextJobLock ) {
-                    if ( isWaitingForNextJob ) {
+                    while ( isWaitingForNextJob ) {
                         try {
-                            nextJobLock.wait();
+                            nextJobLock.wait(30000);
+                            isWaitingForNextJob = false;
                         } catch ( final InterruptedException ignore ) {
                             Thread.currentThread().interrupt();
                         }
                     }
                 }
             }
+            this.isWaitingForNextJob = false;
         }
-        this.isWaitingForNextJob = false;
 
         if ( logger.isDebugEnabled() ) {
             logger.debug("Returning job for {} : {}", queueName, Utility.toString(result));
@@ -437,6 +440,7 @@ public abstract class AbstractJobQueue
     }
 
     private boolean startJobExecution(final JobHandler handler, final JobExecutor consumer) {
+        this.closeMarker.set(false);
         final JobImpl job = handler.getJob();
         if ( handler.startProcessing(this) ) {
             if ( logger.isDebugEnabled() ) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1633071&r1=1633070&r2=1633071&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
Mon Oct 20 08:05:49 2014
@@ -60,15 +60,13 @@ public final class OrderedJobQueue exten
                 }
                 if ( this.sleepDelay > 0 ) {
                     final long waitingTime = this.sleepDelay;
-                    this.sleepDelay = -1;
                     final long startTime = System.currentTimeMillis();
                     this.logger.debug("Job queue {} is sleeping {}ms for retry.", this.queueName, waitingTime);
-                    this.isWaiting = true;
-                    while ( this.isWaiting ) {
+                    while ( this.sleepDelay > 0 ) {
                         try {
                             this.syncLock.wait(waitingTime);
                             if ( System.currentTimeMillis() >= startTime + waitingTime ) {
-                                this.isWaiting = false;
+                                this.sleepDelay = -1;
                             }
                         } catch (final InterruptedException e) {
                             this.ignoreException(e);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1633071&r1=1633070&r2=1633071&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
Mon Oct 20 08:05:49 2014
@@ -90,6 +90,23 @@ public class QueueJobCache {
     }
 
     /**
+     * Check whether this queue has no outstanding jobs.
+     * @return {@code true} if there is no job outstanding.
+     */
+    public boolean isEmpty() {
+        boolean result = true;
+        synchronized ( this.cache ) {
+            result = this.cache.isEmpty();
+        }
+        if ( result ) {
+            synchronized ( this.topicsWithNewJobs ) {
+                result = this.topicsWithNewJobs.isEmpty();
+            }
+        }
+        return result;
+    }
+
+    /**
      * Get the next job.
      * This method is not called concurrently, however
      * {@link #reschedule(JobImpl)} and {@link #handleNewJob(String)}

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1633071&r1=1633070&r2=1633071&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
Mon Oct 20 08:05:49 2014
@@ -72,8 +72,8 @@ import org.slf4j.LoggerFactory;
 @Component(immediate=true)
 @Service(value={Runnable.class, QueueManager.class, EventHandler.class})
 @Properties({
-    @Property(name="scheduler.period", longValue=60, propertyPrivate=true),
-    @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
+    @Property(name="scheduler.period", longValue=60),
+    @Property(name="scheduler.concurrent", boolValue=false),
     @Property(name=EventConstants.EVENT_TOPIC, value=NotificationConstants.TOPIC_JOB_ADDED)
 })
 public class QueueManager
@@ -281,7 +281,6 @@ public class QueueManager
         synchronized ( queuesLock ) {
             final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
             for(final AbstractJobQueue queue : queues ) {
-                queue.clear();
                 this.outdateQueue(queue);
             }
         }



Mime
View raw message