Mailing-List: contact commits-help@camel.apache.org; run by ezmlm
Reply-To: dev@camel.apache.org
Subject: svn commit: r772076 [1/2] - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/impl/converter/ ...
Date: Wed, 06 May 2009 07:08:16 -0000
To: commits@camel.apache.org
From: davsclaus@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20090506070818.6E8BC23888F4@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: davsclaus
Date: Wed May 6 07:08:14 2009
New Revision: 772076

URL: http://svn.apache.org/viewvc?rev=772076&view=rev
Log:
CAMEL-1572: First cut of new Async API. AsyncProcessor is @deprecated. Only MulticastProcessor needs to be migrated. Thread DSL will be replaced with a new Async DSL later, hence why its removed.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java
      - copied, changed from r771551, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java (with props)
Removed:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/ThreadErrorHandlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadSetErrorHandlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadDefinition.scala
    camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/ThreadTest.scala
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
    camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
    camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
    camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java
    camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java
    camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
    camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SAbstractDefinition.scala
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/builder/RouteBuilder.scala
    camel/trunk/components/camel-test/src/main/java/org/apache/camel/test/TestSupport.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java Wed May 6 07:08:14 2009
@@ -21,7 +21,7 @@
  * The callback interface for an {@link AsyncProcessor} so that it can
  * notify you when an {@link Exchange} has completed.
  *
