camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1300218 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/processor/...
Date Tue, 13 Mar 2012 16:20:45 GMT
Author: davsclaus
Date: Tue Mar 13 16:20:43 2012
New Revision: 1300218

URL: http://svn.apache.org/viewvc?rev=1300218&view=rev
Log:
CAMEL-5079: EIPs using thread pools will now eager shutdown thread pools if thread pool was created only for the EIP. This avoids leaks when adding and removing a lot of routes etc.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
      - copied, changed from r1300065, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java
Removed:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateTimeoutWithExecutorServiceRefTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateTimeoutWithExecutorServiceRefTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Tue Mar 13 16:20:43 2012
@@ -133,7 +133,7 @@ public class SedaEndpoint extends Defaul
             }
             // create multicast processor
             multicastStarted = false;
-            consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, 0, null, false);
+            consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, false, 0, null, false);
         } else {
             // not needed
             consumerMulticastProcessor = null;
@@ -337,6 +337,11 @@ public class SedaEndpoint extends Defaul
         if (getComponent() != null) {
             getComponent().onShutdownEndpoint(this);
         }
+        // shutdown thread pool if it was in use
+        if (multicastExecutor != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
+            multicastExecutor = null;
+        }
         super.doShutdown();
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Tue Mar 13 16:20:43 2012
@@ -159,26 +159,41 @@ public class AggregateDefinition extends
         Expression correlation = getExpression().createExpression(routeContext);
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
 
-        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, isParallelProcessing());
-        if (executorService == null && !isParallelProcessing()) {
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, isParallelProcessing());
+        if (threadPool == null && !isParallelProcessing()) {
             // executor service is mandatory for the Aggregator
             // we do not run in parallel mode, but use a synchronous executor, so we run in current thread
-            executorService = new SynchronousExecutorService();
+            threadPool = new SynchronousExecutorService();
+            shutdownThreadPool = true;
         }
-       
-        if (timeoutCheckerExecutorServiceRef != null && timeoutCheckerExecutorService == null) {
-            timeoutCheckerExecutorService = getConfiguredScheduledExecutorService(routeContext);
-        }
-        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, correlation, strategy, executorService);
+
+        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor,
+                correlation, strategy, threadPool, shutdownThreadPool);
 
         AggregationRepository repository = createAggregationRepository(routeContext);
         if (repository != null) {
             answer.setAggregationRepository(repository);
         }
-        
-        if (getTimeoutCheckerExecutorService() != null) {
-            answer.setTimeoutCheckerExecutorService(timeoutCheckerExecutorService);
+
+        // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool
+        boolean shutdownTimeoutThreadPool = false;
+        ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService;
+        if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) {
+            // lookup existing thread pool
+            timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookup(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class);
+            if (timeoutThreadPool == null) {
+                // then create a thread pool assuming the ref is a thread pool profile id
+                timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
+                        AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef);
+                if (timeoutThreadPool == null) {
+                    throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef + " not found in registry or as a thread pool profile.");
+                }
+                shutdownTimeoutThreadPool = true;
+            }
         }
+        answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
+        answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);
 
         // set other options
         answer.setParallelProcessing(isParallelProcessing());
@@ -225,28 +240,6 @@ public class AggregateDefinition extends
         return answer;
     }
 
-    private ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext) {
-        // TODO: maybe rather than this one-off method to support an executorService & scheduledExecutorService for the aggregator,
-        // create ScheduledExecutorServiceAwareDefinition and the change other definitions that currently use ScheduledExecutorServices to
-        // use that one instead of the more generic ExecutorServiceAwareDefinition
-        ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class);
-        if (answer == null) {
-            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
-            // then create a thread pool assuming the ref is a thread pool profile id                
-            ThreadPoolProfile profile = manager.getThreadPoolProfile(timeoutCheckerExecutorServiceRef);
-            if (profile != null) {
-                // okay we need to grab the pool size from the ref
-                Integer poolSize = profile.getPoolSize();
-                if (poolSize == null) {
-                    // fallback and use the default pool size, if none was set on the profile
-                    poolSize = manager.getDefaultThreadPoolProfile().getPoolSize();
-                }
-                answer = manager.newScheduledThreadPool(this, "Aggregator", poolSize);
-            }
-        }
-        return answer;
-    }
-
     @Override
     protected void configureChild(ProcessorDefinition<?> output) {
         if (expression != null && expression instanceof ExpressionClause) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java Tue Mar 13 16:20:43 2012
@@ -78,9 +78,10 @@ public class DelayDefinition extends Exp
         Processor childProcessor = this.createChildProcessor(routeContext, false);
         Expression delay = createAbsoluteTimeDelayExpression(routeContext);
 
-        ScheduledExecutorService scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this, isAsyncDelayed());
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed());
+        ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this, isAsyncDelayed());
 
