Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D32FD9946 for ; Wed, 2 Nov 2011 18:17:02 +0000 (UTC) Received: (qmail 57784 invoked by uid 500); 2 Nov 2011 18:17:02 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 57752 invoked by uid 500); 2 Nov 2011 18:17:02 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 57745 invoked by uid 99); 2 Nov 2011 18:17:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Nov 2011 18:17:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Nov 2011 18:16:58 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id ACEA32388C9B for ; Wed, 2 Nov 2011 18:16:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1196731 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/aggregate/ test/java/org/apache/camel/processor/aggregator/ Date: Wed, 02 Nov 2011 18:16:36 -0000 To: commits@camel.apache.org From: janstey@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111102181636.ACEA32388C9B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: janstey Date: Wed Nov 2 18:16:35 2011 New Revision: 1196731 URL: http://svn.apache.org/viewvc?rev=1196731&view=rev Log: CAMEL-4606 - allow setting custom 
threadpool for aggregator background tasks Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1196731&r1=1196730&r2=1196731&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Nov 2 18:16:35 2011 @@ -19,6 +19,8 @@ package org.apache.camel.model; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; @@ -39,6 +41,7 @@ import org.apache.camel.processor.aggreg import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.util.concurrent.SynchronousExecutorService; /** @@ -66,12 +69,16 @@ public class AggregateDefinition extends @XmlTransient 
private ExecutorService executorService; @XmlTransient + private ScheduledExecutorService timeoutCheckerExecutorService; + @XmlTransient private AggregationRepository aggregationRepository; @XmlAttribute private Boolean parallelProcessing; @XmlAttribute private String executorServiceRef; @XmlAttribute + private String timeoutCheckerExecutorServiceRef; + @XmlAttribute private String aggregationRepositoryRef; @XmlAttribute private String strategyRef; @@ -166,12 +173,20 @@ public class AggregateDefinition extends executorService = new SynchronousExecutorService(); } } + + if (timeoutCheckerExecutorServiceRef != null && timeoutCheckerExecutorService == null) { + timeoutCheckerExecutorService = getConfiguredScheduledExecutorService(routeContext); + } AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, correlation, strategy, executorService); AggregationRepository repository = createAggregationRepository(routeContext); if (repository != null) { answer.setAggregationRepository(repository); } + + if (getTimeoutCheckerExecutorService() != null) { + answer.setTimeoutCheckerExecutorService(timeoutCheckerExecutorService); + } // set other options answer.setParallelProcessing(isParallelProcessing()); @@ -218,6 +233,28 @@ public class AggregateDefinition extends return answer; } + private ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext) { + // TODO: maybe rather than this one-off method to support an executorService & scheduledExecutorService for the aggregator, + // create ScheduledExecutorServiceAwareDefinition and the change other definitions that currently use ScheduledExecutorServices to + // use that one instead of the more generic ExecutorServiceAwareDefinition + ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookup(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class); + if (answer == null) { + ExecutorServiceManager manager = 
routeContext.getCamelContext().getExecutorServiceManager(); + // then create a thread pool assuming the ref is a thread pool profile id + ThreadPoolProfile profile = manager.getThreadPoolProfile(timeoutCheckerExecutorServiceRef); + if (profile != null) { + // okay we need to grab the pool size from the ref + Integer poolSize = profile.getPoolSize(); + if (poolSize == null) { + // fallback and use the default pool size, if none was set on the profile + poolSize = manager.getDefaultThreadPoolProfile().getPoolSize(); + } + answer = manager.newScheduledThreadPool(this, "Aggregator", poolSize); + } + } + return answer; + } + @Override protected void configureChild(ProcessorDefinition output) { if (expression != null && expression instanceof ExpressionClause) { @@ -455,6 +492,22 @@ public class AggregateDefinition extends public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) { this.discardOnCompletionTimeout = discardOnCompletionTimeout; } + + public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { + this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; + } + + public ScheduledExecutorService getTimeoutCheckerExecutorService() { + return timeoutCheckerExecutorService; + } + + public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) { + this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef; + } + + public String getTimeoutCheckerExecutorServiceRef() { + return timeoutCheckerExecutorServiceRef; + } // Fluent API //------------------------------------------------------------------------- @@ -696,6 +749,16 @@ public class AggregateDefinition extends return this; } + public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) { + setTimeoutCheckerExecutorService(executorService); + return this; + } + + public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) { + 
setTimeoutCheckerExecutorServiceRef(executorServiceRef); + return this; + } + protected void checkNoCompletedPredicate() { if (getCompletionPredicate() != null) { throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java?rev=1196731&r1=1196730&r2=1196731&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java Wed Nov 2 18:16:35 2011 @@ -219,7 +219,7 @@ public final class ProcessorDefinitionHe * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support * configured executor services in the same coherent way. * - * @param routeContext the rout context + * @param routeContext the route context * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. * @param definition the node definition which may leverage executor service. 
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1196731&r1=1196730&r2=1196731&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Nov 2 18:16:35 2011 @@ -76,6 +76,8 @@ import org.slf4j.LoggerFactory; */ public class AggregateProcessor extends ServiceSupport implements Processor, Navigate, Traceable { + public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker"; + private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class); private final Lock lock = new ReentrantLock(); @@ -84,6 +86,7 @@ public class AggregateProcessor extends private final AggregationStrategy aggregationStrategy; private final Expression correlationExpression; private final ExecutorService executorService; + private ScheduledExecutorService timeoutCheckerExecutorService; private ScheduledExecutorService recoverService; // store correlation key -> exchange id in timeout map private TimeoutMap timeoutMap; @@ -572,6 +575,14 @@ public class AggregateProcessor extends this.forceCompletionOnStop = forceCompletionOnStop; } + public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { + this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; + } + + public ScheduledExecutorService getTimeoutCheckerExecutorService() { + return timeoutCheckerExecutorService; + } + /** * On completion task which keeps the booking of the in progress up to date */ @@ -844,17 +855,21 @@ public class AggregateProcessor extends } if (getCompletionInterval() > 0) { LOG.info("Using 
CompletionInterval to run every " + getCompletionInterval() + " millis."); - ScheduledExecutorService scheduler = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1); + if (getTimeoutCheckerExecutorService() == null) { + setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); + } // trigger completion based on interval - scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS); + getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS); } // start timeout service if its in use if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) { LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity."); - ScheduledExecutorService scheduler = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1); + if (getTimeoutCheckerExecutorService() == null) { + setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); + } // check for timed out aggregated messages once every second - timeoutMap = new AggregationTimeoutMap(scheduler, 1000L); + timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L); // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we // need to re-establish the timeout map so timeout can trigger restoreTimeoutMapFromAggregationRepository(); Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java?rev=1196731&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceRefTest.java Wed Nov 2 18:16:35 2011 @@ -0,0 +1,36 @@ +package org.apache.camel.processor.aggregator; + +import java.util.concurrent.ScheduledExecutorService; + +import javax.naming.Context; + +import org.apache.camel.ThreadPoolRejectedPolicy; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; +import org.apache.camel.spi.ThreadPoolProfile; + +public class AggregateTimeoutWithExecutorServiceRefTest extends AggregateTimeoutWithExecutorServiceTest { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // create and register thread pool profile + ThreadPoolProfile profile = new ThreadPoolProfile("MyThreadPool"); + profile.setPoolSize(8); + profile.setMaxPoolSize(8); + profile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort); + context.getExecutorServiceManager().registerThreadPoolProfile(profile); + + for (int i = 0; i < NUM_AGGREGATORS; ++i) { + from("direct:start" + i) + // aggregate timeout after 3 seconds + .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000).timeoutCheckerExecutorServiceRef("MyThreadPool") + .to("mock:result" + i); + } + } + }; + } + +} Added: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceTest.java?rev=1196731&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithExecutorServiceTest.java Wed Nov 2 18:16:35 2011 @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregateProcessor; +import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; + +/** + * Unit test to verify that aggregate by timeout only also works. 
+ * + * @version + */ +public class AggregateTimeoutWithExecutorServiceTest extends ContextTestSupport { + + public static final int NUM_AGGREGATORS = 20; + + public void testThreadNotUsedForEveryAggregatorWithCustomExecutorService() throws Exception { + assertTrue("There should not be a thread for every aggregator when using a shared thread pool", + aggregateThreadsCount() < NUM_AGGREGATORS); + + // sanity check to make sure we're testing routes that work + for (int i = 0; i < NUM_AGGREGATORS; ++i) { + MockEndpoint result = getMockEndpoint("mock:result" + i); + // by default the use latest aggregation strategy is used so we get message 4 + result.expectedBodiesReceived("Message 4"); + } + for (int i = 0; i < NUM_AGGREGATORS; ++i) { + for (int j = 0; j < 5; j++) { + template.sendBodyAndHeader("direct:start" + i, "Message " + j, "id", "1"); + } + } + assertMockEndpointsSatisfied(); + } + + public static int aggregateThreadsCount() { + int count = 0; + ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); + Thread[] threads = new Thread[threadGroup.activeCount()]; + threadGroup.enumerate(threads); + for (Thread thread : threads) { + if (thread.getName().contains(AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER)) { + ++count; + } + } + return count; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // share 8 threads among the 20 routes + ScheduledExecutorService threadPool = context.getExecutorServiceManager().newScheduledThreadPool(this, "MyThreadPool", 8); + for (int i = 0; i < NUM_AGGREGATORS; ++i) { + from("direct:start" + i) + // aggregate timeout after 3 seconds + .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000).timeoutCheckerExecutorService(threadPool) + .to("mock:result" + i); + } + } + }; + } +} Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java?rev=1196731&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java Wed Nov 2 18:16:35 2011 @@ -0,0 +1,41 @@ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; + +public class AggregateTimeoutWithNoExecutorServiceTest extends ContextTestSupport { + public void testThreadUsedForEveryAggregatorWhenDefaultExecutorServiceUsed() throws Exception { + assertTrue("There should be a thread for every aggregator when using defaults", + AggregateTimeoutWithExecutorServiceTest.aggregateThreadsCount() >= AggregateTimeoutWithExecutorServiceTest.NUM_AGGREGATORS); + + // sanity check to make sure we're testing routes that work + for (int i = 0; i < AggregateTimeoutWithExecutorServiceTest.NUM_AGGREGATORS; ++i) { + MockEndpoint result = getMockEndpoint("mock:result" + i); + // by default the use latest aggregation strategy is used so we get message 4 + result.expectedBodiesReceived("Message 4"); + } + for (int i = 0; i < AggregateTimeoutWithExecutorServiceTest.NUM_AGGREGATORS; ++i) { + for (int j = 0; j < 5; j++) { + template.sendBodyAndHeader("direct:start" + i, "Message " + j, "id", "1"); + } + } + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws 
Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + for (int i = 0; i < AggregateTimeoutWithExecutorServiceTest.NUM_AGGREGATORS; ++i) { + from("direct:start" + i) + // aggregate timeout after 3 seconds + .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000) + .to("mock:result" + i); + } + } + }; + } +}