camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r957944 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/issues/ cam...
Date Fri, 25 Jun 2010 13:56:21 GMT
Author: davsclaus
Date: Fri Jun 25 13:56:19 2010
New Revision: 957944

URL: http://svn.apache.org/viewvc?rev=957944&view=rev
Log:
CAMEL-2859: Threads DSL now supports the async routing engine. Non-backward-compatible change, as the
waitForTaskToComplete option has been removed.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRequestReplyTest.java
      - copied, changed from r957869, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java
Removed:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteIfReplyExpectedTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteWaitIfReplyExpectedTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncWaitPropertyTest.java
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettyAsyncWithThreadsTest.java
    camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/AsyncTest.scala
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/async/SpringAsyncRouteNoWaitTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/async/SpringAsyncRouteNoWaitWithErrorTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/async/SpringAsyncRouteTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/async/SpringAsyncRouteWaitIfReplyExpectedTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/async/SpringAsyncRouteWithErrorTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/async/SpringAsyncRouteNoWaitTest.xml
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/async/SpringAsyncRouteNoWaitWithErrorTest.xml
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/async/SpringAsyncRouteTest.xml
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/async/SpringAsyncRouteWaitIfReplyExpectedTest.xml
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/async/SpringAsyncRouteWithErrorTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDefaultErrorHandlerTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/TransactedAsyncUsingThreadsTest.java
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThreadsCorePoolTest.java
    camel/trunk/components/camel-web/src/main/java/org/apache/camel/web/util/OutputDefinitionRenderer.java
    camel/trunk/components/camel-web/src/test/java/org/apache/camel/web/groovy/ThreadsDSLTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Fri Jun 25 13:56:19 2010
@@ -22,6 +22,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -283,8 +284,17 @@ public abstract class GenericFileConsume
                 log.debug("About to process file: " + target + " using exchange: " + exchange);
             }
 
-            // process the exchange
-            getProcessor().process(exchange);
+            // process the exchange using the async consumer to support async routing engine
+            // which can be supported by this file consumer as all the done work is
+            // provided in the GenericFileOnCompletion
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    // noop
+                    if (log.isTraceEnabled()) {
+                        log.trace("Done processing file: " + target + (doneSync ? " synchronously" : " asynchronously"));
+                    }
+                }
+            });
 
         } catch (Exception e) {
             // remove file from the in progress list due to failure

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java Fri Jun
25 13:56:19 2010
@@ -30,10 +30,12 @@ import org.apache.camel.spi.RouteContext
 /**
  * Represents an XML <aop/> element
  *
+ * @deprecated will be removed in the future
  * @version $Revision$
  */
 @XmlRootElement(name = "aop")
 @XmlAccessorType(XmlAccessType.FIELD)
+@Deprecated
 public class AOPDefinition extends OutputDefinition<ProcessorDefinition> {
 
     @XmlAttribute(required = false)

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Fri
Jun 25 13:56:19 2010
@@ -854,22 +854,7 @@ public abstract class ProcessorDefinitio
     }
 
     /**
-     * Leverages a thread pool for multi threading processing exchanges.
-     * <p/>
-     * The caller thread will either wait for the async route
-     * to complete or immediately continue. If continue the OUT message will
-     * contain a {@link java.util.concurrent.Future} handle so you can get the real response
-     * later using this handle.
-     * <p/>
-     * Will default <tt>Always</tt> wait for the async route to complete, but
this behavior can be overriden by:
-     * <ul>
-     *   <li>Configuring the <tt>waitForTaskToComplete</tt> option</li>
-     *   <li>Provide an IN header with the key {@link org.apache.camel.Exchange#ASYNC_WAIT}
with the
-     * value containing a type {@link org.apache.camel.WaitForTaskToComplete}. The header
will take precedence, if provided.</li>
-     * </ul>
-     * <p/>
-     * If no <tt>corePoolSize</tt> is set then a default CachedExecutorService
is used which automatic grown and shrinks.
-     * If no <tt>maxPoolSize</tt> is set, then the <tt>corePoolSize</tt>
is used as max.
+     * Continues processing the {@link org.apache.camel.Exchange} using asynchronous routing engine.
      *
      * @return the builder
      */
@@ -880,10 +865,7 @@ public abstract class ProcessorDefinitio
     }
 
     /**
-     * Leverages a thread pool for multi threading processing exchanges.
-     * <p/>
-     * See {@link #threads()} for more details.
-     * If no <tt>maxPoolSize</tt> is set, then the <tt>corePoolSize</tt>
is used as max.
+     * Continues processing the {@link org.apache.camel.Exchange} using asynchronous routing
engine.
      *
      * @param poolSize the core pool size
      * @return the builder
@@ -895,9 +877,7 @@ public abstract class ProcessorDefinitio
     }
 
     /**
-     * Leverages a thread pool for multi threading processing exchanges.
-     * <p/>
-     * See {@link #threads()} for more details.
+     * Continues processing the {@link org.apache.camel.Exchange} using asynchronous routing
engine.
      *
      * @param poolSize    the core pool size
      * @param maxPoolSize the maximum pool size
@@ -914,7 +894,9 @@ public abstract class ProcessorDefinitio
      * Wraps the sub route using AOP allowing you to do before and after work (AOP around).
      *
      * @return the builder
+     * @deprecated (to be removed in the future)
      */
