camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jans...@apache.org
Subject svn commit: r1196731 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/aggregate/ test/java/org/apache/camel/processor/aggregator/
Date Wed, 02 Nov 2011 18:16:36 GMT
Author: janstey
Date: Wed Nov  2 18:16:35 2011
New Revision: 1196731

URL: http://svn.apache.org/viewvc?rev=1196731&view=rev
Log:
CAMEL-4606 - allow setting custom threadpool for aggregator background tasks

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1196731&r1=1196730&r2=1196731&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed
Nov  2 18:16:35 2011
@@ -19,6 +19,8 @@ package org.apache.camel.model;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -39,6 +41,7 @@ import org.apache.camel.processor.aggreg
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.concurrent.SynchronousExecutorService;
 
 /**
@@ -66,12 +69,16 @@ public class AggregateDefinition extends
     @XmlTransient
     private ExecutorService executorService;
     @XmlTransient
+    private ScheduledExecutorService timeoutCheckerExecutorService;
+    @XmlTransient
     private AggregationRepository aggregationRepository;
     @XmlAttribute
     private Boolean parallelProcessing;
     @XmlAttribute
     private String executorServiceRef;
     @XmlAttribute
+    private String timeoutCheckerExecutorServiceRef;
+    @XmlAttribute
     private String aggregationRepositoryRef;
     @XmlAttribute
     private String strategyRef;
@@ -166,12 +173,20 @@ public class AggregateDefinition extends
                 executorService = new SynchronousExecutorService();
             }
         }
+       
+        if (timeoutCheckerExecutorServiceRef != null && timeoutCheckerExecutorService
== null) {
+            timeoutCheckerExecutorService = getConfiguredScheduledExecutorService(routeContext);
+        }
         AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(),
processor, correlation, strategy, executorService);
 
         AggregationRepository repository = createAggregationRepository(routeContext);
         if (repository != null) {
             answer.setAggregationRepository(repository);
         }
+        
+        if (getTimeoutCheckerExecutorService() != null) {
+            answer.setTimeoutCheckerExecutorService(timeoutCheckerExecutorService);
+        }
 
         // set other options
         answer.setParallelProcessing(isParallelProcessing());
@@ -218,6 +233,28 @@ public class AggregateDefinition extends
         return answer;
     }
 
+    private ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext)
{
+        // TODO: maybe rather than this one-off method to support an executorService &
scheduledExecutorService for the aggregator,
+        // create ScheduledExecutorServiceAwareDefinition and the change other definitions
that currently use ScheduledExecutorServices to
+        // use that one instead of the more generic ExecutorServiceAwareDefinition
+        ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(timeoutCheckerExecutorServiceRef,
ScheduledExecutorService.class);
+        if (answer == null) {
+            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
+            // then create a thread pool assuming the ref is a thread pool profile id   
            
+            ThreadPoolProfile profile = manager.getThreadPoolProfile(timeoutCheckerExecutorServiceRef);
+            if (profile != null) {
+                // okay we need to grab the pool size from the ref
+                Integer poolSize = profile.getPoolSize();
+                if (poolSize == null) {
+                    // fallback and use the default pool size, if none was set on the profile
+                    poolSize = manager.getDefaultThreadPoolProfile().getPoolSize();
+                }
+                answer = manager.newScheduledThreadPool(this, "Aggregator", poolSize);
+            }
+        }
+        return answer;
+    }
+
     @Override
     protected void configureChild(ProcessorDefinition output) {
         if (expression != null && expression instanceof ExpressionClause) {
@@ -455,6 +492,22 @@ public class AggregateDefinition extends
     public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
         this.discardOnCompletionTimeout = discardOnCompletionTimeout;
     }
+    
+    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService)
{
+        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
+    }
+
+    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
+        return timeoutCheckerExecutorService;
+    }
+
+    public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef)
{
+        this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef;
+    }
+
+    public String getTimeoutCheckerExecutorServiceRef() {
+        return timeoutCheckerExecutorServiceRef;
+    }
 
     // Fluent API
     //-------------------------------------------------------------------------
@@ -696,6 +749,16 @@ public class AggregateDefinition extends
         return this;
     }
 
+    public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService)
{
+        setTimeoutCheckerExecutorService(executorService);
+        return this;
+    }
+
+    public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef)
{
+        setTimeoutCheckerExecutorServiceRef(executorServiceRef);
+        return this;
+    }
+    
     protected void checkNoCompletedPredicate() {
         if (getCompletionPredicate() != null) {
             throw new IllegalArgumentException("There is already a completionPredicate defined
for this aggregator: " + this);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java?rev=1196731&r1=1196730&r2=1196731&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
Wed Nov  2 18:16:35 2011
@@ -219,7 +219,7 @@ public final class ProcessorDefinitionHe
      * The various {@link ExecutorServiceAwareDefinition} should use this helper method to
ensure they support
      * configured executor services in the same coherent way.
      *
-     * @param routeContext   the rout context
+     * @param routeContext   the route context
      * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
      *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
      * @param definition     the node definition which may leverage executor service.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1196731&r1=1196730&r2=1196731&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Wed Nov  2 18:16:35 2011
@@ -76,6 +76,8 @@ import org.slf4j.LoggerFactory;
  */
 public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>,
