Return-Path: Delivered-To: apmail-activemq-camel-commits-archive@locus.apache.org Received: (qmail 63509 invoked from network); 5 Nov 2008 16:29:36 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 5 Nov 2008 16:29:36 -0000 Received: (qmail 28080 invoked by uid 500); 5 Nov 2008 16:29:42 -0000 Delivered-To: apmail-activemq-camel-commits-archive@activemq.apache.org Received: (qmail 28058 invoked by uid 500); 5 Nov 2008 16:29:42 -0000 Mailing-List: contact camel-commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: camel-dev@activemq.apache.org Delivered-To: mailing list camel-commits@activemq.apache.org Received: (qmail 28049 invoked by uid 99); 5 Nov 2008 16:29:42 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Nov 2008 08:29:42 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Nov 2008 16:28:33 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 3E4B52388979; Wed, 5 Nov 2008 08:29:15 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r711602 - in /activemq/camel/branches/camel-1.x: ./ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-spring/src/test/java/org/apache/camel/spring/processor/ components/ca... Date: Wed, 05 Nov 2008 16:29:14 -0000 To: camel-commits@activemq.apache.org From: janstey@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081105162915.3E4B52388979@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: janstey Date: Wed Nov 5 08:29:14 2008 New Revision: 711602 URL: http://svn.apache.org/viewvc?rev=711602&view=rev Log: Merged revisions 711599 via svnmerge from https://svn.apache.org/repos/asf/activemq/camel/trunk ........ r711599 | janstey | 2008-11-05 12:46:56 -0330 (Wed, 05 Nov 2008) | 1 line CAMEL-1040 - added configurable thread pool parameter to splitter ........ Added: activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java - copied unchanged from r711599, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java activemq/camel/branches/camel-1.x/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java - copied unchanged from r711599, activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml - copied unchanged from r711599, activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml Modified: activemq/camel/branches/camel-1.x/ (props changed) activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Propchange: activemq/camel/branches/camel-1.x/ ------------------------------------------------------------------------------ --- svnmerge-integrated (original) +++ svnmerge-integrated Wed Nov 5 08:29:14 2008 @@ -1 +1 @@ -/activemq/camel/trunk:1-708421,708553-709447,709449-709612,709614-709634,709636-710013,711200,711206,711219-711220,711523,711531 +/activemq/camel/trunk:1-708421,708553-709447,709449-709612,709614-709634,709636-710013,711200,711206,711219-711220,711523,711531,711599 Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=711602&r1=711601&r2=711602&view=diff ============================================================================== --- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original) +++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Wed Nov 5 08:29:14 2008 @@ -448,11 +448,11 @@ * This splitter responds with the latest message returned from destination * endpoint. * - * @param receipients the expression on which to split + * @param recipients the expression on which to split * @return the builder */ - public SplitterType splitter(Expression receipients) { - SplitterType answer = new SplitterType(receipients); + public SplitterType splitter(Expression recipients) { + SplitterType answer = new SplitterType(recipients); addOutput(answer); return answer; } @@ -516,12 +516,12 @@ * This splitter responds with the latest message returned from destination * endpoint. * - * @param receipients the expression on which to split + * @param recipients the expression on which to split * @param parallelProcessing if is true camel will fork thread to call the endpoint producer * @return the builder */ - public SplitterType splitter(Expression receipients, boolean parallelProcessing) { - SplitterType answer = new SplitterType(receipients); + public SplitterType splitter(Expression recipients, boolean parallelProcessing) { + SplitterType answer = new SplitterType(recipients); addOutput(answer); answer.setParallelProcessing(parallelProcessing); return answer; @@ -535,6 +535,27 @@ * This splitter responds with the latest message returned from destination * endpoint. * + * @param recipients the expression on which to split + * @param parallelProcessing if is true camel will fork thread to call the endpoint producer + * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} + * @return the builder + */ + public SplitterType splitter(Expression recipients, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) { + SplitterType answer = new SplitterType(recipients); + addOutput(answer); + answer.setParallelProcessing(parallelProcessing); + answer.setThreadPoolExecutor(threadPoolExecutor); + return answer; + } + + /** + * Creates the Splitter + * pattern where an expression is evaluated to iterate through each of the + * parts of a message and then each part is then send to some endpoint. + * This splitter responds with the latest message returned from destination + * endpoint. + * * @param parallelProcessing if is true camel will fork thread to call the endpoint producer * @return the expression clause for the expression on which to split */ @@ -550,6 +571,26 @@ * href="http://activemq.apache.org/camel/splitter.html">Splitter * pattern where an expression is evaluated to iterate through each of the * parts of a message and then each part is then send to some endpoint. + * This splitter responds with the latest message returned from destination + * endpoint. + * + * @param parallelProcessing if is true camel will fork thread to call the endpoint producer + * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} + * @return the expression clause for the expression on which to split + */ + public ExpressionClause splitter(boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) { + SplitterType answer = new SplitterType(); + addOutput(answer); + answer.setParallelProcessing(parallelProcessing); + answer.setThreadPoolExecutor(threadPoolExecutor); + return ExpressionClause.createAndSetExpression(answer); + } + + /** + * Creates the Splitter + * pattern where an expression is evaluated to iterate through each of the + * parts of a message and then each part is then send to some endpoint. * Answer from the splitter is produced using given {@link AggregationStrategy} * @param partsExpression the expression on which to split * @param aggregationStrategy the strategy used to aggregate responses for @@ -572,6 +613,29 @@ * pattern where an expression is evaluated to iterate through each of the * parts of a message and then each part is then send to some endpoint. * Answer from the splitter is produced using given {@link AggregationStrategy} + * @param partsExpression the expression on which to split + * @param aggregationStrategy the strategy used to aggregate responses for + * every part + * @param parallelProcessing if is true camel will fork thread to call the endpoint producer + * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} + * @return the builder + */ + public SplitterType splitter(Expression partsExpression, + AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) { + SplitterType answer = new SplitterType(partsExpression); + addOutput(answer); + answer.setAggregationStrategy(aggregationStrategy); + answer.setParallelProcessing(parallelProcessing); + answer.setThreadPoolExecutor(threadPoolExecutor); + return answer; + } + + /** + * Creates the Splitter + * pattern where an expression is evaluated to iterate through each of the + * parts of a message and then each part is then send to some endpoint. + * Answer from the splitter is produced using given {@link AggregationStrategy} * @param aggregationStrategy the strategy used to aggregate responses for * every part * @param parallelProcessing if is true camel will fork thread to call the endpoint producer @@ -585,7 +649,27 @@ return ExpressionClause.createAndSetExpression(answer); } - + /** + * Creates the Splitter + * pattern where an expression is evaluated to iterate through each of the + * parts of a message and then each part is then send to some endpoint. + * Answer from the splitter is produced using given {@link AggregationStrategy} + * @param aggregationStrategy the strategy used to aggregate responses for + * every part + * @param parallelProcessing if is true camel will fork thread to call the endpoint producer + * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} + * @return the expression clause for the expression on which to split + */ + public ExpressionClause splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) { + SplitterType answer = new SplitterType(); + addOutput(answer); + answer.setAggregationStrategy(aggregationStrategy); + answer.setParallelProcessing(parallelProcessing); + answer.setThreadPoolExecutor(threadPoolExecutor); + return ExpressionClause.createAndSetExpression(answer); + } + /** * Creates the Resequencer Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=711602&r1=711601&r2=711602&view=diff ============================================================================== --- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original) +++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Wed Nov 5 08:29:14 2008 @@ -49,6 +49,8 @@ @XmlTransient private ThreadPoolExecutor threadPoolExecutor; @XmlAttribute(required = false) + private String threadPoolExecutorRef; + @XmlAttribute(required = false) private Boolean streaming = false; public SplitterType() { @@ -78,9 +80,7 @@ if (aggregationStrategy == null) { aggregationStrategy = new UseLatestAggregationStrategy(); } - if (threadPoolExecutor == null) { - threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); - } + threadPoolExecutor = createThreadPoolExecutor(routeContext); return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy, isParallelProcessing(), threadPoolExecutor, streaming); } @@ -125,6 +125,18 @@ return this; } + private ThreadPoolExecutor createThreadPoolExecutor(RouteContext routeContext) { + ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); + if (threadPoolExecutor == null && threadPoolExecutorRef != null) { + threadPoolExecutor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class); + } + if (threadPoolExecutor == null) { + // fall back and use default + threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + } + return threadPoolExecutor; + } + public ThreadPoolExecutor getThreadPoolExecutor() { return threadPoolExecutor; }