camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r772076 [1/2] - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/impl/converter/ ...
Date Wed, 06 May 2009 07:08:16 GMT
Author: davsclaus
Date: Wed May  6 07:08:14 2009
New Revision: 772076

URL: http://svn.apache.org/viewvc?rev=772076&view=rev
Log:
CAMEL-1572: First cut of the new Async API. AsyncProcessor is @deprecated. Only MulticastProcessor needs to be migrated. The Thread DSL will be replaced with a new Async DSL later, which is why it has been removed.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java
      - copied, changed from r771551, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java   (with props)
Removed:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/ThreadErrorHandlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadSetErrorHandlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadDefinition.scala
    camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/ThreadTest.scala
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
    camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
    camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
    camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java
    camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java
    camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
    camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SAbstractDefinition.scala
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/builder/RouteBuilder.scala
    camel/trunk/components/camel-test/src/main/java/org/apache/camel/test/TestSupport.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java Wed May  6 07:08:14 2009
@@ -21,7 +21,7 @@
  * The callback interface for an {@link AsyncProcessor} so that it can
  * notify you when an {@link Exchange} has completed.
  *
- * @deprecated a new async API is planned for Camel 2.0
+ * @deprecated a new async API is planned for Camel 2.0, will be removed in Camel 2.0 GA
  */
 public interface AsyncCallback {
     

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java Wed May  6 07:08:14 2009
@@ -20,13 +20,10 @@
 
 /**
  * A more complex version of {@link Processor} which supports asynchronous
- * processing of the {@link Exchange}.  Any processor can be coerced to
- * have an {@link AsyncProcessor} interface by using the
- * {@link org.apache.camel.impl.converter.AsyncProcessorTypeConverter#convert AsyncProcessorTypeConverter.covert}
- * method.
+ * processing of the {@link Exchange}.
  * 
  * @version $Revision$
- * @deprecated a new async API is planned for Camel 2.0
+ * @deprecated a new async API is planned for Camel 2.0, will be removed in Camel 2.0 GA
  */
 public interface AsyncProcessor extends Processor {
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Wed May  6 07:08:14 2009
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.model.RouteDefinition;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed May  6 07:08:14 2009
@@ -157,9 +157,7 @@
 
     /**
      * Returns the outbound message, lazily creating one if one has not already
-     * been associated with this exchange. If you want to inspect this property
-     * but not force lazy creation then invoke the {@link #getOut(boolean)}
-     * method passing in <tt>false</tt>
+     * been associated with this exchange.
      * <p/>
      * If you want to test whether an OUT message have been set or not, use the {@link #hasOut()} method.
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java Wed May  6 07:08:14 2009
@@ -27,6 +27,8 @@
  */
 public interface HeaderFilterStrategyAware {
 
+    // TODO move this to SPI package
+
     HeaderFilterStrategy getHeaderFilterStrategy();
     
     void setHeaderFilterStrategy(HeaderFilterStrategy strategy);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Wed May  6 07:08:14 2009
@@ -17,6 +17,10 @@
 package org.apache.camel;
 
 import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Template (named like Spring's TransactionTemplate & JmsTemplate
@@ -44,6 +48,9 @@
  */
 public interface ProducerTemplate extends Service {
 
+    // Synchronous methods
+    // -----------------------------------------------------------------------
+
     /**
      * Sends the exchange to the default endpoint
      *
@@ -132,18 +139,6 @@
     Exchange send(String endpointUri, ExchangePattern pattern, Processor processor);
 
     /**
-     * Sends an exchange to an endpoint using a supplied processor
-     *
-     * @param endpointUri the endpoint URI to send the exchange to
-     * @param processor   the transformer used to populate the new exchange
-     * {@link Processor} to populate the exchange.
-     * @param callback    the callback will be called when the exchange is completed.
-     * @return the returned exchange
-     * @deprecated a new async API is planned for Camel 2.0
-     */
-    Exchange send(String endpointUri, Processor processor, AsyncCallback callback);
-
-    /**
      * Sends the exchange to the given endpoint
      *
      * @param endpoint the endpoint to send the exchange to
@@ -175,18 +170,6 @@
     Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor);
 
     /**
-     * Sends an exchange to an endpoint using a supplied processor
-     *
-     * @param endpoint  the endpoint to send the exchange to
-     * @param processor the transformer used to populate the new exchange
-     * {@link Processor} to populate the exchange.
-     * @param callback  the callback will be called when the exchange is completed.
-     * @return the returned exchange
-     * @deprecated a new async API is planned for Camel 2.0
-     */
-    Exchange send(Endpoint endpoint, Processor processor, AsyncCallback callback);
-
-    /**
      * Send the body to an endpoint
      *
      * @param endpoint   the endpoint to send the exchange to
@@ -407,6 +390,7 @@
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
      *
      * @param body the payload to send
+     * @param type the expected response type
      * @return the result (see class javadoc)
      */
     <T> T requestBody(Object body, Class<T> type);
@@ -552,4 +536,137 @@
      * @return the result (see class javadoc)
      */
     <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type);
+
+
+    // Asynchronous methods
+    // -----------------------------------------------------------------------
+
+    /**
+     * Sets the executor service to use for async messaging.
+     * <p/>
+     * If none provided Camel will default use a {@link java.util.concurrent.ScheduledExecutorService}
+     * with a pool of 5 threads.
+     *
+     * @param executorService  the executor service.
+     */
+    void setExecutorService(ExecutorService executorService);
+
+    /**
+     * Sends an asynchronous exchange to the given endpoint.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param exchange    the exchange to send
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
+
+    /**
+     * Sends an asynchronous exchange to the given endpoint.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param processor   the transformer used to populate the new exchange
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Exchange> asyncSend(String endpointUri, Processor processor);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOnly} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body        the body to send
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncSendBody(String endpointUri, Object body);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body        the body to send
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncRequestBody(String endpointUri, Object body);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body        the body to send
+     * @param header      the header name
+     * @param headerValue the header value
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body        the body to send
+     * @param headers     headers
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body        the body to send
+     * @param type        the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    <T> Future<T> asyncRequestBody(String endpointUri, Object body, Class<T> type);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body        the body to send
+     * @param header      the header name
+     * @param headerValue the header value
+     * @param type        the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    <T> Future<T> asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param body        the body to send
+     * @param headers     headers
+     * @param type        the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    <T> Future<T> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type);
+
+    /**
+     * Gets the response body from the future handle, will wait until the response is ready.
+     *
+     * @param future      the handle to get the response
+     * @param type        the expected response type
+     * @return the result (see class javadoc)
+     */
+    <T> T asyncExtractBody(Future future, Class<T> type);
+
+    /**
+     * Gets the response body from the future handle, will wait at most the given time for the response to be ready.
+     *
+     * @param future      the handle to get the response
+     * @param timeout     the maximum time to wait
+     * @param unit        the time unit of the timeout argument
+     * @param type        the expected response type
+     * @return the result (see class javadoc)
+     * @throws java.util.concurrent.TimeoutException if the wait timed out
+     */
+    <T> T asyncExtractBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java Wed May  6 07:08:14 2009
@@ -18,10 +18,9 @@
 
 import java.util.Collection;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultProducer;
 
 /**
@@ -29,7 +28,7 @@
  *
  * @version $Revision$
  */
-public class CollectionProducer extends DefaultProducer implements AsyncProcessor {
+public class CollectionProducer extends DefaultProducer implements Processor {
     private final Collection<Exchange> queue;
 
     public CollectionProducer(Endpoint endpoint, Collection<Exchange> queue) {
@@ -41,11 +40,4 @@
         queue.add(exchange.copy());
     }
 
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        Exchange copy = exchange.copy();
-        copy.setProperty("CamelAsyncCallback", callback);
-        queue.add(copy);
-        callback.done(true);
-        return true;
-    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed May  6 07:08:14 2009
@@ -22,13 +22,10 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -41,17 +38,17 @@
     private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);
 
     private SedaEndpoint endpoint;
-    private AsyncProcessor processor;
+    private Processor processor;
     private ExecutorService executor;
 
     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
-        this.processor = AsyncProcessorTypeConverter.convert(processor);
+        this.processor = processor;
     }
 
     @Override
     public String toString() {
-        return "SedaConsumer: " + endpoint.getEndpointUri();
+        return "SedaConsumer[" + endpoint.getEndpointUri() + "]";
     }
 
     public void run() {
@@ -71,14 +68,6 @@
                     } catch (Exception e) {
                         LOG.error("Seda queue caught: " + e, e);
                     }
-
-                    // TODO: It ought to be UnitOfWork that did the callback notification but we are planning
-                    // to replace it with a brand new Async API so we leave it as is
-                    AsyncCallback callback = exchange.getProperty("CamelAsyncCallback", AsyncCallback.class);
-                    if (callback != null) {
-                        // seda consumer is async so invoke the async done on the callback if its provided
-                        callback.done(false);
-                    }
                 } else {
                     LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
                     try {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Wed May  6 07:08:14 2009
@@ -16,11 +16,9 @@
  */
 package org.apache.camel.impl;
 
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ServiceHelper;
 
@@ -32,7 +30,6 @@
 public class DefaultConsumer extends ServiceSupport implements Consumer {
     private final Endpoint endpoint;
     private final Processor processor;
-    private AsyncProcessor asyncProcessor;
     private ExceptionHandler exceptionHandler;
 
     public DefaultConsumer(Endpoint endpoint, Processor processor) {
@@ -42,7 +39,7 @@
 
     @Override
     public String toString() {
-        return "Consumer on " + endpoint;
+        return "Consumer[" + endpoint.getEndpointUri() + "]";
     }
 
     public Endpoint getEndpoint() {
@@ -53,19 +50,6 @@
         return processor;
     }
 
-    /**
-     * Provides an {@link AsyncProcessor} interface to the configured
-     * processor on the consumer.  If the processor does not implement
-     * the interface, it will be adapted so that it does.
-     * @deprecated
-     */
-    public AsyncProcessor getAsyncProcessor() {
-        if (asyncProcessor == null) {
-            asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
-        }
-        return asyncProcessor;
-    }
-
     public ExceptionHandler getExceptionHandler() {
         if (exceptionHandler == null) {
             exceptionHandler = new LoggingExceptionHandler(getClass());

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Wed May  6 07:08:14 2009
@@ -18,8 +18,14 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -30,8 +36,8 @@
 import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
@@ -47,9 +53,16 @@
     private final Map<String, Endpoint> endpointCache = new HashMap<String, Endpoint>();
     private boolean useEndpointCache = true;
     private Endpoint defaultEndpoint;
-    
+    private ExecutorService executor;
+
     public DefaultProducerTemplate(CamelContext context) {
         this.context = context;
+        this.executor = new ScheduledThreadPoolExecutor(5);
+    }
+
+    public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
+        this.context = context;
+        this.executor = executor;
     }
 
     public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
@@ -60,7 +73,7 @@
     public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
         Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
         return new DefaultProducerTemplate(camelContext, endpoint);
-    }   
+    }
 
     public Exchange send(String endpointUri, Exchange exchange) {
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
@@ -72,11 +85,6 @@
         return send(endpoint, processor);
     }
 
-    public Exchange send(String endpointUri, Processor processor, AsyncCallback callback) {
-        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
-        return send(endpoint, processor, callback);
-    }
-
     public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
         return send(endpoint, pattern, processor);
@@ -91,10 +99,6 @@
         return producerCache.send(endpoint, processor);
     }
 
-    public Exchange send(Endpoint endpoint, Processor processor, AsyncCallback callback) {
-        return producerCache.send(endpoint, processor, callback);
-    }
-
     public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
         return producerCache.send(endpoint, pattern, processor);
     }
@@ -450,6 +454,7 @@
     protected void doStop() throws Exception {
         producerCache.stop();
         endpointCache.clear();
+        executor.shutdown();
     }
 
     protected Object extractResultBody(Exchange result) {
@@ -460,4 +465,136 @@
         return ExchangeHelper.extractResultBody(result, pattern);
     }
 
+    public void setExecutorService(ExecutorService executorService) {
+        this.executor = executorService;
+    }
+
+    public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
+        Callable<Exchange> task = new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                return send(uri, exchange);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future<Exchange> asyncSend(final String uri, final Processor processor) {
+        Callable<Exchange> task = new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                return send(uri, processor);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future<Object> asyncSendBody(final String uri, final Object body) {
+        Callable<Object> task = new Callable<Object>() {
+            public Object call() throws Exception {
+                sendBody(uri, body);
+                // its InOnly, so no body to return
+                return null;
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future<Object> asyncRequestBody(final String uri, final Object body) {
+        Callable<Object> task = new Callable<Object>() {
+            public Object call() throws Exception {
+                return requestBody(uri, body);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
+        Callable<T> task = new Callable<T>() {
+            public T call() throws Exception {
+                return requestBody(uri, body, type);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
+        Callable<Object> task = new Callable<Object>() {
+            public Object call() throws Exception {
+                return requestBodyAndHeader(endpointUri, body, header, headerValue);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
+        Callable<T> task = new Callable<T>() {
+            public T call() throws Exception {
+                return requestBodyAndHeader(endpointUri, body, header, headerValue, type);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
+        Callable<Object> task = new Callable<Object>() {
+            public Object call() throws Exception {
+                return requestBodyAndHeaders(endpointUri, body, headers);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
+        Callable<T> task = new Callable<T>() {
+            public T call() throws Exception {
+                return requestBodyAndHeaders(endpointUri, body, headers, type);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+
+    public <T> T asyncExtractBody(Future future, Class<T> type) {
+        try {
+            return doExtractBody(future.get(), type);
+        } catch (InterruptedException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } catch (ExecutionException e) {
+            // execution failed due to an exception so rethrow the cause
+            throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
+        }
+    }
+
+    public <T> T asyncExtractBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
+        try {
+            if (timeout > 0) {
+                return doExtractBody(future.get(timeout, unit), type);
+            } else {
+                return doExtractBody(future.get(), type);
+            }
+        } catch (InterruptedException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } catch (ExecutionException e) {
+            // execution failed due to an exception so rethrow the cause
+            throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
+        }
+    }
+
+    private <T> T doExtractBody(Object result, Class<T> type) {
+        if (result instanceof Exchange) {
+            Exchange exchange = (Exchange) result;
+            Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
+            return context.getTypeConverter().convertTo(type, answer);
+        }
+        return context.getTypeConverter().convertTo(type, result);
+    }
+
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Wed May  6 07:08:14 2009
@@ -19,21 +19,17 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
 
-
 /**
  * Cache containing created {@link Producer}.
  *
@@ -91,26 +87,6 @@
         }
     }
 
-    /**
-     * Sends an exchange to an endpoint using a supplied
-     * {@link Processor} to populate the exchange.  The callback
-     * will be called when the exchange is completed.
-     *
-     * @param endpoint the endpoint to send the exchange to
-     * @param processor the transformer used to populate the new exchange
-     */
-    public Exchange send(Endpoint endpoint, Processor processor, AsyncCallback callback) {
-        try {
-            Producer producer = getProducer(endpoint);
-            Exchange exchange = producer.createExchange();
-            boolean sync = sendExchange(endpoint, producer, processor, exchange, callback);
-            setProcessedSync(exchange, sync);
-            return exchange;
-        } catch (Exception e) {
-            throw wrapRuntimeCamelException(e);
-        }
-    }
-
     public static boolean isProcessedSync(Exchange exchange) {
         Boolean rc = exchange.getProperty(Exchange.PROCESSED_SYNC, Boolean.class);
         return rc == null ? false : rc;
@@ -152,17 +128,6 @@
         return exchange;
     }
 
-    protected boolean sendExchange(Endpoint endpoint, Producer producer, Processor processor, Exchange exchange, AsyncCallback callback) throws Exception {
-        // lets populate using the processor callback
-        processor.process(exchange);
-
-        // now lets dispatch
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(">>>> " + endpoint + " " + exchange);
-        }
-        return AsyncProcessorTypeConverter.convert(producer).process(exchange, callback);
-    }
-
     protected void doStop() throws Exception {
         ServiceHelper.stopServices(producers.values());
         producers.clear();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java Wed May  6 07:08:14 2009
@@ -68,7 +68,6 @@
         addFallbackTypeConverter(new EnumTypeConverter());
         addFallbackTypeConverter(new ArrayTypeConverter());
         addFallbackTypeConverter(new PropertyEditorTypeConverter());
-        addFallbackTypeConverter(new AsyncProcessorTypeConverter());
     }
 
     public List<TypeConverterLoader> getTypeConverterLoaders() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Wed May  6 07:08:14 2009
@@ -688,34 +688,6 @@
     }
 
     /**
-     * Causes subsequent processors to be called asynchronously
-     *
-     * @param coreSize the number of threads that will be used to process
-     *                 messages in subsequent processors.
-     * @return a ThreadType builder that can be used to further configure the
-     *         the thread pool.
-     */
-    public ThreadDefinition thread(int coreSize) {
-        ThreadDefinition answer = new ThreadDefinition(coreSize);
-        addOutput(answer);
-        return answer;
-    }
-
-    /**
-     * Causes subsequent processors to be called asynchronously
-     *
-     * @param executor the executor that will be used to process
-     *                 messages in subsequent processors.
-     * @return a ThreadType builder that can be used to further configure the
-     *         the thread pool.
-     */
-    public ProcessorDefinition<Type> thread(Executor executor) {
-        ThreadDefinition answer = new ThreadDefinition(executor);
-        addOutput(answer);
-        return this;
-    }
-    
-    /**
      * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
      * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
      * to avoid duplicate messages

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed May  6 07:08:14 2009
@@ -50,7 +50,7 @@
  */
 public class MulticastProcessor extends ServiceSupport implements Processor, Navigate {
 
-    // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API
+    // TODO: Use JDK CompletionService to get rid of the AsyncProcessor/AsyncCallback
 
     static class ProcessorExchangePair {
         private final Processor processor;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Wed May  6 07:08:14 2009
@@ -20,13 +20,9 @@
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,11 +33,9 @@
  *
  * @version $Revision$
  */
-public class Pipeline extends MulticastProcessor implements AsyncProcessor {
+public class Pipeline extends MulticastProcessor implements Processor {
     private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
 
-    // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API
-
     public Pipeline(Collection<Processor> processors) {
         super(processors);
     }
@@ -56,18 +50,37 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    public boolean process(Exchange original, AsyncCallback callback) {
         Iterator<Processor> processors = getProcessors().iterator();
-        Exchange nextExchange = original;
+        Exchange nextExchange = exchange;
         boolean first = true;
-        while (true) {
+
+        while (continueRouting(processors, nextExchange)) {
+            if (first) {
+                first = false;
+            } else {
+                // prepare for next run
+                nextExchange = createNextExchange(nextExchange);
+            }
+
+            // get the next processor
+            Processor processor = processors.next();
+
+            // process the next exchange
+            try {
+                if (LOG.isTraceEnabled()) {
+                    // this does the actual processing so log at trace level
+                    LOG.trace("Processing exchangeId: " + nextExchange.getExchangeId() + " >>> " + nextExchange);
+                }
+                processor.process(nextExchange);
+            } catch (Exception e) {
+                nextExchange.setException(e);
+            }
+
+            // check for errors; if the exchange failed we should break out
             boolean exceptionHandled = hasExceptionBeenHandled(nextExchange);
             if (nextExchange.isFailed() || exceptionHandled) {
                 // The Exchange.EXCEPTION_HANDLED property is only set if satisfactory handling was done
-                //  by the error handler.  It's still an exception, the exchange still failed.
+                // by the error handler. It's still an exception, the exchange still failed.
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange
                               + " exception: " + nextExchange.getException() + " fault: "
@@ -76,86 +89,19 @@
                 }
                 break;
             }
-
-            // should we continue routing or not
-            if (!continueRouting(processors, nextExchange)) {
-                break;
-            }
-
-            AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
-
-            if (first) {
-                first = false;
-            } else {
-                nextExchange = createNextExchange(processor, nextExchange);
-            }
-
-            boolean sync = process(original, nextExchange, callback, processors, processor);
-            // Continue processing the pipeline synchronously ...
-            if (!sync) {
-                // The pipeline will be completed async...
-                return false;
-            }
         }
 
-        // If we get here then the pipeline was processed entirely
-        // synchronously.
         if (LOG.isTraceEnabled()) {
             // logging nextExchange as it contains the exchange that might have altered the payload and since
             // we are logging the completion if will be confusing if we log the original instead
             // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
-            LOG.trace("Processing compelete for exchangeId: " + original.getExchangeId() + " >>> " + nextExchange);
-        }
-        ExchangeHelper.copyResults(original, nextExchange);
-        callback.done(true);
-        return true;
-    }
-
-    private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
-        if (LOG.isTraceEnabled()) {
-            // this does the actual processing so log at trace level
-            LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
+            LOG.trace("Processing compelete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
         }
-        return processor.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-                // We only have to handle async completion of the pipeline..
-                if (sync) {
-                    return;
-                }
-
-                // Continue processing the pipeline...
-                Exchange nextExchange = exchange;
 
-                while (continueRouting(processors, nextExchange)) {
-                    AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
-
-                    boolean exceptionHandled = hasExceptionBeenHandled(nextExchange);
-                    if (nextExchange.isFailed() || exceptionHandled) {
-                        // The Exchange.EXCEPTION_HANDLED property is only set if satisfactory handling was done
-                        //  by the error handler.  It's still an exception, the exchange still failed.
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange
-                                      + " exception: " + nextExchange.getException() + " fault: "
-                                      + (nextExchange.hasFault() ? nextExchange.getFault() : null)
-                                      + (exceptionHandled ? " handled by the error handler" : ""));
-                        }
-                        break;
-                    }
-
-                    nextExchange = createNextExchange(processor, nextExchange);
-                    sync = process(original, nextExchange, callback, processors, processor);
-                    if (!sync) {
-                        return;
-                    }
-                }
-
-                ExchangeHelper.copyResults(original, nextExchange);
-                callback.done(false);
-            }
-        });
+        // copy results back to the original exchange
+        ExchangeHelper.copyResults(exchange, nextExchange);
     }
 
-
     private static boolean hasExceptionBeenHandled(Exchange nextExchange) {
         return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.EXCEPTION_HANDLED));
     }
@@ -165,11 +111,10 @@
      * <p/>
      * Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
      *
-     * @param producer         the producer used to send to the endpoint
      * @param previousExchange the previous exchange
      * @return a new exchange
      */
-    protected Exchange createNextExchange(Processor producer, Exchange previousExchange) {
+    protected Exchange createNextExchange(Exchange previousExchange) {
         Exchange answer = previousExchange.newInstance();
         // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
         // before processing the next step in the pipeline, so we have a snapshot of the exchange
@@ -212,4 +157,5 @@
     public String toString() {
         return "Pipeline" + getProcessors();
     }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java Wed May  6 07:08:14 2009
@@ -16,11 +16,12 @@
  */
 package org.apache.camel.util;
 
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 
 /**
  * Helper methods for AsyncProcessor objects.
@@ -32,21 +33,21 @@
     }
 
     /**
-     * Calls the async version of the processor's process method and waits
-     * for it to complete before returning. This can be used by AsyncProcessor
-     * objects to implement their sync version of the process method.
+     * Processes the exchange asynchronously by submitting it as a task to the executor service.
+     *
+     * @param executor  executor service
+     * @param processor the processor
+     * @param exchange  the exchange
+     * @return a future handle for the task being executed asynchronously
      */
-    public static void process(AsyncProcessor processor, Exchange exchange) throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-        boolean sync = processor.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-                if (!sync) {
-                    latch.countDown();
-                }
+    public static Future<Exchange> asyncProcess(final ExecutorService executor, final Processor processor, final Exchange exchange) {
+        Callable<Exchange> task = new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                processor.process(exchange);
+                return exchange;
             }
-        });
-        if (!sync) {
-            latch.await();
-        }
+        };
+
+        return executor.submit(task);
     }
 }

Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Wed May  6 07:08:14 2009
@@ -60,7 +60,6 @@
 SortDefinition
 SplitDefinition
 StopDefinition
-ThreadDefinition
 ThrottleDefinition
 ThrowFaultDefinition
 ToDefinition

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java Wed May  6 07:08:14 2009
@@ -26,7 +26,6 @@
 import org.apache.camel.builder.ValueBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.processor.DelegateAsyncProcessor;
 import org.apache.camel.processor.DelegateProcessor;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.PredicateAssertHelper;
@@ -351,9 +350,7 @@
      */
     protected Processor unwrap(Processor processor) {
         while (true) {
-            if (processor instanceof DelegateAsyncProcessor) {
-                processor = ((DelegateAsyncProcessor)processor).getProcessor();
-            } else if (processor instanceof DelegateProcessor) {
+            if (processor instanceof DelegateProcessor) {
                 processor = ((DelegateProcessor)processor).getProcessor();
             } else {
                 return processor;
@@ -371,9 +368,6 @@
         while (true) {
             if (processor instanceof Channel) {
                 return (Channel) processor;
-            }
-            if (processor instanceof DelegateAsyncProcessor) {
-                processor = ((DelegateAsyncProcessor)processor).getProcessor();
             } else if (processor instanceof DelegateProcessor) {
                 processor = ((DelegateProcessor)processor).getProcessor();
             } else {

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java (from r771551, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java&r1=771551&r2=772076&rev=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java Wed May  6 07:08:14 2009
@@ -16,56 +16,31 @@
  */
 package org.apache.camel.component.seda;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
-import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Async processor with seda to simulate the caller thread is not blocking while the
- * exchange is processed and we get callbacks when the exchange is complete.
+ * The new Async API version of doing async routing, based on the old AsyncProcessor API.
+ * In the old SedaAsyncProcessorTest a seda endpoint was needed to really make it async. This is not
+ * needed by the new API, so we send using direct instead.
  *
  * @version $Revision$
  */
-public class SedaAsyncProcessorTest extends ContextTestSupport {
+public class SedaAsyncProducerTest extends ContextTestSupport {
 
-    private CountDownLatch latchAsync = new CountDownLatch(1);
-    private boolean doneSync;
     private String route = "";
 
-    public void testAsyncWithSeda() throws Exception {
+    public void testAsyncProducer() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
 
-        Endpoint endpoint = context.getEndpoint("seda:start");
-        Producer producer = endpoint.createProducer();
-
-        Exchange exchange = producer.createExchange(ExchangePattern.InOut);
-        exchange.getIn().setBody("Hello World");
-
-        // seda producer is async also (but this is ugly to need to cast)
-        AsyncProcessor async = (AsyncProcessor) producer;
-
-        boolean sync = async.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-                // we expect 2 callbacks
-                // the first is when we have finished sending to the seda producer
-                if (sync) {
-                    doneSync = true;
-                } else {
-                    latchAsync.countDown();
-                }
-            }
-        });
+        // using the new async API we can fire a real async message
+        Future<String> future = template.asyncRequestBody("direct:start", "Hello World", String.class);
 
         // I should happen before mock
         route = route + "send";
@@ -73,16 +48,10 @@
         assertMockEndpointsSatisfied();
 
         assertEquals("Send should occur before processor", "sendprocess", route);
-        assertTrue("Sync done should have occured", doneSync);
 
-        // wait at most 2 seconds
-        boolean zero = latchAsync.await(2, TimeUnit.SECONDS);
-        assertTrue("Async done should have occured", zero);
-
-        // how to get the response?
-        String response = exchange.getOut().getBody(String.class);
-        // TODO: we need a new API for getting the result
-        //assertEquals("Bye World", response);
+        // and get the response with the future handle
+        String response = future.get();
+        assertEquals("Bye World", response);
     }
 
     @Override
@@ -92,7 +61,7 @@
             public void configure() throws Exception {
                 errorHandler(noErrorHandler());
 
-                from("seda:start").delay(100)
+                from("direct:start").delay(100)
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             route = route + "process";
@@ -105,4 +74,4 @@
             }
         };
     }
-}
+}
\ No newline at end of file

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java?rev=772076&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java Wed May  6 07:08:14 2009
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.AsyncProcessorHelper;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultProducerTemplateAsyncTest extends ContextTestSupport {
+
+    public void testRequestAsync() throws Exception {
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("Hello");
+
+        Future<Exchange> future = template.asyncSend("direct:start", exchange);
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        Exchange result = future.get();
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+        assertEquals("Hello World", result.getOut().getBody());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSendAsyncProcessor() throws Exception {
+        Future<Exchange> future = template.asyncSend("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello");
+            }
+        });
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        Exchange result = future.get();
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+        assertEquals("Hello World", result.getOut().getBody());
+    }
+
+    public void testRequestAsyncBody() throws Exception {
+        Future<Object> future = template.asyncRequestBody("direct:start", "Hello");
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        // we can use asyncExtractBody to convert to the expected body type
+        String result = template.asyncExtractBody(future, String.class);
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+        assertEquals("Hello World", result);
+    }
+
+    public void testRequestAsyncBodyType() throws Exception {
+        Future<String> future = template.asyncRequestBody("direct:start", "Hello", String.class);
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        // or we can pass the expected type to the request method so the future handle knows its type
+        String result = future.get();
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+        assertEquals("Hello World", result);
+    }
+
+    public void testRequestAsyncBodyAndHeader() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedHeaderReceived("foo", 123);
+
+        Future<Object> future = template.asyncRequestBodyAndHeader("direct:start", "Hello", "foo", 123);
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        // we can use asyncExtractBody to convert to the expected body type
+        String result = template.asyncExtractBody(future, String.class);
+
+        assertMockEndpointsSatisfied();
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+        assertEquals("Hello World", result);
+    }
+
+    public void testRequestAsyncBodyAndHeaderType() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedHeaderReceived("foo", 123);
+
+        Future<String> future = template.asyncRequestBodyAndHeader("direct:start", "Hello", "foo", 123, String.class);
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        // or we can pass the expected type to the request method so the future handle knows its type
+        String result = future.get();
+
+        assertMockEndpointsSatisfied();
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+        assertEquals("Hello World", result);
+    }
+
+    public void testRequestAsyncBodyAndHeaders() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedHeaderReceived("foo", 123);
+        mock.expectedHeaderReceived("bar", "cheese");
+
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("foo", 123);
+        headers.put("bar", "cheese");
+        Future<Object> future = template.asyncRequestBodyAndHeaders("direct:start", "Hello", headers);
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        // we can use asyncExtractBody to convert to the expected body type
+        String result = template.asyncExtractBody(future, String.class);
+
+        assertMockEndpointsSatisfied();
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+        assertEquals("Hello World", result);
+    }
+
+    public void testRequestAsyncBodyAndHeadersType() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedHeaderReceived("foo", 123);
+        mock.expectedHeaderReceived("bar", "cheese");
+
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("foo", 123);
+        headers.put("bar", "cheese");
+        Future<String> future = template.asyncRequestBodyAndHeaders("direct:start", "Hello", headers, String.class);
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        // or we can pass the expected type to the async request method so the future handle knows its type
+        String result = future.get();
+
+        assertMockEndpointsSatisfied();
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+        assertEquals("Hello World", result);
+    }
+
+    public void testRequestAsyncErrorWhenProcessing() throws Exception {
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("Hello");
+
+        Future<Object> future = template.asyncRequestBody("direct:error", exchange);
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        try {
+            Exchange result = template.asyncExtractBody(future, Exchange.class);
+            fail("Should have thrown exception");
+        } catch (RuntimeCamelException e) {
+            assertEquals("Damn forced by unit test", e.getCause().getMessage());
+        }
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+    }
+
+    public void testRequestAsyncBodyErrorWhenProcessing() throws Exception {
+        Future<Object> future = template.asyncRequestBody("direct:error", "Hello");
+        long start = System.currentTimeMillis();
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        try {
+            String result = template.asyncExtractBody(future, String.class);
+            fail("Should have thrown exception");
+        } catch (RuntimeCamelException e) {
+            assertEquals("Damn forced by unit test", e.getCause().getMessage());
+        }
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+    }
+
+    public void testAsyncProcessWithClassicAPI() throws Exception {
+        Endpoint endpoint = context.getEndpoint("direct:start");
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody("Hello");
+
+        long start = System.currentTimeMillis();
+
+        // produce it async so we use a helper
+        Producer producer = endpoint.createProducer();
+        // normally you will use a shared executor service with pools
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        // send it async with the help of this helper
+        Future<Exchange> future = AsyncProcessorHelper.asyncProcess(executor, producer, exchange);
+
+        // you can do other stuff
+        String echo = template.requestBody("direct:echo", "Hi", String.class);
+        assertEquals("HiHi", echo);
+
+        String result = template.asyncExtractBody(future, String.class);
+
+        assertMockEndpointsSatisfied();
+
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take longer than: " + delta, delta > 250);
+        assertEquals("Hello World", result);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .delay(250)
+                    .transform(body().append(" World")).to("mock:result");
+
+                from("direct:error")
+                    .delay(250)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            throw new IllegalArgumentException("Damn forced by unit test");
+                        }
+                    });
+
+                from("direct:echo").transform(body().append(body()));
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Wed May  6 07:08:14 2009
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -78,7 +76,7 @@
     }
 
     protected RouteBuilder createRouteBuilder() {
-        final Processor processor = new AsyncProcessor() {
+        final Processor processor = new Processor() {
             public void process(Exchange exchange) {
                 Integer counter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
                 int attempt = (counter == null) ? 1 : counter + 1;
@@ -87,24 +85,6 @@
                                                + " being less than: " + failUntilAttempt);
                 }
             }
-            // START SNIPPET: AsyncProcessor
-            public boolean process(Exchange exchange, AsyncCallback callback) {                
-                Integer counter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
-                int attempt = (counter == null) ? 1 : counter + 1;
-                if (attempt > 1) {
-                    assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(),
-                            "Camel DeadLetterChannel Redeliver Timer");
-                }
-                
-                if (attempt < failUntilAttempt) {
-                    // we can't throw the exception here , or the callback will not be invoked.
-                    exchange.setException(new RuntimeException("Failed to process due to attempt: " + attempt
-                                               + " being less than: " + failUntilAttempt));
-                }
-                callback.done(false);
-                return false;
-            }
-            // END SNIPPET: AsyncProcessor
         };
 
         return new RouteBuilder() {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java Wed May  6 07:08:14 2009
@@ -30,6 +30,8 @@
  */
 public class PipelineConcurrentTest extends ContextTestSupport {
 
+    private String uri = "seda:in?size=10000&concurrentConsumers=10";
+
     public void testConcurrentPipeline() throws Exception {
         int total = 10000;
         final int group = total / 20;
@@ -51,7 +53,7 @@
                         } catch (InterruptedException e) {
                             // ignore
                         }
-                        template.sendBody("seda:in?size=10000", "" + (start + i));
+                        template.sendBody(uri, "" + (start + i));
                     }
                 }
             });
@@ -67,8 +69,7 @@
                 // to force any exceptions coming forward immediately
                 errorHandler(noErrorHandler());
 
-                from("seda:in?size=10000")
-                    .thread(10)
+                from(uri)
                     .pipeline("direct:do", "mock:result");
 
                 from("direct:do")

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java Wed May  6 07:08:14 2009
@@ -94,7 +94,7 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:async").thread(1).to("direct:foo");
+                from("seda:async").to("direct:foo");
                 from("direct:foo").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         log.info("Received: " + exchange);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java Wed May  6 07:08:14 2009
@@ -22,7 +22,7 @@
 public class UnitOfWorkWithAsyncFlowTest extends UnitOfWorkTest {
     @Override
     protected void setUp() throws Exception {
-        uri = "direct:async";
+        uri = "seda:async";
         super.setUp();
     }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java Wed May  6 07:08:14 2009
@@ -42,8 +42,6 @@
             @Override
             public void configure() throws Exception {
                 // TODO: Should also work with DLC
-                // will be possible when we remove the AsyncProcessor so the processing logic
-                // is much easier to deal with
                 // errorHandler(deadLetterChannel("mock:dead").disableRedelivery());
 
                 onException(MyTechnicalException.class)

Modified: camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java (original)
+++ camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java Wed May  6 07:08:14 2009
@@ -55,7 +55,7 @@
                 DataFormat jaxb = new JaxbDataFormat("org.apache.camel.example");
 
                 // use 5 concurrent threads to do marshalling
-                from("dataset:beer").thread(5).marshal(jaxb).to("dataset:beer");
+                from("dataset:beer").marshal(jaxb).to("dataset:beer");
             }
         };
     }

Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java Wed May  6 07:08:14 2009
@@ -22,7 +22,6 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.component.http.CamelServlet;
 import org.apache.camel.component.http.HttpConsumer;
 import org.apache.camel.component.http.HttpExchange;
@@ -56,30 +55,30 @@
             if (continuation.isNew()) {
 
                 // Have the camel process the HTTP exchange.
-                final HttpExchange exchange = new HttpExchange(consumer.getEndpoint(), request, response);
-                boolean sync = consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
-                    public void done(boolean sync) {
-                        if (sync) {
-                            return;
-                        }
-                        continuation.setObject(exchange);
-                        continuation.resume();
-                    }
-                });
+                // final HttpExchange exchange = new HttpExchange(consumer.getEndpoint(), request, response);
+                // boolean sync = consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
+                //     public void done(boolean sync) {
+                //        if (sync) {
+                //            return;
+                //        }
+                //        continuation.setObject(exchange);
+                //        continuation.resume();
+                //    }
+                //});
 
-                if (!sync) {
+                //if (!sync) {
                     // Wait for the exchange to get processed.
                     // This might block until it completes or it might return via an exception and
                     // then this method is re-invoked once the the exchange has finished processing
-                    continuation.suspend(0);
-                }
+                //    continuation.suspend(0);
+                //}
 
                 // HC: The getBinding() is interesting because it illustrates the
                 // impedance miss-match between HTTP's stream oriented protocol, and
                 // Camels more message oriented protocol exchanges.
 
                 // now lets output to the response
-                consumer.getBinding().writeResponse(exchange, response);
+                //consumer.getBinding().writeResponse(exchange, response);
                 return;
             }
 