-        Delayer answer = new Delayer(childProcessor, delay, scheduled);
+        Delayer answer = new Delayer(routeContext.getCamelContext(), childProcessor, delay, threadPool, shutdownThreadPool);
         if (getAsyncDelayed() != null) {
             answer.setAsyncDelayed(getAsyncDelayed());
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Tue Mar 13 16:20:43 2012
@@ -216,7 +216,8 @@ public class MulticastDefinition extends
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
 
-        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing());
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing());
 
         long timeout = getTimeout() != null ? getTimeout() : 0;
         if (timeout > 0 && !isParallelProcessing()) {
@@ -227,7 +228,7 @@ public class MulticastDefinition extends
         }
 
         MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(),
-                                      executorService, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork());
+                                      threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork());
         if (isShareUnitOfWork()) {
             // wrap answer in a sub unit of work, since we share the unit of work
             return new SubUnitOfWorkProcessor(answer);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Tue Mar 13 16:20:43 2012
@@ -130,12 +130,13 @@ public class OnCompletionDefinition exte
         }
 
         // executor service is mandatory for on completion
-        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true);
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
+        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true);
 
         // should be false by default
         boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false;
         OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor,
-                executorService, isOnCompleteOnly(), isOnFailureOnly(), when, original);
+                threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original);
         return answer;
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java Tue Mar 13 16:20:43 2012
@@ -206,6 +206,69 @@ public final class ProcessorDefinitionHe
     }
 
     /**
+     * Determines whether a new thread pool will be created or not.
+     * <p/>
+     * This is used to know if a new thread pool will be created, and therefore is not shared by others, and therefore
+     * exclusive to the definition.
+     *
+     * @param routeContext   the route context
+     * @param definition     the node definition which may leverage executor service.
+     * @param useDefault     whether to fallback and use a default thread pool, if no explicit configured
+     * @return <tt>true</tt> if a new thread pool will be created, <tt>false</tt> if not
+     * @see #getConfiguredExecutorService(org.apache.camel.spi.RouteContext, String, ExecutorServiceAwareDefinition, boolean)
+     */
+    public static boolean willCreateNewThreadPool(RouteContext routeContext, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) {
+        ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
+        ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
+        
+        if (definition.getExecutorService() != null) {
+            // no there is a custom thread pool configured
+            return false;
+        } else if (definition.getExecutorServiceRef() != null) {
+            ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(definition.getExecutorServiceRef(), ExecutorService.class);
+            // if no existing thread pool, then we will have to create a new thread pool
+            return answer == null;
+        } else if (useDefault) {
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ExecutorService} registered with the given
+     * <tt>executorServiceRef</tt> name.
+     * <p/>
+     * This method will lookup for configured thread pool in the following order
+     * <ul>
+     *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
+     *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
+     *   <li>if none found, then <tt>null</tt> is returned.</li>
+     * </ul>
+     * @param routeContext   the route context
+     * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
+     *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
+     * @param source         the source to use the thread pool
+     * @param executorServiceRef reference name of the thread pool
+     * @return the executor service, or <tt>null</tt> if none was found.
+     */
+    public static ExecutorService lookupExecutorServiceRef(RouteContext routeContext, String name,
+                                                           Object source, String executorServiceRef) {
+
+        ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
+        ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
+        ObjectHelper.notNull(executorServiceRef, "executorServiceRef");
+
+        // lookup in registry first and use existing thread pool if exists
+        ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(executorServiceRef, ExecutorService.class);
+        if (answer == null) {
+            // then create a thread pool assuming the ref is a thread pool profile id
+            answer = manager.newThreadPool(source, name, executorServiceRef);
+        }
+        return answer;
+    }
+
+    /**
      * Will lookup and get the configured {@link java.util.concurrent.ExecutorService} from the given definition.
      * <p/>
      * This method will lookup for configured thread pool in the following order
@@ -237,15 +300,10 @@ public final class ProcessorDefinitionHe
             return definition.getExecutorService();
         } else if (definition.getExecutorServiceRef() != null) {
             // lookup in registry first and use existing thread pool if exists
-            ExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(definition.getExecutorServiceRef(), ExecutorService.class);
-            if (answer == null) {
-                // then create a thread pool assuming the ref is a thread pool profile id
-                answer = manager.newThreadPool(definition, name, definition.getExecutorServiceRef());
-            }
+            ExecutorService answer = lookupExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef());
             if (answer == null) {
                 throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
             }
-            return answer;
         } else if (useDefault) {
             return manager.newDefaultThreadPool(definition, name);
         }
@@ -254,6 +312,39 @@ public final class ProcessorDefinitionHe
     }
 
     /**
+     * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ScheduledExecutorService} registered with the given
+     * <tt>executorServiceRef</tt> name.
+     * <p/>
+     * This method will lookup for configured thread pool in the following order
+     * <ul>
+     *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
+     *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
+     *   <li>if none found, then <tt>null</tt> is returned.</li>
+     * </ul>
+     * @param routeContext   the route context
+     * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
+     *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
+     * @param source         the source to use the thread pool
+     * @param executorServiceRef reference name of the thread pool
+     * @return the executor service, or <tt>null</tt> if none was found.
+     */
+    public static ScheduledExecutorService lookupScheduledExecutorServiceRef(RouteContext routeContext, String name,
+                                                                             Object source, String executorServiceRef) {
+
+        ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
+        ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
+        ObjectHelper.notNull(executorServiceRef, "executorServiceRef");
+
+        // lookup in registry first and use existing thread pool if exists
+        ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class);
+        if (answer == null) {
+            // then create a thread pool assuming the ref is a thread pool profile id
+            answer = manager.newScheduledThreadPool(source, name, executorServiceRef);
+        }
+        return answer;
+    }
+
+    /**
      * Will lookup and get the configured {@link java.util.concurrent.ScheduledExecutorService} from the given definition.
      * <p/>
      * This method will lookup for configured thread pool in the following order
@@ -289,14 +380,7 @@ public final class ProcessorDefinitionHe
             }
             throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance");
         } else if (definition.getExecutorServiceRef() != null) {
-            ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(definition.getExecutorServiceRef(), ScheduledExecutorService.class);
-            if (answer == null) {
-                // then create a thread pool assuming the ref is a thread pool profile id
-                ThreadPoolProfile profile = manager.getThreadPoolProfile(definition.getExecutorServiceRef());
-                if (profile != null) {
-                    answer = manager.newScheduledThreadPool(definition, name, profile);
-                }
-            }
+            ScheduledExecutorService answer = lookupScheduledExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef());
             if (answer == null) {
                 throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
             }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Tue Mar 13 16:20:43 2012
@@ -128,8 +128,10 @@ public class RecipientListDefinition<Typ
             answer.setTimeout(getTimeout());
         }
 
-        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing());
-        answer.setExecutorService(executorService);
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing());
+        answer.setExecutorService(threadPool);
+        answer.setShutdownExecutorService(shutdownThreadPool);
         long timeout = getTimeout() != null ? getTimeout() : 0;
         if (timeout > 0 && !isParallelProcessing()) {
             throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Tue Mar 13 16:20:43 2012
@@ -94,7 +94,9 @@ public class SplitDefinition extends Exp
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = this.createChildProcessor(routeContext, true);
         aggregationStrategy = createAggregationStrategy(routeContext);
-        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing());
+
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing());
 
         long timeout = getTimeout() != null ? getTimeout() : 0;
         if (timeout > 0 && !isParallelProcessing()) {
@@ -107,7 +109,7 @@ public class SplitDefinition extends Exp
         Expression exp = getExpression().createExpression(routeContext);
 
         Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
-                            isParallelProcessing(), executorService, isStreaming(), isStopOnException(),
+                            isParallelProcessing(), threadPool, shutdownThreadPool, isStreaming(), isStopOnException(),
                             timeout, onPrepare, isShareUnitOfWork());
         if (isShareUnitOfWork()) {
             // wrap answer in a sub unit of work, since we share the unit of work

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Tue Mar 13 16:20:43 2012
@@ -80,9 +80,10 @@ public class ThreadsDefinition extends O
         // the threads name
         String name = getThreadName() != null ? getThreadName() : "Threads";
         // prefer any explicit configured executor service
-        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
+        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);
         // if no explicit then create from the options
-        if (executorService == null) {
+        if (threadPool == null) {
             ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
             // create the thread pool using a builder
             ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name)
@@ -92,10 +93,11 @@ public class ThreadsDefinition extends O
                     .maxQueueSize(getMaxQueueSize())
                     .rejectedPolicy(getRejectedPolicy())
                     .build();
-            executorService = manager.newThreadPool(this, name, profile);
+            threadPool = manager.newThreadPool(this, name, profile);
+            shutdownThreadPool = true;
         }
 
-        ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), executorService);
+        ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool);
         if (getCallerRunsWhenRejected() == null) {
             // should be true by default
             thread.setCallerRunsWhenRejected(true);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Tue Mar 13 16:20:43 2012
@@ -83,13 +83,14 @@ public class ThrottleDefinition extends 
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = this.createChildProcessor(routeContext, true);
 
-        ScheduledExecutorService scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed());
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed());
+        ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed());
 
         // should be default 1000 millis
         long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
         Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext);
 
-        Throttler answer = new Throttler(childProcessor, maxRequestsExpression, period, scheduled);
+        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool);
 
         if (getAsyncDelayed() != null) {
             answer.setAsyncDelayed(getAsyncDelayed());

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Tue Mar 13 16:20:43 2012
@@ -83,9 +83,9 @@ public class WireTapDefinition<Type exte
         Endpoint endpoint = resolveEndpoint(routeContext);
 
         // executor service is mandatory for wire tap
-        executorService = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", this, true);
-
-        WireTapProcessor answer = new WireTapProcessor(endpoint, getPattern(), executorService);
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
+        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", this, true);
+        WireTapProcessor answer = new WireTapProcessor(endpoint, getPattern(), threadPool, shutdownThreadPool);
 
         answer.setCopy(isCopy());
         if (newExchangeProcessorRef != null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Tue Mar 13 16:20:43 2012
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.util.ObjectHelper;
@@ -37,7 +38,9 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
+    private final CamelContext camelContext;
     private final ScheduledExecutorService executorService;
+    private final boolean shutdownExecutorService;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
 
@@ -72,13 +75,15 @@ public abstract class DelayProcessorSupp
         }
     }
 
-    public DelayProcessorSupport(Processor processor) {
-        this(processor, null);
+    public DelayProcessorSupport(CamelContext camelContext, Processor processor) {
+        this(camelContext, processor, null, false);
     }
 
-    public DelayProcessorSupport(Processor processor, ScheduledExecutorService executorService) {
+    public DelayProcessorSupport(CamelContext camelContext, Processor processor, ScheduledExecutorService executorService, boolean shutdownExecutorService) {
         super(processor);
+        this.camelContext = camelContext;
         this.executorService = executorService;
+        this.shutdownExecutorService = shutdownExecutorService;
     }
 
     @Override
@@ -216,4 +221,12 @@ public abstract class DelayProcessorSupp
         }
         super.doStart();
     }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        if (shutdownExecutorService && executorService != null) {
+            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+        }
+        super.doShutdown();
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java Tue Mar 13 16:20:43 2012
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
 
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -36,8 +37,9 @@ public class Delayer extends DelayProces
     private Expression delay;
     private long delayValue;
 
-    public Delayer(Processor processor, Expression delay, ScheduledExecutorService executorService) {
-        super(processor, executorService);
+    public Delayer(CamelContext camelContext, Processor processor, Expression delay,
+                   ScheduledExecutorService executorService, boolean shutdownExecutorService) {
+        super(camelContext, processor, executorService, shutdownExecutorService);
         this.delay = delay;
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Mar 13 16:20:43 2012
@@ -147,6 +147,7 @@ public class MulticastProcessor extends 
     private final boolean streaming;
     private final boolean stopOnException;
     private final ExecutorService executorService;
+    private final boolean shutdownExecutorService;
     private ExecutorService aggregateExecutorService;
     private final long timeout;
     private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>();
@@ -157,18 +158,18 @@ public class MulticastProcessor extends 
     }
 
     public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
-        this(camelContext, processors, aggregationStrategy, false, null, false, false, 0, null, false);
+        this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false);
     }
 
     public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
-                              boolean parallelProcessing, ExecutorService executorService, boolean streaming,
-                              boolean stopOnException, long timeout, Processor onPrepare,
-                              boolean shareUnitOfWork) {
+                              boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
+                              boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
         this.executorService = executorService;
+        this.shutdownExecutorService = shutdownExecutorService;
         this.streaming = streaming;
         this.stopOnException = stopOnException;
         // must enable parallel if executor service is provided
@@ -953,6 +954,10 @@ public class MulticastProcessor extends 
         ServiceHelper.stopAndShutdownServices(processors, errorHandlers);
         // only clear error handlers when shutting down
         errorHandlers.clear();
+
+        if (shutdownExecutorService && executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+        }
     }
 
     protected static void setToEndpoint(Exchange exchange, Processor processor) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Tue Mar 13 16:20:43 2012
@@ -45,12 +45,13 @@ public class OnCompletionProcessor exten
     private final CamelContext camelContext;
     private final Processor processor;
     private final ExecutorService executorService;
+    private final boolean shutdownExecutorService;
     private final boolean onCompleteOnly;
     private final boolean onFailureOnly;
     private final Predicate onWhen;
     private final boolean useOriginalBody;
 
-    public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService,
+    public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, boolean shutdownExecutorService,
                                  boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody) {
         notNull(camelContext, "camelContext");
         notNull(processor, "processor");
@@ -58,6 +59,7 @@ public class OnCompletionProcessor exten
         // wrap processor in UnitOfWork so what we send out runs in a UoW
         this.processor = new UnitOfWorkProcessor(processor);
         this.executorService = executorService;
+        this.shutdownExecutorService = shutdownExecutorService;
         this.onCompleteOnly = onCompleteOnly;
         this.onFailureOnly = onFailureOnly;
         this.onWhen = onWhen;
@@ -77,6 +79,9 @@ public class OnCompletionProcessor exten
     @Override
     protected void doShutdown() throws Exception {
         ServiceHelper.stopAndShutdownService(processor);
+        if (shutdownExecutorService) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+        }
     }
 
     public CamelContext getCamelContext() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Tue Mar 13 16:20:43 2012
@@ -58,6 +58,7 @@ public class RecipientList extends Servi
     private Processor onPrepare;
     private boolean shareUnitOfWork;
     private ExecutorService executorService;
+    private boolean shutdownExecutorService;
     private ExecutorService aggregateExecutorService;
     private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
 
@@ -118,8 +119,8 @@ public class RecipientList extends Servi
         Iterator<Object> iter = ObjectHelper.createIterator(recipientList, delimiter);
 
         RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
-                                                                isParallelProcessing(), getExecutorService(), isStreaming(),
-                                                                isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork()) {
+                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
+                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork()) {
             @Override
             protected synchronized ExecutorService createAggregateExecutorService(String name) {
                 // use a shared executor service to avoid creating new thread pools
@@ -217,6 +218,14 @@ public class RecipientList extends Servi
         this.executorService = executorService;
     }
 
+    public boolean isShutdownExecutorService() {
+        return shutdownExecutorService;
+    }
+
+    public void setShutdownExecutorService(boolean shutdownExecutorService) {
+        this.shutdownExecutorService = shutdownExecutorService;
+    }
+
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java Tue Mar 13 16:20:43 2012
@@ -128,9 +128,10 @@ public class RecipientListProcessor exte
     }
 
     public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
-                                  boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException, long timeout,
-                                  Processor onPrepare, boolean shareUnitOfWork) {
-        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, streaming, stopOnException, timeout, onPrepare, shareUnitOfWork);
+                                  boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
+                                  boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
+        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
+                streaming, stopOnException, timeout, onPrepare, shareUnitOfWork);
         this.producerCache = producerCache;
         this.iter = iter;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue Mar 13 16:20:43 2012
@@ -58,15 +58,14 @@ public class Splitter extends MulticastP
     private final Expression expression;
 
     public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
-        this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, 0, null, false);
+        this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, false, 0, null, false);
     }
 
     public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
-                    boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException, long timeout,
-                    Processor onPrepare, boolean useSubUnitOfWork) {
+                    boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
+                    boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) {
         super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService,
-                streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork);
-
+                shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork);
         this.expression = expression;
         notNull(expression, "expression");
         notNull(destination, "destination");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Tue Mar 13 16:20:43 2012
@@ -53,7 +53,9 @@ import org.slf4j.LoggerFactory;
 public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class);
+    private final CamelContext camelContext;
     private final ExecutorService executorService;
+    private volatile boolean shutdownExecutorService;
     private final AtomicBoolean shutdown = new AtomicBoolean(true);
     private boolean callerRunsWhenRejected = true;
     private ThreadPoolRejectedPolicy rejectedPolicy;
@@ -101,10 +103,12 @@ public class ThreadsProcessor extends Se
         }
     }
 
-    public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService) {
+    public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService) {
         ObjectHelper.notNull(camelContext, "camelContext");
         ObjectHelper.notNull(executorService, "executorService");
+        this.camelContext = camelContext;
         this.executorService = executorService;
+        this.shutdownExecutorService = shutdownExecutorService;
     }
 
     public void process(final Exchange exchange) throws Exception {
@@ -164,4 +168,12 @@ public class ThreadsProcessor extends Se
     protected void doStop() throws Exception {
         shutdown.set(true);
     }
+
+    protected void doShutdown() throws Exception {
+        if (shutdownExecutorService) {
+            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+        }
+        super.doShutdown();
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Tue Mar 13 16:20:43 2012
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
 
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -41,8 +42,9 @@ public class Throttler extends DelayProc
     private long timePeriodMillis = 1000;
     private volatile TimeSlot slot;
 
-    public Throttler(Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis, ScheduledExecutorService executorService) {
-        super(processor, executorService);
+    public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
+                     ScheduledExecutorService executorService, boolean shutdownExecutorService) {
+        super(camelContext, processor, executorService, shutdownExecutorService);
 
         ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression");
         this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Tue Mar 13 16:20:43 2012
@@ -40,6 +40,7 @@ import org.apache.camel.util.ObjectHelpe
  */
 public class WireTapProcessor extends SendProcessor {
     private final ExecutorService executorService;
+    private volatile boolean shutdownExecutorService;
 
     // expression or processor used for populating a new exchange to send
     // as opposed to traditional wiretap that sends a copy of the original exchange
@@ -48,16 +49,18 @@ public class WireTapProcessor extends Se
     private boolean copy;
     private Processor onPrepare;
 
-    public WireTapProcessor(Endpoint destination, ExecutorService executorService) {
+    public WireTapProcessor(Endpoint destination, ExecutorService executorService, boolean shutdownExecutorService) {
         super(destination);
         ObjectHelper.notNull(executorService, "executorService");
         this.executorService = executorService;
+        this.shutdownExecutorService = shutdownExecutorService;
     }
 
-    public WireTapProcessor(Endpoint destination, ExchangePattern pattern, ExecutorService executorService) {
+    public WireTapProcessor(Endpoint destination, ExchangePattern pattern, ExecutorService executorService, boolean shutdownExecutorService) {
         super(destination, pattern);
         ObjectHelper.notNull(executorService, "executorService");
         this.executorService = executorService;
+        this.shutdownExecutorService = shutdownExecutorService;
     }
 
     @Override
@@ -175,6 +178,13 @@ public class WireTapProcessor extends Se
         return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
     }
 
+    protected void doShutdown() throws Exception {
+        if (shutdownExecutorService) {
+            getDestination().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+        }
+        super.doShutdown();
+    }
+
     public List<Processor> getNewExchangeProcessors() {
         return newExchangeProcessors;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Tue Mar 13 16:20:43 2012
@@ -87,7 +87,9 @@ public class AggregateProcessor extends 
     private final AggregationStrategy aggregationStrategy;
     private final Expression correlationExpression;
     private final ExecutorService executorService;
-    private ScheduledExecutorService timeoutCheckerExecutorService;    
+    private final boolean shutdownExecutorService;
+    private ScheduledExecutorService timeoutCheckerExecutorService;
+    private boolean shutdownTimeoutCheckerExecutorService;
     private ScheduledExecutorService recoverService;
     // store correlation key -> exchange id in timeout map
     private TimeoutMap<String, String> timeoutMap;
@@ -125,7 +127,7 @@ public class AggregateProcessor extends 
 
     public AggregateProcessor(CamelContext camelContext, Processor processor,
                               Expression correlationExpression, AggregationStrategy aggregationStrategy,
-                              ExecutorService executorService) {
+                              ExecutorService executorService, boolean shutdownExecutorService) {
         ObjectHelper.notNull(camelContext, "camelContext");
         ObjectHelper.notNull(processor, "processor");
         ObjectHelper.notNull(correlationExpression, "correlationExpression");
@@ -136,6 +138,7 @@ public class AggregateProcessor extends 
         this.correlationExpression = correlationExpression;
         this.aggregationStrategy = aggregationStrategy;
         this.executorService = executorService;
+        this.shutdownExecutorService = shutdownExecutorService;
     }
 
     @Override
@@ -583,7 +586,15 @@ public class AggregateProcessor extends 
     public ScheduledExecutorService getTimeoutCheckerExecutorService() {
         return timeoutCheckerExecutorService;
     }
-    
+
+    public boolean isShutdownTimeoutCheckerExecutorService() {
+        return shutdownTimeoutCheckerExecutorService;
+    }
+
+    public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) {
+        this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService;
+    }
+
     /**
      * On completion task which keeps the booking of the in progress up to date
      */
@@ -858,6 +869,7 @@ public class AggregateProcessor extends 
             LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
             if (getTimeoutCheckerExecutorService() == null) {
                 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
+                shutdownTimeoutCheckerExecutorService = true;
             }
             // trigger completion based on interval
             getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
@@ -868,6 +880,7 @@ public class AggregateProcessor extends 
             LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity.");
             if (getTimeoutCheckerExecutorService() == null) {
                 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
+                shutdownTimeoutCheckerExecutorService = true;
             }
             // check for timed out aggregated messages once every second
             timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L);
@@ -937,6 +950,14 @@ public class AggregateProcessor extends 
         // cleanup when shutting down
         inProgressCompleteExchanges.clear();
 
+        if (shutdownExecutorService) {
+            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+        }
+        if (shutdownTimeoutCheckerExecutorService) {
+            camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
+            timeoutCheckerExecutorService = null;
+        }
+
         super.doShutdown();
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java Tue Mar 13 16:20:43 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor.interceptor;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.model.ProcessorDefinition;
@@ -29,8 +30,8 @@ public class DelayInterceptor extends De
     private final ProcessorDefinition<?> node;
     private Delayer delayer;
 
-    public DelayInterceptor(ProcessorDefinition<?> node, Processor target, Delayer delayer) {
-        super(target);
+    public DelayInterceptor(CamelContext camelContext, ProcessorDefinition<?> node, Processor target, Delayer delayer) {
+        super(camelContext, target);
         this.node = node;
         this.delayer = delayer;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java Tue Mar 13 16:20:43 2012
@@ -28,7 +28,7 @@ import org.apache.camel.spi.InterceptStr
  */
 public class Delayer implements InterceptStrategy {
 
-    private boolean enabled = true;
+    private volatile boolean enabled = true;
     private final long delay;
 
     public Delayer(long delay) {
@@ -53,7 +53,7 @@ public class Delayer implements Intercep
 
     public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition,
                                                  Processor target, Processor nextTarget) throws Exception {
-        return new DelayInterceptor(definition, target, this);
+        return new DelayInterceptor(context, definition, target, this);
     }
 
     public boolean isEnabled() {

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java?rev=1300218&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java Tue Mar 13 16:20:43 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ManagedRouteRemoveWireTapExplicitThreadPoolTest extends ManagementTestSupport {
+
+    private ExecutorService myThreadPool;
+
+    public void testRemove() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"foo\"");
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        getMockEndpoint("mock:tap").expectedMessageCount(1);
+
+        template.sendBody("seda:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // should be started
+        String state = (String) mbeanServer.getAttribute(on, "State");
+        assertEquals("Should be started", ServiceStatus.Started.name(), state);
+
+        // and no wire tap thread pool as we use an existing external pool
+        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
+        boolean wireTap = false;
+        for (ObjectName name : set) {
+            if (name.toString().contains("wireTap")) {
+                wireTap = true;
+                break;
+            }
+        }
+        assertFalse("Should not have a wire tap thread pool", wireTap);
+
+        // stop
+        mbeanServer.invoke(on, "stop", null, null);
+
+        state = (String) mbeanServer.getAttribute(on, "State");
+        assertEquals("Should be stopped", ServiceStatus.Stopped.name(), state);
+
+        // remove
+        mbeanServer.invoke(on, "remove", null, null);
+
+        // should not be registered anymore
+        boolean registered = mbeanServer.isRegistered(on);
+        assertFalse("Route mbean should have been unregistered", registered);
+
+        // and no wire tap thread pool as we use an existing external pool
+        set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
+        wireTap = false;
+        for (ObjectName name : set) {
+            if (name.toString().contains("wireTap")) {
+                wireTap = true;
+                break;
+            }
+        }
+        assertFalse("Should not have a wire tap thread pool", wireTap);
+
+        // should not be shutdown
+        assertFalse("Thread pool should not be shutdown", myThreadPool.isShutdown());
+
+        myThreadPool.shutdownNow();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // create a new thread pool to use for wire tap
+                myThreadPool = Executors.newFixedThreadPool(1);
+
+                from("seda:foo").routeId("foo").wireTap("direct:tap").executorService(myThreadPool).to("mock:result");
+                from("direct:tap").routeId("tap").to("mock:tap");
+            }
+        };
+    }
+
+}

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java (from r1300065, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java&r1=1300065&r2=1300218&rev=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveContextScopedErrorHandlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java Tue Mar 13 16:20:43 2012
@@ -22,19 +22,18 @@ import javax.management.ObjectName;
 
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version 
  */
-public class ManagedRouteRemoveContextScopedErrorHandlerTest extends ManagementTestSupport {
+public class ManagedRouteRemoveWireTapTest extends ManagementTestSupport {
 
     public void testRemove() throws Exception {
         MBeanServer mbeanServer = getMBeanServer();
-        ObjectName on = getRouteObjectName(mbeanServer);
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"foo\"");
 
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        getMockEndpoint("mock:tap").expectedMessageCount(1);
 
         template.sendBody("seda:foo", "Hello World");
 
@@ -44,9 +43,16 @@ public class ManagedRouteRemoveContextSc
         String state = (String) mbeanServer.getAttribute(on, "State");
         assertEquals("Should be started", ServiceStatus.Started.name(), state);
 
-        // and one context scoped error handler
-        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=errorhandlers,*"), null);
-        assertEquals(1, set.size());
+        // and a number of thread pools
+        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
+        boolean wireTap = false;
+        for (ObjectName name : set) {
+            if (name.toString().contains("wireTap")) {
+                wireTap = true;
+                break;
+            }
+        }
+        assertTrue("Should have a wire tap thread pool", wireTap);
 
         // stop
         mbeanServer.invoke(on, "stop", null, null);
@@ -61,20 +67,16 @@ public class ManagedRouteRemoveContextSc
         boolean registered = mbeanServer.isRegistered(on);
         assertFalse("Route mbean should have been unregistered", registered);
 
-        // and no more routes
-        set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
-        assertEquals(0, set.size());
-
-        // but still 1 context scoped error handler
-        set = mbeanServer.queryNames(new ObjectName("*:type=errorhandlers,*"), null);
-        assertEquals(1, set.size());
-    }
-
-    static ObjectName getRouteObjectName(MBeanServer mbeanServer) throws Exception {
-        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
-        assertEquals(1, set.size());
-
-        return set.iterator().next();
+        // and a thread pool less
+        set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
+        wireTap = false;
+        for (ObjectName name : set) {
+            if (name.toString().contains("wireTap")) {
+                wireTap = true;
+                break;
+            }
+        }
+        assertFalse("Should not have a wire tap thread pool", wireTap);
     }
 
     @Override
@@ -82,11 +84,9 @@ public class ManagedRouteRemoveContextSc
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // context scoped error handler
-                errorHandler(deadLetterChannel("mock:dead"));
+                from("seda:foo").routeId("foo").wireTap("direct:tap").to("mock:result");
                 
-                // which this route will use
-                from("seda:foo").to("mock:result");
+                from("direct:tap").routeId("tap").to("mock:tap");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Tue Mar 13 16:20:43 2012
@@ -75,7 +75,7 @@ public class ThrottlerTest extends Conte
     }
 
     public void testTimeSlotCalculus() throws Exception {
-        Throttler throttler = new Throttler(null, constant(3), 1000, null);
+        Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
         // calculate will assign a new slot
         throttler.calculateDelay(new DefaultExchange(context));
         TimeSlot slot = throttler.nextSlot();

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Tue Mar 13 16:20:43 2012
@@ -62,7 +62,7 @@ public class AggregateProcessorTest exte
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setEagerCheckCompletion(false);
         ap.start();
@@ -103,7 +103,7 @@ public class AggregateProcessorTest exte
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().isEqualTo("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setEagerCheckCompletion(true);
         ap.start();
@@ -151,7 +151,7 @@ public class AggregateProcessorTest exte
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionSize(3);
         ap.setEagerCheckCompletion(eager);
         ap.start();
@@ -199,7 +199,7 @@ public class AggregateProcessorTest exte
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionTimeout(3000);
         ap.setEagerCheckCompletion(eager);
         ap.start();
@@ -248,7 +248,7 @@ public class AggregateProcessorTest exte
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionInterval(3000);
         ap.start();
 
@@ -289,7 +289,7 @@ public class AggregateProcessorTest exte
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setIgnoreInvalidCorrelationKeys(true);
 
@@ -329,7 +329,7 @@ public class AggregateProcessorTest exte
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
 
         ap.start();
@@ -376,7 +376,7 @@ public class AggregateProcessorTest exte
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setCloseCorrelationKeyOnCompletion(1000);
 
@@ -423,7 +423,7 @@ public class AggregateProcessorTest exte
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionSize(100);
         ap.setCompletionFromBatchConsumer(true);
 
@@ -520,7 +520,7 @@ public class AggregateProcessorTest exte
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setEagerCheckCompletion(true);
         ap.setCompletionPredicate(body().isEqualTo("END"));
         if (handler != null) {
@@ -571,7 +571,7 @@ public class AggregateProcessorTest exte
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionSize(10);
         ap.start();
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java?rev=1300218&r1=1300217&r2=1300218&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java Tue Mar 13 16:20:43 2012
@@ -59,7 +59,7 @@ public class AggregateProcessorTimeoutCo
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         // start with a high timeout so no completes before we stop
         ap.setCompletionTimeout(2000);
         ap.start();
@@ -101,7 +101,7 @@ public class AggregateProcessorTimeoutCo
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         // start with a high timeout so no completes before we stop
         ap.setCompletionTimeoutExpression(header("myTimeout"));
         ap.start();
@@ -145,7 +145,7 @@ public class AggregateProcessorTimeoutCo
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         // start with a high timeout so no completes before we stop
         ap.setCompletionTimeoutExpression(header("myTimeout"));
         ap.start();



Mime
View raw message