+    @Deprecated
     public AOPDefinition aop() {
         AOPDefinition answer = new AOPDefinition();
         addOutput(answer);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri
Jun 25 13:56:19 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.model;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.TimeUnit;
@@ -28,10 +30,9 @@ import javax.xml.bind.annotation.adapter
 
 import org.apache.camel.Processor;
 import org.apache.camel.ThreadPoolRejectedPolicy;
-import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.builder.xml.TimeUnitAdapter;
+import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.ThreadsProcessor;
-import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
@@ -65,9 +66,6 @@ public class ThreadsDefinition extends O
     @XmlAttribute
     private ThreadPoolRejectedPolicy rejectedPolicy;
 
-    @XmlAttribute
-    private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
-
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         // The threads name
@@ -97,12 +95,15 @@ public class ThreadsDefinition extends O
                                         .newThreadPool(this, name, poolSize, max, keepAlive, tu, maxQueue, rejected, true);
             }
         }
-        Processor childProcessor = this.createChildProcessor(routeContext, true);
 
-        // wrap it in a unit of work so the route that comes next is also done in a unit
of work
-        UnitOfWorkProcessor uow = new UnitOfWorkProcessor(routeContext, childProcessor);
+        ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), executorService);
+        Processor childProcessor = createChildProcessor(routeContext, true);
 
-        return new ThreadsProcessor(routeContext.getCamelContext(), uow, executorService,
waitForTaskToComplete);
+        List<Processor> pipe = new ArrayList<Processor>(2);
+        pipe.add(thread);
+        pipe.add(childProcessor);
+        // wrap in nested pipeline so this appears as one processor
+        return new Pipeline(routeContext.getCamelContext(), pipe);
     }
 
     @Override
@@ -210,19 +211,6 @@ public class ThreadsDefinition extends O
         return this;
     }
 
-    /**
-     * Setting to whether to wait for async tasks to be complete before continuing original
route.
-     * <p/>
-     * Is default <tt>IfReplyExpected</tt>
-     *
-     * @param wait the wait option
-     * @return the builder
-     */
-    public ThreadsDefinition waitForTaskToComplete(WaitForTaskToComplete wait) {
-        setWaitForTaskToComplete(wait);
-        return this;
-    }
-
     public ExecutorService getExecutorService() {
         return executorService;
     }
@@ -247,14 +235,6 @@ public class ThreadsDefinition extends O
         this.poolSize = poolSize;
     }
 