Traceable {
 
+    public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
+
     private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);
 
     private final Lock lock = new ReentrantLock();
@@ -84,6 +86,7 @@ public class AggregateProcessor extends 
     private final AggregationStrategy aggregationStrategy;
     private final Expression correlationExpression;
     private final ExecutorService executorService;
+    private ScheduledExecutorService timeoutCheckerExecutorService;    
     private ScheduledExecutorService recoverService;
     // store correlation key -> exchange id in timeout map
     private TimeoutMap<String, String> timeoutMap;
@@ -572,6 +575,14 @@ public class AggregateProcessor extends 
         this.forceCompletionOnStop = forceCompletionOnStop;
     }
 
+    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService)
{
+        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
+    }
+
+    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
+        return timeoutCheckerExecutorService;
+    }
+    
     /**
      * On completion task which keeps the booking of the in progress up to date
      */
@@ -844,17 +855,21 @@ public class AggregateProcessor extends 
         }
         if (getCompletionInterval() > 0) {
             LOG.info("Using CompletionInterval to run every " + getCompletionInterval() +
" millis.");
-            ScheduledExecutorService scheduler = camelContext.getExecutorServiceManager().newScheduledThreadPool(this,
"AggregateTimeoutChecker", 1);
+            if (getTimeoutCheckerExecutorService() == null) {
+                setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this,
AGGREGATE_TIMEOUT_CHECKER, 1));
+            }
             // trigger completion based on interval
-            scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(),
TimeUnit.MILLISECONDS);
+            getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(),
1000L, getCompletionInterval(), TimeUnit.MILLISECONDS);
         }
 
         // start timeout service if its in use
         if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
             LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout()
+ " millis of inactivity.");
-            ScheduledExecutorService scheduler = camelContext.getExecutorServiceManager().newScheduledThreadPool(this,
"AggregateTimeoutChecker", 1);
+            if (getTimeoutCheckerExecutorService() == null) {
+                setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this,
AGGREGATE_TIMEOUT_CHECKER, 1));
+            }
             // check for timed out aggregated messages once every second
-            timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
+            timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L);
             // fill in existing timeout values from the aggregation repository, for example
