Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 69401 invoked from network); 25 Jun 2010 14:44:58 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 25 Jun 2010 14:44:58 -0000 Received: (qmail 33875 invoked by uid 500); 25 Jun 2010 14:44:58 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 33850 invoked by uid 500); 25 Jun 2010 14:44:57 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 33843 invoked by uid 99); 25 Jun 2010 14:44:57 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Jun 2010 14:44:57 +0000 X-ASF-Spam-Status: No, hits=-1558.3 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Jun 2010 14:44:56 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 794A923889B2; Fri, 25 Jun 2010 14:44:04 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r957996 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/ Date: Fri, 25 Jun 2010 14:44:04 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100625144404.794A923889B2@eris.apache.org> Author: davsclaus Date: Fri Jun 25 14:44:03 2010 New Revision: 957996 URL: http://svn.apache.org/viewvc?rev=957996&view=rev Log: CAMEL-2859: Threads DSL now supports async routing engine. Non backward comp. change as waitForTask option removed. Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java - copied, changed from r957944, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=957996&r1=957995&r2=957996&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri Jun 25 14:44:03 2010 @@ -22,6 +22,8 @@ import java.util.concurrent.BlockingQueu import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -29,6 +31,7 @@ import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; +import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.ShutdownAware; @@ -45,14 +48,14 @@ public class SedaConsumer extends Servic private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class); private SedaEndpoint endpoint; - private Processor processor; + private AsyncProcessor processor; private ExecutorService executor; - private Processor multicast; + private MulticastProcessor multicast; private ExceptionHandler exceptionHandler; public SedaConsumer(SedaEndpoint endpoint, Processor processor) { this.endpoint = endpoint; - this.processor = processor; + this.processor = AsyncProcessorTypeConverter.convert(processor); } @Override @@ -153,15 +156,25 @@ public class SedaConsumer extends Servic } // use a multicast processor to process it - Processor mp = getMulticastProcessor(); - mp.process(exchange); + MulticastProcessor mp = getMulticastProcessor(); + + // and use the asynchronous routing engine to support it + mp.process(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + // noop + } + }); } else { - // use the regular processor - processor.process(exchange); + // use the regular processor and use the asynchronous routing engine to support it + processor.process(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + // noop + } + }); } } - protected synchronized Processor getMulticastProcessor() { + protected synchronized MulticastProcessor getMulticastProcessor() { if (multicast == null) { int size = endpoint.getConsumers().size(); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=957996&r1=957995&r2=957996&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Jun 25 14:44:03 2010 @@ -65,6 +65,8 @@ public class ThreadsDefinition extends O private String threadName; @XmlAttribute private ThreadPoolRejectedPolicy rejectedPolicy; + @XmlAttribute + private Boolean callerRunsWhenRejected = Boolean.TRUE; @Override public Processor createProcessor(RouteContext routeContext) throws Exception { @@ -97,11 +99,13 @@ public class ThreadsDefinition extends O } ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), executorService); - Processor childProcessor = createChildProcessor(routeContext, true); + if (getCallerRunsWhenRejected() != null) { + thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); + } List pipe = new ArrayList(2); pipe.add(thread); - pipe.add(childProcessor); + pipe.add(createChildProcessor(routeContext, true)); // wrap in nested pipeline so this appears as one processor return new Pipeline(routeContext.getCamelContext(), pipe); } @@ -211,6 +215,19 @@ public class ThreadsDefinition extends O return this; } + /** + * Whether or not the caller should run the task when it was rejected by the thread pool. + *

