sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r1632486 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/timed/ test/java/org/apache/sling/event/it/
Date Fri, 17 Oct 2014 07:58:40 GMT
Author: cziegeler
Date: Fri Oct 17 07:58:39 2014
New Revision: 1632486

URL: http://svn.apache.org/r1632486
Log:
SLING-4048 : Avoid keeping jobs in memory.  Increase test coverage, update test setup to more
recent bundles (WiP)

Added:
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
  (with props)
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java?rev=1632486&r1=1632485&r2=1632486&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java Fri Oct 17 07:58:39 2014
@@ -552,6 +552,15 @@ public class TimedEventSender
                 properties.remove(EventConstants.EVENT_TOPIC);
                 properties.put(TimedEventStatusProvider.PROPERTY_EVENT_ID, topic.replace('/', '.') + '/' + eventResource.getName());
 
+                final Object date = properties.get(EventUtil.PROPERTY_TIMED_EVENT_DATE);
+                if ( date != null && !(date instanceof Date) ) {
+                    if ( date instanceof Calendar ) {
+                        properties.put(EventUtil.PROPERTY_TIMED_EVENT_DATE, ((Calendar)date).getTime());
+                    } else {
+                        logger.error("Unable to read event: date property is neither date nor calendar!");
+                        return null;
+                    }
+                }
                 try {
                     result.event = new Event(topic, properties);
                     return result;

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1632486&r1=1632485&r2=1632486&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java Fri Oct 17 07:58:39 2014
@@ -110,7 +110,7 @@ public abstract class AbstractJobHandlin
                 mavenBundle("org.apache.tika", "tika-bundle", "1.2"),
 
                 mavenBundle("org.apache.felix", "org.apache.felix.http.jetty", "2.2.2"),
-                mavenBundle("org.apache.felix", "org.apache.felix.eventadmin", "1.3.2"),
+                mavenBundle("org.apache.felix", "org.apache.felix.eventadmin", "1.4.2"),
                 mavenBundle("org.apache.felix", "org.apache.felix.scr", "1.8.2"),
                 mavenBundle("org.apache.felix", "org.apache.felix.configadmin", "1.8.0"),
                 mavenBundle("org.apache.felix", "org.apache.felix.inventory", "1.0.4"),
@@ -120,22 +120,22 @@ public abstract class AbstractJobHandlin
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.json", "2.0.6"),
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.mime", "2.1.4"),
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.classloader", "1.3.2"),
-                mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.4.2"),
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.4.4"),
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.threads", "3.2.0"),
 
                 mavenBundle("org.apache.sling", "org.apache.sling.launchpad.api", "1.1.0"),
-                mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.1.6"),
-                mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "1.0.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.3.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "1.0.2"),
                 mavenBundle("org.apache.sling", "org.apache.sling.discovery.standalone", "1.0.0"),
 
-                mavenBundle("org.apache.sling", "org.apache.sling.api", "2.7.0"),
-                mavenBundle("org.apache.sling", "org.apache.sling.settings", "1.3.0"),
-                mavenBundle("org.apache.sling", "org.apache.sling.resourceresolver", "1.1.0"),
-                mavenBundle("org.apache.sling", "org.apache.sling.adapter", "2.1.0"),
-                mavenBundle("org.apache.sling", "org.apache.sling.jcr.resource", "2.3.6"),
-                mavenBundle("org.apache.sling", "org.apache.sling.jcr.classloader", "3.2.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.api", "2.8.0"),
+                mavenBundle("org.apache.sling", "org.apache.sling.settings", "1.3.4"),
+                mavenBundle("org.apache.sling", "org.apache.sling.resourceresolver", "1.1.6"),
+                mavenBundle("org.apache.sling", "org.apache.sling.adapter", "2.1.2"),
+                mavenBundle("org.apache.sling", "org.apache.sling.jcr.resource", "2.3.8"),
+                mavenBundle("org.apache.sling", "org.apache.sling.jcr.classloader", "3.2.2"),
                 mavenBundle("org.apache.sling", "org.apache.sling.jcr.contentloader", "2.1.8"),
-                mavenBundle("org.apache.sling", "org.apache.sling.engine", "2.3.2"),
+                mavenBundle("org.apache.sling", "org.apache.sling.engine", "2.3.6"),
                 mavenBundle("org.apache.sling", "org.apache.sling.serviceusermapper", "1.0.0"),
 
                 mavenBundle("org.apache.sling", "org.apache.sling.jcr.jcr-wrapper", "2.0.0"),
@@ -168,6 +168,7 @@ public abstract class AbstractJobHandlin
         try {
             Thread.sleep(time);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             // ignore
         }
     }

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java?rev=1632486&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java Fri Oct 17 07:58:39 2014
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class ChaosTest extends AbstractJobHandlingTest {
+
+    /** Duration for firing jobs in seconds. */
+    private static final long DURATION = 3 * 60;
+
+    private static final int NUM_ORDERED_THREADS = 3;
+    private static final int NUM_PARALLEL_THREADS = 6;
+    private static final int NUM_ROUND_THREADS = 6;
+
+    private static final int NUM_ORDERED_TOPICS = 2;
+    private static final int NUM_PARALLEL_TOPICS = 8;
+    private static final int NUM_ROUND_TOPICS = 8;
+
+    private static final String ORDERED_TOPIC_PREFIX = "sling/chaos/ordered/";
+    private static final String PARALLEL_TOPIC_PREFIX = "sling/chaos/parallel/";
+    private static final String ROUND_TOPIC_PREFIX = "sling/chaos/round/";
+
+    private static final String[] ORDERED_TOPICS = new String[NUM_ORDERED_TOPICS];
+    private static final String[] PARALLEL_TOPICS = new String[NUM_PARALLEL_TOPICS];
+    private static final String[] ROUND_TOPICS = new String[NUM_ROUND_TOPICS];
+
+    static {
+        for(int i=0; i<NUM_ORDERED_TOPICS; i++) {
+            ORDERED_TOPICS[i] = ORDERED_TOPIC_PREFIX + String.valueOf(i);
+        }
+        for(int i=0; i<NUM_PARALLEL_TOPICS; i++) {
+            PARALLEL_TOPICS[i] = PARALLEL_TOPIC_PREFIX + String.valueOf(i);
+        }
+        for(int i=0; i<NUM_ROUND_TOPICS; i++) {
+            ROUND_TOPICS[i] = ROUND_TOPIC_PREFIX + String.valueOf(i);
+        }
+    }
+
+    private String orderedQueueConfPid;
+
+    private String topicRRQueueConfPid;
+
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create ordered test queue
+        final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> orderedProps = new Hashtable<String, Object>();
+        orderedProps.put(ConfigurationConstants.PROP_NAME, "chaos-ordered");
+        orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
+        orderedProps.put(ConfigurationConstants.PROP_TOPICS, ORDERED_TOPICS);
+        orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+        orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+        orderedConfig.update(orderedProps);
+
+        orderedQueueConfPid = orderedConfig.getPid();
+
+        // create round robin test queue
+        final org.osgi.service.cm.Configuration rrConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> rrProps = new Hashtable<String, Object>();
+        rrProps.put(ConfigurationConstants.PROP_NAME, "chaos-roundrobin");
+        rrProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name());
+        rrProps.put(ConfigurationConstants.PROP_TOPICS, ROUND_TOPICS);
+        rrProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+        rrProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+        rrProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 5);
+        rrConfig.update(rrProps);
+
+        topicRRQueueConfPid = rrConfig.getPid();
+
+        this.sleep(1000L);
+    }
+
+    @After
+    public void cleanUp() throws IOException {
+        this.removeConfiguration(this.orderedQueueConfPid);
+        this.removeConfiguration(this.topicRRQueueConfPid);
+        super.cleanup();
+    }
+
+    /**
+     * Setup consumers
+     */
+    private void setupJobConsumers(final List<ServiceRegistration> registrations) {
+        for(int i=0; i<NUM_ORDERED_TOPICS; i++) {
+            registrations.add(this.registerJobConsumer(ORDERED_TOPICS[i],
+
+                new JobConsumer() {
+
+                    @Override
+                    public JobResult process(final Job job) {
+                        return JobResult.OK;
+                    }
+                }));
+        }
+        for(int i=0; i<NUM_PARALLEL_TOPICS; i++) {
+            registrations.add(this.registerJobConsumer(PARALLEL_TOPICS[i],
+
+                new JobConsumer() {
+
+                    @Override
+                    public JobResult process(final Job job) {
+                        return JobResult.OK;
+                    }
+                }));
+        }
+        for(int i=0; i<NUM_ROUND_TOPICS; i++) {
+            registrations.add(this.registerJobConsumer(ROUND_TOPICS[i],
+
+                new JobConsumer() {
+
+                    @Override
+                    public JobResult process(final Job job) {
+                        return JobResult.OK;
+                    }
+                }));
+        }
+    }
+
+    private static final class CreateJobThread extends Thread {
+
+        private final String[] topics;
+
+        private final JobManager jobManager;
+
+        private final Random random = new Random();
+
+        final Map<String, AtomicLong> created;
+
+        final AtomicLong finishedThreads;
+
+        public CreateJobThread(final JobManager jobManager,
+                final String[] topics,
+                final Map<String, AtomicLong> created,
+                final AtomicLong finishedThreads) {
+            this.topics = topics;
+            this.jobManager = jobManager;
+            this.created = created;
+            this.finishedThreads = finishedThreads;
+        }
+
+        @Override
+        public void run() {
+            int index = 0;
+            final long startTime = System.currentTimeMillis();
+            final long endTime = startTime + DURATION * 1000;
+            while ( System.currentTimeMillis() < endTime ) {
+                final String topic = topics[index];
+                jobManager.addJob(topic, null);
+                created.get(topic).incrementAndGet();
+
+                index++;
+                if ( index == topics.length ) {
+                    index = 0;
+                }
+
+                final int sleepTime = random.nextInt(200);
+                try {
+                    Thread.sleep(sleepTime);
+                } catch ( final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            finishedThreads.incrementAndGet();
+        }
+
+    }
+
+    /**
+     * Setup job creation threads
+     */
+    private void setupJobCreationThreads(final List<Thread> threads,
+            final JobManager jobManager,
+            final Map<String, AtomicLong> created,
+            final AtomicLong finishedThreads) {
+        for(int i=0;i<NUM_ORDERED_THREADS;i++) {
+            threads.add(new CreateJobThread(jobManager, ORDERED_TOPICS, created, finishedThreads));
+        }
+        for(int i=0;i<NUM_PARALLEL_THREADS;i++) {
+            threads.add(new CreateJobThread(jobManager, PARALLEL_TOPICS, created, finishedThreads));
+        }
+        for(int i=0;i<NUM_ROUND_THREADS;i++) {
+            threads.add(new CreateJobThread(jobManager, ROUND_TOPICS, created, finishedThreads));
+        }
+    }
+
+    /**
+     * Setup chaos threads
+     */
+    private void setupChaosThreads(final List<Thread> threads,
+            final AtomicLong finishedThreads) {
+        // no chaos for now
+    }
+
+    @Test(timeout=DURATION * 3000)
+    public void testDoChaos() throws Exception {
+        final JobManager jobManager = this.getJobManager();
+
+        // setup created map
+        final Map<String, AtomicLong> created = new HashMap<String, AtomicLong>();
+        final Map<String, AtomicLong> finished = new HashMap<String, AtomicLong>();
+        final List<String> topics = new ArrayList<String>();
+        for(int i=0;i<NUM_ORDERED_TOPICS;i++) {
+            created.put(ORDERED_TOPICS[i], new AtomicLong());
+            finished.put(ORDERED_TOPICS[i], new AtomicLong());
+            topics.add(ORDERED_TOPICS[i]);
+        }
+        for(int i=0;i<NUM_PARALLEL_TOPICS;i++) {
+            created.put(PARALLEL_TOPICS[i], new AtomicLong());
+            finished.put(PARALLEL_TOPICS[i], new AtomicLong());
+            topics.add(PARALLEL_TOPICS[i]);
+        }
+        for(int i=0;i<NUM_ROUND_TOPICS;i++) {
+            created.put(ROUND_TOPICS[i], new AtomicLong());
+            finished.put(ROUND_TOPICS[i], new AtomicLong());
+            topics.add(ROUND_TOPICS[i]);
+        }
+
+        final List<ServiceRegistration> registrations = new ArrayList<ServiceRegistration>();
+        final List<Thread> threads = new ArrayList<Thread>();
+        final AtomicLong finishedThreads = new AtomicLong();
+
+        final ServiceRegistration eventHandler = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(final Event event) {
+                        final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
+                        finished.get(topic).incrementAndGet();
+                    }
+                });
+        try {
+            // setup job consumers
+            this.setupJobConsumers(registrations);
+
+            // setup job creation tests
+            this.setupJobCreationThreads(threads, jobManager, created, finishedThreads);
+
+            this.setupChaosThreads(threads, finishedThreads);
+
+            System.out.println("Starting threads...");
+            // start threads
+            for(final Thread t : threads) {
+                t.start();
+            }
+
+            System.out.println("Sleeping for " + DURATION + " seconds to wait for threads to finish...");
+            // for sure we can sleep for the duration
+            this.sleep(DURATION * 1000);
+
+            System.out.println("Polling for threads to finish...");
+            // wait until threads are finished
+            while ( finishedThreads.get() < threads.size() ) {
+                this.sleep(100);
+            }
+
+            System.out.println("Waiting for job handling to finish...");
+            while ( !topics.isEmpty() ) {
+                final Iterator<String> iter = topics.iterator();
+                while ( iter.hasNext() ) {
+                    final String topic = iter.next();
+                    if ( finished.get(topic).get() == created.get(topic).get() ) {
+                        iter.remove();
+                    }
+                }
+                this.sleep(100);
+            }
+        } finally {
+            eventHandler.unregister();
+            for(final ServiceRegistration reg : registrations) {
+                reg.unregister();
+            }
+        }
+
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1632486&r1=1632485&r2=1632486&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java Fri Oct 17 07:58:39 2014
@@ -62,18 +62,18 @@ public class RoundRobinQueueTest extends
     public void setup() throws IOException {
         super.setup();
 
-        // create ordered test queue
-        final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
-        final Dictionary<String, Object> orderedProps = new Hashtable<String, Object>();
-        orderedProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
-        orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name());
-        orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC + "/*");
-        orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2);
-        orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
-        orderedProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PAR);
-        orderedConfig.update(orderedProps);
+        // create round robin test queue
+        final org.osgi.service.cm.Configuration rrConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> rrProps = new Hashtable<String, Object>();
+        rrProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        rrProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name());
+        rrProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC + "/*");
+        rrProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+        rrProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+        rrProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PAR);
+        rrConfig.update(rrProps);
 
-        queueConfPid = orderedConfig.getPid();
+        queueConfPid = rrConfig.getPid();
 
         this.sleep(1000L);
     }



Mime
View raw message