if a restart occurred, then we
             // need to re-establish the timeout map so timeout can trigger
             restoreTimeoutMapFromAggregationRepository();

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java?rev=1196731&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java
Wed Nov  2 18:16:35 2011
@@ -0,0 +1,36 @@
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.naming.Context;
+
+import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.spi.ThreadPoolProfile;
+
+public class AggregateTimeoutWithExecutorServiceRefTest extends AggregateTimeoutWithExecutorServiceTest
{
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // create and register thread pool profile
+                ThreadPoolProfile profile = new ThreadPoolProfile("MyThreadPool");
+                profile.setPoolSize(8);
+                profile.setMaxPoolSize(8);
+                profile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
+                context.getExecutorServiceManager().registerThreadPoolProfile(profile);
+                
+                for (int i = 0; i < NUM_AGGREGATORS; ++i) {
+                    from("direct:start" + i)
+                    // aggregate timeout after 3th seconds
+                    .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000).timeoutCheckerExecutorServiceRef("MyThreadPool")
+                    .to("mock:result" + i);
+                }
+            }
+        };
+    }
+    
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceTest.java?rev=1196731&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceTest.java
Wed Nov  2 18:16:35 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregateProcessor;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+
+/**
+ * Unit test to verify that aggregate by timeout only also works.
+ * 
+ * @version 
+ */
+public class AggregateTimeoutWithExecutorServiceTest extends ContextTestSupport {
+
+    public static final int NUM_AGGREGATORS = 20;
+
+    public void testThreadNotUsedForEveryAggregatorWithCustomExecutorService() throws Exception
{
+        assertTrue("There should not be a thread for every aggregator when using a shared
thread pool", 
+                aggregateThreadsCount() < NUM_AGGREGATORS);
+        
+        // sanity check to make sure were testing routes that work
+        for (int i = 0; i < NUM_AGGREGATORS; ++i) {
+            MockEndpoint result = getMockEndpoint("mock:result" + i);
+            // by default the use latest aggregation strategy is used so we get message 4
+            result.expectedBodiesReceived("Message 4");
+        }
+        for (int i = 0; i < NUM_AGGREGATORS; ++i) {
+            for (int j = 0; j < 5; j++) {
+                template.sendBodyAndHeader("direct:start" + i, "Message " + j, "id", "1");
+            }
+        }
+        assertMockEndpointsSatisfied();
+    }
+
+    public static int aggregateThreadsCount() {
+        int count = 0;
+        ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+        Thread[] threads = new Thread[threadGroup.activeCount()];
+        threadGroup.enumerate(threads);
+        for (Thread thread : threads) {
+            if (thread.getName().contains(AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER))
{
+                ++count;
+            }
+        }
+        return count;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // share 8 threads among the 20 routes
+                ScheduledExecutorService threadPool = context.getExecutorServiceManager().newScheduledThreadPool(this,
"MyThreadPool", 8);
+                for (int i = 0; i < NUM_AGGREGATORS; ++i) {
+                    from("direct:start" + i)
+                    // aggregate timeout after 3th seconds
+                    .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000).timeoutCheckerExecutorService(threadPool)
+                    .to("mock:result" + i);
+                }
+            }
+        };
+    }
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java?rev=1196731&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
Wed Nov  2 18:16:35 2011
@@ -0,0 +1,41 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+
+public class AggregateTimeoutWithNoExecutorServiceTest extends ContextTestSupport {
+    public void testThreadUsedForEveryAggregatorWhenDefaultExecutorServiceUsed() throws Exception
{
+        assertTrue("There should be a thread for every aggregator when using defaults", 
+                AggregateTimeoutWithExecutorServiceTest.aggregateThreadsCount() >= AggregateTimeoutWithExecutorServiceTest.NUM_AGGREGATORS);
+        
+        // sanity check to make sure were testing routes that work
+        for (int i = 0; i < AggregateTimeoutWithExecutorServiceTest.NUM_AGGREGATORS; ++i)
{
+            MockEndpoint result = getMockEndpoint("mock:result" + i);
+            // by default the use latest aggregation strategy is used so we get message 4
+            result.expectedBodiesReceived("Message 4");
+        }
+        for (int i = 0; i < AggregateTimeoutWithExecutorServiceTest.NUM_AGGREGATORS; ++i)
{
+            for (int j = 0; j < 5; j++) {
+                template.sendBodyAndHeader("direct:start" + i, "Message " + j, "id", "1");
+            }
+        }
+        assertMockEndpointsSatisfied();
+    }   
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                for (int i = 0; i < AggregateTimeoutWithExecutorServiceTest.NUM_AGGREGATORS;
++i) {
+                    from("direct:start" + i)
+                    // aggregate timeout after 3th seconds
+                    .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000)
+                    .to("mock:result" + i);
+                }
+            }
+        };
+    }    
+}



Mime
View raw message