+ * Is by default true + * + * @param callerRunsWhenRejected whether or not the caller should run + * @return the builder + */ + public ThreadsDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) { + setCallerRunsWhenRejected(callerRunsWhenRejected); + return this; + } + public ExecutorService getExecutorService() { return executorService; } @@ -282,4 +299,12 @@ public class ThreadsDefinition extends O public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { this.rejectedPolicy = rejectedPolicy; } + + public Boolean getCallerRunsWhenRejected() { + return callerRunsWhenRejected; + } + + public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) { + this.callerRunsWhenRejected = callerRunsWhenRejected; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=957996&r1=957995&r2=957996&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Jun 25 14:44:03 2010 @@ -44,6 +44,7 @@ public class ThreadsProcessor extends Se private final CamelContext camelContext; private final ExecutorService executorService; private final AtomicBoolean shutdown = new AtomicBoolean(true); + private boolean callerRunsWhenRejected = true; private final class ProcessCall implements Runnable { private final Exchange exchange; @@ -67,8 +68,6 @@ public class ThreadsProcessor extends Se ObjectHelper.notNull(executorService, "executorService"); this.camelContext = camelContext; this.executorService = executorService; - // TODO: if rejection policy of executor service is caller runs then we need to tap into it - // so we can invoke the callback.done(true) to continue routing synchronously } public void process(final Exchange exchange) throws Exception { @@ -85,13 +84,25 @@ public class ThreadsProcessor extends Se executorService.submit(call); return false; } catch (RejectedExecutionException e) { - if (shutdown.get()) { - exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.", e)); + if (isCallerRunsWhenRejected()) { + if (shutdown.get()) { + exchange.setException(new RejectedExecutionException()); + } else { + callback.done(true); + } } else { exchange.setException(e); } + return true; } - return true; + } + + public boolean isCallerRunsWhenRejected() { + return callerRunsWhenRejected; + } + + public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) { + this.callerRunsWhenRejected = callerRunsWhenRejected; } public String toString() { Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java (from r957944, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java&r1=957944&r2=957996&rev=957996&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java Fri Jun 25 14:44:03 2010 @@ -16,68 +16,90 @@ */ package org.apache.camel.processor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; /** * @version $Revision$ */ -public class ThreadsCorePoolTest extends ContextTestSupport { - - private static String beforeThreadName; - private static String afterThreadName; +public class ThreadsRejectedExecutionTest extends ContextTestSupport { - public void testThreadsCorePool() throws Exception { - getMockEndpoint("mock:result").expectedMessageCount(1); + @Override + public boolean isUseRouteBuilder() { + return false; + } - template.sendBody("direct:start", "Hello World"); + public void testThreadsRejectedExecution() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + // use a custom pool which rejects any new tasks while currently in progress + // this should force the ThreadsProcessor to run the tasks itself + ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue()); - assertMockEndpointsSatisfied(); + context.setTracing(true); - assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName)); - } + from("seda:start") + .to("log:before") + // will use our custom pool + .threads().executorService(pool) + .delay(1000) + .to("log:after") + .to("mock:result"); + } + }); + context.start(); - public void testThreadsCorePoolBuilder() throws Exception { - getMockEndpoint("mock:result").expectedMessageCount(1); + getMockEndpoint("mock:result").expectedMessageCount(3); - template.sendBody("direct:foo", "Hello World"); + template.sendBody("seda:start", "Hello World"); + template.sendBody("seda:start", "Hi World"); + template.sendBody("seda:start", "Bye World"); assertMockEndpointsSatisfied(); - - assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName)); } - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { + public void testThreadsRejectedExecutionCallerNotRuns() throws Exception { + context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { + // use a custom pool which rejects any new tasks while currently in progress + // this should force the ThreadsProcessor to run the tasks itself + ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue()); + context.setTracing(true); - from("direct:start") + from("seda:start") .to("log:before") - .process(new Processor() { - public void process(Exchange exchange) throws Exception { - beforeThreadName = Thread.currentThread().getName(); - } - }) - // will use a a custom thread pool with 5 in core and 5 as max - .threads(5) - .process(new Processor() { - public void process(Exchange exchange) throws Exception { - afterThreadName = Thread.currentThread().getName(); - } - }) + // will use our custom pool + .threads().executorService(pool).callerRunsWhenRejected(false) + .delay(1000) .to("log:after") .to("mock:result"); - - from("direct:foo") - // using the builder style - .threads().poolSize(5) - .to("mock:result"); } - }; + }); + context.start(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(3); + // wait at most 5 seconds + mock.setResultWaitTime(5000); + + template.sendBody("seda:start", "Hello World"); + template.sendBody("seda:start", "Hi World"); + template.sendBody("seda:start", "Bye World"); + + // should not be possible to route all 3 + mock.assertIsNotSatisfied(); + + // only 1 should arrive + assertEquals(1, mock.getReceivedCounter()); } + } \ No newline at end of file