incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r1479531 - in /sling/trunk/bundles/extensions/event/src/main: java/org/apache/sling/event/impl/jobs/ resources/OSGI-INF/metatype/
Date Mon, 06 May 2013 11:53:22 GMT
Author: cziegeler
Date: Mon May  6 11:53:22 2013
New Revision: 1479531

URL: http://svn.apache.org/r1479531
Log:
SLING-2856 : NPE in JobManagerImpl to read priority for Job
SLING-2829 : Distributed jobs only across local cluster if possible

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
    sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1479531&r1=1479530&r2=1479531&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java Mon May  6 11:53:22 2013
@@ -43,6 +43,12 @@ public class JobManagerConfiguration {
     /** The background loader waits this time of seconds after startup before loading events from the repository. (in secs) */
     public static final String CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
 
+    /** Default for disabling the distribution. */
+    public static final boolean DEFAULT_DISABLE_DISTRIBUTION = false;
+
+    /** Configuration switch for distributing the jobs. */
+    public static final String PROPERTY_DISABLE_DISTRIBUTION = "job.consumermanager.disableDistribution";
+
     /** The jobs base path with a slash. */
     private String jobsBasePathWithSlash;
 
@@ -70,7 +76,10 @@ public class JobManagerConfiguration {
 
     private long backgroundLoadDelay;
 
+    private boolean disabledDistribution;
+
     public JobManagerConfiguration(final Map<String, Object> props) {
+        this.disabledDistribution = PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION), DEFAULT_DISABLE_DISTRIBUTION);
         this.jobsBasePathWithSlash = PropertiesUtil.toString(props.get(CONFIG_PROPERTY_REPOSITORY_PATH),
                             DEFAULT_REPOSITORY_PATH) + '/';
 
@@ -199,4 +208,8 @@ public class JobManagerConfiguration {
     public String getPreviousVersionIdentifiedPath() {
         return this.previousVersionIdentifiedPath;
     }
+
+    public boolean disableDistribution() {
+        return this.disabledDistribution;
+    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1479531&r1=1479530&r2=1479531&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon May  6 11:53:22 2013
@@ -495,14 +495,13 @@ public class JobManagerImpl
                     // convert to integers (JCR supports only long...)
                     jobProperties.put(Job.PROPERTY_JOB_RETRIES, vm.get(Job.PROPERTY_JOB_RETRIES, Integer.class));
                     jobProperties.put(Job.PROPERTY_JOB_RETRY_COUNT, vm.get(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class));
-                    jobProperties.put(Job.PROPERTY_JOB_PRIORITY, JobPriority.valueOf(vm.get(Job.PROPERTY_JOB_PRIORITY, String.class)));
+                    jobProperties.put(Job.PROPERTY_JOB_PRIORITY, JobPriority.valueOf(vm.get(Job.PROPERTY_JOB_PRIORITY, JobPriority.NORM.name())));
 
                     job = new JobImpl(topic,
                             (String)jobProperties.get(JobUtil.PROPERTY_JOB_NAME),
                             (String)jobProperties.get(JobUtil.JOB_ID),
                             jobProperties);
                 } else {
-                    logger.warn(errorMessage + " : {}", vm);
                     // remove the job as the topic is invalid anyway
                     try {
                         resource.getResourceResolver().delete(resource);
@@ -544,7 +543,7 @@ public class JobManagerImpl
 
     private void startProcessing(final long changeCount, final TopologyView view) {
         // create new capabilities and update view
-        this.topologyCapabilities = new TopologyCapabilities(view, changeCount);
+        this.topologyCapabilities = new TopologyCapabilities(view, this.configuration.disableDistribution(), changeCount);
 
         this.backgroundLoader.start();
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java?rev=1479531&r1=1479530&r2=1479531&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java Mon May  6 11:53:22 2013
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.TopologyView;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.slf4j.Logger;
@@ -48,6 +49,9 @@ public class TopologyCapabilities {
     /** Round robin map. */
     private final Map<String, Integer> roundRobinMap = new HashMap<String, Integer>();
 
+    /** Instance map. */
+    private final Map<String, InstanceDescription> instanceMap = new HashMap<String, InstanceDescription>();
+
     /** Is this the leader of the cluster? */
     private final boolean isLeader;
 
@@ -63,10 +67,14 @@ public class TopologyCapabilities {
     /** Instance comparator. */
     private final InstanceDescriptionComparator instanceComparator;
 
+    /** Disable distribution flag. */
+    private final boolean disableDistribution;
+
     public static final class InstanceDescriptionComparator implements Comparator<InstanceDescription> {
 
         private final String localClusterId;
 
+
         public InstanceDescriptionComparator(final String clusterId) {
             this.localClusterId = clusterId;
         }
@@ -109,7 +117,8 @@ public class TopologyCapabilities {
         return allInstances;
     }
 
-    public TopologyCapabilities(final TopologyView view, final long changeCount) {
+    public TopologyCapabilities(final TopologyView view, final boolean disableDistribution, final long changeCount) {
+        this.disableDistribution = disableDistribution;
         this.instanceComparator = new InstanceDescriptionComparator(view.getLocalInstance().getClusterView().getId());
         this.changeCount = changeCount;
         this.isLeader = view.getLocalInstance().isLeader();
@@ -129,6 +138,7 @@ public class TopologyCapabilities {
                     Collections.sort(list, this.instanceComparator);
                 }
             }
+            this.instanceMap.put(desc.getSlingId(), desc);
         }
         this.instanceCapabilities = newCaps;
     }
@@ -196,12 +206,40 @@ public class TopologyCapabilities {
     public String detectTarget(final String jobTopic, final Map<String, Object> jobProperties,
             final QueueInfo queueInfo) {
         final List<InstanceDescription> potentialTargets = this.getPotentialTargets(jobTopic, jobProperties);
+        logger.debug("Potential targets for {} : {}", jobTopic, potentialTargets);
+        String createdOn = null;
+        if ( jobProperties != null ) {
+            createdOn = (String) jobProperties.get(org.apache.sling.event.jobs.Job.PROPERTY_JOB_CREATED_INSTANCE);
+        }
+        if ( createdOn == null ) {
+            createdOn = Environment.APPLICATION_ID;
+        }
+        final InstanceDescription createdOnInstance = this.instanceMap.get(createdOn);
 
         if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+            if ( createdOnInstance != null ) {
+                // create a list with local targets first.
+                final List<InstanceDescription> localTargets = new ArrayList<InstanceDescription>();
+                for(final InstanceDescription desc : potentialTargets) {
+                    if ( desc.getClusterView().getId().equals(createdOnInstance.getClusterView().getId()) ) {
+                        if ( !this.disableDistribution || desc.isLeader() ) {
+                            localTargets.add(desc);
+                        }
+                    }
+                }
+                if ( localTargets != null ) {
+                    potentialTargets.clear();
+                    potentialTargets.addAll(localTargets);
+                    logger.debug("Potential targets filtered for {} : {}", jobTopic, potentialTargets);
+                }
+            }
             if ( queueInfo.queueConfiguration.getType() == QueueConfiguration.Type.ORDERED ) {
                 // for ordered queues we always pick the first as we have to pick the same target on each cluster view
                 // on all instances (TODO - we could try to do some round robin of the whole queue)
-                return potentialTargets.get(0).getSlingId();
+                final String result = potentialTargets.get(0).getSlingId();
+                logger.debug("Target for {} : {}", jobTopic, result);
+
+                return result;
             }
             // TODO - this is a simple round robin which is not based on the actual load
             //        of the instances
@@ -213,7 +251,9 @@ public class TopologyCapabilities {
                 index = 0;
             }
             this.roundRobinMap.put(jobTopic, index + 1);
-            return potentialTargets.get(index).getSlingId();
+            final String result = potentialTargets.get(index).getSlingId();
+            logger.debug("Target for {} : {}", jobTopic, result);
+            return result;
         }
 
         return null;

Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1479531&r1=1479530&r2=1479531&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Mon May  6 11:53:22 2013
@@ -71,6 +71,16 @@ queue.waitforasync.description = If a jo
 
 #
 # Job Event Handler
+job.events.name = Apache Sling Job Managers
+job.events.description = Manages job scheduling on a single system as well \
+ as on a cluster. A Job runs only on a single cluster node. \
+ The respective scheduling is persisted in the resource tree and distributed \
+ amongst the cluster instances. The jobs are started \
+ locally on a single cluster node. Most of the configuration is for \
+ configuring the default job queue.
+ 
+#
+# Job Event Handler
 job.events.name = Apache Sling Job Default Queue 
 job.events.description = The configuration of the default job queue.
 
@@ -108,4 +118,3 @@ job.consumermanager.blacklist.desscripti
  processed by this instance. Leaving it empty, all job consumers are enabled. Putting a '*' as \
  one entry, disables all job consumers. Adding separate topics disables job consumers for exactly \
  this topic.
-



Mime
View raw message