Modified: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java (original)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java Wed May  6 07:08:14 2009
@@ -37,7 +37,7 @@
         // so the stream has to be read to the end. When this happens
         // the associated connection is released automatically.
 
-        String endpointName = "seda:withConversion";
+        String endpointName = "seda:withConversion?concurrentConsumers=5";
         sendMessagesTo(endpointName, 5);
     }
 
@@ -50,7 +50,7 @@
         context.getComponent("http", HttpComponent.class).getHttpConnectionManager().getParams()
             .setDefaultMaxConnectionsPerHost(5);
 
-        String endpointName = "seda:withoutConversion";
+        String endpointName = "seda:withoutConversion?concurrentConsumers=5";
         sendMessagesTo(endpointName, 5);
     }
 
@@ -63,7 +63,7 @@
 
         for (int i = 0; i < 5; i++) {
             mockEndpoint.expectedMessageCount(1);
-            template.sendBody("seda:withoutConversion", null);
+            template.sendBody("seda:withoutConversion?concurrentConsumers=5", null);
             mockEndpoint.assertIsSatisfied();
             Object response = mockEndpoint.getReceivedExchanges().get(0).getIn().getBody();
             InputStream responseStream = assertIsInstanceOf(InputStream.class, response);
@@ -95,10 +95,10 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() {
-                from("seda:withConversion").thread(5).to("http://localhost:5430/search")
+                from("seda:withConversion?concurrentConsumers=5").to("http://localhost:5430/search")
                     .convertBodyTo(String.class).to("mock:results");
 
-                from("seda:withoutConversion").thread(5).to("http://localhost:5430/search")
+                from("seda:withoutConversion?concurrentConsumers=5").to("http://localhost:5430/search")
                     .to("mock:results");
 
                 from("jetty:http://localhost:5430/search").process(new Processor() {

Modified: camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java (original)
+++ camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java Wed May  6 07:08:14 2009
@@ -19,7 +19,6 @@
 import java.util.Random;
 
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.DeadLetterChannelBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -29,6 +28,8 @@
  */
 public class XQueryConcurrencyTest extends ContextTestSupport {
 
+    private String uri = "seda:in?concurrentConsumers=5";
+
     public void testConcurrency() throws Exception {
         int total = 1000;
 
@@ -51,7 +52,7 @@
                         } catch (InterruptedException e) {
                             // ignore
                         }
-                        template.sendBody("seda:in", "<person><id>" + (start + i) + "</id><name>James</name></person>");
+                        template.sendBody(uri, "<person><id>" + (start + i) + "</id><name>James</name></person>");
                     }
                 }
             });
@@ -67,8 +68,7 @@
                 // no retry as we want every failure to submerge
                 errorHandler(noErrorHandler());
 
-                from("seda:in")
-                    .thread(5)
+                from(uri)
                     .transform().xquery("/person/id", String.class)
                     .to("mock:result");
             }

Modified: camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java (original)
+++ camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java Wed May  6 07:08:14 2009
@@ -71,8 +71,9 @@
                 // no retry as we want every failure to submerge
                 errorHandler(noErrorHandler());
 
-                from("direct:start")
-                    .thread(5)
+                from("direct:start").to("seda:foo?concurrentConsumers=5");
+
+                from("seda:foo?concurrentConsumers=5")
                     .to("xquery:org/apache/camel/component/xquery/transform.xquery")
                     .to("mock:result");
             }

Modified: camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala (original)
+++ camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala Wed May  6 07:08:14 2009
@@ -44,7 +44,6 @@
   def setbody(expression: Exchange => Any) : DSL
   def setheader(header: String, expression: Exchange => Any) : DSL
   def split(expression: Exchange => Any) : SSplitDefinition
-  def thread(number: Int) : SThreadDefinition
   def throttle(frequency: Frequency) : SThrottleDefinition
   def to(uris: String*) : DSL
   def unmarshal(format: DataFormatDefinition) : DSL



Mime
View raw message