camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1227540 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/RouteService.java main/java/org/apache/camel/model/OnCompletionDefinition.java test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
Date Thu, 05 Jan 2012 10:24:23 GMT
Author: davsclaus
Date: Thu Jan  5 10:24:22 2012
New Revision: 1227540

URL: http://svn.apache.org/viewvc?rev=1227540&view=rev
Log:
CAMEL-4842: Route scoped onCompletion should also shutdown child services when removing routes,
to not leak resources.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1227540&r1=1227539&r2=1227540&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Thu Jan  5 10:24:22 2012
@@ -31,9 +31,11 @@ import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.Service;
+import org.apache.camel.model.OnCompletionDefinition;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.processor.OnCompletionProcessor;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RoutePolicy;
@@ -338,12 +340,20 @@ public class RouteService extends ChildS
                         services.add((Service) errorHandler);
                     }
                 }
+            } else if (output instanceof OnCompletionDefinition) {
+                OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition) output;
+                if (onCompletionDefinition.isRouteScoped()) {
+                    Processor onCompletionProcessor = onCompletionDefinition.getOnCompletion(route.getId());
+                    if (onCompletionProcessor != null && onCompletionProcessor instanceof Service) {
+                        services.add((Service) onCompletionProcessor);
+                    }
+                }
             }
         }
     }
 
     /**
-     * Gather all child services by navigating the service to recursivly gather all child services.
+     * Gather all child services by navigating the service to recursively gather all child services.
      */
     private static void doGetChildServices(Set<Service> services, Service service) throws Exception {
         services.add(service);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=1227540&r1=1227539&r2=1227540&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Thu Jan  5 10:24:22 2012
@@ -17,8 +17,11 @@
 package org.apache.camel.model;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import javax.xml.bind.annotation.XmlAccessType;
@@ -31,6 +34,7 @@ import javax.xml.bind.annotation.XmlTran
 
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.processor.FatalFallbackErrorHandler;
 import org.apache.camel.processor.OnCompletionProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.ExecutorServiceManager;
@@ -58,10 +62,28 @@ public class OnCompletionDefinition exte
     private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
     @XmlTransient
     private ExecutorService executorService;
+    @XmlTransient
+    private Boolean routeScoped;
+    // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors
+    @XmlTransient
+    private final Map<String, Processor> onCompletions = new HashMap<String, Processor>();
 
     public OnCompletionDefinition() {
     }
 
+    public boolean isRouteScoped() {
+        // is context scoped by default
+        return routeScoped != null ? routeScoped : false;
+    }
+
+    public Processor getOnCompletion(String routeId) {
+        return onCompletions.get(routeId);
+    }
+
+    public Collection<Processor> getOnCompletions() {
+        return onCompletions.values();
+    }
+
     @Override
     public String toString() {
         return "onCompletion[" + getOutputs() + "]";
@@ -84,15 +106,26 @@ public class OnCompletionDefinition exte
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
+        // assign whether this was a route scoped onCompletion or not
+        // we need to know this later when setting the parent, as only route scoped should have parent
+        // Note: this logic can possible be removed when the Camel routing engine decides at runtime
+        // to apply onCompletion in a more dynamic fashion than current code base
+        // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime
+        if (routeScoped == null) {
+            routeScoped = super.getParent() != null;
+        }
+
         if (isOnCompleteOnly() && isOnFailureOnly()) {
             throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
         }
 
         Processor childProcessor = this.createChildProcessor(routeContext, true);
-
         // wrap the on completion route in a unit of work processor
         childProcessor = new UnitOfWorkProcessor(routeContext, childProcessor);
 
+        String id = routeContext.getRoute().getId();
+        onCompletions.put(id, childProcessor);
+
         Predicate when = null;
         if (onWhen != null) {
             when = onWhen.getExpression().createPredicate(routeContext);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java?rev=1227540&r1=1227539&r2=1227540&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java Thu Jan  5 10:24:22 2012
@@ -304,4 +304,121 @@ public class ManagedRouteAddRemoveTest e
 
         log.info("Shutting down...");
     }
+
+    public void testRouteAddRemoteRouteWithRecipientListAndRouteScopedOnCompletion() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        template.sendBody("direct:start", "Hello World");
+        result.assertIsSatisfied();
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+        // number of producer caches
+        Set<ObjectName> names = mbeanServer.queryNames(on, null);
+        assertEquals(1, names.size());
+
+        log.info("Adding 2nd route");
+
+        // add a 2nd route
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:bar").routeId("bar")
+                    .onCompletion()
+                        .recipientList(header("done"))
+                    .end().end()
+                    .recipientList(header("bar"));
+            }
+        });
+
+        // and send a message to it
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:done").expectedMessageCount(1);
+
+        Map headers = new HashMap();
+        headers.put("done", "mock:done");
+        headers.put("bar", "mock:bar");
+        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+        assertMockEndpointsSatisfied();
+
+        // there should be two more producer cache
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(3, names.size());
+
+        // now stop and remove the 2nd route
+        log.info("Stopping 2nd route");
+        context.stopRoute("bar");
+
+        log.info("Removing 2nd route");
+        boolean removed = context.removeRoute("bar");
+        assertTrue(removed);
+
+        // the producer cache should have been removed
+        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(1, names.size());
+
+        log.info("Shutting down...");
+    }
+
+    public void testRouteAddRemoteRouteWithRecipientListAndContextScopedOnCompletion() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        template.sendBody("direct:start", "Hello World");
+        result.assertIsSatisfied();
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+        // number of producer caches
+        Set<ObjectName> names = mbeanServer.queryNames(on, null);
+        assertEquals(1, names.size());
+
+        log.info("Adding 2nd route");
+
+        // add a 2nd route
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onCompletion()
+                    .recipientList(header("done"))
+                    .end();
+
+                from("direct:bar").routeId("bar")
+                    .recipientList(header("bar"));
+            }
+        });
+
+        // and send a message to it
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:done").expectedMessageCount(1);
+
+        Map headers = new HashMap();
+        headers.put("done", "mock:done");
+        headers.put("bar", "mock:bar");
+        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+        assertMockEndpointsSatisfied();
+
+        // there should be two more producer cache
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(3, names.size());
+
+        // now stop and remove the 2nd route
+        log.info("Stopping 2nd route");
+        context.stopRoute("bar");
+
+        log.info("Removing 2nd route");
+        boolean removed = context.removeRoute("bar");
+        assertTrue(removed);
+
+        // only the producer cache from the 2nd route should have been removed (the on completion is context scoped)
+        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(2, names.size());
+
+        log.info("Shutting down...");
+    }
 }



Mime
View raw message