sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r1632520 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java test/java/org/apache/sling/event/it/ChaosTest.java
Date Fri, 17 Oct 2014 09:11:34 GMT
Author: cziegeler
Date: Fri Oct 17 09:11:33 2014
New Revision: 1632520

URL: http://svn.apache.org/r1632520
Log:
SLING-4048 : Avoid keeping jobs in memory.  Initial version done

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java?rev=1632520&r1=1632519&r2=1632520&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/UpgradeTask.java Fri Oct 17 09:11:33 2014
@@ -31,6 +31,7 @@ import org.apache.sling.api.resource.Val
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.support.Environment;
@@ -58,6 +59,84 @@ public class UpgradeTask {
         if ( topologyCapabilities.isLeader() ) {
             this.processJobsFromPreviousVersions(configuration, topologyCapabilities, queueManager);
         }
+        this.upgradeBridgedJobs(configuration, topologyCapabilities, queueManager);
+    }
+
+    /**
+     * Upgrade bridged jobs.
+     * In previous versions, bridged jobs were stored under a special topic.
+     * This has changed, the jobs are now stored with their real topic.
+     */
+    private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
+            final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        final String path = configuration.getLocalJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT;
+        final ResourceResolver resolver = configuration.createResourceResolver();
+        try {
+            final Resource rootResource = resolver.getResource(path);
+            if ( rootResource != null ) {
+                upgradeBridgedJobs(configuration, rootResource, caps, queueManager);
+            }
+            if ( caps.isLeader() ) {
+                final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT);
+                if ( unassignedRoot != null ) {
+                    upgradeBridgedJobs(configuration, unassignedRoot, caps, queueManager);
+                }
+            }
+        } finally {
+            resolver.close();
+        }
+    }
+
+    /**
+     * Upgrade bridged jobs
+     * @param topicResource  The root resource (topic resource)
+     * @param topologyCapabilities The capabilities
+     * @param queueManager The queue manager
+     */
+    private void upgradeBridgedJobs(final JobManagerConfiguration configuration,
+            final Resource topicResource,
+            final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        final String topicName = topicResource.getName().replace('.', '/');
+        final QueueInfo info = queueManager.getQueueInfo(topicName);
+        JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.ResourceCallback() {
+
+            @Override
+            public boolean handle(final Resource rsrc) {
+                try {
+                    final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+                    final String targetId = caps.detectTarget(topicName, vm, info);
+
+                    final Map<String, Object> props = new HashMap<String, Object>(vm);
+                    final String newPath;
+                    if ( targetId != null ) {
+                        newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                        props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                        props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                    } else {
+                        newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                        props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                        props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                    }
+                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                    try {
+                        ResourceHelper.getOrCreateResource(topicResource.getResourceResolver(), newPath, props);
+                        topicResource.getResourceResolver().delete(rsrc);
+                        topicResource.getResourceResolver().commit();
+                    } catch ( final PersistenceException pe ) {
+                        logger.warn("Unable to move job from previous version " + rsrc.getPath(), pe);
+                        topicResource.getResourceResolver().refresh();
+                        topicResource.getResourceResolver().revert();
+                    }
+                } catch (final InstantiationException ie) {
+                    logger.warn("Unable to move job from previous version " + rsrc.getPath(), ie);
+                    topicResource.getResourceResolver().refresh();
+                    topicResource.getResourceResolver().revert();
+                }
+                return caps.isActive();
+            }
+        });
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java?rev=1632520&r1=1632519&r2=1632520&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java Fri Oct 17 09:11:33 2014
@@ -18,6 +18,9 @@
  */
 package org.apache.sling.event.it;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Dictionary;
@@ -29,6 +32,10 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.TopologyView;
 import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
@@ -42,6 +49,8 @@ import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
 import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
 import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventHandler;
@@ -233,11 +242,67 @@ public class ChaosTest extends AbstractJ
     }
 
     /**
-     * Setup chaos threads
+     * Setup chaos thread(s)
+     *
+     * Chaos is right now created by sending topology changing/changed events randomly
      */
     private void setupChaosThreads(final List<Thread> threads,
             final AtomicLong finishedThreads) {
-        // no chaos for now
+        final List<TopologyView> views = new ArrayList<TopologyView>();
+        // register topology listener
+        final ServiceRegistration reg = this.bc.registerService(TopologyEventListener.class.getName(), new TopologyEventListener() {
+
+            @Override
+            public void handleTopologyEvent(final TopologyEvent event) {
+                if ( event.getType() == Type.TOPOLOGY_INIT ) {
+                    views.add(event.getNewView());
+                }
+            }
+        }, null);
+        while ( views.isEmpty() ) {
+            this.sleep(10);
+        }
+        reg.unregister();
+        final TopologyView view = views.get(0);
+
+        try {
+            final ServiceReference[] refs = this.bc.getServiceReferences(TopologyEventListener.class.getName(), "(objectClass=org.apache.sling.event.impl.jobs.topology.TopologyHandler)");
+            assertNotNull(refs);
+            assertEquals(1, refs.length);
+            final TopologyEventListener tel = (TopologyEventListener)bc.getService(refs[0]);
+
+            threads.add(new Thread() {
+
+                private final Random random = new Random();
+
+                @Override
+                public void run() {
+                    final long startTime = System.currentTimeMillis();
+                    // this thread runs 30 seconds longer than the job creation thread
+                    final long endTime = startTime + (DURATION +30) * 1000;
+                    while ( System.currentTimeMillis() < endTime ) {
+                        final int sleepTime = random.nextInt(25) + 15;
+                        try {
+                            Thread.sleep(sleepTime * 1000);
+                        } catch ( final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        }
+                        tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGING, view, null));
+                        final int changingTime = random.nextInt(20) + 3;
+                        try {
+                            Thread.sleep(changingTime * 1000);
+                        } catch ( final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        }
+                        tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGED, view, view));
+                    }
+                    tel.getClass().getName();
+                    finishedThreads.incrementAndGet();
+                }
+            });
+        } catch (InvalidSyntaxException e) {
+            e.printStackTrace();
+        }
     }
 
     @Test(timeout=DURATION * 3000)



Mime
View raw message