-    public WaitForTaskToComplete getWaitForTaskToComplete() {
-        return waitForTaskToComplete;
-    }
-
-    public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) {
-        this.waitForTaskToComplete = waitForTaskToComplete;
-    }
-
     public Integer getMaxPoolSize() {
         return maxPoolSize;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
Fri Jun 25 13:56:19 2010
@@ -16,15 +16,16 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.WaitForTaskToComplete;
-import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -38,73 +39,70 @@ import org.apache.camel.util.ObjectHelpe
  *
  * @version $Revision$
  */
-public class ThreadsProcessor extends DelegateProcessor implements Processor {
+public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor {
 
-    // TODO: Should leverage AsyncProcessor
+    private final CamelContext camelContext;
+    private final ExecutorService executorService;
+    private final AtomicBoolean shutdown = new AtomicBoolean(true);
+
+    private final class ProcessCall implements Runnable {
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+
+        public ProcessCall(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.callback = callback;
+        }
 
-    protected final CamelContext camelContext;
-    protected final ExecutorService executorService;
-    protected WaitForTaskToComplete waitForTaskToComplete;
+        public void run() {
+            if (shutdown.get()) {
+                exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
+            }
+            callback.done(false);
+        }
+    }
 
-    public ThreadsProcessor(CamelContext camelContext, Processor output, ExecutorService
executorService, WaitForTaskToComplete waitForTaskToComplete) {
-        super(output);
+    public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService) {
         ObjectHelper.notNull(camelContext, "camelContext");
         ObjectHelper.notNull(executorService, "executorService");
         this.camelContext = camelContext;
         this.executorService = executorService;
-        this.waitForTaskToComplete = waitForTaskToComplete;
+        // TODO: if rejection policy of executor service is caller runs then we need to tap into it
+        // so we can invoke the callback.done(true) to continue routing synchronously
     }
 
     public void process(final Exchange exchange) throws Exception {
-        final Processor output = getProcessor();
-        if (output == null) {
-            // no output then return
-            return;
-        }
-
-        // use a new copy of the exchange to route async and handover the on completion to
the new copy
-        // so its the new copy that performs the on completion callback when its done
-        final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
-
-        // let it execute async and return the Future
-        Callable<Exchange> task = createTask(output, copy);
-
-        // submit the task
-        Future<Exchange> future = executorService.submit(task);
-
-        // compute if we should wait for task to complete or not
-        WaitForTaskToComplete wait = waitForTaskToComplete;
-        if (exchange.getProperty(Exchange.ASYNC_WAIT) != null) {
-            wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class);
-        }
+        AsyncProcessorHelper.process(this, exchange);
+    }
 
-        if (wait == WaitForTaskToComplete.Always) {
-            // wait for task to complete
-            Exchange response = future.get();
-            ExchangeHelper.copyResults(exchange, response);
-        } else if (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))
{
-            // wait for task to complete as we expect a reply
-            Exchange response = future.get();
-            ExchangeHelper.copyResults(exchange, response);
-        } else {
-            // no we do not expect a reply so lets continue, set a handle to the future task
-            // in case end user need it later
-            exchange.getOut().setBody(future);
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (shutdown.get()) {
+            throw new IllegalStateException("ThreadsProcessor is not running.");
         }
-    }
 
-    protected Callable<Exchange> createTask(final Processor output, final Exchange
copy) {
-        return new Callable<Exchange>() {
-            public Exchange call() throws Exception {
-                // must use a copy of the original exchange for processing async
-                output.process(copy);
-                return copy;
+        ProcessCall call = new ProcessCall(exchange, callback);
+        try {
+            executorService.submit(call);
+            return false;
+        } catch (RejectedExecutionException e) {
+            if (shutdown.get()) {
+                exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.", e));
+            } else {
+                exchange.setException(e);
             }
-        };
+        }
+        return true;
     }
 
     public String toString() {
         return "Threads";
     }
 
+    protected void doStart() throws Exception {
+        shutdown.set(false);
+    }
+
+    protected void doStop() throws Exception {
+        shutdown.set(true);
+    }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java Fri
Jun 25 13:56:19 2010
@@ -17,13 +17,11 @@
 package org.apache.camel.issues;
 
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.SynchronizationAdapter;
 
@@ -39,18 +37,16 @@ public class GertJBIIssueTest extends Co
         return false;
     }
 
-    public void testSimulateJBIEndpointNotExistWait() throws Exception {
+    public void testSimulateJBIEndpointFail() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 errorHandler(deadLetterChannel("mock:dlc").maximumRedeliveries(0).handled(false));
 
                 from("direct:start")
-                    // must wait for task to complete to know if there was an exception
-                    // as its in-only based
-                    .threads(2).waitForTaskToComplete(WaitForTaskToComplete.Always)
-                    .to("mock:done")
-                    .throwException(new IllegalArgumentException("Forced"));
+                        .threads(2)
+                        .to("mock:done")
+                        .throwException(new IllegalArgumentException("Forced"));
 
             }
         });
