Return-Path: Delivered-To: apmail-activemq-camel-commits-archive@locus.apache.org Received: (qmail 86257 invoked from network); 3 Dec 2008 07:28:45 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 3 Dec 2008 07:28:45 -0000 Received: (qmail 64145 invoked by uid 500); 3 Dec 2008 07:28:57 -0000 Delivered-To: apmail-activemq-camel-commits-archive@activemq.apache.org Received: (qmail 64130 invoked by uid 500); 3 Dec 2008 07:28:57 -0000 Mailing-List: contact camel-commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: camel-dev@activemq.apache.org Delivered-To: mailing list camel-commits@activemq.apache.org Received: (qmail 64121 invoked by uid 99); 3 Dec 2008 07:28:57 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Dec 2008 23:28:57 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Dec 2008 07:27:36 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 320C9238895D; Tue, 2 Dec 2008 23:27:54 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r722800 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/ Date: Wed, 03 Dec 2008 07:27:53 -0000 To: camel-commits@activemq.apache.org From: ningjiang@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081203072754.320C9238895D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ningjiang Date: Tue Dec 2 23:27:52 2008 New Revision: 722800 URL: http://svn.apache.org/viewvc?rev=722800&view=rev Log: CAMEL-1098 Applied patch with thanks to Tim Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java?rev=722800&r1=722799&r2=722800&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java Tue Dec 2 23:27:52 2008 @@ -17,7 +17,7 @@ package org.apache.camel.model; import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executor; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; @@ -49,7 +49,7 @@ @XmlTransient private AggregationStrategy aggregationStrategy; @XmlTransient - private ThreadPoolExecutor threadPoolExecutor; + private Executor executor; @Override public String toString() { @@ -74,9 +74,9 @@ aggregationStrategy = new UseLatestAggregationStrategy(); } if (threadPoolRef != null) { - threadPoolExecutor = routeContext.lookup(threadPoolRef, ThreadPoolExecutor.class); + executor = routeContext.lookup(threadPoolRef, Executor.class); } - return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), threadPoolExecutor); + return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executor); } public AggregationStrategy getAggregationStrategy() { @@ -97,12 +97,12 @@ return this; } - public ThreadPoolExecutor getThreadPoolExecutor() { - return threadPoolExecutor; + public Executor getExecutor() { + return executor; } - public MulticastType setThreadPoolExecutor(ThreadPoolExecutor executor) { - this.threadPoolExecutor = executor; + public MulticastType setThreadPoolExecutor(Executor executor) { + this.executor = executor; return this; } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=722800&r1=722799&r2=722800&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Tue Dec 2 23:27:52 2008 @@ -24,7 +24,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executor; + import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; @@ -298,7 +299,7 @@ * @return a ThreadType builder that can be used to further configure the * the thread pool. */ - public ProcessorType thread(ThreadPoolExecutor executor) { + public ProcessorType thread(Executor executor) { ThreadType answer = new ThreadType(executor); addOutput(answer); return this; @@ -593,15 +594,15 @@ * * @param expression the expression on which to split * @param parallelProcessing if is true camel will fork thread to call the endpoint producer - * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} + * @param executor override the default {@link Executor} * @return the builder */ public SplitterType split(Expression expression, boolean parallelProcessing, - ThreadPoolExecutor threadPoolExecutor) { + Executor executor) { SplitterType answer = new SplitterType(expression); addOutput(answer); answer.setParallelProcessing(parallelProcessing); - answer.setThreadPoolExecutor(threadPoolExecutor); + answer.setExecutor(executor); return answer; } @@ -629,14 +630,14 @@ * The splitter responds with the answer produced by the given {@link AggregationStrategy}. * * @param parallelProcessing if is true camel will fork thread to call the endpoint producer - * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} + * @param executor override the default {@link Executor} * @return the expression clause for the expression on which to split */ - public ExpressionClause split(boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) { + public ExpressionClause split(boolean parallelProcessing, Executor executor) { SplitterType answer = new SplitterType(); addOutput(answer); answer.setParallelProcessing(parallelProcessing); - answer.setThreadPoolExecutor(threadPoolExecutor); + answer.setExecutor(executor); return ExpressionClause.createAndSetExpression(answer); } @@ -669,16 +670,16 @@ * @param expression the expression on which to split * @param aggregationStrategy the strategy used to aggregate responses for every part * @param parallelProcessing if is true camel will fork thread to call the endpoint producer - * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} + * @param executor override the default {@link Executor} * @return the builder */ public SplitterType split(Expression expression, AggregationStrategy aggregationStrategy, - boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) { + boolean parallelProcessing, Executor executor) { SplitterType answer = new SplitterType(expression); addOutput(answer); answer.setAggregationStrategy(aggregationStrategy); answer.setParallelProcessing(parallelProcessing); - answer.setThreadPoolExecutor(threadPoolExecutor); + answer.setExecutor(executor); return answer; } @@ -708,16 +709,16 @@ * * @param aggregationStrategy the strategy used to aggregate responses for every part * @param parallelProcessing if is true camel will fork thread to call the endpoint producer - * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} + * @param executor override the default {@link Executor} * @return the expression clause for the expression on which to split */ public ExpressionClause split(AggregationStrategy aggregationStrategy, boolean parallelProcessing, - ThreadPoolExecutor threadPoolExecutor) { + Executor executor) { SplitterType answer = new SplitterType(); addOutput(answer); answer.setAggregationStrategy(aggregationStrategy); answer.setParallelProcessing(parallelProcessing); - answer.setThreadPoolExecutor(threadPoolExecutor); + answer.setExecutor(executor); return ExpressionClause.createAndSetExpression(answer); } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=722800&r1=722799&r2=722800&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Tue Dec 2 23:27:52 2008 @@ -16,6 +16,7 @@ */ package org.apache.camel.model; +import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -47,7 +48,7 @@ @XmlAttribute(required = false) private Boolean parallelProcessing; @XmlTransient - private ThreadPoolExecutor threadPoolExecutor; + private Executor executor; @XmlAttribute(required = false) private String threadPoolExecutorRef; @XmlAttribute(required = false) @@ -80,9 +81,9 @@ if (aggregationStrategy == null) { aggregationStrategy = new UseLatestAggregationStrategy(); } - threadPoolExecutor = createThreadPoolExecutor(routeContext); + executor = createThreadPoolExecutor(routeContext); return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy, - isParallelProcessing(), threadPoolExecutor, streaming); + isParallelProcessing(), executor, streaming); } public AggregationStrategy getAggregationStrategy() { @@ -127,23 +128,23 @@ return this; } - private ThreadPoolExecutor createThreadPoolExecutor(RouteContext routeContext) { - ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); - if (threadPoolExecutor == null && threadPoolExecutorRef != null) { - threadPoolExecutor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class); + private Executor createThreadPoolExecutor(RouteContext routeContext) { + Executor executor = getExecutor(); + if (executor == null && threadPoolExecutorRef != null) { + executor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class); } - if (threadPoolExecutor == null) { + if (executor == null) { // fall back and use default - threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + executor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } - return threadPoolExecutor; + return executor; } - public ThreadPoolExecutor getThreadPoolExecutor() { - return threadPoolExecutor; + public Executor getExecutor() { + return executor; } - public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) { - this.threadPoolExecutor = threadPoolExecutor; + public void setExecutor(Executor executor) { + this.executor = executor; } } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java?rev=722800&r1=722799&r2=722800&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java Tue Dec 2 23:27:52 2008 @@ -19,7 +19,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executor; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; @@ -63,7 +63,7 @@ @XmlTransient private ThreadGroup threadGroup; @XmlTransient - private ThreadPoolExecutor executor; + private Executor executor; public ThreadType() { } @@ -73,7 +73,7 @@ this.maxSize = coreSize; } - public ThreadType(ThreadPoolExecutor executor) { + public ThreadType(Executor executor) { this.executor = executor; } @@ -245,7 +245,7 @@ * @param executor the executor * @return the builder */ - public ThreadType executor(ThreadPoolExecutor executor) { + public ThreadType executor(Executor executor) { setExecutor(executor); return this; } @@ -292,11 +292,11 @@ this.threadGroup = threadGroup; } - public ThreadPoolExecutor getExecutor() { + public Executor getExecutor() { return executor; } - public void setExecutor(ThreadPoolExecutor executor) { + public void setExecutor(Executor executor) { this.executor = executor; } } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=722800&r1=722799&r2=722800&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Dec 2 23:27:52 2008 @@ -21,6 +21,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; @@ -69,7 +70,7 @@ private Collection processors; private AggregationStrategy aggregationStrategy; private boolean isParallelProcessing; - private ThreadPoolExecutor executor; + private Executor executor; private final boolean streaming; private final AtomicBoolean shutdown = new AtomicBoolean(true); @@ -81,11 +82,11 @@ this(processors, aggregationStrategy, false, null); } - public MulticastProcessor(Collection processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor) { + public MulticastProcessor(Collection processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor executor) { this(processors, aggregationStrategy, parallelProcessing, executor, false); } - public MulticastProcessor(Collection processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor, boolean streaming) { + public MulticastProcessor(Collection processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor executor, boolean streaming) { notNull(processors, "processors"); this.processors = processors; this.aggregationStrategy = aggregationStrategy; @@ -228,17 +229,17 @@ protected void doStop() throws Exception { shutdown.set(true); - if (executor != null) { - executor.shutdown(); - executor.awaitTermination(0, TimeUnit.SECONDS); + if (executor != null && executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor)executor).shutdown(); + ((ThreadPoolExecutor)executor).awaitTermination(0, TimeUnit.SECONDS); } ServiceHelper.stopServices(processors); } protected void doStart() throws Exception { shutdown.set(false); - if (executor != null) { - executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { + if (executor != null && executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor)executor).setRejectedExecutionHandler(new RejectedExecutionHandler() { public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { ProcessCall call = (ProcessCall)runnable; call.exchange.setException(new RejectedExecutionException()); @@ -274,7 +275,7 @@ return aggregationStrategy; } - public ThreadPoolExecutor getExecutor() { + public Executor getExecutor() { return executor; } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=722800&r1=722799&r2=722800&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue Dec 2 23:27:52 2008 @@ -22,7 +22,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executor; import org.apache.camel.Exchange; import org.apache.camel.Expression; @@ -53,8 +53,8 @@ } public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy, - boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor, boolean streaming) { - super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, threadPoolExecutor, streaming); + boolean parallelProcessing, Executor executor, boolean streaming) { + super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, executor, streaming); this.expression = expression; notNull(expression, "expression"); Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java?rev=722800&r1=722799&r2=722800&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java Tue Dec 2 23:27:52 2008 @@ -18,10 +18,11 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.AsyncCallback; @@ -37,7 +38,7 @@ */ public class ThreadProcessor implements AsyncProcessor, Service { - private ThreadPoolExecutor executor; + private Executor executor; private long stackSize; private ThreadGroup threadGroup; private int priority = Thread.NORM_PRIORITY; @@ -100,8 +101,10 @@ public void stop() throws Exception { shutdown.set(true); - executor.shutdown(); - executor.awaitTermination(0, TimeUnit.SECONDS); + if (executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor)executor).shutdown(); + ((ThreadPoolExecutor)executor).awaitTermination(0, TimeUnit.SECONDS); + } } public long getStackSize() { @@ -179,7 +182,7 @@ this.taskQueue = taskQueue; } - public ThreadPoolExecutor getExecutor() { + public Executor getExecutor() { if (executor == null) { executor = new ThreadPoolExecutor(getCoreSize(), getMaxSize(), getKeepAliveTime(), TimeUnit.MILLISECONDS, getTaskQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { @@ -198,7 +201,7 @@ return executor; } - public void setExecutor(ThreadPoolExecutor executor) { + public void setExecutor(Executor executor) { this.executor = executor; } Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=722800&r1=722799&r2=722800&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Tue Dec 2 23:27:52 2008 @@ -88,7 +88,7 @@ + " being less than: " + failUntilAttempt); } } - + // START SNIPPET: AsyncProcessor public boolean process(Exchange exchange, AsyncCallback callback) { Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER, Integer.class); @@ -105,6 +105,7 @@ callback.done(false); return false; } + // END SNIPPET: AsyncProcessor }; return new RouteBuilder() { Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java?rev=722800&r1=722799&r2=722800&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java Tue Dec 2 23:27:52 2008 @@ -32,7 +32,7 @@ protected ThreadPoolExecutor customThreadPoolExecutor = new ThreadPoolExecutor(8, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); public void testSplitterWithCustomThreadPoolExecutor() throws Exception { - ThreadPoolExecutor threadPoolExecutor = getSplitter().getThreadPoolExecutor(); + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) getSplitter().getExecutor(); // this should be sufficient as core pool size is the only thing I changed from the default assertTrue(threadPoolExecutor.getCorePoolSize() == customThreadPoolExecutor.getCorePoolSize()); assertTrue(threadPoolExecutor.getMaximumPoolSize() == customThreadPoolExecutor.getMaximumPoolSize());