sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stefane...@apache.org
Subject svn commit: r1587826 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/config/ main/java/org/apache/sling/event/impl/support/ test/java/org/apache/sling/event/it/
Date Wed, 16 Apr 2014 07:59:26 GMT
Author: stefanegli
Date: Wed Apr 16 07:59:26 2014
New Revision: 1587826

URL: http://svn.apache.org/r1587826
Log:
SLING-3502 : handling of main queue name fixed: not filtering active main queue name, as kept
in the queues map

Added:
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OutdatedMainQueueTest.java
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1587826&r1=1587825&r2=1587826&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
Wed Apr 16 07:59:26 2014
@@ -58,6 +58,7 @@ import org.apache.sling.discovery.Topolo
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.EnvironmentComponent;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.MainQueueConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
@@ -403,7 +404,8 @@ public class JobManagerImpl
 
     private void outdateQueue(final AbstractJobQueue queue) {
         // remove the queue with the old name
-        final String oldName = ResourceHelper.filterName(queue.getName());
+        // check for main queue
+        final String oldName = ResourceHelper.filterQueueName(queue.getName());
         this.queues.remove(oldName);
         // check if we can close or have to rename
         if ( queue.tryToClose() ) {
@@ -665,7 +667,7 @@ public class JobManagerImpl
      */
     @Override
     public Queue getQueue(final String name) {
-        return this.queues.get(ResourceHelper.filterName(name));
+        return this.queues.get(ResourceHelper.filterQueueName(name));
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java?rev=1587826&r1=1587825&r2=1587826&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
Wed Apr 16 07:59:26 2014
@@ -57,6 +57,8 @@ import org.slf4j.LoggerFactory;
 })
 public class MainQueueConfiguration {
 
+    public static final String MAIN_QUEUE_NAME = "<main queue>";
+
     /** Default logger. */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -80,7 +82,7 @@ public class MainQueueConfiguration {
         // create a new dictionary with the missing info and do some sanity puts
         final Map<String, Object> queueProps = new HashMap<String, Object>(props);
         queueProps.put(ConfigurationConstants.PROP_TOPICS, "*");
-        queueProps.put(ConfigurationConstants.PROP_NAME, "<main queue>");
+        queueProps.put(ConfigurationConstants.PROP_NAME, MAIN_QUEUE_NAME);
         queueProps.put(ConfigurationConstants.PROP_TYPE, InternalQueueConfiguration.Type.UNORDERED);
 
         // check max parallel - this should never be lower than 2!

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1587826&r1=1587825&r2=1587826&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
Wed Apr 16 07:59:26 2014
@@ -36,6 +36,7 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.config.MainQueueConfiguration;
 import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobUtil;
@@ -127,6 +128,20 @@ public abstract class ResourceHelper {
     }
 
     /**
+     * Filter the queue name for not allowed characters and replace them
+     * - with the exception of the main queue, which will not be filtered
+     * @param queueName the suggested queue name
+     * @return the filtered queue name
+     */
+    public static String filterQueueName(final String queueName) {
+        if ( queueName.equals(MainQueueConfiguration.MAIN_QUEUE_NAME) ) {
+            return queueName;
+        } else {
+            return ResourceHelper.filterName(queueName);
+        }
+    }
+    
+    /**
      * Filter the node name for not allowed characters and replace them.
      * @param nodeName The suggested node name.
      * @return The filtered node name.

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OutdatedMainQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OutdatedMainQueueTest.java?rev=1587826&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OutdatedMainQueueTest.java
(added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OutdatedMainQueueTest.java
Wed Apr 16 07:59:26 2014
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.ServiceRegistration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * IT for validating SLING-3502.
+ */
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class OutdatedMainQueueTest extends AbstractJobHandlingTest {
+
+    private static final String QUEUE_NAME = "<main queue>";
+
+    private static final String TOPIC = "sling/event/test/queue/outdate";
+
+    private static int NUM_JOBS = 2;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+    }
+
+    @After
+    public void cleanUp() throws IOException {
+        super.cleanup();
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testOutdatedMainQueue() throws Exception {
+        final AtomicInteger count = new AtomicInteger(0);
+
+        final ServiceRegistration testConsumerRegistration = this.registerJobConsumer(TOPIC,
+                new JobConsumer() {
+                    @Override
+                    public JobResult process(Job job) {
+                        count.incrementAndGet();
+                        return JobResult.OK;
+                    }
+                });
+
+        try {
+            final JobManager jobManager = this.getJobManager();
+
+            //reset the statistic
+            jobManager.getStatistics().reset();
+
+            //first start some jobs, this will implicitly instantiate the main queue
+            List<Job> testJobs = new ArrayList<Job>();
+            for (int i = 0; i < NUM_JOBS; i++) {
+                Job job = jobManager.createJob(TOPIC).add();
+                testJobs.add(job);
+            }
+
+            //wait for test jobs to complete
+            while (!testJobsDone(testJobs, jobManager)) {
+                Thread.sleep(1000);
+            }
+
+            //assert successful job execution
+            assertEquals("No queued jobs expected", 0, jobManager.getStatistics().getNumberOfQueuedJobs());
+            assertEquals("Number of executed jobs does not match", NUM_JOBS, count.get());
+            assertEquals("Number of finished jobs does not match", NUM_JOBS, jobManager.getStatistics().getNumberOfFinishedJobs());
+
+            //assert the queues: only main queue expected, not outdated
+            List<Queue> queues = getQueues(jobManager);
+            assertEquals("Only 1 job queue expected", 1, queues.size());
+            Queue mainQueue = jobManager.getQueue("<main queue>");
+            assertNotNull("Main queue expected", mainQueue);
+            assertEquals("Main queue expected", "<main queue>", mainQueue.getName());
+            assertSame("Two different main queues", queues.get(0), mainQueue);
+
+            //restart job manager, has same effect as topology change, as well calls  JobManagerImpl.outdateQueue()
+            jobManager.restart();
+
+            // we wait a little bit
+            Thread.sleep(1200);
+
+            //assert the queues: only one outdated main queue expected
+            queues = getQueues(jobManager);
+            assertEquals("Only 1 job queue expected", 1, queues.size());
+            mainQueue = queues.get(0);
+            String outdatedQueueName = getOutdatedMainQueueName(mainQueue);
+            mainQueue = jobManager.getQueue(outdatedQueueName);
+            assertNotNull("Outdated main queue expected", mainQueue);
+            assertEquals("Outdated main queue expected", getOutdatedMainQueueName(mainQueue),
mainQueue.getName());
+
+        } finally {
+            testConsumerRegistration.unregister();
+        }
+    }
+
+    private List<Queue> getQueues(JobManager jobManager) {
+        List<Queue> queues = new ArrayList<Queue>();
+        Iterator<Queue> queueIter = jobManager.getQueues().iterator();
+        while (queueIter.hasNext()) {
+            queues.add(queueIter.next());
+        }
+        return queues;
+    }
+
+    private String getOutdatedMainQueueName(Queue queue) {
+        return "<main queue><outdated>(" + queue.hashCode() + ")";
+    }
+
+    private boolean testJobsDone(List<Job> testJobs, JobManager jobManager) {
+        boolean done = false;
+        for (Job testJob : testJobs) {
+            Job tmpJob = jobManager.getJobById(testJob.getId());
+            if (tmpJob == null || tmpJob.getJobState().equals(Job.JobState.SUCCEEDED)) {
+                done = true;
+            } else {
+                done = false;
+                break;
+            }
+        }
+        return done;
+    }
+}



Mime
View raw message