@@ -70,57 +66,18 @@ public class GertJBIIssueTest extends Co
         assertMockEndpointsSatisfied();
     }
 
-    @SuppressWarnings("unchecked")
-    public void testSimulateJBIEndpointNotExist() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:dlc").maximumRedeliveries(0).handled(false));
-
-                from("direct:start")
-                    // now we do not wait for task to complete but we get the future handle
-                    // so we can use that to wait for the task to complete and see if it
failed or not
-                    .threads(2)
-                    .to("mock:done")
-                    .throwException(new IllegalArgumentException("Forced"));
-
-            }
-        });
-        context.start();
-
-        getMockEndpoint("mock:done").expectedMessageCount(1);
-        getMockEndpoint("mock:dlc").expectedMessageCount(1);
-
-        Exchange out = template.send("direct:start", new Processor() {
-                public void process(Exchange exchange) throws Exception {
-                    exchange.getIn().setBody("Hello World");
-                }
-            });
-
-        assertMockEndpointsSatisfied();
-
-        // we send in an in-only that was processed async. Then Camel provides the Future
handle in OUT
-        // that let us use it to check whether the task completed or failed (or even get
the result etc.)
-        Future<Exchange> future = out.getOut().getBody(Future.class);
-        Exchange task = future.get();
-        assertEquals("Should have failed", true, task.isFailed());
-
-        Exception cause = assertIsInstanceOf(IllegalArgumentException.class, task.getException());
-        assertEquals("Forced", cause.getMessage());
-    }
-
     public void testSimulateJBIEndpointNotExistOnCompletion() throws Exception {
         cause = null;
-        
+
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 errorHandler(deadLetterChannel("mock:dlc").maximumRedeliveries(0).handled(false));
 
                 from("direct:start")
-                    .threads(2)
-                    .to("mock:done")
-                    .throwException(new IllegalArgumentException("Forced"));
+                        .threads(2)
+                        .to("mock:done")
+                        .throwException(new IllegalArgumentException("Forced"));
 
             }
         });

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java
Fri Jun 25 13:56:19 2010
@@ -17,6 +17,8 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 
 /**
@@ -24,12 +26,17 @@ import org.apache.camel.builder.RouteBui
  */
 public class ThreadsCorePoolTest extends ContextTestSupport {
 
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
     public void testThreadsCorePool() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
         template.sendBody("direct:start", "Hello World");
 
         assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     public void testThreadsCorePoolBuilder() throws Exception {
@@ -38,6 +45,8 @@ public class ThreadsCorePoolTest extends
         template.sendBody("direct:foo", "Hello World");
 
         assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     @Override
@@ -45,9 +54,23 @@ public class ThreadsCorePoolTest extends
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                context.setTracing(true);
+
                 from("direct:start")
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
                     // will use a a custom thread pool with 5 in core and 5 as max
                     .threads(5)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
                     .to("mock:result");
 
                 from("direct:foo")

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRequestReplyTest.java
(from r957869, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRequestReplyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRequestReplyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java&r1=957869&r2=957944&rev=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRequestReplyTest.java
Fri Jun 25 13:56:19 2010
@@ -17,27 +17,27 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 
 /**
  * @version $Revision$
  */
-public class ThreadsCorePoolTest extends ContextTestSupport {
+public class ThreadsRequestReplyTest extends ContextTestSupport {
 
-    public void testThreadsCorePool() throws Exception {
-        getMockEndpoint("mock:result").expectedMessageCount(1);
-
-        template.sendBody("direct:start", "Hello World");
-
-        assertMockEndpointsSatisfied();
-    }
+    private static String beforeThreadName;
+    private static String afterThreadName;
 
-    public void testThreadsCorePoolBuilder() throws Exception {
+    public void testThreadsInOut() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
-        template.sendBody("direct:foo", "Hello World");
+        String reply = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", reply);
 
         assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     @Override
@@ -45,15 +45,24 @@ public class ThreadsCorePoolTest extends
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                context.setTracing(true);
+
                 from("direct:start")
-                    // will use a a custom thread pool with 5 in core and 5 as max
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
                     .threads(5)
-                    .to("mock:result");
-
-                from("direct:foo")
-                    // using the builder style
-                    .threads().poolSize(5)
-                    .to("mock:result");
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result")
+                    .transform(constant("Bye World"));
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDeadLetterChannelTest.java
Fri Jun 25 13:56:19 2010
@@ -72,35 +72,4 @@ public class AsyncDeadLetterChannelTest 
         assertMockEndpointsSatisfied();
     }
 
-    public void testAsyncErrorHandlerNoWait() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false));
-
-                from("direct:in")
-                    .threads(2).waitForTaskToComplete(WaitForTaskToComplete.Never)
-                    .to("mock:foo")
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws Exception {
-                            throw new Exception("Forced exception by unit test");
-                        }
-                    });
-            }
-        });
-        context.start();
-
-        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
-
-        MockEndpoint mock = getMockEndpoint("mock:dead");
-        mock.expectedMessageCount(1);
-        // no traces of redelivery as the dead letter channel will handle the exception when moving the DLQ
-        mock.message(0).header(Exchange.REDELIVERED).isNull();
-        mock.message(0).header(Exchange.REDELIVERY_COUNTER).isNull();
-
-        template.requestBody("direct:in", "Hello World");
-
-        assertMockEndpointsSatisfied();
-    }
-
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDefaultErrorHandlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDefaultErrorHandlerTest.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDefaultErrorHandlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncDefaultErrorHandlerTest.java Fri Jun 25 13:56:19 2010
@@ -63,29 +63,4 @@ public class AsyncDefaultErrorHandlerTes
         assertMockEndpointsSatisfied();
     }
 
