Return-Path: X-Original-To: apmail-sling-commits-archive@www.apache.org Delivered-To: apmail-sling-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B7B5210E9E for ; Fri, 17 Oct 2014 07:59:04 +0000 (UTC) Received: (qmail 4647 invoked by uid 500); 17 Oct 2014 07:59:02 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 4412 invoked by uid 500); 17 Oct 2014 07:59:02 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 4324 invoked by uid 99); 17 Oct 2014 07:59:02 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Oct 2014 07:59:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Oct 2014 07:59:00 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 646EB2388B6C; Fri, 17 Oct 2014 07:58:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1632486 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/timed/ test/java/org/apache/sling/event/it/ Date: Fri, 17 Oct 2014 07:58:40 -0000 To: commits@sling.apache.org From: cziegeler@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20141017075840.646EB2388B6C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cziegeler Date: Fri Oct 17 07:58:39 2014 New Revision: 1632486 URL: http://svn.apache.org/r1632486 Log: SLING-4048 : Avoid keeping jobs in memory. 
Increase test coverage, update test setup to more recent bundles (WiP) Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java (with props) Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java?rev=1632486&r1=1632485&r2=1632486&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java Fri Oct 17 07:58:39 2014 @@ -552,6 +552,15 @@ public class TimedEventSender properties.remove(EventConstants.EVENT_TOPIC); properties.put(TimedEventStatusProvider.PROPERTY_EVENT_ID, topic.replace('/', '.') + '/' + eventResource.getName()); + final Object date = properties.get(EventUtil.PROPERTY_TIMED_EVENT_DATE); + if ( date != null && !(date instanceof Date) ) { + if ( date instanceof Calendar ) { + properties.put(EventUtil.PROPERTY_TIMED_EVENT_DATE, ((Calendar)date).getTime() ); + } else { + logger.error("Unable to read event: date property is neither date nor calendar!"); + return null; + } + } try { result.event = new Event(topic, properties); return result; Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1632486&r1=1632485&r2=1632486&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (original) +++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java Fri Oct 17 07:58:39 2014 @@ -110,7 +110,7 @@ public abstract class AbstractJobHandlin mavenBundle("org.apache.tika", "tika-bundle", "1.2"), mavenBundle("org.apache.felix", "org.apache.felix.http.jetty", "2.2.2"), - mavenBundle("org.apache.felix", "org.apache.felix.eventadmin", "1.3.2"), + mavenBundle("org.apache.felix", "org.apache.felix.eventadmin", "1.4.2"), mavenBundle("org.apache.felix", "org.apache.felix.scr", "1.8.2"), mavenBundle("org.apache.felix", "org.apache.felix.configadmin", "1.8.0"), mavenBundle("org.apache.felix", "org.apache.felix.inventory", "1.0.4"), @@ -120,22 +120,22 @@ public abstract class AbstractJobHandlin mavenBundle("org.apache.sling", "org.apache.sling.commons.json", "2.0.6"), mavenBundle("org.apache.sling", "org.apache.sling.commons.mime", "2.1.4"), mavenBundle("org.apache.sling", "org.apache.sling.commons.classloader", "1.3.2"), - mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.4.2"), + mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.4.4"), mavenBundle("org.apache.sling", "org.apache.sling.commons.threads", "3.2.0"), mavenBundle("org.apache.sling", "org.apache.sling.launchpad.api", "1.1.0"), - mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.1.6"), - mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "1.0.0"), + mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.3.0"), + mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "1.0.2"), 
mavenBundle("org.apache.sling", "org.apache.sling.discovery.standalone", "1.0.0"), - mavenBundle("org.apache.sling", "org.apache.sling.api", "2.7.0"), - mavenBundle("org.apache.sling", "org.apache.sling.settings", "1.3.0"), - mavenBundle("org.apache.sling", "org.apache.sling.resourceresolver", "1.1.0"), - mavenBundle("org.apache.sling", "org.apache.sling.adapter", "2.1.0"), - mavenBundle("org.apache.sling", "org.apache.sling.jcr.resource", "2.3.6"), - mavenBundle("org.apache.sling", "org.apache.sling.jcr.classloader", "3.2.0"), + mavenBundle("org.apache.sling", "org.apache.sling.api", "2.8.0"), + mavenBundle("org.apache.sling", "org.apache.sling.settings", "1.3.4"), + mavenBundle("org.apache.sling", "org.apache.sling.resourceresolver", "1.1.6"), + mavenBundle("org.apache.sling", "org.apache.sling.adapter", "2.1.2"), + mavenBundle("org.apache.sling", "org.apache.sling.jcr.resource", "2.3.8"), + mavenBundle("org.apache.sling", "org.apache.sling.jcr.classloader", "3.2.2"), mavenBundle("org.apache.sling", "org.apache.sling.jcr.contentloader", "2.1.8"), - mavenBundle("org.apache.sling", "org.apache.sling.engine", "2.3.2"), + mavenBundle("org.apache.sling", "org.apache.sling.engine", "2.3.6"), mavenBundle("org.apache.sling", "org.apache.sling.serviceusermapper", "1.0.0"), mavenBundle("org.apache.sling", "org.apache.sling.jcr.jcr-wrapper", "2.0.0"), @@ -168,6 +168,7 @@ public abstract class AbstractJobHandlin try { Thread.sleep(time); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // ignore } } Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java?rev=1632486&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java (added) +++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java Fri Oct 17 07:58:39 2014 @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.it; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Dictionary; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.sling.event.impl.jobs.config.ConfigurationConstants; +import org.apache.sling.event.jobs.Job; +import org.apache.sling.event.jobs.JobManager; +import org.apache.sling.event.jobs.NotificationConstants; +import org.apache.sling.event.jobs.QueueConfiguration; +import org.apache.sling.event.jobs.consumer.JobConsumer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.ops4j.pax.exam.junit.PaxExam; +import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy; +import org.ops4j.pax.exam.spi.reactors.PerMethod; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.event.Event; +import 
org.osgi.service.event.EventHandler; + +@RunWith(PaxExam.class) +@ExamReactorStrategy(PerMethod.class) +public class ChaosTest extends AbstractJobHandlingTest { + + /** Duration for firing jobs in seconds. */ + private static final long DURATION = 3 * 60; + + private static final int NUM_ORDERED_THREADS = 3; + private static final int NUM_PARALLEL_THREADS = 6; + private static final int NUM_ROUND_THREADS = 6; + + private static final int NUM_ORDERED_TOPICS = 2; + private static final int NUM_PARALLEL_TOPICS = 8; + private static final int NUM_ROUND_TOPICS = 8; + + private static final String ORDERED_TOPIC_PREFIX = "sling/chaos/ordered/"; + private static final String PARALLEL_TOPIC_PREFIX = "sling/chaos/parallel/"; + private static final String ROUND_TOPIC_PREFIX = "sling/chaos/round/"; + + private static final String[] ORDERED_TOPICS = new String[NUM_ORDERED_TOPICS]; + private static final String[] PARALLEL_TOPICS = new String[NUM_PARALLEL_TOPICS]; + private static final String[] ROUND_TOPICS = new String[NUM_ROUND_TOPICS]; + + static { + for(int i=0; i orderedProps = new Hashtable(); + orderedProps.put(ConfigurationConstants.PROP_NAME, "chaos-ordered"); + orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name()); + orderedProps.put(ConfigurationConstants.PROP_TOPICS, ORDERED_TOPICS); + orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2); + orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L); + orderedConfig.update(orderedProps); + + orderedQueueConfPid = orderedConfig.getPid(); + + // create round robin test queue + final org.osgi.service.cm.Configuration rrConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null); + final Dictionary rrProps = new Hashtable(); + rrProps.put(ConfigurationConstants.PROP_NAME, "chaos-roundrobin"); + rrProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name()); + 
rrProps.put(ConfigurationConstants.PROP_TOPICS, ROUND_TOPICS); + rrProps.put(ConfigurationConstants.PROP_RETRIES, 2); + rrProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L); + rrProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 5); + rrConfig.update(rrProps); + + topicRRQueueConfPid = rrConfig.getPid(); + + this.sleep(1000L); + } + + @After + public void cleanUp() throws IOException { + this.removeConfiguration(this.orderedQueueConfPid); + this.removeConfiguration(this.topicRRQueueConfPid); + super.cleanup(); + } + + /** + * Setup consumers + */ + private void setupJobConsumers(final List registrations) { + for(int i=0; i created; + + final AtomicLong finishedThreads; + + public CreateJobThread(final JobManager jobManager, + final String[] topics, + final Map created, + final AtomicLong finishedThreads) { + this.topics = topics; + this.jobManager = jobManager; + this.created = created; + this.finishedThreads = finishedThreads; + } + + @Override + public void run() { + int index = 0; + final long startTime = System.currentTimeMillis(); + final long endTime = startTime + DURATION * 1000; + while ( System.currentTimeMillis() < endTime ) { + final String topic = topics[index]; + jobManager.addJob(topic, null); + created.get(topic).incrementAndGet(); + + index++; + if ( index == topics.length ) { + index = 0; + } + + final int sleepTime = random.nextInt(200); + try { + Thread.sleep(sleepTime); + } catch ( final InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + finishedThreads.incrementAndGet(); + } + + } + + /** + * Setup job creation threads + */ + private void setupJobCreationThreads(final List threads, + final JobManager jobManager, + final Map created, + final AtomicLong finishedThreads) { + for(int i=0;i threads, + final AtomicLong finishedThreads) { + // no chaos for now + } + + @Test(timeout=DURATION * 3000) + public void testDoChaos() throws Exception { + final JobManager jobManager = this.getJobManager(); + + // setup created 
map + final Map created = new HashMap(); + final Map finished = new HashMap(); + final List topics = new ArrayList(); + for(int i=0;i registrations = new ArrayList(); + final List threads = new ArrayList(); + final AtomicLong finishedThreads = new AtomicLong(); + + final ServiceRegistration eventHandler = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED, + new EventHandler() { + + @Override + public void handleEvent(final Event event) { + final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC); + finished.get(topic).incrementAndGet(); + } + }); + try { + // setup job consumers + this.setupJobConsumers(registrations); + + // setup job creation tests + this.setupJobCreationThreads(threads, jobManager, created, finishedThreads); + + this.setupChaosThreads(threads, finishedThreads); + + System.out.println("Starting threads..."); + // start threads + for(final Thread t : threads) { + t.start(); + } + + System.out.println("Sleeping for " + DURATION + " seconds to wait for threads to finish..."); + // for sure we can sleep for the duration + this.sleep(DURATION * 1000); + + System.out.println("Polling for threads to finish..."); + // wait until threads are finished + while ( finishedThreads.get() < threads.size() ) { + this.sleep(100); + } + + System.out.println("Waiting for job handling to finish..."); + while ( !topics.isEmpty() ) { + final Iterator iter = topics.iterator(); + while ( iter.hasNext() ) { + final String topic = iter.next(); + if ( finished.get(topic).get() == created.get(topic).get() ) { + iter.remove(); + } + } + this.sleep(100); + } + } finally { + eventHandler.unregister(); + for(final ServiceRegistration reg : registrations) { + reg.unregister(); + } + } + + } +} Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1632486&r1=1632485&r2=1632486&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java (original) +++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java Fri Oct 17 07:58:39 2014 @@ -62,18 +62,18 @@ public class RoundRobinQueueTest extends public void setup() throws IOException { super.setup(); - // create ordered test queue - final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null); - final Dictionary orderedProps = new Hashtable(); - orderedProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME); - orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name()); - orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC + "/*"); - orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2); - orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L); - orderedProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PAR); - orderedConfig.update(orderedProps); + // create round robin test queue + final org.osgi.service.cm.Configuration rrConfig = 
this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null); + final Dictionary rrProps = new Hashtable(); + rrProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME); + rrProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name()); + rrProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC + "/*"); + rrProps.put(ConfigurationConstants.PROP_RETRIES, 2); + rrProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L); + rrProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PAR); + rrConfig.update(rrProps); - queueConfPid = orderedConfig.getPid(); + queueConfPid = rrConfig.getPid(); this.sleep(1000L); }