- * @deprecated a new async API is planned for Camel 2.0
+ * @deprecated a new async API is planned for Camel 2.0, will be removed in Camel 2.0 GA
  */
 public interface AsyncCallback {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java Wed May 6 07:08:14 2009
@@ -20,13 +20,10 @@
 /**
  * A more complex version of {@link Processor} which supports asynchronous
- * processing of the {@link Exchange}. Any processor can be coerced to
- * have an {@link AsyncProcessor} interface by using the
- * {@link org.apache.camel.impl.converter.AsyncProcessorTypeConverter#convert AsyncProcessorTypeConverter.covert}
- * method.
+ * processing of the {@link Exchange}.
  *
  * @version $Revision$
- * @deprecated a new async API is planned for Camel 2.0
+ * @deprecated a new async API is planned for Camel 2.0, will be removed in Camel 2.0 GA
  */
 public interface AsyncProcessor extends Processor {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Wed May 6 07:08:14 2009
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;

 import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.model.RouteDefinition;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed May 6 07:08:14 2009
@@ -157,9 +157,7 @@
     /**
      * Returns the outbound message, lazily creating one if one has not already
-     * been associated with this exchange. If you want to inspect this property
-     * but not force lazy creation then invoke the {@link #getOut(boolean)}
-     * method passing in false
+     * been associated with this exchange.
      *
      * If you want to test whether an OUT message have been set or not, use the {@link #hasOut()} method.
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java Wed May 6 07:08:14 2009
@@ -27,6 +27,8 @@
  */
 public interface HeaderFilterStrategyAware {

+    // TODO move this to SPI package
+
     HeaderFilterStrategy getHeaderFilterStrategy();

     void setHeaderFilterStrategy(HeaderFilterStrategy strategy);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Wed May 6 07:08:14 2009
@@ -17,6 +17,10 @@
 package org.apache.camel;

 import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutorService;

 /**
  * Template (named like Spring's TransactionTemplate & JmsTemplate
@@ -44,6 +48,9 @@
  */
 public interface ProducerTemplate extends Service {

+    // Synchronous methods
+    // -----------------------------------------------------------------------
+
     /**
      * Sends the exchange to the default endpoint
      *
@@ -132,18 +139,6 @@
     Exchange send(String endpointUri, ExchangePattern pattern, Processor processor);

     /**
-     * Sends an exchange to an endpoint using a supplied processor
-     *
-     * @param endpointUri the endpoint URI to send the exchange to
-     * @param processor the transformer used to populate the new exchange
-     * {@link Processor} to populate the exchange.
-     * @param callback the callback will be called when the exchange is completed.
-     * @return the returned exchange
-     * @deprecated a new async API is planned for Camel 2.0
-     */
-    Exchange send(String endpointUri, Processor processor, AsyncCallback callback);
-
-    /**
      * Sends the exchange to the given endpoint
      *
      * @param endpoint the endpoint to send the exchange to
@@ -175,18 +170,6 @@
     Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor);

     /**
-     * Sends an exchange to an endpoint using a supplied processor
-     *
-     * @param endpoint the endpoint to send the exchange to
-     * @param processor the transformer used to populate the new exchange
-     * {@link Processor} to populate the exchange.
-     * @param callback the callback will be called when the exchange is completed.
-     * @return the returned exchange
-     * @deprecated a new async API is planned for Camel 2.0
-     */
-    Exchange send(Endpoint endpoint, Processor processor, AsyncCallback callback);
-
-    /**
      * Send the body to an endpoint
      *
      * @param endpoint the endpoint to send the exchange to
@@ -407,6 +390,7 @@
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
      *
      * @param body the payload to send
+     * @param type the expected response type
      * @return the result (see class javadoc)
      */
     T requestBody(Object body, Class type);
@@ -552,4 +536,137 @@
      * @return the result (see class javadoc)
      */
     T requestBodyAndHeaders(Endpoint endpoint, Object body, Map headers, Class type);
+
+
+    // Asynchronous methods
+    // -----------------------------------------------------------------------
+
+    /**
+     * Sets the executor service to use for async messaging.
+     *
+     * If none provided Camel will default use a {@link java.util.concurrent.ScheduledExecutorService}
+     * with a pool of 5 threads.
+     *
+     * @param executorService the executor service.
+     */
+    void setExecutorService(ExecutorService executorService);
+
+    /**
+     * Sends an asynchronous exchange to the given endpoint.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param exchange the exchange to send
+     * @return a handle to be used to get the response in the future
+     */
+    Future asyncSend(String endpointUri, Exchange exchange);
+
+    /**
+     * Sends an asynchronous exchange to the given endpoint.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param processor the transformer used to populate the new exchange
+     * @return a handle to be used to get the response in the future
+     */
+    Future asyncSend(String endpointUri, Processor processor);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOnly} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body the body to send
+     * @return a handle to be used to get the response in the future
+     */
+    Future asyncSendBody(String endpointUri, Object body);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body the body to send
+     * @return a handle to be used to get the response in the future
+     */
+    Future asyncRequestBody(String endpointUri, Object body);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body the body to send
+     * @param header the header name
+     * @param headerValue the header value
+     * @return a handle to be used to get the response in the future
+     */
+    Future asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body the body to send
+     * @param headers headers
+     * @return a handle to be used to get the response in the future
+     */
+    Future asyncRequestBodyAndHeaders(String endpointUri, Object body, Map headers);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body the body to send
+     * @param type the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    Future asyncRequestBody(String endpointUri, Object body, Class type);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body the body to send
+     * @param header the header name
+     * @param headerValue the header value
+     * @param type the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    Future asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class type);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body the body to send
+     * @param headers headers
+     * @param type the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    Future asyncRequestBodyAndHeaders(String endpointUri, Object body, Map headers, Class type);
+
+    /**
+     * Gets the response body from the future handle, will wait until the response is ready.
+     *
+     * @param future the handle to get the response
+     * @param type the expected response type
+     * @return the result (see class javadoc)
+     */
+    T asyncExtractBody(Future future, Class type);
+
+    /**
+     * Gets the response body from the future handle, will wait at most the given time for the response to be ready.
+     *
+     * @param future the handle to get the response
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @param type the expected response type
+     * @return the result (see class javadoc)
+     * @throws java.util.concurrent.TimeoutException if the wait timed out
+     */
+    T asyncExtractBody(Future future, long timeout, TimeUnit unit, Class type) throws TimeoutException;
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java Wed May 6 07:08:14 2009
@@ -18,10 +18,9 @@
 import java.util.Collection;

-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultProducer;

 /**
@@ -29,7 +28,7 @@
  *
  * @version $Revision$
  */
-public class CollectionProducer extends DefaultProducer implements AsyncProcessor {
+public class CollectionProducer extends DefaultProducer implements Processor {
     private final Collection queue;

     public CollectionProducer(Endpoint endpoint, Collection queue) {
@@ -41,11 +40,4 @@
         queue.add(exchange.copy());
     }

-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        Exchange copy = exchange.copy();
-        copy.setProperty("CamelAsyncCallback", callback);
-        queue.add(copy);
-        callback.done(true);
-        return true;
-    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed May 6 07:08:14 2009
@@ -22,13 +22,10 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;

-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -41,17 +38,17 @@
     private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);

     private SedaEndpoint endpoint;
-    private AsyncProcessor processor;
+    private Processor processor;
     private ExecutorService executor;

     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
-        this.processor = AsyncProcessorTypeConverter.convert(processor);
+        this.processor = processor;
     }

     @Override
     public String toString() {
-        return "SedaConsumer: " + endpoint.getEndpointUri();
+        return "SedaConsumer[" + endpoint.getEndpointUri() + "]";
     }

     public void run() {
@@ -71,14 +68,6 @@
                     } catch (Exception e) {
                         LOG.error("Seda queue caught: " + e, e);
                     }
-
-                    // TODO: It ought to be UnitOfWork that did the callback notification but we are planning
-                    // to replace it with a brand new Async API so we leave it as is
-                    AsyncCallback callback = exchange.getProperty("CamelAsyncCallback", AsyncCallback.class);
-                    if (callback != null) {
-                        // seda consumer is async so invoke the async done on the callback if its provided
-                        callback.done(false);
-                    }
                 } else {
                     LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
                     try {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Wed May 6 07:08:14 2009
@@ -16,11 +16,9 @@
  */
 package org.apache.camel.impl;

-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ServiceHelper;

@@ -32,7 +30,6 @@
 public class DefaultConsumer extends ServiceSupport implements Consumer {
     private final Endpoint endpoint;
     private final Processor processor;
-    private AsyncProcessor asyncProcessor;
     private ExceptionHandler exceptionHandler;

     public DefaultConsumer(Endpoint endpoint, Processor processor) {
@@ -42,7 +39,7 @@
     @Override
     public String toString() {
-        return "Consumer on " + endpoint;
+        return "Consumer[" + endpoint.getEndpointUri() + "]";
     }

     public Endpoint getEndpoint() {
@@ -53,19 +50,6 @@
         return processor;
     }

-    /**
-     * Provides an {@link AsyncProcessor} interface to the configured
-     * processor on the consumer. If the processor does not implement
-     * the interface, it will be adapted so that it does.
-     * @deprecated
-     */
-    public AsyncProcessor getAsyncProcessor() {
-        if (asyncProcessor == null) {
-            asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
-        }
-        return asyncProcessor;
-    }
-
     public ExceptionHandler getExceptionHandler() {
         if (exceptionHandler == null) {
             exceptionHandler = new LoggingExceptionHandler(getClass());

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Wed May 6 07:08:14 2009
@@ -18,8 +18,14 @@
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

-import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -30,8 +36,8 @@
 import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;

 /**
  * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
@@ -47,9 +53,16 @@
     private final Map endpointCache = new HashMap();
     private boolean useEndpointCache = true;
     private Endpoint defaultEndpoint;
-
+    private ExecutorService executor;
+
     public DefaultProducerTemplate(CamelContext context) {
         this.context = context;
+        this.executor = new ScheduledThreadPoolExecutor(5);
+    }
+
+    public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
+        this.context = context;
+        this.executor = executor;
     }

     public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
@@ -60,7 +73,7 @@
     public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
         Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
         return new DefaultProducerTemplate(camelContext, endpoint);
-    }
+    }

     public Exchange send(String endpointUri, Exchange exchange) {
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
@@ -72,11 +85,6 @@
         return send(endpoint, processor);
     }

-    public Exchange send(String endpointUri, Processor processor, AsyncCallback callback) {
-        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
-        return send(endpoint, processor, callback);
-    }
-
     public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
         return send(endpoint, pattern, processor);
@@ -91,10 +99,6 @@
         return producerCache.send(endpoint, processor);
     }

-    public Exchange send(Endpoint endpoint, Processor processor, AsyncCallback callback) {
-        return producerCache.send(endpoint, processor, callback);
-    }
-
     public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
         return producerCache.send(endpoint, pattern, processor);
     }
@@ -450,6 +454,7 @@
     protected void doStop() throws Exception {
         producerCache.stop();
         endpointCache.clear();
+        executor.shutdown();
     }

     protected Object extractResultBody(Exchange result) {
@@ -460,4 +465,136 @@
         return ExchangeHelper.extractResultBody(result, pattern);
     }

+    public void setExecutorService(ExecutorService executorService) {
+        this.executor = executorService;
+    }
+
+    public Future asyncSend(final String uri, final Exchange exchange) {
+        Callable task = new Callable() {
+            public Exchange call() throws Exception {
+                return send(uri, exchange);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future asyncSend(final String uri, final Processor processor) {
+        Callable task = new Callable() {
+            public Exchange call() throws Exception {
+                return send(uri, processor);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future asyncSendBody(final String uri, final Object body) {
+        Callable task = new Callable() {
+            public Object call() throws Exception {
+                sendBody(uri, body);
+                // its InOnly, so no body to return
+                return null;
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future asyncRequestBody(final String uri, final Object body) {
+        Callable task = new Callable() {
+            public Object call() throws Exception {
+                return requestBody(uri, body);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future asyncRequestBody(final String uri, final Object body, final Class type) {
+        Callable task = new Callable() {
+            public T call() throws Exception {
+                return requestBody(uri, body, type);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
+        Callable task = new Callable() {
+            public Object call() throws Exception {
+                return requestBodyAndHeader(endpointUri, body, header, headerValue);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class type) {
+        Callable task = new Callable() {
+            public T call() throws Exception {
+                return requestBodyAndHeader(endpointUri, body, header, headerValue, type);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map headers) {
+        Callable task = new Callable() {
+            public Object call() throws Exception {
+                return requestBodyAndHeaders(endpointUri, body, headers);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map headers, final Class type) {
+        Callable task = new Callable() {
+            public T call() throws Exception {
+                return requestBodyAndHeaders(endpointUri, body, headers, type);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+
+    public T asyncExtractBody(Future future, Class type) {
+        try {
+            return doExtractBody(future.get(), type);
+        } catch (InterruptedException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } catch (ExecutionException e) {
+            // execution failed due to an exception so rethrow the cause
+            throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
+        }
+    }
+
+    public T asyncExtractBody(Future future, long timeout, TimeUnit unit, Class type) throws TimeoutException {
+        try {
+            if (timeout > 0) {
+                return doExtractBody(future.get(timeout, unit), type);
+            } else {
+                return doExtractBody(future.get(), type);
+            }
+        } catch (InterruptedException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } catch (ExecutionException e) {
+            // execution failed due to an exception so rethrow the cause
+            throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
+        }
+    }
+
+    private T doExtractBody(Object result, Class type) {
+        if (result instanceof Exchange) {
+            Exchange exchange = (Exchange) result;
+            Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
+            return context.getTypeConverter().convertTo(type, answer);
+        }
+        return context.getTypeConverter().convertTo(type, result);
+    }
+
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Wed May 6 07:08:14 2009
@@ -19,21 +19,17 @@
 import java.util.HashMap;
 import java.util.Map;

-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
-
 /**
  * Cache containing created {@link Producer}.
  *
@@ -91,26 +87,6 @@
         }
     }

-    /**
-     * Sends an exchange to an endpoint using a supplied
-     * {@link Processor} to populate the exchange. The callback
-     * will be called when the exchange is completed.
- * - * @param endpoint the endpoint to send the exchange to - * @param processor the transformer used to populate the new exchange - */ - public Exchange send(Endpoint endpoint, Processor processor, AsyncCallback callback) { - try { - Producer producer = getProducer(endpoint); - Exchange exchange = producer.createExchange(); - boolean sync = sendExchange(endpoint, producer, processor, exchange, callback); - setProcessedSync(exchange, sync); - return exchange; - } catch (Exception e) { - throw wrapRuntimeCamelException(e); - } - } - public static boolean isProcessedSync(Exchange exchange) { Boolean rc = exchange.getProperty(Exchange.PROCESSED_SYNC, Boolean.class); return rc == null ? false : rc; @@ -152,17 +128,6 @@ return exchange; } - protected boolean sendExchange(Endpoint endpoint, Producer producer, Processor processor, Exchange exchange, AsyncCallback callback) throws Exception { - // lets populate using the processor callback - processor.process(exchange); - - // now lets dispatch - if (LOG.isDebugEnabled()) { - LOG.debug(">>>> " + endpoint + " " + exchange); - } - return AsyncProcessorTypeConverter.convert(producer).process(exchange, callback); - } - protected void doStop() throws Exception { ServiceHelper.stopServices(producers.values()); producers.clear(); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=772076&r1=772075&r2=772076&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java Wed May 6 07:08:14 2009 @@ -68,7 +68,6 @@ addFallbackTypeConverter(new EnumTypeConverter()); addFallbackTypeConverter(new ArrayTypeConverter()); 
addFallbackTypeConverter(new PropertyEditorTypeConverter()); - addFallbackTypeConverter(new AsyncProcessorTypeConverter()); } public List getTypeConverterLoaders() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=772076&r1=772075&r2=772076&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Wed May 6 07:08:14 2009 @@ -688,34 +688,6 @@ } /** - * Causes subsequent processors to be called asynchronously - * - * @param coreSize the number of threads that will be used to process - * messages in subsequent processors. - * @return a ThreadType builder that can be used to further configure the - * the thread pool. - */ - public ThreadDefinition thread(int coreSize) { - ThreadDefinition answer = new ThreadDefinition(coreSize); - addOutput(answer); - return answer; - } - - /** - * Causes subsequent processors to be called asynchronously - * - * @param executor the executor that will be used to process - * messages in subsequent processors. - * @return a ThreadType builder that can be used to further configure the - * the thread pool. 
- */ - public ProcessorDefinition thread(Executor executor) { - ThreadDefinition answer = new ThreadDefinition(executor); - addOutput(answer); - return this; - } - - /** * Idempotent consumer EIP: * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer} * to avoid duplicate messages Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=772076&r1=772075&r2=772076&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed May 6 07:08:14 2009 @@ -50,7 +50,7 @@ */ public class MulticastProcessor extends ServiceSupport implements Processor, Navigate { - // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API + // TODO: Use JDK CompletionService to get rid of the AsyncProcessor/AsyncCallback static class ProcessorExchangePair { private final Processor processor; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=772076&r1=772075&r2=772076&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Wed May 6 07:08:14 2009 @@ -20,13 +20,9 @@ import java.util.Iterator; import java.util.List; -import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Message; import 
org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,11 +33,9 @@ * * @version $Revision$ */ -public class Pipeline extends MulticastProcessor implements AsyncProcessor { +public class Pipeline extends MulticastProcessor implements Processor { private static final transient Log LOG = LogFactory.getLog(Pipeline.class); - // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API - public Pipeline(Collection processors) { super(processors); } @@ -56,18 +50,37 @@ } public void process(Exchange exchange) throws Exception { - AsyncProcessorHelper.process(this, exchange); - } - - public boolean process(Exchange original, AsyncCallback callback) { Iterator processors = getProcessors().iterator(); - Exchange nextExchange = original; + Exchange nextExchange = exchange; boolean first = true; - while (true) { + + while (continueRouting(processors, nextExchange)) { + if (first) { + first = false; + } else { + // prepare for next run + nextExchange = createNextExchange(nextExchange); + } + + // get the next processor + Processor processor = processors.next(); + + // process the next exchange + try { + if (LOG.isTraceEnabled()) { + // this does the actual processing so log at trace level + LOG.trace("Processing exchangeId: " + nextExchange.getExchangeId() + " >>> " + nextExchange); + } + processor.process(nextExchange); + } catch (Exception e) { + nextExchange.setException(e); + } + + // check for error if so we should break out boolean exceptionHandled = hasExceptionBeenHandled(nextExchange); if (nextExchange.isFailed() || exceptionHandled) { // The Exchange.EXCEPTION_HANDLED property is only set if satisfactory handling was done - // by the error handler. It's still an exception, the exchange still failed. 
+ // by the error handler. It's still an exception, the exchange still failed. if (LOG.isDebugEnabled()) { LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: " @@ -76,86 +89,19 @@ } break; } - - // should we continue routing or not - if (!continueRouting(processors, nextExchange)) { - break; - } - - AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next()); - - if (first) { - first = false; - } else { - nextExchange = createNextExchange(processor, nextExchange); - } - - boolean sync = process(original, nextExchange, callback, processors, processor); - // Continue processing the pipeline synchronously ... - if (!sync) { - // The pipeline will be completed async... - return false; - } } - // If we get here then the pipeline was processed entirely - // synchronously. if (LOG.isTraceEnabled()) { // logging nextExchange as it contains the exchange that might have altered the payload and since // we are logging the completion if will be confusing if we log the original instead // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots - LOG.trace("Processing compelete for exchangeId: " + original.getExchangeId() + " >>> " + nextExchange); - } - ExchangeHelper.copyResults(original, nextExchange); - callback.done(true); - return true; - } - - private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback, final Iterator processors, AsyncProcessor processor) { - if (LOG.isTraceEnabled()) { - // this does the actual processing so log at trace level - LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " >>> " + exchange); + LOG.trace("Processing compelete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange); } - return processor.process(exchange, new AsyncCallback() { - public void done(boolean sync) { - // We only have to handle async 
completion of the pipeline.. - if (sync) { - return; - } - - // Continue processing the pipeline... - Exchange nextExchange = exchange; - while (continueRouting(processors, nextExchange)) { - AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next()); - - boolean exceptionHandled = hasExceptionBeenHandled(nextExchange); - if (nextExchange.isFailed() || exceptionHandled) { - // The Exchange.EXCEPTION_HANDLED property is only set if satisfactory handling was done - // by the error handler. It's still an exception, the exchange still failed. - if (LOG.isDebugEnabled()) { - LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange - + " exception: " + nextExchange.getException() + " fault: " - + (nextExchange.hasFault() ? nextExchange.getFault() : null) - + (exceptionHandled ? " handled by the error handler" : "")); - } - break; - } - - nextExchange = createNextExchange(processor, nextExchange); - sync = process(original, nextExchange, callback, processors, processor); - if (!sync) { - return; - } - } - - ExchangeHelper.copyResults(original, nextExchange); - callback.done(false); - } - }); + // copy results back to the original exchange + ExchangeHelper.copyResults(exchange, nextExchange); } - private static boolean hasExceptionBeenHandled(Exchange nextExchange) { return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.EXCEPTION_HANDLED)); } @@ -165,11 +111,10 @@ *

* Remember to copy the original exchange id otherwise correlation of ids in the log is a problem * - * @param producer the producer used to send to the endpoint * @param previousExchange the previous exchange * @return a new exchange */ - protected Exchange createNextExchange(Processor producer, Exchange previousExchange) { + protected Exchange createNextExchange(Exchange previousExchange) { Exchange answer = previousExchange.newInstance(); // we must use the same id as this is a snapshot strategy where Camel copies a snapshot // before processing the next step in the pipeline, so we have a snapshot of the exchange @@ -212,4 +157,5 @@ public String toString() { return "Pipeline" + getProcessors(); } + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java?rev=772076&r1=772075&r2=772076&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java Wed May 6 07:08:14 2009 @@ -16,11 +16,12 @@ */ package org.apache.camel.util; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; -import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; +import org.apache.camel.Processor; /** * Helper methods for AsyncProcessor objects. @@ -32,21 +33,21 @@ } /** - * Calls the async version of the processor's process method and waits - * for it to complete before returning. This can be used by AsyncProcessor - * objects to implement their sync version of the process method. + * Processes the exchange async. 
+ * + * @param executor executor service + * @param processor the processor + * @param exchange the exchange + * @return a future handle for the task being executed asynchronously */ - public static void process(AsyncProcessor processor, Exchange exchange) throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - boolean sync = processor.process(exchange, new AsyncCallback() { - public void done(boolean sync) { - if (!sync) { - latch.countDown(); - } + public static Future asyncProcess(final ExecutorService executor, final Processor processor, final Exchange exchange) { + Callable task = new Callable() { + public Exchange call() throws Exception { + processor.process(exchange); + return exchange; } - }); - if (!sync) { - latch.await(); - } + }; + + return executor.submit(task); } } Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=772076&r1=772075&r2=772076&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original) +++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Wed May 6 07:08:14 2009 @@ -60,7 +60,6 @@ SortDefinition SplitDefinition StopDefinition -ThreadDefinition ThrottleDefinition ThrowFaultDefinition ToDefinition Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java?rev=772076&r1=772075&r2=772076&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java Wed May 6 07:08:14 2009 @@ -26,7 +26,6 @@ import 
org.apache.camel.builder.ValueBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.impl.DefaultExchange; -import org.apache.camel.processor.DelegateAsyncProcessor; import org.apache.camel.processor.DelegateProcessor; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.PredicateAssertHelper; @@ -351,9 +350,7 @@ */ protected Processor unwrap(Processor processor) { while (true) { - if (processor instanceof DelegateAsyncProcessor) { - processor = ((DelegateAsyncProcessor)processor).getProcessor(); - } else if (processor instanceof DelegateProcessor) { + if (processor instanceof DelegateProcessor) { processor = ((DelegateProcessor)processor).getProcessor(); } else { return processor; @@ -371,9 +368,6 @@ while (true) { if (processor instanceof Channel) { return (Channel) processor; - } - if (processor instanceof DelegateAsyncProcessor) { - processor = ((DelegateAsyncProcessor)processor).getProcessor(); } else if (processor instanceof DelegateProcessor) { processor = ((DelegateProcessor)processor).getProcessor(); } else { Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java (from r771551, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java&r1=771551&r2=772076&rev=772076&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java Wed May 6 07:08:14 2009 @@ -16,56 
+16,31 @@ */ package org.apache.camel.component.seda; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; -import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; import org.apache.camel.ContextTestSupport; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; -import org.apache.camel.Producer; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; /** - * Async processor with seda to simulate the caller thread is not blocking while the - * exchange is processed and we get callbacks when the exchange is complete. + * The new Async API version of doing async routing based on the old AsyncProcessor API + * In the old SedaAsyncProcessorTest a seda endpoint was needed to really turn it into async. This is not + * needed by the new API so we send it using direct instead. 
* * @version $Revision$ */ -public class SedaAsyncProcessorTest extends ContextTestSupport { +public class SedaAsyncProducerTest extends ContextTestSupport { - private CountDownLatch latchAsync = new CountDownLatch(1); - private boolean doneSync; private String route = ""; - public void testAsyncWithSeda() throws Exception { + public void testAsyncProducer() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); - Endpoint endpoint = context.getEndpoint("seda:start"); - Producer producer = endpoint.createProducer(); - - Exchange exchange = producer.createExchange(ExchangePattern.InOut); - exchange.getIn().setBody("Hello World"); - - // seda producer is async also (but this is ugly to need to cast) - AsyncProcessor async = (AsyncProcessor) producer; - - boolean sync = async.process(exchange, new AsyncCallback() { - public void done(boolean sync) { - // we expect 2 callbacks - // the first is when we have finished sending to the seda producer - if (sync) { - doneSync = true; - } else { - latchAsync.countDown(); - } - } - }); + // using the new async API we can fire a real async message + Future future = template.asyncRequestBody("direct:start", "Hello World", String.class); // I should happen before mock route = route + "send"; @@ -73,16 +48,10 @@ assertMockEndpointsSatisfied(); assertEquals("Send should occur before processor", "sendprocess", route); - assertTrue("Sync done should have occured", doneSync); - // wait at most 2 seconds - boolean zero = latchAsync.await(2, TimeUnit.SECONDS); - assertTrue("Async done should have occured", zero); - - // how to get the response? 
- String response = exchange.getOut().getBody(String.class); - // TODO: we need a new API for getting the result - //assertEquals("Bye World", response); + // and get the response with the future handle + String response = future.get(); + assertEquals("Bye World", response); } @Override @@ -92,7 +61,7 @@ public void configure() throws Exception { errorHandler(noErrorHandler()); - from("seda:start").delay(100) + from("direct:start").delay(100) .process(new Processor() { public void process(Exchange exchange) throws Exception { route = route + "process"; @@ -105,4 +74,4 @@ } }; } -} +} \ No newline at end of file Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java?rev=772076&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java Wed May 6 07:08:14 2009 @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.AsyncProcessorHelper; + +/** + * @version $Revision$ + */ +public class DefaultProducerTemplateAsyncTest extends ContextTestSupport { + + public void testRequestAsync() throws Exception { + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setBody("Hello"); + + Future future = template.asyncSend("direct:start", exchange); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + Exchange result = future.get(); + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " + delta, delta > 250); + assertEquals("Hello World", result.getOut().getBody()); + + assertMockEndpointsSatisfied(); + } + + public void testSendAsyncProcessor() throws Exception { + Future future = template.asyncSend("direct:start", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello"); + } + }); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + Exchange result = future.get(); + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " 
+ delta, delta > 250); + assertEquals("Hello World", result.getOut().getBody()); + } + + public void testRequestAsyncBody() throws Exception { + Future future = template.asyncRequestBody("direct:start", "Hello"); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + // we can use extract body to convert to expect body type + String result = template.asyncExtractBody(future, String.class); + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " + delta, delta > 250); + assertEquals("Hello World", result); + } + + public void testRequestAsyncBodyType() throws Exception { + Future future = template.asyncRequestBody("direct:start", "Hello", String.class); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + // or we can use parameter type in the requestBody method so the future handle know its type + String result = future.get(); + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " + delta, delta > 250); + assertEquals("Hello World", result); + } + + public void testRequestAsyncBodyAndHeader() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + mock.expectedHeaderReceived("foo", 123); + + Future future = template.asyncRequestBodyAndHeader("direct:start", "Hello", "foo", 123); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + // we can use extract body to convert to expect body type + String result = template.asyncExtractBody(future, String.class); + + assertMockEndpointsSatisfied(); + + long delta = System.currentTimeMillis() - start; + 
assertTrue("Should take longer than: " + delta, delta > 250); + assertEquals("Hello World", result); + } + + public void testRequestAsyncBodyAndHeaderType() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + mock.expectedHeaderReceived("foo", 123); + + Future future = template.asyncRequestBodyAndHeader("direct:start", "Hello", "foo", 123, String.class); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + // or we can use parameter type in the requestBody method so the future handle know its type + String result = future.get(); + + assertMockEndpointsSatisfied(); + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " + delta, delta > 250); + assertEquals("Hello World", result); + } + + public void testRequestAsyncBodyAndHeaders() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + mock.expectedHeaderReceived("foo", 123); + mock.expectedHeaderReceived("bar", "cheese"); + + Map headers = new HashMap(); + headers.put("foo", 123); + headers.put("bar", "cheese"); + Future future = template.asyncRequestBodyAndHeaders("direct:start", "Hello", headers); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + // we can use extract body to convert to expect body type + String result = template.asyncExtractBody(future, String.class); + + assertMockEndpointsSatisfied(); + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " + delta, delta > 250); + assertEquals("Hello World", result); + } + + public void testRequestAsyncBodyAndHeadersType() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + 
mock.expectedBodiesReceived("Hello World"); + mock.expectedHeaderReceived("foo", 123); + mock.expectedHeaderReceived("bar", "cheese"); + + Map headers = new HashMap(); + headers.put("foo", 123); + headers.put("bar", "cheese"); + Future future = template.asyncRequestBodyAndHeaders("direct:start", "Hello", headers, String.class); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + // or we can use parameter type in the requestBody method so the future handle know its type + String result = future.get(); + + assertMockEndpointsSatisfied(); + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " + delta, delta > 250); + assertEquals("Hello World", result); + } + + public void testRequestAsyncErrorWhenProcessing() throws Exception { + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setBody("Hello"); + + Future future = template.asyncRequestBody("direct:error", exchange); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + try { + Exchange result = template.asyncExtractBody(future, Exchange.class); + fail("Should have thrown exception"); + } catch (RuntimeCamelException e) { + assertEquals("Damn forced by unit test", e.getCause().getMessage()); + } + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " + delta, delta > 250); + } + + public void testRequestAsyncBodyErrorWhenProcessing() throws Exception { + Future future = template.asyncRequestBody("direct:error", "Hello"); + long start = System.currentTimeMillis(); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + try { + String result = template.asyncExtractBody(future, String.class); + 
fail("Should have thrown exception"); + } catch (RuntimeCamelException e) { + assertEquals("Damn forced by unit test", e.getCause().getMessage()); + } + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " + delta, delta > 250); + } + + public void testAsyncProcessWithClassicAPI() throws Exception { + Endpoint endpoint = context.getEndpoint("direct:start"); + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody("Hello"); + + long start = System.currentTimeMillis(); + + // produce it async so we use a helper + Producer producer = endpoint.createProducer(); + // normally you will use a shared exectutor service with pools + ExecutorService executor = Executors.newSingleThreadExecutor(); + // send it async with the help of this helper + Future future = AsyncProcessorHelper.asyncProcess(executor, producer, exchange); + + // you can do other stuff + String echo = template.requestBody("direct:echo", "Hi", String.class); + assertEquals("HiHi", echo); + + String result = template.asyncExtractBody(future, String.class); + + assertMockEndpointsSatisfied(); + + long delta = System.currentTimeMillis() - start; + assertTrue("Should take longer than: " + delta, delta > 250); + assertEquals("Hello World", result); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .delay(250) + .transform(body().append(" World")).to("mock:result"); + + from("direct:error") + .delay(250) + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + throw new IllegalArgumentException("Damn forced by unit test"); + } + }); + + from("direct:echo").transform(body().append(body())); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java 
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Wed May 6 07:08:14 2009
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.processor;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -78,7 +76,7 @@
     }
     protected RouteBuilder createRouteBuilder() {
-        final Processor processor = new AsyncProcessor() {
+        final Processor processor = new Processor() {
             public void process(Exchange exchange) {
                 Integer counter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
                 int attempt = (counter == null) ? 1 : counter + 1;
@@ -87,24 +85,6 @@
                                                + " being less than: " + failUntilAttempt);
                 }
             }
-            // START SNIPPET: AsyncProcessor
-            public boolean process(Exchange exchange, AsyncCallback callback) {
-                Integer counter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
-                int attempt = (counter == null) ? 1 : counter + 1;
-                if (attempt > 1) {
-                    assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(),
-                                 "Camel DeadLetterChannel Redeliver Timer");
-                }
-
-                if (attempt < failUntilAttempt) {
-                    // we can't throw the exception here , or the callback will not be invoked.
-                    exchange.setException(new RuntimeException("Failed to process due to attempt: " + attempt
-                                                               + " being less than: " + failUntilAttempt));
-                }
-                callback.done(false);
-                return false;
-            }
-            // END SNIPPET: AsyncProcessor
         };
         return new RouteBuilder() {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java Wed May 6 07:08:14 2009
@@ -30,6 +30,8 @@
  */
 public class PipelineConcurrentTest extends ContextTestSupport {
+    private String uri = "seda:in?size=10000&concurrentConsumers=10";
+
     public void testConcurrentPipeline() throws Exception {
         int total = 10000;
         final int group = total / 20;
@@ -51,7 +53,7 @@
                         } catch (InterruptedException e) {
                             // ignore
                         }
-                        template.sendBody("seda:in?size=10000", "" + (start + i));
+                        template.sendBody(uri, "" + (start + i));
                     }
                 }
             });
@@ -67,8 +69,7 @@
                 // to force any exceptions coming forward immediately
                 errorHandler(noErrorHandler());
-                from("seda:in?size=10000")
-                    .thread(10)
+                from(uri)
                     .pipeline("direct:do", "mock:result");
                 from("direct:do")

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java Wed May 6 07:08:14 2009
@@ -94,7 +94,7 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:async").thread(1).to("direct:foo");
+                from("seda:async").to("direct:foo");
                 from("direct:foo").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         log.info("Received: " + exchange);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java Wed May 6 07:08:14 2009
@@ -22,7 +22,7 @@
 public class UnitOfWorkWithAsyncFlowTest extends UnitOfWorkTest {
     @Override
     protected void setUp() throws Exception {
-        uri = "direct:async";
+        uri = "seda:async";
         super.setUp();
     }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java Wed May 6 07:08:14 2009
@@ -42,8 +42,6 @@
             @Override
             public void configure() throws Exception {
                 // TODO: Should also work with DLC
-                // will be possible when we remove the AsyncProcessor so the processing logic
-                // is much easier to deal with
                 // errorHandler(deadLetterChannel("mock:dead").disableRedelivery());
                 onException(MyTechnicalException.class)

Modified: camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java (original)
+++ camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java Wed May 6 07:08:14 2009
@@ -55,7 +55,7 @@
                 DataFormat jaxb = new JaxbDataFormat("org.apache.camel.example");
                 // use 5 concurrent threads to do marshalling
-                from("dataset:beer").thread(5).marshal(jaxb).to("dataset:beer");
+                from("dataset:beer").marshal(jaxb).to("dataset:beer");
             }
         };
     }

Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java Wed May 6 07:08:14 2009
@@ -22,7 +22,6 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.component.http.CamelServlet;
 import org.apache.camel.component.http.HttpConsumer;
 import org.apache.camel.component.http.HttpExchange;
@@ -56,30 +55,30 @@
             if (continuation.isNew()) {
                 // Have the camel process the HTTP exchange.
-                final HttpExchange exchange = new HttpExchange(consumer.getEndpoint(), request, response);
-                boolean sync = consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
-                    public void done(boolean sync) {
-                        if (sync) {
-                            return;
-                        }
-                        continuation.setObject(exchange);
-                        continuation.resume();
-                    }
-                });
+                // final HttpExchange exchange = new HttpExchange(consumer.getEndpoint(), request, response);
+                // boolean sync = consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
+                // public void done(boolean sync) {
+                // if (sync) {
+                // return;
+                // }
+                // continuation.setObject(exchange);
+                // continuation.resume();
+                // }
+                //});
-                if (!sync) {
+                //if (!sync) {
                     // Wait for the exchange to get processed.
                     // This might block until it completes or it might return via an exception and
                     // then this method is re-invoked once the the exchange has finished processing
-                    continuation.suspend(0);
-                }
+                // continuation.suspend(0);
+                //}
                 // HC: The getBinding() is interesting because it illustrates the
                 // impedance miss-match between HTTP's stream oriented protocol, and
                 // Camels more message oriented protocol exchanges.
                 // now lets output to the response
-                consumer.getBinding().writeResponse(exchange, response);
+                //consumer.getBinding().writeResponse(exchange, response);
                 return;
             }

Modified: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java (original)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java Wed May 6 07:08:14 2009
@@ -37,7 +37,7 @@
         // so the stream has to be read to the end. When this happens
         // the associated connection is released automatically.
-        String endpointName = "seda:withConversion";
+        String endpointName = "seda:withConversion?concurrentConsumers=5";
         sendMessagesTo(endpointName, 5);
     }
@@ -50,7 +50,7 @@
         context.getComponent("http", HttpComponent.class).getHttpConnectionManager().getParams()
             .setDefaultMaxConnectionsPerHost(5);
-        String endpointName = "seda:withoutConversion";
+        String endpointName = "seda:withoutConversion?concurrentConsumers=5";
         sendMessagesTo(endpointName, 5);
     }
@@ -63,7 +63,7 @@
         for (int i = 0; i < 5; i++) {
             mockEndpoint.expectedMessageCount(1);
-            template.sendBody("seda:withoutConversion", null);
+            template.sendBody("seda:withoutConversion?concurrentConsumers=5", null);
             mockEndpoint.assertIsSatisfied();
             Object response = mockEndpoint.getReceivedExchanges().get(0).getIn().getBody();
             InputStream responseStream = assertIsInstanceOf(InputStream.class, response);
@@ -95,10 +95,10 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() {
-                from("seda:withConversion").thread(5).to("http://localhost:5430/search")
+                from("seda:withConversion?concurrentConsumers=5").to("http://localhost:5430/search")
                     .convertBodyTo(String.class).to("mock:results");
-                from("seda:withoutConversion").thread(5).to("http://localhost:5430/search")
+                from("seda:withoutConversion?concurrentConsumers=5").to("http://localhost:5430/search")
                     .to("mock:results");
                 from("jetty:http://localhost:5430/search").process(new Processor() {

Modified: camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java (original)
+++ camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java Wed May 6 07:08:14 2009
@@ -19,7 +19,6 @@
 import java.util.Random;
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.DeadLetterChannelBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -29,6 +28,8 @@
  */
 public class XQueryConcurrencyTest extends ContextTestSupport {
+    private String uri = "seda:in?concurrentConsumers=5";
+
     public void testConcurrency() throws Exception {
         int total = 1000;
@@ -51,7 +52,7 @@
                         } catch (InterruptedException e) {
                             // ignore
                         }
-                        template.sendBody("seda:in", "" + (start + i) + "James");
+                        template.sendBody(uri, "" + (start + i) + "James");
                     }
                 }
             });
@@ -67,8 +68,7 @@
                 // no retry as we want every failure to submerge
                 errorHandler(noErrorHandler());
-                from("seda:in")
-                    .thread(5)
+                from(uri)
                     .transform().xquery("/person/id", String.class)
                     .to("mock:result");
             }

Modified: camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java (original)
+++ camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java Wed May 6 07:08:14 2009
@@ -71,8 +71,9 @@
                 // no retry as we want every failure to submerge
                 errorHandler(noErrorHandler());
-                from("direct:start")
-                    .thread(5)
+                from("direct:start").to("seda:foo?concurrentConsumers=5");
+
+                from("seda:foo?concurrentConsumers=5")
                     .to("xquery:org/apache/camel/component/xquery/transform.xquery")
                     .to("mock:result");
             }

Modified: camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala (original)
+++ camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala Wed May 6 07:08:14 2009
@@ -44,7 +44,6 @@
   def setbody(expression: Exchange => Any) : DSL
   def setheader(header: String, expression: Exchange => Any) : DSL
   def split(expression: Exchange => Any) : SSplitDefinition
-  def thread(number: Int) : SThreadDefinition
   def throttle(frequency: Frequency) : SThrottleDefinition
   def to(uris: String*) : DSL
   def unmarshal(format: DataFormatDefinition) : DSL