-    public void testAsyncDefaultErrorHandlerNoWait() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:in")
-                    .threads(2).waitForTaskToComplete(WaitForTaskToComplete.Never)
-                    .to("mock:foo")
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws Exception {
-                            throw new Exception("Forced exception by unit test");
-                        }
-                    });
-            }
-        });
-        context.start();
-
-        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
-
-        // as it turns into async and we do not wait for the task to complete
-        // we will not get notified of the exception
-        template.requestBody("direct:in", "Hello World");
-
-        assertMockEndpointsSatisfied();
-    }
-
 }
\ No newline at end of file

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/TransactedAsyncUsingThreadsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/TransactedAsyncUsingThreadsTest.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/TransactedAsyncUsingThreadsTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/TransactedAsyncUsingThreadsTest.java Fri Jun 25 13:56:19 2010
@@ -18,7 +18,6 @@ package org.apache.camel.component.jms.t
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelSpringTestSupport;
 import org.junit.Before;
@@ -98,10 +97,8 @@ public class TransactedAsyncUsingThreads
                     })
                     // use transacted routing
                     .transacted()
-                    // use async threads to process the exchange from this point forward
-                    // but let the consumer wait until the async routing is complete
-                    // so we can let the transaction commit or rollback depending how it went
-                    .threads(5).waitForTaskToComplete(WaitForTaskToComplete.Always)
+                    // and route async from this point forward
+                    .threads(5)
                     // send to mock for verification
                     .to("mock:async")
                     .process(new Processor() {

Modified: camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala (original)
+++ camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala Fri Jun 25 13:56:19 2010
@@ -28,8 +28,6 @@ case class SThreadsDefinition(override v
 
   def poolSize(size: Int) = wrap(target.poolSize(size))
 
-  def waitForTaskToComplete(wait: WaitForTaskToComplete) = wrap(target.waitForTaskToComplete(wait))
-
   override def wrap(block: => Unit) = super.wrap(block).asInstanceOf[SThreadsDefinition]
 
 }

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThreadsCorePoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThreadsCorePoolTest.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThreadsCorePoolTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThreadsCorePoolTest.java Fri Jun 25 13:56:19 2010
@@ -17,11 +17,27 @@
 package org.apache.camel.spring.processor;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.processor.ThreadsCorePoolTest;
+import org.apache.camel.ContextTestSupport;
 
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
-public class SpringThreadsCorePoolTest extends ThreadsCorePoolTest {
+public class SpringThreadsCorePoolTest extends ContextTestSupport {
+
+    public void testThreadsCorePool() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testThreadsCorePoolBuilder() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
 
     protected CamelContext createCamelContext() throws Exception {
         return createSpringCamelContext(this, "org/apache/camel/spring/processor/ThreadsCorePoolTest.xml");

Modified: camel/trunk/components/camel-web/src/main/java/org/apache/camel/web/util/OutputDefinitionRenderer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-web/src/main/java/org/apache/camel/web/util/OutputDefinitionRenderer.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/components/camel-web/src/main/java/org/apache/camel/web/util/OutputDefinitionRenderer.java (original)
+++ camel/trunk/components/camel-web/src/main/java/org/apache/camel/web/util/OutputDefinitionRenderer.java Fri Jun 25 13:56:19 2010
@@ -20,7 +20,6 @@ import java.util.List;
 
 import javax.xml.bind.annotation.XmlRootElement;
 
-import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.model.AOPDefinition;
 import org.apache.camel.model.BeanDefinition;
 import org.apache.camel.model.DataFormatDefinition;
@@ -212,11 +211,6 @@ public final class OutputDefinitionRende
             buffer.append(threads.getPoolSize());
         }
         buffer.append(")");
-
-        WaitForTaskToComplete wait = threads.getWaitForTaskToComplete();
-        if (wait != WaitForTaskToComplete.IfReplyExpected) {
-            buffer.append(".waitForTaskToComplete(WaitForTaskToComplete.").append(wait).append(")");
-        }
     }
 
     private static void renderTransacted(StringBuilder buffer, OutputDefinition out) {

Modified: camel/trunk/components/camel-web/src/test/java/org/apache/camel/web/groovy/ThreadsDSLTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-web/src/test/java/org/apache/camel/web/groovy/ThreadsDSLTest.java?rev=957944&r1=957943&r2=957944&view=diff
==============================================================================
--- camel/trunk/components/camel-web/src/test/java/org/apache/camel/web/groovy/ThreadsDSLTest.java (original)
+++ camel/trunk/components/camel-web/src/test/java/org/apache/camel/web/groovy/ThreadsDSLTest.java Fri Jun 25 13:56:19 2010
@@ -41,17 +41,4 @@ public class ThreadsDSLTest extends Groo
         assertEquals(dsl, render(dsl));
     }
 
-    @Test
-    public void testThreadsAsyncRouteNoWait() throws Exception {
-        String dsl = "from(\"direct:start\").transform(body().append(\" World\")).threads().waitForTaskToComplete(WaitForTaskToComplete.Never).to(\"mock:result\")";
-        assertEquals(dsl, render(dsl));
-    }
-
-    @Test
-    public void testThreadsAsyncRouteWaitIfReplyExpected() throws Exception {
-        String dsl = "from(\"direct:start\").transform(body().append(\" World\")).threads().waitForTaskToComplete(WaitForTaskToComplete.IfReplyExpected).to(\"mock:result\")";
-        String expected = "from(\"direct:start\").transform(body().append(\" World\")).threads().to(\"mock:result\")";
-
-        assertEquals(expected, render(dsl));
-    }
 }



Mime
View raw message